Elasticsearch for Apache Hadoopを使ってSparkからAmazon ESにデータと連携してみた

Elasticsearch for Apache Hadoopを使ってSparkからAmazon ESにデータと連携してみた

今とあるプロジェクトで、Amazon EMRを使って少し大きめなボリュームのデータ処理をしているのですが、その中のあるデータの中身をWebフォームからニアリアルタイムでフィルタリングしたいと言う要望があり、その基盤としてElasticsearchを採用する事にしました。

前提としては、ざっくり言うと以下のような環境です:

  • 対象データの総ボリュームは5.5億レコード
    • ただし特定フィールドで集計を行うので、Elasticsearchに入れるルートドキュメントの数でいうと3000万程度
  • データの更新頻度は1日に1度
    • 極端に言えば24時間以内に処理が終わればOKと言う事
  • データの更新とは別で、1日に1度、予め保存したクエリ条件にマッチするレコードをParquetで書き込むと言う要件もある
  • ElasticsearchはVPC内のAmazon ES (Elasticsearch Service) 管理のドメインを使用
    • ロードバランサー越しにしか通信できない
    • プロビジョンしたElasticsearchのバージョンは執筆時点で最新版の6.7

Elasticsearchについて

データの投入方法以前に、ユースケースとして事前に想定ボリューム数で検証した結果、Elasticsearchであれば(ボリューム的に当然ですが)問題なく検索が可能と言う事が分かったので事前に決定していました。

クラスタは先述の通りAmazon ESで管理されています。

Sparkについて

投入方法については先述の要件が満たせれば何でも良かったのですが、色々試した結果、そもそもEMRでのデータ処理自体にも使っていた事もあって、Sparkが良さそうと言う事になり、これに決定しました。

また、今回双方の連携にはElasticsearchが提供しているElasticsearch for Apache Hadoopと言うインテグレーションライブラリを使いました。
SparkとElasticsearchを連携する為のAPIがJARアーカイブにまとめられており、 ここからダウンロードが可能です。

なお、今回はAmazon ESのバージョンが執筆時点で最新である6.7なので、それにあわせ、Elasticsearch for Apache Hadoop 6.7.2を使いました。

※実際のアプリケーションではbuild.sbtとかで依存管理をしますが今回はテストなので直接👆のjarを使います。
※Maven Repositoryのページは: https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-hadoop/6.7.2

試してみる

事前に、先程👆でダウンロードしたJARをS3の適当なバケットにアップロードしておきます。

elasticsearch-hadoop-6.7.2.jar

その後、EMRで適当にSparkのクラスタを作成します。こちらも執筆時点で最新版の “Spark 2.4.0 on Hadoop 2.8.5 YARN with Ganglia 3.7.2 and Zeppelin 0.8.1” を選択しています:

EMRクラスタ作成画面

クラスタがReadyになったら、SSHでログインします:

早速spark-shellを起動します。この時、先程用意したJARといくつかのオプションを設定しておきます:

$ spark-shell --jars s3://YOUR-BUCKET/elasticsearch-hadoop-6.7.2.jar \
  --conf spark.es.nodes=vpc-my-es-xxxxxxxxxxxxxxxxxxxxxxx.us-west-2.es.amazonaws.com \
  --conf spark.es.port=80 \
  --conf spark.es.nodes.wan.only=true

Elasticsearch for Apache Hadoopでは、spark-shell または spark-submit 時に spark. 接頭辞を付ける事で、インテグレーションAPIの設定を指定する事ができます。
例えば spark.es.nodes は es.nodes と言うkeyで設定が行われます。(設定の一覧については公式ドキュメントを参照の事)

上記の3つの設定ですが、執筆時点ではAmazon ESとの疎通では必須となる設定です。Amazon ESで管理されているノードは直接エンドポイントが公開されず、通信には全て vpc-hogehoge-xxxxxxxxxxxxxxxxxxxxxxx.us-west-2.es.amazonaws.com のようなドメインのロードバランサーを経由する必要があります。
また、ポートについてもデフォルトの9200ではなく80が使われてるので、指定しておく必要があります。
最後の es.nodes.wan.only ですが、このAPIがデフォルトの挙動として、各ノードと個別にやり取りをする事で効率化を図ろうとする挙動(Node Discovery)となっているのですが、先述の通りAmazon ESではそれが行えないので、この設定で true を明示しておかないとエラーが発生します。

尚、設定はSparkアプリケーションを起動後も個別に設定を指定する事ができます(後述)が、こう言う基本的な設定は起動時に設定しても良いでしょう。

さて、先程の spark-shell が成功すると、以下のようにreplが起動します:

データを書き込んでみる

早速、次のコマンドを実行してみましょう:

scala> import org.elasticsearch.spark._

scala> val inputData = Seq( Map("id" -> "1", "name" -> "Alice"), Map("id" -> "2", "name" -> "Britney") )
inputData: Seq[scala.collection.immutable.Map[String,String]] = List(Map(id -> 1, name -> Alice), Map(id -> 2, name -> Britney))

scala> sc.makeRDD(inputData).saveToEs("test-index/_doc")

これは、以下を実行しています:

  • 1行目の import org.elasticsearch.spark._ を import する事で、デフォルトのSparkContextを拡張し、saveToEs メソッドが実行できるようにしています
    • org.elasticsearch.spark.rdd.EsSpark.saveToEs(rdd, indexName) ともできます
  • 2行目で入力用JSONデータの作成
  • 3行目でデータをRDD化し、そのままElastcsearchのインデクス test-index のタイプ _docに保存しています

これで、2行のレコードが追加されました。別ターミナルでレコードを確認してみましょう:

$ curl vpc-my-es-xxxxxxxxxxxxxxxxxxxxxxx.us-west-2.es.amazonaws.com/test-index/_search?pretty
{
  ...,
  "hits" : {
    ...,
    "hits" : [
      {
        "_index" : "test-index",
        "_type" : "_doc",
        "_id" : "1aHaYGsB-_mWDM_3fLJ-",
        "_score" : 1.0,
        "_source" : {
          "id" : "1",
          "name" : "Alice"
        }
      },
      {
        "_index" : "test-index",
        "_type" : "_doc",
        "_id" : "1qHaYGsB-_mWDM_3gLJb",
        "_score" : 1.0,
        "_source" : {
          "id" : "2",
          "name" : "Britney"
        }
      }
    ]
  }
}

データを読み込んでみる

反対に、Elasticsearchに入っているデータからDataFrameを作ってみたいと思います。以下は、先程書き込んだインデクスのデータを読み込んでいます:

scala> val dfFromEs = spark.read.format("es").load("test-index/_doc")
dfFromEs: org.apache.spark.sql.DataFrame = [id: string, name: string]

scala> dfFromEs.printSchema
root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)

spark.read.format("es") でリーダーを初期化して、 .load(インデクス名) で読み込むだけです。
replからの応答で分かりますが、入力した通りのドキュメント構造が再現されているので、実際にRDD化してcollectしてみましょう:

scala> dfFromEs.rdd.map { case org.apache.spark.sql.Row(id: String, name: String) => Map("id" -> id, "name" -> name) } .collect
resN: Array[scala.collection.immutable.Map[String,String]] = Array(Map(id -> 2, name -> Britney), Map(id -> 1, name -> Alice))

無事取得する事ができました。

設定について

さて、spark-shell 実行時にいくつか設定を指定したと思いますが、設定は各オペレーションを実行時にもオーバーライド指定する事が可能です。

書き込み時

例えば、先程の書き込み例のJSONにおいて、 id をElasticsearchのメタフィールドである _id にマッピングし、データを更新可能にしたいとします。
また、その際ドキュメント自体からは id を削除してみましょう:

scala> val inputData = Seq( Map("id" -> "1", "name" -> "Alice"), Map("id" -> "2", "name" -> "Britney") )
inputData: Seq[scala.collection.immutable.Map[String,String]] = List(Map(id -> 1, name -> Alice), Map(id -> 2, name -> Britney))

scala> sc.makeRDD(inputData).saveToEs("test-index-v2/_doc", Map( "es.mapping.id" -> "id", "es.mapping.exclude" -> "id" ))

saveToEs の第2引数(org.elasticsearch.spark.rdd.EsSpark.saveToEs の場合は第3引数)にMapで設定を追加指定する事が可能で、この設定は spark-shell 実行時に指定した設定とマージされます。
ちなみにes.mapping.id ではエントリのどのフィールドを _id にマップするか、また es.mapping.exclude ではどのフィールドを書き込み対象から外すかをそれぞれ指定できます。

実際にデータを見てみましょう:

$ curl vpc-my-es-xxxxxxxxxxxxxxxxxxxxxxx.us-west-2.es.amazonaws.com/test-index-v2/_search?pretty
{
  ...,
  "hits" : {
    ...,
    "hits" : [
      {
        "_index" : "test-index-v2",
        "_type" : "_doc",
        "_id" : "2",
        "_score" : 1.0,
        "_source" : {
          "name" : "Britney"
        }
      },
      {
        "_index" : "test-index-v2",
        "_type" : "_doc",
        "_id" : "1",
        "_score" : 1.0,
        "_source" : {
          "name" : "Alice"
        }
      }
    ]
  }
}

想定した通りデータが入っています。また、実行結果は割愛しますが、先程述べた通り _id を指定しているので、既存データの更新を行う事ができます。

読み込み時

読み込み時にも同様の事が可能です。例えば、先程 _id を指定して書き込んだ方はドキュメント自体は id を含めないようにしましたが、このままだとDataFrame側でidが参照できず不便な事があります。
そこで、以下のように es.read.metadata オプションにtrueを指定して読み込んでみましょう:

scala> val dfFromEs = spark.read.format("es").options(Map( "es.read.metadata" -> "true" )).load("test-index-v2/_doc")
dfFromEs: org.apache.spark.sql.DataFrame = [name: string, _metadata: map<string,string>]

scala> dfFromEs.printSchema
root
 |-- name: string (nullable = true)
 |-- _metadata: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

先程とは異なり、 _metadata と言うオブジェクトが入れ子になっています。
ここに _id やら _type が入っているので、例えば入力時のMapを再現したい場合、RDD化してから取り出してもいいですし、次のようにしてDataFrame上でカラム化しておく事もできます:

scala> dfFromEs.select(col("_metadata")("_id").alias("id"), col("name")).rdd.map { case org.apache.spark.sql.Row(id: String, name: String) => Map("id" -> id, "name" -> name) } .collect
resN: Array[scala.collection.immutable.Map[String,String]] = Array(Map(id -> 2, name -> Britney!!), Map(id -> 1, name -> Alice))

おすすめの設定

設定について既にいくつか述べましたが、今回のユースケースでは更に以下の設定を追加で指定しています:

  • es.batch.size.bytes: (default: 1mb)
  • es.batch.size.entries: (default 1000)
  • es.scroll.size: (default: 50)

いずれもパフォーマンスに関係がある設定となっており、最初の es.batch.size.bytes と es.batch.size.entries は書き込み時にBulk APIを呼び出す際のサイズで、コンポーズしたJSONエントリがいずれかのサイズに到達したらAPIがコールされると言う内容になっており、当然ながら大きいサイズを指定する事でElasticsearchへの接続数が減ります。
設定値についてはドキュメントサイズの傾向や、クラスタの設定、ネットワークの構成、Spark上でのタスク数に依存する為正解はありませんが、エラーが頻発しない範囲で、可能な限り大きい数値を使って調整しています。

es.scroll.size も同様で、Elasticsearchに投入するようなデータ量だと50では少なすぎで、例えば今回の量だと60万回もElasticsearchのScroll APIを呼び出さないといけなくなり、現実的ではないのでこれも可能な限り大きい値を設定しておく必要があります。

※パフォーマンスについては公式ドキュメントのこのページが詳しいです。

まとめ

Elasticsearchが提供するインテグレーションを使うと、Sparkとの連携が楽に出ると言う事が分かりました。
現在はまだ本格的な運用フェーズに入っていない為細かいチューニング等はまだしていないので、また別の機会でそのあたりをいつかご紹介できればと思います。

we are hiring

優秀な技術者と一緒に、好きな場所で働きませんか

株式会社もばらぶでは、優秀で意欲に溢れる方を常に求めています。働く場所は自由、働く時間も柔軟に選択可能です。

現在、以下の職種を募集中です。ご興味のある方は、リンク先をご参照下さい。