Apache Spark with HDFS, HIVE and Hadoop (Code Practice Part 1)

Tariqul Islam
13 min read · Oct 20, 2023


Apache Spark is a data analytics engine for processing big data. For many workloads it can be up to 100 times faster than Hive on MapReduce, which makes it a strong choice when building data analysis projects.

In this article, I discuss how to configure Spark with Hive so that we can run Hive queries through Spark.

This article covers:

  1. Configuring Hadoop, Hive, and Spark
  2. Creating sample datasets
  3. Connecting to PySpark with YARN as master to process data
  4. Building sample apps with Spark through the PySpark REPL
  5. Running different queries with Spark and exploring Spark functionality

Prerequisites

  1. Debian 10
  2. OpenJDK 11
  3. Hadoop
  4. Hive
  5. Spark
  6. Scala
  7. Python REPL and PySpark

Configuring the System on Debian for Hadoop and Hive

> sudo adduser hadoop
> sudo adduser hadoop sudo
> sudo reboot

After the restart, log in as the hadoop user to run the following operations:

> su - hadoop
> pwd
/home/hadoop

Follow the article below for Hive and Hadoop installation.

Follow the article below for Spark installation.

Paste the following environment and PATH variables at the end of /etc/bash.bashrc:

> sudo nano /etc/bash.bashrc
export PATH=$PATH:$JAVA_HOME/bin:.
#Copy the following Hadoop Related Options
export HADOOP_HOME=/home/hadoop/hadoop-3.2.1
export HADOOP_INSTALL=$HADOOP_HOME
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export HADOOP_YARN_HOME=$HADOOP_HOME
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export PATH=$PATH:$HADOOP_HOME/sbin:$HADOOP_HOME/bin:.
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib/native"
export HIVE_HOME=/home/hadoop/apache-hive-3.1.2-bin
export PATH=$PATH:$HIVE_HOME/bin

export SPARK_HOME=/home/hadoop/spark
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin

Then source /etc/bash.bashrc:

> source /etc/bash.bashrc

Start Hadoop (HDFS) and the YARN resource manager:

# Use this to start all the Hadoop services
> start-all.sh
WARNING: Attempting to start all Apache Hadoop daemons as hadoop in 10 seconds.
WARNING: This is not a recommended production deployment configuration.
WARNING: Use CTRL-C to abort.
Starting namenodes on [localhost]
Starting datanodes
Starting secondary namenodes [B4911-VM]
Starting resourcemanager
Starting nodemanagers

# Use this to check the status of the Hadoop services
> jps
2755 NodeManager
2165 NameNode
2262 DataNode
2651 ResourceManager
3118 Jps
2462 SecondaryNameNode

Using YARN as the Master for Apache Spark (PySpark) from the Terminal

> pyspark --master yarn --queue default
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 3.2.1
/_/

Using Python version 3.7.3 (default, Oct 11 2023 09:51:27)
Spark context Web UI available at http://<IPv4 Address>:4040
Spark context available as 'sc' (master = yarn, app id = application_1697478063141_0007).
SparkSession available as 'spark'.

>>>

--master yarn: use YARN as the master for the SparkSession. YARN handles resource allocation for Spark, and Spark submits its work to YARN as a YARN application.

--queue default: YARN comes preconfigured with a scheduler queue named default; YARN uses that queue and its resources to run the Spark job.

When we create the Spark session, the Spark context Web UI is also available at http://<Spark master IPv4 address>:4040.

Using the Python3 REPL to Create a Spark Session

We can also install PySpark for Python3 and create the Spark session from a plain Python REPL. The pip-installed PySpark version must match the Spark installation, otherwise we will get exceptions when using the PySpark package. I use Spark 3.2.1, so I have to install the same package version with pip or another package manager.

> pip3 install pyspark==3.2.1
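
To confirm that the pip-installed package matches the Spark installation, a quick check from the Python REPL (a minimal sketch):

> python3
>>> import pyspark
>>> pyspark.__version__
'3.2.1'

If the versions differ, reinstall the pyspark package with the exact version reported by spark-submit --version.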

Then we can use the Python3 REPL to access PySpark:

> python3
Python 3.7.3 (default, Oct 11 2023, 09:51:27)
[GCC 8.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession\
.builder\
.master("yarn")\
.config("spark.yarn.queue", "default")\
.appName("Connect with Yarn with Default Queue")\
.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

>>> spark
<pyspark.sql.session.SparkSession object at 0x7f103ad3b828>


We have to import SparkSession from the pyspark.sql package.

We can initialize the SparkSession with different master settings, as shown below:

.master("local[2]") runs Spark locally with 2 cores (the number in brackets is the core count)
or
.master("yarn") runs Spark with the YARN resource manager
or
.master("spark://master:7077") runs Spark against a standalone cluster master.

We can set any Spark property through the builder with the following function:

.config("<config>", "<value>") adds any configuration
related to the SparkSession
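
For example, a minimal sketch that combines a master setting with an arbitrary config key (the app name and the spark.sql.shuffle.partitions value here are just illustrative choices):

>>> from pyspark.sql import SparkSession
>>> spark = SparkSession\
.builder\
.master("local[2]")\
.appName("LocalConfigExample")\
.config("spark.sql.shuffle.partitions", "4")\
.getOrCreate()
>>> spark.conf.get("spark.sql.shuffle.partitions")
'4'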

The YARN Web UI runs at http://<Hadoop host IPv4 address>:8088.

Configuring Hive with Spark

There are two ways to configure Hive for Spark.

First, add the spark.sql.warehouse.dir config to the SparkSession builder; we also need to call .enableHiveSupport() on the builder.

> pyspark --master yarn --queue default
>>> spark = SparkSession\
.builder\
.appName("EnableHive")\
.config("spark.sql.warehouse.dir", \
"hdfs://localhost:9000/user/hive/warehouse")\
.enableHiveSupport()\
.getOrCreate()

Second, configure the Hive warehouse directory in the spark-defaults.conf file.

# Go to the Spark config directory (for my installation)
> cd /home/hadoop/spark/conf
> nano spark-defaults.conf

Add the following configuration at the end of the spark-defaults.conf file:

# Add the Hive warehouse location to spark-defaults.conf
spark.sql.warehouse.dir=hdfs://localhost:9000/user/hive/warehouse

This way, we do not need to specify the warehouse config when initializing the SparkSession:

> pyspark --master yarn --queue default
>>> spark = SparkSession\
.builder\
.appName("Hive")\
.enableHiveSupport()\
.getOrCreate()

Sample Data Creation for Experimenting with PySpark

Create the employee dataset in the following file: /home/hadoop/spark_example/data/employees.csv

employee_id,employee_name,salary,department
101,ALAX,500000.00,CONSUMER_SALES
102,TOM,500000.00,CONSUMER_SALES
103,JIM,600000.00,CONSUMER_SALES
104,MARK,500000.00,CONSUMER_SALES
105,TOMI,700000.00,ELETRONIC_SALES
106,AMALI,800000.00,CONSUMER_SALES
107,ROY,900000.00,CONSUMER_SALES
108,FILIP,300000.00,AGRICULTURE_PORDUCT_SALES
109,MONNA,800000.00,CONSUMER_SALES
110,RONME,500000.00,ELETRONIC_SALES
111,SAMULE,500000.00,AGRICULTURE_PORDUCT_SALES
112,TOD,900000.00,AGRICULTURE_PORDUCT_SALES
113,MILI,500000.00,ELETRONIC_SALES
114,KIM,900000.00,AGRICULTURE_PORDUCT_SALES
115,SESELIA,400000.00,ELETRONIC_SALES
116,BRIGHT,300000.00,ELETRONIC_SALES
117,KEN,700000.00,AGRICULTURE_PORDUCT_SALES
118,NATALI,900000.00,ELETRONIC_SALES

Create sales info dataset 1: /home/hadoop/spark_example/data/sales_info_2023-10-18.csv

employee_id,product_name,product_type,sales_count,sales_price
101,MANGO,Food,300,400.50
101,BANANA,Food,300,200.60
102,MANGO,Food,350,400.50
103,ORANGE,Food,500,350.67
104,GRAPE,Food,700,800.45
103,MANGO,Food,500,400.50
105,SMART TV,ELETRONIC,5,50000
105,MOBILE,ELETRONIC,8,60000
113,RICE COOKER,ELETRONIC,5,4000
112,RICE SEED 10 KG BAG,AGRICULTURE,30,2000

Create sales info dataset 2: /home/hadoop/spark_example/data/sales_info_2023-10-19.csv

employee_id,product_name,product_type,sales_count,sales_price
101,MANGO,Food,300,400.50
101,BANANA,Food,300,200.60
102,MANGO,Food,350,400.50
103,ORANGE,Food,500,350.67
104,GRAPE,Food,700,800.45
103,MANGO,Food,500,400.50
105,SMART TV,ELETRONIC,5,50000
105,MOBILE,ELETRONIC,8,60000
113,RICE COOKER,ELETRONIC,5,4000
112,RICE SEED 10 KG BAG,AGRICULTURE,30,2000

Using PySpark to Run Hive Queries

> pyspark --master yarn --queue default

>>> spark = SparkSession\
.builder\
.appName("CreateDatabaseAndTables")\
.config("spark.sql.warehouse.dir", \
"hdfs://localhost:9000/user/hive/warehouse")\
.config("spark.hadoop.hive.exec.dynamic.partition", True)\
.config("spark.hadoop.hive.exec.dynamic.partition.mode", True)\
.config("spark.sql.sources.partitionOverwriteMode", "dynamic") \
.enableHiveSupport()\
.getOrCreate()

>>> spark.sql("SHOW DATABASES").show()
>>> spark.sql("CREATE DATABASE IF NOT EXISTS spark_example").show()
>>> spark.sql("DESCRIBE DATABASE spark_example").show(truncate=False)
+-------------+----------------------------------------------------------+
|info_name |info_value |
+-------------+----------------------------------------------------------+
|Database Name|spark_example |
|Comment | |
|Location |hdfs://localhost:9000/user/hive/warehouse/spark_example.db|
|Owner |hadoop |
+-------------+----------------------------------------------------------+
spark.sql() takes a SQL command and runs it against any configured database or data source.

Create an HQL file for the employees table at /home/hadoop/spark_example/ddl/employees.hql

CREATE TABLE IF NOT EXISTS spark_example.employees (
employee_id INT,
employee_name STRING,
salary DECIMAL,
department STRING
)
COMMENT "EMPLOYEE TABLE"
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
STORED AS TEXTFILE;

Run the code below to create the employees table in Hadoop through Spark with a Hive query.

>>> hql_path="/home/hadoop/spark_example/ddl/employees.hql"
>>> spark.sql(open(hql_path).read())\
.show(truncate=False)
>>> spark.sql("DESCRIBE FORMATTED spark_example.employees")\
.show(truncate=False);
+----------------------------+--------------------------------------------------------------------+-------+
|col_name |data_type |comment|
+----------------------------+--------------------------------------------------------------------+-------+
|employee_id |int |null |
|employee_name |string |null |
|salary |decimal(10,0) |null |
|department |string |null |
| | | |
|# Detailed Table Information| | |
|Database |spark_example | |
|Table |employees | |
|Owner |hadoop | |
|Created Time |Thu Oct 19 17:48:48 JST 2023 | |
|Last Access |UNKNOWN | |
|Created By |Spark 3.2.1 | |
|Type |MANAGED | |
|Provider |hive | |
|Comment |Author: Tariqul Islam, Table name: Employees Table | |
|Table Properties |[transient_lastDdlTime=1697705328] | |
|Location |hdfs://localhost:9000/user/hive/warehouse/spark_example.db/employees| |
|Serde Library |org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | |
|InputFormat |org.apache.hadoop.mapred.TextInputFormat | |
|OutputFormat |org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat | |
+----------------------------+--------------------------------------------------------------------+-------+
# truncate takes a boolean: whether to cut long strings in each field or show them in full
.show(<number of rows>, truncate=[True,False])

# We can open any file from the file system, read the HQL
# command inside it, run it with .sql(), and show the output

>>> spark.sql(open(hql_path).read()).show(truncate=False)

DESCRIBE FORMATTED <table name> shows the table structure and metadata.
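
For example, a minimal sketch of the same DESCRIBE output limited to the first 5 rows, with long values truncated:

>>> spark.sql("DESCRIBE FORMATTED spark_example.employees").show(5, truncate=True)
# only the first 5 rows are printed, and values longer than 20 characters are cut off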

Load Data into the Employees Table and Retrieve Information

>>> df = spark.read.format("csv")\
.option("header", True)\
.option("inferSchema", True)\
.load("file:///home/hadoop/spark_example/data/employees.csv")

>>> df.show(truncate=False)
+-----------+-------------+--------+-------------------------+
|employee_id|employee_name|salary |department |
+-----------+-------------+--------+-------------------------+
|101 |ALAX |500000.0|CONSUMER_SALES |
|102 |TOM |500000.0|CONSUMER_SALES |
|103 |JIM |600000.0|CONSUMER_SALES |
|104 |MARK |500000.0|CONSUMER_SALES |
|105 |TOMI |700000.0|ELETRONIC_SALES |
|106 |AMALI |800000.0|CONSUMER_SALES |
|107 |ROY |900000.0|CONSUMER_SALES |
|108 |FILIP |300000.0|AGRICULTURE_PORDUCT_SALES|
|109 |MONNA |800000.0|CONSUMER_SALES |
|110 |RONME |500000.0|ELETRONIC_SALES |
|111 |SAMULE |500000.0|AGRICULTURE_PORDUCT_SALES|
|112 |TOD |900000.0|AGRICULTURE_PORDUCT_SALES|
|113 |MILI |500000.0|ELETRONIC_SALES |
|114 |KIM |900000.0|AGRICULTURE_PORDUCT_SALES|
|115 |SESELIA |400000.0|ELETRONIC_SALES |
|116 |BRIGHT |300000.0|ELETRONIC_SALES |
|117 |KEN |700000.0|AGRICULTURE_PORDUCT_SALES|
|118 |NATALI |900000.0|ELETRONIC_SALES |
+-----------+-------------+--------+-------------------------+

>>> df.printSchema()
root
|-- employee_id: integer (nullable = true)
|-- employee_name: string (nullable = true)
|-- salary: double (nullable = true)
|-- department: string (nullable = true)
# enables reading the header row of the CSV into the dataframe
.option("header", True)

# enables schema inference
# (infers the data type of each column from the records)
.option("inferSchema", True)

# the load function loads the data from a source
# such as file://, hdfs://, or s3://
.load("<file or data source location>")

# prints the dataframe schema (columns and data types)
df.printSchema()
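
The same CSV could also be read with the csv() shorthand, and from HDFS instead of the local file system, assuming the file has already been copied there (the HDFS path below is hypothetical):

>>> df_hdfs = spark.read\
.csv("hdfs://localhost:9000/user/hadoop/data/employees.csv",\
header=True,\
inferSchema=True)
>>> df_hdfs.printSchema()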

Insert Data into the Employees Table with PySpark

Procedure
1. Create a temp view for the DataFrame so we can run SQL queries
2. Write the insert query in HiveQL
3. Insert the data with spark.sql
4. Write a SQL query to show the Hive table data
5. Show the data from the table
>>> df.createOrReplaceTempView("temp_employee_dataframe")
>>> emp_insert_query="INSERT OVERWRITE TABLE spark_example.employees \
SELECT \
employee_id,\
employee_name,\
salary,\
department \
FROM temp_employee_dataframe;"
>>> spark.sql(emp_insert_query).show()
>>> emp_query = "SELECT * FROM spark_example.employees"
>>> spark.sql(emp_query).show(truncate=False)

+-----------+-------------+--------+-------------------------+
|employee_id|employee_name|salary |department |
+-----------+-------------+--------+-------------------------+
|101 |ALAX |500000.0|CONSUMER_SALES |
|102 |TOM |500000.0|CONSUMER_SALES |
|103 |JIM |600000.0|CONSUMER_SALES |
|104 |MARK |500000.0|CONSUMER_SALES |
|105 |TOMI |700000.0|ELETRONIC_SALES |
|106 |AMALI |800000.0|CONSUMER_SALES |
|107 |ROY |900000.0|CONSUMER_SALES |
|108 |FILIP |300000.0|AGRICULTURE_PORDUCT_SALES|
|109 |MONNA |800000.0|CONSUMER_SALES |
|110 |RONME |500000.0|ELETRONIC_SALES |
|111 |SAMULE |500000.0|AGRICULTURE_PORDUCT_SALES|
|112 |TOD |900000.0|AGRICULTURE_PORDUCT_SALES|
|113 |MILI |500000.0|ELETRONIC_SALES |
|114 |KIM |900000.0|AGRICULTURE_PORDUCT_SALES|
|115 |SESELIA |400000.0|ELETRONIC_SALES |
|116 |BRIGHT |300000.0|ELETRONIC_SALES |
|117 |KEN |700000.0|AGRICULTURE_PORDUCT_SALES|
|118 |NATALI |900000.0|ELETRONIC_SALES |
+-----------+-------------+--------+-------------------------+
# It creates a temporary in-memory view for the dataframe
# so we can query it with standard SQL
.createOrReplaceTempView("viewname")
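
Because the temp view can be queried with standard SQL, we can also run aggregates directly against it; for example, the average salary per department (a minimal sketch, assuming temp_employee_dataframe is still registered):

>>> spark.sql("SELECT department, AVG(salary) AS avg_salary \
FROM temp_employee_dataframe \
GROUP BY department").show(truncate=False)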

Handling Partitioned Hive Tables with Spark

Create an HQL file for the sales_info table at /home/hadoop/spark_example/ddl/sales_info.hql

CREATE TABLE IF NOT EXISTS spark_example.sales_info (
employee_id INT,
product_name STRING,
product_type STRING,
sales_count INT,
sales_price DOUBLE
)
COMMENT "EMPLOYEE SALES INFO"
PARTITIONED BY (dt STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
STORED AS TEXTFILE;

Create the sales_info table by running the Hive query through Spark:

>>> sales_info_hql="/home/hadoop/spark_example/ddl/sales_info.hql"
>>> spark.sql(open(sales_info_hql).read())\
.show(truncate=False)
>>> desc_tbl_hql ="DESCRIBE FORMATTED spark_example.sales_info"
>>> spark.sql(desc_tbl_hql).show(30,truncate=False)
+----------------------------+---------------------------------------------------------------------+-------+
|col_name |data_type |comment|
+----------------------------+---------------------------------------------------------------------+-------+
|employee_id |int |null |
|product_name |string |null |
|product_type |string |null |
|sales_count |int |null |
|sales_price |double |null |
|dt |string |null |
|# Partition Information | | |
|# col_name |data_type |comment|
|dt |string |null |
| | | |
|# Detailed Table Information| | |
|Database |spark_example | |
|Table |sales_info | |
|Owner |hadoop | |
|Created Time |Thu Oct 19 18:13:59 JST 2023 | |
|Last Access |UNKNOWN | |
|Created By |Spark 3.2.1 | |
|Type |MANAGED | |
|Provider |hive | |
|Comment |EMPLOYEE SALES INFO | |
|Table Properties |[transient_lastDdlTime=1697706839] | |
|Location |hdfs://localhost:9000/user/hive/warehouse/spark_example.db/sales_info| |
|Serde Library |org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | |
|InputFormat |org.apache.hadoop.mapred.TextInputFormat | |
|OutputFormat |org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat | |
|Storage Properties |[serialization.format=,, line.delim=\n, field.delim=,] | |
|Partition Provider |Catalog | |
+----------------------------+---------------------------------------------------------------------+-------+

Define the column structure and apply it in Spark SQL to save the data into the sales_info table.

Procedure
1. Import the Spark SQL types
2. Create the Spark schema with StructType and StructField
3. Load the data from CSV into a Spark DataFrame
4. Check and show the dataframe data with the .show() method
5. Print the schema of the dataframe with .printSchema()
>>> from pyspark.sql.types import StructType,\
StructField,\
StringType,\
IntegerType,\
DoubleType

>>> sales_info_schema = StructType([\
StructField('employee_id', IntegerType(), True),\
StructField('product_name', StringType(), True),\
StructField('product_type', StringType(), True),\
StructField('sales_count', IntegerType(), True),\
StructField('sales_price', DoubleType(), True)\
])

>>> csv_file_loc ="file:///home/hadoop/spark_example/data/sales_info_2023-10-18.csv"
>>> sales_day_one_df = spark.read\
.format("csv")\
.option("header", True)\
.schema(sales_info_schema)\
.load(csv_file_loc)

>>> sales_day_one_df.show(20, truncate=False)

+-----------+-------------------+------------+-----------+-----------+
|employee_id|product_name |product_type|sales_count|sales_price|
+-----------+-------------------+------------+-----------+-----------+
|101 |MANGO |Food |300 |400.5 |
|101 |BANANA |Food |300 |200.6 |
|102 |MANGO |Food |350 |400.5 |
|103 |ORANGE |Food |500 |350.67 |
|104 |GRAPE |Food |700 |800.45 |
|103 |MANGO |Food |500 |400.5 |
|105 |SMART TV |ELETRONIC |5 |50000.0 |
|105 |MOBILE |ELETRONIC |8 |60000.0 |
|113 |RICE COOKER |ELETRONIC |5 |4000.0 |
|112 |RICE SEED 10 KG BAG|AGRICULTURE |30 |2000.0 |
+-----------+-------------------+------------+-----------+-----------+

>>> sales_day_one_df.printSchema()
root
|-- employee_id: integer (nullable = true)
|-- product_name: string (nullable = true)
|-- product_type: string (nullable = true)
|-- sales_count: integer (nullable = true)
|-- sales_price: double (nullable = true)

StructType([StructField(),...])
We can create a Spark dataframe schema with StructType and StructField, as shown above.
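
As an alternative to StructType, the reader's schema() method also accepts a DDL-formatted string; a minimal sketch for the same sales columns:

>>> ddl_schema = "employee_id INT, product_name STRING, product_type STRING, \
sales_count INT, sales_price DOUBLE"
>>> sales_ddl_df = spark.read\
.format("csv")\
.option("header", True)\
.schema(ddl_schema)\
.load(csv_file_loc)
>>> sales_ddl_df.printSchema()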

Using Spark to Save Data into a Partition of the Hive Table

Procedure
1. Create the temp view in Spark
2. Write the insert query to insert data from the temp view
3. Run the query to insert the data
4. Check that the data is saved into Hive and HDFS
>>> sales_day_one_df.createOrReplaceTempView("temp_sales_info_20231018")
>>> sales_insert_query= "INSERT OVERWRITE TABLE spark_example.sales_info \
PARTITION(dt='2023-10-18') \
SELECT employee_id,\
product_name,\
product_type,\
sales_count,\
sales_price \
FROM temp_sales_info_20231018"

>>> spark.sql(sales_insert_query).show()
>>> query = "SELECT * FROM spark_example.sales_info WHERE dt='2023-10-18'"
>>> spark.sql(query)\
.show(truncate=False)
+-----------+-------------------+------------+-----------+-----------+----------+
|employee_id|product_name |product_type|sales_count|sales_price|dt |
+-----------+-------------------+------------+-----------+-----------+----------+
|101 |MANGO |Food |300 |400.5 |2023-10-18|
|101 |BANANA |Food |300 |200.6 |2023-10-18|
|102 |MANGO |Food |350 |400.5 |2023-10-18|
|103 |ORANGE |Food |500 |350.67 |2023-10-18|
|104 |GRAPE |Food |700 |800.45 |2023-10-18|
|103 |MANGO |Food |500 |400.5 |2023-10-18|
|105 |SMART TV |ELETRONIC |5 |50000.0 |2023-10-18|
|105 |MOBILE |ELETRONIC |8 |60000.0 |2023-10-18|
|113 |RICE COOKER |ELETRONIC |5 |4000.0 |2023-10-18|
|112 |RICE SEED 10 KG BAG|AGRICULTURE |30 |2000.0 |2023-10-18|
+-----------+-------------------+------------+-----------+-----------+----------+

Remove TempView Table

We need to use the spark.catalog.dropTempView("tmp_table") function to delete a temporary view in Spark.

>>> spark.catalog.dropTempView("temp_sales_info_20231018")
>>> spark.sql("SELECT * FROM temp_sales_info_20231018").show()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/home/hadoop/.local/lib/python3.7/site-packages/pyspark/sql/session.py", line 723, in sql
return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
File "/home/hadoop/.local/lib/python3.7/site-packages/py4j/java_gateway.py", line 1322, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/home/hadoop/.local/lib/python3.7/site-packages/pyspark/sql/utils.py", line 117, in deco
raise converted from None
pyspark.sql.utils.AnalysisException: Table or view not found: temp_sales_info_20231018; line 1 pos 14;
'Project [*]
+- 'UnresolvedRelation [temp_sales_info_20231018], [], false
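
To avoid this exception, we can first check which temporary views are still registered; a minimal sketch using spark.catalog.listTables():

>>> [t.name for t in spark.catalog.listTables() if t.isTemporary]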

Check that the Partition Directory Was Created on HDFS

# With hdfs dfs -ls we can check whether a file or directory exists in HDFS.
# With the command below we can see the sales_info dt partition directory
# in HDFS
> hdfs dfs -ls /user/hive/warehouse/spark_example.db/sales_info/dt=2023-10-18
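
We can also ask the Hive metastore for the registered partitions directly from Spark; a minimal sketch:

>>> spark.sql("SHOW PARTITIONS spark_example.sales_info").show(truncate=False)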

Add Another Partition to the sales_info Table

Process
1. Load the data from CSV into a Spark DataFrame
2. Create the temp view in Spark
3. Write the insert query to insert data from the temp view
4. Run the query to insert the data
5. Check that the data is saved into Hive and HDFS

>>> sales_data_second_df = spark.read\
.format("csv")\
.option("header", True)\
.schema(sales_info_schema)\
.load("file:///home/hadoop/spark_example/data/sales_info_2023-10-19.csv")
>>> sales_data_second_df.show(truncate=False)
+-----------+-------------------+------------+-----------+-----------+
|employee_id|product_name |product_type|sales_count|sales_price|
+-----------+-------------------+------------+-----------+-----------+
|101 |MANGO |Food |300 |400.5 |
|101 |BANANA |Food |300 |200.6 |
|102 |MANGO |Food |350 |400.5 |
|103 |ORANGE |Food |500 |350.67 |
|104 |GRAPE |Food |700 |800.45 |
|103 |MANGO |Food |500 |400.5 |
|105 |SMART TV |ELETRONIC |5 |50000.0 |
|105 |MOBILE |ELETRONIC |8 |60000.0 |
|113 |RICE COOKER |ELETRONIC |5 |4000.0 |
|112 |RICE SEED 10 KG BAG|AGRICULTURE |30 |2000.0 |
+-----------+-------------------+------------+-----------+-----------+

# Check the HDFS partitions of sales_info
> hdfs dfs -ls /user/hive/warehouse/spark_example.db/sales_info
Found 3 items
-rw-r--r-- 1 hadoop supergroup 0 2023-10-19 18:53 /user/hive/warehouse/spark_example.db/sales_info/_SUCCESS
drwxr-xr-x - hadoop supergroup 0 2023-10-19 18:43 /user/hive/warehouse/spark_example.db/sales_info/dt=2023-10-18
drwxr-xr-x - hadoop supergroup 0 2023-10-19 18:53 /user/hive/warehouse/spark_example.db/sales_info/dt=2023-10-19

Apply a Custom Function to a Spark DataFrame

pyspark.sql has a module named `functions` that we can use to modify the data in a dataframe.
We need to import the functions module:
>>> from pyspark.sql import functions as F

Then we can create a user defined function (UDF):
>>> cus_func = F.udf(<any method or function>, DataType)

Then use withColumn to change the data (df = DataFrame):
>>> df = df.withColumn("<Column Name>", cus_func("<Column Name>"))

>>> from pyspark.sql import functions as F
>>> udf = F.udf(lambda x: x.lower(), StringType())
>>> sales_data_second_df = sales_data_second_df\
.withColumn("product_name", udf("product_name"))
>>> sales_data_second_df.show()
+-----------+-------------------+------------+-----------+-----------+
|employee_id| product_name|product_type|sales_count|sales_price|
+-----------+-------------------+------------+-----------+-----------+
| 101| mango| Food| 300| 400.5|
| 101| banana| Food| 300| 200.6|
| 102| mango| Food| 350| 400.5|
| 103| orange| Food| 500| 350.67|
| 104| grape| Food| 700| 800.45|
| 103| mango| Food| 500| 400.5|
| 105| smart tv| ELETRONIC| 5| 50000.0|
| 105| mobile| ELETRONIC| 8| 60000.0|
| 113| rice cooker| ELETRONIC| 5| 4000.0|
| 112|rice seed 10 kg bag| AGRICULTURE| 30| 2000.0|
+-----------+-------------------+------------+-----------+-----------+
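
For a simple transformation like lowercasing, Spark's built-in column functions avoid the serialization overhead of a Python UDF; a minimal sketch of the same change using F.lower() on a copy of the dataframe:

>>> lowered_df = sales_data_second_df\
.withColumn("product_name", F.lower("product_name"))
>>> lowered_df.show(3)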

Increase the sales price in the sales dataframe with an external user defined function

First, you have to create the Python function:
def <custom_function_name>(<args>):
return <output>

Second, assign it to a UDF:
>>> udf_func = F.udf(<custom_function_name>, DataType)

Third, use withColumn to change the data (df = DataFrame):
>>> df = df.withColumn("<Column Name>", udf_func("<Column Name>"))
>>> def increase_sales_price_100(sales_price):
... return sales_price + 100
...
>>> sale_price_udf = F.udf(increase_sales_price_100, DoubleType())
>>> sales_data_second_df=sales_data_second_df\
.withColumn("sales_price", sale_price_udf("sales_price"))
>>> sales_data_second_df.show()
+-----------+-------------------+------------+-----------+-----------+
|employee_id| product_name|product_type|sales_count|sales_price|
+-----------+-------------------+------------+-----------+-----------+
| 101| mango| Food| 300| 500.5|
| 101| banana| Food| 300| 300.6|
| 102| mango| Food| 350| 500.5|
| 103| orange| Food| 500| 450.67|
| 104| grape| Food| 700| 900.45|
| 103| mango| Food| 500| 500.5|
| 105| smart tv| ELETRONIC| 5| 50100.0|
| 105| mobile| ELETRONIC| 8| 60100.0|
| 113| rice cooker| ELETRONIC| 5| 4100.0|
| 112|rice seed 10 kg bag| AGRICULTURE| 30| 2100.0|
+-----------+-------------------+------------+-----------+-----------+
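
The same price increase could also be written with built-in column arithmetic instead of a UDF, which Spark can optimize better; a minimal sketch applied to a copy so the dataframe above stays unchanged:

>>> increased_df = sales_data_second_df\
.withColumn("sales_price", F.col("sales_price") + 100)
>>> increased_df.show(3)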

Save the data into HDFS

>>> sales_data_second_df.createOrReplaceTempView("temp_sales_info_20231019")

>>> sales_insert_query_two = "INSERT OVERWRITE TABLE spark_example.sales_info \
PARTITION(dt='2023-10-19') \
SELECT \
employee_id,\
product_name,\
product_type,\
sales_count,\
sales_price \
FROM temp_sales_info_20231019"

>>> spark.sql(sales_insert_query_two).show()
>>> modify_query = "SELECT * FROM \
spark_example.sales_info \
WHERE dt='2023-10-19'"
>>> spark.sql(modify_query).show(truncate=False)
+-----------+-------------------+------------+-----------+-----------+----------+
|employee_id|product_name |product_type|sales_count|sales_price|dt |
+-----------+-------------------+------------+-----------+-----------+----------+
|101 |mango |Food |300 |500.5 |2023-10-19|
|101 |banana |Food |300 |300.6 |2023-10-19|
|102 |mango |Food |350 |500.5 |2023-10-19|
|103 |orange |Food |500 |450.67 |2023-10-19|
|104 |grape |Food |700 |900.45 |2023-10-19|
|103 |mango |Food |500 |500.5 |2023-10-19|
|105 |smart tv |ELETRONIC |5 |50100.0 |2023-10-19|
|105 |mobile |ELETRONIC |8 |60100.0 |2023-10-19|
|113 |rice cooker |ELETRONIC |5 |4100.0 |2023-10-19|
|112 |rice seed 10 kg bag|AGRICULTURE |30 |2100.0 |2023-10-19|
+-----------+-------------------+------------+-----------+-----------+----------+

Remove the temp sales table

>>> spark.catalog.dropTempView("temp_sales_info_20231019")
>>> spark.sql("SELECT * FROM temp_sales_info_20231019").show()

Truncate an Unpartitioned Table

Procedure
1. Create the Spark dataframe schema with StructType and StructField
2. Create an empty RDD from the Spark context
3. Create an empty DataFrame with .createDataFrame()
4. Check the schema with .printSchema()
5. Create the temp view with .createOrReplaceTempView()
6. Write the HQL (Hive query) to insert the empty records into the table
7. Run the insert HQL with Spark's .sql() function
8. Check whether the table has data
>>> employee_schema = StructType([\
StructField('employee_id', IntegerType(), True),\
StructField('employee_name', StringType(), True),\
StructField('salary', IntegerType(), True),\
StructField('department', StringType(), True),\
])
>>> emptyEmpRDD=spark.sparkContext.emptyRDD()
>>> emptyEmpDF = spark.createDataFrame(emptyEmpRDD,employee_schema)
>>> emptyEmpDF.printSchema()
root
|-- employee_id: integer (nullable = true)
|-- employee_name: string (nullable = true)
|-- salary: integer (nullable = true)
|-- department: string (nullable = true)

>>> emptyEmpDF.createOrReplaceTempView("empty_employee")
>>> emp_delete_query = "INSERT OVERWRITE \
TABLE spark_example.employees \
SELECT * FROM empty_employee;"
>>> spark.sql(emp_delete_query).show()
>>> spark.sql("SELECT * FROM spark_example.employees").show()
+-----------+-------------+------+----------+
|employee_id|employee_name|salary|department|
+-----------+-------------+------+----------+
+-----------+-------------+------+----------+
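
For an unpartitioned managed table, Spark SQL also supports TRUNCATE TABLE, which gives the same result without building an empty dataframe; a minimal sketch:

>>> spark.sql("TRUNCATE TABLE spark_example.employees")
>>> spark.sql("SELECT * FROM spark_example.employees").show()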

Written by Tariqul Islam

Tariqul Islam has 9+ years of software development experience. He knows Python, Node.js, Java, C#, and PHP.
