Reading a JSON String column in Apache Spark

How do you read a column that contains JSON? When reading data from a source such as CSV, JSON, Parquet, Kafka, or any other data source, we may get a column of type String that actually contains JSON. So how do we get this data into a structured form?

Let's take some sample data that contains JSON as one of its fields, as below.

[
  {
    "id": "0001",
    "json_string": "{\"head\": {\"column1\":\"value1\",\"column2\":\"value2\"},\"body\": [{\"column3\":\"value3\",\"column4\":\"value4\",\"column5\":\"value5\"}]}"
  },
  {
    "id": "0002",
    "json_string": "{\"head\": {\"column1\":\"value11\",\"column2\":\"value22\"},\"body\": [{\"column3\":\"value33\",\"column4\":\"value44\",\"column5\":\"value55\"}]}"
  }
]

If you simply read this data, you get two columns:

val df = spark.read
  .option("multiline", true)
  .json(jsonPath)

df.show(false)

  +----+---------------------------------------------------------------------------------------------------------------------------+
  |id  |json_string                                                                                                                |
  +----+---------------------------------------------------------------------------------------------------------------------------+
  |0001|{"head": {"column1":"value1","column2":"value2"},"body": [{"column3":"value3","column4":"value4","column5":"value5"}]}     |
  |0002|{"head": {"column1":"value11","column2":"value22"},"body": [{"column3":"value33","column4":"value44","column5":"value55"}]}|
  +----+---------------------------------------------------------------------------------------------------------------------------+

One way to bring the JSON fields up to the top level is to define a schema for the JSON string, as below.

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("head", StructType(Seq(
    StructField("column1", StringType, nullable = false),
    StructField("column2", StringType, nullable = false)
  ))),
  StructField("body", ArrayType(StructType(Seq(
    StructField("column3", StringType, nullable = false),
    StructField("column4", StringType, nullable = false),
    StructField("column5", StringType, nullable = false)
  ))))
))

Now you can use the from_json function with this schema to convert the String value into a structured value.

import org.apache.spark.sql.functions.{col, from_json}

df.withColumn("json_string", from_json(col("json_string"), schema))
  .select("id", "json_string.*")
  .show(false)

This will give

+----+------------------+-----------------------------+
|id  |head              |body                         |
+----+------------------+-----------------------------+
|0001|{value1, value2}  |[{value3, value4, value5}]   |
|0002|{value11, value22}|[{value33, value44, value55}]|
+----+------------------+-----------------------------+

And the structure looks like the one below; now you can easily use any of the nested fields.

root
|-- id: string (nullable = true)
|-- body: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- column3: string (nullable = true)
|    |    |-- column4: string (nullable = true)
|    |    |-- column5: string (nullable = true)
|-- head: struct (nullable = true)
|    |-- column1: string (nullable = true)
|    |-- column2: string (nullable = true)

An easier way to generate the schema in this case is to select the field, read it as a JSON string dataset, and let Spark infer the schema:

// .as[String] requires: import spark.implicits._
val json_schema = spark.read.json(df.select("json_string").as[String]).schema

val df2 = df.withColumn("json_string", from_json(col("json_string"), json_schema))
  .select("id", "json_string.*")

This gives you the inferred schema in json_schema, which can be used with from_json to parse the json_string field and still get the same output.

+----+------------------+-----------------------------+
|id  |head              |body                         |
+----+------------------+-----------------------------+
|0001|{value1, value2}  |[{value3, value4, value5}]   |
|0002|{value11, value22}|[{value33, value44, value55}]|
+----+------------------+-----------------------------+
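
With the parsed dataframe df2 you can select nested fields directly, or explode the body array into one row per element. A quick sketch, assuming import spark.implicits._ is in scope for the $ syntax:

import org.apache.spark.sql.functions.explode

// "head.column1" selects a struct field; explode turns each array element into its own row
df2.select($"id", $"head.column1", explode($"body").as("body_item"))
  .select($"id", $"column1", $"body_item.column3")
  .show(false)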

How to calculate the sum of an ArrayType column in Apache Spark

Here are different ways to calculate the sum of an ArrayType column in Apache Spark. You can use the same patterns for any other type of aggregation too.

Sample Dataframe:

// assumes import spark.implicits._ for toDF
val df = Seq(
  (1, List(35, 25)),
  (2, List(35, 25, 40)),
  (3, List(35, 25, 140))
).toDF("id", "score")

For Spark 3.0+

For Spark 3.0+ you can use the aggregate function to calculate the sum, as below:

import org.apache.spark.sql.functions.{aggregate, lit}

df.withColumn("totalScore", aggregate($"score", lit(0), (x, y) => x + y))
  .show(false)

Output:

+---+-------------+----------+
|id |score        |totalScore|
+---+-------------+----------+
|1  |[35, 25]     |60        |
|2  |[35, 25, 40] |100       |
|3  |[35, 25, 140]|200       |
+---+-------------+----------+

For Spark 2.4.0+

Higher-order functions are supported from Spark 2.4.0 onwards; they help a lot with complex operations on collection data types.

Here is an example of how you can use aggregate:

df.withColumn("totalScore", expr("aggregate (score, 0, (acc, val) -> acc + val) "))
  .show(false)

Output:

+---+-------------+----------+
|id |score        |totalScore|
+---+-------------+----------+
|1  |[35, 25]     |60        |
|2  |[35, 25, 40] |100       |
|3  |[35, 25, 140]|200       |
+---+-------------+----------+

For earlier versions of Spark

There are many ways to calculate it; here are a couple using the Dataset API.

df.as[(Int, Seq[Int])]
  .map({ case (id, scores) => (id, scores, scores.sum) })
  .toDF("id", "score", "totalScore")
  .show(false)
//Or
df.map(row => {
  (
    row.getInt(0),
    row.getAs[Seq[Int]](1),
    row.getAs[Seq[Int]](1).sum
  )
}).toDF("id", "score", "totalScore")
  .show(false)

Output:

+---+-------------+----------+
|id |score        |totalScore|
+---+-------------+----------+
|1  |[35, 25]     |60        |
|2  |[35, 25, 40] |100       |
|3  |[35, 25, 140]|200       |
+---+-------------+----------+

This might not be as efficient as the aggregate approaches above, since the typed map is opaque to the optimizer and no optimizations are applied.

You can also use a UDF if you have complex aggregation logic, but it is not recommended.
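
For reference, a UDF version might look like the sketch below (sumArray is just an illustrative name); it works on older versions too, but the lambda is opaque to the optimizer, which is why the built-in functions are preferred:

import org.apache.spark.sql.functions.udf

// Sums the elements of the array column inside a UDF
val sumArray = udf((xs: Seq[Int]) => xs.sum)

df.withColumn("totalScore", sumArray($"score")).show(false)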

Apache Spark functions: transform, transform_keys and transform_values in Spark 3

Many higher-order functions were added in the newer versions of Spark. Among them, transform is one that can be very handy when it comes to transforming ArrayType columns.

def transform(column: Column, f: (Column) ⇒ Column): Column

This takes the column to be transformed, which must be an ArrayType, and a function from Column to Column.
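
For instance, here is a minimal sketch of the single-argument form, assuming an active SparkSession named spark; the nums dataframe is made up just for illustration:

import org.apache.spark.sql.functions.transform
import spark.implicits._

// Hypothetical data: one row with an integer array
val nums = Seq((1, Array(10, 20, 30))).toDF("id", "values")

// Add 1 to every element of the array column
nums.withColumn("values", transform($"values", x => x + 1)).show(false)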

def transform(column: Column, f: (Column, Column) ⇒ Column): Column

It takes a column that is an ArrayType and a function of the form (col, index) => transformed_col.

Here is an example of how we can use transform to add the index to each element of an ArrayType column.

import org.apache.spark.sql.functions.{struct, transform}

val df = Seq(
  ("1233", 2, Array(4659, 53540, 7890)),
  ("1234", 1, Array(7232, 7840, 8396)),
  ("1235", 4, Array(999)),
  ("1236", 1, Array(78650, 4829, 3000))
).toDF("key1", "key2", "code")

df.withColumn("code", transform($"code", (x, i) => struct(x, i))).show(false)

//Now the index is added to each element of the array in column "code"
Output:
+----+----+----------------------------------+
|key1|key2|code                              |
+----+----+----------------------------------+
|1233|2   |[[4659, 0], [53540, 1], [7890, 2]]|
|1234|1   |[[7232, 0], [7840, 1], [8396, 2]] |
|1235|4   |[[999, 0]]                        |
|1236|1   |[[78650, 0], [4829, 1], [3000, 2]]|
+----+----+----------------------------------+

You could have done the same thing in Spark 2.4+ with a SQL expression:

df.withColumn("code", expr("transform(code, (x,i) -> (x,i) )"))

And for earlier versions you had to write a UDF to achieve the same thing.

val addIndex = udf((arr: Seq[Int]) => arr.zipWithIndex)
df.withColumn("code", addIndex($"code")).show(false)

The transform function is really powerful when we need to apply some operation to the elements of an ArrayType column.

Similarly, for MapType columns there are transform_keys and transform_values.

//Input Data
val df = Seq(
  ("1233", 2, Map(1 -> "two", 2 -> "three")),
  ("1234", 1, Map(3 -> "four")),
  ("1235", 4, Map(5 -> "six")),
  ("1236", 1, Map(6 -> "seven", 4 -> "five")),
).toDF("key1", "key2", "code")

Add 1 to all the keys in the map:
df.withColumn("code", transform_keys($"code", (k, v) => k + 1)).show(false)

+----+----+-----------------------+
|key1|key2|code                   |
+----+----+-----------------------+
|1233|2   |[2 -> two, 3 -> three] |
|1234|1   |[4 -> four]            |
|1235|4   |[6 -> six]             |
|1236|1   |[7 -> seven, 5 -> five]|
+----+----+-----------------------+


Change the values in the map to upper case:
df.withColumn("code", transform_values($"code", (k, v) => upper(v))).show(false)

+----+----+-----------------------+
|key1|key2|code                   |
+----+----+-----------------------+
|1233|2   |[1 -> TWO, 2 -> THREE] |
|1234|1   |[3 -> FOUR]            |
|1235|4   |[5 -> SIX]             |
|1236|1   |[6 -> SEVEN, 4 -> FIVE]|
+----+----+-----------------------+


How to read and write data from MongoDB with Spark3

Make sure you have Spark 3 running on a cluster or locally.

Running MongoDB in docker container:

docker run -d -p 27017:27017 --name "mongo" -v ~/data:/data/db mongo

Go inside the Docker container to add some test data:

docker exec -it mongo mongo

And insert some sample data:

db.products.insertMany([
    { "_id" : 10, "item" : "large box", "qty" : 20 },
    { "_id" : 11, "item" : "small box", "qty" : 55 },
    { "_id" : 12, "item" : "envelope", "qty" : 100 },
    { "_id" : 13, "item" : "tape", "qty" : 20 },
    { "_id" : 14, "item" : "bubble wrap", "qty" : 30 }
])

Spark Shell

If you want to use spark-shell to test the MongoDB connection and play with some Spark functions:

./spark-shell --conf "spark.mongodb.input.uri=mongodb://localhost/test.products?readPreference=primaryPreferred" \
                  --conf "spark.mongodb.output.uri=mongodb://localhost/test.products" \
                  --packages org.mongodb.spark:mongo-spark-connector_2.12:2.4.2

Then import MongoSpark and load the collection:

scala> import com.mongodb.spark.MongoSpark
import com.mongodb.spark.MongoSpark

scala> val data = MongoSpark.load(spark).toDF()
data: org.apache.spark.sql.DataFrame = [_id: double, item: string ... 1 more field]

scala> data.show(false)
+----+-----------+-----+
|_id |item       |qty  |
+----+-----------+-----+
|10.0|large box  |20.0 |
|11.0|small box  |55.0 |
|12.0|envelope   |100.0|
|13.0|tape       |20.0 |
|14.0|bubble wrap|30.0 |
+----+-----------+-----+
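
The connector also registers a "mongo" data source (the same short name used for writing later in this post), so inside the shell you could load the collection through the DataFrameReader API instead of MongoSpark.load. A minimal sketch, relying on the spark.mongodb.input.uri passed via --conf above:

// Uses the input URI configured when launching spark-shell
scala> val products = spark.read.format("mongo").load()

scala> products.show(false)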

If you are writing a program using sbt or Maven, include the dependencies as below.

For SBT

scalaVersion := "2.12.11"

libraryDependencies += "org.apache.spark" %% "spark-core" % "3.0.0"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.0"

libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "2.4.2"

For Maven

<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.0.0</version>
  </dependency>

  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.0.0</version>
  </dependency>

  <dependency>
    <groupId>org.mongodb.spark</groupId>
    <artifactId>mongo-spark-connector_2.12</artifactId>
    <version>2.4.2</version>
  </dependency>
</dependencies>

Create a Scala object TestSparkMongo as below:

import com.mongodb.spark.MongoSpark
import org.apache.spark.sql.SparkSession

object TestSparkMongo extends App {
  val spark = SparkSession.builder()
    .master("local")
    .appName("MongoSparkConnectorIntro")
    .config("spark.mongodb.input.uri", "mongodb://localhost/test.products")
    .config("spark.mongodb.output.uri", "mongodb://localhost/test.prod")
    .getOrCreate()

  import spark.implicits._

  val df = MongoSpark.load(sparkSession = spark).toDF()

  df.show(false)

  // Write the dataframe to the collection configured in spark.mongodb.output.uri
  MongoSpark.save(df)

  // Write the dataframe to another collection by passing a DataFrameWriter
  MongoSpark.save(df.write.option("collection", "prod2").mode("overwrite").format("mongo"))
}

Output:

+----+-----------+-----+
|_id |item       |qty  |
+----+-----------+-----+
|10.0|large box  |20.0 |
|11.0|small box  |55.0 |
|12.0|envelope   |100.0|
|13.0|tape       |20.0 |
|14.0|bubble wrap|30.0 |
+----+-----------+-----+

You can check the output in MongoDB (database test, collection prod) with db.prod.find(), and you will see:

{ "_id" : 10, "item" : "large box", "qty" : 20 }
{ "_id" : 11, "item" : "small box", "qty" : 55 }
{ "_id" : 12, "item" : "envelope", "qty" : 100 }
{ "_id" : 13, "item" : "tape", "qty" : 20 }
{ "_id" : 14, "item" : "bubble wrap", "qty" : 30 }

Here you can find more configs that you can use while reading and writing:

https://docs.mongodb.com/spark-connector/master/configuration/

How to rename multiple columns of Dataframe in Spark Scala?

Create an entry point as a SparkSession object:

val spark = SparkSession
  .builder()
  .appName("Test")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

Sample data for the demo:

val df = Seq(
  ("Airi", "Satou", "Accountant", "Tokyo", "28th Nov 08"),
  ("Angelica", "Ramos", "Chief Executive Officer (CEO)", "London", "9th Oct 09"),
  ("Ashton", "Cox", "Junior Technical Author", "San Francisco", "12th Jan 09"),
  ("Bradley", "Greer", "Software Engineer", "London", "13th Oct 12"),
  ("Brenden", "Wagner", "Software Engineer", "San Francisco", "7th Jun 11"),
  ("Brielle", "Williamson", "Integration Specialist", "New York", "2nd Dec 12"),
  ("Bruno", "Nash", "Software Engineer", "London", "3rd May 11"),
  ("Caesar", "Vance", "Pre-Sales Support", "New York", "12th Dec 11"),
  ("Cara", "Stevens", "Sales Assistant", "New York", "6th Dec 11"),
  ("Cedric", "Kelly", "Senior Javascript Developer", "Edinburgh", "29th Mar 12")
).toDF("Name", "Position", "Office", "Age", "StartDate")

One way is to use the toDF method, passing all the new column names in the same order as the original columns:

df.toDF(finalColName : _*)

Let finalColName be the list of final column names that we want, and use zip to create a list of (oldColumnName, newColName) pairs:

val finalColName = List("name", "position", "office", "age", "start_date")
val columnsRenamed = df.schema.fieldNames.zip(finalColName)

Or create a map of old to new column names:

val list = Map("Name" -> "name", "Position" -> "position", "Office" -> "office", "Age" -> "age", "StartDate" -> "startDate")

You can use either columnsRenamed or list, whichever you prefer (a columnsRenamed variant is sketched after the output below).

One way is to use the foldLeft function available on collections. It takes an initial value, in this case the dataframe df, and iterates through the list, carrying the intermediate result as acc and the current pair of names as names:

val result = list.foldLeft(df){(acc, names ) =>
  acc.withColumnRenamed(names._1, names._2)
}

// Here are the renamed columns

result.show(false)
+--------+---------+-----------+--------+----------+
|name    |position |office     |age     |startDate |
+--------+---------+-----------+--------+----------+
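
If you prefer the zipped columnsRenamed pairs over the map, the same foldLeft pattern works; a minimal sketch (result2 is just an illustrative name):

val result2 = columnsRenamed.foldLeft(df) { case (acc, (oldName, newName)) =>
  acc.withColumnRenamed(oldName, newName)
}

result2.show(false)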

The best way is to use select. If you need to keep only some columns and rename them, this is another option:

val result1 = df.select(list.map(x => col(x._1).alias(x._2)).toList : _*)

Here are the renamed columns with select:

result1.show(false)
+--------+---------+-----------+--------+----------+
|name    |position |office     |age     |startDate |
+--------+---------+-----------+--------+----------+

How to enable Spark History Server in Standalone mode?

By default, the Spark history server is not enabled. It serves application history from the event logs, listing all complete and incomplete applications with their details, which is very useful for debugging and analyzing applications. Without the history server, only the details of currently running applications can be seen; with it enabled, all past applications can be viewed in the Web UI.

  1. To enable history, make sure the $SPARK_HOME and $JAVA_HOME environment variables point to your Spark and JRE directories.
  2. Open the file spark-defaults.conf from $SPARK_HOME/conf/ and edit the following (see the example sketch after this list):
    spark.eventLog.enabled true (this must be enabled)
    spark.eventLog.dir file:/path to history log directory (the default is file:///tmp/spark-events)
    spark.eventLog.compress true (optional parameter for compressing the logged events)
    spark.history.fs.logDirectory file:/path to history log directory (this should be the same path as spark.eventLog.dir)
  3. Now start the history server with the following script:
    $SPARK_HOME/sbin/start-history-server.sh

    Browse to localhost:18080 and you should see the Spark history web UI.

  4. To stop the history server, use:
    $SPARK_HOME/sbin/stop-history-server.sh
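
Put together, the relevant block of $SPARK_HOME/conf/spark-defaults.conf might look like the sketch below, using the default /tmp/spark-events location (adjust the paths to your own log directory, and make sure the directory exists before starting the history server):

spark.eventLog.enabled           true
spark.eventLog.dir               file:///tmp/spark-events
spark.eventLog.compress          true
spark.history.fs.logDirectory    file:///tmp/spark-events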

 

How to read data from Azure Blob Storage with Apache Spark

This article describes how to read files from Azure Blob Storage with Apache Spark, with a simple example.

For this example, I have used Scala 2.11.8, sbt 0.13, and Spark 2.2.0.

The dependencies used for the example are

For SBT

"org.apache.spark" %% "spark-core" % "2.2.0"
"org.apache.spark" %% "spark-sql" % "2.2.0"
"org.apache.hadoop" % "hadoop-azure" % "2.7.3"

For Maven



<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.0</version>
  </dependency>

  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.2.0</version>
  </dependency>

  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-azure</artifactId>
    <version>2.7.0</version>
  </dependency>
</dependencies>


To read files from blob storage, you need to set the file system to be used in the underlying Hadoop configuration.

 

import org.apache.spark.sql.SparkSession

object ReadFilesFromAzureStorage extends App {
  val spark = SparkSession.builder().appName("read azure storage").master("local[*]").getOrCreate()

  spark.sparkContext.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
  spark.sparkContext.hadoopConfiguration.set("fs.azure.account.key.yourAccount.blob.core.windows.net", "yourKey")

  // Use wasb:// or wasbs:// (SSL), with your container and storage account name
  val baseDir = "wasbs://BlobStorageContainer@yourAccount.blob.core.windows.net/"

  //Read Parquet
  val dfParquet = spark.read.parquet(baseDir + "pathToParquetFile")

  //Read CSV
  val dfCsv = spark.read.option("header", true).csv(baseDir + "pathToCsvFile")

  //Read text files
  val dfText = spark.read.textFile(baseDir + "pathToTextFile")

  dfParquet.select("payload.*").show()
}

Hope this helps!