PySpark and GCP Dataproc Sample (Basic Understanding)
5 min read · Nov 17, 2023
Dataproc is a GCP service that provides clusters with Hadoop, Hive, a metastore, and Spark pre-installed. It is a pay-as-you-go service, which makes it comparatively inexpensive and fast for running Spark- and Hadoop-based data analysis tasks. In this article, I will cover basic usage of the Dataproc and Cloud Storage services on GCP.
Prerequisites
- gcloud CLI installed on your local machine
- GCP (Google Cloud Platform) account
- Cloud Storage and Dataproc API services enabled in GCP
Installation of the gcloud CLI
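The gcloud CLI can be installed with Google's interactive install script and then initialized; the commands below are one common way to do it on Linux/macOS (see https://cloud.google.com/sdk/docs/install for platform-specific installers):
> curl https://sdk.cloud.google.com | bash
> exec -l $SHELL
> gcloud init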
Enable the Cloud Storage and Dataproc API Services in GCP
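Both APIs can be enabled from the console, or from the CLI with gcloud services enable; the service names below are the standard identifiers for Dataproc and Cloud Storage:
> gcloud services enable dataproc.googleapis.com storage.googleapis.com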
Create the Cluster
> gcloud dataproc clusters create spark-sample-test --region=us-east1
Waiting on operation [projects/data-science-project-405301/regions/us-east1/operations/cdcbab63-1fb0-35df-818f-35721315e455].
Waiting for cluster creation operation...⠧
WARNING: No image specified. Using the default image version. It is recommended to select a specific image version in production, as the default image version may change at any time.
WARNING: Consider using Auto Zone rather than selecting a zone manually. See https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/auto-zone
WARNING: Permissions are missing for the default service account '355538152299-compute@developer.gserviceaccount.com', missing permissions: [storage.buckets.get, storage.objects.create, storage.objects.delete, storage.objects.get, storage.objects.list, storage.objects.update] on the project 'projects/data-science-project-405301'. This usually happens when a custom resource (ex: custom staging bucket) or a user-managed VM Service account has been provided and the default/user-managed service account hasn't been granted enough permissions on the resource. See https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/service-accounts#VM_service_account.
WARNING: The firewall rules for specified network or subnetwork would allow ingress traffic from 0.0.0.0/0, which could be a security risk.
Waiting for cluster creation operation...done.
Created [https://dataproc.googleapis.com/v1/projects/data-science-project-405301/regions/us-east1/clusters/spark-sample-test] Cluster placed in zone [us-east1-b].
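The warnings above are worth acting on in real projects: pinning an image version keeps the cluster reproducible, and omitting the zone lets Dataproc's Auto Zone placement pick one for you. Here is a variant of the create command; the image version shown is only an example and should be checked against the currently supported releases:
> gcloud dataproc clusters create spark-sample-test --region=us-east1 --image-version=2.1-debian11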
Create the project folder and script file
> mkdir -p dataproc_example
> cd dataproc_example
> touch spark_session_test.py
Create the PySpark Python script
from pyspark.sql import SparkSession


class RunSparkExampleInDataProc:
    def run_spark(self):
        # Create (or reuse) a SparkSession for this job
        spark = SparkSession.builder \
            .appName("SparkByExample") \
            .getOrCreate()

        # Sample rows: (course name, fee, discount)
        simpleData = (("Java", 4000, 5),
                      ("Python", 4600, 10),
                      ("Scala", 4500, 5),
                      ("Kotlin", 5000, 3))
        columns = ["CourseName", "fee", "discount"]

        # Build a DataFrame from the in-memory tuples
        df = spark.createDataFrame(data=simpleData, schema=columns)
        df.printSchema()
        df.show(truncate=False)

        # Release resources once the job is done
        spark.stop()


if __name__ == '__main__':
    run_spark = RunSparkExampleInDataProc()
    run_spark.run_spark()
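Before submitting to the cluster, it can be worth sanity-checking the script locally. Assuming pip is available, installing the pyspark package lets you run it as a plain Python program:
> pip install pyspark
> python spark_session_test.py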
Submit the Python script to the Dataproc cluster
> gcloud dataproc jobs submit pyspark \
> --cluster=spark-sample-test \
> --region=us-east1 spark_session_test.py
Job [33defe79aeb44389b50830fd3798046e] submitted.
Waiting for job output...
23/11/16 14:09:09 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
23/11/16 14:09:09 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
23/11/16 14:09:09 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
23/11/16 14:09:09 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator
23/11/16 14:09:09 INFO org.sparkproject.jetty.util.log: Logging initialized @4652ms to org.sparkproject.jetty.util.log.Slf4jLog
23/11/16 14:09:10 INFO org.sparkproject.jetty.server.Server: jetty-9.4.40.v20210413; built: 2021-04-13T20:42:42.668Z; git: b881a572662e1943a14ae12e7e1207989f218b74; jvm 1.8.0_392-b08
23/11/16 14:09:10 INFO org.sparkproject.jetty.server.Server: Started @4778ms
23/11/16 14:09:10 INFO org.sparkproject.jetty.server.AbstractConnector: Started ServerConnector@461e3540{HTTP/1.1, (http/1.1)}{0.0.0.0:33493}
23/11/16 14:09:11 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at spark-sample-test-m/10.142.0.9:8032
23/11/16 14:09:11 INFO org.apache.hadoop.yarn.client.AHSProxy: Connecting to Application History server at spark-sample-test-m/10.142.0.9:10200
23/11/16 14:09:13 INFO org.apache.hadoop.conf.Configuration: resource-types.xml not found
23/11/16 14:09:13 INFO org.apache.hadoop.yarn.util.resource.ResourceUtils: Unable to find 'resource-types.xml'.
23/11/16 14:09:14 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: Submitted application application_1700143087224_0001
23/11/16 14:09:15 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at spark-sample-test-m/10.142.0.9:8030
23/11/16 14:09:18 INFO com.google.cloud.hadoop.fs.gcs.GhfsStorageStatistics: Detected potential high latency for operation op_get_file_status. latencyMs=492; previousMaxLatencyMs=0; operationCount=1; context=gs://dataproc-temp-us-east1-355538152299-drh1lt8z/b5ce9726-8717-4459-aa94-adb7e99477b9/spark-job-history
23/11/16 14:09:18 INFO com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl: Ignoring exception of type GoogleJsonResponseException; verified object already exists with desired state.
23/11/16 14:09:18 INFO com.google.cloud.hadoop.fs.gcs.GhfsStorageStatistics: Detected potential high latency for operation op_mkdirs. latencyMs=309; previousMaxLatencyMs=0; operationCount=1; context=gs://dataproc-temp-us-east1-355538152299-drh1lt8z/b5ce9726-8717-4459-aa94-adb7e99477b9/spark-job-history
23/11/16 14:09:18 INFO com.google.cloud.hadoop.fs.gcs.GhfsStorageStatistics: Detected potential high latency for operation op_create. latencyMs=404; previousMaxLatencyMs=0; operationCount=1; context=gs://dataproc-temp-us-east1-355538152299-drh1lt8z/b5ce9726-8717-4459-aa94-adb7e99477b9/spark-job-history/application_1700143087224_0001.inprogress
root
|-- CourseName: string (nullable = true)
|-- fee: long (nullable = true)
|-- discount: long (nullable = true)
+----------+----+--------+
|CourseName|fee |discount|
+----------+----+--------+
|Java |4000|5 |
|Python |4600|10 |
|Scala |4500|5 |
|Kotlin |5000|3 |
+----------+----+--------+
23/11/16 14:09:31 INFO org.sparkproject.jetty.server.AbstractConnector: Stopped Spark@461e3540{HTTP/1.1, (http/1.1)}{0.0.0.0:0}
Job [33defe79aeb44389b50830fd3798046e] finished successfully.
done: true
driverControlFilesUri: gs://dataproc-staging-us-east1-355538152299-exg82lew/google-cloud-dataproc-metainfo/b5ce9726-8717-4459-aa94-adb7e99477b9/jobs/33defe79aeb44389b50830fd3798046e/
driverOutputResourceUri: gs://dataproc-staging-us-east1-355538152299-exg82lew/google-cloud-dataproc-metainfo/b5ce9726-8717-4459-aa94-adb7e99477b9/jobs/33defe79aeb44389b50830fd3798046e/driveroutput
jobUuid: 8d870012-f88a-36bc-b7a8-db219df17b25
placement:
clusterName: spark-sample-test
clusterUuid: b5ce9726-8717-4459-aa94-adb7e99477b9
pysparkJob:
mainPythonFileUri: gs://dataproc-staging-us-east1-355538152299-exg82lew/google-cloud-dataproc-metainfo/b5ce9726-8717-4459-aa94-adb7e99477b9/jobs/33defe79aeb44389b50830fd3798046e/staging/spark_session_test.py
reference:
jobId: 33defe79aeb44389b50830fd3798046e
projectId: data-science-project-405301
status:
state: DONE
stateStartTime: '2023-11-16T14:09:33.856800Z'
statusHistory:
- state: PENDING
stateStartTime: '2023-11-16T14:09:02.969480Z'
- state: SETUP_DONE
stateStartTime: '2023-11-16T14:09:03.012052Z'
- details: Agent reported job success
state: RUNNING
stateStartTime: '2023-11-16T14:09:03.344825Z'
yarnApplications:
- name: SparkByExample
progress: 1.0
state: FINISHED
trackingUrl: http://spark-sample-test-m:8088/proxy/application_1700143087224_0001/
When we run the submit command with gcloud, GCP automatically uploads the local script file into the staging bucket generated for the cluster:
> gs://dataproc-staging-us-east1-355538152299-exg82lew/google-cloud-dataproc-metainfo/b5ce9726-8717-4459-aa94-adb7e99477b9/jobs/33defe79aeb44389b50830fd3798046e/staging/spark_session_test.py
and runs the script from that location.
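Your own PySpark code can also read and write Cloud Storage directly through the gs:// scheme, since Dataproc clusters ship with the GCS connector. A minimal sketch, assuming a bucket named my-sample-bucket that you have already created:

# Hypothetical bucket; replace with a bucket you own
df.write.mode("overwrite").parquet("gs://my-sample-bucket/courses/")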
Check the Job Log in the Dataproc Dashboard
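Besides the Dataproc dashboard in the Cloud Console, jobs can be inspected from the CLI as well; for example, listing the jobs on the cluster and re-fetching the driver output of the job we just ran:
> gcloud dataproc jobs list --cluster=spark-sample-test --region=us-east1
> gcloud dataproc jobs wait 33defe79aeb44389b50830fd3798046e --region=us-east1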
Delete Cluster
> gcloud dataproc clusters list --region=us-east1
NAME PLATFORM PRIMARY_WORKER_COUNT SECONDARY_WORKER_COUNT STATUS ZONE SCHEDULED_DELETE
spark-sample-test GCE 2 RUNNING us-east1-b
> gcloud dataproc clusters delete spark-sample-test --region=us-east1
The cluster 'spark-sample-test' and all attached disks will be deleted.
Do you want to continue (Y/n)? Y
Waiting on operation [projects/data-science-project-405301/regions/us-east1/operations/e83260a2-a644-3320-8a7d-ef8e7d3f7ccb].
Waiting for cluster deletion operation...done.
Deleted [https://dataproc.googleapis.com/v1/projects/data-science-project-405301/regions/us-east1/clusters/spark-sample-test].
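As a side note, manual cleanup can be avoided by scheduling deletion at creation time: the --max-idle flag tells Dataproc to delete the cluster after a period of inactivity (this is what the SCHEDULED_DELETE column in the list output above refers to). For example:
> gcloud dataproc clusters create spark-sample-test --region=us-east1 --max-idle=30m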
Delete the buckets created for the cluster
> gcloud storage ls
gs://dataproc-staging-us-east1-355538152299-exg82lew/
gs://dataproc-temp-us-east1-355538152299-drh1lt8z
> gcloud storage rm --recursive gs://dataproc-staging-us-east1-355538152299-exg82lew/
> gcloud storage rm --recursive gs://dataproc-temp-us-east1-355538152299-drh1lt8z