静的データベースと動的データベース(Spark SQLの小ネタ)
このようなデータベースの種類を分ける概念は一般的にはないと思われますが、「Spark SQL」で開発しているとまさに動的だなぁという思いが湧いてくることが多々ありましたので記事にしてみました。(従来のRDBであるMySqlなどでも動的な使い方が出来るとは思いますが、スキーマを動的に変えたり、メモリ上で新しいテーブルを生成したりという使い方はあまりしないので、これらは静的という扱いにしています。)
Spark SQLの簡単な説明
Spark SQLは以下のような使い方を良くします。カラムをscalaプログラムで動的に追加して、別のテーブルとして保存するといった流れです。ちなみにテーブルはAWSのS3にparquetという形式のファイルとして保存できるので、それを読みこんでメモリ上にも新たにテーブルが作られるというイメージです。メモリ上のテーブルはSparkではDataFrameと呼ばれます。parquetファイルとDataFrameは両方ともスキーマ情報を含んでいますので、プログラムで特にその指定を明示的にしなくても扱えるので便利です。
// S3のparquetファイルを読み込む
// 1. カラム id, date, info を有するparquetファイルが
// 「date=2020-09-14」ディレクトリの下に配置されている場合
// 2. Sparkにはpartitionという機能があり、dateカラムの日付別に
// ディレクトリを持たせて管理することが出来ます
val dataFrame =
spark.read.parquet(s"s3://test-1234/sample_table/date=2020-09-14")
// 新しいカラムを追加(column_newはカラム名、次の引数でカラムの値を指定)
val dataFrameNew =
dataFrame.withColumn("column_new", lit("新しいカラム"))
// dataFrameNewを別のテーブルとして書き込み
dataFrameNew
.select("id", "date", "info", "column_new")
.write
.option("header","false").mode("overwrite").partitionBy("date")
.parquet(s"s3://test-1234/new_table/date=2020-09-14")
// dataFrameとdataFrameNewのスキーマを覗くとカラムが追加されたのが分かります
println(dataFrame.schema.fieldNames)
println(dataFrameNew.schema.fieldNames)
スキーマ情報の違うテーブルを無理やり結合(SQLのunion相当)する
今回の記事のメインの部分になります。Sparkの動的な機能を使うとこんなことも出来るよ!という内容になってます。
Sparkの2つのDataFrameを結合する場合、スキーマ情報を一致させ、さらにselectでカラムの順番を一致させるという手順が必要になります。これを実現するには、上記でも説明した通り、追加すべきカラムの名前を調べてからwithColumn()で明示的に追加する必要があるのですが、このステップをなんとか動的に出来ないかと試行錯誤したところ可能だということが分かりましたのご紹介します。
この方法を使えばスキーマ情報の違うテーブルがいくつあろうと自動で結合してくれるので、テーブルを跨いで情報を調べたいときに便利です。
仮に以下のような日付ごとにスキーマ情報の違うテーブルがs3に存在するとして説明をします。
前提としてスキーマ情報は以下とします。
id 数値
date 文字列(yyyy-mm-dd)
info_a 文字列
info_b 文字列
■テーブル1
s3://test-1234/sample_table/date=2020-09-15/aaaa.parquet
=> カラムは id, date, info_a を含む
+----+----------+------+
| id | date |info_a|
+----+----------+------+
| 1|2020-09-15| red|
| 2|2020-09-15| blue|
+----+----------+------+
■テーブル2
s3://test-1234/sample_table/date=2020-09-16/bbbb.parquet
=> カラムは id, date, info_b を含む
+----+----------+------+
| id | date |info_b|
+----+----------+------+
| 1|2020-09-16| green|
| 2|2020-09-16| gray|
+----+----------+------+
上記の2つのテーブルを結合して以下のようなテーブルを生成したい
+----+----------+------+------+
| id | date |info_a|info_b|
+----+----------+------+------+
| 1|2020-09-15| red| null|
| 2|2020-09-15| blue| null|
| 1|2020-09-16| null| green|
| 2|2020-09-16| null| gray|
+----+----------+------+------+
上記のように同じテーブル内の別のpartitionに違うスキーマのparquetファイルを保存することは理論上可能です。
info_a、info_bという名前を意識しなくても、自動で結合させるには以下のような方法を用います。
val dateList = List("2020-09-15", "2020-09-16")
// date partitionごとにファイルを読み込んでDataFrameのリストを生成
val dfListByDate =
// 日付のリストをDataFrameのリストに変換する
dateList.map { date =>
spark.read
.parquet(s"s3://test-1234/sample/date=$date")
// partitionごとにreadするとdateカラムが消えてしまうので付加しなおす
.withColumn("date", lit(s"$date"))
}
// 最終的に必要となるカラム名のリストを作成
// 今回の例の場合、List("id", "date", "info_a", "info_b")が得られる
val mergedFieldNames =
dfListByDate.map(_.schema.fieldNames).flatten.toSet.toList
// 最後にDataFrameのお互いに足りないカラムをNULL値で補い結合します。
// ここでポイントとなるのはselect()する際に個別のカラム名ではなく
// カラム名のリストを渡すことです。Spark SDKの素晴らしい柔軟性ですね。
val result =
// カラムの追加後、reduceLeft()で全てのDataFrameを結合
dfListByDate.map { dfByDate =>
// カラム名のリストのdiffを取れば不足しているカラム名が得られる
val neededfieldNames =
mergedFieldNames diff dfByDate.schema.fieldNames;
// 不足しているカラムがない場合はselect()して順番を合わせるだけ
if (neededfieldNames.length == 0) {
dfByDate.select(mergedFieldNames.map(col): _*)
// 不足しているカラムがある場合はwithColumn()で付加してからselect()する
} else {
neededfieldNames.foldLeft(dfByDate) {
(df: Dataset[Row], fieldName) =>
df.withColumn(fieldName, lit(null))
}.select(mergedFieldNames.map(col): _*)
}
}.reduceLeft(_ union _)
// 結合完了!!!!
result.show
上記のdfListByDateの部分を自分の結合させたい好きなリストに変更してあげれば、どんなテーブルでも結合しほうだいになりました。
ただし、idの値が1つは文字列、1つは数値のように型が違う場合はうまくいかないかもしれませんので、ご注意ください。この型の変換も上記の自動化の中に組み込めばさらに汎用性が高まりそうです。
まとめ
この便利さに触れると他のSQL(RDS, No SQL, SAPなど)の動的な仕組みというものがどうなっているかについて知りたくなってきますね。調べておもしろそうな内容あればまた記事にしたいと思います。
コメントを残す