Hadoop, Hive, Python and Azkaban Scheduler (Data Pipeline Automation Part 1)
Automating the data pipeline is challenging work in any software development environment. To handle big data and unstructured data, developers and organizations use Big Data tools such as Hadoop and Hive to automate their data pipelines. I am planning to write a series of articles covering how to build a data pipeline with tools like Hadoop, Hive, the Azkaban Scheduler, and Spark. We will build the project from scratch so that developers and data engineers can understand how to build a data pipeline application from the ground up.
Part 1 covers:
- Hive and Hadoop configuration, and running simple queries with hive, beeline, and hiveserver2.
- A sample application written as a Python script to perform the data transformation.
- The workflow of the Azkaban Scheduler: creating a sample flow and running it with Azkaban.
Prerequisites
- Debian (or Any Linux OS)
- Open JDK 11
- Python3
- Hadoop
- Hive
- Azkaban Scheduler
Configuring the System on Debian with Hive, Hadoop, and Azkaban
> sudo adduser hadoop
> sudo adduser hadoop sudo
> sudo reboot
After the restart, log in as the hadoop user to run the operations:
> su - hadoop
> pwd
/home/hadoop
Follow the article below for Hive and Hadoop installation.
Run the following commands to install the Azkaban Scheduler and start the solo server:
> cd /home/hadoop
> git clone https://github.com/azkaban/azkaban.git
> cd azkaban
> nano azkaban-web-server/build.gradle
Replace the Node.js dist URL with distBaseUrl = 'https://direct.nodejs.org/dist/'
> ./gradlew build installDist
> cd azkaban-solo-server/build/install/azkaban-solo-server; bin/start-solo.sh
To learn more about Azkaban, you can read its documentation.
Create and Configure the Data Pipeline Project named diabetic_analysis
Create the app folder, named diabetic_analysis, with the following commands:
> mkdir -p /home/hadoop/diabetic_analysis
> cd diabetic_analysis
> mkdir -p ddl dml bin flow data conf
> touch README.md
# Folder structure
root <diabetic_analysis>
|-- data
|   |-- <file>.csv
|   |-- <file>.txt
|-- conf
|-- ddl (Data Definition Language)
|   |-- <filename>.hql (table and database structure files)
|-- dml (Data Manipulation Language)
|   |-- <filename>.hql (HQL query files)
|-- bin (contains Python and shell script files)
|   |-- <filename>.py (Python script to run the query)
|-- flow (contains the Azkaban flow files)
|   |-- <scheduler_file>.job (flow script to run as a scheduled job)
|-- README.md
All tools and apps need to be in the same folder, /home/hadoop/.
Then we need to export the environment and PATH variables in bashrc and source it before running the application.
To do this, paste the following export statements at the end of /etc/bash.bashrc:
> sudo nano /etc/bash.bashrc
export PATH=$PATH:$JAVA_HOME/bin:.
# Hadoop-related options
export HADOOP_HOME=/home/hadoop/hadoop-3.2.1
export HADOOP_INSTALL=$HADOOP_HOME
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export HADOOP_YARN_HOME=$HADOOP_HOME
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export PATH=$PATH:$HADOOP_HOME/sbin:$HADOOP_HOME/bin:.
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib/native"
export HIVE_HOME=/home/hadoop/apache-hive-3.1.2-bin
export PATH=$PATH:$HIVE_HOME/bin
Then source the bashrc file:
> source /etc/bash.bashrc
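To confirm the variables were picked up after sourcing, a small sanity check can be run from Python. This is a hedged sketch; it only reads the environment and assumes the exports above are in place.
# check_env.py -- quick check that the exported variables are visible
import os

for name in ("HADOOP_HOME", "HIVE_HOME", "PATH"):
    value = os.environ.get(name)
    # A missing variable points to a bashrc that was not sourced correctly
    print("{0} = {1}".format(name, value if value else "<NOT SET>"))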
Process Sample Data
To get the sample data, I use the website link below.
- Download the CSV file using wget.
- Rename the file to diabetic.csv.
We will use this sample data file to build the data pipeline with Hive and Hadoop: insert the data into HDFS, then run queries that transform the data and insert it into another table.
> cd ~/diabetic_analysis/data
> wget https://query.data.world/s/frun6snvi75fbj6egzwzriwnirr23t?dws=00000
> mv frun6snvi75fbj6egzwzriwnirr23t\?dws\=00000 diabetic.csv
Adding the date partition column to the CSV file
- Open the file in any text editor.
- Add a column named dt and fill it with a sample date; later we will use it as the HDFS partition and as the partition column in Hive. (A small script to automate this step is sketched after the sample below.)
age,gender,diabetic,children,smoker,region,claim,dt
19,female,NO,0,Yes,southwest,171,2023-10-16
18,male,NO,1,No,southwest,140,2023-10-16
28,male,YES,3,No,southwest,149,2023-10-16
33,male,YES,0,No,northwest,146,2023-10-16
32,male,NO,0,No,northwest,176,2023-10-16
31,female,NO,0,No,southwest,106,2023-10-16
46,female,NO,1,No,southwest,106,2023-10-16
37,female,NO,3,No,northwest,141,2023-10-16
37,male,NO,2,No,northwest,151,2023-10-16
60,female,YES,0,No,northwest,139,2023-10-16
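Editing the file by hand works for a small sample, but the same step can be scripted. Below is a minimal Python sketch, assuming the input file data/diabetic.csv and a hypothetical output name diabetic_with_dt.csv, that appends the dt column to every row.
# add_dt_column.py -- hedged sketch for appending the dt partition column to the CSV
import csv

INPUT_FILE = "/home/hadoop/diabetic_analysis/data/diabetic.csv"
OUTPUT_FILE = "/home/hadoop/diabetic_analysis/data/diabetic_with_dt.csv"  # hypothetical name
DT = "2023-10-16"

with open(INPUT_FILE, newline="") as src, open(OUTPUT_FILE, "w", newline="") as dst:
    reader = csv.reader(src)
    writer = csv.writer(dst)
    header = next(reader)
    writer.writerow(header + ["dt"])   # add the new partition column to the header
    for row in reader:
        writer.writerow(row + [DT])    # append the partition date to every data row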
Run Hadoop Services
Start the Hadoop services with the command below in a terminal. It will start the namenode, datanode, yarn, and resource manager services.
# Run all the services
> start-all.sh
WARNING: Attempting to start all Apache Hadoop daemons as hadoop in 10 seconds.
WARNING: This is not a recommended production deployment configuration.
WARNING: Use CTRL-C to abort.
Starting namenodes on [localhost]
Starting datanodes
Starting secondary namenodes [B4911-VM]
Starting resourcemanager
Starting nodemanagers
# Check the status of the Hadoop services
> jps
2755 NodeManager
2165 NameNode
2262 DataNode
2651 ResourceManager
3118 Jps
2462 SecondaryNameNode
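If you prefer to verify the daemons from a script instead of reading the jps output by eye, a small hedged sketch like the one below can do the check; the expected daemon names are taken from the output above.
# check_hadoop_daemons.py -- sketch that parses jps output for the expected daemons
import subprocess

EXPECTED = {"NameNode", "DataNode", "SecondaryNameNode", "ResourceManager", "NodeManager"}

output = subprocess.run(["jps"], capture_output=True, text=True).stdout
running = {line.split()[-1] for line in output.splitlines() if line.strip()}
missing = EXPECTED - running
print("All Hadoop daemons are running" if not missing else "Missing daemons: {0}".format(missing))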
Hadoop Components
HDFS: HDFS (Hadoop Distributed File System) is a distributed file system that handles large data sets running on commodity hardware. It is used to scale a single Apache Hadoop cluster to hundreds (and even thousands) of nodes. HDFS is one of the major components of Hadoop.
NameNode: The NameNode manages the DataNodes and stores the block index for them. There should be a single NameNode, and it has a backup, the SecondaryNameNode.
DataNode: DataNodes are the workers of HDFS (Hadoop Distributed File System); the actual data is stored on them. There can be multiple DataNodes, they are coordinated by the NameNode, and all data processing happens on the DataNodes.
YARN: YARN is responsible for allocating system resources to the various applications running in a Hadoop cluster, scheduling the tasks and queues to be executed on the different cluster nodes, and managing tasks and operations.
Build the Data Pipeline Application in diabetic_analysis
Create the database-creation DDL in the following file: /home/hadoop/diabetic_analysis/ddl/db.hql
CREATE DATABASE IF NOT EXISTS diabetic_analysis;
To run HQL (Hive Query Language) with hive, we have two options:
hive -f <filename>.hql
hive -e 'Inline Hive Query'
> hive -f /home/hadoop/diabetic_analysis/ddl/db.hql
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/hadoop/apache-hive-3.1.2-bin/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/hadoop/hadoop-3.2.1/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Hive Session ID = 95fb409b-ad5c-48a2-b67f-b8ba13e50be8
Logging initialized using configuration in jar:file:/home/hadoop/apache-hive-3.1.2-bin/lib/hive-common-3.1.2.jar!/hive-log4j2.properties Async: true
Hive Session ID = f2a44b6e-90ea-4871-bf45-2a24f14cdae4
OK
Time taken: 1.155 seconds
> hive -e 'SHOW DATABASES'
Hive Session ID = 7a2d4d48-2bff-42bb-933b-e2b92d89c6de
OK
default
diabetic_analysis
erp_solution
Time taken: 0.966 seconds, Fetched: 3 row(s)
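Both forms shown above (hive -f and hive -e) can also be driven from Python instead of the shell. Here is a minimal sketch; the helper names run_hql_file and run_hql_inline are just illustrative, and it uses subprocess, the same approach the insert script later in this article takes for beeline.
# run_hive.py -- hedged sketch wrapping `hive -f` and `hive -e` with subprocess
import subprocess

def run_hql_file(path):
    # Equivalent of: hive -f <filename>.hql
    return subprocess.run(["hive", "-f", path], capture_output=True, text=True)

def run_hql_inline(query):
    # Equivalent of: hive -e 'Inline Hive Query'
    return subprocess.run(["hive", "-e", query], capture_output=True, text=True)

if __name__ == "__main__":
    result = run_hql_inline("SHOW DATABASES")
    print(result.stdout)
    print(result.stderr)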
Create the file for the tbl_patients table at the following path: /home/hadoop/diabetic_analysis/ddl/tbl_patients.hql
SET hivevar:DATABASE_NAME=diabetic_analysis;
SET hivevar:TBL_PATIENTS=${DATABASE_NAME}.tbl_patients;
CREATE TABLE IF NOT EXISTS ${TBL_PATIENTS} (
age INTEGER,
gender STRING,
diabetic STRING,
children INTEGER,
smoker STRING,
region STRING,
claim INTEGER
)
PARTITIONED BY(dt STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
TBLPROPERTIES('transactional'='false');
Run the commands below to create the partitioned table in Hadoop with Hive:
> cd /home/hadoop/diabetic_analysis/ddl
> hive -f tbl_patients.hql
> hive -e 'DESCRIBE FORMATTED diabetic_analysis.tbl_patients'
# Detailed Table Information
Database: diabetic_analysis
OwnerType: USER
Owner: hadoop
CreateTime: Mon Oct 16 13:11:56 JST 2023
LastAccessTime: UNKNOWN
Retention: 0
Location: hdfs://localhost:9000/user/hive/warehouse/diabetic_analysis.db/tbl_patients
Table Type: MANAGED_TABLE
DESCRIBE FORMATTED <db_name>.<table_name> provides all the information about the table: its partitions, its location in HDFS, when the table was created, and the table type.
We can create two types of tables with Hive in a Hadoop cluster:
Managed Table: For managed tables, Hive stores the data in the Hive warehouse directory at a fixed location. If we drop the table, the table's data is deleted as well.
External Table: Hive stores the data at the specified LOCATION in an external resource. If we drop the table, the data is not deleted. This table type is suitable for external data sources used with Hive and Hadoop.
Create the file for the tbl_patients_children table at /home/hadoop/diabetic_analysis/ddl/tbl_patients_children.hql
SET hivevar:DATABASE_NAME=diabetic_analysis;
SET hivevar:TBL_PATIENTS_CHILDREN=${DATABASE_NAME}.tbl_patients_children;
CREATE TABLE IF NOT EXISTS ${TBL_PATIENTS_CHILDREN} (
gender STRING,
region STRING,
children INTEGER
)
PARTITIONED BY (dt STRING)
TBLPROPERTIES('transactional'='false');
Run the commands below to create the tbl_patients_children table with hive:
> cd /home/hadoop/diabetic_analysis/ddl
> hive -f tbl_patients_children.hql
> hive -e 'DESCRIBE FORMATTED diabetic_analysis.tbl_patients_children'
# Detailed Table Information
Database: diabetic_analysis
OwnerType: USER
Owner: hadoop
CreateTime: Mon Oct 16 13:23:05 JST 2023
LastAccessTime: UNKNOWN
Retention: 0
Location: hdfs://localhost:9000/user/hive/warehouse/diabetic_analysis.db/tbl_patients_children
Table Type: MANAGED_TABLE
HDFS Operations
HDFS: put the external CSV file into an HDFS path
> cd /home/hadoop/diabetic_analysis/data
> hdfs dfs -put diabetic.csv /diabetic.csv
> hdfs dfs -cat /diabetic.csv
The hdfs dfs -put <source file> <hdfs destination file> command puts the file into the HDFS path.
hdfs dfs -cat <hdfs file path> prints the contents of a file located in HDFS.
Run the Hive query to load the data into the tbl_patients table:
> hive -e \
"LOAD DATA INPATH '/user/hive/warehouse/diabetic.csv' INTO TABLE diabetic_analysis.tbl_patients"
hive -e provides the ability to run an inline HQL (Hive Query Language) statement in Hive.
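The same two steps, putting the CSV into HDFS and loading it into tbl_patients, can also be scripted. Below is a hedged Python sketch, assuming the file and table names used above, again relying on subprocess.
# load_patients.py -- hedged sketch: put the CSV into HDFS and LOAD it into the table
import subprocess

LOCAL_CSV = "/home/hadoop/diabetic_analysis/data/diabetic.csv"
HDFS_CSV = "/diabetic.csv"
LOAD_QUERY = "LOAD DATA INPATH '{0}' INTO TABLE diabetic_analysis.tbl_patients".format(HDFS_CSV)

# hdfs dfs -put <source file> <hdfs destination file>
subprocess.run(["hdfs", "dfs", "-put", LOCAL_CSV, HDFS_CSV], check=True)
# hive -e '<LOAD DATA statement>'
subprocess.run(["hive", "-e", LOAD_QUERY], check=True)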
Data Processing and Inserting Into Another Table (Data Pipelining)
Insert the tbl_patients table data into the tbl_patients_children table with HQL.
Create the HQL file that reads the data from the tbl_patients table, processes it, and inserts it into the tbl_patients_children table, at the following location: /home/hadoop/diabetic_analysis/dml/sc_insert_data_into_patient_children.hql
-- Set the hivevar for database name
SET hivevar:DATABASE_NAME=diabetic_analysis;
-- Set the hivevar for tbl_patients table
SET hivevar:TBL_PATIENTS=${DATABASE_NAME}.tbl_patients;
-- Set the hivevar for tbl_patients_children table
SET hivevar:TBL_PATIENTS_CHILDREN=${DATABASE_NAME}.tbl_patients_children;
-- We will insert the data with the dt partition using INSERT OVERWRITE from
-- tbl_patients to the tbl_patients_children table
INSERT OVERWRITE TABLE ${TBL_PATIENTS_CHILDREN} PARTITION (dt="${dt}")
SELECT
gender,
region,
children
FROM
${TBL_PATIENTS}
WHERE dt="${dt}";
We can set variables in Hive in different ways (hivevar, hiveconf). In the HQL query above, I used hivevar to SET the variables in Hive.
Insert Commands in Hive
INSERT INTO TABLE <TABLE NAME> [PARTITION(col='val')] VALUES [(), ()] saves multiple rows into a table (e.g., INSERT INTO TABLE example VALUES ('101', 'Maik'), ('102', 'TOM')). The INSERT INTO statement does not modify existing data and records.
INSERT OVERWRITE TABLE <TABLE_NAME> [PARTITION(col='val')] [SELECT STATEMENT] replaces any existing data in the table and partition and inserts the new rows.
In this article, I use INSERT OVERWRITE to insert the data into the table.
Note: the SELECT statement needs to provide the same columns as the target table.
Using Beeline and hiveserver2 to Run the Hive Query
Open another tab or window in the terminal and run the command below; it starts hiveserver2 as a service.
> hiveserver2
2023-10-16 16:22:14: Starting HiveServer2
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/hadoop/apache-hive-3.1.2-bin/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/hadoop/hadoop-3.2.1/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Hive Session ID = 39aabdf0-6c83-46fb-99e9-32c59f716bf9
Find the IP address that hiveserver2 is running on; for Debian Linux:
> nmcli device show
GENERAL.DEVICE: enp0s3
GENERAL.TYPE: ethernet
GENERAL.HWADDR: 08:00:27:63:7D:20
GENERAL.MTU: 1500
GENERAL.STATE: 100 (connected)
GENERAL.CONNECTION: Wired connection 1
GENERAL.CON-PATH: /org/freedesktop/NetworkManager/ActiveConnection/2
WIRED-PROPERTIES.CARRIER: on
IP4.ADDRESS[1]: <IPv4 IP ADDRESS>/24
IP4.GATEWAY: <IPv4 GATEWAY ADDRESS>
Connect with beeline using the following command:
> beeline -u jdbc:hive2://<hive2 server IP>:10000 -n <username> -p <password>
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/hadoop/hadoop-3.2.1/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/hadoop/apache-hive-3.1.2-bin/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Connecting to jdbc:hive2://<hive2 server IP>:10000
Connected to: Apache Hive (version 3.1.2)
Driver: Hive JDBC (version 3.1.2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 3.1.2 by Apache Hive
0: jdbc:hive2://<hive2 server IP>:10000>
I have configured hiveserver2 in the hive-site.xml file to accept anonymous and system users, so a developer can log into the Hive server with a system username and password.
- <username> can be a system user, such as hadoop.
- <password> can be that system user's password, such as the hadoop user's password.
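Before wiring beeline into a script, it can be worth checking that hiveserver2 is actually reachable on port 10000. Below is a small hedged sketch using only the Python standard library; the IP is a placeholder for the address found with nmcli above.
# check_hiveserver2.py -- sketch that tests whether the hiveserver2 port is open
import socket

HIVE_SERVER_IP = "127.0.0.1"   # placeholder; use the IP found with nmcli above
PORT = 10000

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    sock.settimeout(5)
    result = sock.connect_ex((HIVE_SERVER_IP, PORT))
    print("hiveserver2 is reachable" if result == 0 else "cannot reach port {0}".format(PORT))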
Python Script Implementation to Run the Insert Operation into tbl_patients_children
Create the file at the following location: /home/hadoop/diabetic_analysis/bin/insert_into_tbl_patients_children.py
# bin/insert_into_tbl_patients_children.py
import os
import subprocess

APP_BASE_DIR = "/home/hadoop/diabetic_analysis"
HIVE_SERVER_IP = ""          # fill in the IP address hiveserver2 is running on
HIVE_SERVER_USER_NAME = ""   # fill in the system username (e.g. hadoop)
HIVE_SERVER_PASSWORD = ""    # fill in the system user's password
HQL_SCRIPT_NAME = "{0}/dml/sc_insert_data_into_patient_children.hql".format(APP_BASE_DIR)

BEELINE_CMD = "beeline " \
    "-u jdbc:hive2://{0}:10000 -n {1} -p {2}" \
    .format(HIVE_SERVER_IP, HIVE_SERVER_USER_NAME, HIVE_SERVER_PASSWORD)


class RunQuery:
    def __init__(self) -> None:
        pass

    def cmd_builder(self, dt, file_name) -> str:
        # Build the beeline command with the dt hivevar and the HQL file to run
        return "{0} --hivevar dt={1} -f {2}".format(BEELINE_CMD, dt, file_name)

    def run(self):
        try:
            cmd = self.cmd_builder("2023-10-16", HQL_SCRIPT_NAME)
            print(cmd)
            proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                    shell=True, preexec_fn=os.setsid)
            stdout, stderr = proc.communicate()
            print(stdout)
            if stderr:
                output = stderr.strip()
                value = output.decode("utf-8")
                print(value)
        except subprocess.CalledProcessError as e:
            print(e.output)


if __name__ == "__main__":
    run_query = RunQuery()
    run_query.run()
To run the script we need Python 3 installed in our development environment.
In the script:
- I use subprocess to run the beeline command as a system process, which executes the insert HQL to load the data into Hadoop through Hive.
- I also pipe the stdout and stderr of the process so we can create log output by printing them.
- Then I print the output of the process, which will be saved in the Azkaban log, so we can easily read the process logs from the terminal, the Azkaban web view, and the log text file.
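One thing the script above does not do is propagate failures: even if beeline exits with a non-zero code, the Python process still exits successfully and Azkaban will mark the job as succeeded. Below is a minimal, hedged sketch of a helper (the name check_process is hypothetical) that could be called right after proc.communicate() in run().
# hypothetical helper: fail the job when beeline exits with a non-zero code
import sys

def check_process(proc, stderr):
    # Print any error output that beeline wrote to stderr
    if stderr:
        print(stderr.strip().decode("utf-8"))
    # Popen.returncode is set after communicate(); non-zero means beeline failed,
    # so exit with the same code and let Azkaban mark the job as failed
    if proc.returncode != 0:
        sys.exit(proc.returncode)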
Run the Script with Python 3 to Test the Application
> chmod -R 777 \
/home/hadoop/diabetic_analysis/bin/insert_into_tbl_patients_children.py
> cd /home/hadoop/diabetic_analysis
> python3 bin/insert_into_tbl_patients_children.py
Use the Azkaban Scheduler to Run the Script
Create a new project in Azkaban through the web UI; Azkaban runs on localhost:8081 in the local environment.
- Click the Create Project button and a popup will appear.
- Add a Name and Description and click the Create Project button.
- The details page shows the Project Information.
Create the Execution Flow for the Python Script in the Azkaban Scheduler
> cd /home/hadoop/diabetic_analysis/flow
> mkdir -p example
> cd example
> touch common.properties
> touch run-insert-patient-info.job
Then add the following code to the common.properties file:
# common.properties file
APP_DIR=/home/hadoop/diabetic_analysis/bin
BASERC_FILE=/etc/bash.bashrc
Then add the following code to the run-insert-patient-info.job file:
type=command
user.to.proxy=hadoop
command=bash -c "source ${BASERC_FILE} && python3 ${APP_DIR}/insert_into_tbl_patients_children.py"
retries=0
retry.backoff=0
Then create the zip file using the zip and unzip utilities.
Install zip and unzip on Debian:
> sudo apt install zip unzip
Run the command to zip the flow folder:
> zip -r example.zip example/
adding: example/ (stored 0%)
adding: example/run-insert-patient-info.job (deflated 10%)
adding: example/common.properties (stored 0%)
Copy the zip file to the desktop and upload the zip file to the Azkaban project.
> cp example.zip /home/hadoop/Desktop/
- Click the Upload button to upload the flow .zip file.
- Select the example.zip file from the folder and click the Upload button.
- If the .zip file contains valid job and properties files, the flow will automatically be created in the project.
Run the Flow
- To execute the flow or script file, click the Execute Flow button.
- Click on Failure Options.
- Select Finish All Possible from the dropdown.
- Click the Execute button. It will start the flow, which contains the Python script that inserts the data.
Check the Success and Error Logs of the Executed Flow
- Click the Executing menu item if the process is still running; if the process has finished with success or error, click the History menu item.
- Then click the execution number to see the detailed execution logs.
- Click the Job List button to see the job's success and error logs by clicking the Log button on each row.
- From this page we can see the job's success and error logs.
All the source code related to diabetic_analysis is available in the Git repository at the link below.