Hadoop, Hive, Python and Azkaban Scheduler (Data Pipeline Automation Part 1)

Tariqul Islam
11 min read · Oct 18, 2023

Automating a data pipeline is challenging work in any software development environment. To handle big data and unstructured data, developers and organizations turn to tools such as Hadoop and Hive to automate their data pipelines. I am planning a series of articles on building a data pipeline with tools such as Hadoop, Hive, Azkaban Scheduler and Spark. The project is built from scratch, so developers and data engineers can see how a data pipeline application is put together from the ground up.

Part 1 covers:

  1. Configuring Hive and Hadoop, and running simple queries with hive, beeline and hiveserver2.
  2. A sample Python script that runs the data transformation.
  3. The Azkaban Scheduler workflow: creating a sample flow and running it with Azkaban.

Prerequisites

  1. Debian (or any Linux OS)
  2. OpenJDK 11
  3. Python3
  4. Hadoop
  5. Hive
  6. Azkaban Scheduler

Configuring the System on Debian for Hive, Hadoop and Azkaban

> sudo adduser hadoop
> sudo adduser hadoop sudo
> sudo reboot

After the restart, log in as the hadoop user to run the remaining operations

> su - hadoop
> pwd
/home/hadoop

Follow the article below for the Hive and Hadoop installation

Run the following commands to install Azkaban Scheduler and start the solo server

> cd /home/hadoop
> git clone https://github.com/azkaban/azkaban.git
> cd azkaban
> nano azkaban-web-server/build.gradle
Replace the Node.js URL with (distBaseUrl = 'https://direct.nodejs.org/dist/')
> ./gradlew build installDist
> cd azkaban-solo-server/build/install/azkaban-solo-server; bin/start-solo.sh

To learn more about Azkaban, you can read its documentation

Create and Configure Data Pipeline Project named diabetic_analysis

Create the app folder with the following commands. The app folder is named diabetic_analysis.

> mkdir -p /home/hadoop/diabetic_analysis
> cd /home/hadoop/diabetic_analysis
> mkdir -p ddl dml bin flow data conf
> touch README.md
# Folder structure
root <diabetic_analysis>
|-- data
|   |-- <file>.csv
|   |-- <file>.txt
|-- conf
|-- ddl (Data Definition Language)
|   |-- <filename>.hql (table and database structure files)
|-- dml (Data Manipulation Language)
|   |-- <filename>.hql (HQL query files)
|-- bin (contains Python and shell script files)
|   |-- <filename>.py (Python script that runs the query)
|-- flow (contains the Azkaban flow files)
|   |-- <scheduler_file>.job (flow script run by the scheduler)
|-- README.md

All tools and apps need to be in the same folder, /home/hadoop/.

Next, we need to export the environment and path variables in the bashrc file and source it before running the application.

To do that, paste the following export statements at the end of /etc/bash.bashrc

> sudo nano /etc/bash.bashrc
export PATH=$PATH:$JAVA_HOME/bin:.
#Copy the following Hadoop Related Options
export HADOOP_HOME=/home/hadoop/hadoop-3.2.1
export HADOOP_INSTALL=$HADOOP_HOME
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export HADOOP_YARN_HOME=$HADOOP_HOME
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export PATH=$PATH:$HADOOP_HOME/sbin:$HADOOP_HOME/bin:.
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib/native"

export HIVE_HOME=/home/hadoop/apache-hive-3.1.2-bin
export PATH=$PATH:$HIVE_HOME/bin

Then source the bashrc file

> source /etc/bash.bashrc

Process Sample Data

To get the sample data, I use the website link below.

  1. Download the CSV file using wget.
  2. Rename the file to diabetic.csv.

We will use this sample file to build the data pipeline: first load the data into Hadoop HDFS through Hive, then write a query that transforms the data and inserts it into another table.

> cd ~/diabetic_analysis/data
> wget https://query.data.world/s/frun6snvi75fbj6egzwzriwnirr23t?dws=00000
> mv frun6snvi75fbj6egzwzriwnirr23t\?dws\=00000 diabetic.csv

Adding the date partition column to the CSV file

  1. Open the file in any text editor.
  2. Add a column named dt and fill it with a sample date. We will use it later as the partition in HDFS and as the partition column in Hive.
age,gender,diabetic,children,smoker,region,claim,dt
19,female,NO,0,Yes,southwest,171,2023-10-16
18,male,NO,1,No,southwest,140,2023-10-16
28,male,YES,3,No,southwest,149,2023-10-16
33,male,YES,0,No,northwest,146,2023-10-16
32,male,NO,0,No,northwest,176,2023-10-16
31,female,NO,0,No,southwest,106,2023-10-16
46,female,NO,1,No,southwest,106,2023-10-16
37,female,NO,3,No,northwest,141,2023-10-16
37,male,NO,2,No,northwest,151,2023-10-16
60,female,YES,0,No,northwest,139,2023-10-16
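
Editing the file by hand works for a small sample, but the same step can also be scripted. The following is a minimal sketch, assuming the downloaded CSV has a header row and no dt column yet. The function name add_dt_column is hypothetical and not part of the original project.

```python
import csv
import io


def add_dt_column(src, dst, dt):
    """Copy CSV rows from src to dst, appending a dt column to every row."""
    reader = csv.reader(src)
    writer = csv.writer(dst, lineterminator="\n")
    header = next(reader)
    writer.writerow(header + ["dt"])
    for row in reader:
        if row:  # skip blank lines
            writer.writerow(row + [dt])


# In-memory demonstration with two rows from the sample data
sample = io.StringIO(
    "age,gender,diabetic,children,smoker,region,claim\n"
    "19,female,NO,0,Yes,southwest,171\n"
    "18,male,NO,1,No,southwest,140\n"
)
result = io.StringIO()
add_dt_column(sample, result, "2023-10-16")
print(result.getvalue())
```

To apply it to the real file, the two StringIO objects would be replaced by files opened with open(path, newline="") for reading and writing.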

Run Hadoop Services

Start the Hadoop services with the command below in a terminal. It starts the NameNode, the DataNode and the YARN ResourceManager and NodeManager services.

# Start all the services
> start-all.sh
WARNING: Attempting to start all Apache Hadoop daemons as hadoop in 10 seconds.
WARNING: This is not a recommended production deployment configuration.
WARNING: Use CTRL-C to abort.
Starting namenodes on [localhost]
Starting datanodes
Starting secondary namenodes [B4911-VM]
Starting resourcemanager
Starting nodemanagers

# Check the status of the Hadoop services
> jps
2755 NodeManager
2165 NameNode
2262 DataNode
2651 ResourceManager
3118 Jps
2462 SecondaryNameNode
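
A pipeline script can run the same check before it submits any work. The sketch below parses jps-style output and reports which of the five daemons listed above are missing. The names REQUIRED_DAEMONS and missing_daemons are hypothetical, and the input is the sample output from this article rather than a live call.

```python
REQUIRED_DAEMONS = {
    "NameNode",
    "DataNode",
    "SecondaryNameNode",
    "ResourceManager",
    "NodeManager",
}


def missing_daemons(jps_output):
    """Return the required daemon names that do not appear in jps output."""
    running = set()
    for line in jps_output.splitlines():
        parts = line.split()
        if len(parts) == 2:  # each line looks like "<pid> <name>"
            running.add(parts[1])
    return REQUIRED_DAEMONS - running


sample_output = """2755 NodeManager
2165 NameNode
2262 DataNode
2651 ResourceManager
3118 Jps
2462 SecondaryNameNode"""

print(sorted(missing_daemons(sample_output)))  # prints []
```

On the real machine, the text could come from subprocess.run(["jps"], stdout=subprocess.PIPE).stdout.decode("utf-8").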

Hadoop Components

HDFS: HDFS (Hadoop Distributed File System) is a distributed file system that handles large data sets running on commodity hardware. It is used to scale a single Apache Hadoop cluster to hundreds (and even thousands) of nodes. HDFS is one of the major components of Hadoop.

NameNode: The NameNode manages the DataNodes and keeps the file system metadata, including the index of which blocks are stored on which DataNode. A cluster has one active NameNode. The SecondaryNameNode runs alongside it and periodically checkpoints the NameNode's metadata; despite its name, it is a helper rather than a hot standby.

DataNode: DataNodes are the storage part of HDFS (Hadoop Distributed File System): the actual data blocks are saved on them. A cluster can have many DataNodes, all managed by the NameNode, and reads and writes of the data are served by the DataNodes.

YARN: YARN is responsible for allocating system resources to the applications running in a Hadoop cluster, and for scheduling and managing the tasks executed on the different cluster nodes.

Build Data Pipeline Application in diabetic_analysis

Put the database creation DDL in the following file: /home/hadoop/diabetic_analysis/ddl/db.hql

CREATE DATABASE IF NOT EXISTS diabetic_analysis;

To run HQL (Hive Query Language) with hive, we have two options

  1. hive -f <filename>.hql
  2. hive -e 'Inline Hive Query'
> hive -f /home/hadoop/diabetic_analysis/ddl/db.hql

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/hadoop/apache-hive-3.1.2-bin/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/hadoop/hadoop-3.2.1/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Hive Session ID = 95fb409b-ad5c-48a2-b67f-b8ba13e50be8

Logging initialized using configuration in jar:file:/home/hadoop/apache-hive-3.1.2-bin/lib/hive-common-3.1.2.jar!/hive-log4j2.properties Async: true
Hive Session ID = f2a44b6e-90ea-4871-bf45-2a24f14cdae4
OK
Time taken: 1.155 seconds

> hive -e 'SHOW DATABASES'

Hive Session ID = 7a2d4d48-2bff-42bb-933b-e2b92d89c6de
OK
default
diabetic_analysis
erp_solution
Time taken: 0.966 seconds, Fetched: 3 row(s)

Create the file for the tbl_patients table at the following path: /home/hadoop/diabetic_analysis/ddl/tbl_patients.hql

SET hivevar:DATABASE_NAME=diabetic_analysis;
SET hivevar:TBL_PATIENTS=${DATABASE_NAME}.tbl_patients;
CREATE TABLE IF NOT EXISTS ${TBL_PATIENTS} (
age INTEGER,
gender STRING,
diabetic STRING,
children INTEGER,
smoker STRING,
region STRING,
claim INTEGER
)
PARTITIONED BY(dt STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
TBLPROPERTIES('transactional'='false');

Run the commands below to create the partitioned table in Hadoop through Hive

> cd /home/hadoop/diabetic_analysis/ddl
> hive -f tbl_patients.hql
> hive -e 'DESCRIBE FORMATTED diabetic_analysis.tbl_patients'

# Detailed Table Information
Database: diabetic_analysis
OwnerType: USER
Owner: hadoop
CreateTime: Mon Oct 16 13:11:56 JST 2023
LastAccessTime: UNKNOWN
Retention: 0
Location: hdfs://localhost:9000/user/hive/warehouse/diabetic_analysis.db/tbl_patients
Table Type: MANAGED_TABLE

DESCRIBE FORMATTED <db_name>.<table_name> shows the table's details, including its partition information, its location in HDFS and the table type.
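
For a partitioned table, Hive keeps each partition in a subdirectory named <column>=<value> under the table location. The sketch below derives the directory expected for the dt partition used in this article. The helper partition_path is hypothetical, and it assumes partition values without special characters, which holds for plain dates.

```python
import posixpath

# Location reported by DESCRIBE FORMATTED for tbl_patients
TABLE_LOCATION = (
    "hdfs://localhost:9000/user/hive/warehouse/"
    "diabetic_analysis.db/tbl_patients"
)


def partition_path(table_location, **partition):
    """Build the HDFS directory for a partition, e.g. dt=2023-10-16."""
    parts = ["{0}={1}".format(key, value) for key, value in partition.items()]
    return posixpath.join(table_location, *parts)


print(partition_path(TABLE_LOCATION, dt="2023-10-16"))
```

Once data has been loaded, running hdfs dfs -ls on that path should list the files of the partition.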

We can create two types of table in a Hadoop cluster with Hive

Managed Table: For managed tables, Hive stores the data in a fixed location inside the Hive warehouse directory. Dropping the table also deletes its data.

External Table: Hive reads the data from the specified LOCATION, which can be an external resource. Dropping the table removes only the table definition and leaves the data in place, which makes this type suitable for data sources managed outside Hive.

Create the file for the tbl_patients_children table at /home/hadoop/diabetic_analysis/ddl/tbl_patients_children.hql

SET hivevar:DATABASE_NAME=diabetic_analysis;
SET hivevar:TBL_PATIENTS_CHILDREN=${DATABASE_NAME}.tbl_patients_children;
CREATE TABLE IF NOT EXISTS ${TBL_PATIENTS_CHILDREN} (
gender STRING,
region STRING,
children INTEGER
)
PARTITIONED BY (dt STRING)
TBLPROPERTIES('transactional'='false');

Run the commands below to create the tbl_patients_children table with Hive

> cd /home/hadoop/diabetic_analysis/ddl
> hive -f tbl_patients_children.hql
> hive -e 'DESCRIBE FORMATTED diabetic_analysis.tbl_patients_children'

# Detailed Table Information
Database: diabetic_analysis
OwnerType: USER
Owner: hadoop
CreateTime: Mon Oct 16 13:23:05 JST 2023
LastAccessTime: UNKNOWN
Retention: 0
Location: hdfs://localhost:9000/user/hive/warehouse/diabetic_analysis.db/tbl_patients_children
Table Type: MANAGED_TABLE

HDFS Operation

HDFS: Copy the external CSV file to an HDFS path

> cd /home/hadoop/diabetic_analysis/data
> hdfs dfs -put diabetic.csv /diabetic.csv
> hdfs dfs -cat /diabetic.csv

hdfs dfs -put <source file> <hdfs destination file> copies the local file to the HDFS path

hdfs dfs -cat <hdfs file path> prints the contents of a file stored in HDFS

Run the Hive query to load the data into the tbl_patients table. The INPATH must be the HDFS path the file was uploaded to, here /diabetic.csv

> hive -e \
"LOAD DATA INPATH '/diabetic.csv' INTO TABLE diabetic_analysis.tbl_patients"

hive -e runs an inline HQL (Hive Query Language) statement in Hive

Data processing and Insert Into another table (Data Pipelining)

Insert tbl_patients table data into the tbl_patients_children table with HQL

Create an HQL file that reads the data from the tbl_patients table, processes it and inserts it into the tbl_patients_children table. The file goes in the following location: /home/hadoop/diabetic_analysis/dml/sc_insert_data_into_patient_children.hql

-- Set the hivevar for database name
SET hivevar:DATABASE_NAME=diabetic_analysis;
-- Set the hivevar for tbl_patients table
SET hivevar:TBL_PATIENTS=${DATABASE_NAME}.tbl_patients;
-- Set the hivevar for tbl_patients_children table
SET hivevar:TBL_PATIENTS_CHILDREN=${DATABASE_NAME}.tbl_patients_children;
-- Insert the data for the dt partition using INSERT OVERWRITE from
-- tbl_patients to tbl_patients_children table.
-- The selected columns must line up with the target table's columns
-- (gender, region, children) in order.
INSERT OVERWRITE TABLE ${TBL_PATIENTS_CHILDREN} PARTITION (dt="${dt}")
SELECT
gender,
region,
children
FROM
${TBL_PATIENTS}
WHERE dt="${dt}";

Hive offers several ways to define variables (hivevar, hiveconf). In the HQL query above, I used hivevar to SET the variables.
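
To make the substitution step concrete, here is a simplified model of how ${NAME} and ${hivevar:NAME} references resolve against the defined hivevars. It is a sketch for illustration only, not Hive's actual implementation: the substitute function and the regular expression are assumptions, and in this sketch unknown names are simply left untouched.

```python
import re

# Matches ${NAME} and ${hivevar:NAME}
_VAR = re.compile(r"\$\{(?:hivevar:)?([A-Za-z_][A-Za-z0-9_]*)\}")


def substitute(text, variables):
    """Replace variable references with their values; keep unknown ones."""
    def repl(match):
        name = match.group(1)
        return variables.get(name, match.group(0))
    return _VAR.sub(repl, text)


# dt would normally arrive from the command line via --hivevar dt=...
hivevars = {"DATABASE_NAME": "diabetic_analysis", "dt": "2023-10-16"}
# Equivalent of: SET hivevar:TBL_PATIENTS=${DATABASE_NAME}.tbl_patients;
hivevars["TBL_PATIENTS"] = substitute("${DATABASE_NAME}.tbl_patients", hivevars)

query = substitute('SELECT * FROM ${TBL_PATIENTS} WHERE dt="${dt}"', hivevars)
print(query)
```

The model shows why a variable has to be referenced by the exact name it was SET under: a reference to a name that was never defined is not replaced, and the query then fails to parse.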

Insert Commands for Hive

  1. INSERT INTO TABLE <TABLE NAME> [PARTITION(col='val')] VALUES (...), (...) saves one or more rows into the table [e.g., INSERT INTO TABLE example VALUES ('101', 'Maik'), ('102', 'TOM')]. The INSERT INTO statement does not modify the existing data and records.
  2. INSERT OVERWRITE TABLE <TABLE_NAME> [PARTITION(col='val')] [SELECT STATEMENT] replaces any existing data in the table or partition with the new rows.

In this article, I use INSERT OVERWRITE to insert the data into the table.

Note: The SELECT statement must return the same columns, in the same order, as the target table.
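
The difference between the two statements matters most when a pipeline run is repeated for the same date. The sketch below models a partitioned table as a plain dictionary to show the semantics. It is only an illustration: insert_into and insert_overwrite are hypothetical helpers, and no Hive is involved.

```python
def insert_into(table, partition, rows):
    """Append rows to the partition, keeping what is already there."""
    table.setdefault(partition, []).extend(rows)


def insert_overwrite(table, partition, rows):
    """Replace the partition's contents with the new rows."""
    table[partition] = list(rows)


appended = {}
overwritten = {}
rows = [("female", "southwest", 0), ("male", "southwest", 1)]

# Simulate running the same job twice for dt=2023-10-16
for _ in range(2):
    insert_into(appended, "dt=2023-10-16", rows)
    insert_overwrite(overwritten, "dt=2023-10-16", rows)

print(len(appended["dt=2023-10-16"]))     # prints 4 (duplicates)
print(len(overwritten["dt=2023-10-16"]))  # prints 2 (same result as one run)
```

With INSERT OVERWRITE on a single dt partition, rerunning a failed or repeated flow does not duplicate rows, and other partitions are left alone.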

Using Beeline and hiveserver2 to Run the Hive Query

Open another tab or window in the terminal and run the command below. It starts hiveserver2 as a service

> hiveserver2
2023-10-16 16:22:14: Starting HiveServer2
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/hadoop/apache-hive-3.1.2-bin/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/hadoop/hadoop-3.2.1/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Hive Session ID = 39aabdf0-6c83-46fb-99e9-32c59f716bf9

Find the IP address of the machine hiveserver2 is running on. On Debian Linux:

> nmcli device show

GENERAL.DEVICE: enp0s3
GENERAL.TYPE: ethernet
GENERAL.HWADDR: 08:00:27:63:7D:20
GENERAL.MTU: 1500
GENERAL.STATE: 100 (connected)
GENERAL.CONNECTION: Wired connection 1
GENERAL.CON-PATH: /org/freedesktop/NetworkManager/ActiveConnection/2
WIRED-PROPERTIES.CARRIER: on
IP4.ADDRESS[1]: <IPv4 IP ADDRESS>/24
IP4.GATEWAY: <IPv4 GATEWAY ADDRESS>

Connect with beeline using the command below

> beeline -u jdbc:hive2://<hive2 server IP>:10000 -n <username> -p <password>
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/hadoop/hadoop-3.2.1/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/hadoop/apache-hive-3.1.2-bin/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Connecting to jdbc:hive2://<hive2 server IP>:10000
Connected to: Apache Hive (version 3.1.2)
Driver: Hive JDBC (version 3.1.2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 3.1.2 by Apache Hive
0: jdbc:hive2://<hive2 server IP>:10000>

I have set up hiveserver2 in the hive-site.xml file to accept anonymous and system users, so developers can log in to the Hive server with a system username and password.

<username> can be a system user, such as hadoop

<password> is that system user's password, such as the hadoop user's password

Python Script Implementation to Run the Insert Operation into tbl_patients_children

Create the file in the following location: /home/hadoop/diabetic_analysis/bin/insert_into_tbl_patients_children.py

# bin/insert_into_tbl_patients_children.py
import subprocess
import os

APP_BASE_DIR = "/home/hadoop/diabetic_analysis"
# Replace the three placeholders below with the values for your environment
HIVE_SERVER_IP = "<hive2 server IP>"
HIVE_SERVER_USER_NAME = "<username>"
HIVE_SERVER_PASSWORD = "<password>"
HQL_SCRIPT_NAME = "{0}/dml/sc_insert_data_into_patient_children.hql".format(APP_BASE_DIR)
BEELINE_CMD = "beeline "\
    "-u jdbc:hive2://{0}:10000 -n {1} -p {2}"\
    .format(HIVE_SERVER_IP, HIVE_SERVER_USER_NAME, HIVE_SERVER_PASSWORD)


class RunQuery:

    def __init__(self) -> None:
        pass

    def cmd_builder(self, dt, file_name) -> str:
        # Pass the partition date to the HQL script as the hivevar "dt"
        return "{0} --hivevar dt={1} -f {2}".format(BEELINE_CMD, dt, file_name)

    def run(self):
        try:
            cmd = self.cmd_builder("2023-10-16", HQL_SCRIPT_NAME)
            print(cmd)
            # Run beeline through the shell and capture both output streams
            proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, preexec_fn=os.setsid)

            stdout, stderr = proc.communicate()
            print(stdout.decode("utf-8"))
            if stderr:
                output = stderr.strip()
                value = output.decode("utf-8")
                print(value)
        except OSError as e:
            # Popen raises OSError when the process cannot be started
            print(e)


if __name__ == "__main__":
    run_query = RunQuery()
    run_query.run()

To run the script, Python 3 must be installed in the development environment.

In the script,

  1. I use subprocess to start a system process that runs beeline with the insert HQL, which inserts the data into Hadoop through Hive.
  2. The stdout and stderr of the process are piped, so we can build log output by printing them.
  3. The printed output of the process is saved in the Azkaban log, so we can easily read the process logs from the terminal, the Azkaban web view and the text log file.
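
One variation worth considering is to pass beeline an argument list instead of a shell string, and to hand the exit code back to the caller. The sketch below is an alternative under stated assumptions, not the article's script: build_beeline_argv and run_hql are hypothetical names, the host and credentials are placeholder values, and the beeline flags are the same ones used above (-u, -n, -p, --hivevar, -f). The demonstration at the end runs a harmless Python child process, so it works without a Hive server.

```python
import subprocess
import sys


def build_beeline_argv(host, user, password, dt, hql_file, port=10000):
    """Build the beeline command as a list, so no shell quoting is needed."""
    return [
        "beeline",
        "-u", "jdbc:hive2://{0}:{1}".format(host, port),
        "-n", user,
        "-p", password,
        "--hivevar", "dt={0}".format(dt),
        "-f", hql_file,
    ]


def run_hql(argv):
    """Run the command, print its output, and return the exit code."""
    proc = subprocess.run(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    print(proc.stdout.decode("utf-8"))
    if proc.stderr:
        print(proc.stderr.decode("utf-8").strip())
    return proc.returncode


# Placeholder connection values, for illustration only
argv = build_beeline_argv(
    "127.0.0.1", "hadoop", "secret", "2023-10-16",
    "/home/hadoop/diabetic_analysis/dml/sc_insert_data_into_patient_children.hql",
)
print(argv)

# Stand-in command so the sketch runs without beeline installed
exit_code = run_hql([sys.executable, "-c", "print('hello from child')"])
print(exit_code)
```

In a real job script, ending with sys.exit(run_hql(argv)) would propagate a beeline failure as a non-zero exit status, which, as far as I know, is what a command-type Azkaban job uses to decide between success and failure.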

Run Script through Python3 to test application

> chmod -R 777 \
/home/hadoop/diabetic_analysis/bin/insert_into_tbl_patients_children.py

> cd /home/hadoop/diabetic_analysis

> python3 bin/insert_into_tbl_patients_children.py
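
The script above hard-codes the partition date 2023-10-16. For scheduled runs, the date could be taken from the command line instead. This is a minimal sketch, assuming a --dt option that defaults to today's date; the parse_dt function and the option name are hypothetical.

```python
import argparse
import datetime


def parse_dt(argv=None):
    """Return the partition date as YYYY-MM-DD, defaulting to today."""
    parser = argparse.ArgumentParser(
        description="Run the insert for one dt partition"
    )
    parser.add_argument(
        "--dt",
        default=datetime.date.today().isoformat(),
        help="partition date in YYYY-MM-DD format",
    )
    args = parser.parse_args(argv)
    # Raises ValueError when the value is not a valid date
    parsed = datetime.datetime.strptime(args.dt, "%Y-%m-%d")
    return parsed.date().isoformat()


print(parse_dt(["--dt", "2023-10-16"]))
```

The returned value could then be passed to cmd_builder in place of the fixed date, for example python3 bin/insert_into_tbl_patients_children.py --dt 2023-10-17.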

Use Azkaban Scheduler to Run the Script

Create a new project in Azkaban through the web UI. Azkaban runs on localhost:8081 in the local environment

  1. Click the Create Project button and a popup will appear.
  2. Add the Name and Description, then click the Create Project button.
  3. The details page with the project information opens.

Create Execution Flow for Python Script for Azkaban Scheduler

> cd /home/hadoop/diabetic_analysis/flow
> mkdir -p example
> cd example
> touch common.properties
> touch run-insert-patient-info.job

Then add the following code to the common.properties file

# common.properties file
APP_DIR=/home/hadoop/diabetic_analysis/bin
BASERC_FILE=/etc/bash.bashrc

Then add the code below to the run-insert-patient-info.job file

type=command
user.to.proxy=hadoop
command=bash -c "source ${BASERC_FILE} && python3 ${APP_DIR}/insert_into_tbl_patients_children.py"
retries=0
retry.backoff=0

Then create the zip file using the zip utility.

Install zip and unzip on Debian

> sudo apt install zip unzip

Run the command below to zip the flow folder

> zip -r example.zip example/
adding: example/ (stored 0%)
adding: example/run-insert-patient-info.job (deflated 10%)
adding: example/common.properties (stored 0%)
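
If the packaging step should be part of an automated build, the same archive can be produced with Python's zipfile module. The sketch below is one way to do it, with zip_flow as a hypothetical helper. The demonstration builds a throwaway example folder in a temporary directory instead of touching the real project.

```python
import os
import tempfile
import zipfile


def zip_flow(flow_dir, zip_path):
    """Zip every file under flow_dir, keeping paths relative to its parent."""
    parent = os.path.dirname(os.path.abspath(flow_dir))
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for root, _dirs, files in os.walk(flow_dir):
            for name in sorted(files):
                full = os.path.join(root, name)
                zf.write(full, os.path.relpath(full, parent))
    return zip_path


# Demonstration with placeholder files in a temporary directory
with tempfile.TemporaryDirectory() as tmp:
    flow = os.path.join(tmp, "example")
    os.makedirs(flow)
    for name in ("common.properties", "run-insert-patient-info.job"):
        with open(os.path.join(flow, name), "w") as fh:
            fh.write("# placeholder\n")
    archive = zip_flow(flow, os.path.join(tmp, "example.zip"))
    with zipfile.ZipFile(archive) as zf:
        names = sorted(zf.namelist())

print(names)
```

For the real project, the call would be zip_flow("/home/hadoop/diabetic_analysis/flow/example", "example.zip").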

Copy the zip file to the desktop and upload it to the Azkaban project

> cp example.zip /home/hadoop/Desktop/
  1. Click the Upload button to upload the flow .zip file.
  2. Select the example.zip file from the folder and click the Upload button.
  3. If the .zip file contains valid job and properties files, Azkaban automatically creates the flow in the project.

Run the flow

  1. To execute the flow, click the Execute Flow button.
  2. Click Failure Options.
  3. Select Finish All Possible from the dropdown.
  4. Click the Execute button. It starts the flow, which contains the Python script that inserts the data.

Check Success and Error Logs of the Executed Flow

  1. Click the Executing menu if the process is still running, or the History menu item if it has ended with success or error.
  2. Click the execution number to see the detailed logs of the execution.
  3. Click the Job List button, then click the log button in a job's row to open its logs.
  4. The job's success and error logs are shown on this page.

All the source code for diabetic_analysis is available in the Git repository linked below

Tariqul Islam has 9+ years of software development experience. He knows Python, Node.js, Java, C# and PHP.