Apache Spark (PySpark) with JSON and Hadoop (Code Practice Part 2)

Tariqul Islam
7 min readNov 15, 2023

In this code practice, I will discuss about the several functions about spark sql json function, which support to read json file and field from data source, write dataset to file by spark and save into local disk for further analysis. Those function are include from_json , to_json , get_json_object and json_tuple . Also discuss about different approach about how to dill with json and json related data

Prerequisite

  1. Java (JDK)
  2. Debian
  3. Hadoop and Yarn
  4. Spark

You can follow to install spark and hadoop into your system by following article

Create application structure

> mkdir -p spark-json-example
> cd spark-json-example
> mkdir -p data src

Project Structure

spark-json-example
|-- data
| |-- *.json
| |-- *.csv
|-- src
| |--*.py

Start Spark Standalone

> ${HADOOP_DIR}/sbin/start-all.sh
> ${SPARK_DIR}/sbin/start-master.sh
> ${SPARK_DIR}/sbin/start-worker.sh spark://localhost:7077

Datasource

Explanation of Handling JSON functions

Using from_json function to read json field code are followin below src/test-json-from_json.py


from pyspark.sql import SparkSession
import pyspark.sql.types as T
import pyspark.sql.functions as F

APP_DIR="/home/hadoop/spark-json-example"

class ReadCSVJson:

def read(self):
# Create the Spark Session to handle the spark Operationn
# Using spark "spark://localhost:7707" as master to handle
# Resource and operation
spark = SparkSession.builder\
.master("spark://localhost:7077")\
.appName("ReadCSVJsonData")\
.getOrCreate()

# Read the csv file from data folder
# add the schema option inferSchema True, It will create schema
# from data source
df=spark.read.format('csv')\
.option("inferSchema", True)\
.option('header', True)\
.load(f"file://{APP_DIR}/data/flim.csv")
df.show()
df.printSchema()

# create the schema to read the json text field from flim
# dataframe using struct type and struct field
schema = T.StructType(
[
T.StructField('Title', T.StringType(), True),
T.StructField('Year', T.StringType(), True),
T.StructField('Rated', T.StringType(), True),
T.StructField('Released', T.StringType(), True),
T.StructField('Genre', T.StringType(), True)
]
)

# Using the withColumn function
# which transforms string json field (movie)
# to data structure which is defined into schema variable
movie_df_mapped = df.withColumn("movie", F.from_json("movie", schema))
movie_df_mapped.show(truncate=False)
movie_df_mapped.printSchema()

# Using the select to get the specific columns to build
# new Data frame which will contains following below field
movie_df = movie_df_mapped.select(F.col('id'), \
F.col("movie.Title"),\
F.col("movie.Year"),\
F.col("movie.Rated"), \
F.col("movie.Released"),\
F.col("movie.Genre"))
movie_df.show(truncate=False)
movie_df.printSchema()


if __name__ == "__main__":
rcj = ReadCSVJson()
rcj.read()

Create JSON field from csv column by to_json function in /src/test-json-to_json.py

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

APP_DIR="/home/hadoop/spark-json-example"

class CreateJsonField:

def test_to_json_func(self):
# Create the Spark Session to handle the spark Operationn
# Using spark "spark://localhost:7707" as master to handle
# Resource and operation
spark = SparkSession.builder\
.master("spark://localhost:7077")\
.appName("CreateJsonField")\
.getOrCreate()

# data source file location
read_file=f"file://{APP_DIR}/data/modified-movies.csv"

# Read the csv file from data folder
# add the schema option inferSchema True, It will create schema
# from data source
df=spark.read.format('csv')\
.option("inferSchema", True)\
.option('header', True)\
.load(read_file)
df.show()
df.printSchema()


# Using the select
# and to_json function to make create the json text field
# from different column
# in to new data frame
df2=df.select(F.col("id"), F.to_json(\
F.struct("Title",\
"Year", \
"Rated", \
"Released",\
"Genre"))\
.alias("movie"))
df2.printSchema()
df2.show()

if __name__ == "__main__":
cjf = CreateJsonField()
cjf.test_to_json_func()

Read JSON field with get_json_object and json_tuple function in src/test_json_json_tuple_get_json_object.py

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

APP_DIR="/home/hadoop/spark-json-example"

class TestGetJsonTextExtract:

# test the get json object and json tuple function
def test_get_json_object_tuple_func(self):
# Create the Spark Session to handle the spark Operationn
# Using spark "spark://localhost:7707" as master to handle
# Resource and operation
spark = SparkSession.builder\
.master("spark://localhost:7077")\
.appName("TestGetJsonTextExtract")\
.getOrCreate()

# Read the csv file from data folder
# add the schema option inferSchema True, It will create schema
# from data source
df=spark.read.format('csv')\
.option("inferSchema", True)\
.option('header', True)\
.load(f"file://{APP_DIR}/data/flim.csv")
df.show()
df.printSchema()


# Using get_json_object
# SparkSQL function (column_name, "$.json_attribute_name")
# Extract the different json attribute and value
# to new dataframe columns and value
df2 = df.select(
F.col("id"),
F.get_json_object(F.col("movie"), "$.Title").alias("Title"),
F.get_json_object(F.col("movie"), "$.Rated").alias("Rated"),
F.get_json_object(F.col("movie"), "$.Year").alias("Year"),
F.get_json_object(F.col("movie"), "$.Released").alias("Released")
)
df2.show()
df2.printSchema()

# Using Json Tuple function
# json_tuple(column_name, "json_attr_1", "json_attr_2", ....)
# Extract the each attribute into new dataframe column
df3 = df.select(F.col("id"),
F.json_tuple(F.col("movie"),
"Title", "Rated", "Released", "Year")
.alias("Title", "Rated", "Released", "Year"))
df3.show()
df3.printSchema()

if __name__ == '__main__':
test_class = TestGetJsonTextExtract()
test_class.test_get_json_object_tuple_func()

Read remote http response JSONN by spark and create the data frame into src/test_remote_json_api.py

from pyspark.sql import SparkSession, functions as F
from urllib.request import urlopen

class ReadRemoteJsonFile:

def read_http_json(self):
# Create the Spark Session to handle the spark Operationn
# Using spark "spark://localhost:7707" as master to handle
# Resource and operation
spark = SparkSession.builder\
.master("spark://localhost:7077")\
.appName("CreateJsonFormatField")\
.getOrCreate()

# Read the Json file from remote api
# I am consuming dummy json com github open source project
# data and datasource, I choose products api endpoint
url = 'https://dummyjson.com/products'
# Using the urlopen python library function to read the json
# Endpont data
jsonData = urlopen(url).read().decode('utf-8')

# Create the RDD(Reslilient Distributed DataSource)
# SparkContext can provide the parallelize function
# which will create the RDD from source data
rdd = spark.sparkContext.parallelize([jsonData])
# Create the data frame from RDD, Our RDD contains the
# Json Data Source
df = spark.read.json(rdd)
df.show()
df.printSchema()

# Using the withColumn() function with explode()
# which transforms json field map or array (products) to
# return a new row for each element of products array
df2 = df.withColumn("products", F.explode("products"))

# Normalized the multiple element into array or map field
# of product data frame
# we have image map field into product data frame
# we are using for loop with range to
# create multiple column like img_1, img_2
# into data frame
df3=df2.select("products.id","products.brand",\
*[F.col('products.images').getItem(i)\
.alias(f'img{i+1}') for i in range(0,len('products.images'))])
df3.printSchema()
df3.show()

if __name__ == '__main__':
read_remote_json_file = ReadRemoteJsonFile()
read_remote_json_file.read_http_json()

Write Json file into Local Directory, code is available into src/test_json_write_files.py

from pyspark.sql import SparkSession, \
functions as F, \
types as T

APP_DIR="/home/hadoop/spark-json-example"

class WriteJsonFile:
def test_write_json_file(self):
# Create the Spark Session to handle the spark Operationn
# Using spark "spark://localhost:7707" as master to handle
# Resource and operation
spark = SparkSession.builder\
.master("spark://localhost:7077")\
.appName("CreateJsonFormatField")\
.getOrCreate()

# Read the csv file from data folder
# add the schema option inferSchema True, It will create schema
# from data source
df=spark.read.format('csv')\
.option("inferSchema", True)\
.option('header', True)\
.load(f"file://{APP_DIR}/data/flim.csv")
df.show()
df.printSchema()

# create the schema to read the json text field from flim
# dataframe using struct type and struct field
schema = T.StructType(
[
T.StructField('Title', T.StringType(), True),
T.StructField('Year', T.StringType(), True),
T.StructField('Rated', T.StringType(), True),
T.StructField('Released', T.StringType(), True),
T.StructField('Genre', T.StringType(), True)
]
)

# Using the withColumn function
# which transforms string json field (movie)
# to data structure which is defined into schema variable
movie_df_mapped = df.withColumn("movie", F.from_json("movie", schema))
movie_df_mapped.show(truncate=False)
movie_df_mapped.printSchema()

# Create the New Data frame to generate the Json file by
# Select function
movie_df = movie_df_mapped.select(F.col('id'), \
F.col("movie.Title"),\
F.col("movie.Year"),\
F.col("movie.Rated"),\
F.col("movie.Released"),\
F.col("movie.Genre"))
movie_df.show(truncate=False)
movie_df.printSchema()
# using the spark Data frame write function
# with mode is overwrite (which will replace the data everytime
# write the datasource into specific location
# Json write will by default take english format datetime
# we have to specifiy the specify the specific datetime
# format is the data frame has datetime field
movie_df.write\
.mode("overwrite")\
.option("dateFormat", "MM/dd/yyyy")\
.json(f"file://{APP_DIR}/data/modified-movie")

if __name__ == '__main__':
write_json_file = WriteJsonFile()
write_json_file.test_write_json_file()

After running the following script, we can see folder is created with json and _success file.

Read files from directory created json directory, code are following file below src/test_read_json_from_directory.py

from pyspark.sql import SparkSession, functions as F
from urllib.request import urlopen

APP_DIR="/home/hadoop/spark-json-example"

class ReadWriteJsonDirectory:

def read_json_dir(self):
# Create the Spark Session to handle the spark Operationn
# Using spark "spark://localhost:7707" as master to handle
# Resource and operation
spark = SparkSession.builder\
.master("spark://localhost:7077")\
.appName("CreateJsonFormatField")\
.getOrCreate()

# Read the file by spark.read function with format "json"
# Add the folder name where we generated the json
# folder(modified-movie)
# by last Script
df_read_data = spark.read\
.format("json")\
.load(f"file://{APP_DIR}/data/modified-movie")
df_read_data.printSchema()
df_read_data.show()

if __name__ == '__main__':
read_json_directory = ReadWriteJsonDirectory()
read_json_directory.read_json_dir()

Read Multiline json src/test_read_multiline_json.py

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T


class ReadMultilineJson:

def test_read_multiline_json(self):

# Create the Spark Session to handle the spark Operationn
# Using spark "spark://localhost:7707" as master to handle
# Resource and operation
spark = SparkSession.builder\
.master("spark://localhost:7077")\
.appName("CreateJsonFormatField")\
.getOrCreate()

# Read the multi line json by enable the open multiline True during
# Read the Json file from json array files
df = spark.read\
.option("multiline", True) \
.option("inferSchema", True)\
.json(f"file://{APP_DIR}/data/multiline-json.json")

df.printSchema()
df.show()

if __name__ == '__main__':
read_multiline_json = ReadMultilineJson()
read_multiline_json.test_read_multiline_json()

Read JSON Multiple file from directory of By Spark Function, code are following below file src/test_multiple_json_file_from_directory.py

from pyspark.sql import SparkSession, functions as F
from urllib.request import urlopen

APP_DIR="/home/hadoop/spark-json-example"

class ReadMultipleJsonFromDirectory:

def read_multiple_file_from_dir(self):
# Create the Spark Session to handle the spark Operationn
# Using spark "spark://localhost:7707" as master to handle
# Resource and operation
spark = SparkSession.builder\
.master("spark://localhost:7077")\
.appName("CreateJsonFormatField")\
.getOrCreate()


# Read the multiple json file with same format data
# I have create the directory multi-json-record
# enable the multiline True
# we can read the multiple file with same format json file
df_read_json = spark.read\
.format("json")\
.option("inferSchema", True)\
.option("multiline", True)\
.load(f"file://{APP_DIR}/data/multi-json-record/*.json")
df_read_json.printSchema()
df_read_json.show()

if __name__ == '__main__':
read_dir = ReadMultipleJsonFromDirectory()
read_dir.read_multiple_file_from_dir()

All the source code you will found at below github link

--

--

Tariqul Islam

Tariqul Islam have 9+ years of software development experience. He knows Python, Node Js and Java, C# , PHP.