いまいち使いどころを理解できていなかったScalaのOptionですが、データのマージ処理を実装した際に、割と理解しやすいコードが書けたと感じましたのでざっくりとですがご紹介します。
環境
- Scala 2.11.12
- spark 2.4.3
- AWS EMR 5.26.0
方針
マージを行うメイン処理がデータ取得する際に呼び出す関数の戻り値をDataset[Row]型ではなくOption[Dataset[Row]]型にすることで仕様変更に対応しました。これにより戻り値としてOption.empty[Dataset[Row]]を返せるようになるのですが、こうすることでどんなメリットがあるのかについては例を用いて説明したいと思います。
修正前の処理
以下のようなデータA,B,Cをマージするという処理において、result_b.csv、result_c.csvの2ファイルは必ず存在する前提で、なかった場合はExceptionを発生させて異常終了させるという仕様でしたので、関数readDataSetB、readDataSetCは直接Dataset[Row]型を返却していました。
def mergeDataSet(): Unit = {
val dataSetA = readDataSetA
// 同じスキーマを持ったデータA,B,Cをマージ
val mergedDataSet = List(readDataSetB, readDataSetC)
.foldLeft(dataSetA)(_ union _)
// マージしたデータをCSVとしてS3に出力
fileIo.writeToS3(spark, mergedDataSet, s"${fileIo.rootPath}/merged_data/${configurations.partition}/merged_data.csv")
}
def readDataSetB: Dataset[Row] = {
val schema = StructType($"result_id".string :: $"result_date".string)
spark.read.schema(schema)
.csv(s"s3://$bucket/result/$partition/result_b.csv")
}
def readDataSetC: Dataset[Row] = {
val schema = StructType($"result_id".string :: $"result_date".string)
spark.read.schema(schema)
.csv(s"s3://$bucket/result/$partition/result_c.csv")
}
しかし、後からresult_c.csvがない場合、処理を続行してしまって良いという仕様に変わったため、Exceptionをcatchしてエラーにならないようにすることにしました。
ここでJavaであればDataset[Row]型の戻り値をnullで返したりするのでしょうが、Scalaではnullを扱わなくても済むようにOption型が存在しますので、これで対応します。
修正後の処理
変更の主な内容はtry~catch、Option()の追加ですが、6行目に追加したflatten関数がとても重要です。Option()の中の型で値を取り出してくれる関数なのですが、Option.empty[Dataset[Row]]だった場合はデータがないということでリストから削除してくれます。
つまり、result_c.csvがない場合、データA,BがマージされデータCはないものとしてリスト処理されます。
def mergeDataSet(): Unit = {
val dataSetA = readDataSetA
// 同じスキーマを持ったデータA,B,Cをマージ
val mergedDataSet = List(readDataSetB, readDataSetC)
.flatten
.foldLeft(dataSetA)(_ union _)
// マージしたデータをCSVとしてS3に出力
fileIo.writeToS3(spark, mergedDataSet, s"${fileIo.rootPath}/merged_data/${configurations.partition}/merged_data.csv")
}
def readDataSetB: Option[Dataset[Row]] = {
val schema = StructType($"result_id".string :: $"result_date".string)
Option(
spark.read.schema(schema)
.csv(s"s3://$bucket/result/$partition/result_b.csv")
)
}
def readDataSetC: Option[Dataset[Row]] = {
try {
val schema = StructType($"result_id".string :: $"result_date".string)
Option(
spark.read.schema(schema)
.csv(s"s3://$bucket/result/$partition/result_c.csv")
)
} catch {
case e: AnalysisException => Option.empty[Dataset[Row]]
}
}
Option()とflatten関数を組み合わせることでシンプルで分かりやすいメソッドチェーンを組むことができる1つの例だと思います。
まとめ
Optionすごく便利ですが、そのメリットが非常に伝えずらい機能だなとも思います。新しい使い方を開拓してご報告できそうなら、また書きます。
ピンバック: itemprop="name">(Scala) List[Option[A]]#flattenで起きる事 | もばらぶエンジニアブログ