Apache Kafka was developed at LinkedIn labs almost a decade ago. Since its graduation from Apache Incubator, Apache Kafka has been the open-source industry standard for event stream processing and has become the cornerstone of most modern and distributed architectures. It is now difficult to imagine Cloud, Big Data, and Data Management solutions without event streaming at their core.
In this article, I will focus on streaming change data events into Kafka using Oracle Cloud Infrastructure (OCI) GoldenGate.
OCI GoldenGate is a fully managed, native cloud service that moves data in real-time, at scale. OCI GoldenGate processes data as it moves from one or more data management systems to target systems. You can also design, run, orchestrate, and monitor data replication tasks without having to allocate or manage any compute environments.
OCI GoldenGate makes real-time data ingestion into Kafka faster, better, and cheaper.
Creating an OCI GoldenGate replication is a two-step process: first create a Connection, then configure a Replicat. After you create the connection, you can create the Replicat in the OCI GoldenGate Console.
Kafka provides the ability to publish (write) and subscribe to (read) streams of events. OCI GoldenGate writes and reads streams of events from various Kafka platforms:
To create a connection for any of the previously listed Kafka flavours, go to the Oracle Cloud Infrastructure Console > Oracle Database > GoldenGate > Connections and click Create Connection.
Provide a Name for your connection, select the Compartment where you want the connection to be located, and select the Connection Type that matches your target Kafka platform. Then click Next.
Provide the Host and Port of your Kafka cluster. If you’re connecting to a private endpoint, select Network connectivity via private endpoint, select the subnet, and provide the Private IP address together with the host and port. If you’re connecting to a private endpoint on a third-party cloud, make sure that network connectivity is configured between OCI and that cloud.
Next, select the Security Protocol that matches the security protocol of your Kafka cluster. Depending on the protocol selected, you may need to provide additional details such as usernames, passwords, or certificates.
OCI GoldenGate supports Confluent Kafka with and without a schema registry. Confluent Schema Registry is treated as a separate connection type in OCI GoldenGate.
To create a Confluent Schema Registry connection, go to the Oracle Cloud Infrastructure Console > Oracle Database > GoldenGate > Connections and click Create Connection.
Provide a Name for your connection, select the Compartment where you want your connection to be located and select the Type as Confluent Schema Registry.
Provide the schema registry URL. Select the Authentication Type and fill in the required fields.
After the connection is created, you need to assign it to a deployment. When created, connections and deployments are detached from each other. Connection assignment attaches a connection to a deployment to be used in the replication processes.
To assign a connection to a deployment, go to Connection details, and click Assigned deployments then Assign deployment. Select the name of the deployment to be assigned from the list and click Assign deployment.
To configure a replication to Kafka, click Add Replicat in the Administration Service.
Select the Replicat Type. There are two Replicat types available: Classic Replicat and Coordinated Replicat. Classic Replicat is a single-threaded process, whereas Coordinated Replicat is a multithreaded one that applies transactions in parallel. Coordinated Replicat may create a special condition for Kafka. If more than one Replicat thread pushes to the same topic/partition, a race condition is created where messages from different threads may interleave with each other. This race condition may make the order of operations in the Kafka topic/partition inconsistent with the order on the source. If Coordinated Replicat is used, this condition should be considered when designing topic mappings.
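One way to reason about this when designing topic mappings is to check that no topic/partition is written by more than one Replicat thread. The following is a minimal standalone Python sketch of that check, not a GoldenGate API: the function name `find_shared_targets`, the `(thread_id, topic, partition)` layout, and the sample table names are all hypothetical.

```python
from collections import defaultdict


def find_shared_targets(assignments):
    """Return the topic/partition targets written by more than one thread.

    `assignments` is a hypothetical list of (thread_id, topic, partition)
    tuples describing which Replicat thread writes to which target.
    """
    writers = defaultdict(set)
    for thread_id, topic, partition in assignments:
        writers[(topic, partition)].add(thread_id)
    # Only targets with two or more writer threads can see interleaving.
    return {
        target: sorted(threads)
        for target, threads in writers.items()
        if len(threads) > 1
    }


# Hypothetical plan: threads 1 and 2 both write to HR.EMPLOYEES, partition 0.
plan = [
    (1, "HR.EMPLOYEES", 0),
    (2, "HR.DEPARTMENTS", 0),
    (2, "HR.EMPLOYEES", 0),
]
conflicts = find_shared_targets(plan)
print(conflicts)  # {('HR.EMPLOYEES', 0): [1, 2]}
```

An empty result means every topic/partition has a single writer thread, so the interleaving described above cannot occur for that plan.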
After selecting the Replicat Type, provide the Replicat Options. Please provide a Name for the replicat and the Trail Name.
Depending on the target Kafka platform, please select Target:
If you want to use Kafka Connect for replication, select Kafka Connect. Once it is selected, the available Converters are listed: JSON and Avro. If Avro is selected, the available Schema Registry connections are listed. For your schema registry connection to appear in the list, it must be assigned to your deployment.
Once selected, available aliases will be listed, and you’ll need to choose an alias. Once an alias is selected, Replicat will be using the credential details from the connection alias that’s selected.
After providing the Replicat Options, provide the Parameter File. You can either specify the source-to-target mapping or leave it as is for wildcard selection. If Coordinated Replicat is selected as the Replicat type, an additional parameter needs to be provided: TARGETDB LIBFILE libggjava.so SET property=/u02/Deployment/etc/conf/ogg/<your_replicat_name>.properties
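To make the shape of the parameter file concrete, here is a hedged Python sketch that renders one as text. The `TARGETDB` line is the one quoted above, with the Replicat name filled in; `REPLICAT` and `MAP ... TARGET ...;` are standard GoldenGate parameters. The helper name `render_parameter_file`, the Replicat name `RKAFKA`, and the schema `SRC_SCHEMA` are hypothetical, and the file that OCI GoldenGate pre-populates for you may contain other parameters.

```python
def render_parameter_file(replicat_name, source_schema, coordinated=False):
    """Build illustrative Replicat parameter file text (a sketch only)."""
    lines = [f"REPLICAT {replicat_name}"]
    if coordinated:
        # Additional parameter required for Coordinated Replicat.
        lines.append(
            "TARGETDB LIBFILE libggjava.so SET property="
            f"/u02/Deployment/etc/conf/ogg/{replicat_name}.properties"
        )
    # Wildcard mapping: every table in the schema maps to the same name.
    lines.append(f"MAP {source_schema}.*, TARGET {source_schema}.*;")
    return "\n".join(lines)


print(render_parameter_file("RKAFKA", "SRC_SCHEMA", coordinated=True))
```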
The last step is the Properties File configuration. You’ll notice that some of the properties are already pre-configured. This is the base-level configuration for starting a replication into Kafka, and default values are provided for many properties. You just need to provide the target topic name in the topicMappingTemplate property. The topic name can be a static value, one of the Dynamic Keywords (which generates topic names based on the selected keyword), or a combination of a static value and dynamic keywords.
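The three ways of naming a topic can be illustrated with a small Python sketch. It uses Python's `string.Template`, which happens to share the `${keyword}` syntax; it is only a stand-in for the idea of keyword substitution and not how GoldenGate resolves templates internally. The function name `resolve_topic`, the prefix `ogg_`, and the table `HR.EMPLOYEES` are hypothetical.

```python
from string import Template


def resolve_topic(template, operation):
    """Substitute ${keyword} placeholders with values from one operation."""
    return Template(template).substitute(operation)


# Hypothetical metadata for a single source operation.
operation = {"fullyQualifiedTableName": "HR.EMPLOYEES"}

static_topic = resolve_topic("ogg_topic", operation)
dynamic_topic = resolve_topic("${fullyQualifiedTableName}", operation)
combined_topic = resolve_topic("ogg_${fullyQualifiedTableName}", operation)

print(static_topic)    # ogg_topic
print(dynamic_topic)   # HR.EMPLOYEES
print(combined_topic)  # ogg_HR.EMPLOYEES
```

With a static value every table shares one topic, while a dynamic keyword yields one topic per source table.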
Finally, click on Create or Create and Run.
If Kafka Connect is not selected in Replicat Options, you'll see the properties listed as below. Here too, you just need to provide the target topic name in the topicMappingTemplate property, using a static value, one of the Dynamic Keywords, or a combination of both.
Each Kafka topic is divided into partitions, and the message key decides which partition a message is sent to. Messages with the same key always land in the same partition. For example, if the source system produces events associated with the same customer, using the customer ID as the message key guarantees that all events for a given customer are read in the order they were produced. Oracle’s recommendation is to use the primary key columns as the message key when replicating to Kafka. First, because primary key values are unique, they vary from row to row and therefore distribute messages well across partitions. Second, operations on the same row have the same primary key values, so the calculated message key is the same, those messages are sent to the same partition, and the operation order is preserved.
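The effect of keying by primary key can be sketched in a few lines of Python. This is an illustration under stated assumptions: `zlib.crc32` is used as a stable stand-in hash (Kafka's default Java partitioner uses murmur2 for keyed messages), and the function name `partition_for`, the partition count, and the sample operations are hypothetical.

```python
import zlib


def partition_for(key, num_partitions):
    """Map a message key to a partition with a stable hash.

    crc32 is a stand-in here; the real Kafka partitioner hashes differently,
    but the property shown (same key, same partition) is the same.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Hypothetical source operations in commit order: (operation, primary key).
operations = [
    ("INSERT", "101"),
    ("INSERT", "202"),
    ("UPDATE", "101"),
    ("UPDATE", "202"),
    ("DELETE", "101"),
]

# Append each operation to the partition chosen by its primary key.
partitions = {}
for op_type, primary_key in operations:
    target = partition_for(primary_key, 3)
    partitions.setdefault(target, []).append((op_type, primary_key))

for number, messages in sorted(partitions.items()):
    print(number, messages)
```

Whichever partitions the two keys hash to, all operations for row 101 sit in one partition in their original order, and likewise for row 202.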
If topicMappingTemplate is set to ${fullyQualifiedTableName} and keyMappingTemplate is set to ${primaryKeys}, operations from the same table land in the same topic and operations on the same row land in the same partition, which helps keep the source operation order consistent.
You can apply additional properties depending on the format that is selected using the gg.handler.kafkahandler.format property.
Pluggable Formatters are available for json, delimitedtext, avro_row, avro_op, and xml.
OCI GoldenGate is an easy-to-use, flexible, cost-effective platform for replicating changed data into Kafka. It enriches the messages with metadata from the source database and provides configuration options that keep the source operation order consistent in Kafka topics and partitions.
Deniz Sendil is a Senior Principal Product Manager in the Oracle GoldenGate product group with a focus on GoldenGate for Big Data. Deniz has over 17 years of experience in the data management, analytics, and data integration domains.