Spark on Kubernetes, A Practitioner’s Guide
Introduction
This article discusses the use case of Kubernetes (AKA k8s) for the deployment of Spark, an open-source distributed computing framework. One can use code developed in Python, Scala, SQL and other languages to create applications that process large amounts of data. Spark uses parallel processing to distribute the workload across a cluster of physical hosts, for example the nodes of a Kubernetes cluster.
In this respect Spark supports a number of cluster managers (standalone, Apache Mesos, Hadoop YARN and Kubernetes), where the cluster manager is responsible for managing and allocating resources for the cluster of nodes on which the Spark application runs.
I will explain the practical implementation of Spark on Kubernetes (Spark-on-k8s), leveraging Google Kubernetes Engine (GKE) (Google first introduced and later open-sourced Kubernetes), and show the steps in running Spark with GKE as the cluster manager, through a simple yet effective example that uses PySpark to generate random test data in Spark and post that data to a Google BigQuery Data Warehouse table.
GitHub reference for the material covered in this article:
https://github.com/michTalebzadeh/spark_on_gke
Use Case for Kubernetes
With modern services, users expect applications to be available 24/7, and developers expect to deploy new versions of those applications several times a day through CI/CD pipelines. With these in mind, the focus has shifted to containerization and to the means of deploying those containers through Kubernetes clusters. In short, Kubernetes is a cloud-agnostic solution and a leader in Docker (container) management.
What is a container
A container https://www.docker.com/resources/what-container is a standard unit of software that packages up code and all its dependencies, so the application runs quickly and reliably from one computing environment to another. A Docker container image is a lightweight, standalone, executable package of software that includes everything needed to run an application: code, runtime, system tools, system libraries and settings. Container images become containers at runtime and in the case of Docker containers, images become containers when they run on docker engines. Containerized software will always run the same, regardless of the infrastructure. Containers isolate software from its environment and ensure that software works uniformly despite differences, for instance between development and staging.
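As a quick illustration (assuming Docker is installed locally and the host can reach Docker Hub; the image name is purely illustrative), the same container image behaves identically on a laptop, a VM or a Kubernetes node:
# run a throwaway container from a public image and print its Python version
docker run --rm python:3.9-slim python --version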
What is the difference between container and docker container image
A container and a Docker container image are related concepts within the realm of containerization, but they refer to different aspects of the technology. Here's a breakdown of their differences:
Container: the runtime instance, i.e. a running, isolated process that encapsulates an application together with its dependencies.
Docker Container Image: the packaged, static snapshot of the application and its dependencies from which containers are started.
In summary, a container is the runtime instance of a software package that encapsulates an application and its dependencies, while a Docker container image is a snapshot of an application and its dependencies, which can be instantiated to create running containers. Docker is a widely used containerization platform that allows users to create, manage, and share container images.
The difference between container and microservice
A container is a resource allocation and sharing technology, whereas a microservice is a software design pattern. In short: microservices are a way of decomposing an application into independently deployable services, and in practice microservices run within containers.
Microservices operate according to a principle known as bounded context, which is the ability to position a component and its data as a standalone unit or one with limited dependencies. In other words, services are decoupled and function independently. As such, if one web service fails within an application, other services associated with the app will continue to operate as usual.
What is Kubernetes
As stated before, Kubernetes https://kubernetes.io/, also known as K8s, is an open-source system for automating deployment, scaling, and management of containerized applications. The purpose of Kubernetes in this case is to host your Spark applications in the form of containers in an automated fashion, so that you can easily deploy as many instances of your application as required and easily enable communication between different services within your application.
It should be noted that Spark on Kubernetes is a work in progress and, as of now, there is still outstanding future work. In this article we will leverage what Spark on Kubernetes currently offers.
What is a pod
Pods https://cloud.google.com/kubernetes-engine/docs/concepts/pod#what_is_a_pod are the smallest, most basic deployable objects in Kubernetes. A Pod represents a single instance of a running process in the Kubernetes cluster. Pods can contain one or more containers, such as Docker containers.
What is Docker and what is a Docker image
Docker https://www.docker.com/ is an open source containerization platform. It enables developers to package applications into containers. Specifically, Docker is a set of platform as a service (PaaS) products that use OS-level virtualization to deliver software in packages called containers.
A Docker image is a file created to execute code in a Docker container. A Docker image contains the application code, libraries, tools, dependencies, and other files needed to make an application run, and can be of considerable size. It is the heart of what is deployed to the Kubernetes cluster.
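To get a feel for what goes into an image and where its size comes from, you can inspect its layers. A minimal sketch, using one of the Spark images built later in this article purely as an example (substitute any image you have locally):
# list the layers of the image, the commands that created them and their sizes
docker history spark/spark-py:3.1.1-scala_2.12-8-jre-slim-buster
# dump the image metadata (entrypoint, environment, working directory and so on)
docker inspect spark/spark-py:3.1.1-scala_2.12-8-jre-slim-buster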
The Docker image role
Kubernetes requires users to supply images that can be deployed into containers within pods. You will need to install Docker Community Edition (CE) or similar (https://docs.docker.com/engine/install/ubuntu/) on the host that will be creating the Docker images. Docker images will be used for the Spark driver and the Spark executors. This will be covered further down.
Building and preparing a Docker image
Before building the image, consideration should be given to the following:
1. The version of Spark
2. The Scala version
3. The Java version
4. The packages to be deployed for PySpark
Choosing the version of Spark is important, as the Docker image built should be compatible with the other components it has to work with. For example, if you are going to write to the Google BigQuery Data Warehouse https://cloud.google.com/bigquery, which Spark version is compatible with it, and does it support Java 8 or Java 11? One can of course build multiple Docker images with different versions as needed and, in my opinion, it is a strength of the dockerfile approach that it allows multiple versions to be built.
With Spark, you can build dockerfiles for Scala, PySpark and R. In this article, we will not consider dockerfiles for R.
The Spark binaries provide a shell script, docker-image-tool.sh, that can be used to build and publish the Docker images to use with the Kubernetes backend. The script is located in $SPARK_HOME/bin.
docker-image-tool.sh takes a number of parameters. You can get a list of them by typing $SPARK_HOME/bin/docker-image-tool.sh -h
The parameters of interest are -r, -t, -b and -p.
As stated above, -r specifies the repo, -t is the tag added to the image, -b passes a build argument to the Docker build, and -p specifies the dockerfile to build the image from.
You can create a shell script for each version of the dockerfile. Make sure that you are invoking the script from the correct Spark version. For example, in the script below SPARK_VERSION is set to 3.1.1; that should match the binaries you are executing, i.e. your current Spark version should be 3.1.1.
Here, we are using the standard Dockerfile supplied with the Spark binaries:
BASE_OS="buster"
SPARK_VERSION="3.1.1"
SCALA_VERSION="scala_2.12"
DOCKERFILE="Dockerfile"
DOCKERIMAGETAG="11-jre-slim"
# Building Docker image from provided Dockerfile base 11
cd $SPARK_HOME
./bin/docker-image-tool.sh \
              -r spark \
              -t ${SPARK_VERSION}-${SCALA_VERSION}-${DOCKERIMAGETAG}-${BASE_OS} \
              -b java_image_tag=${DOCKERIMAGETAG} \
              -p ./kubernetes/dockerfiles/spark/${DOCKERFILE} \
               build
Once the Docker image is built, it will show up in the output of the docker images command:
docker images
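If you have many images locally, you can narrow the listing down to the Spark images just built, for example:
docker images | grep spark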
Creating a custom made dockerfile
A custom-made dockerfile is often necessary in order to add additional libraries or to build the image with a different version of, say, Java.
The following dockerfile will create a Docker image with Java 8 as opposed to Java 11. Note the line
ARG java_image_tag=8-jre-slim
ARG java_image_tag=8-jre-slim
FROM openjdk:${java_image_tag}
ARG spark_uid=185
# Before building the docker image, first build and make a Spark distribution following
# the instructions in https://spark.apache.org/docs/latest/building-spark.html.
# If this docker file is being used in the context of building your images from a Spark
# distribution, the docker build command should be invoked from the top level directory
# of the Spark distribution. E.g.:
# docker build -t spark:latest -f kubernetes/dockerfiles/spark/Dockerfile .
RUN set -ex && \
    sed -i 's/http:\/\/deb.\(.*\)/https:\/\/deb.\1/g' /etc/apt/sources.list && \
    apt-get update && \
    ln -s /lib /lib64 && \
    apt install -y bash tini libc6 libpam-modules krb5-user libnss3 procps && \
    mkdir -p /opt/spark && \
    mkdir -p /opt/spark/examples && \
    mkdir -p /opt/spark/work-dir && \
    touch /opt/spark/RELEASE && \
    rm /bin/sh && \
    ln -sv /bin/bash /bin/sh && \
    echo "auth required pam_wheel.so use_uid" >> /etc/pam.d/su && \
    chgrp root /etc/passwd && chmod ug+rw /etc/passwd && \
    rm -rf /var/cache/apt/*
COPY jars /opt/spark/jars
COPY bin /opt/spark/bin
COPY sbin /opt/spark/sbin
COPY kubernetes/dockerfiles/spark/entrypoint.sh /opt/
COPY kubernetes/dockerfiles/spark/decom.sh /opt/
COPY examples /opt/spark/examples
COPY kubernetes/tests /opt/spark/tests
COPY data /opt/spark/data
ENV SPARK_HOME /opt/spark
WORKDIR /opt/spark/work-dir
RUN chmod g+w /opt/spark/work-dir
RUN chmod a+x /opt/decom.sh
ENTRYPOINT [ "/opt/entrypoint.sh" ]
# Specify the User that the actual main process will run as
USER ${spark_uid}
docker images
REPOSITORY    TAG                                  IMAGE ID       CREATED        SIZE
spark/spark   3.1.1-scala_2.12-8-jre-slim-buster   b9ed343e1d6d   10 hours ago   599MB
openjdk       8-jre-slim                           c463ae0fa93d   2 weeks ago    194MB
In this case we will be looking at creating a dockerfile for PySpark.
A sample dockerfile for PySpark called Dockerfile is supplied under directory
$SPARK_HOME/kubernetes/dockerfiles/spark/bindings/python
You can create a custom dockerfile for PySpark with Java 8 as below:
ARG base_img
FROM $base_img
WORKDIR /
# Reset to root to run installation tasks
USER 0
RUN mkdir ${SPARK_HOME}/python
RUN apt-get update && \
    apt install -y python3 python3-pip && \
    pip3 install --upgrade pip setuptools && \
    # Removed the .cache to save space
    rm -r /root/.cache && rm -rf /var/cache/apt/*
RUN mkdir ${SPARK_HOME}/conf
COPY spark-defaults.conf ${SPARK_HOME}/conf/spark-defaults.conf
COPY python/pyspark ${SPARK_HOME}/python/pyspark
COPY python/lib ${SPARK_HOME}/python/lib
ARG spark_uid=185
WORKDIR /opt/spark/work-dir
ENTRYPOINT [ "/opt/entrypoint.sh" ]
# Specify the User that the actual main process will run as
USER ${spark_uid}
How to use this dockerfile
BASE_OS="buster"
SPARK_VERSION="3.1.1"
SCALA_VERSION="scala_2.12"
DOCKERFILE="java8only"
DOCKERIMAGETAG="8-jre-slim"
cd $SPARK_HOME
./bin/docker-image-tool.sh \
              -r spark \
              -t ${SPARK_VERSION}-${SCALA_VERSION}-${DOCKERIMAGETAG}-${BASE_OS} \
              -b java_image_tag=${DOCKERIMAGETAG} \
              -p ./kubernetes/dockerfiles/spark/bindings/python/${DOCKERFILE} \
               build
It results in the following image created:
REPOSITORY       TAG                                  IMAGE ID       CREATED             SIZE
spark/spark-py   3.1.1-scala_2.12-8-jre-slim-buster   ebd7beca1ba0   About an hour ago   951MB
Verifying the image built
You can also log in to the image as root and verify the Java version:
docker run -u 0 -it ebd7beca1ba0 bash
root@28dfb9adc1d1:/opt/spark/work-dir# echo $JAVA_HOME
/usr/local/openjdk-8
Getting list of packages installed
pip list
Package    Version
---------- -------
pip        21.3.1
setuptools 59.4.0
wheel      0.34.2
If you log in as the default user, you can get a list of environment variables:
docker run -it ebd7beca1ba0 bash
185@338a3c6239b4:/opt/spark/work-dir$ env
HOSTNAME=338a3c6239b4
JAVA_HOME=/usr/local/openjdk-8
PWD=/opt/spark/work-dir
HOME=/
LANG=C.UTF-8
TERM=xterm
SHLVL=1
SPARK_HOME=/opt/spark
PATH=/usr/local/openjdk-8/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
JAVA_VERSION=8u312
_=/usr/bin/env
Adding additional Python packages to the Docker image
The most convenient way to add additional packages is to bake them directly into the Docker image at the time the image is created. The external packages are then bundled as part of the image; because the image is fixed, an application that requires that set of dependencies will always find them there.
RUN pip install pyyaml numpy cx_Oracle --no-cache-dir
The --no-cache-dir option to pip prevents the downloaded binaries from being added to the image, reducing the image size. It is also advisable to install all packages in one line: every RUN statement creates an intermediate container and hence increases the build time, so reduce this by putting all the packages in a single RUN line.
We will create a custom dockerfile called java8PlusPackages in the Python bindings directory
$SPARK_HOME/kubernetes/dockerfiles/spark/bindings/python
The content of this file is shown below:
cat java8PlusPackages
ARG base_img
FROM $base_img
WORKDIR /
# Reset to root to run installation tasks
USER 0
RUN mkdir ${SPARK_HOME}/python
RUN apt-get update && \
    apt install -y python3 python3-pip && \
    pip3 install --upgrade pip setuptools && \
    # Removed the .cache to save space
    rm -r /root/.cache && rm -rf /var/cache/apt/*
RUN mkdir ${SPARK_HOME}/conf
COPY spark-defaults.conf ${SPARK_HOME}/conf/spark-defaults.conf
RUN pip install pyyaml numpy cx_Oracle --no-cache-dir
COPY python/pyspark ${SPARK_HOME}/python/pyspark
COPY python/lib ${SPARK_HOME}/python/lib
ARG spark_uid=185
WORKDIR /opt/spark/work-dir
ENTRYPOINT [ "/opt/entrypoint.sh" ]
# Specify the User that the actual main process will run as
USER ${spark_uid}
The shell script used with this dockerfile is shown below:
cat java8PlusPackages.sh
#!/bin/bash
BASE_OS="buster"
SPARK_VERSION="3.1.1"
SCALA_VERSION="scala_2.12"
DOCKERFILE="java8PlusPackages"
DOCKERIMAGETAG="8-jre-slim"
cd $SPARK_HOME
./bin/docker-image-tool.sh \
              -r spark \
              -t ${SPARK_VERSION}-${SCALA_VERSION}-${DOCKERIMAGETAG}-${BASE_OS}_${DOCKERFILE} \
              -b java_image_tag=${DOCKERIMAGETAG} \
              -p ./kubernetes/dockerfiles/spark/bindings/python/${DOCKERFILE} \
               build
The images are created as below:
docker images
REPOSITORY       TAG                                                     IMAGE ID       CREATED             SIZE
spark/spark-py   3.1.1-scala_2.12-8-jre-slim-buster_java8PlusPackages    f9ddb4b305ef   3 minutes ago       1.02GB
spark/spark      3.1.1-scala_2.12-8-jre-slim-buster_java8PlusPackages    a8bf56405021   About an hour ago   599MB
openjdk          8-jre-slim                                              c463ae0fa93d   2 weeks ago         194MB
Log in to the docker image and check for Python packages installed
docker run -u 0 -it spark/spark-py:3.1.1-scala_2.12-8-jre-slim-buster_java8PlusPackages bash
root@5bc049af7278:/opt/spark/work-dir# pip list
Package    Version
---------- -------
cx-Oracle  8.3.0
numpy      1.21.4
pip        21.3.1
PyYAML     6.0
setuptools 59.4.0
wheel      0.34.2
Removing unwanted docker images
You can force (-f) a docker image removal by using the following command with IMAGE ID from docker images output:
docker image rm 0f339c43df8f -f
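Repeated builds also tend to leave dangling (untagged) intermediate images behind. A quick way to reclaim that space, assuming you do not need any of the dangling images:
# remove all dangling images without prompting
docker image prune -f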
Understanding the Kubernetes Cluster
At the infrastructure level, a Kubernetes cluster (in this case GKE) comprises a set of physical or virtual machines, each acting in a specific role. The following diagram depicts this.
The Control Plane controls all the operations and is charged with orchestrating the containers that run on all of the nodes of the cluster. Each node is equipped with a container runtime. The node receives instructions from the Control Plane and then takes action to create pods, delete them, or adjust networking rules. Inside the Control Plane, components such as the kube-apiserver, etcd, the kube-scheduler and the kube-controller-manager are installed.
Kubernetes Node Components in Detail
To summarize, the node runs the two most important components, the kubelet and the kube-proxy, as well as a container engine in charge of running the containerized applications.
kubelet
The kubelet agent handles all communication between the master and the node on which it is running. It receives commands from the master in the form of a manifest which defines the workload and the operating parameters. It interfaces with the container runtime that is responsible for creating, starting, and monitoring pods.
The kubelet also periodically executes any configured liveness probes and readiness checks. It constantly monitors the state of the pods and, in the event of a problem, launches a new instance in its place. The kubelet also has an internal HTTP server exposing a read-only view on port 10255. There's a health check endpoint at /healthz and also a few additional status endpoints. For example, we can get a list of running pods at /pods. We can also get specs of the machine the kubelet is running on at /spec.
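As a rough illustration of those endpoints (assuming you have a shell on the node itself and the read-only port 10255 is enabled, which it frequently is not on hardened or managed clusters such as GKE):
# kubelet health check on the local node
curl http://localhost:10255/healthz
# list the pods the kubelet is currently running
curl http://localhost:10255/pods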
cAdvisor
cAdvisor (Container Advisor) is an open-source agent that monitors resource usage and analyzes the performance of containers. Originally created by Google, cAdvisor is now integrated with the kubelet.
The cAdvisor instance on each node collects, aggregates, processes, and exports metrics such as CPU, memory, file, and network usage for all running containers. All data is sent to the scheduler to ensure that it knows about the performance and resource usage inside the node. This information is used to perform various orchestration tasks like scheduling, horizontal pod scaling, and managing container resource limits.
kube-proxy
The kube-proxy component runs on each node and proxies UDP, TCP, and SCTP packets. It maintains the network rules on the host and handles transmission of packets between pods, the host, and the outside world. It acts like a network proxy and load balancer for pods.
The kube-proxy process stands in between the network Kubernetes is attached to and the pods that are running on that particular node. It is essentially the core networking component of Kubernetes and is responsible for ensuring that communication is maintained efficiently across all elements of the cluster. When a user creates a Kubernetes service object, the kube-proxy instance is responsible for translating that object into meaningful rules in the local iptables rule set on the worker node. iptables is used to translate the virtual IP assigned to the service object to all of the pod IPs mapped by the service.
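To see the effect of this on a worker node you can list the NAT rules kube-proxy maintains, assuming kube-proxy is running in its default iptables mode and you have root access to the node (purely illustrative, not required for the rest of this article):
# service-to-pod translation rules programmed by kube-proxy
sudo iptables -t nat -L KUBE-SERVICES -n | head -20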
Container Runtime
The container runtime is responsible for pulling the images (Docker images) from public or private registries (in this case Google Container Registry) and running containers based on those images. As previously mentioned, kubelet interacts directly with the container runtime to start, stop, or delete containers.
Handling the Spark driver and executors creation
Preparing to use Spark with Kubernetes in Google Cloud Platform
According to the Spark documentation, Spark can run on clusters managed by Kubernetes. This feature makes use of the native Kubernetes scheduler that has been added to Spark. Spark on Kubernetes has been supported since Spark 2.3.
Pre-requisites
Next, you will need a Kubernetes cluster to run the containers on. Broadly, you will need the Google Cloud SDK (gcloud plus kubectl) on your host and a GKE cluster, which can be created with a command along the following lines:
gcloud beta container --project "<PROJECT_ID>" clusters create "spark-demo-gke" --zone "europe-west2-c" --no-enable-basic-auth --cluster-version "1.21.5-gke.1302" --release-channel "regular" --machine-type "e2-medium" --image-type "COS_CONTAINERD" --disk-type "pd-standard" --disk-size "100" --metadata disable-legacy-endpoints=true --scopes "https://www.googleapis.com/auth/devstorage.read_only","https://www.googleapis.com/auth/logging.write","https://www.googleapis.com/auth/monitoring","https://www.googleapis.com/auth/servicecontrol","https://www.googleapis.com/auth/service.management.readonly","https://www.googleapis.com/auth/trace.append" --max-pods-per-node "110" --num-nodes "3" --logging=SYSTEM,WORKLOAD --monitoring=SYSTEM --enable-ip-alias --network "projects/<PROJECT_ID>/global/networks/default" --subnetwork "projects/<PROJECT_ID>/regions/europe-west2/subnetworks/default" --no-enable-intra-node-visibility --default-max-pods-per-node "110" --no-enable-master-authorized-networks --addons HorizontalPodAutoscaling,HttpLoadBalancing,GcePersistentDiskCsiDriver --enable-autoupgrade --enable-autorepair --max-surge-upgrade 1 --max-unavailable-upgrade 0 --enable-shielded-nodes --node-locations "europe-west2-
WARNING: The Pod address range limits the maximum size of the cluster. Please refer to https://cloud.google.com/kubernetes-engine/docs/how-to/flexible-pod-cidr to learn how to optimize IP address allocation.
Creating cluster spark-demo-gke in europe-west2-c...done.
Created
[https://container.googleapis.com/v1beta1/projects/<PROJECT_ID>/zones/europe-west2-c/clusters/spark-demo-gke].
To inspect the contents of your cluster, go to: https://console.cloud.google.com/kubernetes/workload_/gcloud/europe-west2-c/spark-demo-gke?project=<PROJECT_ID>
kubeconfig entry generated for spark-demo-gke.
NAME            LOCATION        MASTER_VERSION   MASTER_IP    MACHINE_TYPE  NODE_VERSION     NUM_NODES  STATUS
spark-demo-gke  europe-west2-c  1.21.5-gke.1302  35.246.3.22  e2-medium     1.21.5-gke.1302  3          RUNNING
In our case we chose, for the Kubernetes nodes, E2 standard machines that have 4 GB of system memory per vCPU, giving 4 vCPUs and 16384MB of RAM on each node of the cluster.
Dependency Management
Copying your Docker images to Google Container Registry
In order to be able to use your Docker image in GKE, you will need to follow the documentation on how to create and push your Docker image to Google Container Registry https://cloud.google.com/container-registry/docs/pushing-and-pulling.
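Note that the locally built image first has to be tagged with the registry path before it can be pushed. A minimal sketch, mirroring the image and tag names used in this article (adjust the project and tag to your own setup):
# tag the local image with the eu.gcr.io path expected by Container Registry
docker tag spark/spark-py:3.1.1-scala_2.12-8-jre-slim-buster_java8PlusPackages \
  eu.gcr.io/<PROJECT_ID>/spark-py:3.1.1-scala_2.12-8-jre-slim-buster-java8PlusPackages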
docker push eu.gcr.io/<PROJECT_ID>/spark-py:3.1.1-scala_2.12-8-jre-slim-buster-java8PlusPackages
Once you have done so you can get the images created as below:
gcloud container images list-tags eu.gcr.io/<PROJECT_ID>/spark-py
DIGEST        TAGS                                                   TIMESTAMP
680242a8def4  3.1.1-scala_2.12-8-jre-slim-buster-java8PlusPackages   2021-12-17T09:32:13
Making the project and py file available through spark-submit
Some of this for PySpark is discussed in https://spark.apache.org/docs/latest/api/python/user_guide/python_packaging.html .
For your project you will need to create a zip file at the root directory of your project as below.
Assuming that the root directory of your project is called spark_on_gke and it resides under $CODE_DIRECTORY, create the zip file of the project as shown below. You also need to copy your python application file ${APPLICATION} to the same location.
source_code="spark_on_gke"
CURRENT_DIRECTORY=`pwd`
CODE_DIRECTORY="/home/hduser/dba/bin/python/"
CODE_DIRECTORY_CLOUD="gs://<YOUR_BUCKET>/codes/"
cd $CODE_DIRECTORY
[ -f ${source_code}.zip ] && rm -r -f ${source_code}.zip
echo `date` ", ===> creating source zip directory from ${source_code}"
# zip needs to be done at root directory of code
zip -rq ${source_code}.zip ${source_code}
gsutil cp ${source_code}.zip $CODE_DIRECTORY_CLOUD
gsutil cp /home/hduser/dba/bin/python/${source_code}/src/${APPLICATION} $CODE_DIRECTORY_CLOUD
Getting the correct credentials
You need to have the correct authentication set up under the account on the VM host for Spark to work with Kubernetes, to read from and write to the storage bucket, and to be able to write to the Google BigQuery database. In the following example, my GKE cluster is called spark-on-gke.
Set up the correct authority with gcloud init or gcloud auth login
Set the compute zone with gcloud config set compute/zone $ZONE. For example
gcloud config set compute/zone europe-west2-c
Get the project ID
export PROJECT=$(gcloud info --format='value(config.project)')
Get GKE credentials
gcloud container clusters get-credentials spark-on-gke --zone $ZONE
Get Kubernetes Master IP
export KUBERNETES_MASTER_IP=$(gcloud container clusters list --filter name=spark-on-gke --format='value(MASTER_IP)')
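The spark-submit below refers to a namespace ($NAMESPACE) and a service account called spark-bq. Their creation is handled by the deployment scripts in the GitHub repository; a minimal hand-rolled sketch with kubectl, assuming the namespace is called spark, would look something like this:
# create a dedicated namespace for the Spark pods
kubectl create namespace spark
# create the service account the driver authenticates as
kubectl create serviceaccount spark-bq -n spark
# allow it to create and delete executor pods in that namespace
kubectl create rolebinding spark-bq-rb --clusterrole=edit --serviceaccount=spark:spark-bq -n spark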
A typical spark-submit for Kubernetes is shown below. This is run on the VM host where the Spark binaries are installed.
        spark-submit --verbose \
           --properties-file ${property_file} \
           --master k8s://https://$KUBERNETES_MASTER_IP:443 \
           --deploy-mode cluster \
           --name sparkBQ \
           --py-files $CODE_DIRECTORY/spark_on_gke.zip \
           --conf spark.kubernetes.namespace=$NAMESPACE \
           --conf spark.network.timeout=300 \
           --conf spark.executor.instances=$NEXEC \
           --conf spark.kubernetes.allocation.batch.size=3 \
           --conf spark.kubernetes.allocation.batch.delay=1 \
           --conf spark.driver.cores=3 \
           --conf spark.executor.cores=3 \
           --conf spark.driver.memory=8192m \
           --conf spark.executor.memory=8192m \
           --conf spark.dynamicAllocation.enabled=true \
           --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
           --conf spark.kubernetes.driver.container.image=${IMAGEDRIVER} \
           --conf spark.kubernetes.executor.container.image=${IMAGEDRIVER} \
           --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-bq \
           --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
           --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
           $CODE_DIRECTORY/${APPLICATION}
Explanatory notes on spark.driver.cores and spark.executor.cores:
In this model, we want to fit exactly one Spark executor pod per Kubernetes node. With 4 vCPUs on each Kubernetes node, classic resource allocation says we should leave one vCPU for the OS itself and set both of these parameters to 3.
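As a back-of-the-envelope check of those numbers for the e2-standard-4 nodes used here (4 vCPUs and 16384MB of RAM each), and assuming roughly half of the node RAM is handed to Spark as discussed in the sizing section later on:
NODE_VCPUS=4
NODE_RAM_MB=16384
SPARK_CORES=$((NODE_VCPUS - 1))        # leave one vCPU for the OS and Kubernetes daemons -> 3
SPARK_MEMORY_MB=$((NODE_RAM_MB / 2))   # roughly half the node RAM -> 8192
echo "--conf spark.executor.cores=${SPARK_CORES} --conf spark.executor.memory=${SPARK_MEMORY_MB}m"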
The PySpark code to generate random data
The PySpark code to generate random data is shown below and is fairly self-explanatory. Once the random values are generated, the underlying RDD is converted into a Spark DataFrame with a predefined schema and the data is saved to the BigQuery table test.randomData.
from __future__ import print_function
import sys
from spark_on_gke.src.configure import config
from pyspark.sql import functions as F
from pyspark.sql.functions import col, round, current_timestamp, lit
from spark_on_gke.sparkutils import sparkstuff as s
from spark_on_gke.othermisc import usedFunctions as uf
from pyspark.sql.types import *
import datetime
import time
def main():
    start_time = time.time()
    appName = "RandomDataBigQuery"
    spark_session = s.spark_session(appName)
    spark_session = s.setSparkConfBQ(spark_session)
    spark_context = s.sparkcontext()
    spark_context.setLogLevel("ERROR")
    lst = (spark_session.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ")).collect()
    print("\nStarted at");uf.println(lst)
    randomdatabq = RandomData(spark_session, spark_context)
    dfRandom = randomdatabq.generateRamdomData()
    #dfRandom.printSchema()
    #dfRandom.show(20, False)
    randomdatabq.loadIntoBQTable(dfRandom)
    lst = (spark_session.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ")).collect()
    print("\nFinished at");uf.println(lst)
    end_time = time.time()
    time_elapsed = (end_time - start_time)
    print(f"""Elapsed time in seconds is {time_elapsed}""")
    spark_session.stop()

class RandomData:
    def __init__(self, spark_session, spark_context):
        self.spark = spark_session
        self.sc = spark_context
        self.config = config
        self.values = dict()

    def readDataFromBQTable(self):
        dataset = "test"
        tableName = "randomData"
        fullyQualifiedTableName = dataset+'.'+tableName
        read_df = s.loadTableFromBQ(self.spark, dataset, tableName)
        return read_df

    def getValuesFromBQTable(self):
        read_df = self.readDataFromBQTable()
        read_df.createOrReplaceTempView("tmp_view")
        rows = self.spark.sql("SELECT COUNT(1) FROM tmp_view").collect()[0][0]
        maxID = self.spark.sql("SELECT MAX(ID) FROM tmp_view").collect()[0][0]
        return {"rows": rows, "maxID": maxID}

    def generateRamdomData(self):
        self.values = self.getValuesFromBQTable()
        rows = self.values["rows"]
        maxID = self.values["maxID"]
        start = 0
        if (rows == 0):
            start = 1
        else:
            start = maxID + 1
        numRows = config['GCPVariables']['numRows']
        print(numRows)
        end = start + numRows
        print("starting at ID = ", start, ",ending on = ", end)
        Range = range(start, end)
        ## This traverses through the Range and increments "x" by one unit each time; that x value is
        ## used in the code to generate random data through Python functions in a class
        rdd = self.sc.parallelize(Range). \
            map(lambda x: (x, uf.clustered(x, numRows), \
                           uf.scattered(x, numRows), \
                           uf.randomised(x, numRows), \
                           uf.randomString(50), \
                           uf.padString(x, " ", 50),
                           uf.padSingleChar("x", 50)))
        Schema = StructType([StructField("ID", IntegerType(), False),
                             StructField("CLUSTERED", FloatType(), True),
                             StructField("SCATTERED", FloatType(), True),
                             StructField("RANDOMISED", FloatType(), True),
                             StructField("RANDOM_STRING", StringType(), True),
                             StructField("SMALL_VC", StringType(), True),
                             StructField("PADDING", StringType(), True)
                            ])
        df = self.spark.createDataFrame(rdd, schema=Schema)
        df = df. \
             withColumn("op_type", lit(config['MDVariables']['op_type'])). \
             withColumn("op_time", current_timestamp())
        #df.printSchema()
        #df.show(100,False)
        return df

    def loadIntoBQTable(self, df2):
        # write to BigQuery table
        dataset = "test"
        tableName = "randomData"
        fullyQualifiedTableName = dataset+'.'+tableName
        print(f"""\n writing to BigQuery table {fullyQualifiedTableName}""")
        s.writeTableToBQ(df2, "append", dataset, tableName)
        print(f"""\n Populated BigQuery table {fullyQualifiedTableName}""")
        print("\n rows written is ", df2.count())
        print(f"""\n Reading from BigQuery table {fullyQualifiedTableName}\n""")
        # read data to ensure all loaded OK
        read_df = s.loadTableFromBQ(self.spark, dataset, tableName)
        new_rows = read_df.count()
        print("\n rows read in is ", new_rows)
        read_df.select("ID").show(20, False)
        ## Tally the number of rows in the BigQuery table with what is expected after adding new rows
        numRows = config['GCPVariables']['numRows']
        if (new_rows - self.values["rows"] - numRows) == 0:
            print("\nRows tally in BigQuery table")
        else:
            print("\nRows do not tally in BigQuery table")

if __name__ == "__main__":
    main()
Submitting the job and observing the output
The Spark job is submitted through the shell script deployment/src/scripts/gke.sh. See the GitHub link.
It is submitted by passing the mode with -M and the name of the PySpark application with -A, as below:
./gke.sh -M gcp -A RandomDataBigQuery.py
First we need to copy the zipped project directory spark_on_gke.zip and the application code RandomDataBigQuery.py to cloud storage:
Fri Dec 17 10:24:27 UTC 2021 , ===> creating source zip directory from spark_on_gke
Copying file://spark_on_gke.zip [Content-Type=application/zip]...
- [1 files][  1.9 MiB/  1.9 MiB]
Operation completed over 1 objects/1.9 MiB.
Copying file:///home/hduser/dba/bin/python/spark_on_gke/src/RandomDataBigQuery.py [Content-Type=text/x-python]...
/ [1 files][  4.9 KiB/  4.9 KiB]
Operation completed over 1 objects/4.9 KiB.
Then we need to resize the cluster to 3 nodes (this is needed because at the end we resize the k8s cluster back to zero nodes). From then onwards the driver container is created, followed by the executor containers. We can track the progress of pod creation further down.
Fri Dec 17 10:24:31 UTC 2021 , ===> Submitting spark job
Fri Dec 17 10:24:31 UTC 2021 , ===> Resizing GKE cluster with 3 nodes
Resizing spark-on-gke...done.
Updated [https://container.googleapis.com/v1/projects/<PROJECT_ID>/zones/europe-west2-c/clusters/spark-on-gke].
Updated property [compute/zone].
Fetching cluster endpoint and auth data.
kubeconfig entry generated for spark-on-gke.
Fri Dec 17 10:25:32 UTC 2021 , ===> Starting spark-submit
21/12/17 10:25:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/12/17 10:25:36 INFO SparkKubernetesClientFactory: Auto-configuring K8S client using current context from users K8S config file
21/12/17 10:25:37 INFO KerberosConfDriverFeatureStep: You have not specified a krb5.conf file locally or via a ConfigMap. Make sure that you have the krb5.conf locally on the driver image.
21/12/17 10:25:38 INFO LoggingPodStatusWatcherImpl: State changed, new state:
         pod name: sparkbq-18871d7dc7ecfa41-driver
         namespace: spark
         labels: spark-app-selector -> spark-b304270f8f6a4c99bfcdbad5a8826eb9, spark-role -> driver
         pod uid: f2b52a4a-1129-4f95-bf60-276445fd121c
         creation time: 2021-12-17T10:25:37Z
         service account name: spark-bq
         volumes: spark-sa-volume, spark-local-dir-1, spark-conf-volume-driver, kube-api-access-vktpv
         node name: gke-spark-on-gke-default-pool-75faa226-6327
         start time: 2021-12-17T10:25:37Z
         phase: Pending
         container status:
                 container name: spark-kubernetes-driver
                 container image: eu.gcr.io/<PROJECT_ID>/spark-py:3.1.1-scala_2.12-8-jre-slim-buster-java8PlusPackages
                 container state: waiting
                 pending reason: ContainerCreating
21/12/17 10:25:38 INFO LoggingPodStatusWatcherImpl: Waiting for application sparkBQ with submission ID spark:sparkbq-18871d7dc7ecfa41-driver to finish...
21/12/17 10:25:39 INFO LoggingPodStatusWatcherImpl: Application status for spark-b304270f8f6a4c99bfcdbad5a8826eb9 (phase: Pending)
21/12/17 10:26:17 INFO LoggingPodStatusWatcherImpl: Application status for spark-b304270f8f6a4c99bfcdbad5a8826eb9 (phase: Pending)
21/12/17 10:26:17 INFO LoggingPodStatusWatcherImpl: State changed, new state:
         pod name: sparkbq-18871d7dc7ecfa41-driver
         namespace: spark
         labels: spark-app-selector -> spark-b304270f8f6a4c99bfcdbad5a8826eb9, spark-role -> driver
         pod uid: f2b52a4a-1129-4f95-bf60-276445fd121c
         creation time: 2021-12-17T10:25:37Z
         service account name: spark-bq
         volumes: spark-sa-volume, spark-local-dir-1, spark-conf-volume-driver, kube-api-access-vktpv
         node name: gke-spark-on-gke-default-pool-75faa226-6327
         start time: 2021-12-17T10:25:37Z
         phase: Running
         container status:
                 container name: spark-kubernetes-driver
                 container image: eu.gcr.io/<PROJECT_ID>/spark-py:3.1.1-scala_2.12-8-jre-slim-buster-java8PlusPackages
                 container state: running
                 container started at: 2021-12-17T10:26:17Z
21/12/17 10:26:18 INFO LoggingPodStatusWatcherImpl: Application status for spark-b304270f8f6a4c99bfcdbad5a8826eb9 (phase: Running)
21/12/17 10:27:47 INFO LoggingPodStatusWatcherImpl: Application status for spark-b304270f8f6a4c99bfcdbad5a8826eb9 (phase: Running)
21/12/17 10:27:48 INFO LoggingPodStatusWatcherImpl: State changed, new state:
         pod name: sparkbq-18871d7dc7ecfa41-driver
         namespace: spark
         labels: spark-app-selector -> spark-b304270f8f6a4c99bfcdbad5a8826eb9, spark-role -> driver
         pod uid: f2b52a4a-1129-4f95-bf60-276445fd121c
         creation time: 2021-12-17T10:25:37Z
         service account name: spark-bq
         volumes: spark-sa-volume, spark-local-dir-1, spark-conf-volume-driver, kube-api-access-vktpv
         node name: gke-spark-on-gke-default-pool-75faa226-6327
         start time: 2021-12-17T10:25:37Z
         phase: Succeeded
         container status:
                 container name: spark-kubernetes-driver
                 container image: eu.gcr.io/<PROJECT_ID>/spark-py:3.1.1-scala_2.12-8-jre-slim-buster-java8PlusPackages
                 container state: terminated
                 container started at: 2021-12-17T10:26:17Z
                 container finished at: 2021-12-17T10:27:47Z
                 exit code: 0
                 termination reason: Completed
21/12/17 10:27:48 INFO LoggingPodStatusWatcherImpl: Application status for spark-b304270f8f6a4c99bfcdbad5a8826eb9 (phase: Succeeded)
21/12/17 10:27:48 INFO LoggingPodStatusWatcherImpl: Container final statuses:
         container name: spark-kubernetes-driver
         container image: eu.gcr.io/<PROJECT_ID>/spark-py:3.1.1-scala_2.12-8-jre-slim-buster-java8PlusPackages
         container state: terminated
         container started at: 2021-12-17T10:26:17Z
         container finished at: 2021-12-17T10:27:47Z
         exit code: 0
         termination reason: Completed
21/12/17 10:27:48 INFO LoggingPodStatusWatcherImpl: Application sparkBQ with submission ID spark:sparkbq-18871d7dc7ecfa41-driver finished
21/12/17 10:27:48 INFO ShutdownHookManager: Shutdown hook called
21/12/17 10:27:48 INFO ShutdownHookManager: Deleting directory /tmp/spark-3c107d80-f03b-4914-9303-ed85f31626a7
Fri Dec 17 10:27:48 UTC 2021 , ===> Completed spark-submit
Name:         sparkbq-18871d7dc7ecfa41-driver
Namespace:    spark
Priority:     0
Node:         gke-spark-on-gke-default-pool-75faa226-6327/10.154.15.218
Start Time:   Fri, 17 Dec 2021 10:25:37 +0000
Labels:       spark-app-selector=spark-b304270f8f6a4c99bfcdbad5a8826eb9
              spark-role=driver
Annotations:  <none>
Status:       Succeeded
IP:           10.64.1.5
IPs:
  IP:  10.64.1.5
Containers:
  spark-kubernetes-driver:
    Container ID:  containerd://90c63e798de626c8dad0a87004994e89c427cd64996201585dfe318fc5d4b66c
    Image:         eu.gcr.io/<PROJECT_ID>/spark-py:3.1.1-scala_2.12-8-jre-slim-buster-java8PlusPackages
    Image ID:      eu.gcr.io/<PROJECT_ID>/spark-py@sha256:680242a8def48c9b52ae29e0de586096b9b0846a6d68407679d96c3860116f35
    Ports:         7078/TCP, 7079/TCP, 4040/TCP
    Host Ports:    0/TCP, 0/TCP, 0/TCP
    Args:
      driver
      --properties-file
      /opt/spark/conf/spark.properties
      --class
      org.apache.spark.deploy.PythonRunner
      gs://<PROJECT_ID>-spark-on-k8s/codes/RandomDataBigQuery.py
    State:          Terminated
      Reason:       Completed
      Exit Code:    0
      Started:      Fri, 17 Dec 2021 10:26:17 +0000
      Finished:     Fri, 17 Dec 2021 10:27:47 +0000
    Ready:          False
    Restart Count:  0
    Limits:
      memory:  11468Mi
    Requests:
      cpu:     3
      memory:  11468Mi
    Environment:
      SPARK_USER:                      hduser
      SPARK_APPLICATION_ID:            spark-b304270f8f6a4c99bfcdbad5a8826eb9
      GOOGLE_APPLICATION_CREDENTIALS:  /mnt/secrets/spark-sa.json
      GCS_PROJECT_ID:                  <PROJECT_ID>
      SPARK_DRIVER_BIND_ADDRESS:        (v1:status.podIP)
      PYSPARK_PYTHON:                  /usr/bin/python3
      PYSPARK_DRIVER_PYTHON:           /usr/bin/python3
      SPARK_LOCAL_DIRS:                /var/data/spark-506aea67-1592-4fb0-9e78-2001212f490e
      SPARK_CONF_DIR:                  /opt/spark/conf
    Mounts:
      /mnt/secrets from spark-sa-volume (rw)
      /opt/spark/conf from spark-conf-volume-driver (rw)
      /var/data/spark-506aea67-1592-4fb0-9e78-2001212f490e from spark-local-dir-1 (rw)
      /var/run/secrets/kubernetes.io/serviceaccount from kube-api-access-vktpv (ro)
Conditions:
  Type              Status
  Initialized       True
  Ready             False
  ContainersReady   False
  PodScheduled      True
Volumes:
  spark-sa-volume:
    Type:        Secret (a volume populated by a Secret)
    SecretName:  spark-sa
    Optional:    false
  spark-local-dir-1:
    Type:       EmptyDir (a temporary directory that shares a pod's lifetime)
    Medium:
    SizeLimit:  <unset>
  spark-conf-volume-driver:
    Type:      ConfigMap (a volume populated by a ConfigMap)
    Name:      spark-drv-41ccd77dc7ecfe59-conf-map
    Optional:  false
  kube-api-access-vktpv:
    Type:                    Projected (a volume that contains injected data from multiple sources)
    TokenExpirationSeconds:  3607
    ConfigMapName:           kube-root-ca.crt
    ConfigMapOptional:       <nil>
    DownwardAPI:             true
QoS Class:                   Burstable
Node-Selectors:              <none>
Tolerations:                 node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
                             node.kubernetes.io/unreachable:NoExecute op=Exists for 300s
Events:
  Type     Reason       Age    From               Message
  ----     ------       ----   ----               -------
  Normal   Scheduled    2m12s  default-scheduler  Successfully assigned spark/sparkbq-18871d7dc7ecfa41-driver to gke-spark-on-gke-default-pool-75faa226-6327
  Warning  FailedMount  2m13s  kubelet            MountVolume.SetUp failed for volume "spark-conf-volume-driver" : configmap "spark-drv-41ccd77dc7ecfe59-conf-map" not found
  Normal   Pulling      2m11s  kubelet            Pulling image "eu.gcr.io/<PROJECT_ID>/spark-py:3.1.1-scala_2.12-8-jre-slim-buster-java8PlusPackages"
  Normal   Pulled       102s   kubelet            Successfully pulled image "eu.gcr.io/<PROJECT_ID>/spark-py:3.1.1-scala_2.12-8-jre-slim-buster-java8PlusPackages" in 29.475233796s
  Normal   Created      93s    kubelet            Created container spark-kubernetes-driver
  Normal   Started      93s    kubelet            Started container spark-kubernetes-driver
++ id -u
+ myuid=185
++ id -g
+ mygid=0
+ set +e
++ getent passwd 185
+ uidentry=
+ set -e
+ '[' -z '' ']'
+ '[' -w /etc/passwd ']'
+ echo '185:x:185:0:anonymous uid:/opt/spark:/bin/false'
+ SPARK_CLASSPATH=':/opt/spark/jars/*'
+ env
+ grep SPARK_JAVA_OPT_
+ sed 's/[^=]*=\(.*\)/\1/g'
+ sort -t_ -k4 -n
+ readarray -t SPARK_EXECUTOR_JAVA_OPTS
+ '[' -n '' ']'
+ '[' -z x ']'
+ export PYSPARK_PYTHON
+ '[' -z x ']'
+ export PYSPARK_DRIVER_PYTHON
+ '[' -n '' ']'
+ '[' -z ']'
+ '[' -z x ']'
+ SPARK_CLASSPATH='/opt/spark/conf::/opt/spark/jars/*'
+ case "$1" in
+ shift 1
+ CMD=("$SPARK_HOME/bin/spark-submit" --conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS" --deploy-mode client "$@")
+ exec /usr/bin/tini -s -- /opt/spark/bin/spark-submit --conf spark.driver.bindAddress=10.64.1.5 --deploy-mode client --properties-file /opt/spark/conf/spark.properties --class org.apache.spark.deploy.PythonRunner gs://<PROJECT_ID>-spark-on-k8s/codes/RandomDataBigQuery.py
21/12/17 10:26:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/12/17 10:26:24 INFO SparkContext: Running Spark version 3.1.1
21/12/17 10:26:24 INFO ResourceUtils: ==============================================================
21/12/17 10:26:24 INFO ResourceUtils: No custom resources configured for spark.driver.
21/12/17 10:26:24 INFO ResourceUtils: ==============================================================
21/12/17 10:26:24 INFO SparkContext: Submitted application: RandomDataBigQuery
21/12/17 10:26:24 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 3, script: , vendor: , memory -> name: memory, amount: 8192, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
21/12/17 10:26:24 INFO ResourceProfile: Limiting resource is cpus at 3 tasks per executor
21/12/17 10:26:24 INFO ResourceProfileManager: Added ResourceProfile id: 0
21/12/17 10:26:24 INFO SecurityManager: Changing view acls to: 185,hduser
21/12/17 10:26:24 INFO SecurityManager: Changing modify acls to: 185,hduser
21/12/17 10:26:24 INFO SecurityManager: Changing view acls groups to:
21/12/17 10:26:24 INFO SecurityManager: Changing modify acls groups to:
21/12/17 10:26:24 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(185, hduser); groups with view permissions: Set(); users with modify permissions: Set(185, hduser); groups with modify permissions: Set()
21/12/17 10:26:24 INFO Utils: Successfully started service 'sparkDriver' on port 7078.
21/12/17 10:26:24 INFO SparkEnv: Registering MapOutputTracker
21/12/17 10:26:24 INFO SparkEnv: Registering BlockManagerMaster
21/12/17 10:26:24 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
21/12/17 10:26:24 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
21/12/17 10:26:24 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
21/12/17 10:26:25 INFO DiskBlockManager: Created local directory at /var/data/spark-506aea67-1592-4fb0-9e78-2001212f490e/blockmgr-9189136d-85f6-4921-b647-1022783980df
21/12/17 10:26:25 INFO MemoryStore: MemoryStore started with capacity 4.1 GiB
21/12/17 10:26:25 INFO SparkEnv: Registering OutputCommitCoordinator
21/12/17 10:26:25 INFO Utils: Successfully started service 'SparkUI' on port 4040.
21/12/17 10:26:25 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at https://sparkbq-18871d7dc7ecfa41-driver-svc.spark.svc:4040
21/12/17 10:26:25 INFO SparkContext: Added file file:/tmp/spark-0d374b2f-af80-4bfc-bc98-453967dc64c0/spark_on_gke.zip at spark://sparkbq-18871d7dc7ecfa41-driver-svc.spark.svc:7078/files/spark_on_gke.zip with timestamp 1639736784143
21/12/17 10:26:25 INFO Utils: Copying /tmp/spark-0d374b2f-af80-4bfc-bc98-453967dc64c0/spark_on_gke.zip to /var/data/spark-506aea67-1592-4fb0-9e78-2001212f490e/spark-f7c35f69-2321-4cae-b581-13e7484e75a6/userFiles-e1dd9b3f-7688-4ae6-9775-7d7eb34add54/spark_on_gke.zip
21/12/17 10:26:25 INFO SparkKubernetesClientFactory: Auto-configuring K8S client using current context from users K8S config file
21/12/17 10:26:27 INFO Utils: Using initial executors = 2, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
21/12/17 10:26:27 INFO ExecutorPodsAllocator: Going to request 2 executors from Kubernetes for ResourceProfile Id: 0, target: 2 running: 0.
21/12/17 10:26:27 INFO BasicExecutorFeatureStep: Decommissioning not enabled, skipping shutdown script
21/12/17 10:26:27 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 7079.
21/12/17 10:26:27 INFO NettyBlockTransferService: Server created on sparkbq-18871d7dc7ecfa41-driver-svc.spark.svc:7079
21/12/17 10:26:27 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
21/12/17 10:26:27 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, sparkbq-18871d7dc7ecfa41-driver-svc.spark.svc, 7079, None)
21/12/17 10:26:27 INFO BlockManagerMasterEndpoint: Registering block manager sparkbq-18871d7dc7ecfa41-driver-svc.spark.svc:7079 with 4.1 GiB RAM, BlockManagerId(driver, sparkbq-18871d7dc7ecfa41-driver-svc.spark.svc, 7079, None)
21/12/17 10:26:27 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, sparkbq-18871d7dc7ecfa41-driver-svc.spark.svc, 7079, None)
21/12/17 10:26:27 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, sparkbq-18871d7dc7ecfa41-driver-svc.spark.svc, 7079, None)
21/12/17 10:26:27 INFO Utils: Using initial executors = 2, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
21/12/17 10:26:27 WARN ExecutorAllocationManager: Dynamic allocation without a shuffle service is an experimental feature.
21/12/17 10:26:27 INFO BasicExecutorFeatureStep: Decommissioning not enabled, skipping shutdown script
21/12/17 10:26:57 INFO KubernetesClusterSchedulerBackend: SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 30000000000(ns)
21/12/17 10:26:57 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/opt/spark/work-dir/spark-warehouse').
21/12/17 10:26:57 INFO SharedState: Warehouse path is 'file:/opt/spark/work-dir/spark-warehouse'.
Started at
17/12/2021 10:27:02.02
10
starting at ID =  1321 ,ending on =  1331
 writing to BigQuery table test.randomData
 Populated BigQuery table test.randomData
 rows written is  10
 Reading from BigQuery table test.randomData
 rows read in is  1330
+---+
|ID |
+---+
|2  |
|1  |
|3  |
|4  |
|8  |
|7  |
|9  |
|5  |
|6  |
|10 |
|16 |
|17 |
|11 |
|12 |
|18 |
|19 |
|20 |
|13 |
|14 |
|15 |
+---+
only showing top 20 rows
Rows tally in BigQuery table
Finished at
17/12/2021 10:27:46.46
Elapsed time in seconds is 82.47383785247803
pod "sparkbq-18871d7dc7ecfa41-driver" deleted
Pod creation stages
The pod creation starts with the driver container being created. Once the driver container is created, the scheduler, through the kube-apiserver, starts scheduling the executors as seen below; they are created in batches of three with one second in between (matching spark.kubernetes.allocation.batch.size and spark.kubernetes.allocation.batch.delay above). Finally the driver and, here, two executors are created and running.
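You can watch these transitions live with kubectl; the -w flag streams pod status changes as they happen (the namespace matches the one used in this article):
kubectl get pods -n spark -w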
hduser@ctpvm: /tmp> k get pods -n spark
NAME                              READY   STATUS              RESTARTS   AGE
--> creating the driver
sparkbq-6d33537dc950a8ae-driver   0/1     ContainerCreating   0          12s
sparkbq-6d33537dc950a8ae-driver   0/1     ContainerCreating   0          43s
--> driver running
sparkbq-6d33537dc950a8ae-driver   1/1     Running   0          45s
--> driver requests the apiserver to schedule two pods for the executors as below
randomdatabigquery-728f397dc9517c01-exec-1   0/1     ContainerCreating   0          0s
randomdatabigquery-728f397dc9517c01-exec-2   0/1     Pending             0          0s
sparkbq-6d33537dc950a8ae-driver              1/1     Running             0          55s
--> all executor pods are created and running
randomdatabigquery-728f397dc9517c01-exec-1   1/1     Running   0          32s
randomdatabigquery-728f397dc9517c01-exec-2   1/1     Running   0          32s
sparkbq-6d33537dc950a8ae-driver              1/1     Running   0          87s
randomdatabigquery-728f397dc9517c01-exec-1   1/1     Running   0          81s
randomdatabigquery-728f397dc9517c01-exec-2   1/1     Running   0          81s
sparkbq-6d33537dc950a8ae-driver              1/1     Running   0          2m16s
--> job is completed and pods are terminating
randomdatabigquery-728f397dc9517c01-exec-1   0/1     Terminating   0          82s
randomdatabigquery-728f397dc9517c01-exec-2   0/1     Terminating   0          82s
sparkbq-6d33537dc950a8ae-driver              1/1     Running       0          2m17s
Eventually the Spark job completes and the 10 rows are posted to BigQuery table.
Accessing Driver UI
The Spark UI can be accessed locally using kubectl port-forward.
DRIVER_POD_NAME=`kubectl get pods -n spark |grep driver|awk '{print $1}'`
kubectl port-forward $DRIVER_POD_NAME 4040:4040 -n spark
Forwarding from 127.0.0.1:4040 -> 4040
Forwarding from [::1]:4040 -> 4040
The Spark driver UI can then be accessed on http://localhost:4040. For example, the executors are shown below:
Debugging
To get some basic information about the scheduling decisions made around the driver pod, you can run:
NAMESPACE="spark"
DRIVER_POD_NAME=`kubectl get pods -n $NAMESPACE |grep driver|awk '{print $1}'`
kubectl describe pod $DRIVER_POD_NAME -n $NAMESPACE
kubectl logs $DRIVER_POD_NAME -n $NAMESPACE
This is also shown in gke.sh https://github.com/michTalebzadeh/spark_on_gke/blob/main/deployment/src/scripts/gke.sh
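A couple of further generic kubectl commands that help when a pod is stuck in Pending or fails to start; these are standard Kubernetes commands rather than anything specific to this project:
# recent events in the namespace, oldest first
kubectl get events -n $NAMESPACE --sort-by=.lastTimestamp
# follow the executor logs (executor pods carry the spark-role=executor label)
kubectl logs -f -n $NAMESPACE -l spark-role=executor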
Scaling down the number of nodes in your cluster to zero
You can temporarily scale the number of nodes in your cluster down to zero, to save on cluster cost, by running the following command:
gcloud container clusters resize spark-on-gke --num-nodes=0 --zone $ZONE --quiet
You can scale it back to the required number of nodes as below:
gcloud container clusters resize spark-on-gke --num-nodes=$NODES --zone europe-west2-c --quiet
Where $NODES could be 3 or any other number
Sizing the driver & executor cores and memory in Kubernetes cluster
Assume that you have a three-node Kubernetes cluster in Google Cloud with E2 standard machines that have 4 GB of system memory per vCPU, giving 4 vCPUs and 16,384MB of RAM on each node.
An optimum sizing of the number of executors and of the CPU and memory allocation is important here. The assumptions are the ones described earlier: one executor pod per node, with one vCPU left to the OS.
I did some tests on this, leaving 1 vCPU out of the 4 vCPUs to the OS on each node (container and executor). The RAM allocated to each node was 16GB. I then set the initial container and executor memory starting at 10% of RAM and incremented it in steps of 10%, from 10% to 50%, and measured the time taken for the code to finish (from start to end), basically with a simple Python timing in the code:
start_time = time.time()
end_time = time.time()
time_elapsed = (end_time - start_time)
This measured the completion time in seconds. The systematics were kept the same for all measurements and only one measurement was taken at each memory setting, i.e.
--conf spark.driver.memory=<memory in MB> \
--conf spark.executor.memory=<memory in MB> \
Memories were set the same for both the container and executors.
The results I got were as follows:
So it appears that allocating around 50% of RAM to both the driver and the executors provides an optimum value. Increasing the memory above 50% (say to 60% = 9830MB) results in the container never being created (stuck at Pending), as it tries to grab more memory than the nodes can offer, as shown below:
kubectl describe pod sparkbq-b506ac7dc521b667-driver -n spark
Events:
  Type     Reason             Age                   From                Message
  ----     ------             ----                  ----                -------
  Warning  FailedScheduling   17m                   default-scheduler   0/3 nodes are available: 3 Insufficient memory.
  Warning  FailedScheduling   17m                   default-scheduler   0/3 nodes are available: 3 Insufficient memory.
  Normal   NotTriggerScaleUp  2m28s (x92 over 17m)  cluster-autoscaler  pod didn't trigger scale-up:
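A plausible explanation, based on how Spark on Kubernetes sizes pods rather than on the article's own measurements: for non-JVM (PySpark) jobs the default memory overhead factor is 0.4, so the pod memory request is roughly spark.executor.memory multiplied by 1.4, which matches the 11468Mi request seen in the pod description above for the 8192m setting:
# estimated pod memory requests for the 50% and 60% settings
awk 'BEGIN { printf "8192m -> ~%dMi, 9830m -> ~%dMi\n", 8192*1.4, 9830*1.4 }'
At around 13762Mi per pod, the request no longer fits on a 16GB node once the system and kubelet reservations are taken out, hence the Insufficient memory events.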
Conclusion
In this article I provided typical steps on how to process your data using the container orchestration system Kubernetes. As some of you may be aware, I have previously used Spark on Infrastructure as a Service with Google Dataproc; for example, see this article https://www.dhirubhai.net/pulse/end-solution-using-google-dataproc-spark-sql-bigquery-mich/.
Touching on the unique benefits of orchestrating Spark jobs on Kubernetes compared to other cluster managers like YARN: the application and its dependencies ship as fixed Docker images, the same deployment approach is cloud-agnostic, and the underlying node pool can be scaled down (even to zero) when not in use to save cost.
Currently, the default scheduler of Kubernetes only supports pod-based scheduling; it cannot provide job-based scheduling for Spark applications. Thus, it is work in progress, so to speak. Having said that, Kubernetes can offer more as it natively supports running other schedulers, for example through support for Volcano; see SPARK-36057 and https://github.com/volcano-sh/volcano/issues/1704. No doubt you will hear more about Spark on Kubernetes in future releases of Apache Spark.
Disclaimer: Great care has been taken to make sure that the technical information presented in this article is accurate, but any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on its content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.