Creating Scalable Fast Data Applications using Oracle Event Processing Platform (Setting Up an Active-Active Oracle CEP Domain)
By Ricardo Ferreira-Oracle on Mar 19, 2013
This article explores some technical aspects that you should consider if you are involved in serious implementations of Oracle CEP, the technical foundation of Oracle's strategy for Fast Data, called Oracle Event Processing Platform. It assumes that you have some basic knowledge of Oracle CEP and JMS, and some experience programming in Java.
Fast Data and the Concern with Scalability
There is no such thing as an application that is not meant to grow. Every application, even the simplest one, should expect some growth over the months or years that it is up and running. Growth is a consequence of many things: organizational growth; application maturity, which in turn gives users more confidence to use it; a marketing campaign that worked and brought in many more clients than expected; more front ends enabling people to interact with your application through other types of devices; exponential generation of data from the social networks that your application is configured to listen to; a market opportunity that demands more of your software; or perhaps just natural growth of the installed user base.
Whatever the source of the growth, your application needs to be ready to scale up. This is true no matter which architectural style you choose as your strategy. Of course, some architectural styles suggest moderate growth, like the client-server style or maybe the monolithic style. But take, for instance, the SOA ("Service-Oriented Architecture") style. The basic concept behind this architectural style is the reuse of services, which are the building blocks of the functional architecture and represent the business knowledge (standards, culture, procedures, routines) of an organization in the form of reusable functions. The more the reuse grows, the more scalable your SOA foundation must be. You can hardly predict the level of reuse of your services, but the key point is that you should design your services to really scale up.
Another great example of an architectural style that needs to be designed to scale up is EDA ("Event-Driven Architecture"), which basically deals with the processing of heterogeneous events coming from different sources, with different message formats and, more importantly, with event channels that could potentially generate events with different throughputs, frequencies and volumes. This can happen in the SOA architectural style too, of course, but the scariest thing about EDA is that you don't necessarily deal with fixed message schemas or with well-known message contracts. Prior knowledge of message contracts and schemas gives you the ability to predict the message size that the hardware infrastructure must deal with, an important input when you are sizing an infrastructure based on reasonable levels of reuse, as in the case of SOA.
As mentioned earlier, in the EDA architectural style you cannot predict the message schema or contract of your events. They can arrive in virtually any message format, containing structured and/or unstructured content. A good event-driven solution must be able to deal with this kind of situation, which makes the task of sizing an ideal hardware infrastructure really tough. Sizing ideal hardware for an event-driven solution is a combination of science and imagination. There are a lot of things to consider, a lot of scenarios to evaluate, a lot of hardware and/or software failures to predict, and a huge number of situations that could potentially stop your application from running due to hardware resource limitations, even in the first five hours of running in production. Believe me, it is really tough.
Designing an event-driven solution that is ready to scale up demands more from the regular architect role than we usually find nowadays. It requires deep knowledge of the problem domain, of distributed systems, of server systems (and perhaps engineered systems), of enterprise integration patterns and of the software stacks used to build the solution, regardless of whether they are proprietary, open source or a combination of both.
Why do you need to worry about scalability? Because ten years ago the market demanded event-driven solutions prepared to handle hundreds of events per second. Today we build event-driven solutions that should handle thousands of events per second. Fast Data, one of the new buzzwords of IT, demands event-driven solutions that should handle hundreds of thousands, or even millions, of events per second. My advice to any architect responsible for an event-driven solution is to treat scalability as a main goal, just as important as the problem domain to be solved.
Business Scenario for Scalability Study on Oracle CEP
Let's start our study of scalability in EDA with a business scenario. Imagine that you are designing an EDA solution that combines technologies like CEP, BAM and JMS to deliver near real-time business monitoring of KPIs ("Key Performance Indicators") for a financial services company. All the information needed to process the KPIs comes from a JMS channel that must be listened to by a specialized adapter. Those JMS messages will be the business events. Every business event carries information about a payment transaction, such as the total amount paid, the credit card brand, etc. The idea here is to process those payment transactions as they happen, in order to generate valuable KPIs that can be monitored through a near real-time monitoring solution like a BAM. For simplicity, let's consider only one KPI and concentrate our focus on the CEP layer, which is responsible for compiling and aggregating the KPIs. The example KPI will be the total count of payment transactions per second.
In order to compute this KPI, the CEP layer must apply the count aggregation function to the stream of events, considering only the events of the last second. This means that the KPI will be compiled and aggregated every second (a time window of 1000 milliseconds), and the output should also be generated every second. The EPN ("Event Processing Network") of this business scenario should be something simple like this:
Reading this EPN is not that complicated. You basically read the flow from left to right. The basic idea behind this EPN is: listen for the business events through a JMS adapter, put those events sequentially, based on their temporal order, into an event channel, compute the KPI from the stream of events using a processor, send the generated output event (the KPI itself) to a new event channel and, finally, print the KPI to the server output console using a custom adapter.
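To make the one-second count more concrete, here is a minimal plain-Java sketch of the idea. It is not Oracle CEP code: the class name OneSecondWindowCounter is hypothetical, and it assumes that events arrive in timestamp order and that the window covers the last 1000 milliseconds up to and including the current instant.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical helper (not part of Oracle CEP): counts the events of the last second.
class OneSecondWindowCounter {
    private static final long WINDOW_MILLIS = 1000L;

    // Timestamps of the events still inside the window, oldest first.
    private final Deque<Long> timestamps = new ArrayDeque<>();

    // Records one event by its timestamp in milliseconds (assumed non-decreasing).
    void onEvent(long eventTimeMillis) {
        timestamps.addLast(eventTimeMillis);
    }

    // Evicts events at or before (nowMillis - 1000) and returns how many remain.
    long countAt(long nowMillis) {
        while (!timestamps.isEmpty()
                && timestamps.peekFirst() <= nowMillis - WINDOW_MILLIS) {
            timestamps.removeFirst();
        }
        return timestamps.size();
    }
}
```

Calling countAt once per second would produce one KPI value per second, which mirrors the output rate described above.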
The event model of this EPN is composed of two simple event types. The first event type represents the payment transaction, which acts in the EPN as the event source. This event type contains three fields: a dateTime field that tells you the exact moment at which the payment transaction occurred, an amount field that holds the amount paid for a product and/or service, and a brand field that tells you which credit card brand was used in the payment transaction. The second event type is the transactions-per-second KPI, which acts in this EPN as the complex event. The only field of this event type is totalCountTPS, which represents the computed value of the KPI. The following UML class diagram summarizes this event model.
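For readers who prefer code to diagrams, the same event model might be expressed as two plain Java classes. This is only a sketch: the field names come from the description above, while the class name PaymentTransaction and all field types (Date, double, long) are assumptions.

```java
import java.util.Date;

// Source event: one payment transaction (class name and field types are assumptions).
class PaymentTransaction {
    private Date dateTime;   // moment at which the payment transaction occurred
    private double amount;   // amount paid for the product and/or service
    private String brand;    // credit card brand used in the transaction

    public Date getDateTime() { return dateTime; }
    public void setDateTime(Date dateTime) { this.dateTime = dateTime; }

    public double getAmount() { return amount; }
    public void setAmount(double amount) { this.amount = amount; }

    public String getBrand() { return brand; }
    public void setBrand(String brand) { this.brand = brand; }
}

// Complex event: the transactions-per-second KPI computed by the processor.
class TransactionsPerSecondKPI {
    private long totalCountTPS;  // computed value of the KPI

    public long getTotalCountTPS() { return totalCountTPS; }
    public void setTotalCountTPS(long totalCountTPS) { this.totalCountTPS = totalCountTPS; }
}
```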
All the payment transactions are received through a built-in JMS adapter, named paymentTransactionsJMSAdapter in the EPN. This adapter is configured to listen to a JMS destination through a dedicated connection factory. The listing below is the configuration file for this JMS adapter.
The processor that computes the KPI is also very simple. It basically counts the events coming from the event channel, considering only those events that fall within one whole second. It also keeps only those events that have meaningful values in the amount and brand fields, to prevent the KPI from being computed from dirty events. The following CQL ("Continuous Query Language") statement is used to compute the KPI:
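Independently of the CQL statement, the "dirty event" rule can be illustrated in plain Java. The sketch below is not the processor's query: the class DirtyEventFilter is hypothetical, and the rule it applies (a positive amount and a non-blank brand) is an assumption about what "meaningful values" means.

```java
// Hypothetical filter (not Oracle CEP code): decides whether an event should be counted.
class DirtyEventFilter {

    // Assumed rule: the amount is positive and the brand is neither null nor blank.
    static boolean isMeaningful(double amount, String brand) {
        return amount > 0 && brand != null && !brand.trim().isEmpty();
    }

    // Counts only the meaningful events among parallel arrays of amounts and brands.
    static long countMeaningful(double[] amounts, String[] brands) {
        long count = 0;
        for (int i = 0; i < amounts.length; i++) {
            if (isMeaningful(amounts[i], brands[i])) {
                count++;
            }
        }
        return count;
    }
}
```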
Finally, let's take a look at the last part of the EPN, which is the custom adapter created to print the results of the TransactionsPerSecondKPI event type to the server output console. As I mentioned earlier, for simplicity I will not show how this event would be monitored in a BAM. Instead, I have created a custom adapter using the Oracle CEP adapters API to print to the server output console the content of the totalCountTPS field of the TransactionsPerSecondKPI event type. The listing below is the Java implementation of this custom console adapter.
As you can see in the code, this custom adapter doesn't do anything special. It just reads the totalCountTPS field from the TransactionsPerSecondKPI event type and prints the value to the server output console. The idea here is just to monitor the computation of the KPIs in near real time, in order to test the behavior of Oracle CEP when it is running in a single JVM or clustered across multiple JVMs.
Creating a Testing Environment to Simulate the Payment Transactions
Now that we are all set regarding the business scenario, we can start the tests. You need to create a simple Oracle CEP domain to test this application. You will also need a JMS provider to host the JMS destination. I recommend using Oracle WebLogic as the JMS provider, but feel free to use any provider compliant with JMS 1.1. Set up your JMS provider, create a queue to be used in the tests, and set up your Oracle CEP JMS adapter to listen to this queue. Later in this article I will discuss the difference between using queues and topics, but for now let's just focus on the functional testing.
Implement the following Java program in your development environment. The program is just an example of how to send JMS messages to a queue, and it assumes that you are connecting to a WebLogic JMS domain. If you prefer another JMS provider, you should adapt this program to connect correctly to your host system.
What this program does is continuously send ten messages to the specified JMS queue. As you can see in the code, after sending the messages it pauses for the remainder of one second, taking into account the time elapsed while sending the messages to the JMS queue. This program never ends unless, of course, the user terminates the JVM.
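The pacing logic described above can be sketched without any JMS dependency. In the sketch below the actual send is abstracted behind a Runnable; the class PacedSender, the bounded batches parameter (the program described above loops forever) and the reading of the pause as "one second minus the elapsed send time" are all assumptions.

```java
// Hypothetical pacing helper: sends ten messages, then rests for the remainder of the second.
class PacedSender {

    // Pause that makes one batch plus the pause last about one second (never negative).
    static long pauseMillis(long elapsedMillis) {
        return Math.max(0L, 1000L - elapsedMillis);
    }

    // Runs the given number of batches; sendOneMessage stands in for the real JMS send.
    static int run(Runnable sendOneMessage, int batches) throws InterruptedException {
        int sent = 0;
        for (int b = 0; b < batches; b++) {
            long start = System.currentTimeMillis();
            for (int i = 0; i < 10; i++) {
                sendOneMessage.run();
                sent++;
            }
            long elapsed = System.currentTimeMillis() - start;
            Thread.sleep(pauseMillis(elapsed));
        }
        return sent;
    }
}
```

With a real JMS producer, the Runnable would wrap the call that sends a single message to the queue.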
Starting the Functional Tests with a Single Oracle CEP JVM
Make sure that your whole development environment is up and running, including your JMS provider and a fully operational Oracle CEP domain. Deploy the Oracle CEP application into this domain and run the client JMS application. A recorded result of this functional test is published on YouTube, so you can check out the results of this test.
The following sections explore two different approaches to scalability. These approaches will be applied to the scenario of near real-time business monitoring of KPIs, transforming it into a more scalable solution that can handle a growing number of events just by adding more Oracle CEP JVMs on the same and/or multiple server systems.
The Simplest Scalability Approach Ever: Using JMS Queues
Whether you are designing an application that must deal with asynchronous calls or are just worried about message delivery guarantees, using JMS queues is always a good choice. Consumer applications connected to a JMS queue compete to read the next message at the head of the queue, in FIFO style. This means that each consumer races with the other consumers to get as many messages as possible. From the consumer application's perspective, this can be a hard problem, since there is no guarantee that a specific message will be consumed by a specific consumer. From the JMS queue's perspective, however, it is a really powerful scalability technique, because each consumer application works hard to provide maximum throughput for message consumption.
Imagine, for instance, a consumer application connected to a JMS queue, running on a server system with 24 logical CPUs: two chips with 6 cores each, using Intel Hyper-Threading technology. If we observe that this consumer application on this type of hardware gives us an average throughput of 3,000 TPS ("Transactions per Second"), we can reasonably expect that running another copy of the same consumer application on another server system with the same hardware configuration will give us an average throughput of about 6,000 TPS, provided that the JMS provider can keep up. This is what we call horizontal scalability: you increase the throughput of a software application by adding more server systems that commit their hardware resources to a specific goal, which in this case is to consume the messages from a JMS queue as fast as possible.
This type of scalability technique delivers another important advantage: when you increase the total count of server systems running the consumer applications, you reduce the number of messages that each server system needs to handle. This seems a little contradictory, doesn't it, since in theory it should be "a good thing" for each server system to handle as many messages as possible. Well, it is a good thing, but like anything else in software architecture, there are trade-offs. Consider, for instance, each consumer application running on the same hardware configuration. In this scenario, it is reasonable to think that each server system will handle the same number of messages on average, because each server system puts its hardware resources to work equally and, due to the nature of JMS queues (FIFO-style consumption), the total number of messages in the queue will be divided by the number of server systems available.
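The competing-consumers behavior described above can be simulated with the JDK alone. In this sketch a LinkedBlockingQueue stands in for the JMS queue and threads stand in for the consumer applications; the class name CompetingConsumers is hypothetical. Every message is taken by exactly one consumer, but the split between consumers is only even on average, and a single run may be quite uneven.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical simulation: several consumers draining one shared queue.
class CompetingConsumers {

    // Returns how many messages each consumer took; the sum equals the messages sent.
    static int[] consume(int messages, int consumers) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < messages; i++) {
            queue.add(i);
        }
        int[] counts = new int[consumers];
        Thread[] threads = new Thread[consumers];
        for (int c = 0; c < consumers; c++) {
            final int id = c;
            // Each consumer polls until the queue is empty and writes only its own slot.
            threads[c] = new Thread(() -> {
                while (queue.poll() != null) {
                    counts[id]++;
                }
            });
            threads[c].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counts;
    }
}
```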
The problem here is the memory footprint of each JVM running the consumer application. If you consume messages from the JMS queue with fewer server systems, the number of messages that each server system needs to handle will be higher. Since each received message is allocated in the heap memory of the JVM, the total heap usage will increase. Did you know that a javax.jms.MapMessage with a payload of 256 bytes allocates more than 400 bytes in the heap space? Imagine all those messages being received by your consumer application at the same time. You could reach the maximum size of your heap in a matter of seconds. And the problem with reaching the maximum heap size of a JVM is the inevitable execution of the garbage collector. When the JVM detects that the heap memory is full (or almost full, depending on the algorithm used) or too fragmented, it engages the garbage collector threads to reclaim the allocated memory and/or rearrange the heap layout. Depending on the state of the heap memory, the JVM could use almost all the CPU cores of the server system to accomplish this task, and that's when your consumer application starts to be slower than usual, presenting performance issues and becoming a system bottleneck.
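A back-of-the-envelope calculation shows how quickly this adds up. The sketch below simply multiplies the number of messages held in memory by the per-message footprint; the 400-byte figure is the lower bound quoted above, while the one million messages and the HeapEstimate class are hypothetical.

```java
// Hypothetical estimate: lower bound of heap needed to hold messages in memory.
class HeapEstimate {

    // Messages in memory times bytes per message; fails loudly on overflow.
    static long bytesNeeded(long messagesInMemory, long bytesPerMessage) {
        return Math.multiplyExact(messagesInMemory, bytesPerMessage);
    }

    public static void main(String[] args) {
        // One million messages at 400 bytes each (both numbers are illustrative).
        long bytes = bytesNeeded(1_000_000L, 400L);
        System.out.println(bytes + " bytes at minimum");
    }
}
```

Halving the number of messages each JVM holds, for example by doubling the number of consumers, halves this lower bound.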
Let's consider the usage of JMS queues in our business scenario of near real-time business monitoring of KPIs. Each Oracle CEP JVM would be connected to a JMS queue that holds the payment transaction messages, and would act as a consumer application through its JMS adapter. Considering that each Oracle CEP JVM (or a group of them) would be running on a separate server system, we can assume that increasing the number of server systems will increase the average throughput of message consumption from the JMS queue, and also that each Oracle CEP JVM will handle a reasonable number of messages in its heap. Enough theory; let's see in practice how this scalability approach can be applied to our business scenario.
The first thing to do is to transform your Oracle CEP domain into a multi-server, clustered domain. You can find comprehensive information in the product documentation to help you do this, but I will highlight the main steps for you here.
In the root directory of your Oracle CEP domain, you will find a sub-directory called "defaultserver". As the name suggests, this is your default server, which is created automatically during domain creation. For development and staging environments, this server is more than enough. But if you want to expand your domain, you will need to change that. Rename the default server directory from "defaultserver" to "server1". After that, make two copies of this directory, and name the newly created directories "server2" and "server3" respectively.
Now you have to set up some aspects of the cluster. Open the configuration file of server1 in a text editor. It can be found in the following location: <DOMAIN_HOME>/server1/config/config.xml. With the configuration file open in the text editor, you will have to change the "netio" and "cluster" tags. Change the port value of the "NetIO" component to "9001", and the port value of the "sslNetIo" component to "9011".
In the cluster tag, you define how server1 will behave together with the other members. Edit the cluster tag according to the listing below:
That's it. This is all that has to be done to turn a simple server into a cluster-aware member. Repeat the same steps for server2 and server3. Just keep in mind that if you plan to run those servers on the same server system, you will need to define different ports for each server. Also remember that the "server-name" tag must match the name you gave the server, which is the name of the server's directory.
One of the advantages of using JMS queues as your scalability approach is that you don't need to change anything in your Oracle CEP application. Just deploy the same application to all the new servers and start the tests again. Start servers server1 and server2 and run the client JMS application. You will see in each server's output that it generates the totalCountTPS KPI based on the number of events it received from the JMS queue, which is the total count of events (ten events per second) divided by the total count of servers, in this case two. This results in five events per server on average. If you start server3 while events are being processed, you will see that the number of events each server handles decreases, which is evidence that the scalability is really working, because the servers share the load of events.
A recorded result of this second test is also available on YouTube. Watch the video below to see how using JMS queues affects the scalability of your Oracle CEP applications.
So far, it seems that using JMS queues as your scalability approach is the right thing to do, right? In fact, for most demanding scenarios, this approach should be enough. But you should be aware of one catch: in-flight events can be lost during failures. By "in-flight" I mean all those events that have been received by the JMS adapter and for which an acknowledgment has already been sent back to the JMS provider. At that moment, the message is no longer in the JMS queue and, more importantly, no other Oracle CEP JVM is aware of the event. This means that there is no recovery of events during failures in this approach. If you cannot tolerate missing events, using JMS queues for scalability is not the most reliable approach. But if your scenario can tolerate missing events, don't think twice and choose this approach, since it is simpler and does not require changing the Oracle CEP application.
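The loss of an in-flight event can be reproduced with a tiny simulation. In this sketch an in-memory queue stands in for the JMS queue, poll() plays the role of "receive and acknowledge", and the consumer crashes before processing one of the events it received. The class InFlightLoss and its crashAfterReceiving parameter are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical simulation of an acknowledged event lost by a crashing consumer.
class InFlightLoss {

    // Returns the events that were neither processed nor left in the queue.
    static List<String> lostEvents(List<String> sent, int crashAfterReceiving) {
        Queue<String> queue = new ArrayDeque<>(sent);
        List<String> processed = new ArrayList<>();
        int received = 0;
        while (!queue.isEmpty()) {
            String event = queue.poll();  // receive + acknowledge: gone from the queue
            received++;
            if (received == crashAfterReceiving) {
                break;                    // crash before the event is processed
            }
            processed.add(event);
        }
        List<String> lost = new ArrayList<>(sent);
        lost.removeAll(processed);        // not lost: already processed
        lost.removeAll(queue);            // not lost: still available to other consumers
        return lost;
    }
}
```

The events still in the queue can be picked up by another consumer, but the acknowledged one exists nowhere else, which is exactly the gap described above.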
Reliability Really Matters: Complicating Things Just a Little Bit with JMS Topics
OK, so you have realized that you cannot tolerate missing events, and you need to improve the reliability of your Oracle CEP application as much as possible. The road to achieving this is considerably longer than just using JMS queues. There are some challenges that you need to overcome in order to reach a level of reliability where no missing events can be tolerated, without, of course, putting scalability aside. The challenges that you will need to overcome are:
- Provide guarantees that all of the JVMs are aware of all events
- Equally distribute the load of events across all of the JVMs
- Ensure that even in-flight events are completely synchronized
- Provide backups for every cluster member to ensure HA
The good news is that all those challenges can be easily overcome, and the Oracle CEP product provides native support for all of them. The bad news is that, compared to the previous approach, the final solution starts to look more complicated in terms of design and product configuration. Let's learn how to apply this type of scalability approach (with the highest level of reliability available) in our near real-time business monitoring of KPIs scenario.
The situation has now changed. You need to make the solution available in two different data centers, working in an active-active scheme and with high availability assured in case the primary servers fail. No event may be missed, since the operations staff of the financial services company must rely on the KPIs to make important decisions.
The two data centers are located in the state of California, but are geographically separated. The first one is based in Redwood, and the other one is based in Palo Alto. These data centers are connected through a fiber-optic link with a high bandwidth of 10 GB/s.
Some changes need to take place both in the Oracle CEP application and in the Oracle CEP domain. Let's start with the Oracle CEP application changes. First, you need to modify the EPN assembly descriptor file to include an instance of the following Oracle CEP component: com.oracle.cep.cluster.hagroups.ActiveActiveGroupBean. This special component dynamically manages the group subscriptions used to partition the incoming events between the cluster members.
Secondly, you need to synchronize all the events with all the members that belong to the cluster, including in-flight events. This can be done explicitly using the special HA adapters that Oracle CEP makes available. Change your EPN flow to include an instance of an HA adapter just after your input adapter. This adapter needs to know which property of your event type carries the information about its age. To that end, you need to configure the "timeProperty" property in this HA adapter. This is necessary to guarantee that, even if one cluster member fails, the ordering of the events won't be affected. You will also need to create an instance of an HA correlating adapter just before your output adapter. After these changes, your EPN assembly descriptor should include the following components:
Now here comes the most important change of this approach, which is to change the JMS destination from a queue to a topic. This change must be made both at your JMS provider (because you need to explicitly create a topic endpoint) and in your JMS adapter configuration file. Using JMS topics guarantees that all of the JVMs are aware of all events. You will also need to define an event partitioning criterion in your JMS adapter configuration file. Since topic consumers act more like subscribers than plain consumers, every consumer application receives a copy of all events sent to the topic endpoint. This means that the events are not partitioned automatically, yet partitioning must occur in order to provide real scalability.
Thanks to the JMS technology, there is a way to apply criteria during message consumption, which is to use selectors. Selectors give you the possibility to apply the Content Based Router EAI pattern based on existing header/property values. Change the JMS adapter configuration file to include selector criteria for the message consumption:
Let's understand the changes. The group binding entries tell the JMS adapter which events to listen for. The first group binding says that messages containing "rw" in the site property should be listened to only by the Redwood site. The second group binding says that messages containing "pa" in the site property should be listened to only by the Palo Alto site. Each group binding is associated with a group id, which determines which servers are able to receive those events. For instance, in the case of the first group binding, only servers associated with the group "ActiveActiveGroupBean_Redwood" will be able to receive events that belong to the Redwood site. This configuration ensures that the load of events is equally distributed across all the JVMs. Let's start the configuration of the Oracle CEP domain.
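Before moving on, the effect of these group bindings can be illustrated with a small in-memory simulation. It is not JMS or Oracle CEP code: the class SelectorTopic is hypothetical, a Predicate on the "site" value stands in for a JMS message selector such as site = 'rw', and the server names are the ones used for the domain in this article.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical simulation of a topic whose subscribers filter by the "site" property.
class SelectorTopic {
    private final Map<String, Predicate<String>> selectors = new LinkedHashMap<>();
    private final Map<String, List<String>> inbox = new HashMap<>();

    // Registers a server together with the selector it applies to the site property.
    void subscribe(String server, Predicate<String> siteSelector) {
        selectors.put(server, siteSelector);
        inbox.put(server, new ArrayList<>());
    }

    // Delivers a copy of the payload to every subscriber whose selector matches the site.
    void publish(String site, String payload) {
        for (Map.Entry<String, Predicate<String>> entry : selectors.entrySet()) {
            if (entry.getValue().test(site)) {
                inbox.get(entry.getKey()).add(payload);
            }
        }
    }

    // Returns the payloads delivered to the given server so far.
    List<String> received(String server) {
        return inbox.get(server);
    }
}
```

Subscribing rw1 and rw2 with "rw"::equals and pa1 and pa2 with "pa"::equals gives both Redwood servers every Redwood event and nothing else, so the load is partitioned between sites while each backup still sees the same events as its primary.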
In the root directory of your Oracle CEP domain, create four copies of one of your current servers, naming them "rw1", "rw2", "pa1" and "pa2" respectively. Since the solution must be available in two different data centers, one in Redwood and another in Palo Alto, these four servers will sustain this topology. The rw1 and rw2 servers will be the Redwood servers, with rw1 as the primary and rw2 as its backup. The pa1 and pa2 servers will be the Palo Alto servers, with pa1 as the primary and pa2 as its backup. The idea here is to provide active-active load balancing between servers rw1 and pa1, each one having its backup at its own site. Those backups will ensure high availability for every cluster member. The final topology should be something similar to this:
You need to change each server's configuration file in order to make this topology work. Servers rw1 and rw2 should be part of the "ActiveActiveGroupBean_Redwood" group, and servers pa1 and pa2 should be part of the "ActiveActiveGroupBean_PaloAlto" group. Apply the configuration below to the configuration file of each server of the Oracle CEP domain:
Don't forget to change the ports of the "NetIO" and "sslNetIo" components if you plan to run all those servers on the same server system. In terms of the Oracle CEP domain, this is all the configuration necessary, so we are all set in terms of infrastructure. Before starting the tests, we need to adapt the client JMS application to do two things. First, send the messages to a topic instead of a queue. Second, set the "site" property on each message, in order to satisfy the message criteria defined in the JMS adapter configuration file. Implement the client JMS application according to the listing below.
This version of the client JMS application didn't change much compared to the original version using queues, except for the usage of the message property called site. This message property will be used by the selectors as the criterion to load-balance the incoming events across all the servers. In the real world, this type of message enrichment is commonly applied by an architectural mechanism situated in front of the Oracle CEP JVMs, which could be an ESB, an application server or a corporate load balancer. Regardless of which mechanism you intend to use, in terms of architecture it is the responsibility of this mechanism to provide the message enrichment, another EAI pattern that must be part of your final solution.
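As a rough illustration of this enrichment step, the sketch below adds a "site" property to a map of message properties, alternating between "rw" and "pa". The class SiteEnricher is hypothetical, and the round-robin policy is an assumption; a real enricher could just as well decide the site from the content or origin of the message.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical enricher: stamps each message with the site that should process it.
class SiteEnricher {
    private static final String[] SITES = {"rw", "pa"};
    private long counter = 0;

    // Returns a copy of the properties with an added "site" entry, alternating rw and pa.
    Map<String, Object> enrich(Map<String, Object> properties) {
        Map<String, Object> enriched = new HashMap<>(properties);
        enriched.put("site", SITES[(int) (counter % SITES.length)]);
        counter++;
        return enriched;
    }
}
```

With a real JMS message, the equivalent step would be setting a string property named site on the message before it is published to the topic.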
Now we can start the tests. Deploy this new version of the Oracle CEP application to the servers (rw1, rw2, pa1 and pa2) and run the client JMS application again. A recorded result of this third and last test is also available on YouTube. Watch the video below to see how using JMS topics affects the scalability of your Oracle CEP applications.
Today more than ever, solution architects and developers should be aware of the techniques that can be applied in their solutions to provide real scalability. Trends like Fast Data are the biggest motivators for that. This article discussed the importance of scalability, especially in event-driven solutions. Through a didactic business scenario, it showed how to apply scalability in Oracle CEP applications, exploring the pros and cons of two different approaches. Finally, it showed in detail how to implement the two approaches in the Oracle CEP example application.