※ 本記事は、Madhu Kumar Sによる”GoldenGate REST API’s Call to replicate the data to Oracle Cloud Infrastructure (OCI) Streaming and Apache Kafka“を翻訳したものです。

2023年8月16日


Oracle GoldenGate for Big DataではKafkaハンドラが使用されるため、GoldenGateでサポートされている様々なデータベースからデータベースの変更を取得し、Apache Kafkaトピックにストリーミングできます。GoldenGate Kafkaハンドラは、データの処理、変換およびKafkaトピックへの配信方法を定義します。Kafkaハンドラの構成プロパティは、ここにあります。

次の図は、GoldenGateパラメータ・ファイルに、表マッピングおよびKafkaハンドラへの参照に関するすべてのパラメータが含まれている方法を示しています。Kafkaハンドラには、すべてのキー・プロパティおよびKafka構成ファイルへの参照が含まれ、Kafkaと対話してメッセージを生成します。

ARCH-RESTAPI-GG4BD

Apache Kafkaにデータをレプリケートするには、次のステップに従ってください。

1. Kafkaプロデューサ構成ファイルを作成します。

a) Oracle Cloud Infrastructure (OCI) Streamingのプロデューサ構成ファイルの作成。

b) Apache Kafkaのプロデューサ構成ファイルの作成。

2. Kafkaハンドラ・プロパティ・ファイルを作成します。

3. Replicatパラメータ・ファイルを作成します。

4. GoldenGate Replicatを起動します。

5. GoldenGate Replicatを検証します。

「このブログでは、Replicat名を”REPKAFKA“、Kafkaハンドラ・ファイルを”REPKAFKA.properties“、プロデューサ・ファイルを”custom_kafka_producer.properties“として使用しています。ソース・データベースからデータを取得するための抽出と、証跡ファイルをビッグ・データ・デプロイメントにプッシュするための配布パスをすでに作成しているものとします」

ステップ1: Kafkaプロデューサ構成ファイルを作成

環境と一致するように、構成ファイル内のKafkaブローカ・ホストおよびポートを更新します。Kafkaプロデューサ構成ファイルの詳細は、このリンクを参照してください。データをOCI Streamingにプッシュする場合は、「ステップ1.a」コマンドを実行するか、Apache kafkaにプッシュする場合は、「ステップ1.b」コマンドを実行してください

ステップ1.a) Oracle Cloud Infrastructure (OCI) Streamingのプロデューサ構成ファイルの作成

OCIストリーミング用のKafkaプロデューサ構成ファイルのサンプル。

curl --location --request POST 'http://<Admin-Server IP/Hostname>:<Admin-Server-Port>/services/v2/config/files/custom_kafka_producer.properties' \
--user 'username':PASSWORD \
--header 'Content-Type: text/plain' \
--data '{
       "lines": [
               "bootstrap.servers=cell-1.streaming.us-phoenix-1.oci.oraclecloud.com:9092",
               "security.protocol=SASL_SSL",
               "sasl.mechanism=PLAIN",
               "value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer",
               "key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer",
               "sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="paasdevgg/oracleidentitycloudservice/user.name@oracle.com/ocid1.streampool.oc1.phx.amaaaaaa3p5c3vqa4hfyl7uv465pay4audmoajughhxlsgj7afc2an5u3xaq" password="YOUR-AUTH-TOKEN";"
              ]
    }'| python -mjson.tool

注意 : cURLの出力で、エラー・メッセージ “title”: “Parameter file CUSTOM_KAFKA_PRODUCER.properties failed validity check.”は無視しても問題ありません。

ステップ1.aの出力:

oci stream kafka producer file

ステップ1.bのGoldenGate Webコンソール出力:

oci stream kafka producer file gg file

ステップ1.b) Apache Kafkaのプロデューサ構成ファイルの作成

curl --location --request POST 'http://<Admin-Server IP/Hostname>:<Admin-Server-Port>/services/v2/config/files/custom_kafka_producer.properties' \
--user 'username':PASSWORD \
--header 'Content-Type: text/plain' \
--data '{
"lines": [
      "bootstrap.servers=<kafka-server-ip>:9092",
      "acks = 1",
      "compression.type = gzip",
      "reconnect.backoff.ms = 1000",
      "value.serializer = org.apache.kafka.common.serialization.ByteArraySerializer",
      "key.serializer = org.apache.kafka.common.serialization.ByteArraySerializer",
      "# 100KB per partition",
      "batch.size = 102400",
      "linger.ms = 10000",
      "max.request.size = 5024000 ",
      "send.buffer.bytes = 50240000"
       ]
}'| python -mjson.tool

注意 : cURLの出力で、エラー・メッセージ “title”: “Parameter file CUSTOM_KAFKA_PRODUCER.properties failed validity check.”は無視しても問題ありません。

ステップ1.bの出力:

custom-curl-properties

前述のコマンドは、GoldenGate Webコンソールでも検証できます。右上隅にあるハンバーガー・メニュー( hamburger menu )の「構成を参照」をクリックし、パラメータ・ファイルをクリックします。

ステップ1.bのGoldenGate Webコンソール出力:

gg-producer-properties-file

ステップ2: Kafkaハンドラ・プロパティ・ファイルを作成

次に、Kafkaハンドラのサンプル構成を示します。強調表示された構成を更新します。

curl --location 'http://<Admin-Server-IP/Hostname>:<Admin-Server-Port>/services/v2/config/files/REPKAFKA.properties' \
--user 'username':PASSWORD \
--header 'Content-Type: text/plain' \
--data '{
"lines": [
          "# Properties file for Replicat REPKAFKA",
          "#Kafka Handler Template",
          "gg.handlerlist=kafkahandler",
          "gg.handler.kafkahandler.type=kafka",
          "#TODO: Set the name of the Kafka producer properties file.",
          "gg.handler.kafkahandler.kafkaProducerConfigFile=custom_kafka_producer.properties"
          "#TODO: Set the template for resolving the topic name.",
          "gg.handler.kafkahandler.topicMappingTemplate= ${tableName}",
          "gg.handler.kafkahandler.keyMappingTemplate=${primaryKeys}",
          "gg.handler.kafkahandler.mode=op",
          "gg.handler.kafkahandler.format=json",
 "gg.handler.kafkahandler.format.metaColumnsTemplate=${objectname[table]},${optype[op_type]},${timestamp[op_ts]},${currenttimestamp[current_ts]},${position[pos]}",
          "#TODO: Set the location of the Kafka client libraries.",
          "gg.classpath={Kafka-Home}/libs/*",
          "jvm.bootoptions=-Xmx512m -Xms32m"
   ]
 }'| python -mjson.tool

注意 : cURLの出力で、エラー・メッセージ “title”: “Parameter file REPKAFKA.properties failed validity check.”は無視しても問題ありません。

ステップ2の出力:

properties-files

ステップ2のGoldenGate Webコンソール出力:

gg-properties-file

3. Replicatパラメータ・ファイルを作成

curl --location 'http://<Admin-Server-IP/Hostname>:<Admin-Server-Port>/services/v2/replicats/REPKAFKA' \
--user 'username':PASSWORD \
--header 'Content-Type: text/plain' \
--data '{
        "intent": "Unidirectional",
        "status": "stopped",
        "begin": {
        "sequence": 0,
        "offset": 0
        },
        "managedProcessSettings": "Default",
        "mode": {
              "type": "nonintegrated"
         },
        "source": { 
               "name": "<Trail Name>"
         },
         "config": [
                "REPLICAT REPKAFKA",
                "MAP *.*, TARGET *.*;"
            ]
}| python -mjson.tool

ステップ3の出力:

replicat-curl

ステップ3のGoldenGate Webコンソール出力:

replicat-param

GoldenGate KafkaのReplicatは正常に作成されました。

replicat create

ステップ4: GoldenGate Replicatを起動

curl --location --request POST http://<Admin-Server-IP/Hostname>:<Admin-Server-Port>/services/v2/commands/execute\
--user "USERNAME:"PASSWORD \
-d '{
     "name":"start",
     "processName":"REPKAFKA"
    }'| python -mjson.tool

ステップ4の出力:

kafka-replicat-start-curlステップ4のGoldenGate Webコンソール出力:

kafka-replicat-start

 

ステップ5: statsを使用してGoldenGate Replicatを検証

curl --silent --location --request POST 'http://<Admin-Server-IP/Hostname>:<Admin-Server-Port>/services/v2/replicats/REPKAFKA/command' \
--user "USERNAME:"PASSWORD \
--data '{
        "command":"STATS",
        "arguments":"HOURLY"
}'|python -mjson.tool

 

ステップ5のGoldenGate Webコンソール出力:

stats

 

付録  :

ご使用の環境に応じて、次のパラメータを置き換えてください。

  • Admin-Server-IP/Hostname : GoldenGate管理サーバーのホスト名またはIP。
  • Admin-Server-Port : GoldenGateポート番号を実行している管理サーバー。
  • USERNAME : GoldenGate管理コンソールのユーザー名。
  • PASSWORD : GoldenGate管理コンソールのパスワード。
  • kafka-server-ip : ホスト名またはIPアドレスを実行しているKakfa。
  • Kafka_home : KafkaクライアントJARホームの場所。

 

詳細はこちら: