Option(Scala)の実用的な使い方 − データのマージ処理

Option(Scala)の実用的な使い方 − データのマージ処理

いまいち使いどころを理解できていなかったScalaのOptionですが、データのマージ処理を実装した際に、割と理解しやすいコードが書けたと感じましたのでざっくりとですがご紹介します。

環境

  • Scala 2.11.12
  • spark 2.4.3
  • AWS EMR 5.26.0

方針

マージを行うメイン処理がデータ取得する際に呼び出す関数の戻り値をDataset[Row]型ではなくOption[Dataset[Row]]型にすることで仕様変更に対応しました。これにより戻り値としてOption.empty[Dataset[Row]]を返せるようになるのですが、こうすることでどんなメリットがあるのかについては例を用いて説明したいと思います。

修正前の処理

以下のようなデータA,B,Cをマージするという処理において、result_b.csv、result_c.csvの2ファイルは必ず存在する前提で、なかった場合はExceptionを発生させて異常終了させるという仕様でしたので、関数readDataSetB、readDataSetCは直接Dataset[Row]型を返却していました。

  def mergeDataSet(): Unit = {
    val dataSetA = readDataSetA

    // 同じスキーマを持ったデータA,B,Cをマージ
    val mergedDataSet = List(readDataSetB, readDataSetC)
      .foldLeft(dataSetA)(_ union _)

    // マージしたデータをCSVとしてS3に出力
    fileIo.writeToS3(spark, mergedDataSet, s"${fileIo.rootPath}/merged_data/${configurations.partition}/merged_data.csv")
  }

  def readDataSetB: Dataset[Row] = {
      val schema = StructType($"result_id".string :: $"result_date".string)
      spark.read.schema(schema)
        .csv(s"s3://$bucket/result/$partition/result_b.csv")
  }

  def readDataSetC: Dataset[Row] = {
      val schema = StructType($"result_id".string :: $"result_date".string)
      spark.read.schema(schema)
        .csv(s"s3://$bucket/result/$partition/result_c.csv")
  }

しかし、後からresult_c.csvがない場合、処理を続行してしまって良いという仕様に変わったため、Exceptionをcatchしてエラーにならないようにすることにしました。

ここでJavaであればDataset[Row]型の戻り値をnullで返したりするのでしょうが、Scalaではnullを扱わなくても済むようにOption型が存在しますので、これで対応します。

修正後の処理

変更の主な内容はtry~catch、Option()の追加ですが、6行目に追加したflatten関数がとても重要です。Option()の中の型で値を取り出してくれる関数なのですが、Option.empty[Dataset[Row]]だった場合はデータがないということでリストから削除してくれます。

つまり、result_c.csvがない場合、データA,BがマージされデータCはないものとしてリスト処理されます。

  def mergeDataSet(): Unit = {
    val dataSetA = readDataSetA

    // 同じスキーマを持ったデータA,B,Cをマージ
    val mergedDataSet = List(readDataSetB, readDataSetC)
      .flatten
      .foldLeft(dataSetA)(_ union _)

    // マージしたデータをCSVとしてS3に出力
    fileIo.writeToS3(spark, mergedDataSet, s"${fileIo.rootPath}/merged_data/${configurations.partition}/merged_data.csv")
  }

  def readDataSetB: Option[Dataset[Row]] = {
      val schema = StructType($"result_id".string :: $"result_date".string)
      Option(
        spark.read.schema(schema)
          .csv(s"s3://$bucket/result/$partition/result_b.csv")
      )
  }

  def readDataSetC: Option[Dataset[Row]] = {
      try {
        val schema = StructType($"result_id".string :: $"result_date".string)
        Option(
          spark.read.schema(schema)
            .csv(s"s3://$bucket/result/$partition/result_c.csv")
        )
      } catch {
        case e: AnalysisException => Option.empty[Dataset[Row]]
      }
  }

Option()とflatten関数を組み合わせることでシンプルで分かりやすいメソッドチェーンを組むことができる1つの例だと思います。

まとめ

Optionすごく便利ですが、そのメリットが非常に伝えずらい機能だなとも思います。新しい使い方を開拓してご報告できそうなら、また書きます。

we are hiring

優秀な技術者と一緒に、好きな場所で働きませんか

株式会社もばらぶでは、優秀で意欲に溢れる方を常に求めています。働く場所は自由、働く時間も柔軟に選択可能です。

現在、以下の職種を募集中です。ご興味のある方は、リンク先をご参照下さい。