Java App for a Spark Job, Run on GCP Dataproc and Storage (Spark Code Practice 3)
In this article, I will discuss how to build a Java application with Maven for Spark, package it into a JAR for a Spark job, and use Google Cloud Storage and Dataproc to run the job and generate the output.
Prerequisites
- Java 1.8
- Maven
- GCP Account
- Dataproc and Cloud Storage services enabled
- gcloud CLI
- Spark installed
Enable Dataproc and Storage API
I have already installed and configured Java, a GCP account, the gcloud CLI, and the Dataproc and Cloud Storage services in my development environment.
Maven Installation
If you are not familiar with Maven, follow the article below to install it in your development environment:
https://www.baeldung.com/install-maven-on-windows-linux-mac
You can create the Maven project manually, but I use IntelliJ IDEA to create my project and resolve its dependencies.
POM (Project Object Model)
This file declares the dependencies spark-core_2.13, spark-sql_2.13, gcs-connector, and org.slf4j, along with the JAR build configuration and the Java version used to build the JAR.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>location_wise_customer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.13</artifactId>
            <version>3.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.13</artifactId>
            <version>3.5.0</version>
        </dependency>
        <dependency>
            <groupId>com.google.cloud.bigdataoss</groupId>
            <artifactId>gcs-connector</artifactId>
            <version>3.0.0-RC01</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.6.4</version>
        </dependency>
    </dependencies>
</project>
Data source we are using for this article
Save the data into the project directory
# App <APP_ROOT> = /home/hadoop/java-with-spark
> cd <APP_ROOT>
> mkdir -p data
> cd data
> mkdir -p input output bin
# Download the file
# Extract the zip
# Copy the CSV file into the input folder
I have added a Main.java file to the project at src/main/java/org/example/Main.java. The purpose of this class is to:
- Initialize a SparkSession and read the data from the local data/input folder
- Aggregate the data and create a new DataFrame
- Generate report data and save it to the local data/output folder
package org.example;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.functions.*;

public class Main {

    public static void main(String[] args) {
        run_spark();
    }

    /* Create a SparkSession with master "local[*]" (use all local cores) */
    private static SparkSession getSparkSession() {
        return SparkSession.builder()
                .appName("spark-data-proc-example")
                .master("local[*]")
                .getOrCreate();
    }

    private static void run_spark() {
        SparkSession spark = getSparkSession();
        spark.sparkContext().setLogLevel("WARN");

        /*
         * Note: change <APP_ROOT> to the directory where the application
         * is saved on your development machine.
         * For me the location is "/home/hadoop/java-with-spark".
         */
        String local_file_path = "file:///<APP_ROOT>/data/input/shopping_trends_updated.csv";

        // Load the CSV file from the data folder of the project
        // and create a DataFrame named df with spark.read()
        Dataset<Row> df = spark.read()
                .format("csv")
                .option("header", true)
                .option("inferSchema", true)
                .load(local_file_path);

        // Because inferSchema is true, Spark derives the schema from the dataset
        df.printSchema();

        // df.show(50, false) shows the first 50 rows without truncating column values
        df.show(50, false);

        /*
         * Build a dataset for analysis by location, gender and payment method.
         *
         * df.agg()  --> applies aggregate functions to the DataFrame,
         *               such as sum, round, avg, min, max
         * sum()     --> returns the sum of the values of the specified
         *               column over the selected records
         * round()   --> rounds the value to the given number of decimal places,
         *               using HALF_UP rounding mode
         * avg()     --> returns the mean value of the column over the selected rows
         * min()     --> returns the minimum value of the column over the selected rows
         * max()     --> returns the maximum value of the column over the selected rows
         * count()   --> counts the number of rows; combined with groupBy,
         *               it counts the rows in each group
         * lit()     --> turns a literal value into a column
         */
        Dataset<Row> df2 = df.groupBy("Location", "Gender", "Payment Method")
                .agg(
                        sum("Purchase Amount (USD)").alias("Total Purchase (USD)"),
                        round(avg("Purchase Amount (USD)"), 2).alias("Average Purchase (USD)"),
                        max("Purchase Amount (USD)").alias("Max Purchase Amount (USD)"),
                        min("Purchase Amount (USD)").alias("Min Purchase Amount (USD)"),
                        count(lit(1)).alias("count")
                );
        df2.printSchema();
        df2.show(false);

        /*
         * Write the analysis result to a CSV file.
         *
         * repartition --> increases or decreases the number of partitions
         *                 of an RDD or DataFrame
         * df.write()  --> writes the records to the file system
         * .mode       --> sets the save mode (e.g. Overwrite, Append)
         * .format     --> sets the output format (csv, json, ...)
         * .option     --> sets writer properties such as header, schema, ...
         * .save       --> saves the data to the file system
         */
        String output_file_path = "file:///<APP_ROOT>/data/output/location_gender_payment_wise_sales";
        df2.repartition(1)
                .write()
                .mode(SaveMode.Overwrite)
                .format("csv")
                .option("header", true)
                .save(output_file_path);
    }
}
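Since the input and output paths are hardcoded above, one small improvement is to pass them as program arguments; spark-submit forwards anything after the JAR path to main(). The sketch below is a trimmed-down, hypothetical variant (the class name ParamMain and the argument names are my own, not part of the project) that keeps only a few of the aggregations and leaves the master to be set by spark-submit.

package org.example;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.functions.*;

/* Hypothetical variant: input and output paths come from the command line,
   so the same JAR can point at file:// paths locally or gs:// paths later. */
public class ParamMain {

    public static void main(String[] args) {
        if (args.length < 2) {
            System.err.println("Usage: ParamMain <input_path> <output_path>");
            System.exit(1);
        }

        SparkSession spark = SparkSession.builder()
                .appName("spark-data-proc-example")
                .getOrCreate();   // master is supplied by spark-submit
        spark.sparkContext().setLogLevel("WARN");

        // Read the CSV from the path given as the first argument
        Dataset<Row> df = spark.read()
                .format("csv")
                .option("header", true)
                .option("inferSchema", true)
                .load(args[0]);

        // Same grouping as Main, with a reduced set of aggregations
        Dataset<Row> report = df.groupBy("Location", "Gender", "Payment Method")
                .agg(
                        sum("Purchase Amount (USD)").alias("Total Purchase (USD)"),
                        round(avg("Purchase Amount (USD)"), 2).alias("Average Purchase (USD)"),
                        count(lit(1)).alias("count")
                );

        // Write the report to the path given as the second argument
        report.repartition(1)
                .write()
                .mode(SaveMode.Overwrite)
                .format("csv")
                .option("header", true)
                .save(args[1]);

        spark.stop();
    }
}

With this variant, the two paths would simply be appended after the JAR in the spark-submit command.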
Build the JAR file
Package the Java application with Maven using the commands below:
> mvn --version
> mvn clean
> mvn install
> mvn package
[INFO]
[INFO] --- jar:3.3.0:jar (default-jar) @ location_wise_customer ---
[INFO] Building jar: <APP_ROOT>/target/location_wise_customer-1.0-SNAPSHOT.jar
To test locally, I start Hadoop and Spark on my system:
> $HADOOP_HOME/sbin/start-all.sh
>jps
Then submit the Spark job:
> export APP_HOME=/home/hadoop/java-with-spark
> spark-submit --class org.example.Main \
--master local \
--deploy-mode client \
${APP_HOME}/target/location_wise_customer-1.0-SNAPSHOT.jar
Output of the command
Using GCP (Google Cloud Platform) to Run the Spark Job
Create a storage bucket in Google Cloud Storage with the gcloud CLI:
> gcloud storage buckets create gs://data_proc_example
Upload the data source from the data folder to the Cloud Storage bucket:
> gcloud storage cp data/ gs://data_proc_example/data --recursive
Copying file://data/.DS_Store to gs://data_proc_example/data/.DS_Store
Copying file://data/input/shopping_trends_updated.csv to gs://data_proc_example/data/input/shopping_trends_updated.csv
Copying file://data/output/location_gender_payment_wise_sales/._SUCCESS.crc to gs://data_proc_example/data/output/location_gender_payment_wise_sales/._SUCCESS.crc
Copying file://data/output/location_gender_payment_wise_sales/.part-00000-b9efd5bd-79dc-4b7d-b23a-30b8c385fcbc-c000.csv.crc to gs://data_proc_example/data/output/location_gender_payment_wise_sales/.part-00000-b9efd5bd-79dc-4b7d-b23a-30b8c385fcbc-c000.csv.crc
Copying file://data/output/location_gender_payment_wise_sales/part-00000-b9efd5bd-79dc-4b7d-b23a-30b8c385fcbc-c000.csv to gs://data_proc_example/data/output/location_gender_payment_wise_sales/part-00000-b9efd5bd-79dc-4b7d-b23a-30b8c385fcbc-c000.csv
Copying file://data/output/location_gender_payment_wise_sales/_SUCCESS to gs://data_proc_example/data/output/location_gender_payment_wise_sales/_SUCCESS
Completed files 6/6 | 437.8kiB/437.8kiB
Average throughput: 1.2MiB/s
Changes to the Java Classes to Make Them GCP Compatible
Create a new file named GCPMain.java and point the data source location to the GCS bucket; the output report is also written to the Cloud Storage bucket.
package org.example;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.functions.*;

public class GCPMain {

    public static void main(String[] args) {
        run_spark();
    }

    /* On Dataproc the job runs on YARN, so the master is "yarn" */
    private static SparkSession getSparkSession() {
        return SparkSession.builder()
                .appName("spark-data-proc-example")
                .master("yarn")
                .getOrCreate();
    }

    private static void run_spark() {
        SparkSession spark = getSparkSession();
        spark.sparkContext().setLogLevel("WARN");

        String input_file = "gs://data_proc_example/data/input/shopping_trends_updated.csv";

        // Load the CSV file from the Cloud Storage bucket
        Dataset<Row> df = spark.read()
                .format("csv")
                .option("header", true)
                .option("inferSchema", true)
                .load(input_file);
        df.printSchema();
        df.show(50, false);

        Dataset<Row> df2 = df.groupBy("Location", "Gender", "Payment Method")
                .agg(
                        sum("Purchase Amount (USD)").alias("Total Purchase (USD)"),
                        round(avg("Purchase Amount (USD)"), 2).alias("Average Purchase (USD)"),
                        max("Purchase Amount (USD)").alias("Max Purchase Amount (USD)"),
                        min("Purchase Amount (USD)").alias("Min Purchase Amount (USD)"),
                        count(lit(1)).alias("count")
                );

        String output_file = "gs://data_proc_example/data/output/shopping_trends_updated.csv";
        df2.printSchema();
        df2.show(false);

        // Write the report back to the Cloud Storage bucket
        df2.repartition(1)
                .write()
                .mode(SaveMode.Overwrite)
                .format("csv")
                .option("header", true)
                .save(output_file);
    }
}
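A note on the gcs-connector dependency in the POM: on a Dataproc cluster the Cloud Storage connector is already installed, which is why GCPMain can read gs:// paths with no extra configuration. The dependency is only needed if you want to resolve gs:// paths outside Dataproc, for example in a quick local test. Below is a minimal, hypothetical sketch of such a test (the class name LocalGcsCheck is my own); it assumes Google application default credentials are available on the machine, and the exact authentication settings can differ between connector versions, so treat it as a starting point rather than a definitive setup.

package org.example;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/* Hypothetical local check that the gs:// input path is readable through the
   gcs-connector dependency from the POM. Assumes application default
   credentials; consult the connector documentation for other auth options. */
public class LocalGcsCheck {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("local-gcs-check")
                .master("local[*]")
                .getOrCreate();

        // Register the connector's FileSystem implementations for the gs:// scheme
        spark.sparkContext().hadoopConfiguration()
                .set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem");
        spark.sparkContext().hadoopConfiguration()
                .set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS");

        // Read a few rows straight from the bucket uploaded earlier
        Dataset<Row> df = spark.read()
                .format("csv")
                .option("header", true)
                .load("gs://data_proc_example/data/input/shopping_trends_updated.csv");

        df.show(10, false);
        spark.stop();
    }
}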
Build the JAR again with the Maven commands:
> mvn --version
> mvn clean
> mvn install
> mvn package
[INFO]
[INFO] --- jar:3.3.0:jar (default-jar) @ location_wise_customer ---
[INFO] Building jar: <APP_ROOT>/target/location_wise_customer-1.0-SNAPSHOT.jar
Upload the JAR file to the GCS bucket:
> gcloud storage cp target/ \
gs://data_proc_example/bin --recursive
Copying file://target/location_wise_customer-1.0-SNAPSHOT.jar to gs://data_proc_example/bin/target/location_wise_customer-1.0-SNAPSHOT.jar
Copying file://target/classes/org/example/GCPMain.class to gs://data_proc_example/bin/target/classes/org/example/GCPMain.class
Copying file://target/classes/org/example/Main.class to gs://data_proc_example/bin/target/classes/org/example/Main.class
Copying file://target/maven-status/maven-compiler-plugin/testCompile/default-testCompile/inputFiles.lst to gs://data_proc_example/bin/target/maven-status/maven-compiler-plugin/testCompile/default-testCompile/inputFiles.lst
Copying file://target/maven-status/maven-compiler-plugin/testCompile/default-testCompile/createdFiles.lst to gs://data_proc_example/bin/target/maven-status/maven-compiler-plugin/testCompile/default-testCompile/createdFiles.lst
Copying file://target/maven-status/maven-compiler-plugin/compile/default-compile/inputFiles.lst to gs://data_proc_example/bin/target/maven-status/maven-compiler-plugin/compile/default-compile/inputFiles.lst
Copying file://target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst to gs://data_proc_example/bin/target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst
Copying file://target/maven-archiver/pom.properties to gs://data_proc_example/bin/target/maven-archiver/pom.properties
Completed files 8/8 | 12.7kiB/12.7kiB
Average throughput: 28.9kiB/s
Create the Dataproc Cluster for Running the Spark Job
> gcloud dataproc \
clusters create data-proc-example \
--region=us-east1
Waiting for cluster creation operation...done.
Created [https://dataproc.googleapis.com/v1/projects/data-science-project-405301/regions/us-east1/clusters/data-proc-example] Cluster placed in zone [us-east1-b]
Run the JAR file as a job on Dataproc:
> export GS_BUCKET=gs://data_proc_example/bin
> export JAR_FILE=location_wise_customer-1.0-SNAPSHOT.jar
> gcloud dataproc jobs submit spark \
--region us-east1 \
--cluster data-proc-example \
--class org.example.GCPMain \
--jars ${GS_BUCKET}/${JAR_FILE} \
--async
Job [d066b88905fd4cfe9eeafbc34e410107] submitted.
driverControlFilesUri: gs://dataproc-staging-us-east1-355538152299-exg82lew/google-cloud-dataproc-metainfo/1d962433-a6a2-4b17-a255-a9d6fa1bcc37/jobs/d066b88905fd4cfe9eeafbc34e410107/
driverOutputResourceUri: gs://dataproc-staging-us-east1-355538152299-exg82lew/google-cloud-dataproc-metainfo/1d962433-a6a2-4b17-a255-a9d6fa1bcc37/jobs/d066b88905fd4cfe9eeafbc34e410107/driveroutput
jobUuid: 8773d8bf-35e9-3232-8815-9c37d800b0f6
placement:
clusterName: data-proc-example
clusterUuid: 1d962433-a6a2-4b17-a255-a9d6fa1bcc37
reference:
jobId: d066b88905fd4cfe9eeafbc34e410107
projectId: data-science-project-405301
sparkJob:
jarFileUris:
- gs://data_proc_example/bin/location_wise_customer-1.0-SNAPSHOT.jar
mainClass: org.example.GCPMain
status:
state: PENDING
stateStartTime: '2023-11-24T05:45:01.695656Z'
Job explorer in the Dataproc service of the GCP console
Details view for each job in the Dataproc console
Check that the output file has been created in the storage bucket
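Besides browsing the bucket in the console, the output can also be verified programmatically. The sketch below is a minimal, hypothetical helper (the class name VerifyOutput is my own, not part of the project) that reads the report GCPMain wrote back from the bucket and prints a few rows; after rebuilding and re-uploading the JAR, it could be submitted to the same cluster with the same gcloud dataproc jobs submit spark command, changing --class to org.example.VerifyOutput.

package org.example;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/* Hypothetical helper: reads the report written by GCPMain back from the
   bucket and prints a sample, to confirm the output exists and has a header. */
public class VerifyOutput {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("verify-data-proc-output")
                .getOrCreate();

        // Same output location that GCPMain wrote to
        Dataset<Row> report = spark.read()
                .format("csv")
                .option("header", true)
                .load("gs://data_proc_example/data/output/shopping_trends_updated.csv");

        report.printSchema();
        report.show(20, false);
        System.out.println("Row count: " + report.count());

        spark.stop();
    }
}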