Oracle GoldenGate for Big Data provides the Kafka Handler, which lets you capture changes from any database supported by GoldenGate and stream them into Apache Kafka topics. The Kafka Handler defines how data is processed, transformed, and delivered to Kafka topics. You can find the configuration properties for Kafka Handler here.
The following diagram illustrates how the GoldenGate parameter file contains the table mappings and a reference to the Kafka Handler. The Kafka Handler properties file holds the key handler properties and a reference to the Kafka producer configuration file, which is used to connect to Kafka and produce messages.

Follow the steps below to replicate data to Apache Kafka.
1. Create a Kafka producer configuration file.
a) Creating a producer configuration file for Oracle Cloud Infrastructure (OCI) Streaming.
b) Creating a producer configuration file for Apache Kafka.
2. Create a Kafka Handler properties file.
3. Create a Replicat parameter file.
4. Start the GoldenGate Replicat.
5. Validate the GoldenGate Replicat.
In this blog, the Replicat is named "REPKAFKA", the Kafka Handler properties file is "REPKAFKA.properties", and the producer configuration file is "custom_kafka_producer.properties". We assume you have already created an Extract to capture data from the source database and a distribution path to push the trail files to the Big Data deployment.
Step 1: Create a Kafka producer configuration file.
Update the Kafka broker host and port in the configuration file to match your environment. Refer to this link for more information about the Kafka producer configuration file. To push data to OCI Streaming, run the command in Step 1.a; to push data to Apache Kafka, run the command in Step 1.b.
Step 1.a) Creating a producer configuration file for Oracle Cloud Infrastructure (OCI) Streaming.
Sample Kafka producer configuration file for OCI Streaming.
curl --location --request POST 'http://<Admin-Server-IP/Hostname>:<Admin-Server-Port>/services/v2/config/files/custom_kafka_producer.properties' \
--user 'username':PASSWORD \
--header 'Content-Type: text/plain' \
--data '{
"lines": [
"bootstrap.servers=cell-1.streaming.us-phoenix-1.oci.oraclecloud.com:9092",
"security.protocol=SASL_SSL",
"sasl.mechanism=PLAIN",
"value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer",
"key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer",
"sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"paasdevgg/oracleidentitycloudservice/user.name@oracle.com/ocid1.streampool.oc1.phx.amaaaaaa3p5c3vqa4hfyl7uv465pay4audmoajughhxlsgj7afc2an5u3xaq\" password=\"YOUR-AUTH-TOKEN\";"
]
}'| python -mjson.tool
Note: You can safely ignore the error message "title": "Parameter file CUSTOM_KAFKA_PRODUCER.properties failed validity check." in the cURL output.
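The sasl.jaas.config value contains double quotes, which must be escaped inside the JSON request body. As a minimal sketch (not part of GoldenGate), the body can be generated with Python's json module so the escaping is handled for you. The helper name build_file_payload and the placeholder username are assumptions made for illustration only.

```python
import json

def build_file_payload(properties):
    """Turn a dict of producer properties into the {"lines": [...]} JSON body."""
    lines = [f"{key}={value}" for key, value in properties.items()]
    return json.dumps({"lines": lines}, indent=2)

# Placeholder values (hypothetical); replace them with your own stream pool and auth token.
jaas = (
    "org.apache.kafka.common.security.plain.PlainLoginModule required "
    'username="<tenancy>/<user>/<stream-pool-ocid>" '
    'password="YOUR-AUTH-TOKEN";'
)

payload = build_file_payload({
    "bootstrap.servers": "cell-1.streaming.us-phoenix-1.oci.oraclecloud.com:9092",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.jaas.config": jaas,
})

# json.dumps escapes the embedded double quotes as \" in the output.
print(payload)
```

The printed text can be saved to a file and passed to curl with --data @filename instead of typing the JSON inline.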
Output of the Step 1.a:

GoldenGate web console output for Step 1.a:

Step 1.b) Creating a producer configuration file for Apache Kafka.
curl --location --request POST 'http://<Admin-Server-IP/Hostname>:<Admin-Server-Port>/services/v2/config/files/custom_kafka_producer.properties' \
--user 'username':PASSWORD \
--header 'Content-Type: text/plain' \
--data '{
"lines": [
"bootstrap.servers=<kafka-server-ip>:9092",
"acks = 1",
"compression.type = gzip",
"reconnect.backoff.ms = 1000",
"value.serializer = org.apache.kafka.common.serialization.ByteArraySerializer",
"key.serializer = org.apache.kafka.common.serialization.ByteArraySerializer",
"# 100KB per partition",
"batch.size = 102400",
"linger.ms = 10000",
"max.request.size = 5024000",
"send.buffer.bytes = 50240000"
]
}'| python -mjson.tool
Note: You can safely ignore the error message "title": "Parameter file CUSTOM_KAFKA_PRODUCER.properties failed validity check." in the cURL output.
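Before posting the file, it can help to check the property lines locally. The following is a rough sketch that parses "key = value" lines and confirms that the settings a Kafka producer needs are present. The parser is deliberately simplified and does not cover the full Java properties syntax; the function name parse_properties is hypothetical.

```python
def parse_properties(lines):
    """Parse simple 'key = value' lines, skipping blanks and # comments."""
    props = {}
    for line in lines:
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            continue
        # Split on the first '=' only, so values may contain '=' themselves.
        key, _, value = stripped.partition("=")
        props[key.strip()] = value.strip()
    return props

lines = [
    "bootstrap.servers=<kafka-server-ip>:9092",
    "acks = 1",
    "compression.type = gzip",
    "value.serializer = org.apache.kafka.common.serialization.ByteArraySerializer",
    "key.serializer = org.apache.kafka.common.serialization.ByteArraySerializer",
    "# 100KB per partition",
    "batch.size = 102400",
]

props = parse_properties(lines)
required = {"bootstrap.servers", "key.serializer", "value.serializer"}
missing = required - props.keys()

print("missing required settings:", sorted(missing))
print("batch.size in KB:", int(props["batch.size"]) / 1024)
```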
Output of the Step 1.b:

You can also verify the result in the GoldenGate web console. Click the hamburger menu in the upper right corner, browse to Configuration, and click Parameter Files.
GoldenGate web console output for Step 1.b:

Step 2: Create a Kafka Handler Properties file.
The following is a sample configuration for the Kafka Handler. Update the highlighted configuration.
curl --location 'http://<Admin-Server-IP/Hostname>:<Admin-Server-Port>/services/v2/config/files/REPKAFKA.properties' \
--user 'username':PASSWORD \
--header 'Content-Type: text/plain' \
--data '{
"lines": [
"# Properties file for Replicat REPKAFKA",
"#Kafka Handler Template",
"gg.handlerlist=kafkahandler",
"gg.handler.kafkahandler.type=kafka",
"#TODO: Set the name of the Kafka producer properties file.",
"gg.handler.kafkahandler.kafkaProducerConfigFile=custom_kafka_producer.properties",
"#TODO: Set the template for resolving the topic name.",
"gg.handler.kafkahandler.topicMappingTemplate=${tableName}",
"gg.handler.kafkahandler.keyMappingTemplate=${primaryKeys}",
"gg.handler.kafkahandler.mode=op",
"gg.handler.kafkahandler.format=json",
"gg.handler.kafkahandler.format.metaColumnsTemplate=${objectname[table]},${optype[op_type]},${timestamp[op_ts]},${currenttimestamp[current_ts]},${position[pos]}",
"#TODO: Set the location of the Kafka client libraries.",
"gg.classpath={Kafka-Home}/libs/*",
"jvm.bootoptions=-Xmx512m -Xms32m"
]
}'| python -mjson.tool
Note: You can safely ignore the error message "title": "Parameter file REPKAFKA.properties failed validity check." in the cURL output.
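To get a feel for what topicMappingTemplate and keyMappingTemplate do, here is a small illustration of keyword substitution. This is not the Kafka Handler's implementation: the real resolution happens inside the handler at runtime, and this sketch does not handle the bracketed syntax used in metaColumnsTemplate. The function resolve_template and the sample values EMPLOYEES and 101 are made up for the example.

```python
import re

def resolve_template(template, values):
    """Replace each ${keyword} in the template with its value for one operation."""
    def lookup(match):
        return values[match.group(1)]
    return re.sub(r"\$\{(\w+)\}", lookup, template)

# Hypothetical values for a single change record.
operation = {"tableName": "EMPLOYEES", "primaryKeys": "101"}

topic = resolve_template("${tableName}", operation)
key = resolve_template("${primaryKeys}", operation)

print("topic:", topic)
print("message key:", key)
```

With the templates used in this blog, each source table maps to its own topic, and the message key is derived from the row's primary key values.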
Output of the Step 2:

GoldenGate web console output for Step 2:

Step 3: Create a Replicat parameter file.
curl --location 'http://<Admin-Server-IP/Hostname>:<Admin-Server-Port>/services/v2/replicats/REPKAFKA' \
--user 'username':PASSWORD \
--header 'Content-Type: text/plain' \
--data '{
"intent": "Unidirectional",
"status": "stopped",
"begin": {
"sequence": 0,
"offset": 0
},
"managedProcessSettings": "Default",
"mode": {
"type": "nonintegrated"
},
"source": {
"name": "<Trail Name>"
},
"config": [
"REPLICAT REPKAFKA",
"MAP *.*, TARGET *.*;"
]
}'| python -mjson.tool
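Because the request body is nested JSON, a missing quote or bracket is easy to overlook. As a sketch under stated assumptions, the same body can be assembled in Python and serialized with json.dumps, which always yields well-formed JSON. The helper build_replicat_payload and the trail name "kt" are hypothetical; use your own trail name.

```python
import json

def build_replicat_payload(name, trail_name):
    """Build the JSON body used in Step 3 for the given Replicat and trail."""
    body = {
        "intent": "Unidirectional",
        "status": "stopped",
        "begin": {"sequence": 0, "offset": 0},
        "managedProcessSettings": "Default",
        "mode": {"type": "nonintegrated"},
        "source": {"name": trail_name},
        "config": [f"REPLICAT {name}", "MAP *.*, TARGET *.*;"],
    }
    return json.dumps(body, indent=2)

# "kt" is a placeholder trail name for illustration only.
payload = build_replicat_payload("REPKAFKA", "kt")
print(payload)
```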
Output of the Step 3:

GoldenGate web console output for Step 3:

GoldenGate Replicat for Kafka has been created successfully.

Step 4: Start the GoldenGate Replicat.
curl --location --request POST 'http://<Admin-Server-IP/Hostname>:<Admin-Server-Port>/services/v2/commands/execute' \
--user "USERNAME:"PASSWORD \
-d '{
"name":"start",
"processName":"REPKAFKA"
}'| python -mjson.tool
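If you prefer to script this step, the same request can be prepared with Python's standard library. The sketch below only builds the request object and does not send it. The host gg-admin.example.com, port 9011, and the credentials are placeholders, and the Content-Type header is an assumption, since the cURL command above does not set one.

```python
import base64
import json
import urllib.request

def build_start_request(base_url, username, password, process_name):
    """Prepare (but do not send) the POST request that starts a Replicat."""
    body = json.dumps({"name": "start", "processName": process_name}).encode("utf-8")
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        url=f"{base_url}/services/v2/commands/execute",
        data=body,
        method="POST",
        headers={
            "Authorization": f"Basic {token}",   # HTTP basic auth, like curl --user
            "Content-Type": "application/json",  # assumption, not in the cURL command
        },
    )

# Placeholder host, port, and credentials for illustration.
req = build_start_request("http://gg-admin.example.com:9011", "ggadmin", "secret", "REPKAFKA")
print(req.get_method(), req.full_url)
# To actually send it: urllib.request.urlopen(req)
```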
Output of the Step 4:
GoldenGate web console output for Step 4:
Step 5: Validate the GoldenGate Replicat through stats.
curl --silent --location --request POST 'http://<Admin-Server-IP/Hostname>:<Admin-Server-Port>/services/v2/replicats/REPKAFKA/command' \
--user "USERNAME:"PASSWORD \
--data '{
"command":"STATS",
"arguments":"HOURLY"
}'|python -mjson.tool
GoldenGate web console output for Step 5:

Appendix:
Replace the following placeholders according to your environment.
- Admin-Server-IP/Hostname : Hostname or IP address of the GoldenGate Admin Server.
- Admin-Server-Port : Port number on which the GoldenGate Admin Server is running.
- USERNAME : Username for the GoldenGate Admin console.
- PASSWORD : Password for the GoldenGate Admin console.
- kafka-server-ip : Hostname or IP address of the Kafka server.
- Kafka-Home : Home location of the Kafka client JARs.
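A command that still contains an angle-bracket placeholder will fail, so a quick check before running it can save time. The following is a small sketch of such a check; the helper names fill and unresolved_placeholders, and the host and port values, are hypothetical.

```python
import re

# Matches placeholders written as <Some-Name>.
PLACEHOLDER = re.compile(r"<[^<>\s][^<>]*>")

def fill(text, values):
    """Replace each <name> placeholder with the value supplied for it."""
    for name, value in values.items():
        text = text.replace(f"<{name}>", value)
    return text

def unresolved_placeholders(text):
    """Return any <...> placeholders still present in the text."""
    return sorted(set(PLACEHOLDER.findall(text)))

url = "http://<Admin-Server-IP/Hostname>:<Admin-Server-Port>/services/v2/replicats/REPKAFKA"

# Placeholder environment values for illustration.
filled = fill(url, {
    "Admin-Server-IP/Hostname": "gg-admin.example.com",
    "Admin-Server-Port": "9011",
})

print(filled)
print("left to replace:", unresolved_placeholders(filled))
```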
Learn more here:
- Using the Kafka Handler
- Dependency Downloader
- Kafka Handler Client Dependencies
- REST API
- Realtime Replication into Oracle Cloud Infrastructure (OCI) Streaming with GoldenGate for Big Data

