X

Big Data、Data Integration、Data Lakeに関するテクノロジー、製品・サービス情報、セミナー情報などをお届けします

Apache KafkaのメッセージをOracle Cloud オブジェクト・ストレージへ永続化する

はじめに

今日のデータ分析基盤では大規模なデータを安価に保持できるデータストアが必要です。ビッグデータ時代黎明期においてはHDFSがその一旦を担い、データストアの定石アーキテクチャとして定着しました。

しかし、Amazon S3に代表される、いわゆるクラウド・ストレージの台頭により、HDFSは唯一の選択肢ではなくなりつつあることもまた事実です。

かつて、データマネージメント市場において、HDFSを中心にあらゆるエコシステムが急速に形成されたように、今やクラウド・ストレージとのインターフェースを持たないデータ処理エンジンは皆無と言える状況です。

また、一昔前まで、この技術分野ではリレーショナル・データベースを中心にシステムを構成するアーキテクチャが一般的でした。オンライントランザクション処理のオペレーショナルデータベースからオンライン分析処理のデータウェアハウスへ、バッチによるETL処理を経由しデータを渡しつつ、他システムとはメッセージキューやEAIで連携、という具合です。

しかし、残念ながらこの旧式の仕組みでは、あらゆるタイプの大容量データをリアルタイムに処理できるスケーラブルでフォルトトレラントなシステムを安価に実現することはできませんでした。

これらの課題を背景に、Apache Kafka(※)は開発され、そのメリットに一早く着目した先進的なエンジニアにより、特にクラウド上の現代的なアーキテクチャとして定着しつつあります。

とりわけ「Apache Kafkaでストリームデータを受け、Amazon S3に永続化」というデータの貯め方が定石手法として見られわけですが、これは当然、Oracle Cloud Infrastructureのオブジェクトストレージでも可能です。本記事では、その具体的な設定例を取り上げてみたいと思います。

※「Apache Kafkaは、Apache Software Foundationの登録商標または商標です。」

 

Step 1 : バケットを作成し、S3互換APIキーで接続できるようにします

Kafkaで受けたメッセージをオブジェクトストレージに保存するためのバケットを作成します。

Cloud Consoleの左上メニューから「オブジェクト・ストレージ」>「オブジェクト・ストレージ」と辿り、「スコープ」から任意のコンパートメントを選択し、「バケットの作成」ボタンをクリックします。バケットの名前を入力し、「バケットの作成」ボタンをクリックすれば完了です。詳細手順は「バケットの管理」の「バケットを作成するには」を参照。

バケットを配置するコンパートメントには、S3互換APIキーを使用して接続するための設定を実施しておく必要があります。

Cloud Console右上の「プロファイル」から「テナンシ」を選択、「オブジェクト・ストレージ設定の編集」ボタンをクリック、「AMAZON S3互換APIで指定されたコンパートメント」のプルダウンメニューからバケットを作成したコンパートメントを選択し、保存すれば完了です。詳細手順は「Amazon S3互換性およびSwift API用のコンパートメントの指定」の「指定されたコンパートメントの表示と指定」をご参照ください。

次に、オブジェクトストレージに接続するためのS3互換APIキーを作成します。S3同様、ここで作成する秘密キーとアクセスキーを使った認証によってKafkaがオブジェクトストレージにアクセスできるようになります。

OCIコンソールにログイン後、「左上メニュー>」「アイデンティティ」>「ユーザー」と辿り、任意のユーザーをクリックし「ユーザーの詳細」画面へ移動、「リソース」の「顧客秘密キー」をクリック、「秘密キーの生成」ボタンをクリックし、この設定に名前をつけます。詳細手順は「顧客秘密キーを作成するには」をご参照。

「秘密キーの生成」ボタンを押すと秘密キーが表示されます。秘密キーは一度しか表示されませんので、作成した段階で、確実にコピーして一旦保存することをお勧めします。

これで秘密キーとアクセスキーのペアが作成されました。

ここで作成したアクセスキーと秘密キーをコピーし、環境変数としてexportします。もちろん.bash_profileなどを使われてもOKです。

$ export AWS_ACCESS_KEY_ID=xxxxxxxxxxxxxxxxxxxxxxxxx
$ export AWS_SECRET_ACCESS_KEY=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx

 

 

Step 2 : Confluentのインストールと起動

今回はApache Kafkaの商用パッケージを開発されているConfluent社のバイナリを使ってみたいと思います。

Confluent社のサイトからバイナリをダウンロードし解凍するとディレクトリが作成され、同ディレクトリ配下のbinディレクトリにコマンド群があります。

$ ls -F
bin/  etc/  ksql_logs/  lib/  README  share/  src/
$ cd bin

今回はテストなのでfirewalldを止めておきます。

$ sudo systemctl stop firewalld

Zookeeper/Kafka/Kafka Connectを起動します。

$ bin/confluent start connect

This CLI is intended for development only, not for production

https://docs.confluent.io/current/cli/index.html

Using CONFLUENT_CURRENT: /tmp/confluent.Wq84QztF

Starting zookeeper

zookeeper is [UP]

Starting kafka kafka is [UP]
Starting schema-registry schema-registry is [UP]
Starting connect connect is [UP]

これで必要なサービスが起動しました。

 

 

Step 3 : トピックを作成し、テストデータを取り込みます

まずはトピックを作成します。

$ bin/kafka-topics --zookeeper localhost:2181 --create --topic test-topic --replication-factor 1 --partitions 5

Created topic test-topic.

作成したトピックを確認します。

$ bin/kafka-topics --zookeeper localhost:2181 --list

作成したトピックtest-topicにサンプルのメッセージを取り込みます。

$ curl -s http://stream.meetup.com/2/rsvps | bin/confluent produce test-topic

This CLI is intended for development only, not for production

https://docs.confluent.io/current/cli/index.html

Kafkaで受けているメッセージの内容を確認してみます。

$ bin/confluent consume test-topic -- --from-beginning

This CLI is intended for development only, not for production

https://docs.confluent.io/current/cli/index.html

{"venue":{"venue_name":"Sainsburys","lon":-0.108527,"lat":51.517307,"venue_id":24421151},"visibility":"public","response":"yes","guests":0,"member":{"member_id":14619028,"photo":"……………(略)

 

 

Step 4 : オブジェクトストレージにメッセージを永続化します

まず、設定ファイルを作成し任意の場所に保存します。この例ではinjest-to-oci.jsonというファイル名で作成しましています。この設定ファイルの内容によって、オブジェクトストレージにストリームデータが永続化される際の挙動が決まります。

{                                                                                                                                                                                                                                                                   
  "name": "injest-to-oci",                                                                                                                                                                                                                                          
  "config": {                                                                                                                                                                                                                                                       
                                                                                                                                                                                                                                                                    
    "_comment": "The S3 sink connector class",                                                                                                                                                                                                                      
    "connector.class":"io.confluent.connect.s3.S3SinkConnector",                                                                                                                                                                                                    
                                                                                                                                                                                                                                                                    
    "_comment": "The total number of Connect tasks to spawn (with implicit upper limit the number of topic-partitions)",                                                                                                                                            
    "tasks.max":"1",                                                                                                                                                                                                                                                
                                                                                                                                                                                                                                                                    
    "_comment": "Which topics to export to S3",                                                                                                                                                                                                                     
    "topics":"test-topic",                                                                                                                                                                                                                                             
                                                                                                                                                                                                                                                                    
    "_comment": "The S3 bucket that will be used by this connector instance",                                                                                                                                                                                       
    "s3.bucket.name":"kafka-bucket",                                                                                                                                                                                                                               
                                                                                                                                                                                                                                                                    
    "_comment": "The AWS region where the S3 bucket is located",                                                                                                                                                                                                    
    "s3.region":"us-ashburn-1",

    "_comment": "The REST endpoint of the OCI Object Storage Service",
    "store.url": "https://ksonoda.compat.objectstorage.us-ashburn-1.oraclecloud.com",                                                                                                                                                                                                                                        
                                                                                                                                                                                                                                                                    
    "_comment": "The size in bytes of a single part in a multipart upload. The last part is of s3.part.size bytes or less. This property does not affect the total size of an S3 object uploaded by the S3 connector",                                             
    "s3.part.size":"5242880",                                                                                                                                                                                                                                       
                                                                                                                                                                                                                                                                    
    "_comment": "The maximum number of Kafka records contained in a single S3 object. Here a high value to allow for time-based partition to take precedence",                                                                                                                                                                              
    "flush.size":"100000",                                                                                                                                                                                                                                          
                                                                                                                                                                                                                                                                    
    "_comment": "Kafka Connect converter used to deserialize keys (unused in this example)",                                                                                                                                                                        
    "key.converter":"org.apache.kafka.connect.json.JsonConverter",                                                                                                                                                                                                  
    "key.converter.schemas.enable":"false",                                                                                                                                                                                                                         
                                                                                                                                                                                                                                                                    
    "_comment": "Kafka Connect converter used to deserialize values",                                                                                                                                                                                               
    "value.converter":"org.apache.kafka.connect.json.JsonConverter",                                                                                                                                                                                                
    "value.converter.schemas.enable":"false",                                                                                                                                                                                                                       
                                                                                                                                                                                                                                                                    
    "_comment": "The type of storage for this storage cloud connector",                                                                                                                                                                                             
    "storage.class":"io.confluent.connect.s3.storage.S3Storage",                                                                                                                                                                                                    
                                                                                                                                                                                                                                                                    
    "_comment": "The storage format of the objects uploaded to S3",                                                                                                                                                                                                 
    "format.class":"io.confluent.connect.s3.format.json.JsonFormat",                                                                                                                                                                                                
                                                                                                                                                                                                                                                                    
    "_comment": "Schema compatibility mode between records with schemas (Useful when used with schema-based converters. Unused in this example, listed for completeness)",                                                                                          
    "schema.compatibility":"NONE",                                                                                                                                                                                                                                  
                                                                                                                                                                                                                                                                    
    "_comment": "The class used to partition records in objects to S3. Here, partitioning based on time is used.",                                                                                                                                                  
    "partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",                                                                                                                                                                            
                                                                                                                                                                                                                                                                    
    "_comment": "The locale used by the time-based partitioner to encode the date string",                                                                                                                                                                          
    "locale":"en",
    
    "_comment": "Setting the timezone of the timestamps is also required by the time-based partitioner",                                                                                                                                                            
    "timezone":"UTC",                                                                                                                                                                                                                                               
                                                                                                                                                                                                                                                                    
    "_comment": "The date-based part of the S3 object key",                                                                                                                                                                                                         
    "path.format":"'date'=YYYY-MM-dd/'hour'=HH",                                                                                                                                                                                                                    
                                                                                                                                                                                                                                                                    
    "_comment": "The duration that aligns with the path format defined above",                                                                                                                                                                                      
    "partition.duration.ms":"3600000",                                                                                                                                                                                                                              
                                                                                                                                                                                                                                                                    
    "_comment": "The interval between timestamps that is sufficient to upload a new object to S3. Here a small interval of 1min for better visualization during the demo",                                                                                          
    "rotate.interval.ms":"60000",                                                                                                                                                                                                                                   
                                                                                                                                                                                                                                                                    
    "_comment": "The class to use to derive the timestamp for each record. Here Kafka record timestamps are used",                                                                                                                                                  
    "timestamp.extractor":"Record"                                                                                                                                                                                                                                  
  }                                                                                                                                                                                                                                                                 
}

各項目が何を意味するのかはコメント欄を見ていただくとだいたいわかると思いますが、下記4つの重要項目だけ補足で説明します。

作成済みのバケット名を記載します。ここでは先のステップで作成したkafka-bucketという名前を入力しています。

"_comment": "The S3 bucket that will be used by this connector instance",                                                                                                                                                                                       
"s3.bucket.name":"kafka-bucket",

バケットを作成したリージョン名を記載します。当たり前ですがS3とは表記が異なります。リージョン表記一覧はこちら

"_comment": "The AWS region where the S3 bucket is located",                                                                                                                                                                                                    
"s3.region":"us-ashburn-1",

OCIオブジェクトストレージのエンドポイントを記載します。[]の部分をご自分の環境に合わせて編集する必要があります。

"_comment": "The REST endpoint of the OCI Object Storage Service",
"store.url": "https://[tenant_name].compat.objectstorage.[region].oraclecloud.com",

例えば、契約したテナント名が「ksonoda」、利用するリージョンがAshburnであれば下記のようになります。

このパラメータはS3互換のオブジェクトストレージサービスを利用する場合に必要な設定です。

"store.url": "https://ksonoda.compat.objectstorage.us-ashburn-1.oraclecloud.com",

以上が設定ファイルの説明です。

この設定ファイルを使って、下記コマンドでKafkaに取り込まれたメッセージを、オブジェクトストレージに永続化します。

$ bin/confluent load injest-to-oci -d ./injest-to-oci.json
This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html

これで、メッセージがオブジェクト・ストレージに永続化されました。オブジェクトストレージのバケットの中をのぞいてみるとメッセージがオブジェクトとして永続化されていることがわかりす。

 

以上、本記事では、データ分析基盤システムにおいてエッジで生成されるデータをKafkaで受け取り、オブジェクトストレージに永続化する部分の設定例を取り上げてみました。

この設定例は単一のインスタンス上に構築した非常に簡単なものです。もっと本格的な構成で検証してみたいけどOracle Cloudは使ったことがないなぁという方は Oracle が githubで提供している下記terraformスクリプトを使ってみてください。非常に簡単に構築できますよ。

Confluent構築のterraformスクリプト

Confluent構築のterraformスクリプト(kubernetes版)

 

※本記事の続編「Apache Sparkからオブジェクトストレージのデータを使う」も併せてご参照ください。
続編では、本記事でオブジェクトストレージに永続化したjsonファイルをSparkから読み込み、簡単なデータ操作実行後、オブジェクトストレージへ書き戻す処理について概説しています。

 

参考資料

From Apache Kafka to Amazon S3: Exactly Once

Kafka Connect Amazon S3 Sink Connector

Using Non-AWS Storage Providers

Be the first to comment

Comments ( 0 )
Please enter your name.Please provide a valid email address.Please enter a comment.CAPTCHA challenge response provided was incorrect. Please try again.