Spark 2.x では、 DataFrame を JSON に書き出す際、値が null のデータは失われます。
次のコードを見てみましょう:
case class Person(name: String, age: Option[Int])
val people: Dataset[Person] = Seq(
Person("Sabrina Carpenter", Some(22)),
Person("Olivia Rodrigo", None)
).toDF.as[Person]
定義した Person
の age
はオプショナルにしており、作成した DataFrame の 2 番目のインスタンスでは None
を渡しています。この DF を出力してみます:
(Spark console)
scala> people.show
+-----------------+----+
| name| age|
+-----------------+----+
|Sabrina Carpenter| 22|
| Olivia Rodrigo|null|
+-----------------+----+
では、 JSON ファイルに書き出してみます:
people.repartition(1).write.json("/tmp/people")
出力は次のようになります:
# /tmp/people/part-00000-780df354-920a-41e5-b799-5b954941ae1b-c000.json
{"name":"Sabrina Carpenter","age":22}
{"name":"Olivia Rodrigo"}
この通り、2 番目の要素 (“Olivia Rodrigo”) では age
が null である為失われてしまいました。このデータを再度 Spark で読み込む分には問題がありませんが、今回これを使うシステムでは nullable なフィールドであっても指定が必須であった為、この問題に対処する必要がありました。
※ Spark 3.x 以降では spark.sql.jsonGenerator.ignoreNullFields
と言うオプションができ、これに false
を指定するだけで NULL を書き出す事ができます。具体的には spark.write.option("ignoreNullFields", "false").json(...)
の様にして使います。
対処方法
もの凄い力技ですが、 DataFrame の各行について、 Person
オブジェクトの JSON 表現 (String
型) へのシリアライズを自前で行い、 JSON ではなく Text Writer を使ってプレーンテキストで書き込むと言う手法を取りました。
コードにするとこんな感じになります:
def serializePersonToJson(p: Person): String =
// お好きな JSON シリアライザをお使い下さい。この例では json4s を使います
org.json4s.jackson.Serialization.write(p)(org.json4s.DefaultFormats.preservingEmptyValues)
val jsonSerializedPeople: Dataset[String] = people.map(serializePersonToJson(_))
では DF の中身を出力してみます:
(Spark console)
scala> jsonSerializedPeople.show
+-------------------------------------+
|value |
+-------------------------------------+
|{"name":"Sabrina Carpenter","age":22}|
|{"name":"Olivia Rodrigo","age":null} |
+-------------------------------------+
“Olivia Rodrigo” の age
が残された形で JSON 化できました。最後にプレーンテキストとしてファイルに書き出してみます:
jsonSerializedPeople.repartition(1).write.mode("overwrite").text("/tmp/people")
中身を見てみます:
# /tmp/people/part-00000-8994f145-65b3-4f92-9e4c-cdc876b4aa37-c000.txt
{"name":"Sabrina Carpenter","age":22}
{"name":"Olivia Rodrigo","age":null}
こちらでも無事出力を確認できました。
※ちなみに今回使うシステムでは出力される JSON ファイルは単一である必要があるので、以前書いた SparkでDataFrameの内容を単一のファイルに保存する を併せて使っています。