PySpark and GCP Dataproc Sample (Basic Understanding)

Tariqul Islam
Nov 17, 2023


Dataproc is a GCP service that provides clusters with Hadoop, Hive, Hive Metastore, and Spark pre-installed. It is a pay-as-you-go service, which makes it an inexpensive and fast option for running Spark- and Hadoop-based data analysis tasks. In this article, I will discuss the basic usage of Dataproc and the GCP Storage service.

To follow along, you will need:

  1. The gcloud CLI installed on your local development machine
  2. A GCP (Google Cloud Platform) account
  3. The Storage and Dataproc API services enabled in GCP

Installation of the gcloud CLI
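The original post shows this step with screenshots; as a minimal sketch for Linux/macOS (other platforms are covered in the official guide at https://cloud.google.com/sdk/docs/install):

> curl https://sdk.cloud.google.com | bash   # interactive installer from Google's docs
> exec -l $SHELL                             # reload the shell so gcloud is on PATH
> gcloud init                                # log in and set a default project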

Enable the Storage and Dataproc API Services in GCP
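This can be done from the console UI (as in the original screenshots), or with a single gcloud command, assuming a default project is already configured:

> gcloud services enable dataproc.googleapis.com storage.googleapis.com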

Create the Cluster

> gcloud dataproc clusters create spark-sample-test --region=us-east1
Waiting on operation [projects/data-science-project-405301/regions/us-east1/operations/cdcbab63-1fb0-35df-818f-35721315e455].
Waiting for cluster creation operation...⠧
WARNING: No image specified. Using the default image version. It is recommended to select a specific image version in production, as the default image version may change at any time.
WARNING: Consider using Auto Zone rather than selecting a zone manually. See https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/auto-zone
WARNING: Permissions are missing for the default service account '355538152299-compute@developer.gserviceaccount.com', missing permissions: [storage.buckets.get, storage.objects.create, storage.objects.delete, storage.objects.get, storage.objects.list, storage.objects.update] on the project 'projects/data-science-project-405301'. This usually happens when a custom resource (ex: custom staging bucket) or a user-managed VM Service account has been provided and the default/user-managed service account hasn't been granted enough permissions on the resource. See https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/service-accounts#VM_service_account.
WARNING: The firewall rules for specified network or subnetwork would allow ingress traffic from 0.0.0.0/0, which could be a security risk.
Waiting for cluster creation operation...done.
Created [https://dataproc.googleapis.com/v1/projects/data-science-project-405301/regions/us-east1/clusters/spark-sample-test] Cluster placed in zone [us-east1-b].
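The warnings above are harmless for this demo. For a more production-minded run, a sketch of an explicit variant could look like the following (the image version and machine types here are illustrative values, not ones from the original article):

> gcloud dataproc clusters create spark-sample-test \
>     --region=us-east1 \
>     --image-version=2.1-debian11 \
>     --master-machine-type=n1-standard-2 \
>     --worker-machine-type=n1-standard-2 \
>     --num-workers=2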

Create the project folder and the script file

> mkdir -p dataproc_example
> cd dataproc_example
> touch spark_session_test.py

Create the PySpark Python script

from pyspark.sql import SparkSession


class RunSparkExampleInDataProc:

    def run_spark(self):
        # Create (or reuse) a SparkSession; on Dataproc this picks up
        # the cluster's YARN configuration automatically.
        spark = SparkSession.builder \
            .appName("SparkByExample") \
            .getOrCreate()

        # Sample rows: (CourseName, fee, discount)
        simpleData = [("Java", 4000, 5),
                      ("Python", 4600, 10),
                      ("Scala", 4500, 5),
                      ("Kotlin", 5000, 3)]

        columns = ["CourseName", "fee", "discount"]

        # Build a DataFrame from the rows, then print its schema and contents
        df = spark.createDataFrame(data=simpleData, schema=columns)
        df.printSchema()
        df.show(truncate=False)


if __name__ == '__main__':
    run_spark = RunSparkExampleInDataProc()
    run_spark.run_spark()
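Optionally, the script can be smoke-tested on the local machine before submitting it to the cluster. This assumes pyspark is installed locally, which the original walkthrough does not require:

> pip install pyspark
> python spark_session_test.py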

Submit the Python script file to the Dataproc cluster

> gcloud dataproc jobs submit pyspark \
> --cluster=spark-sample-test \
> --region=us-east1 spark_session_test.py

Job [33defe79aeb44389b50830fd3798046e] submitted.
Waiting for job output...
23/11/16 14:09:09 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
23/11/16 14:09:09 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
23/11/16 14:09:09 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
23/11/16 14:09:09 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator
23/11/16 14:09:09 INFO org.sparkproject.jetty.util.log: Logging initialized @4652ms to org.sparkproject.jetty.util.log.Slf4jLog
23/11/16 14:09:10 INFO org.sparkproject.jetty.server.Server: jetty-9.4.40.v20210413; built: 2021-04-13T20:42:42.668Z; git: b881a572662e1943a14ae12e7e1207989f218b74; jvm 1.8.0_392-b08
23/11/16 14:09:10 INFO org.sparkproject.jetty.server.Server: Started @4778ms
23/11/16 14:09:10 INFO org.sparkproject.jetty.server.AbstractConnector: Started ServerConnector@461e3540{HTTP/1.1, (http/1.1)}{0.0.0.0:33493}
23/11/16 14:09:11 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at spark-sample-test-m/10.142.0.9:8032
23/11/16 14:09:11 INFO org.apache.hadoop.yarn.client.AHSProxy: Connecting to Application History server at spark-sample-test-m/10.142.0.9:10200
23/11/16 14:09:13 INFO org.apache.hadoop.conf.Configuration: resource-types.xml not found
23/11/16 14:09:13 INFO org.apache.hadoop.yarn.util.resource.ResourceUtils: Unable to find 'resource-types.xml'.
23/11/16 14:09:14 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: Submitted application application_1700143087224_0001
23/11/16 14:09:15 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at spark-sample-test-m/10.142.0.9:8030
23/11/16 14:09:18 INFO com.google.cloud.hadoop.fs.gcs.GhfsStorageStatistics: Detected potential high latency for operation op_get_file_status. latencyMs=492; previousMaxLatencyMs=0; operationCount=1; context=gs://dataproc-temp-us-east1-355538152299-drh1lt8z/b5ce9726-8717-4459-aa94-adb7e99477b9/spark-job-history
23/11/16 14:09:18 INFO com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl: Ignoring exception of type GoogleJsonResponseException; verified object already exists with desired state.
23/11/16 14:09:18 INFO com.google.cloud.hadoop.fs.gcs.GhfsStorageStatistics: Detected potential high latency for operation op_mkdirs. latencyMs=309; previousMaxLatencyMs=0; operationCount=1; context=gs://dataproc-temp-us-east1-355538152299-drh1lt8z/b5ce9726-8717-4459-aa94-adb7e99477b9/spark-job-history
23/11/16 14:09:18 INFO com.google.cloud.hadoop.fs.gcs.GhfsStorageStatistics: Detected potential high latency for operation op_create. latencyMs=404; previousMaxLatencyMs=0; operationCount=1; context=gs://dataproc-temp-us-east1-355538152299-drh1lt8z/b5ce9726-8717-4459-aa94-adb7e99477b9/spark-job-history/application_1700143087224_0001.inprogress
root
|-- CourseName: string (nullable = true)
|-- fee: long (nullable = true)
|-- discount: long (nullable = true)

+----------+----+--------+
|CourseName|fee |discount|
+----------+----+--------+
|Java |4000|5 |
|Python |4600|10 |
|Scala |4500|5 |
|Kotlin |5000|3 |
+----------+----+--------+

23/11/16 14:09:31 INFO org.sparkproject.jetty.server.AbstractConnector: Stopped Spark@461e3540{HTTP/1.1, (http/1.1)}{0.0.0.0:0}
Job [33defe79aeb44389b50830fd3798046e] finished successfully.
done: true
driverControlFilesUri: gs://dataproc-staging-us-east1-355538152299-exg82lew/google-cloud-dataproc-metainfo/b5ce9726-8717-4459-aa94-adb7e99477b9/jobs/33defe79aeb44389b50830fd3798046e/
driverOutputResourceUri: gs://dataproc-staging-us-east1-355538152299-exg82lew/google-cloud-dataproc-metainfo/b5ce9726-8717-4459-aa94-adb7e99477b9/jobs/33defe79aeb44389b50830fd3798046e/driveroutput
jobUuid: 8d870012-f88a-36bc-b7a8-db219df17b25
placement:
  clusterName: spark-sample-test
  clusterUuid: b5ce9726-8717-4459-aa94-adb7e99477b9
pysparkJob:
  mainPythonFileUri: gs://dataproc-staging-us-east1-355538152299-exg82lew/google-cloud-dataproc-metainfo/b5ce9726-8717-4459-aa94-adb7e99477b9/jobs/33defe79aeb44389b50830fd3798046e/staging/spark_session_test.py
reference:
  jobId: 33defe79aeb44389b50830fd3798046e
  projectId: data-science-project-405301
status:
  state: DONE
  stateStartTime: '2023-11-16T14:09:33.856800Z'
statusHistory:
- state: PENDING
  stateStartTime: '2023-11-16T14:09:02.969480Z'
- state: SETUP_DONE
  stateStartTime: '2023-11-16T14:09:03.012052Z'
- details: Agent reported job success
  state: RUNNING
  stateStartTime: '2023-11-16T14:09:03.344825Z'
yarnApplications:
- name: SparkByExample
  progress: 1.0
  state: FINISHED
  trackingUrl: http://spark-sample-test-m:8088/proxy/application_1700143087224_0001/
When we run the job submit command, gcloud automatically uploads the local script file into the staging bucket that was generated for the cluster:

> gs://dataproc-staging-us-east1-355538152299-exg82lew/google-cloud-dataproc-metainfo/b5ce9726-8717-4459-aa94-adb7e99477b9/jobs/33defe79aeb44389b50830fd3798046e/staging/spark_session_test.py

and runs the script from that cluster location.
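The uploaded copy can be verified from the CLI (the path is taken from the job metadata shown above):

> gcloud storage ls gs://dataproc-staging-us-east1-355538152299-exg82lew/google-cloud-dataproc-metainfo/b5ce9726-8717-4459-aa94-adb7e99477b9/jobs/33defe79aeb44389b50830fd3798046e/staging/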

Check the Job Log in the Dataproc Dashboard
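The original post shows this step in the web console. As a CLI alternative, the driver output of a job can be re-streamed with gcloud, using the job ID printed when the job was submitted:

> gcloud dataproc jobs wait 33defe79aeb44389b50830fd3798046e --region=us-east1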

Delete the Cluster

> gcloud dataproc clusters list --region=us-east1
NAME               PLATFORM  PRIMARY_WORKER_COUNT  SECONDARY_WORKER_COUNT  STATUS   ZONE        SCHEDULED_DELETE
spark-sample-test  GCE       2                                             RUNNING  us-east1-b
> gcloud dataproc clusters delete spark-sample-test --region=us-east1
The cluster 'spark-sample-test' and all attached disks will be deleted.

Do you want to continue (Y/n)? Y

Waiting on operation [projects/data-science-project-405301/regions/us-east1/operations/e83260a2-a644-3320-8a7d-ef8e7d3f7ccb].
Waiting for cluster deletion operation...done.
Deleted [https://dataproc.googleapis.com/v1/projects/data-science-project-405301/regions/us-east1/clusters/spark-sample-test].

Delete the related buckets for the cluster

> gcloud storage ls
gs://dataproc-staging-us-east1-355538152299-exg82lew/
gs://dataproc-temp-us-east1-355538152299-drh1lt8z
> gcloud storage rm --recursive gs://dataproc-staging-us-east1-355538152299-exg82lew/
> gcloud storage rm --recursive gs://dataproc-temp-us-east1-355538152299-drh1lt8z/


Tariqul Islam

Tariqul Islam has 9+ years of software development experience. He knows Python, Node.js, Java, C#, and PHP.