Enriching Complex Events with Their Underlying Causal Events

In this entry I look at another common and interesting use case for enriching a complex event. This time I consider including the events that caused, or led to, the complex event as a property of the complex event itself. Let's consider the complex aggregation use case that has been serving as a running example in this blog. Suppose that we now want to include the individual alert events that produced a complex alert as part of the state of the complex alert. We will do this by adding a Collection-valued property to the Java class representing the complex alert event.


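import java.util.ArrayList;
import java.util.Collection;

public class ComplexAlertEvent {
    private String machineRoom;
    private int totalSeverity;

    // The simple alerts that contributed to totalSeverity.
    private Collection<AlertEvent> simpleAlertEvents = new ArrayList<AlertEvent>();

    // Getters and setters omitted.
}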



The ComplexAlertEvent represents the overall severity of alerts coming from a particular machine room in the enterprise and contains the underlying alerts that contribute to the overall severity.  It is a machine room-level alert representing the current state of the machine room. Now, given the following sequence of input events:


[AlertEvent: machineRoom-mr10, severity-2 administrator-Hoyong]

[AlertEvent: machineRoom-mr10, severity-7 administrator-Alex]

[AlertEvent: machineRoom-mr10, severity-1 administrator-Eric]

[AlertEvent: machineRoom-mr10, severity-4 administrator-Anand]

[AlertEvent: machineRoom-mr10, severity-10 administrator-Seth]

[AlertEvent: machineRoom-mr10, severity-1 administrator-Andy]


We want the application to produce the following output:


[machineRoom-mr10, severity-10, simple alerts-[[AlertEvent: machineRoom-mr10, severity-2, administrator-Hoyong], [AlertEvent: machineRoom-mr10, severity-7, administrator-Alex], [AlertEvent: machineRoom-mr10, severity-1 administrator-Eric]]]

[machineRoom-mr10, severity-15, simple alerts-[[AlertEvent: machineRoom-mr10, severity-4 administrator-Anand], [AlertEvent: machineRoom-mr10, severity-10 administrator-Seth], [AlertEvent: machineRoom-mr10, severity-1 administrator-Andy]]]


In other words, there is one complex event containing three nested alert events for every three alert events in the input stream. You may be wondering how we will get a CQL query to produce a collection-valued attribute in the complex event. Actually, this is going to require a bit of Java code, as CQL does not support nested relations or a mapping from nested relations to Java Collections. But first, let's look at the queries that are required (more than one will be needed in this case):


<view id="aggregateAlerts">
    <![CDATA[istream (
        SELECT alerts.machineRoom as machineRoom,
               sum(alerts.severity) as totalSeverity
        FROM alerts[rows 3 slide 3] as alerts
        GROUP BY alerts.machineRoom)
    ]]>
</view>

The first query performs the aggregation required by the complex event.  The output of this query is sent to a stream called "aggregateAlerts" which serves as input to the next query.
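
To make the window semantics concrete, here is a minimal plain-Java sketch of what I understand `[rows 3 slide 3]` combined with `GROUP BY` and `sum` to compute: the input is cut into consecutive, non-overlapping batches of three alerts, and severity is summed per machine room within each batch. This is not CQL and not part of the CEP API; the `TumblingSumSketch` class, its `Alert` type and the `aggregate` method are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; all names here are hypothetical.
class TumblingSumSketch {

    // Hypothetical stand-in for the AlertEvent type used in this post.
    static final class Alert {
        final String machineRoom;
        final int severity;

        Alert(String machineRoom, int severity) {
            this.machineRoom = machineRoom;
            this.severity = severity;
        }
    }

    // Cuts the input into consecutive, non-overlapping batches of windowSize
    // alerts and sums severity per machine room within each full batch.
    static List<Map<String, Integer>> aggregate(List<Alert> alerts, int windowSize) {
        List<Map<String, Integer>> results = new ArrayList<Map<String, Integer>>();
        for (int start = 0; start + windowSize <= alerts.size(); start += windowSize) {
            Map<String, Integer> totals = new LinkedHashMap<String, Integer>();
            for (Alert alert : alerts.subList(start, start + windowSize)) {
                Integer current = totals.get(alert.machineRoom);
                totals.put(alert.machineRoom,
                        (current == null ? 0 : current) + alert.severity);
            }
            results.add(totals);
        }
        return results;
    }

    public static void main(String[] args) {
        int[] severities = {2, 7, 1, 4, 10, 1};
        List<Alert> alerts = new ArrayList<Alert>();
        for (int severity : severities) {
            alerts.add(new Alert("mr10", severity));
        }
        System.out.println(aggregate(alerts, 3)); // prints [{mr10=10}, {mr10=15}]
    }
}
```

The two totals, 10 and 15, match the severities in the desired output shown earlier.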


<view id="combinedAlerts">
    <![CDATA[istream (
        SELECT agg.machineRoom as machineRoom,
               agg.totalSeverity as totalSeverity,
               alerts.machineRoom as alertRoom,
               alerts.severity as alertSeverity,
               alerts.administrator as alertAdministrator
        FROM alerts[rows 3 slide 3] as alerts,
             aggregateAlerts [now] as agg
        WHERE agg.machineRoom = alerts.machineRoom)
    ]]>
</view>


The second query joins the output of the first query with the original alert stream, producing a stream of events that contain both the aggregate data and the original alert event data. This join is the "trick" that allows CQL to associate the original causal data with the aggregate data. Note, however, that this output stream, called "combinedAlerts", contains one event for each underlying alert event. In other words, there is no nesting. CQL does not support nested relations, so we will need to implement the nested Collection property that holds the simple alert events using a Java event bean.
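
The shape of the join output can be sketched in plain Java. The snippet below is only an illustration under my own assumptions: `FlatJoinSketch`, `Alert`, `CombinedRow` and `join` are hypothetical names, and the field names simply mirror the aliases in the query above. It shows that one aggregate row joined with a three-alert window yields three flat rows, each repeating the aggregate values.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; all names here are hypothetical.
class FlatJoinSketch {

    static final class Alert {
        final String machineRoom;
        final int severity;
        final String administrator;

        Alert(String machineRoom, int severity, String administrator) {
            this.machineRoom = machineRoom;
            this.severity = severity;
            this.administrator = administrator;
        }
    }

    // One flat output row: aggregate columns plus the columns of one alert.
    static final class CombinedRow {
        final String machineRoom;
        final int totalSeverity;
        final String alertRoom;
        final int alertSeverity;
        final String alertAdministrator;

        CombinedRow(String machineRoom, int totalSeverity, Alert alert) {
            this.machineRoom = machineRoom;
            this.totalSeverity = totalSeverity;
            this.alertRoom = alert.machineRoom;
            this.alertSeverity = alert.severity;
            this.alertAdministrator = alert.administrator;
        }
    }

    // Joins a single aggregate row with every alert in the window that has
    // the same machine room (the WHERE clause of the query).
    static List<CombinedRow> join(String aggRoom, int totalSeverity, List<Alert> window) {
        List<CombinedRow> rows = new ArrayList<CombinedRow>();
        for (Alert alert : window) {
            if (aggRoom.equals(alert.machineRoom)) {
                rows.add(new CombinedRow(aggRoom, totalSeverity, alert));
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        List<Alert> window = new ArrayList<Alert>();
        window.add(new Alert("mr10", 2, "Hoyong"));
        window.add(new Alert("mr10", 7, "Alex"));
        window.add(new Alert("mr10", 1, "Eric"));
        for (CombinedRow row : join("mr10", 10, window)) {
            System.out.println(row.machineRoom + " " + row.totalSeverity + " "
                    + row.alertSeverity + " " + row.alertAdministrator);
        }
    }
}
```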

Since we are assuming in this example that there is only a single machine room, the next query is not strictly required. It would be necessary, however, to sort the combinedAlerts events if there were multiple machine rooms in the input data stream. Since multiple machine rooms is the more realistic assumption, and in order to illustrate the use of ORDER BY, I include the following third query in this example:


<query id="orderedAlerts">
    <![CDATA[istream (
        SELECT *
        FROM combinedAlerts
        ORDER BY machineRoom)
    ]]>
</query>


Now we can be assured that any combinedAlerts from the same machine room that occur at the same time will be delivered in sorted order to the downstream event bean. (If you look at the event bean code below, you will notice how this sorted data assumption is embedded in the code.) Another detail that we need to be concerned with is delivering all of the combinedAlert events that occur at a given point in time as a batch to our downstream event bean. Again, this is because we want the bean to receive all three events that correspond to the same ComplexAlertEvent in a single method call, so that it can easily perform the nesting. In order to enable batching, the event bean needs to implement the BatchStreamSink interface, and we also need to enable batching on the channel that sits between the processor and the event bean in the EPN. Here is what the EPN fragment looks like:
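
Why the sort matters to a one-pass consumer can be shown with a small plain-Java sketch. It assumes a hypothetical batch of three rows from two machine rooms and counts how many groups a consumer would start if it opened a new group every time the machine room changes. `SortedRunsSketch` and `countRuns` are names I made up for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Plain-Java illustration only; all names here are hypothetical.
class SortedRunsSketch {

    // Counts maximal runs of equal adjacent keys. A one-pass consumer that
    // starts a new group whenever the key changes would emit this many groups.
    static int countRuns(List<String> rooms) {
        int runs = 0;
        String previous = null;
        for (String room : rooms) {
            if (!room.equals(previous)) {
                runs++;
            }
            previous = room;
        }
        return runs;
    }

    public static void main(String[] args) {
        List<String> batch = new ArrayList<String>(
                Arrays.asList("mr10", "mr20", "mr10"));
        System.out.println(countRuns(batch)); // prints 3
        Collections.sort(batch);              // same rooms become adjacent
        System.out.println(countRuns(batch)); // prints 2
    }
}
```

Unsorted, the two mr10 rows would end up in separate groups; once sorted, each machine room forms exactly one run.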


<wlevs:processor id="alertProcessor" />

<wlevs:channel id="alertOutputChannel" event-type="CombinedEvent"
    advertise="true" batching="true">
    <wlevs:listener ref="MassageBean" />
    <wlevs:source ref="alertProcessor" />
</wlevs:channel>

<wlevs:event-bean id="MassageBean"


As you can see, the processor outputs CombinedEvents to a channel that has batching enabled. The channel sends events to an event bean, "MassageBean", that massages the unnested CombinedEvent data into its nested format, producing ComplexAlertEvent events as output. One thing to note: be careful to reference the latest version of the XML schema at the beginning of your EPN document, as the batching feature was introduced only in the latest release. You should be using http://www.bea.com/ns/wlevs/spring/spring-wlevs-v11_1_1_3.xsd.


Here is the code that performs the nesting:


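package com.bea.wlevs.alerts.helloworld;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;

import com.bea.wlevs.ede.api.BatchStreamSink;
import com.bea.wlevs.ede.api.EventRejectedException;
import com.bea.wlevs.ede.api.StreamSender;
import com.bea.wlevs.ede.api.StreamSource;
import com.bea.wlevs.event.common.example.helloworld.AlertEvent;
import com.bea.wlevs.event.example.helloworld.CombinedEvent;
import com.bea.wlevs.event.example.helloworld.ComplexAlertEvent;

public class MassageBean implements BatchStreamSink, StreamSource {

    private StreamSender sender;

    // Receives one batch of CombinedEvents, sorted by machine room, and
    // emits one nested ComplexAlertEvent per machine room.
    // NOTE: the getter/setter names on the event classes are assumed from the
    // property names used in the queries; the original listing is incomplete.
    public void onInsertEvents(Collection<Object> events)
            throws EventRejectedException {
        Iterator<Object> eventsIterator = events.iterator();
        if (!eventsIterator.hasNext()) {
            return; // nothing to nest
        }
        CombinedEvent combinedEvent = (CombinedEvent) eventsIterator.next();
        ComplexAlertEvent complexEvent = null;
        boolean done = false;

        while (!done) {
            if (complexEvent == null) {
                // Start a new complex event for the current machine room.
                complexEvent = new ComplexAlertEvent();
                complexEvent.setMachineRoom(combinedEvent.getMachineRoom());
                complexEvent.setTotalSeverity(combinedEvent.getTotalSeverity());
            }

            if (complexEvent.getMachineRoom().equals(
                    combinedEvent.getMachineRoom())) {
                // Same machine room: rebuild the simple alert and nest it.
                AlertEvent simpleEvent = new AlertEvent();
                simpleEvent.setMachineRoom(combinedEvent.getAlertRoom());
                simpleEvent.setSeverity(combinedEvent.getAlertSeverity());
                simpleEvent.setAdministrator(combinedEvent.getAlertAdministrator());
                complexEvent.getSimpleAlertEvents().add(simpleEvent);

                if (eventsIterator.hasNext()) {
                    combinedEvent = (CombinedEvent) eventsIterator.next();
                } else {
                    // End of batch: emit the last complex event.
                    sender.sendInsertEvent(complexEvent);
                    done = true;
                }
            } else {
                // Machine room changed (input is sorted): emit and start over.
                sender.sendInsertEvent(complexEvent);
                complexEvent = null;
            }
        }
    }

    // Non-batched delivery: treat the single event as a batch of one.
    public void onInsertEvent(Object event) throws EventRejectedException {
        Collection<Object> events = new ArrayList<Object>();
        events.add(event);
        onInsertEvents(events);
    }

    public void setEventSender(StreamSender sender) {
        this.sender = sender;
    }
}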
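
If you want to try the nesting logic outside the server, the following is a minimal standalone sketch of the same idea with no dependency on the CEP API. It is a simplified for-each variant, not the bean itself: `NestingSketch`, `Row`, `Complex` and `nest` are hypothetical names, and the batch is assumed to arrive already sorted by machine room, as the ORDER BY query is meant to guarantee.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; all names here are hypothetical.
class NestingSketch {

    // One flat row of the batch: aggregate columns plus one alert's columns.
    static final class Row {
        final String machineRoom;
        final int totalSeverity;
        final int alertSeverity;
        final String administrator;

        Row(String machineRoom, int totalSeverity, int alertSeverity, String administrator) {
            this.machineRoom = machineRoom;
            this.totalSeverity = totalSeverity;
            this.alertSeverity = alertSeverity;
            this.administrator = administrator;
        }
    }

    // Nested result: one per machine room, holding the rows that produced it.
    static final class Complex {
        final String machineRoom;
        final int totalSeverity;
        final List<Row> alerts = new ArrayList<Row>();

        Complex(String machineRoom, int totalSeverity) {
            this.machineRoom = machineRoom;
            this.totalSeverity = totalSeverity;
        }
    }

    // Single pass over a batch sorted by machine room: open a new Complex
    // whenever the room changes, and nest each row under the current one.
    static List<Complex> nest(List<Row> sortedBatch) {
        List<Complex> result = new ArrayList<Complex>();
        Complex current = null;
        for (Row row : sortedBatch) {
            if (current == null || !current.machineRoom.equals(row.machineRoom)) {
                current = new Complex(row.machineRoom, row.totalSeverity);
                result.add(current);
            }
            current.alerts.add(row);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Row> batch = new ArrayList<Row>();
        batch.add(new Row("mr10", 9, 2, "Hoyong"));
        batch.add(new Row("mr10", 9, 7, "Alex"));
        batch.add(new Row("mr20", 4, 4, "Anand"));
        for (Complex complex : nest(batch)) {
            // prints "mr10 total=9 alerts=2" then "mr20 total=4 alerts=1"
            System.out.println(complex.machineRoom + " total=" + complex.totalSeverity
                    + " alerts=" + complex.alerts.size());
        }
    }
}
```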


This completes the example.  In the future, CQL will hopefully be enhanced to support nested relations and a more comprehensive Java mapping in order to make this use case even easier. As always, comments welcome.


