※ 本記事は、Madhu Kumar Sによる”GoldenGate REST API’s Call to replicate the data to Oracle Cloud Infrastructure (OCI) Streaming and Apache Kafka“を翻訳したものです。
2023年8月16日
Oracle GoldenGate for Big DataではKafkaハンドラが使用されるため、GoldenGateでサポートされている様々なデータベースからデータベースの変更を取得し、Apache Kafkaトピックにストリーミングできます。GoldenGate Kafkaハンドラは、データの処理、変換およびKafkaトピックへの配信方法を定義します。Kafkaハンドラの構成プロパティは、ここにあります。
次の図は、GoldenGateパラメータ・ファイルに、表マッピングおよびKafkaハンドラへの参照に関するすべてのパラメータが含まれている方法を示しています。Kafkaハンドラには、すべてのキー・プロパティおよびKafka構成ファイルへの参照が含まれ、Kafkaと対話してメッセージを生成します。

Apache Kafkaにデータをレプリケートするには、次のステップに従ってください。
1. Kafkaプロデューサ構成ファイルを作成します。
a) Oracle Cloud Infrastructure (OCI) Streamingのプロデューサ構成ファイルの作成。
b) Apache Kafkaのプロデューサ構成ファイルの作成。
2. Kafkaハンドラ・プロパティ・ファイルを作成します。
3. Replicatパラメータ・ファイルを作成します。
4. GoldenGate Replicatを起動します。
5. GoldenGate Replicatを検証します。
「このブログでは、Replicat名を”REPKAFKA“、Kafkaハンドラ・ファイルを”REPKAFKA.properties“、プロデューサ・ファイルを”custom_kafka_producer.properties“として使用しています。ソース・データベースからデータを取得するための抽出と、証跡ファイルをビッグ・データ・デプロイメントにプッシュするための配布パスをすでに作成しているものとします」
ステップ1: Kafkaプロデューサ構成ファイルを作成
環境と一致するように、構成ファイル内のKafkaブローカ・ホストおよびポートを更新します。Kafkaプロデューサ構成ファイルの詳細は、このリンクを参照してください。データをOCI Streamingにプッシュする場合は、「ステップ1.a」コマンドを実行するか、Apache kafkaにプッシュする場合は、「ステップ1.b」コマンドを実行してください
ステップ1.a) Oracle Cloud Infrastructure (OCI) Streamingのプロデューサ構成ファイルの作成
OCIストリーミング用のKafkaプロデューサ構成ファイルのサンプル。
curl --location --request POST 'http://<Admin-Server IP/Hostname>:<Admin-Server-Port>/services/v2/config/files/custom_kafka_producer.properties' \
--user 'username':PASSWORD \
--header 'Content-Type: text/plain' \
--data '{
"lines": [
"bootstrap.servers=cell-1.streaming.us-phoenix-1.oci.oraclecloud.com:9092",
"security.protocol=SASL_SSL",
"sasl.mechanism=PLAIN",
"value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer",
"key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer",
"sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="paasdevgg/oracleidentitycloudservice/user.name@oracle.com/ocid1.streampool.oc1.phx.amaaaaaa3p5c3vqa4hfyl7uv465pay4audmoajughhxlsgj7afc2an5u3xaq" password="YOUR-AUTH-TOKEN";"
]
}'| python -mjson.tool
注意 : cURLの出力で、エラー・メッセージ “title”: “Parameter file CUSTOM_KAFKA_PRODUCER.properties failed validity check.”は無視しても問題ありません。
ステップ1.aの出力:

ステップ1.bのGoldenGate Webコンソール出力:

ステップ1.b) Apache Kafkaのプロデューサ構成ファイルの作成
curl --location --request POST 'http://<Admin-Server IP/Hostname>:<Admin-Server-Port>/services/v2/config/files/custom_kafka_producer.properties' \
--user 'username':PASSWORD \
--header 'Content-Type: text/plain' \
--data '{
"lines": [
"bootstrap.servers=<kafka-server-ip>:9092",
"acks = 1",
"compression.type = gzip",
"reconnect.backoff.ms = 1000",
"value.serializer = org.apache.kafka.common.serialization.ByteArraySerializer",
"key.serializer = org.apache.kafka.common.serialization.ByteArraySerializer",
"# 100KB per partition",
"batch.size = 102400",
"linger.ms = 10000",
"max.request.size = 5024000 ",
"send.buffer.bytes = 50240000"
]
}'| python -mjson.tool
注意 : cURLの出力で、エラー・メッセージ “title”: “Parameter file CUSTOM_KAFKA_PRODUCER.properties failed validity check.”は無視しても問題ありません。
ステップ1.bの出力:

前述のコマンドは、GoldenGate Webコンソールでも検証できます。右上隅にあるハンバーガー・メニュー(
)の「構成を参照」をクリックし、パラメータ・ファイルをクリックします。
ステップ1.bのGoldenGate Webコンソール出力:

ステップ2: Kafkaハンドラ・プロパティ・ファイルを作成
次に、Kafkaハンドラのサンプル構成を示します。強調表示された構成を更新します。
curl --location 'http://<Admin-Server-IP/Hostname>:<Admin-Server-Port>/services/v2/config/files/REPKAFKA.properties' \
--user 'username':PASSWORD \
--header 'Content-Type: text/plain' \
--data '{
"lines": [
"# Properties file for Replicat REPKAFKA",
"#Kafka Handler Template",
"gg.handlerlist=kafkahandler",
"gg.handler.kafkahandler.type=kafka",
"#TODO: Set the name of the Kafka producer properties file.",
"gg.handler.kafkahandler.kafkaProducerConfigFile=custom_kafka_producer.properties"
"#TODO: Set the template for resolving the topic name.",
"gg.handler.kafkahandler.topicMappingTemplate= ${tableName}",
"gg.handler.kafkahandler.keyMappingTemplate=${primaryKeys}",
"gg.handler.kafkahandler.mode=op",
"gg.handler.kafkahandler.format=json",
"gg.handler.kafkahandler.format.metaColumnsTemplate=${objectname[table]},${optype[op_type]},${timestamp[op_ts]},${currenttimestamp[current_ts]},${position[pos]}",
"#TODO: Set the location of the Kafka client libraries.",
"gg.classpath={Kafka-Home}/libs/*",
"jvm.bootoptions=-Xmx512m -Xms32m"
]
}'| python -mjson.tool
注意 : cURLの出力で、エラー・メッセージ “title”: “Parameter file REPKAFKA.properties failed validity check.”は無視しても問題ありません。
ステップ2の出力:

ステップ2のGoldenGate Webコンソール出力:

3. Replicatパラメータ・ファイルを作成
curl --location 'http://<Admin-Server-IP/Hostname>:<Admin-Server-Port>/services/v2/replicats/REPKAFKA' \
--user 'username':PASSWORD \
--header 'Content-Type: text/plain' \
--data '{
"intent": "Unidirectional",
"status": "stopped",
"begin": {
"sequence": 0,
"offset": 0
},
"managedProcessSettings": "Default",
"mode": {
"type": "nonintegrated"
},
"source": {
"name": "<Trail Name>"
},
"config": [
"REPLICAT REPKAFKA",
"MAP *.*, TARGET *.*;"
]
}| python -mjson.tool
ステップ3の出力:

ステップ3のGoldenGate Webコンソール出力:

GoldenGate KafkaのReplicatは正常に作成されました。

ステップ4: GoldenGate Replicatを起動
curl --location --request POST http://<Admin-Server-IP/Hostname>:<Admin-Server-Port>/services/v2/commands/execute\
--user "USERNAME:"PASSWORD \
-d '{
"name":"start",
"processName":"REPKAFKA"
}'| python -mjson.tool
ステップ4の出力:
ステップ4のGoldenGate Webコンソール出力:
ステップ5: statsを使用してGoldenGate Replicatを検証
curl --silent --location --request POST 'http://<Admin-Server-IP/Hostname>:<Admin-Server-Port>/services/v2/replicats/REPKAFKA/command' \
--user "USERNAME:"PASSWORD \
--data '{
"command":"STATS",
"arguments":"HOURLY"
}'|python -mjson.tool
ステップ5のGoldenGate Webコンソール出力:

付録 :
ご使用の環境に応じて、次のパラメータを置き換えてください。
- Admin-Server-IP/Hostname : GoldenGate管理サーバーのホスト名またはIP。
- Admin-Server-Port : GoldenGateポート番号を実行している管理サーバー。
- USERNAME : GoldenGate管理コンソールのユーザー名。
- PASSWORD : GoldenGate管理コンソールのパスワード。
- kafka-server-ip : ホスト名またはIPアドレスを実行しているKakfa。
- Kafka_home : KafkaクライアントJARホームの場所。
詳細はこちら:
- Kafkaハンドラの使用
- 依存性ダウンローダ
- Kafkaハンドラ・クライアント依存性
- REST API
- GoldenGate for Big Dataを使用したOracle Cloud Infrastructure (OCI) Streamingへのリアルタイム・レプリケーション

