What is Kafka?
You can find the full scope of information about Kafka here, but in a nutshell, it's a distributed, fault-tolerant messaging system. It allows you to connect many systems in an organized fashion. Instead of connecting each system peer to peer:
you can land all your messages company-wide on one system and consume them from there, like this:
Kafka is a kind of data hub system, where you land the messages and serve them afterwards.
More technical details.
I'd like to introduce a few key Kafka terms.
1) Kafka Broker. This is the Kafka service, which you run on each server and which handles all read and write requests.
2) Kafka Producer. A process that writes data to Kafka.
3) Kafka Consumer. A process that reads data from Kafka.
4) Message. The name describes itself; I'll just add that messages have a key and a value. Unlike keys in NoSQL databases, Kafka's key is not indexed. It has application purposes (you may put some application logic in the key) and administrative purposes (all messages with the same key go to the same partition).
5) Topic. A named set of messages. Database people would compare it to a table.
6) Partition. It's good practice to divide a topic into partitions for performance and maintenance purposes. Messages with the same key go to the same partition; if the key is absent, messages are distributed in round-robin fashion.
7) Offset. The offset is the position of each message within its partition. The offset is indexed, which allows you to quickly access a particular message.
When do you delete data?
One of the basic Kafka concepts is retention: Kafka does not keep data forever, nor does it wait for all consumers to read a message before deleting it. Instead, the Kafka administrator configures a retention policy for each topic: either the amount of time to store messages before deleting them, or the maximum amount of data to retain before older messages are purged. Two parameters control this: log.retention.ms and log.retention.bytes.
log.retention.bytes sets the amount of data to retain in the log for each topic partition. This is a per-partition limit: multiply it by the number of partitions to get the total data retained for the topic.
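The same limits can also be set per topic with the topic-level overrides retention.ms and retention.bytes; here is a sketch using the standard kafka-configs tool (the ZooKeeper host and topic name are hypothetical):

```shell
# Keep at most 7 days OR 1 GB per partition for this topic,
# whichever limit is reached first
kafka-configs --zookeeper zk1.example.com:2181 --alter \
  --entity-type topics --entity-name web_logs \
  --add-config retention.ms=604800000,retention.bytes=1073741824
```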
How to query Kafka data with Big Data SQL?
To query Kafka data, you first need to create a Hive table. Let me show an end-to-end example. I have a JSON file:
and I'm going to load it into Kafka with the standard Kafka tool "kafka-console-producer":
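A sketch of the producer call (the broker host, topic, and file names are hypothetical):

```shell
# Each line of the file becomes one Kafka message
kafka-console-producer --broker-list broker1.example.com:9092 \
  --topic json_topic < store_sales.json
```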
To check that the messages have appeared in the topic, you may use the following command:
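Something like this, with the same hypothetical names:

```shell
# Read the topic from the beginning and print the messages
kafka-console-consumer --bootstrap-server broker1.example.com:9092 \
  --topic json_topic --from-beginning
```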
After I've loaded this file into the Kafka topic, I create a table in Hive.
Make sure that you have oracle-kafka.jar and kafka-clients*.jar in your hive.aux.jars.path:
After this you may run the following DDL in Hive:
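The DDL looks roughly like this. The storage handler class comes from oracle-kafka.jar; the table, topic, and broker names are hypothetical, and your version of the integration may expect slightly different property names:

```sql
CREATE EXTERNAL TABLE json_topic
STORED BY 'oracle.hadoop.kafka.hive.KafkaStorageHandler'
TBLPROPERTIES (
  'oracle.kafka.table.key.type'    = 'string',
  'oracle.kafka.table.value.type'  = 'string',  -- raw JSON kept as a string
  'oracle.kafka.bootstrap.servers' = 'broker1.example.com:9092',
  'oracle.kafka.table.topics'      = 'json_topic'
);
```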
As soon as the Hive table has been created, I create an ORACLE_HIVE table in Oracle:
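A sketch of the matching Oracle-side DDL (the exact column list exposed by the Kafka storage handler may differ in your version; names here are hypothetical):

```sql
CREATE TABLE json_topic (
  topic          VARCHAR2(64),
  partitionid    INTEGER,
  value          VARCHAR2(4000),   -- the raw JSON message
  offset         INTEGER,
  timestamp      TIMESTAMP,
  timestamptype  INTEGER
)
ORGANIZATION EXTERNAL (
  TYPE ORACLE_HIVE
  DEFAULT DIRECTORY DEFAULT_DIR
  ACCESS PARAMETERS (com.oracle.bigdata.tablename=default.json_topic)
)
REJECT LIMIT UNLIMITED;
```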
Here you also have to keep in mind that you need to add oracle-kafka.jar and kafka-clients*.jar to your bigdata.properties file on both the database and the Hadoop side. I have a dedicated blog about how to do this here.
Now we are ready to query:
Oracle 12c provides powerful capabilities for working with JSON, such as the dot-notation API. It allows us to easily query JSON data as if it were structured:
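For instance, assuming the messages carry fields like these (the field names are hypothetical, and the value column must be recognized as JSON for dot notation to apply):

```sql
-- t.value holds the raw JSON message; dot notation digs into its fields
SELECT t.value.cust_city, COUNT(*)
FROM   json_topic t
WHERE  t.value.amount > 100
GROUP BY t.value.cust_city;
```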
Working with AVRO messages.
In many cases, customers use AVRO as a flexible, self-describing format for exchanging messages through Kafka. We certainly support it, in a very easy and flexible way.
I have a topic which contains AVRO messages, and I define a Hive table over it:
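Roughly like this (the record schema, topic, and broker names are hypothetical):

```sql
CREATE EXTERNAL TABLE avro_topic
STORED BY 'oracle.hadoop.kafka.hive.KafkaStorageHandler'
TBLPROPERTIES (
  'oracle.kafka.table.value.type'   = 'avro',
  'oracle.kafka.table.value.schema' = '{"type":"record","name":"sale",
      "fields":[{"name":"id","type":"long"},
                {"name":"amount","type":"double"}]}',
  'oracle.kafka.bootstrap.servers'  = 'broker1.example.com:9092',
  'oracle.kafka.table.topics'       = 'avro_topic'
);
```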
Here I set 'oracle.kafka.table.value.type'='avro', and I also have to specify 'oracle.kafka.table.value.schema'. After this, the Avro fields are exposed as structure.
In a similar way, I define a table in Oracle RDBMS:
And we are good to query the data!
1) Number of Partitions.
This is the most important thing to keep in mind. There is a nice article about how to choose the right number of partitions. For Big Data SQL purposes, I'd recommend using slightly more partitions than you have CPU cores on your Big Data SQL cluster.
2) Query fewer columns
Use the column pruning feature. In other words, list only the necessary columns in your SELECT and WHERE clauses. Here is an example.
I've created a void PL/SQL function which does nothing. But PL/SQL can't be offloaded to the cell side, so all the data will be moved to the database side:
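A minimal sketch of such a function (the name fnull is mine):

```sql
-- Any row passed through this function must first travel to the DB side,
-- because user PL/SQL cannot be offloaded to the cells
CREATE OR REPLACE FUNCTION fnull (p IN VARCHAR2) RETURN VARCHAR2 IS
BEGIN
  RETURN p;
END;
/
```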
After this I ran a query that requires one column and checked how much data was returned to the DB side:
Then I repeated the same test case with 10 columns:
"cell interconnect bytes returned by XT smart scan" 32193.98 MB
So, hopefully, this test case clearly shows that you should query only the columns you need.
3) There are no indexes other than the offset column. Don't be misled by the presence of a key column: it is not indexed. Only the offset gives you quick random access.
4) Warm up your data
If you want to read the same data quickly many times, you have to warm it up by running "select *"-style queries.
Kafka relies on the Linux filesystem cache, so to read the same dataset faster on subsequent passes, you have to read it once first.
Here is the example:
- I clean up the Linux filesystem cache
- I run the first query:
it took 278 seconds.
- The second and third runs took only 92 seconds.
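The warm-up sequence above can be sketched as follows. The cache drop must run as root on every node that stores the topic; in practice you would wrap it in dcli or ssh:

```shell
# 1. Drop the Linux page cache so the first read really hits the disks
sync; echo 3 > /proc/sys/vm/drop_caches

# 2. The first pass populates the filesystem cache (slow);
#    later passes are served from memory (fast).
#    warmup.sql is a hypothetical "select *" over the Kafka-backed table.
sqlplus user/pass @warmup.sql
```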
5) Use a bigger replication factor
Use a bigger replication factor. Here is the example. I have two tables: one created over a Kafka topic with replication factor = 1, the second created over a Kafka topic with replication factor = 3.
The query over the replication factor = 1 topic took 278 seconds for the first run and 92 seconds for the next runs.
The query over the replication factor = 3 topic took 279 seconds for the first run, but only 34 seconds for the next runs, since reads can be spread across more brokers and their filesystem caches.
6) Compression considerations
Kafka supports different types of compression. If you store the data in JSON or XML format, the compression ratio can be significant. Here is an example of the numbers you might see:
| Data format and compression type | Size of the data, GB |
|----------------------------------|----------------------|
| JSON on HDFS, uncompressed       | 273.1                |
| JSON in Kafka, uncompressed      | 286.191              |
| JSON in Kafka, Snappy            | 180.706              |
| JSON in Kafka, GZIP              | 52.2649              |
| AVRO in Kafka, uncompressed      | 252.975              |
| AVRO in Kafka, Snappy            | 158.117              |
| AVRO in Kafka, GZIP              | 54.49                |
This may save some disk space, but taking into account that Kafka is primarily used as a temporary store (for a week or a month, say), I'm not sure it makes much sense. You will also pay a performance penalty when querying this data (and burn more CPU).
I've run a query like:
and got the following results:
| Type of compression | Elapsed time, sec |
So, uncompressed is the leader; Gzip and Snappy are slower (not dramatically, but slower). Taking this into account, as well as the fact that Kafka is a temporary store, I wouldn't recommend using compression without an exceptional need.
7) Parallelize your processing.
If for some reason you are using a small number of partitions, you can use the Hive table property "oracle.kafka.partition.chunk.size" to increase parallelism. This parameter defines the size of an input split, so if you set it to 1 MB and your topic holds 4 MB in total, the topic will be processed by 4 parallel threads.
Here is the test case:
- Drop the Kafka topic
- Create it again with only one partition
- Check it
- Check the size of the input file:
- Load data into the Kafka topic
- Create Hive External table
- Create Oracle external table
- Run test query
it took 142 seconds
- Re-create the Hive external table with the 'oracle.kafka.partition.chunk.size' parameter set to 1 MB
- Run query again:
Now it took only 7 seconds.
A 1 MB split is quite small; for big topics we recommend using 256 MB.
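The key step is the table property; a sketch (the table name is hypothetical, and 1 MB = 1048576 bytes):

```sql
-- With a 4 MB topic, a 1 MB split size yields 4 parallel input splits
ALTER TABLE one_part_topic
SET TBLPROPERTIES ('oracle.kafka.partition.chunk.size'='1048576');
```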
8) Querying small topics.
Sometimes you need to query really small topics (a few hundred messages, for example), but very frequently. In this case, it makes sense to create the topic with fewer partitions.
Here is the test case example:
- Create a topic with 1000 partitions
- Load only one message into it
- Create the Hive external table
- Create the Oracle external table
- Query all rows from it
it took 6 seconds
- Create a topic with only one partition, put only one message there, and run the same SQL query over it
now it takes only 0.5 seconds
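The difference comes from fixed per-partition overhead: every partition has to be opened and polled even when it is empty, so 1000 mostly-empty partitions cost far more than one. The first step of the test might look like this (names hypothetical):

```shell
# 1000-partition topic holding a single message
kafka-topics --zookeeper zk1.example.com:2181 --create \
  --topic tiny_1000 --partitions 1000 --replication-factor 3
echo '{"id":1}' | kafka-console-producer \
  --broker-list broker1.example.com:9092 --topic tiny_1000
```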
9) Type of data in Kafka messages.
You have a few options for storing data in Kafka messages, and of course you want pushdown processing. Big Data SQL supports pushdown operations only for JSON. This means that everything you can expose through JSON will be pushed down to the cell side and processed there.
- A query that can be pushed down to the cell side (JSON):
- A query that cannot be pushed down to the cell side (XML):
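Sketches of both cases (the field names and topics are hypothetical):

```sql
-- JSON: the predicate on t.value is evaluated on the cell side
SELECT COUNT(*)
FROM   json_topic t
WHERE  t.value.state = 'CA';

-- XML: parsing with XMLTYPE happens on the database side, so all
-- messages are shipped to the database before filtering
SELECT COUNT(*)
FROM   xml_topic t
WHERE  XMLTYPE(t.value).EXTRACT('/sale/state/text()').getStringVal() = 'CA';
```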
If the amount of data is not significant, you can use Big Data SQL to process it directly. If we are talking about big data volumes, you can process the data once and convert it into a different file format on HDFS with a Hive query:
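For example, a one-time conversion to Parquet might look like this (table names hypothetical):

```sql
-- Re-materialize the Kafka-backed table as Parquet files on HDFS
CREATE TABLE sales_parquet STORED AS PARQUET
AS SELECT * FROM json_topic;
```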
10) JSON vs AVRO format in the Kafka topics
Continuing from the previous point, you may be wondering which semi-structured format to use. The answer is easy: use whatever your data source produces; there is no significant performance difference between Avro and JSON. For example, a query like:
will complete in 112 seconds for JSON and in 105 seconds for Avro.
The JSON topic takes 286.33 GB, while the Avro topic takes 202.568 GB. There is some difference, but not enough to be worth converting the original format.
How to bring data from OLTP databases into Kafka? Use GoldenGate!
Oracle GoldenGate is a well-known product for capturing commit logs on the database side and bringing the changes into a target system. The good news is that Kafka can play the role of the target system. I'd like to skip the detailed explanation of this feature, because it's already explained in great detail here.
Known Issue. Running the Kafka broker on a wildcard address
By default, Kafka doesn't use the wildcard address (0.0.0.0) for brokers and picks a specific IP address. This may be a problem in a multi-network Kafka cluster, where one network is used for the interconnect and a second for external connections. Luckily, there is an easy way to solve this issue and start the Kafka broker on the wildcard address.
1) go to: Kafka > Instances (Select Instance) > Configuration > Kafka Broker > Advanced > Kafka Broker Advanced Configuration Snippet (Safety Valve) for kafka.properties
2) and add:
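Something along these lines (the advertised hostname is hypothetical and must be resolvable by external clients):

```properties
# Bind the broker on all interfaces, but advertise the external name
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://broker1.example.com:9092
```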