JAVA App for Spark job and Run on GCP DataProc and Storage (Spark Code Practice 3)

Tariqul Islam
8 min readNov 28, 2023

--

In this article, I will discuss about how to use java application with maven to work with spark, build JAR to use for spark job. and how we can utilize the GCP storage and data proc to run the spark job and generating the output.

Prerequisite

  1. Java 1.8
  2. Maven
  3. GCP Account
  4. Enabled DataProc and Google Storage Service
  5. gCloud Cli
  6. Spark Installed

Enable Dataproc and Storage API

I already installed and configured Java, GCP Account, gCloud CLI, DataProc and Storage service for my development environment

Maven Installation

If you are not familiar with Maven, follow the below article to install maven into your development environment

https://www.baeldung.com/install-maven-on-windows-linux-mac

You can create the maven project manually by following article

I use Intellij Idea to create my project and solve and dependency of the project

POM(Project Object Model)

This file contains dependency named spark-core_2.13 , spark-sql_2.13 and gcs-connector and org.slf4j , JAR file build information and which java version will be use for build the JAR file

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.example</groupId>
<artifactId>location_wise_customer</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.13</artifactId>
<version>3.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.13</artifactId>
<version>3.5.0</version>
</dependency>

<dependency>
<groupId>com.google.cloud.bigdataoss</groupId>
<artifactId>gcs-connector</artifactId>
<version>3.0.0-RC01</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.6.4</version>
</dependency>
</dependencies>

</project>

Datasource we are using for this article

Save data into project directory

# App <APP_ROOT> = /home/hadoop/java-with-spark
> cd <APP_ROOT>
> mkdir -p data
> cd data
> mkdir -p input output bin

# Download the file
# Extract the zip
# COPY the CSV file into input folder

I have added the Main.java file into project directory /src/main/java/org.example/Main.Java , the purpose of this class is

  1. Initialize SparkSession to read the data from local data/input folder
  2. Aggregate the data and create the new dataframe
  3. Generate some report data and saved into local Storage data/output folder
package org.example;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.functions.*;

public class Main {
public static void main(String[] args){
run_spark();
}

/* Spark Session Create with master local with all cluster [*] */
private static SparkSession getSparkSession() {
return SparkSession.builder()
.appName("spark-data-proc-example")
.master("local[*]")
.getOrCreate();
}

private static void run_spark() {
SparkSession spark = getSparkSession();
spark.sparkContext().setLogLevel("WARN");
/*
Notes: Change the <APP_ROOT> to your development PC
Where is application is saved
For me
location is "/home/hadoop/java-with-spark"
*/
String local_file_path = "file:///<APP_ROOT>/data/input/shopping_trends_updated.csv";
// Load the data from csv file from data folder of the project
// Create data frame named df by spark.read() function
Dataset<Row> df = spark.read()
.format("csv")
.option("header", true)
.option("inferSchema", true)
.load(local_file_path);
// We use inferSchema true, so Spark will create schema from dataset
df.printSchema();
// df.show() will shows the 50 data without trancating the character
df.show(50, false);

/* We can create the dataset to
analysis with localtion based and gender based
Payment method
df.agg() --> this function will enable to accept
any aggregate function to dataframe,
such as sum, round, avg, min, max
sum() --> this function return the sum of the values of specified
column from the selected record

round() --> this function round uo the decimal places
with HALF_UP round mode from selected records

avg() --> this function using for find the mean value of field
or column from the selected rows or records

min() --> this function will find the minimum value of field
or column from the selected rows or records

max() --> this function will find the maximum value of field
or column from the selected rows or records

count --> this function count the the number of rows into
specified column, if we use grouping, it will
cound the number of rows by specified column
with grouping

lit --> lit is using at spark to convert a literal value into
a new column
*/
Dataset<Row> df2 = df.groupBy("Location","Gender", "Payment Method")
.agg(
sum("Purchase Amount (USD)").alias("Total Purchase (USD)"),
round(avg("Purchase Amount (USD)"), 2).alias("Average Purchase (USD)"),
max("Purchase Amount (USD)").alias("Max Purchase Amount (USD)"),
min("Purchase Amount (USD)").alias("Min Purchase Amount (USD)"),
count(lit(1)).alias("count")
);

df2.printSchema();
df2.show(false);
/* write the result output to csv file after analysis
repartition --> this method is used for increase or decrease
the number of partition of an RDD or
Dataframe of Spark
df.write() --> this method will provide the functionality to
writeout the record into file system with
specified format
.mode --> we can specify the write ouput forma
such as (csv, json...)
.option --> we can spcify the writing file properties
like header, schema, ....
.save --> function which is using for save the data into
filesystem.
*/
String output_file_path="file:///<APP_ROOT>/data/output/location_gender_payment_wise_sales";
df2.repartition(1)
.write()
.mode(SaveMode.Overwrite)
.format("csv")
.option("header", true)
.save(output_file_path);
}
}

Build the Jar file

Package the JAVA Application with maven by following below command

> mvn --version
> mvn clean
> mvn install
> mvn package
[INFO]
[INFO] --- jar:3.3.0:jar (default-jar) @ location_wise_customer ---
[INFO] Building jar: <APP_ROOT>/target/location_wise_customer-1.0-SNAPSHOT.jar

To Test locally I start the hadoop and spark into my system

>$HADOOP_HOME/sbin start-all.sh
>jps

Then submit the spark job

> export APP_HOME=/home/hadoop/java-with-spark
> spark-submit --class org.example.Main \
--master local \
--deploy-mode client \
${APP_HOME}/target/location_wise_customer-1.0-SNAPSHOT.jar \
--async

Output of the command

Using GCP (Google cloud Service) To run Spark Job

Create Storage bucket in google Cloud Storage by gCloud CLI

> gcloud storage buckets create gs://data_proc_example

Upload the Datasoruce from data folder to GCP Storage Cloud

> gcloud storage cp data/ gs://data_proc_example/data --recursive
Copying file://data/.DS_Store to gs://data_proc_example/data/.DS_Store
Copying file://data/input/shopping_trends_updated.csv to gs://data_proc_example/data/input/shopping_trends_updated.csv
Copying file://data/output/location_gender_payment_wise_sales/._SUCCESS.crc to gs://data_proc_example/data/output/location_gender_payment_wise_sales/._SUCCESS.crc
Copying file://data/output/location_gender_payment_wise_sales/.part-00000-b9efd5bd-79dc-4b7d-b23a-30b8c385fcbc-c000.csv.crc to gs://data_proc_example/data/output/location_gender_payment_wise_sales/.part-00000-b9efd5bd-79dc-4b7d-b23a-30b8c385fcbc-c000.csv.crc
Copying file://data/output/location_gender_payment_wise_sales/part-00000-b9efd5bd-79dc-4b7d-b23a-30b8c385fcbc-c000.csv to gs://data_proc_example/data/output/location_gender_payment_wise_sales/part-00000-b9efd5bd-79dc-4b7d-b23a-30b8c385fcbc-c000.csv
Copying file://data/output/location_gender_payment_wise_sales/_SUCCESS to gs://data_proc_example/data/output/location_gender_payment_wise_sales/_SUCCESS
Completed files 6/6 | 437.8kiB/437.8kiB

Average throughput: 1.2MiB/s

Changes into Java Classes for Make is GCP compatible

Create new file named GCPMain.java, And Point Out Datasource file location to gcp buckets, Output report also into GCP Storage Bucket

package org.example;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.functions.*;

public class GCPMain {

public static void main(String[] args){
run_spark();
}

private static SparkSession getSparkSession() {
return SparkSession.builder()
.appName("spark-data-proc-example")
.master("yarn")
.getOrCreate();
}
private static void run_spark() {
SparkSession spark = getSparkSession();
spark.sparkContext().setLogLevel("WARN");
String input_file = "gs://data_proc_example/data/input/shopping_trends_updated.csv"
// Load the CSV file from Google Storage Bucket
Dataset<Row> df = spark.read()
.format("csv")
.option("header", true)
.option("inferSchema", true)
.load(input_file);
df.printSchema();
df.show(50, false);

Dataset<Row> df2 = df.groupBy("Location","Gender", "Payment Method")
.agg(
sum("Purchase Amount (USD)").alias("Total Purchase (USD)"),
round(avg("Purchase Amount (USD)"), 2).alias("Average Purchase (USD)"),
max("Purchase Amount (USD)").alias("Max Purchase Amount (USD)"),
min("Purchase Amount (USD)").alias("Min Purchase Amount (USD)"),
count(lit(1)).alias("count")
);

String output_file = "gs://data_proc_example/data/output/shopping_trends_updated.csv"

df2.printSchema();
df2.show(false);
// location get client information
df2.repartition(1)
.write()
.mode(SaveMode.Overwrite)
.format("csv")
.option("header", true)
.save(output_file);
}

}

Build the Jar again with Maven command

> mvn --version
> mvn clean
> mvn install
> mvn package
[INFO]
[INFO] --- jar:3.3.0:jar (default-jar) @ location_wise_customer ---
[INFO] Building jar: <APP_ROOT>/target/location_wise_customer-1.0-SNAPSHO

Upload Jar file into GCP storage bucket

> gcloud storage cp target/  \
gs://data_proc_example/bin --recursive

Copying file://target/location_wise_customer-1.0-SNAPSHOT.jar to gs://data_proc_example/bin/target/location_wise_customer-1.0-SNAPSHOT.jar
Copying file://target/classes/org/example/GCPMain.class to gs://data_proc_example/bin/target/classes/org/example/GCPMain.class
Copying file://target/classes/org/example/Main.class to gs://data_proc_example/bin/target/classes/org/example/Main.class
Copying file://target/maven-status/maven-compiler-plugin/testCompile/default-testCompile/inputFiles.lst to gs://data_proc_example/bin/target/maven-status/maven-compiler-plugin/testCompile/default-testCompile/inputFiles.lst
Copying file://target/maven-status/maven-compiler-plugin/testCompile/default-testCompile/createdFiles.lst to gs://data_proc_example/bin/target/maven-status/maven-compiler-plugin/testCompile/default-testCompile/createdFiles.lst
Copying file://target/maven-status/maven-compiler-plugin/compile/default-compile/inputFiles.lst to gs://data_proc_example/bin/target/maven-status/maven-compiler-plugin/compile/default-compile/inputFiles.lst
Copying file://target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst to gs://data_proc_example/bin/target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst
Copying file://target/maven-archiver/pom.properties to gs://data_proc_example/bin/target/maven-archiver/pom.properties
Completed files 8/8 | 12.7kiB/12.7kiB

Average throughput: 28.9kiB/s

Create the DataProc Cluster for runing Spark Job

> gcloud dataproc \
clusters create data-proc-example \
--region=us-east1

Waiting for cluster creation operation...done.
Created [https://dataproc.googleapis.com/v1/projects/data-science-project-405301/regions/us-east1/clusters/data-proc-example] Cluster placed in zone [us-east1-b]

Run Jar file as job in Dataproc

> export GS_BUCKET=gs://data_proc_example/bin
> export JAR_FILE=location_wise_customer-1.0-SNAPSHOT.jar
> gcloud dataproc jobs submit spark \
--region us-east1 \
--cluster data-proc-example \
--class org.example.GCPMain \
--jars ${GS_BUCKET}/${JAR_FILE} \
--async

Job [d066b88905fd4cfe9eeafbc34e410107] submitted.
driverControlFilesUri: gs://dataproc-staging-us-east1-355538152299-exg82lew/google-cloud-dataproc-metainfo/1d962433-a6a2-4b17-a255-a9d6fa1bcc37/jobs/d066b88905fd4cfe9eeafbc34e410107/
driverOutputResourceUri: gs://dataproc-staging-us-east1-355538152299-exg82lew/google-cloud-dataproc-metainfo/1d962433-a6a2-4b17-a255-a9d6fa1bcc37/jobs/d066b88905fd4cfe9eeafbc34e410107/driveroutput
jobUuid: 8773d8bf-35e9-3232-8815-9c37d800b0f6
placement:
clusterName: data-proc-example
clusterUuid: 1d962433-a6a2-4b17-a255-a9d6fa1bcc37
reference:
jobId: d066b88905fd4cfe9eeafbc34e410107
projectId: data-science-project-405301
sparkJob:
jarFileUris:
- gs://data_proc_example/bin/location_wise_customer-1.0-SNAPSHOT.jar
mainClass: org.example.GCPMain
status:
state: PENDING
stateStartTime: '2023-11-24T05:45:01.695656Z'

Job file explorer into GCP console DataProc Service

Details explorer for Each Job into DataProc Console

Check file is created into dataproc from storage bucket

All codebase is saved into following git repository

--

--

Tariqul Islam
Tariqul Islam

Written by Tariqul Islam

Tariqul Islam have 9+ years of software development experience. He knows Python, Node Js and Java, C# , PHP.

No responses yet