The popularity of “data-in-motion” technologies has grown alongside the maturity of the many technologies that generate continuous streams of data for cloud applications that monitor and analyze them 24/7. The data is generated by various sources such as cloud monitoring software, smart wristwatches, cell phones, financial transaction systems, and wireless medical devices, and it takes various forms (e.g. SWIFT messages, IoT messages, system logs, etc.).
To understand the general issues that most "data-in-motion" technologies need to address, let's focus the discussion on IoT data and a simple "data-in-motion" application.
IoT data is typically generated by smart devices embedded in consumer products. The job of these devices is to send data to processing agents living on the Internet who then make the device “smart”. These devices include smart locks, smart thermostats, smart security systems, and a variety of other smart devices in various industries (e.g. smart health sensors for patients). They are typically Wi-Fi enabled devices that continually monitor and measure events, reporting information to on-line web services dedicated to capturing, analyzing and reacting to messages reported from the devices.
A characteristic of IoT data is that a device often produces a regular continuous flow of information. The aggregate of all these chatty devices is a large volume of data that needs to be processed by application logic running in Internet services. These applications typically analyze data feeds 24/7 and need to scale out as the sales and usage of the devices grow. The devices are chatty but cryptic: they generate large amounts of data that is not simple to interpret, and their messages contain device unit identification information and measurements.
The IoT applications need to take these cryptic messages and relate them to “data-at-rest” living under the web service to identify the type of device being reported, the specifications of the device, and to identify the retail customer who owns or uses the device.
In other words, for the device data to be meaningful, it needs to be related to technical data regarding the device and standard CRM (Customer Relationship Management) data. This is information that traditionally lives in relational database systems as SQL tables.
Apache Kafka is used for processing large amounts of “data in motion”. It is a scalable hybrid of the traditional queuing and publish-subscribe models that captures live messages as Kafka records, which can then be read by various applications that analyze the information. The data in Apache Kafka is opaque (i.e. Kafka does not care about the shape of the data). Its job is to capture opaque key-value pairs as records created by Kafka producer applications and retain them for a substantial period of time (e.g. one week) so they can be consumed by various independent Kafka consumer applications based on their own schedules and functional requirements.
Apache Kafka introduces the concepts of “topics”, “partitions” and “groups”.
A topic is a collection of records typically having the same shape and format (e.g. a smart thermostat message containing a sensor type id (integer), a sensor unit id (integer), the temperature setting of the thermostat (a floating point number), the actual temperature (another floating point number), and the timestamp of the reading in epoch time (integer)).
A “partition” is a horizontal shard of a topic.
A “group” is a set of instances of a single application, where each instance is dedicated solely to reading and processing a single partition for the group, in a divide and conquer strategy of processing.
The model scales out by expanding the number of systems serving a Kafka cluster, expanding the number of partitions capturing the data, and expanding the number of application instances processing the partitions.
The Challenge of Integrating Kafka messages with data living in SQL tables
The problem of integrating Kafka messages with SQL tables comes down to data living in two places: SQL tables in a database system and topics in a Kafka cluster.
A simple solution is to do the integration within the context of a joint Kafka and SQL application.
Referring back to the IoT applications, this requires the application to read streamed records from Kafka, and then query a database using JDBC to look up device specifications (e.g. device life-span) associated with the device living in a SQL table. Additionally the application might also need to query a CRM table to identify the customer who owns the device, and relate the device to customer contact information (phone, email, address etc.).
The downside of this model is that it forces the application to make one or more queries for each Kafka record processed. At a certain rate of record processing, the cost of visiting a database for every Kafka record received becomes substantial overhead and quickly becomes non-scalable.
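The per-record cost can be made concrete with a small Python sketch. It uses an in-memory SQLite database purely as a stand-in for the relational system. The `sensor_specs` table, its `max_variance` column, and the sample records are assumptions made for illustration, not part of any real schema.

```python
import sqlite3

# In-memory SQLite stands in for the relational database (an assumption).
db = sqlite3.connect(":memory:")
db.execute(
    "CREATE TABLE sensor_specs (sensor_type_id INTEGER PRIMARY KEY, max_variance REAL)"
)
db.executemany("INSERT INTO sensor_specs VALUES (?, ?)", [(130, 2.0), (132, 2.5)])

# Hypothetical streamed records: (sensor_unit_id, sensor_type_id).
records = [(23464, 130), (57168, 132), (23576, 130)]


def lookup_per_record(batch):
    """Issue one lookup query for every streamed record."""
    queries = 0
    results = []
    for unit_id, type_id in batch:
        row = db.execute(
            "SELECT max_variance FROM sensor_specs WHERE sensor_type_id = ?",
            (type_id,),
        ).fetchone()
        queries += 1
        results.append((unit_id, row[0]))
    return results, queries


def lookup_set_based(batch):
    """Stage the whole batch once, then resolve it with a single join."""
    db.execute("DROP TABLE IF EXISTS batch")
    db.execute("CREATE TEMP TABLE batch (sensor_unit_id INTEGER, sensor_type_id INTEGER)")
    db.executemany("INSERT INTO batch VALUES (?, ?)", batch)
    rows = db.execute(
        "SELECT b.sensor_unit_id, s.max_variance "
        "FROM batch b JOIN sensor_specs s ON b.sensor_type_id = s.sensor_type_id"
    ).fetchall()
    return rows, 1  # one join query, regardless of batch size


per_record_rows, per_record_queries = lookup_per_record(records)
joined_rows, join_queries = lookup_set_based(records)
print(per_record_queries, join_queries)
```

The number of lookup queries in the first function grows with the number of records, while the second resolves the whole batch in one join. That difference is the motivation for letting the database see the stream as a set of rows.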
To work around the overhead, an alternative method is to export tables living in a database system into the application space. This means caching partial or full copies of a SQL table (either on the OS where the application is running, or within the application’s own virtual address space).
The problem with this approach is that the tables can get very large and/or may contain volatile information which would require table snapshots to synchronize with the master copy (in the relational database). As the application scales out to run on multiple systems, keeping the local table copies in sync with each other and with the master-copy becomes challenging.
Since the intent is to move database table content local to the application, and since the application needs to scale out and run on multiple systems, the application must deal with the complexities of keeping multiple caches of the table living on different machines up-to-date and consistent.
The general problem becomes worse when transaction consistency is needed while processing streamed data together with data living in SQL tables, specifically when streamed data not only needs to be related to SQL table data but can also alter the state of the SQL tables.
For example, consider a financial streaming application that supports the processing of a trading exchange, where the “data in motion” represents posted but unprocessed requests to buy or sell a given quantity of commodities at the current market price. The application needs to process the buy or sell reflecting the current market price of the commodity (stored in a table in the database). Additionally, it may want to adjust the market price in the table, based on the recent pattern of buys and sells reflecting the current trend of supply and demand.
This illustrates a use case where there is a tight feedback loop between the data in motion (i.e. "buys" and "sells"), and the database tables that it references (i.e. "market price"). All of this near real-time activity needs to be done efficiently, within the safety net of a standard database ACID transaction. For these reasons it makes much more sense to stream "data in motion" through a database system.
Introducing Oracle SQL Access to Kafka Views
Oracle SQL Access to Kafka views, referred to as OSaK views below, provide a solution to these problems. An OSaK view is simply an Oracle view that is a Kafka client application. It is bound to a Kafka cluster, group, topic and one or more partitions belonging to the topic. When used in a query it reads a set of Kafka records from a topic and transforms them into a set of SQL table rows abstracted by the Oracle view definition.
Each query to an OSaK view visits the same topic partitions and reads a fixed number of new records from each partition. In this sense each query to the view is bounded. It fetches rows already existing in a partition, typically up to the partition's "high-water mark" (i.e. the newest record in the Kafka partition) at the time the query was initiated. Any newer data arriving in the partition after the query starts will not be fetched. Typically Kafka applications are interested in polling Kafka topics and getting the newest batch of records written to one or more partitions. OSaK views have been designed to support this mode of access.
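The bounded behavior can be illustrated with a toy Python model. This is only a sketch of the idea, not how OSaK is implemented. The `Partition` class and `bounded_read` function are hypothetical names, and the high-water mark is modeled here as the offset one past the newest record.

```python
class Partition:
    """A toy append-only log standing in for one Kafka partition."""

    def __init__(self):
        self.records = []

    def append(self, record):
        self.records.append(record)

    @property
    def high_water_mark(self):
        # Modeled as the offset the next appended record would receive.
        return len(self.records)


def bounded_read(partition, start_offset):
    """Return an iterator over [start_offset, high-water mark at call time)."""
    end = partition.high_water_mark  # captured when the "query" is initiated

    def rows():
        for offset in range(start_offset, end):
            yield offset, partition.records[offset]

    return rows()


p = Partition()
for name in ("r0", "r1", "r2"):
    p.append(name)

query = bounded_read(p, 0)   # the query starts here
p.append("r3")               # arrives after the query started
first_batch = list(query)    # r3 is not fetched by this query
second_batch = list(bounded_read(p, 3))
print(first_batch, second_batch)
```

The record that arrives mid-query is picked up by the next query, which is the polling pattern described above.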
A Simple Example
Let’s look at how this works using Oracle SQL.
For simplicity, let’s assume there is a Kafka cluster that has a topic called “sensor” which contains records from smart thermostat IoT devices. These devices are Wi-Fi enabled and transmit readings from households to a server which is monitoring the activity. The key/value Kafka records emitted are in a simple delimited text form and the concatenation of the key and value fields has the following shape:
CREATE TABLE SENSOR_MESSAGE_SHAPE(
sensor_timestamp TIMESTAMP WITH TIME ZONE,
- The “sensor_msg_id” is a unique id (number) of a message sent from the device
- The “sensor_timestamp” indicates when the message was sent
- The “sensor_type_id” identifies the type of smart thermostat
- The “sensor_unit_id” uniquely identifies the actual physical device
- The “temperature_reading” is what the temperature was at the time the message was sent
- The “temperature_setting” is what the customer set as a desired temperature
Now we want to create an OSaK view that reflects the shape of SENSOR_MESSAGE_SHAPE, and accesses messages in Kafka, rendering them as rows in SQL.
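To make "rendering them as rows" concrete, here is a rough Python sketch of turning one delimited key/value record into a typed row. The column order, the ISO-8601 timestamp format, the choice of the message id as the Kafka key, and the sample values are all assumptions made for illustration. The source only names the six fields.

```python
import csv
from dataclasses import dataclass
from datetime import datetime


@dataclass
class SensorMessage:
    # Field names follow the list above; order and types are assumed.
    sensor_msg_id: int
    sensor_timestamp: datetime
    sensor_type_id: int
    sensor_unit_id: int
    temperature_reading: float
    temperature_setting: float


def parse_record(key, value):
    """Concatenate key and value, split on commas, and cast each field."""
    fields = next(csv.reader([f"{key},{value}"]))
    return SensorMessage(
        sensor_msg_id=int(fields[0]),
        sensor_timestamp=datetime.fromisoformat(fields[1]),
        sensor_type_id=int(fields[2]),
        sensor_unit_id=int(fields[3]),
        temperature_reading=float(fields[4]),
        temperature_setting=float(fields[5]),
    )


# Hypothetical record: the key carries the message id, the value the rest.
msg = parse_record("1001", "2020-01-15T10:30:00+00:00,130,23464,77.348,74.445")
print(msg.sensor_unit_id, msg.temperature_reading)
```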
Registering a Kafka cluster and creating an OSaK view
Management of OSaK views is done by calling procedures and functions in the ORA_KAFKA package.
First we need to call ORA_KAFKA.REGISTER_CLUSTER, which associates a user-defined cluster name (e.g. ‘SMARTSTAT’) with the connection attributes needed to access the Kafka cluster owning the “sensor” topic. These include the Kafka bootstrap server (e.g. kafkahost:9092) and the Oracle directories that manage the Kafka client software and the logs containing any errors involving connectivity or problems interpreting the message format.
‘Smart Thermostat Kafka cluster’
We then create a single OSaK view that maps to the Kafka cluster and the “sensor” topic living in the cluster. Since a Kafka consumer needs a uniquely named Kafka group id in order to access a topic, we provide one called “monitor”. (Note that the uniqueness of “monitor” as a Kafka group id requires coordination with the admin of the Kafka cluster.) We also use the shape table previously defined to define the columns living in the OSaK view.
'SMARTSTAT', -- Registered Kafka cluster name
'monitor', -- Kafka group id of the sensor topic
'sensor', -- The name of the topic in Kafka
'CSV', -- The representation of data
'SENSOR_MESSAGE_SHAPE', -- The relational shape of the data
views_created, -- Number of views created
application_id -- An id that is the prefix of view names created
At this point one OSaK view has been created which will access data in all the partitions in the “sensor” topic. The view's name is canonical, reflecting the cluster, group, topic names.
Currently two representations of data are supported: key-value pairs of delimited text (e.g. 'CSV'), or key-value pairs of JSON strings, which become two VARCHAR2 columns: 'key' and 'value'. In the JSON case one applies Oracle JSON operators in the query to extract desired fields from these two columns. (Note that in the near future, Avro will also be supported.)
When one describes the view in SQL*Plus, the view has the same columns as SENSOR_MESSAGE_SHAPE. These columns are preceded by system defined columns that describe the Kafka partition, Kafka offset, and Kafka epoch timestamp of the message.
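For the JSON representation, the idea of pulling fields out of the two string columns can be sketched in Python. This is only an analogy for what the Oracle JSON operators do inside a query. The JSON field names and the split of fields between key and value are assumptions.

```python
import json

# One row as it might appear with the JSON representation: two string
# columns named 'key' and 'value'. The document layout is hypothetical.
row = {
    "key": '{"sensor_unit_id": 23464}',
    "value": '{"sensor_type_id": 130, "temperature_reading": 77.348, "temperature_setting": 74.445}',
}


def extract(row_dict, column, field):
    """Parse one string column as JSON and return a single field from it."""
    return json.loads(row_dict[column])[field]


unit_id = extract(row, "key", "sensor_unit_id")
reading = extract(row, "value", "temperature_reading")
setting = extract(row, "value", "temperature_setting")
print(unit_id, reading - setting > 2.0)
```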
Name                                  Null?    Type
------------------------------------- -------- -----------
KAFKA$PARTITION                       NOT NULL NUMBER(38)
KAFKA$OFFSET                          NOT NULL NUMBER(38)
KAFKA$EPOCH_TIMESTAMP                 NOT NULL NUMBER(38)
A query against the view would produce the Kafka message in relational form.
SQL> SELECT sensor_unit_id,
SENSOR_UNIT_ID SENSOR_TYPE_ID TEMPERATURE_READING TEMPERATURE_SETTING
-------------- -------------- ------------------- -------------------
         23464            130              77.348              74.445
         57168            132              73.449              71.984
         23576            127               73.29              75.285
         75350            153              63.007              65.871
         14431            116              73.649              71.664
But how does this read work so that each query reads sequentially and always retrieves new rows from Kafka partitions? Answering this question requires a discussion of Kafka offsets and Kafka groups.
What is a Kafka offset and a Kafka group?
Since an OSaK view is a Kafka consumer application it needs to abide by Kafka’s rules of access.
The OSaK view names are generated names that reflect the name of a Kafka cluster, the Kafka group it serves, and the topic in the Kafka cluster from which it is reading messages. It maps to one or more partitions living in that topic.
The most important rule Kafka imposes is that an application needs to identify itself with a unique Kafka group id, where each Kafka group has its own unique set of offsets relating to a topic.
A Kafka offset is simply a non-negative integer that represents a position in a topic partition where an OSaK view will start reading new Kafka records.
To better understand this model, an analogy is useful.
Think of a Kafka topic as being a large volume of books pertaining to a particular subject, except each volume magically adds more pages (i.e. Kafka records) to read every day. Think of a Kafka partition as being one volume that needs to be read. To process these volumes quickly, one would want to use multiple readers, each of whom is responsible for reading one volume. Each reader is assigned the same task: to exclusively read some pages, do some analysis, and then go back to repeat the process forever, always starting where they left off reading. Each reader is allocated one bookmark to remember their current position.
Kafka offsets are the bookmarks for partitions living in a topic. A group gets one bookmark per partition, which means the group cannot have two readers reading the same partition, otherwise one or both readers are going to get lost, either reading messages already read, or missing messages that were never read.
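The bookmark analogy maps onto a very small data structure. The Python sketch below is a toy model, not Kafka's or Oracle's implementation. The "audit" group and the sample records are hypothetical.

```python
from collections import defaultdict

# Two partitions ("volumes") of one topic.
partitions = {0: ["a0", "a1", "a2", "a3"], 1: ["b0", "b1"]}

# One bookmark per (group, partition): the next offset that group will read.
bookmarks = defaultdict(int)


def read_batch(group, partition, max_records):
    """Read from the group's bookmark and move the bookmark forward."""
    start = bookmarks[(group, partition)]
    batch = partitions[partition][start:start + max_records]
    bookmarks[(group, partition)] = start + len(batch)
    return batch


# Different groups keep independent bookmarks on the same partition.
monitor_first = read_batch("monitor", 0, 2)
audit_first = read_batch("audit", 0, 3)
monitor_second = read_batch("monitor", 0, 2)

# Two readers in the SAME group who both start from the same saved bookmark
# on the same partition end up processing the same record twice.
shared = bookmarks[("monitor", 1)]
reader_a = partitions[1][shared:shared + 1]
reader_b = partitions[1][shared:shared + 1]
print(monitor_first, audit_first, monitor_second, reader_a == reader_b)
```

The last two reads show why a group cannot safely have two readers on one partition: with a single bookmark, nothing distinguishes their positions.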
OSaK Views and Kafka groups and offsets
Getting back to OSaK views, they map to a topic and will exclusively read one or more partitions in that topic on behalf of the application identified by the group name.
Each view knows which partitions it is accessing and knows the offsets for those partitions where it needs to start reading new records. Queries to OSaK views are augmented by PL/SQL procedures which advance the offsets for each query. They also record the number of rows read from each partition belonging to the view, after the query completes.
Below is a canonical example of how to use OSaK views to read Kafka records using sequential access. At the top of the loop, the offsets of the Kafka partitions owned by the view are set for the next iteration of the query. After the query completes, the offsets of the last records read from those partitions are captured in system metadata.
The query against OSaK views never lands data on disk in Oracle. The data is streamed from Kafka into the Oracle query processing engine, and the application harvests the results.
Note that Kafka offsets are managed by Oracle and not Kafka. They live in system tables tracking the positioning of offsets for all partitions accessed by the KV_SMARTSTAT_MONITOR_SENSOR_0 view.
The NEXT_OFFSET call simply binds the KV_SMARTSTAT_MONITOR_SENSOR_0 view to a new set of offsets that represent new data living in Kafka partitions accessed by the view. The UPDATE_OFFSET call communicates how many rows were read for each partition and advances the offsets to new positions.
The COMMIT guarantees that this unit of work is atomic.
When a ROLLBACK is issued explicitly by the application logic, or implicitly after a system failure, the offsets are not advanced. When the application executes again it will re-read the records that were not fully processed.
Any processing done within the loop will be done once or not at all.
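The "once or not at all" property follows from keeping the offsets and the results of processing in the same transaction. The Python sketch below models that idea with SQLite as a stand-in database. It does not call the ORA_KAFKA package. The `offsets` and `alerts` tables, the threshold, and the sample readings are assumptions.

```python
import sqlite3

# isolation_level=None puts sqlite3 in autocommit mode so that the
# BEGIN / COMMIT / ROLLBACK statements below control the transaction.
db = sqlite3.connect(":memory:", isolation_level=None)
db.execute("CREATE TABLE offsets (partition_id INTEGER PRIMARY KEY, next_offset INTEGER)")
db.execute("CREATE TABLE alerts (kafka_offset INTEGER, reading REAL)")
db.execute("INSERT INTO offsets VALUES (0, 0)")

log = [71.2, 99.9, 70.4, 98.7]  # toy partition 0 holding temperature readings


def process_batch(batch_size, fail=False):
    """Process one batch; offsets and results commit or roll back together."""
    db.execute("BEGIN")
    try:
        (start,) = db.execute(
            "SELECT next_offset FROM offsets WHERE partition_id = 0"
        ).fetchone()
        batch = log[start:start + batch_size]
        for i, reading in enumerate(batch):
            if reading > 90.0:
                db.execute("INSERT INTO alerts VALUES (?, ?)", (start + i, reading))
        if fail:
            raise RuntimeError("simulated failure before commit")
        db.execute(
            "UPDATE offsets SET next_offset = ? WHERE partition_id = 0",
            (start + len(batch),),
        )
        db.execute("COMMIT")
        return len(batch)
    except Exception:
        db.execute("ROLLBACK")  # neither the alerts nor the offset survive
        return 0


failed_run = process_batch(2, fail=True)   # rolled back, offset unchanged
retried_run = process_batch(2)             # re-reads the same two records
alerts = db.execute("SELECT kafka_offset, reading FROM alerts").fetchall()
(next_offset,) = db.execute("SELECT next_offset FROM offsets").fetchone()
print(failed_run, retried_run, alerts, next_offset)
```

After the failed run and the retry, the alert for offset 1 exists exactly once, because the failed attempt left no trace behind.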
Repeatable reads means the ability to scan a table or view multiple times within a transaction, and get the identical set of records.
Kafka does not support repeatable reads. Since OSaK views are Kafka consumer applications, this holds true for queries referencing OSaK views. Kafka is typically receiving new records in its partitions and advancing the "high-water" mark of each partition. Within an OSaK view processing loop, the offsets stay the same, but the "high-water" mark will be advancing in Kafka, reflecting the addition of new Kafka records that need to be processed.
What this means is when you query the OSaK view multiple times within a transaction you will most likely see new records. (Alternatively, if your application was reading from offsets pointing to the oldest records in partitions, there is a chance that they could be purged before you read again).
In our previous example, there is no join in the query and only one query is executed within the processing loop, so the data read from Kafka is only read once. After each loop the offsets are advanced and new data is fetched. For applications using SQL for scanning Kafka data sequentially once in a transaction (e.g. using SQL to probe for Kafka data alerts using a simple SQL “where” clause), repeatable reads are not an issue.
Repeatable reads become an issue if a processing loop wants to query the view more than once in one or more SQL queries, or if the view is implicitly scanned more than once in a join, where the view is not the outer table of a FROM clause in a SQL query.
You can achieve repeatable reads by reading the OSaK view once into a memory based Oracle private temporary table at the outset of the loop. This freezes the result set in the temporary table, which can be used in multiple, arbitrarily complex SQL queries within the transaction loop.
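The effect of freezing the result set can be shown with a toy Python model. `LiveView` is a hypothetical stand-in for a view over a partition that keeps receiving records, and the plain list `snapshot` plays the role of the temporary table.

```python
class LiveView:
    """Toy view: each scan runs from a fixed offset to the current end."""

    def __init__(self, records):
        self._records = list(records)
        self.offset = 0

    def arrive(self, record):
        # New data keeps landing while the transaction is open.
        self._records.append(record)

    def scan(self):
        return self._records[self.offset:]


view = LiveView([("u1", 77.3), ("u2", 73.4)])

# Scanning the live view twice is not repeatable.
first_scan = view.scan()
view.arrive(("u3", 63.0))
second_scan = view.scan()

# Reading once into a snapshot gives every later query the same rows.
snapshot = view.scan()
view.arrive(("u4", 70.1))
count_query = len(snapshot)
detail_query = [unit for unit, _ in snapshot]
print(len(first_scan), len(second_scan), count_query, detail_query)
```

Both the count and the detail query work from the same three rows, even though a fourth record arrived in between.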
Example of using Private Temporary Tables with OSaK Views
Below are two queries against the same Kafka data. One gets a count of sensors whose thermostats are behaving badly. The other identifies the name and email of the customers of those thermostats.
In the processing loop, we want to issue multiple queries from our OSaK view and relate the data to a sensor specification table and a sensor customer table living in Oracle.
SQL> DESCRIBE SENSOR_SPECS;
Name Null? Type
----------------------------- -------- ----------------------------
SENSOR_TYPE_ID NOT NULL NUMBER(38)
SQL> DESCRIBE SENSOR_CUSTOMERS;
Name Null? Type
----------------------------- -------- ----------------------------
SENSOR_UNIT_ID NOT NULL NUMBER(38)
To enforce repeatable reads we capture the scan of the OSaK view into an Oracle private temporary table at the outset of the loop, then use the results living in the private temporary table to do subsequent processing. After COMMIT we truncate the table and loop back to read new Kafka records.
/* Create a private temporary table with the shape of the OSaK View */
CREATE PRIVATE TEMPORARY TABLE ORA$PTT_KV_SMARTSTAT_MONITOR_SENSOR_0
ON COMMIT PRESERVE DEFINITION AS
SELECT * FROM KV_SMARTSTAT_MONITOR_SENSOR_0 WHERE (1=0);
INSERT /* append */ INTO ORA$PTT_KV_SMARTSTAT_MONITOR_SENSOR_0
SELECT * FROM KV_SMARTSTAT_MONITOR_SENSOR_0;
-- Identify bad thermostat devices where the variance between the
-- temperature setting and the actual reading exceeds the normal
-- variance for the device.
FROM SENSOR_SPECS s,
AND ABS(m.temperature_setting - m.temperature_reading) >
-- Identify customers owning the bad devices
SELECT c.full_name, c.email
FROM SENSOR_SPECS s,
WHERE m.sensor_type_id = s.sensor_type_id AND
m.sensor_unit_id = c.sensor_unit_id AND
ABS(m.temperature_setting - m.temperature_reading) >
TRUNCATE TABLE ORA$PTT_KV_SMARTSTAT_MONITOR_SENSOR_0;
You can enforce repeatable reads by using Oracle global temporary tables as well. The difference is that private temporary tables are lighter-weight and don't support indexes, while global temporary tables do.
In general, if the application is processing Kafka data in near real time, private temporary tables are a better fit. If however the application is accessing the Kafka cluster just once a day to analyze a large set of data using queries which need indexes, then global temporary tables would be the better choice. (Note that the private temporary table feature is available in database versions 18c and above. For previous versions use global temporary tables.)
The Relationship between Kafka Topic Partitions and OSaK Views
In the above example, an OSaK view was created by calling ORA_KAFKA.CREATE_VIEWS and asserting that only one view be created. This means that the view pulls data from all partitions living in a Kafka topic.
Alternatively the call to ORA_KAFKA.CREATE_VIEWS could have asserted N views created, where N is the total number of partitions in the topic. If N is 12, then 12 different views would be created, each one consuming and tracking the offset of a single partition. The implementation allows ORA_KAFKA.CREATE_VIEWS to create any number of views between 1 and N, but the predominant use cases are at the extremes where either a view maps to just one Kafka topic partition or to all partitions.
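The relationship between the view count and partition ownership can be sketched in Python. How partitions are actually distributed when the view count lies strictly between 1 and N is not stated here, so the round-robin rule below is purely an assumption, and `assign_partitions` is a hypothetical helper.

```python
def assign_partitions(num_partitions, num_views):
    """Spread partition ids across views (round-robin is an assumption)."""
    if not 1 <= num_views <= num_partitions:
        raise ValueError("view count must be between 1 and the number of partitions")
    views = [[] for _ in range(num_views)]
    for partition_id in range(num_partitions):
        views[partition_id % num_views].append(partition_id)
    return views


one_view = assign_partitions(12, 1)       # a single view owns every partition
one_per_view = assign_partitions(12, 12)  # each view owns exactly one partition
in_between = assign_partitions(12, 5)     # allowed, but less common
print(one_view, one_per_view, in_between)
```

Whatever the real distribution rule is, the invariant that matters is the one the sketch preserves: every partition belongs to exactly one view in the group.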
So, when do you create one view that maps to all Kafka partitions versus creating multiple views that each map to a single Kafka partition?
Use Case of One View Mapping to All Kafka Partitions
The use case for a view that maps to all topic partitions is when all the analytics are being done by SQL. The application is relying on Oracle SQL to do the heavy lifting with one or more SQL queries to get the answers it wants. The query results are either returned directly to the calling application or stored in an application defined summary table at the end of the canonical processing loop described earlier.
In this case, there is only one application instance that drives the analysis. It runs forever in a loop, advancing the offsets of the view, executing SQL, storing or emitting the results, updating offsets, and committing. The application could be as simple as running a UNIX cron job executing a SQL*Plus script every five minutes.
The SQL queries can scale out to process multiple partitions concurrently by increasing the DOP (degree-of-parallelism) of the SQL query. If for example the DOP is set to 12 (where 12 is the number of Kafka partitions), SQL will process the 12 partitions in parallel.
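As a loose analogy for what a higher degree of parallelism buys, the Python sketch below scans 12 toy partitions with a configurable number of worker threads. It says nothing about how Oracle parallel query is implemented. The data, the threshold, and the function names are assumptions.

```python
from concurrent.futures import ThreadPoolExecutor

# Twelve toy partitions, each holding five temperature readings.
partitions = {p: [60.0 + p + i for i in range(5)] for p in range(12)}


def scan_partition(partition_id):
    """Count the readings above 70.0 in one partition."""
    return sum(1 for reading in partitions[partition_id] if reading > 70.0)


def parallel_count(dop):
    """Scan all partitions using up to `dop` workers and combine the counts."""
    with ThreadPoolExecutor(max_workers=dop) as pool:
        return sum(pool.map(scan_partition, sorted(partitions)))


serial_total = parallel_count(1)
parallel_total = parallel_count(12)
print(serial_total, parallel_total)
```

The answer is the same at any degree of parallelism. Only the number of partitions scanned at the same time changes.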
This architecture is very similar to the architecture for Oracle External tables processing data living in Hadoop HDFS files (which can process 12 to 15 terabytes per hour) when running on an Exadata machine where DOP can be as high as 256. When running on similar Exadata systems, OSaK views can process 256 Kafka topic partitions concurrently.
Use Case of One View Mapping to a Single Kafka Partition
The case for multiple views, each owning just one partition, is when the analytics are very domain specific and require specialized logic to be executed in the application. In this case you are either using SQL to do some preliminary analytics but passing on interim results to the application to finish the job, or you are using SQL to relate cryptic Kafka records with meaningful SQL data by joining it with Oracle tables. The data from such tables provides the application with additional information and context it needs to do its analysis.
In this use case, since application logic is doing the heavy lifting it will require high CPU and memory resources to process and analyze large amounts of data being returned by SQL.
Here you would want to run N application instances each reading one OSaK view that exclusively consumes one Kafka partition. This allows the application to scale out and to process Kafka partitions concurrently on multiple machines.
Loading Oracle Tables
Users who simply want to load an Oracle target table that is available to any number of Oracle clients, without the Kafka access restrictions discussed above, can call ORA_KAFKA.LOAD_TABLE.
num_rows_loaded INTEGER := 0;
load_succeeded := ORA_KAFKA.LOAD_TABLE(
'SMARTSTAT', -- Kafka cluster name
'loadsensorhistory', -- Kafka group id
'sensor', -- Kafka topic
'CSV', -- Data representation
'SENSOR_HISTORY', -- Target Oracle table
num_rows_loaded); -- Number of rows loaded
IF load_succeeded THEN
DBMS_OUTPUT.PUT_LINE('loading succeeded. rows loaded = ' ||
This call's interface is similar to ORA_KAFKA.CREATE_VIEWS.
On its first call, the procedure creates an OSaK view mapping to all partitions in the topic and loads the Oracle table 'SENSOR_HISTORY' with records from all partitions up to their offset high-water mark. On subsequent calls, it simply loads new records into the table using the existing OSaK view and advances the offsets. This would be called by a single instance application (e.g. a cron job running a SQL*Plus script, or a simple JDBC application). This call maintains a load history with statistics that track the number of records loaded, and the time taken to process each incremental load.
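The incremental behavior can be modeled in a few lines of Python. This is a sketch of the pattern only. `IncrementalLoader` is a hypothetical class, the lists stand in for Kafka partitions and the target table, and the history here records row counts but not timings.

```python
class IncrementalLoader:
    """Toy model: each call loads only records not loaded by earlier calls."""

    def __init__(self, partitions):
        self.partitions = partitions                  # partition id -> list of records
        self.offsets = {p: 0 for p in partitions}     # next offset to load, per partition
        self.target = []                              # stand-in for the target table
        self.history = []                             # rows loaded by each call

    def load(self):
        loaded = 0
        for partition_id, log in self.partitions.items():
            end = len(log)                            # high-water mark at call time
            start = self.offsets[partition_id]
            self.target.extend(log[start:end])
            loaded += end - start
            self.offsets[partition_id] = end
        self.history.append(loaded)
        return loaded


topic = {0: ["a", "b"], 1: ["c"]}
loader = IncrementalLoader(topic)

first_load = loader.load()     # initial load of everything present
topic[0].append("d")           # a new record arrives
second_load = loader.load()    # only the new record is loaded
third_load = loader.load()     # nothing new
print(first_load, second_load, third_load, loader.target)
```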
Current and Future Functionality
Current functionality includes:
- The ability to configure one or more Kafka clusters either securely for production or unsecured for development.
- The ability to define OSaK views which map to one, some, or all partitions.
- The ability to query topics using an OSaK view for doing sequential access.
- The ability to seek by offset when an OSaK view maps to a single topic partition (this feature is only available when an OSaK view maps to one partition).
- The ability to do incremental loads of Kafka topic records into a standard Oracle table.
- Support for JSON or delimited text representation in Kafka key/value pairs.
- The ability to initialize all offsets for a set of views serving a Kafka group. This allows an application to advance its set of OSaK views to offsets reflecting recent records in a Kafka topic. This is called either when the application is being run for the first time, or when it is being re-booted after substantial downtime.
- The ability to seek by time, which retrieves rows from the partitions in an OSaK view starting from a user-supplied point in time.
- Utilities to add new partitions to a set of OSaK views if the Kafka topic scales out (i.e. adds more partitions).
- Convenience functions that will create private or global temporary tables from OSaK views.
Future functionality includes:
- Support for Avro
- Diagnostic functions
- Predictive analysis (when to query Kafka for new information)
Current Release Kits
The Oracle SQL Access to Kafka kit is now available from SQL Developer:
SQL Developer 19.4: http://www.oracle.com/tools/downloads/sqldev-downloads.html
- Download SQL Developer 19.4 from the link above and extract the contents to a folder
- Identify the orakafka directory in the extracted contents
- Copy orakafka/orakafka.zip to a directory on your database system
- Unzip the file on the database system
- Install the project by following instructions in doc/README_INSTALL
- In the SQL Developer 'help' menu search for 'Kafka' to identify documentation for using Oracle SQL Access to Kafka. You can use Oracle SQL Access to Kafka from SQL Developer, or any SQL tool.
Using Oracle SQL Access to Kafka has five big advantages:
• Oracle has world-class optimization and execution strategies that minimize the code path needed to join tables and Kafka streams efficiently
• Oracle SQL analytics now can be applied directly to data read from Kafka
• The operation can be performed within the context of an Oracle ACID transaction controlled by the application
• The transaction encapsulates partition offset management and committing Kafka offsets to database metadata tables serving Kafka. This eliminates losing or re-reading Kafka records.
• The Kafka data can be processed by SQL and/or application logic without ever “landing” it to disk.
In short, it allows data in motion to be processed with data at rest using standard Oracle SQL within a transaction context serving standard database application logic (e.g. JDBC applications).