Aggregating cache data from OCEP in CQL

OCEP applications often need to join stream data with external data, such as data held in a Coherence cache. OCEP’s streaming language, CQL, supports simple cache-key based joins of stream data with data in Coherence (more complex queries will be supported in a future release). Sometimes, however, you need to aggregate the data in Coherence based on input from a stream. This post describes a sample that does just that.

For our sample, we will use a simplified credit card fraud detection use case. The input to the application is a stream of credit card transactions, each carrying the credit card ID, transaction time, and transaction amount. The application detects suspicious transactions and sends out a warning event for each one. For simplicity, we assume that every transaction with an amount greater than $1000 is suspicious. The transaction history is available in a Coherence distributed cache. For every suspicious transaction, the warning event must include the maximum amount, total amount, and total number of that card's transactions over the past 30 days, as shown in the diagram below.

Application Input

The stream input to the EPN (event processing network) contains events of type CCTransactionEvent. This input must be joined with the cache that holds all credit card transactions. The cache is configured in the EPN as shown below:

    <wlevs:caching-system id="CohCacheSystem" provider="coherence"/>
    <wlevs:cache id="CCTransactionsCache" value-type="CCTransactionEvent"
                 key-properties="cardID, transactionTime">
        <wlevs:caching-system ref="CohCacheSystem"/>
    </wlevs:cache>
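
For orientation, the cache's value type could look roughly like the Java bean below. This is a sketch, not the sample's actual source: only the property names cardID, transactionTime, and transactionAmount come from this post. The field types (in particular, transactionTime as epoch milliseconds), the constructors, and the use of Serializable are assumptions. In a real bundle the class would be public and live in its own file.

```java
import java.io.Serializable;

// Hypothetical sketch of the event bean stored in CCTransactionsCache.
// Property names follow the post; the types are assumptions.
class CCTransactionEvent implements Serializable {
    private static final long serialVersionUID = 1L;

    private String cardID;
    private long transactionTime;      // assumed: epoch milliseconds
    private double transactionAmount;

    // No-argument constructor so frameworks can instantiate the bean reflectively
    public CCTransactionEvent() {
    }

    public CCTransactionEvent(String cardID, long transactionTime, double transactionAmount) {
        this.cardID = cardID;
        this.transactionTime = transactionTime;
        this.transactionAmount = transactionAmount;
    }

    public String getCardID() { return cardID; }
    public void setCardID(String cardID) { this.cardID = cardID; }

    public long getTransactionTime() { return transactionTime; }
    public void setTransactionTime(long transactionTime) { this.transactionTime = transactionTime; }

    // This getter name matches the "getTransactionAmount" string passed to DoubleSum later in the post
    public double getTransactionAmount() { return transactionAmount; }
    public void setTransactionAmount(double transactionAmount) { this.transactionAmount = transactionAmount; }
}
```

Because key-properties lists both cardID and transactionTime, each cache entry is identified by the combination of the two values, so one card can have many entries.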

Application Output

The output produced by the application is a fraud warning event. This event is configured in the Spring file as shown below. The source for the cardHistory property can be seen here.

    <wlevs:event-type type-name="FraudWarningEvent">
          <wlevs:properties type="tuple">
              <wlevs:property name="cardID" type="CHAR"/>
              <wlevs:property name="transactionTime" type="BIGINT"/>
              <wlevs:property name="transactionAmount" type="DOUBLE"/>
              <wlevs:property name="cardHistory" type="OBJECT"/>
          </wlevs:properties>
    </wlevs:event-type>

Cache Data Aggregation using Java Cartridge

In the output warning event, the cardHistory property contains data from the cache aggregated over the past 30 days. To get this information, we use a Java cartridge method. This method uses Coherence’s query API on the credit card transactions cache, so it needs a reference to the cache. The reference can be supplied by configuring a bean in the Spring context file as shown below:

    <bean class="">
        <property name="cache" ref="CCTransactionsCache"/>
    </bean>

The Java class uses this setter to store the cache reference in a static field, so that the static cartridge method can reach it:

    public void setCache(Map cache) {
        s_cache = (NamedCache) cache;
    }
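
The instance-setter-into-static-field pattern can be sketched in isolation as follows. This is an illustrative stand-in, not the sample's class: CacheHolder and its cache() accessor are hypothetical names, and a plain java.util.Map takes the place of Coherence's NamedCache (which is itself a Map). The volatile modifier and the null check are additions that guard against the static method being called before the container has injected the cache.

```java
import java.util.Map;

// Hypothetical holder illustrating the pattern; not part of the sample application.
class CacheHolder {
    // volatile so that the reference written by the container thread
    // is visible to the threads that later evaluate the query
    private static volatile Map<Object, Object> s_cache;

    // Instance setter: the container calls this once when it creates the bean
    public void setCache(Map<Object, Object> cache) {
        s_cache = cache;
    }

    // Static accessor for use from static methods
    static Map<Object, Object> cache() {
        Map<Object, Object> cache = s_cache;
        if (cache == null) {
            throw new IllegalStateException("cache has not been injected yet");
        }
        return cache;
    }
}
```

The trade-off of this design is that the cache reference becomes global state for the class, which is acceptable here because a static method called from a query has no bean instance to consult.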

The code snippet below shows how the total of all transaction amounts in the past 30 days is computed. The rest of the information required by the history object is calculated in a similar manner. The complete source of this class can be found here. For more information about using Coherence's API to query a cache, refer to the Coherence Developer’s Guide.

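    public static CreditHistoryData execute(String cardID) {
        // map (setup not shown) holds the bind values for :cardID and :transactionTime
        Filter filter = QueryHelper.createFilter("cardID = :cardID and transactionTime > :transactionTime", map);
        CreditHistoryData history = new CreditHistoryData();
        // Sum getTransactionAmount() over every cache entry that matches the filter
        Double sum = (Double) s_cache.aggregate(filter, new DoubleSum("getTransactionAmount"));
        // ... history is populated from sum and the other aggregates (not shown)
        return history;
    }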
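
To make the intended result concrete, here is a plain-Java stand-in that computes the same three figures (count, total, and maximum over the past 30 days) from an in-memory list. It does not use Coherence at all: HistorySketch, Txn, and the explicit now parameter are hypothetical, and timestamps are assumed to be epoch milliseconds. Against a real cache, the count and maximum would more likely come from Coherence aggregators such as Count and DoubleMax, applied with the same filter as DoubleSum.

```java
import java.util.DoubleSummaryStatistics;
import java.util.List;

// Hypothetical, Coherence-free illustration of the 30-day aggregation.
class HistorySketch {
    static final long THIRTY_DAYS_MS = 30L * 24 * 60 * 60 * 1000;

    // Minimal transaction record; field names follow the post
    static final class Txn {
        final String cardID;
        final long transactionTime;    // assumed: epoch milliseconds
        final double transactionAmount;

        Txn(String cardID, long transactionTime, double transactionAmount) {
            this.cardID = cardID;
            this.transactionTime = transactionTime;
            this.transactionAmount = transactionAmount;
        }
    }

    // Same predicate as the filter string:
    // cardID = :cardID and transactionTime > :transactionTime
    static DoubleSummaryStatistics aggregate(List<Txn> txns, String cardID, long now) {
        long cutoff = now - THIRTY_DAYS_MS;
        return txns.stream()
                .filter(t -> t.cardID.equals(cardID) && t.transactionTime > cutoff)
                .mapToDouble(t -> t.transactionAmount)
                .summaryStatistics();
    }
}
```

The returned statistics expose getCount(), getSum(), and getMax(), which correspond to the total number of transactions, the total amount, and the maximum amount carried by the warning event.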

The Java cartridge method is called from CQL as shown below:

    select cardID,
           CCTransactionsAggregator.execute(cardID) as cardHistory
    from inputChannel
    where transactionAmount > 1000

This produces a warning event, with history data, for every credit card transaction over $1000.

That is all there is to it. The complete source for the sample application, along with the configuration files, is available here. In the sample, I use a simple Java bean to load the cache with initial transaction history data, and an input adapter to create and send transaction events for the input stream.


Hi -

Can you check the java aggregation method signature above, please?

You have:
public static CreditHistoryData(String cardID) {...}
Shouldn't it be:
public static CreditHistoryData execute(String cardID) {...}

Also it's worth detailing the steps needed in the java bundle's manifest to expose the package correctly to the CEP engine, and also in the CEP application's bundle if they're different.


Posted by Barney Moss on December 05, 2011 at 01:28 AM PST #

Yes, the method signature was a typo. I have corrected it now. Thanks.

Regarding the steps required to expose the package to the CQL engine: the setup is such that you don't need anything special in the manifest to make the classes available to the CQL engine. You can see this in the manifest file contained in the complete zip file available in the blog.

One thing to mention is, if you want to refer to the class in the CQL statement without the full package name, you should import the class in the manifest of the application where the CQL statement is. In the sample used in the blog, that is not necessary since everything is in the single application bundle.

Posted by Manju James on December 06, 2011 at 04:33 AM PST #


This blog contains information about Oracle Stream Explorer (formerly known as Oracle Event Processing)

