リモート開発メインのソフトウェア開発企業のエンジニアブログです

SparkでDataFrameの内容を単一のファイルに保存する

Sparkで処理したDataFrameをファイルとしてディスクに書き出す際、通常ファイルはパーティションの数分作成されます。

val df = Seq(("Taro", 25, "Male"), ("Jiro", 19, "Male"), ("Hanako", 29, "Female")).toDF("Name", "Age", "Sex")
df.rdd.getNumPartitions // =3
df.write.option("header", "true").csv("/tmp/spark_test/people")

上記の df はパーティション数が3つなので、 /tmp/spark_test/people は次のように3ファイルが生成されます:

/tmp
  spark_test/
    people/
      _SUCCESS # これはメタデータファイル
      part-00001-061868aa-5ec3-4c8d-bf80-3c9eda62f637-c000.csv
      part-00000-061868aa-5ec3-4c8d-bf80-3c9eda62f637-c000.csv
      part-00002-061868aa-5ec3-4c8d-bf80-3c9eda62f637-c000.csv

CSVのように、Sparkではない別のシステムでも使うようなファイルの場合は、単一のファイルの方が好都合な場合があります。

そこで今回は、DataFrameを単一のファイルに書き出す方法を2つ紹介します。

DataFrameを1つのパーティションに集める

colease または repartition を使ってDataFrameを1つのパーティションに集めてから書き出します:

val singlePartitionedDf = df.coalesce(1)
singlePartitionedDf.rdd.getNumPartitions // =1
singlePartitionedDf.write
  .option("header", "true")
  .mode("overwrite")
  .csv("/tmp/spark_test/people")

ファイルは次の通りです:

/tmp
  spark_test/
    people/
      _SUCCESS # これはメタデータファイル
      part-00000-c66cd457-8b41-4102-b000-138291dd351d-c000.csv

colease と repartition の違いは このStackOverflow が詳しいです。

Pros

  • 書き出しの際に余分なファイルを生成しない
  • どのフォーマットでも使える

Cons

  • 指定したファイル名で保存したい場合、リネームの処理の実装が必要になる
  • DataFrameのサイズが膨大な場合、1つのパーティション (Executor) にデータが乗り切らない場合がある
    • Sparkはオンメモリで動作する

FileUtils.copyMerge等を使ってファイルを結合する

import org.apache.hadoop.fs.{FileUtil, Path}

df.write
  .option("header", "true")
  .mode("overwrite")
  .csv("/tmp/spark_test/people")

val src = new Path("/tmp/spark_test/people")
val dest = new Path("/tmp/spark_test/people.csv")
val conf = spark.sparkContext.hadoopConfiguration
val destFs = dest.getFileSystem(conf)

destFs.delete(dest, true) // 既に同名のファイルが有ると IOException がスローされるので削除しておく

FileUtil.copyMerge(src.getFileSystem(conf), src, destFs, dest, false /* 結合後にソースファイルを削除したい場合は true */, conf, null)

/tmp/spark_test/people.csv が生成されます。

但し、CSVファイルの場合、上記のコードではうまくいきません。何故なら、今回はヘッダーを出力しているのですが、一旦は3ファイルにそれぞれヘッダーが書き出される為、そのまま結合するとヘッダー行が複数混在してしまう為です。

なので、一工夫が必要です:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StringType

val dataDf = df.select(df.columns.map(c => df.col(c).cast(StringType)): _*)
val headerDf = spark.createDataFrame(Seq(Row.fromSeq(dataDf.columns.toSeq)).asJava, dataDf.schema)

headerDf
  .union(dataDf)
  .write
  .option("header", "false")
  .mode("overwrite")
  .csv("/tmp/spark_test/people")

ヘッダー行として、列の名前を行データとして持つDataFrameと、元のDataFrameをunionで結合しています。但し、ヘッダー行はすべての列が文字列の為、元のDataFrameの全ての列を文字列型にキャストする必要があります。

この後、先程の copyMerge の処理をすればOKです。処理が成功すると、/tmp/spark_test/people.csv にファイルが生成されます:

Name,Age,Sex
Taro,25,Male
Jiro,19,Male
Hanako,29,Female

Spark 3.x では使えなくなる

Spark 3.0で copyMerge は FileUtil クラスから削除されました。この為、Spark 3.xで使いたい場合は自前で実装する必要がありますが、実装自体はシンプルなので問題ないかと思います。

FileUtil#copyMerge

Pros

  • 指定したファイル名で単一のファイルが書き出せる
  • DataFrameの書き出しはパーティショニングされているので、メモリの心配は無い

Cons

  • 一時的にパーティショニングされたデータを書き出す必要があるので、ストレージの容量を余分に使う
  • Spark 3.xでは自前で実装する必要がある
  • CSVでヘッダー行を出したい場合は事前にDataFrameを加工する必要がある
  • Parquetなどのバイナリフォーマットでは使えない

おわり

以上、今回はSparkのDataFrameを1つのファイルに書き出す方法を2つ紹介しましたが、プロジェクトのユースケースに合わせて使い分けると良いかと思います。

← 前の投稿

Slackの新しくなったメッセージ取得APIを使用する際に考えなければならないこと色々

次の投稿 →

BERTのモデル構造をもう少し詳しく

コメントを残す