Thursday Apr 26, 2012

Testing with OCEP Shell

OCEP Shell

The OCEP shell is a new profile of the OCEP server 11g PS5 that has been created to facilitate the testing of OCEP applications.

To enable this profile, you need to unzip OCEP Shell to the modules directory of your domain.

For example, if you are using the default domain that comes with the product installation at Oracle/Middleware/user_projects/domains/ocep_domain/defaultserver, you would unzip the referenced file so that the content of defaultserver/modules becomes:





Start the server as usual, and you will get a shell prompt, as follows (in a Unix environment):

<Mar 13, 2012 8:02:16 AM BRT> <Notice> <Server> <BEA-2046000> <Server STARTED>


Oracle CEP Shell (using Apache Felix Gogo)


The OCEP shell uses the Apache Felix Gogo shell. Please take a quick look at Gogo’s documentation before proceeding. Felix Gogo is based upon OSGi RFC 147, which provides a standard shell command prompt for OSGi frameworks.

Using the OCEP shell, you can invoke commands to test and manage the OCEP server, as well as the OSGi framework itself. The commands are organized in categories, each identified by a prefix. You can get a list of all commands and their categories by invoking the help command. Currently, the following categories of commands are supported:

epn, felix, gogo, mngt, obr

In the next section, we take a look at how to create a simple EPN containing CQL queries for testing.

EPN Commands

We start by creating a new EPN session. You can think of an EPN session as a stand-alone EPN that can be manipulated dynamically. When you are done, you can simply end the session and create a new one if you want to test different things.

You begin a session by invoking the command:

shell> begin

Because the command begin is unique across all categories, you can omit the epn prefix. You can get descriptive help for a command by invoking help <command>, such as in the following example:
shell> help begin

begin - Begins new session for invoking EPN commands

scope: epn


By default, an EPN session has an implicit CQL processor that is connected to an event sink that prints all outputs to the shell console. So all we need to do to test CQL queries is to create an input channel, define the queries, and send events.

The following example does this:


channel MyChannel [msg=String]

query "select * from MyChannel"

send MyChannel [msg='Hi']


The result is:
1:19:39 PM -> insert event: msg=Hi

The channel command creates a channel named “MyChannel”, whose event-type has a single event property named msg of type String. You could also have specified a Java Class name as the event-type name.

The query command registers the query “select * from MyChannel” in the session’s implicit CQL processor. You can remove the query by using the remove command.

The send command dispatches an insert event to a channel. If the session only has a single channel, then the name of the channel is optional. In other words, for this particular example, the following two commands would be equivalent: "send MyChannel [msg='Hi']" and "send [msg='Hi']".

As should be clear from the Gogo documentation, the syntax "[msg='Hi']" creates a map containing a single key-value pair, whose key is "msg" and whose value is "Hi". This matches the event-type we created previously while defining the channel.
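To make that correspondence concrete, here is a minimal Java sketch of what the map literal amounts to and how it lines up with the channel's event-type. It is only an illustration: the class EventShapeSketch and its conforms method are hypothetical names, not part of the OCEP or Gogo APIs, and the schema map simply mirrors the [msg=String] declaration used for MyChannel.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; not an OCEP or Gogo class.
class EventShapeSketch {

    // Returns true when the event has exactly the declared properties
    // and each value is an instance of the declared Java type.
    static boolean conforms(Map<String, Class<?>> eventType, Map<String, Object> event) {
        if (!eventType.keySet().equals(event.keySet())) {
            return false;
        }
        for (Map.Entry<String, Class<?>> property : eventType.entrySet()) {
            if (!property.getValue().isInstance(event.get(property.getKey()))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Counterpart of: channel MyChannel [msg=String]
        Map<String, Class<?>> eventType = new LinkedHashMap<>();
        eventType.put("msg", String.class);

        // Counterpart of: [msg='Hi']
        Map<String, Object> event = new LinkedHashMap<>();
        event.put("msg", "Hi");

        System.out.println(conforms(eventType, event));
    }
}
```

An event whose msg value is a number rather than a String, or one that carries an extra property, would not conform to this event-type in the sketch.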

Further, you can use the update command and the delete command to respectively send an update event and a delete event to a relation-based channel. The insert command is equivalent to the send command.

You can check the currently registered statements in a session with the statement command. Likewise, you can find out which channels you have created in a session with the channels command. Finally, the eventtypes command lets you inspect the structure of all the event-types currently in the server.

And that’s it: by running this simple set of commands, you can fully construct an EPN and try out CQL statements on the fly. In the next section, we take a look at how to manage the OCEP server and how to test an existing OCEP application using the OCEP shell.

Management Commands

In this section, we take a look at the management category of commands. Let’s start by finding out all the deployed OCEP applications in the running server. You can do this by invoking the command mngt:listapps. Here is an example:

shell> listapps


Further, you can list all the OCEP libraries using the command listlibs:
shell> listlibs




Notice how this command lists all the bundles we unzipped into the modules directory to run the OCEP shell.

Next, you can install a new application using the deployapp <URL> command:

shell> deployapp file:///Users/ocepapps/helloworld.jar

The application is deployed and started immediately, as if it had been deployed by using the Visualizer management web-console. Conversely, you can un-deploy an application by invoking the command undeployapp.

You can send events to an existing OCEP application by specifying the full name of the channel when using the send, insert, update, and delete commands. For example, to send an event to the helloworldInputChannel in the helloworld application, you should use the name “helloworld:helloworldInputChannel”, as in the following example:

shell> event1 = createevent HelloWorldEvent

shell> $event1 message "Hi Shell!"

shell> send helloworld:helloworldInputChannel $event1

First, we create an event instance of the event type HelloWorldEvent. This is accomplished using the createevent command. Next, we populate the event with the value “Hi Shell!”, and then send it using the send command.

There is one caveat: the channel must have been advertised, otherwise the shell won’t be able to find it. Make sure that the advertise attribute is set to true in the application’s EPN assembly, as follows:

<wlevs:channel id="helloworldInputChannel"

event-type="HelloWorldEvent" advertise="true" >

You can also subscribe to receive events from a channel using the subscribe command. The subscribed events are sent to the shell console or, if a file name is specified, to a file. For example, the following command subscribes to all output from the helloworld application:
shell> subscribe helloworld:helloworldOutputChannel

As in the previous case, make sure that the channel being subscribed has also been advertised.

You can use the introspect command to list all the stages that are public, that is, have been advertised, in an application. The following example shows the result you get when invoking the introspect command in the out-of-the-box helloworld application:

shell> introspect helloworld

Application 'helloworld' provides the following OCEP services:

Event Channel 'helloworldOutputChannel' for Event Type 'HelloWorldEvent'

Using the Gogo shell facilities, you can retrieve any OSGi service and invoke standard Java methods on it. In addition, the OCEP shell adds the mbean command, which allows you to retrieve an EPN JMX MBean and invoke its operations. In the following example, we retrieve the CQLProcessorMBean for the helloworld processor and invoke its operation "getAllQueries".
shell> proc = mbean helloworld:helloworldProcessor CQLProcessor

shell> $proc allQueries


select * from helloworldInputChannel

The OCEP server exposes a rich set of JMX operations, which can now be easily tested and manipulated using the mbean command.
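For readers less familiar with JMX, the following Java sketch shows roughly what such a lookup involves in plain javax.management terms: an MBean is found by name, and a getter such as getAllQueries() is read as the attribute AllQueries. Everything OCEP-specific here is an assumption made for illustration: the QueryHolder interface is a stand-in for the real CQLProcessorMBean, the ObjectName is invented, and the sketch registers its own MBean on the platform MBean server rather than talking to an OCEP server.

```java
import java.lang.management.ManagementFactory;
import java.util.Collections;
import java.util.List;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.StandardMBean;

// Hypothetical illustration only; the interface and ObjectName below are
// stand-ins, not the real OCEP CQLProcessorMBean or its registered name.
class MBeanAttributeSketch {

    // Stand-in management interface with a single read-only attribute.
    public interface QueryHolder {
        List<String> getAllQueries();
    }

    // Registers a stand-in MBean (once) and reads its AllQueries attribute,
    // which JMX maps onto the getAllQueries() getter.
    static Object readAllQueries() {
        try {
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            ObjectName name =
                new ObjectName("sketch:type=CQLProcessor,name=helloworldProcessor");
            if (!server.isRegistered(name)) {
                QueryHolder holder = new QueryHolder() {
                    @Override
                    public List<String> getAllQueries() {
                        return Collections.singletonList(
                            "select * from helloworldInputChannel");
                    }
                };
                server.registerMBean(new StandardMBean(holder, QueryHolder.class), name);
            }
            return server.getAttribute(name, "AllQueries");
        } catch (Exception e) {
            throw new IllegalStateException("JMX lookup failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readAllQueries());
    }
}
```

The mbean command saves you this boilerplate: it hands back a proxy, so the attribute can be read directly from the shell prompt.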

Finally, when you are all done, you can invoke the shutdown command to stop the shell and the OCEP server.

Regression Testing

The OCEP shell facilitates the testing of OCEP applications. The next step is to automate this process, which can be done by running the OCEP shell headless using scripts. For example, consider the following script, named send-event.ocep:


channel -a [a=Long]

query "select * from ch0"

send 0 [a=1]

send 1 [a=2]


You can execute this script by specifying the gosh.args system property. To do this, edit the last line of the server start script (startwlevs.cmd on Windows), as in the following example:
"$JAVA_HOME/bin/java" -Dgosh.args=send-event.ocep $JVM_ARGS $DEBUG_ARGS -Dwlevs.home="$USER_INSTALL_DIR" -Dbea.home="$BEA_HOME"  -jar "${USER_INSTALL_DIR}/bin/wlevs.jar" $ARGS

One approach is to have a test-driver script that invokes other scripts and directs their output, as in this example:
source send-event.ocep | tac test-output/log/send-event.out

source test-delete.ocep | tac test-output/log/test-delete.out
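Once the output of each script has been captured in a file, a regression check comes down to comparing it with a recorded baseline. The Java sketch below shows one way this could be done. It is not part of the OCEP shell: the class and method names are hypothetical, and it assumes the captured lines follow the "time -> event" layout shown earlier for console output, so the leading time has to be ignored when comparing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for a regression check; not part of the OCEP shell.
class OutputComparisonSketch {

    // Drops everything up to and including the first "-> " so that the
    // wall-clock time at the start of each printed event is ignored.
    static String stripTimestamp(String line) {
        int arrow = line.indexOf("-> ");
        return arrow < 0 ? line.trim() : line.substring(arrow + 3).trim();
    }

    // Compares captured output with a recorded baseline, line by line,
    // skipping blank lines and ignoring timestamps.
    static boolean matchesBaseline(List<String> actual, List<String> expected) {
        return normalize(actual).equals(normalize(expected));
    }

    private static List<String> normalize(List<String> lines) {
        List<String> result = new ArrayList<>();
        for (String line : lines) {
            String stripped = stripTimestamp(line);
            if (!stripped.isEmpty()) {
                result.add(stripped);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Sample lines are assumptions modeled on the console format above.
        List<String> baseline = Arrays.asList(
            "1:19:39 PM -> insert event: a=1",
            "1:19:39 PM -> insert event: a=2");
        List<String> latest = Arrays.asList(
            "9:02:11 AM -> insert event: a=1",
            "",
            "9:02:11 AM -> insert event: a=2");
        System.out.println(matchesBaseline(latest, baseline));
    }
}
```

In a real test driver, the two lists would typically be read from the baseline file and from the file written under test-output/log, for example with java.nio.file.Files.readAllLines.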


The OCEP shell provides a quick and easy way to prototype, test, and regression test OCEP applications, EPN assemblies, and CQL queries.

Just keep in mind that the OCEP shell is currently a technology preview for Oracle CEP 11g PS5. We will keep improving it, so please send us your feedback! Enjoy.

Tuesday Nov 29, 2011

Aggregating cache data from OCEP in CQL

There are several use cases where OCEP applications need to join stream data with external data, such as data available in a Coherence cache. OCEP’s streaming language, CQL, supports simple cache-key based joins of stream data with data in Coherence (more complex queries will be supported in a future release). However, there are instances where you may need to aggregate the data in Coherence based on input data from a stream. This blog describes a sample that does just that.

For our sample, we will use a simplified credit card fraud detection use case. The input to this sample application is a stream of credit card transaction data. The input stream contains information such as the credit card ID, transaction time, and transaction amount. The purpose of this application is to detect suspicious transactions and send out a warning event. For the sake of simplicity, we will assume that all transactions with amounts greater than $1000 are suspicious. The transaction history is available in a Coherence distributed cache. For every suspicious transaction detected, a warning event must be sent with the maximum amount, total amount, and total number of transactions over the past 30 days, as shown in the diagram below.

Application Input

Stream input to the EPN contains events of type CCTransactionEvent. This input has to be joined with the cache with all credit card transactions. The cache is configured in the EPN as shown below:

    <wlevs:caching-system id="CohCacheSystem" provider="coherence"/>
    <wlevs:cache id="CCTransactionsCache" value-type="CCTransactionEvent"
                 key-properties="cardID, transactionTime">
        <wlevs:caching-system ref="CohCacheSystem"/>
    </wlevs:cache>

Application Output

The output that must be produced by the application is a fraud warning event. This event is configured in the Spring file as shown below. Source for the cardHistory property can be seen here.

    <wlevs:event-type type-name="FraudWarningEvent">
        <wlevs:properties type="tuple">
            <wlevs:property name="cardID" type="CHAR"/>
            <wlevs:property name="transactionTime" type="BIGINT"/>
            <wlevs:property name="transactionAmount" type="DOUBLE"/>
            <wlevs:property name="cardHistory" type="OBJECT"/>
        </wlevs:properties>
    </wlevs:event-type>

Cache Data Aggregation using Java Cartridge

In the output warning event, the cardHistory property contains data from the cache aggregated over the past 30 days. To get this information, we use a Java cartridge method. This method uses Coherence’s query API on the credit card transactions cache to get the required information, so it needs a reference to the cache. This can be set up by configuring it in the Spring context file as shown below:

    <bean class="">
        <property name="cache" ref="CCTransactionsCache"/>
    </bean>

This is used by the Java class to set a static property:

    public void setCache(Map cache) {
        // Keep a static reference so the static cartridge method can reach the cache
        s_cache = (NamedCache) cache;
    }

The code snippet below shows how the total of all the transaction amounts in the past 30 days is computed. The rest of the information required by the CardHistoryData object is calculated in a similar manner. The complete source of this class can be found here. For more information about using Coherence's API to query a cache, please refer to the Coherence Developer’s Guide.

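public static CardHistoryData execute(String cardID) {
    // 'map' (construction not shown in this excerpt) binds :cardID and :transactionTime
    Filter filter = QueryHelper.createFilter("cardID = :cardID and transactionTime > :transactionTime", map);
    CardHistoryData history = new CardHistoryData();
    // Sum the amounts of all cache entries that match the filter
    Double sum = (Double) s_cache.aggregate(filter, new DoubleSum("getTransactionAmount"));
    // ... the sum and the other aggregates are stored in 'history' here ...
    return history;
}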

The Java cartridge method is used from CQL as seen below:

select cardID,
          CCTransactionsAggregator.execute(cardID) as cardHistory
from inputChannel
where transactionAmount>1000

This produces a warning event, with history data, for every credit card transaction over $1000.
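To see the whole flow in one place without a server, here is a small in-memory Java sketch of the same logic. It is a stand-in, not the sample's code: a plain List replaces the Coherence cache, the class, field, and method names are hypothetical, and transactionTime is assumed to be in epoch milliseconds. It does reuse the arrangement described above, where an instance setter stores a static reference that a static method later reads.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for the sample; it uses a plain List
// where the real application uses a Coherence cache and CQL.
class FraudHistorySketch {

    static final double SUSPICIOUS_AMOUNT = 1000.0;
    static final long THIRTY_DAYS_MILLIS = 30L * 24 * 60 * 60 * 1000;

    static class Transaction {
        final String cardID;
        final long transactionTime; // assumed to be epoch milliseconds
        final double transactionAmount;

        Transaction(String cardID, long transactionTime, double transactionAmount) {
            this.cardID = cardID;
            this.transactionTime = transactionTime;
            this.transactionAmount = transactionAmount;
        }
    }

    static class History {
        double maxAmount;
        double totalAmount;
        int count;
    }

    // Set through an instance setter, read by the static method,
    // mirroring the setCache / s_cache arrangement of the sample.
    private static List<Transaction> s_history = new ArrayList<>();

    public void setHistory(List<Transaction> history) {
        s_history = history;
    }

    // Counterpart of the CQL predicate: transactionAmount > 1000
    static boolean isSuspicious(Transaction t) {
        return t.transactionAmount > SUSPICIOUS_AMOUNT;
    }

    // Aggregates the card's transactions from the 30 days before 'now'.
    static History aggregate(String cardID, long now) {
        History h = new History();
        long cutoff = now - THIRTY_DAYS_MILLIS;
        for (Transaction t : s_history) {
            if (t.cardID.equals(cardID) && t.transactionTime > cutoff) {
                h.totalAmount += t.transactionAmount;
                h.maxAmount = Math.max(h.maxAmount, t.transactionAmount);
                h.count++;
            }
        }
        return h;
    }

    public static void main(String[] args) {
        long day = 24L * 60 * 60 * 1000;
        long now = System.currentTimeMillis();
        List<Transaction> stored = new ArrayList<>();
        stored.add(new Transaction("card-1", now - day, 200.0));
        stored.add(new Transaction("card-1", now - 40 * day, 900.0)); // older than 30 days
        new FraudHistorySketch().setHistory(stored);

        Transaction incoming = new Transaction("card-1", now, 1500.0);
        if (isSuspicious(incoming)) {
            History h = aggregate(incoming.cardID, now);
            System.out.println(h.count + " transaction(s), total " + h.totalAmount);
        }
    }
}
```

In the real application, the loop in aggregate is what the Coherence filter and aggregators do on the cache side, so the history data does not have to be pulled into the OCEP server to be summed.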

That is all there is to it. The complete source for the sample application, along with the configuration files, is available here. In the sample, I use a simple Java bean to load the cache with initial transaction history data. An input adapter is used to create and send transaction events for the input stream.

Thursday Sep 16, 2010

Oracle CEP Events at OpenWorld 2010


Wednesday Sep 08, 2010

Generating Complex Events from a Partitioned Stream


Tuesday Aug 10, 2010

How to Programmatically Access Oracle CEP MBeans


Wednesday Jul 28, 2010

Oracle CEP: Enriching the Results of CQL Aggregation Queries


Tuesday Oct 13, 2009

Oracle CEP at Oracle OpenWorld 2009


Wednesday Jul 08, 2009

Oracle CEP and Twitter


Thursday May 28, 2009

Oracle CEP at JavaOne


Thursday Apr 09, 2009

Oracle CEP in Foreign Exchange (FX) Markets


Thursday Sep 18, 2008

DEBS 2008 Paper on WL EvS


Wednesday Sep 17, 2008

Oracle Complex Event Processing



