In today's world, data is being produced at an exponential rate. While new big data technologies are available to capture all of this information and process it to discover interesting trends, sometimes we need to take immediate action or would like to know the current status of a specific item. To accomplish this, you could store all of the incoming data, but the trade-off is that you may end up maintaining a lot of data that you don't need, and users must write queries that first determine the most current row. Another technique is to perform database updates, but this can lead to database contention when a single row is updated very frequently, as in the IoT domain, where devices can send out frequent status messages.
One way to solve this problem that you may not be familiar with is through the use of Oracle Stream Analytics (OSA). OSA has an extremely sophisticated language called "Continuous Query Language" or CQL. It is similar to SQL and is SQL-99 compliant, but it has additional syntax because it operates on streams in memory, which lets it solve problems like this one very efficiently. Eliminating unnecessary updates on the database is a problem that OSA solves well, and it does so in a single query.
First, we'll explain some basic details about the OSA runtime. While OSA has a very friendly user interface that allows you to develop an entire application in minutes, you can also develop an application in a more traditional way using JDeveloper. When developing with JDeveloper, you will need to understand that the application model is called an "Event Processing Network" or EPN.
The EPN handles the flow of data through your application and contains CQL processor nodes that hold the CQL you wish to run. In our case, the CQL for this seemingly complex task, which might otherwise have taken dozens of lines of Java code to write, is done easily and efficiently in memory using a very simple statement, as shown in the picture below.
In this query, we select only the attributes that we need (we could also use SELECT * FROM InputChannel) and use CQL window syntax to partition the stream by device ID. "ROWS 1" means that only the most recent row for each device ID is kept, "RANGE 2 SECONDS" limits the window to a time period of 2 seconds, and "SLIDE 2 SECONDS" means that the results are output every 2 seconds. Because the "RANGE" value and the "SLIDE" value cover the same time frame, the query essentially starts over from the beginning (with no state carried over) every two seconds.
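To make the window semantics concrete, here is a minimal Python sketch of what a window that partitions by device ID with ROWS 1, RANGE 2 SECONDS and SLIDE 2 SECONDS is described as doing. It is only a simulation of the behavior explained above, not OSA code: the function name, the tuple layout and the way window boundaries are computed are assumptions made for illustration, and OSA's exact boundary handling may differ.

```python
WINDOW_SECONDS = 2  # RANGE and SLIDE are both 2 seconds in the article's query


def latest_per_device(events, window_seconds=WINDOW_SECONDS):
    """Simulate the partitioned window described in the text.

    `events` is an iterable of (timestamp_seconds, device_id, payload) tuples
    (a hypothetical layout). Returns one list per window, each holding only
    the most recent event per device seen inside that window.
    """
    windows = {}  # window index -> {device_id: latest event}
    for ts, device_id, payload in sorted(events, key=lambda e: e[0]):
        # Equal RANGE and SLIDE give back-to-back windows with no overlap.
        window_index = int(ts // window_seconds)
        per_device = windows.setdefault(window_index, {})
        # "ROWS 1": a newer row for the same device replaces the older one.
        per_device[device_id] = (ts, device_id, payload)
    return [list(per_device.values()) for per_device in windows.values()]


sample = [
    (0.1, "d1", "A"),
    (0.5, "d2", "B"),
    (1.2, "d1", "C"),  # replaces d1's earlier event in the first window
    (2.3, "d1", "D"),  # first event of the second window
]
for window in latest_per_device(sample):
    print(window)
```

Because each window starts from an empty dictionary, no state carries over between windows, which mirrors the "start over every two seconds" behavior described for equal RANGE and SLIDE values.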
Note: if 2 seconds is too long for you, you can specify 1 second or even a value specified in milliseconds.
In our test, the data will be generated at random, but we will inspect both the data and the results to verify that they are accurate. In this case, we are sending the data to Oracle BAM 12c. We will output the data from our OSA application using the JMS adapter, which automatically writes out each event as a JMS map message: we simply supply the event type name in the OSA JMS configuration, and the out-of-the-box JMS adapter creates the JMS messages without the need for a converter class. This is convenient for our purposes because a BAM "Enterprise Message Source" (EMS) can be easily configured in the BAM 12c administration user interface, and the map message attributes can be assigned to the data object. Everything is done in both OSA and BAM without writing any Java code.
We will make sure that we start with an empty data object for this test and reset the EMS metrics.
Our BAM EMS is configured for "Upsert", which means that it will insert the record if it does not exist (based on the key, which is defined as the "deviceID"), or update it if a row with that device ID already exists in the data object (i.e., the database table).
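The upsert rule can be sketched in a few lines of Python. This is an illustration of the insert-or-update logic only, not how BAM implements it: the dictionary standing in for the data object and the `upsert` helper are hypothetical, while the field names deviceID, deviceCode and codeValue come from the test described in this article.

```python
def upsert(data_object, record, key_field="deviceID"):
    """Insert `record` if its key is new, otherwise update the existing row.

    `data_object` is a plain dict keyed by device ID, standing in for the
    BAM data object (an assumption made for illustration).
    """
    key = record[key_field]
    if key in data_object:
        data_object[key].update(record)  # row exists: overwrite its fields
        return "update"
    data_object[key] = dict(record)  # row is new: insert a copy
    return "insert"


table = {}
print(upsert(table, {"deviceID": "d1", "deviceCode": "TEMP", "codeValue": 70}))
print(upsert(table, {"deviceID": "d1", "deviceCode": "TEMP", "codeValue": 72}))
print(table)
```

Every event that reaches this step costs either an insert or an update, which is why filtering out soon-to-be-overwritten events before they are sent reduces the load on the table.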
Let's first send a small, controlled set of data to see that it works as expected. We will send 30 events for only 10 unique device IDs, and we will intentionally introduce a small delay between each event.
The BAM EMS metrics show that only 10 messages were sent and 10 messages were persisted, which is correct because there were only 10 different device IDs in the sample.
If we examine the deviceCode and the codeValue for each device ID, we see that these are the 10 most current messages, which is what we want the user to see on the BAM dashboard (the latest message from each device).
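The arithmetic behind this result can be checked with a small Python sketch. It assumes that all 30 events arrive inside a single 2-second window; the timestamps, device names and the `count_outgoing_messages` helper are made up for illustration and are not taken from the actual test run.

```python
def count_outgoing_messages(events, window_ms=2000):
    """Count the messages left after keeping one row per device per window.

    `events` is a list of (timestamp_ms, device_id) pairs (hypothetical
    layout). Each distinct (window, device) pair yields one outgoing message.
    """
    seen = set()
    for ts_ms, device_id in events:
        seen.add((ts_ms // window_ms, device_id))
    return len(seen)


# 30 events cycling over 10 devices, 50 ms apart: all inside the first window.
one_window = [(i * 50, f"device-{i % 10}") for i in range(30)]
print(count_outgoing_messages(one_window))  # 10

# The same events 100 ms apart span two windows, so more messages go out.
two_windows = [(i * 100, f"device-{i % 10}") for i in range(30)]
print(count_outgoing_messages(two_windows))  # 20
```

In this sketch the number of outgoing messages is bounded by one per device per window, so the saving grows with how often each device reports inside a window.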
If you have two key fields for your "Upsert" operation, you can list them both in the PARTITION BY clause, separated by a comma, as follows:
There will be a larger number of rows in the data object, but only one entry for each combination of device ID and device code.
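The effect of a two-field key can be sketched in Python by using a tuple as the partition key. This only illustrates the idea within a single window: the `latest_per_key` helper and the sample values are hypothetical, and the field names deviceID, deviceCode and codeValue are the ones used in the test above.

```python
def latest_per_key(events, key_fields=("deviceID", "deviceCode")):
    """Keep the most recent event for each combination of the key fields.

    `events` is a list of dicts in arrival order (an assumed layout); a later
    event with the same composite key replaces the earlier one.
    """
    latest = {}
    for event in events:
        key = tuple(event[field] for field in key_fields)
        latest[key] = event
    return latest


events = [
    {"deviceID": "d1", "deviceCode": "TEMP", "codeValue": 70},
    {"deviceID": "d1", "deviceCode": "HUMID", "codeValue": 40},
    {"deviceID": "d1", "deviceCode": "TEMP", "codeValue": 72},  # replaces the first
    {"deviceID": "d2", "deviceCode": "TEMP", "codeValue": 65},
]
for key, event in latest_per_key(events).items():
    print(key, event["codeValue"])
```

With a single key there would be two rows here (d1 and d2); with the composite key there are three, one per device ID and device code combination.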
Try it for yourself and you will see that we have achieved our goal of providing the BAM user with the most up-to-date device status quickly and easily. We did so without creating unnecessary JMS messages, each of which would have had to be created, sent over the network, put on the queue, taken off the queue by the BAM EMS, and processed, potentially running into database contention because of so many updates to the same row. With very little effort, we have made the solution far more efficient at delivering the latest device status messages to the user. As an added bonus, we have increased operational efficiency by not generating messages that would quickly be overwritten, thereby saving CPU for other purposes.