Spark の DataFrame のテスト
はじめに
Apache Spark では、御存知の通り大規模なデータを高速に扱う事が出来ます。大規模データ処理のインフラという観点では、速度のチューニングのために、データ構造を調整したりデータ処理の順番を最適化したりという作業は良くやると思います。
一方、Spark を、与えられた入力データに対して所定の結果を出力するという、データ処理関数・プロシージャーと見た場合、入力データに対して正しい結果を出力する事を担保する必要があります。
一般的なデータ処理のプログラムであれば、処理をテストしやすい形に分割した上で、ユニットテスト(単体テスト)を書くのが一般的だと思います。Spark の場合はどうすれば良いのでしょうか。
本記事では、Apache Spark 上で動く、 DataFrame
(あるいは Dataset
) を使ったデータ処理プログラムの単体テストを書く方法について記載します。
前提条件・要件
今回扱うテスト対象のプログラムとしては、以下のような前提条件を想定します。
- HDFS/S3 上の parquet ファイルを入力
- DataFrame を使用してデータ処理を行う
- リアルタイムではなくバッチ処理
テストの要件としては以下の通りです。
- 正しい入力データに対して、正しい結果が出ること担保する
- エッジケースについてもテスト
- 入力データ異常、欠損データを早い段階で捕捉できることを担保する
- テストはローカル環境で実施可能(外部の HDFS/S3 を必要としない)
- sbt test で実行する
やることの概要
今回、以下のライブラリを使います。
holdenk/spark-testing-base: Base classes to use when writing tests with Spark
テストケースは ScalaTest を使って書きますので、ScalaTest の経験が無い場合は、予めドキュメントに目を通しておくことをお勧めします。
実際の手順
セットアップ
基本的に、README に書いてある通りです。
spark-testing-base は、<Sparkのバージョン>_<spark-testing-base自体のバージョン>
のようにバージョンを指定します。Release Notes を見ると、最新は 0.10.0 のようです。(私が使ったのは、もう少し古いバージョンです。)
従って、以下のように dependency を build.sbt 等に追加します。
val sparkVersion = "2.3.1"
val sparkTestingBaseVersion = s"${sparkVersion}_0.10.0"
"com.holdenkarau" %% "spark-testing-base" % sparkTestingBaseVersion % "test",
また、デフォルトのメモリ設定だと OutOfMemoryError
が発生しやすいので、以下の設定も追加します。
fork in Test := true
javaOptions ++= Seq("-Xms512M", "-Xmx2048M", "-XX:MaxPermSize=2048M", "-XX:+CMSClassUnloadingEnabled"),
テスト対象のクラス
テスト対象のクラスを書く際には、通常の Scala と同様のプラクティスが当てはまることが多いです。具体的には
- 副作用を局所化する
- 適切に分割する
などです。
また、今回はタイトルの通り DataFrame
(= Dataset[Row]
)をテストするのですが、例として以下のようなクラスを考えます。ここでは、foo
メソッドをテストすると仮定します。
object FooMain {
def main(args: Array[String]): Unit = {
implicit val session: SparkSession = SparkSession
.builder()
.appName("example-app")
.enableHiveSupport()
.getOrCreate()
val df1 = session
.read
.parquet(s"s3://example-bucket/data1")
val df2 = session
.read
.parquet(s"s3://example-bucket/data2")
val resultDf = foo(df1, df2)
resultDf.write.parquet(s"s3://example-bucket/result")
}
def foo(
inputDf1: Dataset[Row],
inputDf2: Dataset[Row]
)(implicit spark: SparkSession): Dataset[Row] {
import spark.implicits._
inputDf1.join(inputDf2, "id")
.filter(....)
.select(....)
// など
}
}
テストの書き方
次に、テストの書き方ですが、まずはコードから載せます。
import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.apache.spark.sql.{Dataset, Row}
import org.scalatest.FunSuite
class FooTest extends FunSuite with DataFrameSuiteBase {
test("test something") {
// ここの `spark` は、DataFrameSuiteBase の親である
// DataFrameSuiteBaseLike で定義されている
// その他、`sc` (SparkContext) なども使用可能
import spark.implicits._
val df1 = List(
(1, 5, "abc"),
(2, 11, "def"),
(3, 1, "Z")
).toDF("id", "col_1a", "col_1b").alias("df1")
val df2 = List(
(2, 5),
(3, 3),
(4, 1)
).toDF("id", "col_2").alias("df2")
val resultDf = FooMain.foo(df1, df2)(spark)
val expectedDf = List(
(2, 16),
(3, 4)
).toDF("id", "sum")
assertDataFrameEquals(expectedDf, resultDf)
}
}
コードで大体分かるかと思いますが、
- テスト対象メソッドに渡す
DataFrame
は、List
などから作成 - 期待される結果の
DataFrame
も、同様にList
などから作成 - 実際の結果との比較には、spark-testing-base が提供する assertion を使用
といった流れです。
実行
sbt test
で実行します。詳細は ScalaTest のドキュメントを参照して下さい。
その他、細かい情報
Hive support
Spark には、Hive support という機能があり、それを使うと Hive のテーブルを読み書きしたりできます。(詳細は Spark のドキュメントを参照。)
ローカルのテストを実行する環境によっては、この機能が有効となっていると、以下のようなエラーが発生する可能性があります。
java.lang.IllegalArgumentException: Error while instantiating 'org.apache.spark.sql.hive.HiveSessionState':
(略)
Cause: java.lang.RuntimeException: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: rwxr-xr-x
原因としては、以下の Stack Overflow にある通り色々なケースがあるようです。
Spark 2.1 – Error While instantiating HiveSessionState – Stack Overflow
ただ、テストコードの中で Hive support の機能を使う必要が無ければ、これを無効にすることが出来ます。
方法としては DataFrameSuiteBaseLike#enableHiveSupport
を override して false を返すようにすれば良いです。具体的には以下のようにします。
class FooTest extends FunSuite with DataFrameSuiteBase {
override protected implicit def enableHiveSupport: Boolean = false
test("test foo") {
// :
// :
}
}
その他のクラスの使い方は Wiki を
上の例では DataFrameSuiteBase
を使いましたが、それと同様に DatasetSuiteBase
というのもあります。また、stream を対象とした StreamingSuiteBase
もあります。
また、上の例のように List などからテストデータを作らず、簡単に作成してくれる XxxGenerator
というのもありますので、テストの種類に応じて仕様を検討してみて下さい。
詳しくは公式の wiki を参照。
Home · holdenk/spark-testing-base Wiki
まとめ
spark-testing-base を使うと、Spark のデータ処理に対するテストが比較的簡単に書けます。
Spark では大規模なデータを扱うことが多いので、本番稼働が始まってからデータの不備やエッジケースなどで予期せぬ挙動やシステム停止が起きると、小規模なシステムに比べて影響が大きいです。テストを書くことで、そうしたリスクを減らすことができます。
コメントを残す