X

Big Data、Data Integration、Data Lakeに関するテクノロジー、製品・サービス情報、セミナー情報などをお届けします

Apache Sparkからオブジェクトストレージのデータを使う

はじめに

以前の記事「Apache KafkaのメッセージをOracle Cloud オブジェクト・ストレージへ永続化する」ではエッジで生成されたデータをApache Kafkaで受け、オブジェクトストレージに永続化するという題材を取り上げました。アプリログ、ソーシャルログ、ウェブログ、IoTなど様々なシステムで使われている、今やお決まりのデザインです。

オブジェクト・ストレージに貯められたデータは当然、何らかのデータ処理エンジンにより使われるわけですが、本記事では、定番の Apache Sparkを取り上げます。Apache Sparkは、その生産性の高さと、MapReduceのデメリットを解消できるアーキテクチャにより、リリース後、短期間でスターの仲間入りを果たしたデータ処理エンジンです。人工知能、ディープラーニング、機械学習といった技術トレンドの波もあり、瞬く間にデータサイエンティストに広く受け入れられるようになりました。

Apache Sparkについては巷に有識者の優良な情報が溢れていますので、そちらをご参考にしていただき、本記事としては、エッジで生成されたデータをApache Kafkaで受けてオブジェクトストレージに永続化(前記事)し、そのデータをApache Sparkで使う(本記事)という流れで見ていただけるとよいかと思います。

SparkからAmazon S3を使う構成はよく見るけど、Oracle Cloudのオブジェクトストレージも使えるの?と思われる方ががいらっしゃると思います。このためにオラクルはMapReduceとSparkからオブジェクトストレージを使うためのHDFSコネクタをリリースしています。

以下、これらの環境構築と、前回の記事でオブジェクトストレージに永続化したデータを実際にSparkで使うところまでをご紹介したいと思います。

 

Step 1 : Apache SPARKをインストールする

Sparkの実行に必要なソフトウェア群をインストールしてゆきます。

Java実行環境のインストール
$sudo yum install java-1.8.0-openjdk.x86_64
$export JAVA_HOME=/usr/lib/jvm/jre-1.8.0-openjdk
$java -version
 
Scalaのインストール
$wget https://downloads.lightbend.com/scala/2.12.4/scala-2.12.4.rpm
$sudo yum install scala-2.12.4.rpm
$scala -version
 
Sparkのインストール
$wget https://archive.apache.org/dist/spark/spark-2.2.1/spark-2.2.1-bin-hadoop2.7.tgz
$tar xvf spark-2.2.1-bin-hadoop2.7.tgz
$export SPARK_HOME=$HOME/spark-2.2.1-bin-hadoop2.7
$export PATH=$PATH:$SPARK_HOME/bin
 
Sparkの起動(今回はテストなのでMasterだけ)
$cd $SPARK_HOME
$./sbin/start-master.sh

 

Step 2 : HDFS Connectorをインストールする

と言っても、バイナリをダウンロードして解凍するだけです。 解凍後、下記★のついているjarファイルを利用します。

$wget https://docs.cloud.oracle.com/tools/hdfs/latest/download/oci-hdfs.zip
$unzip oci-hdfs.zip -d oci-hdfs
$ tree oci-hdfs/
oci-hdfs/
├── apidocs
(中略)
├── CHANGELOG.md
├── lib
│   ├── oci-hdfs-full-2.7.7.2.jar ★
│   └── oci-hdfs-full-2.7.7.2-javadoc.jar
├── LICENSE.txt
├── oci-hdfs.zip
├── third-party
│   └── lib
│       ├── bcpkix-jdk15on-1.60.jar ★
│       ├── bcprov-jdk15on-1.60.jar ★
│       └── jsr305-3.0.2.jar ★
└── Thirdpartyreadme.txt

11 directories, 44 files

 

Step 3 : Oracle CloudへアクセスすためのAPIキーペアを作成、登録する 

前回の記事ではS3互換APIキーを作成しました。これはApache Kafkaが、Oracle CloudのネイティブAPIではなくAmazon S3互換APIしかサポートしていないからです。 ですが、HDFS ConnectorはOracle CloudのネイティブAPIをサポートしていますので所定の方法でネイティブAPI用のキーペアを作成します。

$ cd $HOME
$ mkdir .oci
$ openssl genrsa -out ~/.oci/oci_api_key.pem 2048
$ chmod go-rwx ~/.oci/oci_api_key.pem
$ openssl rsa -pubout -in ~/.oci/oci_api_key.pem -out ~/.oci/oci_api_key_public.pem
$ ls
oci_api_key.pem  oci_api_key_public.pem
公開鍵と秘密鍵ができました。 公開鍵の内容を確認します。
$ cat oci_api_key_public.pem
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx

この公開鍵をOCIにAPIキーとして登録します。
OCI Consoleログイン後、左上メニューから「アイデンティティ」->「ユーザー」と辿り、利用するユーザーのリンクをクリックします。
「ユーザーの詳細」画面で「公開キーの追加」ボタンをクリックし、上記で確認した鍵の内容を、下記のようにコピー&ペーストして「追加」をクリックすれば出来上がりです。

 

 

Step 4 : Sparkの設定ファイルを編集する

Sparkがオブジェクトストレージにアクセスできるようcore-site.xml にOCIの各種情報を設定します。

$ cd $SPARK_HOME/conf
$ vi core-site.xml
core-site.xmlファイルのサンプルです。
 
    fs.oci.client.hostname
    https://objectstorage.us-ashburn-1.oraclecloud.com
  
  
    fs.oci.client.hostname.myBucket.myNamespace
    https://objectstorage.us-ashburn-1.oraclecloud.com.kafka-bucket.ksonoda
  
  
    fs.oci.client.auth.tenantId
    ocid1.tenancy.oc1..xxxxxxxxxxxxxxxxxxxxxx
  
  
    fs.oci.client.auth.userId
    ocid1.user.oc1..xxxxxxxxxxxxxxxxxxxxxxxx
  
  
    fs.oci.client.auth.fingerprint
    69:f3:5b:c1:85:48:2c:5e:xxxxxxxxxxxxxxxxxx
  
  
    fs.oci.client.auth.pemfilepath
    ~/.oci/oci_api_key.pem
  

上記の各エントリの概要が下記になります。

エントリ 概要
fs.oci.client.hostname

オブジェクトストレージのRESTエンドポイントを指定。利用するリージョンによりエントリが異なります。上記サンプルではus-ashburn-1のリージョンを指定しています。

fs.oci.client.hostname.myBucket.myNamespace

RESTエンドポイントに加えてバケット名とテナント名を記載。上記サンプルでは、バケット名を「kafka-bucket」、テナント名を「ksonoda」と指定しています。

fs.oci.client.auth.tenantId テナントのOCIDを記載。Cloud Consoleから確認。
fs.oci.client.auth.userId APIをCallするIAMユーザーのOCIDを記載。Cloud Consoleから確認。
fs.oci.client.auth.fingerprint 上記IAMユーザーで使用するAPIキーペアのフィンガープリントを記載。Cloud Consoleから確認。
fs.oci.client.auth.pemfilepath 秘密キーへのファイルパス。Step3で作成したキーファイルのパス。デフォルトは「~/.oci/oci_api_key.pem」です。

 

これでcore-site.xmlは完成です。 次に、spark-defaults.conf.templateからspark-defaults.confにコピーします。

$ cp spark-defaults.conf.template spark-defaults.conf
viでspark-defaults.confの最後に下記一行を追加します。
$ vi spark-defaults.conf
spark.sql.hive.metastore.sharedPrefixes = shaded.oracle,com.oracle.bmc
これで設定ファイルの作業は完了です。

 

Step 5 : SparkShellを起動し、データ操作を行う

今回はSparkのインタラクティブシェルであるSparkShellで実行してみましょう。 SparkShellでHDFS Connectorを利用する方法は非常にシンプルです。 下記のようにSparkShell起動時に--jarsオプションを使い、Step 2でダウロードした4つのjarファイルを指定するだけです。

./bin/spark-shell --jars /home/opc/oci-hdfs/lib/oci-hdfs-full-2.7.7.2.jar, \
                         /home/opc/oci-hdfs/third-party/lib/bcpkix-jdk15on-1.60.jar, \
                         /home/opc/oci-hdfs/third-party/lib/bcprov-jdk15on-1.60.jar, \
                         /home/opc/oci-hdfs/third-party/lib/jsr305-3.0.2.jar
この--jarsオプションはSparkShellの機能拡張のために実装されているオプションです。 この仕組みを利用しオブジェクトストレージのネームスペースをSparkから直接識別できるようにしています。 これにより、このSparkShellのセッションでは下記のように「oci://...」という記述でオブジェクトストレージのネームスペースを表現することができます。 これはちょうど、HDFSのネームスペースを「hdfs://...」と表現するのと似ています。
scala> val text_file = sc.textFile("oci://@/test.txt)

 

Step 6 : SparkShellからオブジェクトストレージ上のjsonファイルを読み込み、データ操作を実行する

前回の記事では、テスト用のストリームデータをApache Kafka経由でオブジェクトストレージにjsonフォーマットで永続化しました。 ちょうど下記のようなデータです。

{
  "venue": {
    "venue_name": "JVP Media Quarter",
    "lon": 35.225697,
    "lat": 31.76721,
    "venue_id": 26516325
  },
  "visibility": "public",
  "response": "yes",
  "guests": 0,
  "member": {
    "member_id": 260937091,
    "photo": "https://secure.meetupstatic.com/photos/member/8/1/0/c/thumb_279693036.jpeg",
    "member_name": "Shalev Shachar"
  },
  "rsvp_id": 1797819747,
  "mtime": 1563781228842,
  "event": {
    "event_id": "263176706",
    "event_name": "DataHack 2019",
    "time": 1567600200000,
    "event_url": "https://www.meetup.com/DataHack/events/263176706/"
  },
  "group": {
    "group_topics": [
      {
        "urlkey": "statistical-computing",
        "topic_name": "Statistical Computing"
      },
      {
        "urlkey": "machine-learning",
        "topic_name": "Machine Learning"
      },
      {
        "urlkey": "data-analytics",
        "topic_name": "Data Analytics"
      },
      {
        "urlkey": "data-visualization",
        "topic_name": "Data Visualization"
      },
      {
        "urlkey": "data-mining",
        "topic_name": "Data Mining"
      },
      {
        "urlkey": "data-science",
        "topic_name": "Data Science"
      },
      {
        "urlkey": "applied-statistics",
        "topic_name": "Applied Statistics"
      },
      {
        "urlkey": "big-data-analytics",
        "topic_name": "Big Data Analytics"
      },
      {
        "urlkey": "statistical-modeling",
        "topic_name": "Statistical Modeling"
      }
    ],
    "group_city": "Tel Aviv-Yafo",
    "group_country": "il",
    "group_id": 19936118,
    "group_name": "DataHack - Data Science, Machine Learning & Statistics",
    "group_lon": 34.77,
    "group_urlname": "DataHack",
    "group_lat": 32.07
  }
}
このデータをSparkにDataFrameして取り込み、簡単なデータ操作を実行して、parquetフォーマットでオブジェクトストレージに書き戻してみましょう。 繰り返しになりますが、HDFS Connectorにより。Sparkのコード内で下記のように、オブジェクトストレージ上のオブジェクトを「oci://...」という 表記で表現できるようになります。
scala> val eventDF = spark.read.json("oci://kafka-bucket@ksonoda/topics/test-topic/date=2019-07-22/hour=07/test-topic+0+0000000000.json")
eventDF: org.apache.spark.sql.DataFrame = [event: struct, group: struct ... 7 more fields]
これで、データが入った、eventDFというDataFrameができました。 スキーマ構造を見てみます。
scala> eventDF.printSchema
root
 |-- event: struct (nullable = true)
 |    |-- event_id: string (nullable = true)
 |    |-- event_name: string (nullable = true)
 |    |-- event_url: string (nullable = true)
 |    |-- time: long (nullable = true)
 |-- group: struct (nullable = true)
 |    |-- group_city: string (nullable = true)
 |    |-- group_country: string (nullable = true)
 |    |-- group_id: long (nullable = true)
 |    |-- group_lat: double (nullable = true)
 |    |-- group_lon: double (nullable = true)
 |    |-- group_name: string (nullable = true)
 |    |-- group_state: string (nullable = true)
 |    |-- group_topics: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- topic_name: string (nullable = true)
 |    |    |    |-- urlkey: string (nullable = true)
 |    |-- group_urlname: string (nullable = true)
 |-- guests: long (nullable = true)
 |-- member: struct (nullable = true)
 |    |-- member_id: long (nullable = true)
 |    |-- member_name: string (nullable = true)
 |    |-- other_services: struct (nullable = true)
 |    |    |-- facebook: struct (nullable = true)
 |    |    |    |-- identifier: string (nullable = true)
 |    |    |-- twitter: struct (nullable = true)
 |    |    |    |-- identifier: string (nullable = true)
 |    |-- photo: string (nullable = true)
 |-- mtime: long (nullable = true)
 |-- response: string (nullable = true)
 |-- rsvp_id: long (nullable = true)
 |-- venue: struct (nullable = true)
 |    |-- lat: double (nullable = true)
 |    |-- lon: double (nullable = true)
 |    |-- venue_id: long (nullable = true)
 |    |-- venue_name: string (nullable = true)
 |-- visibility: string (nullable = true)
簡単なデータ操作をしてみましょう。 上記のように、多段階のjsonフォーマットになっているので、いくつかの情報だけを抜き出して、シンプルな テーブル構造のDataFrameを作ってみたいと思います。
scala> val event_memberDF = eventDF.select('member.getItem("member_id").as("MEMBER_ID"),'member.getItem("member_name").as("MEMBER_NAME"),'group.getItem("group_city").as("CITY"))
event_memberDF: org.apache.spark.sql.DataFrame = [MEMBER ID: bigint, MEMBER NAME: string ... 1 more field]
getItem()でmember_id、member_name、group_cityの3つを抜きだしたmemberDFという名前のDataFrameを作りました。 構造を見てみると下記のようになります。
scala> event_memberDF.show
+---------+--------------------+--------------+
|MEMBER ID|         MEMBER NAME|          CITY|
+---------+--------------------+--------------+
|260937091|      Shalev Shachar| Tel Aviv-Yafo|
|253153232|             thegoat|        London|
|217410063|          Roni Talvi| Tel Aviv-Yafo|
|203549208|         Zeb Qureshi|        Riyadh|
| 34162952|Passelaigue Philippe|         Paris|
|251994477|Sonya BARLOW Like...|        London|
|236468617|        Peter Strong|      Brisbane|
|188329333|        Jane Hebiton|         Perth|
|285716066|   Thalapathy Jinesh|       Chennai|
|124097382|        Nadine Smith|        Durban|
|272267506|        Philip Yates|        London|
|207094267|            Elodie_L|         Paris|
|196556526|               Rahul|    Carmichael|
|284653354|     Stefan de Graaf|      Kilkenny|
| 96032612|      Melissa Burgos|      New York|
|285716066|   Thalapathy Jinesh|       Chennai|
| 13031231|        Victor Wyatt|        London|
|283710198|       Kamran Majeed|     Islamabad|
+---------+--------------------+--------------+
only showing top 20 rows
このDataFrameをオブジェクトストレージに書き戻したいと思います。 折角なのでフォーマットをparquetに変更して書き戻してみましょう。 やはり、この際も、オブジェクトストレージ上のネームスペースの指定は「oci://...」です。
scala> event_memberDF.coalesce(1).write.option("header","true").format("parquet").save("oci://kafka-bucket@ksonoda/event_member.parquet")
これで、オブジェクトストレージに書き戻すことができました。 念のため、書き戻したevent_member.parquetを再度Sparkに読み込んで内容を確認してみたいと思います。
scala> spark.read.parquet("oci://kafka-bucket@ksonoda/event_member.parquet").show
+---------+--------------------+--------------+
|MEMBER_ID|         MEMBER_NAME|          CITY|
+---------+--------------------+--------------+
|260937091|      Shalev Shachar| Tel Aviv-Yafo|
|253153232|             thegoat|        London|
|217410063|          Roni Talvi| Tel Aviv-Yafo|
|203549208|         Zeb Qureshi|        Riyadh|
| 34162952|Passelaigue Philippe|         Paris|
|251994477|Sonya BARLOW Like...|        London|
|236468617|        Peter Strong|      Brisbane|
|188329333|        Jane Hebiton|         Perth|
|285716066|   Thalapathy Jinesh|       Chennai|
|124097382|        Nadine Smith|        Durban|
|272267506|        Philip Yates|        London|
|207094267|            Elodie_L|         Paris|
|196556526|               Rahul|    Carmichael|
|284653354|     Stefan de Graaf|      Kilkenny|
| 96032612|      Melissa Burgos|      New York|
|285716066|   Thalapathy Jinesh|       Chennai|
| 13031231|        Victor Wyatt|        London|
|283710198|       Kamran Majeed|     Islamabad|
+---------+--------------------+--------------+
only showing top 20 rows

ちゃんと、書き戻されていることが確認できました。

ここから、更に、このParquetファイルをベースにデータウェアハウスを作りたいという場合は、ブログ「ADWからObject StoreのParquetファイルにアクセスしてみる」を併せてご参照ください。オブジェクトストレージ上のparquetファイルはOracle CloudのAutonomous Data Warehouseからも外部表としてアクセスできます。

つまり、エッジで生成されたストリームデータをApache Kafkaで受け、オブジェクトストレージへ永続化後、Apache Sparkのバッチ処理でデータ変換を行い、 Autonomous Data Warehouse でレポート作成、分析などを行うというシナリオが完成します。

このように、オブジェクトストレージを共通データストア基盤とすることで、用途に応じたあらゆるデータ処理エンジンを導入でき、かつ、データ処理エンジン間のデータ移動を最小限に抑えることができる効率のよいシステムが実現できます。

以上、本記事では、Sparkからオブジェクトストレージ上のデータを読み書きするための仕組みを概説しました。

 

参考資料

HDFS Connector for Object Storage

Using the HDFS Connector with Spark

Be the first to comment

Comments ( 0 )
Please enter your name.Please provide a valid email address.Please enter a comment.CAPTCHA challenge response provided was incorrect. Please try again.