Recently I was working with Oracle Event Processing (OEP) and needed to set it up as part of a high availability cluster. OEP uses Coherence for quorum membership in an OEP cluster. Because the solution used caching, it was also necessary to include access to external Coherence nodes. Input messages needed to be duplicated across multiple OEP streams, so a JMS Topic adapter had to be configured. Finally, only one copy of each output event was desired, requiring the use of an HA adapter. In this blog post I will go through the steps required to implement a true HA OEP cluster.
The diagram below shows a very simple non-HA OEP configuration:
Events are received from a source (JMS in this blog). The events are processed by an event processing network which makes use of a cache (Coherence in this blog). Finally any output events are emitted. The output events could go to any destination but in this blog we will emit them to a JMS queue.
OEP provides high availability by having multiple event processing instances processing the same event stream in an OEP cluster. One instance acts as the primary and the other instances act as secondary processors. Usually only the primary will output events as shown in the diagram below (top stream is the primary):
The actual event processing is the same as in the previous non-HA example. What is different is how input and output events are handled. Because we want to minimize or avoid duplicate events, we have added an HA output adapter to the event processing network. This adapter acts as a filter, so that only the primary stream will emit events to our output queue. If the processing of events within the network depends on the time at which events are received, then it is also necessary to use an HA input adapter to synchronize the arrival timestamps of events across the cluster.
Let's begin by setting up the base OEP cluster. To do this we create new OEP configurations on each machine in the cluster. The steps are outlined below. Note that the same steps are performed on each machine for each server which will run on that machine:
Now that we have created our servers we need to configure them so that they can find each other. OEP uses Oracle Coherence to determine cluster membership. Coherence clusters can use either multicast or unicast to discover already running members of a cluster. Multicast has the advantage that it is easy to set up and scales better (see http://www.ateam-oracle.com/using-wka-in-large-coherence-clusters-disabling-multicast/), but it has a number of challenges, including failure to propagate through routers by default and accidentally joining the wrong cluster because someone else chose the same multicast settings. We will show how to use both unicast and multicast to discover the cluster.
Multicast Discovery: Coherence multicast uses a class D multicast address that is shared by all servers in the cluster. On startup a Coherence node broadcasts a message to the multicast address looking for an existing cluster. If no one responds then the node will start the cluster.

Unicast Discovery: Coherence unicast uses Well Known Addresses (WKAs). Each server in the cluster needs a dedicated listen address/port combination. A subset of these addresses is configured as WKAs and shared between all members of the cluster. As long as at least one of the WKAs is up and running, servers can join the cluster. If a server does not find any cluster members then it checks to see if its own listen address and port are in the WKA list. If they are then that server will start the cluster; otherwise it will wait for a WKA server to become available.
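For unicast discovery the WKA list is supplied to Coherence through an operational override file. The sketch below is illustrative only: the host names and ports are examples, and the placement of the file (typically somewhere on the OEP server's configuration path, such as the domain's config directory) should be checked for your OEP version.

<!-- tangosol-coherence-override.xml (sketch; host names and ports are examples) -->
<coherence>
  <cluster-config>
    <unicast-listener>
      <well-known-addresses>
        <socket-address id="1">
          <address>oep-host1</address>
          <port>9100</port>
        </socket-address>
        <socket-address id="2">
          <address>oep-host2</address>
          <port>9100</port>
        </socket-address>
      </well-known-addresses>
      <!-- This server's own listen address/port; must be unique per server -->
      <address>oep-host1</address>
      <port>9100</port>
    </unicast-listener>
  </cluster-config>
</coherence>

Each server gets its own copy of this file with its own listen address and port, while the well-known-addresses list is identical on every server.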
You should now have a working OEP cluster. Check the cluster by starting all the servers.
Look for a message like the following on the first server to start to indicate that another server has joined the cluster:
<Coherence> <BEA-2049108> <The domain membership has changed to [server2, server1], the new domain primary is "server1">
Log on to the Event Processing Visualizer of one of the servers – http://<hostname>:<port>/wlevs. Select the cluster name on the left and then select group “AllDomainMembers”. You should see a list of all the running servers in the “Servers of Group – AllDomainMembers” section.
Now that we have a working OEP cluster let us look at a simple application that can be used as an example of how to cluster enable an application. This application models service request tracking for hardware products and performs two checks, referred to below as use case 1 and use case 2.
Note use case 1 is nicely time bounded – in this case the time window is 10 seconds. Hence this is an ideal candidate to be implemented entirely in CQL.
Use case 2 has no time constraints, hence over time there could be a very large number of CQL queries running looking for a matching TAG but a different SRID. In this case it is better to put the TAGs into a cache and search the cache for duplicate tags. This reduces the amount of state information held in the OEP engine.
The sample application to implement this is shown below:
Messages are received from a JMS Topic (InboundTopicAdapter). Test messages can be injected via a CSV adapter (RequestEventCSVAdapter). Alerts are sent to a JMS Queue (OutboundQueueAdapter) and also printed to the server standard output (PrintBean). Use case 1 is implemented by the MissingEventProcessor. Use case 2 is implemented by inserting the TAG into a cache (InsertServiceTagCacheBean) using a Coherence event processor and then querying the cache for each new service request (DuplicateTagProcessor); if the same tag is already associated with an SR in the cache then an alert is raised. The RaiseEventFilter is used to filter out existing service requests from the use case 2 stream.
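In OEP, a cache like this is typically declared in the EPN assembly as a Coherence-backed cache. A hedged sketch is shown below; the caching-system id and the value-type are example names, not necessarily those used by the downloadable application:

<!-- Sketch: a Coherence-backed cache declared in the EPN assembly -->
<wlevs:caching-system id="ServiceTagCachingSystem" provider="coherence"/>
<wlevs:cache id="ServiceTagCache" value-type="ServiceRequestEvent">
    <wlevs:caching-system ref="ServiceTagCachingSystem"/>
</wlevs:cache>

The InsertServiceTagCacheBean writes each new TAG into this cache, and the DuplicateTagProcessor checks incoming requests against it.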
The non-HA version of the application is available to download here.
We will use this application to demonstrate how to HA enable an application for deployment on our cluster.
A CSV file (TestData.csv) and a load generator properties file (HADemoTest.prop) are provided to test the application by injecting events using the CSV Adapter.
Note that the application reads a configuration file (System.properties) which should be placed in the domain directory of each event server.
Before deploying an application to a cluster it is a good idea to create a group in the cluster. Multiple servers can be members of this group. To add a group to an event server just add an entry to the <cluster> element in config.xml as shown below:
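As a hedged example, with a group named HAGroup (the group name is just an illustration, and the surrounding child elements are indicative of an OEP config.xml whose cluster element was already configured for clustering), the cluster element might end up looking something like this:

<cluster>
    <server-name>server1</server-name>
    <multicast-address>239.255.0.1</multicast-address>
    <identity>1</identity>
    <enabled>coherence</enabled>
    <!-- The group entry: a server can list several groups here -->
    <groups>HAGroup</groups>
</cluster>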
Multiple servers can be members of a group and a server can be a member of multiple groups. This allows you to have different levels of high availability in the same event processing cluster.
Deploy the application using the Visualizer. Target the application at the group you created, or the AllDomainMembers group.
Test the application, typically using a CSV Adapter. Note that using a CSV adapter sends all the events to a single event server. To fix this we need to add a JMS output adapter (OutboundTopicAdapter) to our application and then send events from the CSV adapter to the outbound JMS adapter as shown below:
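One way this wiring might look in the EPN assembly is sketched below; the channel id is an example name, and the CSV adapter is assumed to be the one already defined in the application:

<!-- Sketch: route test events from the CSV adapter to a JMS topic so every server sees them -->
<wlevs:adapter id="OutboundTopicAdapter" provider="jms-outbound"/>
<wlevs:channel id="TestInjectionChannel" event-type="ServiceRequestEvent">
    <wlevs:source ref="RequestEventCSVAdapter"/>
    <wlevs:listener ref="OutboundTopicAdapter"/>
</wlevs:channel>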
So now we are able to send events via CSV to an event processor that in turn sends the events to a JMS topic. But we still have a few challenges.
The first challenge is managing input. Because OEP relies on the same event stream being processed by multiple servers, we need to make sure that all our servers get the same messages from the JMS Topic. To do this we configure the JMS connection factory to have an Unrestricted Client ID; this allows multiple clients (our OEP servers) to use the same connection factory. Client IDs are mandatory when using durable topic subscriptions. We also need each event server to have its own subscriber ID for the JMS Topic, which ensures that each server gets a copy of all the messages posted to the topic. If we used the same subscriber ID for all the servers then the messages would be distributed across the servers, with each server seeing a set of messages disjoint from those seen by the other servers in the cluster. This is not what we want, because each server should see the same event stream. We can use the server name as the subscriber ID, as shown in the excerpt below from our application:
<wlevs:adapter id="InboundTopicAdapter" provider="jms-inbound">
This works because I have placed a ConfigurationPropertyPlaceholderConfigurer bean in my application, as shown below; the same bean is also used to access properties from a configuration file:
<property name="location" value="file:../Server.properties"/>
With this configuration each server will now get a copy of all the events.
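For illustration, the inbound adapter excerpt might continue along the lines below. The durableSubscription and durableSubscriptionName instance-property names, the serverName key in Server.properties, and the listener wiring are assumptions for the sketch (the downloadable application has the exact names); the point is simply that the ${serverName} token is resolved per server by the placeholder configurer bean shown above:

<!-- Sketch only: instance-property names and the serverName key are assumptions -->
<wlevs:adapter id="InboundTopicAdapter" provider="jms-inbound">
    <!-- Durable subscription so messages are retained for a server that is down -->
    <wlevs:instance-property name="durableSubscription" value="true"/>
    <!-- Per-server subscriber ID resolved from Server.properties -->
    <wlevs:instance-property name="durableSubscriptionName" value="${serverName}"/>
    <wlevs:listener ref="RequestChannel"/>
</wlevs:adapter>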
As our application relies on elapsed time we should make sure that the timestamps of the received messages are the same on all servers. We do this by adding an HA Input adapter to our application.
<wlevs:adapter id="HAInputAdapter" provider="ha-inbound">
<wlevs:listener ref="RequestChannel" />
<wlevs:instance-property name="timeProperty" value="arrivalTime"/>
The HA Input Adapter sets the given “timeProperty” in the input message to the current system time. This time is then communicated to the other HAInputAdapters deployed to the same group, so that all servers in the group see the same timestamp for a given event. Events are matched across servers using the field(s) named in the “keyProperties” property.
To allow the downstream processing to treat this timestamp as the arrival time, the downstream channel is configured with an “application-timestamped” element that sets the arrival time of the event. This is shown below:
<wlevs:channel id="RequestChannel" event-type="ServiceRequestEvent">
<wlevs:listener ref="MissingEventProcessor" />
<wlevs:listener ref="RaiseEventFilterProcessor" />
Note that the property set by the HAInputAdapter (“arrivalTime”) is the one used as the arrival time of the event.
So now all servers in our cluster have the same events arriving from a topic, and each event arrival time is synchronized across the servers in the cluster.
Note that an OEP cluster has multiple servers processing the same input stream. Obviously, if we have the same inputs, synchronized to appear to arrive at the same time, then we will get the same outputs, which is central to OEP's promise of high availability. So when an alert is raised by our application it will be raised by every server in the cluster. If we have 3 servers in the cluster then we will get 3 copies of the same alert appearing on our alert queue. This is probably not what we want. To fix this we take advantage of an HA Output Adapter. Unlike input, where there is a single HA Input Adapter, there are multiple HA Output Adapters, each with distinct performance and behavioral characteristics. The table below is taken from the Oracle® Fusion Middleware Developer's Guide for Oracle Event Processing and shows the different levels of service and performance impact:
| High Availability Option | Missed Events? | Duplicate Events? | Performance Overhead |
| --- | --- | --- | --- |
| Simple Failover | Yes (many) | Yes (few) | Negligible |
| Simple Failover with Buffering | Yes (few) | Yes (many) | Low |
| Light-Weight Queue Trimming | No | Yes (few) | Low-Medium |
| Precise Recovery with JMS | No | No | High |
I decided to go for the Light-Weight Queue Trimming option. This means I won't lose any events, but I may emit a few duplicate events in the event of primary failure. This option causes all output events to be buffered by the secondaries until the primary tells them that a particular event has been emitted. To configure this option I add the following adapter to my EPN:
<wlevs:adapter id="HAOutputAdapter" provider="ha-broadcast">
<wlevs:listener ref="OutboundQueueAdapter" />
<wlevs:listener ref="PrintBean" />
<wlevs:instance-property name="keyProperties" value="timestamp"/>
<wlevs:instance-property name="monotonic" value="true"/>
<wlevs:instance-property name="totalOrder" value="false"/>
This uses the time of the alert (the timestamp property) as the key for identifying events that have been trimmed. This works in this application because the alert time is the time of the source event, and the times of the source events are synchronized using the HA Input Adapter. Because this is a time value it will always increase, so I set monotonic=”true”. However, I may get two alerts raised with the same timestamp, so I set totalOrder=”false”.
I also added the additional configuration to config.xml for the application:
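The configuration looks something like the sketch below; the element names and the ha namespace prefix follow my recollection of the HA adapter configuration schema and should be checked against the Developer's Guide for your OEP version, while the values match the trimming interval and warm-up window discussed next:

<!-- Application configuration fragment for the HA broadcast output adapter (sketch) -->
<ha:ha-broadcast-adapter>
    <name>HAOutputAdapter</name>
    <!-- The primary broadcasts its latest emitted event to the secondaries every second -->
    <trimming-interval units="millis">1000</trimming-interval>
    <!-- A newly started server must run this long before it is considered up to date -->
    <warm-up-window-length units="seconds">15</warm-up-window-length>
</ha:ha-broadcast-adapter>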
This causes the primary to tell the secondaries which is its latest emitted alert every 1 second. The secondaries then trim from their buffers all alerts up to and including that latest emitted alert, so in the worst case I will get one second of duplicated alerts. It is also possible to specify a number of events rather than a time period. The trade-off is this: longer time intervals (or larger event counts) reduce synchronization overhead but cause more memory to be used by the secondaries and more duplicates on failover; more frequent synchronization uses less memory in the secondaries and generates fewer duplicate alerts, but requires more communication between the primary and the secondaries to trim the buffers.
The warm-up window is used to stop a secondary joining the cluster before it has been running for that time period. The window is based on the time that the EPN needs to run in order to have the same state as the other servers. In our example application we have a CQL query with a 10 second window, so I set the warm-up window to 15 seconds to ensure that a newly started server has the same state as all the other servers in the cluster. The warm-up window should be greater than the longest query window.
When we run OEP as a cluster we incur additional overhead in the servers: the HA Input Adapter is synchronizing event times across the servers, and the HA Output Adapter is synchronizing output events across the servers as well as buffering output events in the secondaries. We can't do anything about that overhead, but we can move the Coherence cache we are using outside of the OEP servers, reducing the memory pressure on those servers and also moving some of the processing outside of the server. Making our Coherence caches external to our OEP cluster is a good idea for the following reasons:
To create the external Coherence cache do the following:
|OEP Server 1||OEP Server 2||Cache Server 1||Cache Server 2|
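As a sketch of the cache configuration (the cache name is an example, not necessarily the one used by the application), both the OEP servers and the standalone cache servers can share a cache configuration file like the one below. The standalone cache servers are started as com.tangosol.net.DefaultCacheServer instances with storage enabled, while the OEP servers disable storage by setting -Dtangosol.coherence.distributed.localstorage=false:

<!-- coherence-cache-config.xml shared by OEP servers and cache servers (sketch) -->
<cache-config>
  <caching-scheme-mapping>
    <cache-mapping>
      <!-- Example cache name; the application's actual cache name may differ -->
      <cache-name>ServiceTagCache</cache-name>
      <scheme-name>distributed</scheme-name>
    </cache-mapping>
  </caching-scheme-mapping>
  <caching-schemes>
    <distributed-scheme>
      <scheme-name>distributed</scheme-name>
      <!-- Cache servers keep the data; OEP servers override this to false -->
      <local-storage system-property="tangosol.coherence.distributed.localstorage">true</local-storage>
      <backing-map-scheme>
        <local-scheme/>
      </backing-map-scheme>
      <autostart>true</autostart>
    </distributed-scheme>
  </caching-schemes>
</cache-config>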
We have now configured Coherence to use an external data grid for its application caches. When starting up, we should always start at least one of the grid servers before starting the OEP servers; this allows the OEP servers to find the grid. If we start things in the wrong order then the OEP servers will block, waiting for a storage-enabled node to start (one of the WKA servers if using unicast).
We have now created an OEP cluster that makes use of an external Coherence grid for application caches. The application has been modified to ensure that the timestamps of arriving events are synchronized and that output events are emitted by only one of the servers in the cluster. In the event of failure we may get some duplicate events with our configuration (there are configurations that avoid duplicate events entirely), but we will not lose any events. The final version of the application with full HA capability is shown below:
The following files are available for download:
The following references may be helpful: