How to read a column that contains JSON?
While reading data from any source like CSV, JSON, Parquet, Kafka, or any other data source, we may end up with a column of type String that actually contains JSON. So how should we get this data into a structured form?
Let's take some sample data that contains JSON as one field, as below.
[
  {
    "id": "0001",
    "json_string": "{\"head\": {\"column1\":\"value1\",\"column2\":\"value2\"},\"body\": [{\"column3\":\"value3\",\"column4\":\"value4\",\"column5\":\"value5\"}]}"
  },
  {
    "id": "0002",
    "json_string": "{\"head\": {\"column1\":\"value11\",\"column2\":\"value22\"},\"body\": [{\"column3\":\"value33\",\"column4\":\"value44\",\"column5\":\"value55\"}]}"
  }
]
If you simply read this data, you get two columns:

val df = spark.read
  .option("multiline", true)
  .json(jsonPath)
df.show(false)

+----+---------------------------------------------------------------------------------------------------------------------------+
|id  |json_string                                                                                                                |
+----+---------------------------------------------------------------------------------------------------------------------------+
|0001|{"head": {"column1":"value1","column2":"value2"},"body": [{"column3":"value3","column4":"value4","column5":"value5"}]}     |
|0002|{"head": {"column1":"value11","column2":"value22"},"body": [{"column3":"value33","column4":"value44","column5":"value55"}]}|
+----+---------------------------------------------------------------------------------------------------------------------------+
One way to extract the JSON fields to the top level is to define a schema for them, as below.
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("head", StructType(Seq(
    StructField("column1", StringType, nullable = false),
    StructField("column2", StringType, nullable = false)
  ))),
  StructField("body", ArrayType(StructType(Seq(
    StructField("column3", StringType, nullable = false),
    StructField("column4", StringType, nullable = false),
    StructField("column5", StringType, nullable = false)
  ))))
))
Now you can use the from_json function with this schema to convert that String value into a structured value.
import org.apache.spark.sql.functions.{col, from_json}

df.withColumn("json_string", from_json(col("json_string"), schema))
  .select("id", "json_string.*")
  .show(false)
This will give:

+----+------------------+-----------------------------+
|id  |head              |body                         |
+----+------------------+-----------------------------+
|0001|{value1, value2}  |[{value3, value4, value5}]   |
|0002|{value11, value22}|[{value33, value44, value55}]|
+----+------------------+-----------------------------+

And the structure looks like below; now you can use any nested fields easily:

root
 |-- id: string (nullable = true)
 |-- body: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- column3: string (nullable = true)
 |    |    |-- column4: string (nullable = true)
 |    |    |-- column5: string (nullable = true)
 |-- head: struct (nullable = true)
 |    |-- column1: string (nullable = true)
 |    |-- column2: string (nullable = true)
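To make "use any nested fields easily" concrete, here is a short sketch (assuming the df and schema defined above, in an active Spark session) that selects a field from the head struct and explodes the body array into one row per element; the alias b is just an illustrative name:

```scala
import org.apache.spark.sql.functions.{col, explode, from_json}

// Parse the JSON column as shown above.
val parsed = df
  .withColumn("json_string", from_json(col("json_string"), schema))
  .select("id", "json_string.*")

// Dot notation reaches into the struct; explode flattens the array.
parsed
  .select(col("id"), col("head.column1"), explode(col("body")).as("b"))
  .select("id", "column1", "b.column3")
  .show(false)
```

explode produces one output row per array element, so if body ever holds more than one object per record, each object gets its own row.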
An easier way to generate the schema in this case is to select the field, read it as a JSON string, and take the inferred schema.
import spark.implicits._ // needed for .as[String]

val json_schema = spark.read.json(df.select("json_string").as[String]).schema
val df2 = df.withColumn("json_string", from_json(col("json_string"), json_schema))
.select("id", "json_string.*")
This gives you the schema of the field json_string in json_schema, which can then be used to parse the JSON field json_string and still get the same output.
+----+------------------+-----------------------------+
|id  |head              |body                         |
+----+------------------+-----------------------------+
|0001|{value1, value2}  |[{value3, value4, value5}]   |
|0002|{value11, value22}|[{value33, value44, value55}]|
+----+------------------+-----------------------------+
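As a side note, Spark 2.4+ also ships a schema_of_json function that infers the schema from a single sample string, which avoids the extra spark.read.json pass over the whole column. A sketch, assuming the df above and that the first row's JSON is representative of all rows (fields missing from the sample would be dropped for every row):

```scala
import spark.implicits._ // for .as[String]
import org.apache.spark.sql.functions.{col, from_json, lit, schema_of_json}

// Take one sample JSON string and let Spark infer its schema.
val sample = df.select("json_string").as[String].first()

val df3 = df
  .withColumn("json_string", from_json(col("json_string"), schema_of_json(lit(sample))))
  .select("id", "json_string.*")
```

Also worth knowing: from_json returns null for rows whose string cannot be parsed with the given schema, so a quick null check on the parsed column is a cheap sanity test after either approach.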