Sparkで処理したDataFrameをファイルとしてディスクに書き出す際、通常ファイルはパーティションの数分作成されます。
val df = Seq(("Taro", 25, "Male"), ("Jiro", 19, "Male"), ("Hanako", 29, "Female")).toDF("Name", "Age", "Sex")
df.rdd.getNumPartitions // =3
df.write.option("header", "true").csv("/tmp/spark_test/people")
上記の df
はパーティション数が3つなので、 /tmp/spark_test/people
は次のように3ファイルが生成されます:
/tmp
spark_test/
people/
_SUCCESS # これはメタデータファイル
part-00001-061868aa-5ec3-4c8d-bf80-3c9eda62f637-c000.csv
part-00000-061868aa-5ec3-4c8d-bf80-3c9eda62f637-c000.csv
part-00002-061868aa-5ec3-4c8d-bf80-3c9eda62f637-c000.csv
CSVのように、Sparkではない別のシステムでも使うようなファイルの場合は、単一のファイルの方が好都合な場合があります。
そこで今回は、DataFrameを単一のファイルに書き出す方法を2つ紹介します。
DataFrameを1つのパーティションに集める
colease
または repartition
を使ってDataFrameを1つのパーティションに集めてから書き出します:
val singlePartitionedDf = df.coalesce(1)
singlePartitionedDf.rdd.getNumPartitions // =1
singlePartitionedDf.write
.option("header", "true")
.mode("overwrite")
.csv("/tmp/spark_test/people")
ファイルは次の通りです:
/tmp
spark_test/
people/
_SUCCESS # これはメタデータファイル
part-00000-c66cd457-8b41-4102-b000-138291dd351d-c000.csv
※colease
と repartition
の違いは このStackOverflow が詳しいです。
Pros
- 書き出しの際に余分なファイルを生成しない
- どのフォーマットでも使える
Cons
- 指定したファイル名で保存したい場合、リネームの処理の実装が必要になる
- DataFrameのサイズが膨大な場合、1つのパーティション (Executor) にデータが乗り切らない場合がある
- Sparkはオンメモリで動作する
FileUtils.copyMerge等を使ってファイルを結合する
import org.apache.hadoop.fs.{FileUtil, Path}
df.write
.option("header", "true")
.mode("overwrite")
.csv("/tmp/spark_test/people")
val src = new Path("/tmp/spark_test/people")
val dest = new Path("/tmp/spark_test/people.csv")
val conf = spark.sparkContext.hadoopConfiguration
val destFs = dest.getFileSystem(conf)
destFs.delete(dest, true) // 既に同名のファイルが有ると IOException がスローされるので削除しておく
FileUtil.copyMerge(src.getFileSystem(conf), src, destFs, dest, false /* 結合後にソースファイルを削除したい場合は true */, conf, null)
/tmp/spark_test/people.csv
が生成されます。
但し、CSVファイルの場合、上記のコードではうまくいきません。何故なら、今回はヘッダーを出力しているのですが、一旦は3ファイルにそれぞれヘッダーが書き出される為、そのまま結合するとヘッダー行が複数混在してしまう為です。
なので、一工夫が必要です:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StringType
val dataDf = df.select(df.columns.map(c => df.col(c).cast(StringType)): _*)
val headerDf = spark.createDataFrame(Seq(Row.fromSeq(dataDf.columns.toSeq)).asJava, dataDf.schema)
headerDf
.union(dataDf)
.write
.option("header", "false")
.mode("overwrite")
.csv("/tmp/spark_test/people")
ヘッダー行として、列の名前を行データとして持つDataFrameと、元のDataFrameをunionで結合しています。但し、ヘッダー行はすべての列が文字列の為、元のDataFrameの全ての列を文字列型にキャストする必要があります。
この後、先程の copyMerge
の処理をすればOKです。処理が成功すると、/tmp/spark_test/people.csv
にファイルが生成されます:
Name,Age,Sex
Taro,25,Male
Jiro,19,Male
Hanako,29,Female
Spark 3.x では使えなくなる
Spark 3.0で copyMerge
は FileUtil
クラスから削除されました。この為、Spark 3.xで使いたい場合は自前で実装する必要がありますが、実装自体はシンプルなので問題ないかと思います。
Pros
- 指定したファイル名で単一のファイルが書き出せる
- DataFrameの書き出しはパーティショニングされているので、メモリの心配は無い
Cons
- 一時的にパーティショニングされたデータを書き出す必要があるので、ストレージの容量を余分に使う
- Spark 3.xでは自前で実装する必要がある
- CSVでヘッダー行を出したい場合は事前にDataFrameを加工する必要がある
- Parquetなどのバイナリフォーマットでは使えない
おわり
以上、今回はSparkのDataFrameを1つのファイルに書き出す方法を2つ紹介しましたが、プロジェクトのユースケースに合わせて使い分けると良いかと思います。