Spark の DataFrame のテスト

Spark の DataFrame のテスト

はじめに

Apache Spark では、御存知の通り大規模なデータを高速に扱う事が出来ます。大規模データ処理のインフラという観点では、速度のチューニングのために、データ構造を調整したりデータ処理の順番を最適化したりという作業は良くやると思います。

一方、Spark を、与えられた入力データに対して所定の結果を出力するという、データ処理関数・プロシージャーと見た場合、入力データに対して正しい結果を出力する事を担保する必要があります。

一般的なデータ処理のプログラムであれば、処理をテストしやすい形に分割した上で、ユニットテスト(単体テスト)を書くのが一般的だと思います。Spark の場合はどうすれば良いのでしょうか。

本記事では、Apache Spark 上で動く、 DataFrame (あるいは Dataset) を使ったデータ処理プログラムの単体テストを書く方法について記載します。

前提条件・要件

今回扱うテスト対象のプログラムとしては、以下のような前提条件を想定します。

  • HDFS/S3 上の parquet ファイルを入力
  • DataFrame を使用してデータ処理を行う
  • リアルタイムではなくバッチ処理

テストの要件としては以下の通りです。

  • 正しい入力データに対して、正しい結果が出ること担保する
    • エッジケースについてもテスト
  • 入力データ異常、欠損データを早い段階で捕捉できることを担保する
  • テストはローカル環境で実施可能(外部の HDFS/S3 を必要としない)
    • sbt test で実行する

やることの概要

今回、以下のライブラリを使います。
holdenk/spark-testing-base: Base classes to use when writing tests with Spark

テストケースは ScalaTest を使って書きますので、ScalaTest の経験が無い場合は、予めドキュメントに目を通しておくことをお勧めします。

実際の手順

セットアップ

基本的に、README に書いてある通りです。

spark-testing-base は、<Spark のバージョン>_<spark-testing-base のバージョン> のようにバージョンを指定します。Release Notes を見ると、最新は 0.10.0 のようです。(私が使ったのは、もう少し古いバージョンです。)

従って、以下のように dependency を build.sbt 等に追加します。

val sparkVersion = "2.3.1"
val sparkTestingBaseVersion = s"${sparkVersion}_0.10.0"

"com.holdenkarau" %% "spark-testing-base" % sparkTestingBaseVersion % "test",

また、デフォルトのメモリ設定だと OutOfMemoryError が発生しやすいので、以下の設定も追加します。

fork in Test := true
javaOptions ++= Seq("-Xms512M", "-Xmx2048M", "-XX:MaxPermSize=2048M", "-XX:+CMSClassUnloadingEnabled"),

テスト対象のクラス

テスト対象のクラスを書く際には、通常の Scala と同様のプラクティスが当てはまることが多いです。具体的には

  • 副作用を局所化する
  • 適切に分割する

などです。

また、今回はタイトルの通り DataFrame (= Dataset[Row])をテストするのですが、例として以下のようなクラスを考えます。ここでは、foo メソッドをテストすると仮定します。

object FooMain {
  def main(args: Array[String]): Unit = {
    implicit val session: SparkSession = SparkSession
      .builder()
      .appName("example-app")
      .enableHiveSupport()
      .getOrCreate()

    val df1 = session
      .read
      .parquet(s"s3://example-bucket/data1")

    val df2 = session
      .read
      .parquet(s"s3://example-bucket/data2")

    val resultDf = foo(df1, df2)

    resultDf.write.parquet(s"s3://example-bucket/result")
  }

  def foo(
    inputDf1: Dataset[Row],
    inputDf2: Dataset[Row]
  )(implicit spark: SparkSession): Dataset[Row] {
    import spark.implicits._

    inputDf1.join(inputDf2, "id")
      .filter(....)
      .select(....)
      // など
  }
}

テストの書き方

次に、テストの書き方ですが、まずはコードから載せます。

import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.apache.spark.sql.{Dataset, Row}
import org.scalatest.FunSuite

class FooTest extends FunSuite with DataFrameSuiteBase {
  test("test something") {
    // ここの `spark` は、DataFrameSuiteBase の親である
    // DataFrameSuiteBaseLike で定義されている
    // その他、`sc` (SparkContext) なども使用可能
    import spark.implicits._

    val df1 = List(
      (1, 5, "abc"),
      (2, 11, "def"),
      (3, 1, "Z")
    ).toDF("id", "col_1a", "col_1b").alias("df1")
    val df2 = List(
      (2, 5),
      (3, 3),
      (4, 1)
    ).toDF("id", "col_2").alias("df2")

    val resultDf = FooMain.foo(df1, df2)(spark)
    val expectedDf = List(
      (2, 16),
      (3, 4)
    ).toDF("id", "sum")

    assertDataFrameEquals(expectedDf, resultDf)
  }
}

コードで大体分かるかと思いますが、

  • テスト対象メソッドに渡す DataFrame は、 List などから作成
  • 期待される結果の DataFrame も、同様に List などから作成
  • 実際の結果との比較には、spark-testing-base が提供する assertion を使用

といった流れです。

実行

sbt test で実行します。詳細は ScalaTest のドキュメントを参照して下さい。

その他、細かい情報

Hive support

Spark には、Hive support という機能があり、それを使うと Hive のテーブルを読み書きしたりできます。(詳細は Spark のドキュメントを参照。)

ローカルのテストを実行する環境によっては、この機能が有効となっていると、以下のようなエラーが発生する可能性があります。

java.lang.IllegalArgumentException: Error while instantiating 'org.apache.spark.sql.hive.HiveSessionState':
 (略)
Cause: java.lang.RuntimeException: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: rwxr-xr-x

原因としては、以下の Stack Overflow にある通り色々なケースがあるようです。

Spark 2.1 – Error While instantiating HiveSessionState – Stack Overflow

ただ、テストコードの中で Hive support の機能を使う必要が無ければ、これを無効にすることが出来ます。

方法としては DataFrameSuiteBaseLike#enableHiveSupport を override して false を返すようにすれば良いです。具体的には以下のようにします。

class FooTest extends FunSuite with DataFrameSuiteBase {
  override protected implicit def enableHiveSupport: Boolean = false

  test("test foo") {
    // :
    // :
  }
}

その他のクラスの使い方は Wiki を

上の例では DataFrameSuiteBase を使いましたが、それと同様に DatasetSuiteBase というのもあります。また、stream を対象とした StreamingSuiteBase もあります。

また、上の例のように List などからテストデータを作らず、簡単に作成してくれる XxxGenerator というのもありますので、テストの種類に応じて仕様を検討してみて下さい。

詳しくは公式の wiki を参照。

Home · holdenk/spark-testing-base Wiki

まとめ

spark-testing-base を使うと、Spark のデータ処理に対するテストが比較的簡単に書けます。

Spark では大規模なデータを扱うことが多いので、本番稼働が始まってからデータの不備やエッジケースなどで予期せぬ挙動やシステム停止が起きると、小規模なシステムに比べて影響が大きいです。テストを書くことで、そうしたリスクを減らすことができます。