Spark on Kubernetes, A Practitioner’s Guide

Introduction

This article discusses the use case for Kubernetes (also known as k8s) for the deployment of Spark, an open-source distributed computing framework. One can use code developed in Python, Scala, SQL and other languages to create applications that process large amounts of data. Spark uses parallel processing to distribute the workload across a cluster of physical hosts, for example the nodes of a Kubernetes cluster.

In this respect, Spark supports the following cluster managers, where the cluster manager is responsible for managing and allocating resources for the cluster of nodes on which the Spark application runs (a brief illustration follows the list):

  • Standalone: a simple cluster manager shipped with Spark, limited in features.
  • Apache Hadoop YARN (Yet Another Resource Negotiator): the most widely used resource manager, not just for Spark but for other workloads as well. On-premises, YARN is used extensively; in the cloud it is also widely used, for example in Google Dataproc.
  • Kubernetes: Spark has run natively on Kubernetes since Spark 2.3. This deployment mode has been gaining momentum as the adoption of containerization grows; it is basically the new kid on the block.
  • Apache Mesos: an open-source cluster manager that was once popular but is now in decline.
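As a brief illustration, the cluster manager is selected through the --master URL passed to spark-submit; the application file below is a placeholder, not part of this article's example:

# Standalone cluster manager shipped with Spark
spark-submit --master spark://<master-host>:7077 my_app.py

# Apache Hadoop YARN (e.g. on-premises Hadoop or Google Dataproc)
spark-submit --master yarn --deploy-mode cluster my_app.py

# Kubernetes (the subject of this article)
spark-submit --master k8s://https://<k8s-apiserver>:443 --deploy-mode cluster \
             --conf spark.kubernetes.container.image=<image> my_app.py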

I will explain the practical implementation of Spark on Kubernetes (Spark-on-k8s), leveraging Google Kubernetes Engine (GKE); Google originally developed Kubernetes and later open-sourced it. I will show the steps for implementing Spark on a GKE cluster through a simple yet effective example that uses PySpark to generate random test data in Spark and post that data to a Google BigQuery data warehouse table.

GitHub reference for the material covered in this article:

https://github.com/michTalebzadeh/spark_on_gke

Use Case for Kubernetes

With modern services, users expect applications to be available 24/7, and developers expect to deploy new versions of those applications several times a day through CI/CD pipelines. With these in mind, the focus has shifted to containerization and to the means of deploying those containers through Kubernetes clusters. In short, Kubernetes is a proper cloud-agnostic solution and a leader in the management of Docker containers.

What is a container

A container https://www.docker.com/resources/what-container is a standard unit of software that packages up code and all its dependencies, so the application runs quickly and reliably from one computing environment to another. A Docker container image is a lightweight, standalone, executable package of software that includes everything needed to run an application: code, runtime, system tools, system libraries and settings. Container images become containers at runtime and in the case of Docker containers, images become containers when they run on docker engines. Containerized software will always run the same, regardless of the infrastructure. Containers isolate software from its environment and ensure that software works uniformly despite differences, for instance between development and staging.

What is the difference between a container and a Docker container image

A container and a Docker container image are related concepts within the realm of containerization, but they refer to different aspects of the technology. Here's a breakdown of their differences:

Container:

  • A container is a lightweight, standalone, and executable software package that contains everything needed to run a piece of software, including the code, runtime, system tools, system libraries, and settings.
  • Containers isolate applications from the underlying host system and from other containers, ensuring consistent behavior regardless of the environment.
  • Containers share the host operating system's kernel but have their own file system, processes, and network interfaces.
  • Containers offer a portable and consistent environment across different systems, making it easier to develop, test, and deploy applications.
  • Containers are managed and orchestrated using container runtimes and orchestration platforms like Docker, Kubernetes, and others.

Docker Container Image:

  • A Docker container image is a lightweight, stand-alone, executable software package that contains an application and its dependencies.
  • Images are created using a layered filesystem and a set of instructions defined in a Dockerfile. These instructions include adding files, installing software, setting environment variables, and more.
  • Docker images are read-only and can be versioned, allowing for reproducible deployments and easy rollbacks.
  • Images can be stored in Docker registries, which are repositories for sharing and distributing container images.
  • To run a Docker container, an instance of an image is created. This instance is a runnable environment that encapsulates the application and its runtime, isolated from the host and other containers.
  • Docker images provide a portable and consistent way to package and distribute applications.

In summary, a container is the runtime instance of a software package that encapsulates an application and its dependencies, while a Docker container image is a snapshot of an application and its dependencies, which can be instantiated to create running containers. Docker is a widely used containerization platform that allows users to create, manage, and share container images.
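A quick illustration of this distinction on any host with Docker installed; the openjdk:8-jre-slim image (used later in this article as a base image) serves purely as an example:

# Pull an image: a read-only, versioned snapshot stored in a registry
docker pull openjdk:8-jre-slim

# Run a container: a live, isolated instance of that image
docker run --rm openjdk:8-jre-slim java -version

# Images and containers are tracked separately
docker images      # snapshots available locally
docker ps -a       # container instances created from them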

The difference between a container and a microservice

A container is a useful resource allocation and sharing technology. In contrast, a microservice is a software design pattern. So in short:

  • Microservices are about the design of software. --> developers
  • Containers are about packaging software for deployment. --> DevOps

Note that microservices run within containers.

Microservices operate according to a principle known as bounded context, which is the ability to position a component and its data as a standalone unit or one with limited dependencies. In other words, services are decoupled and function independently. As such, if one web service fails within an application, other services associated with the app will continue to operate as usual.

What is Kubernetes

As stated before, Kubernetes https://kubernetes.io/, also known as K8s, is an open-source system for automating deployment, scaling, and management of containerized applications. The purpose of Kubernetes in this case is to host your Spark applications in the form of containers in an automated fashion so that you can easily deploy as many instances of your application as required and easily enable communication between different services within your application.

It should be noted that Spark on Kubernetes is a work in progress and, as of now, there is outstanding future work. In this article we leverage what is currently on offer with Spark on Kubernetes.

What is a pod

Pods https://cloud.google.com/kubernetes-engine/docs/concepts/pod#what_is_a_pod are the smallest, most basic deployable objects in Kubernetes. A Pod represents a single instance of a running process in the Kubernetes cluster. Pods can contain one or more containers, such as Docker containers.
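As a small illustration, pods can be listed and inspected with kubectl; the spark namespace below is the one used later in this article:

# List the pods in the spark namespace
kubectl get pods -n spark

# Show a pod's containers, the node it runs on, its recent events, etc.
kubectl describe pod <pod-name> -n spark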

What is Docker and what is a Docker image

Docker https://www.docker.com/ is an open-source containerization platform. It enables developers to package applications into containers. Specifically, Docker is a set of platform as a service (PaaS) products that use OS-level virtualization to deliver software in packages called containers.

A Docker image is a file created to execute code in a Docker container. A Docker image contains application code, libraries, tools, dependencies, and other files needed to make an application run and can have considerable size. It is the heart of what is deployed in the Kubernetes Cluster.

The Docker image role

Kubernetes requires users to supply images that can be deployed into containers within pods. You will need to install Docker Community Edition (CE) or similar (https://docs.docker.com/engine/install/ubuntu/) on the host that will be creating the Docker images. Docker images will be used for the Spark driver and the Spark executors. This will be covered further down.

Building and preparing a Docker image

Before building this file, consideration should be given to the following:

  1. The version of Spark
  2. The Scala version
  3. The Java version
  4. Packages to be deployed for PySpark

Choosing the version of Spark is important, as the Docker image built should be compatible with the other components the application works with. For example, if you are going to write to the Google BigQuery data warehouse https://cloud.google.com/bigquery, which versions of Spark are compatible with it? Additionally, does it support Java 8 or Java 11? One can of course build multiple Docker images with different versions as needed and, in my opinion, this flexibility is one of the strengths of the dockerfile approach.

With Spark, you can build dockerfiles for Scala, PySpark and R. In this article, we will not consider dockerfiles for R.

Spark binaries provide a shell script, docker-image-tool.sh, that can be used to build and publish the Docker images to use with the Kubernetes backend. The shell script is located in $SPARK_HOME/bin.

docker-image-tool.sh takes a number of parameters. You can get a list of them by typing $SPARK_HOME/bin/docker-image-tool.sh -h

The parameters of interest are -r, -t, -b and -p.

Briefly, -r specifies the repository, -t the tag to be added, -b a build argument used when building the image, and -p the dockerfile to build the image from.

You can create a shell script for different versions of the dockerfile. Make sure that you are invoking this script from the correct Spark version. For example, in the script below SPARK_VERSION is set to 3.1.1; that should be the binary you are executing, i.e. your current Spark version should be 3.1.1.

Here, we are using the standard Dockerfile supplied with the Spark binaries:

BASE_OS="buster"
SPARK_VERSION="3.1.1"
SCALA_VERSION="scala_2.12"
DOCKERFILE="Dockerfile"
DOCKERIMAGETAG="11-jre-slim"


# Building Docker image from provided Dockerfile base 11
cd $SPARK_HOME
./bin/docker-image-tool.sh \
              -r spark \
              -t ${SPARK_VERSION}-${SCALA_VERSION}-${DOCKERIMAGETAG}-${BASE_OS} \
              -b java_image_tag=${DOCKERIMAGETAG} \
              -p ./kubernetes/dockerfiles/spark/${DOCKERFILE} \
               build

Once the Docker image is built, it will show in the output of:

docker images        

Creating a custom-made dockerfile

A custom-made dockerfile is often necessary to add additional libraries and to be able to build the image with different component versions, say a different Java version.

The following dockerfile will create a Docker image with Java 8 as opposed to Java 11. Note the line

ARG java_image_tag=8-jre-slim        


ARG java_image_tag=8-jre-slim


FROM openjdk:${java_image_tag}


ARG spark_uid=185


# Before building the docker image, first build and make a Spark distribution following
# the instructions in https://spark.apache.org/docs/latest/building-spark.html.
# If this docker file is being used in the context of building your images from a Spark
# distribution, the docker build command should be invoked from the top level directory
# of the Spark distribution. E.g.:
# docker build -t spark:latest -f kubernetes/dockerfiles/spark/Dockerfile .


RUN set -ex && \
    sed -i 's/http:\/\/deb.\(.*\)/https:\/\/deb.\1/g' /etc/apt/sources.list && \
    apt-get update && \
    ln -s /lib /lib64 && \
    apt install -y bash tini libc6 libpam-modules krb5-user libnss3 procps && \
    mkdir -p /opt/spark && \
    mkdir -p /opt/spark/examples && \
    mkdir -p /opt/spark/work-dir && \
    touch /opt/spark/RELEASE && \
    rm /bin/sh && \
    ln -sv /bin/bash /bin/sh && \
    echo "auth required pam_wheel.so use_uid" >> /etc/pam.d/su && \
    chgrp root /etc/passwd && chmod ug+rw /etc/passwd && \
    rm -rf /var/cache/apt/*


COPY jars /opt/spark/jars
COPY bin /opt/spark/bin
COPY sbin /opt/spark/sbin
COPY kubernetes/dockerfiles/spark/entrypoint.sh /opt/
COPY kubernetes/dockerfiles/spark/decom.sh /opt/
COPY examples /opt/spark/examples
COPY kubernetes/tests /opt/spark/tests
COPY data /opt/spark/data


ENV SPARK_HOME /opt/spark


WORKDIR /opt/spark/work-dir
RUN chmod g+w /opt/spark/work-dir
RUN chmod a+x /opt/decom.sh


ENTRYPOINT [ "/opt/entrypoint.sh" ]


# Specify the User that the actual main process will run as
USER ${spark_uid}        


docker images
REPOSITORY    TAG                                  IMAGE ID       CREATED        SIZE
spark/spark   3.1.1-scala_2.12-8-jre-slim-buster   b9ed343e1d6d   10 hours ago   599MB
openjdk       8-jre-slim                           c463ae0fa93d   2 weeks ago    194MB

In this case we will be looking at creating a dockerfile for PySpark.

A sample dockerfile for PySpark called Dockerfile is supplied under directory

$SPARK_HOME/kubernetes/dockerfiles/spark/bindings/python

You can create a custom dockerfile for PySpark with Java 8 as below:

ARG base_img


FROM $base_img
WORKDIR /


# Reset to root to run installation tasks
USER 0


RUN mkdir ${SPARK_HOME}/python
RUN apt-get update && \
    apt install -y python3 python3-pip && \
    pip3 install --upgrade pip setuptools && \
    # Removed the .cache to save space
    rm -r /root/.cache && rm -rf /var/cache/apt/*


RUN mkdir ${SPARK_HOME}/conf
COPY spark-defaults.conf ${SPARK_HOME}/conf/spark-defaults.conf




COPY python/pyspark ${SPARK_HOME}/python/pyspark
COPY python/lib ${SPARK_HOME}/python/lib


ARG spark_uid=185


WORKDIR /opt/spark/work-dir
ENTRYPOINT [ "/opt/entrypoint.sh" ]


# Specify the User that the actual main process will run as
USER ${spark_uid}        

How to use this dockerfile

BASE_OS="buster"
SPARK_VERSION="3.1.1"
SCALA_VERSION="scala_2.12"
DOCKERFILE="java8only"
DOCKERIMAGETAG="8-jre-slim"


cd $SPARK_HOME
./bin/docker-image-tool.sh \
              -r spark \
              -t ${SPARK_VERSION}-${SCALA_VERSION}-${DOCKERIMAGETAG}-${BASE_OS} \
              -b java_image_tag=${DOCKERIMAGETAG} \
              -p ./kubernetes/dockerfiles/spark/bindings/python/${DOCKERFILE} \
               build
        

It results in the following image created:

REPOSITORY       TAG                                  IMAGE ID       CREATED             SIZE
spark/spark-py   3.1.1-scala_2.12-8-jre-slim-buster   ebd7beca1ba0   About an hour ago   951MB

Verifying the image built

You can also log in to the image as root and verify the Java version:

docker run -u 0 -it ebd7beca1ba0 bash
root@28dfb9adc1d1:/opt/spark/work-dir# echo $JAVA_HOME
/usr/local/openjdk-8        

Getting the list of packages installed

pip list
Package    Version
---------- -------
pip        21.3.1
setuptools 59.4.0
wheel      0.34.2

If you log in as the default user, you can get a list of environment variables:

docker run -it ebd7beca1ba0 bash

185@338a3c6239b4:/opt/spark/work-dir$ env
HOSTNAME=338a3c6239b4
JAVA_HOME=/usr/local/openjdk-8
PWD=/opt/spark/work-dir
HOME=/
LANG=C.UTF-8
TERM=xterm
SHLVL=1
SPARK_HOME=/opt/spark
PATH=/usr/local/openjdk-8/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
JAVA_VERSION=8u312
_=/usr/bin/env        

Adding additional Python packages to the Docker image

The most convenient way to add additional packages is to add them directly to the Docker image at the time of creating the image. External packages are then bundled as part of the image; because the image is fixed, if an application requires that set of dependencies every time, they are always there.

RUN pip install pyyaml numpy cx_Oracle --no-cache-dir        

The --no-cache-dir option to pip prevents the downloaded packages from being cached inside the image, reducing the image size. It is also advisable to install all packages in one line. Every RUN statement creates an intermediate layer and hence increases build time, so reduce it by putting all packages in one RUN line.

We will create a custom dockerfile called java8PlusPackages in the Python bindings directory

$SPARK_HOME/kubernetes/dockerfiles/spark/bindings/python

The content of this file is shown below:

cat java8PlusPackages

ARG base_img


FROM $base_img
WORKDIR /


# Reset to root to run installation tasks
USER 0


RUN mkdir ${SPARK_HOME}/python
RUN apt-get update && \
    apt install -y python3 python3-pip && \
    pip3 install --upgrade pip setuptools && \
    # Removed the .cache to save space
    rm -r /root/.cache && rm -rf /var/cache/apt/*


RUN mkdir ${SPARK_HOME}/conf
COPY spark-defaults.conf ${SPARK_HOME}/conf/spark-defaults.conf


RUN pip install pyyaml numpy cx_Oracle --no-cache-dir


COPY python/pyspark ${SPARK_HOME}/python/pyspark
COPY python/lib ${SPARK_HOME}/python/lib


ARG spark_uid=185


WORKDIR /opt/spark/work-dir
ENTRYPOINT [ "/opt/entrypoint.sh" ]


# Specify the User that the actual main process will run as
USER ${spark_uid}        

The shell script used with this dockerfile is shown below:

cat java8PlusPackages.sh
#!/bin/bash


BASE_OS="buster"
SPARK_VERSION="3.1.1"
SCALA_VERSION="scala_2.12"
DOCKERFILE="java8PlusPackages"
DOCKERIMAGETAG="8-jre-slim"


cd $SPARK_HOME
./bin/docker-image-tool.sh \
              -r spark \
              -t ${SPARK_VERSION}-${SCALA_VERSION}-${DOCKERIMAGETAG}-${BASE_OS}_${DOCKERFILE} \
              -b java_image_tag=${DOCKERIMAGETAG} \
              -p ./kubernetes/dockerfiles/spark/bindings/python/${DOCKERFILE} \
               build
        

The images are created as below:

docker images
REPOSITORY       TAG                                                     IMAGE ID       CREATED             SIZE
spark/spark-py   3.1.1-scala_2.12-8-jre-slim-buster_java8PlusPackages   f9ddb4b305ef   3 minutes ago       1.02GB
spark/spark      3.1.1-scala_2.12-8-jre-slim-buster_java8PlusPackages   a8bf56405021   About an hour ago   599MB
openjdk          8-jre-slim                                              c463ae0fa93d   2 weeks ago         194MB

Log in to the Docker image and check the Python packages installed:

docker run -u 0 -it spark/spark-py:3.1.1-scala_2.12-8-jre-slim-buster_java8PlusPackages bash
root@5bc049af7278:/opt/spark/work-dir# pip list
Package    Version
---------- -------
cx-Oracle  8.3.0
numpy      1.21.4
pip        21.3.1
PyYAML     6.0
setuptools 59.4.0
wheel      0.34.2
        

Removing unwanted docker images

You can force (-f) a docker image removal by using the following command with IMAGE ID from docker images output:

docker image rm 0f339c43df8f -f        

Understanding the Kubernetes Cluster

At the infrastructure level, a Kubernetes cluster (in this case GKE) comprises a set of physical or virtual machines, each acting in a specific role. The following diagram depicts this.

[Diagram: Kubernetes cluster architecture, showing the Control Plane and the worker nodes]

The Control Plane controls all the operations and is charged with orchestrating the containers that run on all of the nodes of the cluster. Each node is equipped with a container runtime. The node receives instructions from the Control Plane and then takes action to create pods, delete them, or adjust networking rules. Inside the Control Plane, the following components are installed (a few kubectl commands to inspect the cluster follow the list):

  • kube-apiserver – the main component, exposing APIs for the other Control Plane components.
  • etcd – a distributed key/value store which Kubernetes uses for persistent storage of all cluster information.
  • scheduler – uses information in the pod specification to decide on which node to run a pod.
  • cloud-controller-manager – a daemon acting as an abstraction layer between the APIs and the different cloud providers’ tools (storage volumes, load balancers etc.).
  • kube-controller-manager – responsible for node management (detecting if a node fails), pod replication, and endpoint creation.

Nodes are servers in Kubernetes and are managed by the Control Plane. A node may be a virtual machine (VM) or a physical machine. Each node contains the necessary components to run pods.
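As a quick aside, a few kubectl commands surface these pieces from the command line; on GKE the Control Plane itself is managed by Google, so you mostly see it through the kube-apiserver endpoint:

# The kube-apiserver endpoint the cluster is reachable on
kubectl cluster-info

# The worker nodes (VMs on GKE) managed by the Control Plane
kubectl get nodes -o wide

# Capacity, allocatable resources and running pods of a given node
kubectl describe node <node-name>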

Kubernetes Node Components in Detail

To summarize, the node runs the two most important components, the kubelet and the kube-proxy, as well as a container engine in charge of running the containerized applications.

[Diagram: Kubernetes node components – kubelet, kube-proxy and the container runtime]

kubelet

The kubelet agent handles all communication between the master and the node on which it is running. It receives commands from the master in the form of a manifest which defines the workload and the operating parameters. It interfaces with the container runtime that is responsible for creating, starting, and monitoring pods.

The kubelet also periodically executes any configured liveness probes and readiness checks. It constantly monitors the state of the pods and, in the event of a problem, launches a new instance instead. The kubelet also has an internal HTTP server exposing a read-only view at port 10255. There’s a health check endpoint at /healthz and also a few additional status endpoints. For example, we can get a list of running pods at /pods. We can also get specs of the machine the kubelet is running on at /spec.
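For illustration, these endpoints can be probed from the node itself, assuming the kubelet's read-only port has not been disabled (many hardened or newer managed clusters turn it off, so treat this as an example only):

# Run on the node itself (for example over SSH)
curl -s http://localhost:10255/healthz   # kubelet health check
curl -s http://localhost:10255/pods      # pods running on this node
curl -s http://localhost:10255/spec      # machine spec as seen by the kubelet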

cAdvisor

cAdvisor (Container Advisor) is an open-source agent that monitors resource usage and analyzes the performance of containers. Originally created by Google, cAdvisor is now integrated with the kubelet.

The cAdvisor instance on each node collects, aggregates, processes, and exports metrics such as CPU, memory, file, and network usage for all running containers. All data is sent to the scheduler to ensure that it knows about the performance and resource usage inside of the node. This information is used to perform various orchestration tasks like scheduling, horizontal pod scaling, and managing container resource limits.
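The same cAdvisor-sourced metrics are what back kubectl top, provided a metrics pipeline such as metrics-server (enabled by default on GKE) is running; a quick illustration:

# CPU and memory usage per node, backed by kubelet/cAdvisor metrics
kubectl top nodes

# CPU and memory usage of the Spark driver and executor pods
kubectl top pods -n spark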

kube-proxy

The kube-proxy component runs on each node and proxies UDP, TCP, and SCTP packets. It maintains the network rules on the host and handles transmission of packets between pods, the host, and the outside world. It acts like a network proxy and load balancer for pods.

The kube-proxy process stands in between the network Kubernetes is attached to and the pods that are running on that particular node. It is essentially the core networking component of Kubernetes and is responsible for ensuring that communication is maintained efficiently across all elements of the cluster. When a user creates a Kubernetes service object, the kube-proxy instance is responsible for translating that object into meaningful rules in the local iptables rule set on the worker node. iptables is used to translate the virtual IP assigned to the service object to all of the pod IPs mapped by the service.
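If you have shell access to a worker node and kube-proxy is running in its default iptables mode, you can see these generated rules; this is purely illustrative and nothing you normally need to touch:

# Top-level NAT chain that kube-proxy maintains for Kubernetes services
sudo iptables -t nat -L KUBE-SERVICES -n | head -20

# Each service then has a KUBE-SVC-... chain that DNATs the virtual IP to the pod IPs
sudo iptables -t nat -L -n | grep KUBE-SVC | head -5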

Container Runtime

The container runtime is responsible for pulling the images (Docker images) from public or private registries (in this case Google Container Registry) and running containers based on those images. As previously mentioned, the kubelet interacts directly with the container runtime to start, stop, or delete containers.

Handling the creation of the Spark driver and executors

  1. As labelled in the diagram above, processes 1-3 deal with the request to the kube apiserver to schedule the creation of the driver pod; at its completion, process 4, originating from the driver, requests the kube apiserver to schedule the creation of the executors.
  2. Depending on the number of executors requested through the spark.executor.instances property, the driver will ask for them in chunks, i.e. it will ask for N executors, wait briefly and then ask for another N executors, where N is a controllable batch size (default 5) managed via the spark.kubernetes.allocation.batch.size property, and the delay between batches of executors (default 1s) is controlled by the spark.kubernetes.allocation.batch.delay property. This batch-based allocation is done to avoid overwhelming the kube apiserver and effectively overloading the cluster by asking for all the pods at once (a sketch follows this list).
  3. Processes 5 and 6 deal with creating the executors through the scheduler.
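As a worked illustration of point 2, the settings below would make the driver request 8 executor pods in rounds of 3 (3 + 3 + 2), roughly one second apart; the numbers are illustrative only, and the remaining options are shown in full in the spark-submit later in this article:

# 8 executors requested in rounds of 3 (3 + 3 + 2), ~1 second apart
spark-submit \
   --master k8s://https://$KUBERNETES_MASTER_IP:443 \
   --deploy-mode cluster \
   --conf spark.executor.instances=8 \
   --conf spark.kubernetes.allocation.batch.size=3 \
   --conf spark.kubernetes.allocation.batch.delay=1s \
   --conf spark.kubernetes.container.image=${IMAGEDRIVER} \
   $CODE_DIRECTORY/${APPLICATION}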

Preparing to use Spark with Kubernetes in Google Cloud Platform

According to the Spark documentation, Spark can run on clusters managed by Kubernetes. This feature makes use of the native Kubernetes scheduler that has been added to Spark. Spark on Kubernetes has been supported since Spark 2.3.

Pre-requisites

Next, you will need to install the Kubernetes cluster components. You will broadly need the following:

  • A Google Cloud Platform (GCP) account
  • A host where spark-submit will be running and which will act as the compute server. You can use a VM in a public cloud. An e2-standard-2 (2 vCPUs, 8 GB memory) with 100GB of disk space will do. To be able to access this VM using Putty or MobaXterm, I added an external static IP address to this host.
  • On the same host, you will need to download and install the Spark binaries. If you intend to use Google BigQuery as the data warehouse, choose Spark 3.1.1 for now.
  • Install Google Cloud SDK https://cloud.google.com/sdk/docs/install. Verify this with gcloud init and gcloud auth login
  • Install Docker CE https://docs.docker.com/engine/install/ubuntu/. If you want to run Docker as a non-root user, follow these instructions in https://www.thegeekdiary.com/run-docker-as-a-non-root-user/.
  • Install kubectl https://kubernetes.io/docs/tasks/tools/
  • Create a Google service account https://cloud.google.com/iam/docs/service-accounts with adequate permissions to:

  1. Read and write to cloud storage, see IAM roles for Cloud Storage https://cloud.google.com/iam/docs/understanding-roles
  2. Kubernetes Engine Admin, see Create IAM policies https://cloud.google.com/iam/docs/policies
  3. BigQuery Job User, see the BigQuery predefined IAM roles
  4. Any other role as needed

  • A Kubernetes cluster running on Google Kubernetes Engine https://cloud.google.com/kubernetes-engine. In its simplest form, you can create a cluster using the command below. This will create a three-node Kubernetes cluster on GKE called spark-demo-gke for project my_project in zone europe-west2-c, with e2-medium machines (2 vCPUs and 4GB of RAM) and 100GB of disk space each. It is far easier to create it on the console itself, following the above link.

gcloud beta container --project "<PROJECT_ID>" clusters create "spark-demo-gke" --zone "europe-west2-c" --no-enable-basic-auth --cluster-version "1.21.5-gke.1302" --release-channel "regular" --machine-type "e2-medium" --image-type "COS_CONTAINERD" --disk-type "pd-standard" --disk-size "100" --metadata disable-legacy-endpoints=true --scopes "https://www.googleapis.com/auth/devstorage.read_only","https://www.googleapis.com/auth/logging.write","https://www.googleapis.com/auth/monitoring","https://www.googleapis.com/auth/servicecontrol","https://www.googleapis.com/auth/service.management.readonly","https://www.googleapis.com/auth/trace.append" --max-pods-per-node "110" --num-nodes "3" --logging=SYSTEM,WORKLOAD --monitoring=SYSTEM --enable-ip-alias --network "projects/<PROJECT_ID>/global/networks/default" --subnetwork "projects/<PROJECT_ID>/regions/europe-west2/subnetworks/default" --no-enable-intra-node-visibility --default-max-pods-per-node "110" --no-enable-master-authorized-networks --addons HorizontalPodAutoscaling,HttpLoadBalancing,GcePersistentDiskCsiDriver --enable-autoupgrade --enable-autorepair --max-surge-upgrade 1 --max-unavailable-upgrade 0 --enable-shielded-nodes --node-locations "europe-west2-
WARNING: The Pod address range limits the maximum size of the cluster. Please refer to https://cloud.google.com/kubernetes-engine/docs/how-to/flexible-pod-cidr to learn how to optimize IP address allocation.
Creating cluster spark-demo-gke in europe-west2-c...done.
Created
[https://container.googleapis.com/v1beta1/projects/<PROJECT_ID>/zones/europe-west2-c/clusters/spark-demo-gke].
To inspect the contents of your cluster, go to: https://console.cloud.google.com/kubernetes/workload_/gcloud/europe-west2-c/spark-demo-gke?project=<PROJECT_ID>
kubeconfig entry generated for spark-demo-gke.
NAME            LOCATION        MASTER_VERSION   MASTER_IP    MACHINE_TYPE  NODE_VERSION     NUM_NODES  STATUS
spark-demo-gke  europe-west2-c  1.21.5-gke.1302  35.246.3.22  e2-medium     1.21.5-gke.1302  3          RUNNING

In our case we chose, for the Kubernetes nodes, E2 standard machines that have 4 GB of system memory per vCPU, giving 4 vCPUs and 16384MB of RAM on each node of the cluster.
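Once the cluster is created, it is worth confirming what was actually provisioned before moving on; substitute the cluster name and zone you used:

# Confirm the cluster exists and is RUNNING
gcloud container clusters list --filter="name=<CLUSTER_NAME>"

# Point kubectl at it
gcloud container clusters get-credentials <CLUSTER_NAME> --zone europe-west2-c

# Inspect the nodes and their allocatable CPU and memory
kubectl get nodes -o wide
kubectl describe nodes | grep -A 5 Allocatable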

Dependency Management

Copying your Docker images to Google Container Registry

In order to be able to use your Docker image in GKE, you will need to follow the documentation on how to create and push your Docker image to Google Container Registry https://cloud.google.com/container-registry/docs/pushing-and-pulling.

docker push eu.gcr.io/<PROJECT_ID>/spark-py:3.1.1-scala_2.12-8-jre-slim-buster-java8PlusPackages        
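Note that before the push, the locally built image needs to be tagged with the registry path; a minimal sketch, assuming the image built earlier and the eu.gcr.io registry host:

# Tag the locally built image with the Container Registry path of the project
docker tag spark/spark-py:3.1.1-scala_2.12-8-jre-slim-buster_java8PlusPackages \
       eu.gcr.io/<PROJECT_ID>/spark-py:3.1.1-scala_2.12-8-jre-slim-buster-java8PlusPackages

# Let docker authenticate against gcr.io registries with your gcloud credentials
gcloud auth configure-docker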

Once you have done so, you can list the images created as below:

gcloud container images list-tags eu.gcr.io/<PROJECT_ID>/spark-py
DIGEST        TAGS                                                   TIMESTAMP
680242a8def4  3.1.1-scala_2.12-8-jre-slim-buster-java8PlusPackages  2021-12-17T09:32:13

Making the project and py file available through spark-submit

Some of this for PySpark is discussed in https://spark.apache.org/docs/latest/api/python/user_guide/python_packaging.html .

For your project, you will need to create a zip file at the root directory of your project as below.

Assuming that the root directory of your project is called spark_on_gke and it resides under $CODE_DIRECTORY, create the zip file of the project as shown below. You also need to copy your Python application file ${APPLICATION} to the same location.

source_code="spark_on_gke"
CURRENT_DIRECTORY=`pwd`
CODE_DIRECTORY="/home/hduser/dba/bin/python/"
CODE_DIRECTORY_CLOUD="gs://<YOUR_BUCKET>/codes/"
cd $CODE_DIRECTORY
[ -f ${source_code}.zip ] && rm -r -f ${source_code}.zip
echo `date` ", ===> creating source zip directory from ${source_code}"
# zip needs to be done at root directory of code
zip -rq ${source_code}.zip ${source_code}
gsutil cp ${source_code}.zip $CODE_DIRECTORY_CLOUD
gsutil cp /home/hduser/dba/bin/python/${source_code}/src/${APPLICATION} $CODE_DIRECTORY_CLOUD        

Getting the correct credentials

You need to set up authentication under the account on the VM host so that Spark can work with Kubernetes, read and write to the storage bucket, and write to the Google BigQuery database. In the following example, my GKE cluster is called spark-on-gke.

Set up the correct authority with gcloud init or gcloud auth login.

Set the compute zone with gcloud config set compute/zone $ZONE. For example:

gcloud config set compute/zone europe-west2-c        

Get the project ID

export PROJECT=$(gcloud info --format='value(config.project)')        

Get GKE credentials

gcloud container clusters get-credentials spark-on-gke --zone $ZONE        

Get Kubernetes Master IP

export KUBERNETES_MASTER_IP=$(gcloud container clusters list --filter name=spark-on-gke --format='value(MASTER_IP)')

A typical spark-submit for Kubernetes is shown below. This is run on the VM host where the Spark binaries are installed.

        spark-submit --verbose \
           --properties-file ${property_file} \
           --master k8s://https://$KUBERNETES_MASTER_IP:443 \
           --deploy-mode cluster \
           --name sparkBQ \
           --py-files $CODE_DIRECTORY/spark_on_gke.zip \
           --conf spark.kubernetes.namespace=$NAMESPACE \
           --conf spark.network.timeout=300 \
           --conf spark.executor.instances=$NEXEC \
           --conf spark.kubernetes.allocation.batch.size=3 \
           --conf spark.kubernetes.allocation.batch.delay=1 \
           --conf spark.driver.cores=3 \
           --conf spark.executor.cores=3 \
           --conf spark.driver.memory=8192m \
           --conf spark.executor.memory=8192m \
           --conf spark.dynamicAllocation.enabled=true \
           --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
           --conf spark.kubernetes.driver.container.image=${IMAGEDRIVER} \
           --conf spark.kubernetes.executor.container.image=${IMAGEDRIVER} \
           --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-bq \
           --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
           --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
           $CODE_DIRECTORY/${APPLICATION}

  • --verbose prints fine-grained debugging information.
  • The spark-submit script can load default Spark configuration values from a properties file (--properties-file) and pass them on to your application.
  • --master k8s://https://$KUBERNETES_MASTER_IP:443 is the Kubernetes apiserver address and port. See the explanation here.
  • --deploy-mode cluster means run on a Kubernetes cluster in cluster deploy mode.
  • --name sparkBQ is the name of this job.
  • --py-files is the location of the zipped Python application. With Python support, .egg, .zip and .py libraries are expected to be distributed to executors via the --py-files option.
  • spark.network.timeout is the timeout for all network interactions. See here.
  • spark.executor.instances is the number of executors requested in the cluster.
  • spark.kubernetes.allocation.batch.size is the number of pods to launch at once in each round of executor pod allocation. Defaults to 5.
  • spark.kubernetes.allocation.batch.delay - time to wait between each round of executor pod allocation. Specifying values less than 1 second may lead to excessive CPU usage on the Spark driver.
  • spark.kubernetes.driver.limit.cores - specifies a hard CPU limit for the driver pod.
  • spark.driver.cores - the driver pod CPU request.
  • spark.executor.cores - specifies the executor pod CPU request.

Explanatory notes on spark.driver.cores and spark.executor.cores:

In this model, we want to fit exactly one Spark executor pod per Kubernetes node. With 4 vCPUs on each Kubernetes node, we should, as per classic resource allocation, leave one vCPU for the OS itself and set both of these parameters to 3.
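A similar sanity check applies to memory. A rough sketch of the arithmetic, assuming the Kubernetes memory overhead factor of 0.4 that Spark appears to apply to non-JVM (PySpark) jobs by default; the resulting 11468Mi matches the driver pod memory limit seen in the pod description later in this article:

# spark.driver.memory / spark.executor.memory            : 8192m
# spark.kubernetes.memoryOverheadFactor (non-JVM default): 0.4
# pod memory request ~= 8192m x (1 + 0.4) = 11468Mi,
# which must fit within the allocatable memory of a 16GB e2-standard-4 node
echo "$(( 8192 + 8192 * 4 / 10 ))Mi"    # prints 11468Mi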

  • spark.driver.memory - The memory allocated to the driver.
  • spark.executor.memory - The memory allocated to each executor.
  • spark.kubernetes.driver.container.image - Custom container image to use for the driver.
  • spark.kubernetes.executor.container.image - Custom container image to use for executors.
  • spark.kubernetes.container.image - If you specify both the driver and executor images, then there is no need to set spark.kubernetes.container.image. In the absence of either, you must specify spark.kubernetes.container.image. So either specify the driver AND executor images explicitly (as above) and omit the container image, or specify only one of the driver OR executor images explicitly and then set the container image as well so the default applies to the other. See here.
  • spark.kubernetes.authenticate.driver.serviceAccountName - This is the service account name that allows the driver to communicate with the kube apiserver. See here, and the sketch of creating it after this list.
  • spark.driver.extraJavaOptions - A string of extra JVM options to pass to the driver. This is needed when Arrow code is used to read from the columnar BigQuery database.
  • spark.executor.extraJavaOptions - A string of extra JVM options to pass to the executors. This is needed when Arrow code is used to read from the columnar BigQuery database.
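The spark-bq service account referenced above (and the spark namespace used throughout) must exist before the job is submitted. A minimal sketch of creating them with kubectl, assuming the built-in edit role in the namespace is sufficient for your setup:

# Namespace that the driver and executor pods will run in
kubectl create namespace spark

# Service account the driver uses to talk to the kube apiserver
kubectl create serviceaccount spark-bq -n spark

# Allow it to create and delete executor pods in that namespace
kubectl create rolebinding spark-bq-role \
        --clusterrole=edit \
        --serviceaccount=spark:spark-bq \
        -n spark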

The PySpark code to generate random data

The PySpark code to generate random data is shown below. It is pretty self-explanatory. Once the random values are generated, the underlying RDD is converted into a Spark DataFrame with a predefined schema and the data is saved to the BigQuery table test.randomData.

from __future__ import print_function
import sys
from spark_on_gke.src.configure import config
from pyspark.sql import functions as F
from pyspark.sql.functions import col, round, current_timestamp, lit
from spark_on_gke.sparkutils import sparkstuff as s
from spark_on_gke.othermisc import usedFunctions as uf
from pyspark.sql.types import *
import datetime
import time
def main():
    start_time = time.time()
    appName = "RandomDataBigQuery"
    spark_session = s.spark_session(appName)
    spark_session = s.setSparkConfBQ(spark_session)
    spark_context = s.sparkcontext()
    spark_context.setLogLevel("ERROR")
    lst = (spark_session.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ")).collect()
    print("\nStarted at");uf.println(lst)
    randomdatabq = RandomData(spark_session, spark_context)
    dfRandom = randomdatabq.generateRamdomData()
    #dfRandom.printSchema()
    #dfRandom.show(20, False)
    randomdatabq.loadIntoBQTable(dfRandom)
    lst = (spark_session.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ")).collect()
    print("\nFinished at");uf.println(lst)
    end_time = time.time()
    time_elapsed = (end_time - start_time)
    print(f"""Elapsed time in seconds is {time_elapsed}""")
    spark_session.stop()


class RandomData:
    def __init__(self, spark_session, spark_context):
        self.spark = spark_session
        self.sc = spark_context
        self.config = config
        self.values = dict()

    def readDataFromBQTable(self):
        dataset = "test"
        tableName = "randomData"
        fullyQualifiedTableName = dataset+'.'+tableName
        read_df = s.loadTableFromBQ(self.spark, dataset, tableName)
        return read_df

    def getValuesFromBQTable(self):
        read_df = self.readDataFromBQTable()
        read_df.createOrReplaceTempView("tmp_view")
        rows = self.spark.sql("SELECT COUNT(1) FROM tmp_view").collect()[0][0]
        maxID = self.spark.sql("SELECT MAX(ID) FROM tmp_view").collect()[0][0]
        return {"rows":rows, "maxID":maxID}

    def generateRamdomData(self):
        self.values = self.getValuesFromBQTable()
        rows = self.values["rows"]
        maxID = self.values["maxID"]
        start = 0
        if (rows == 0):
            start = 1
        else:
            start = maxID + 1
        numRows = config['GCPVariables']['numRows']
        print(numRows)
        end = start + numRows
        print("starting at ID = ", start, ",ending on = ", end)
        Range = range(start, end)
        ## This traverses through the Range and increments "x" by one unit each time, and that x value is used in the code to generate random data through Python functions in a class
        rdd = self.sc.parallelize(Range). \
            map(lambda x: (x, uf.clustered(x, numRows), \
                           uf.scattered(x, numRows), \
                           uf.randomised(x, numRows), \
                           uf.randomString(50), \
                           uf.padString(x, " ", 50),
                           uf.padSingleChar("x", 50)))

        Schema = StructType([StructField("ID", IntegerType(), False),
                             StructField("CLUSTERED", FloatType(), True),
                             StructField("SCATTERED", FloatType(), True),
                             StructField("RANDOMISED", FloatType(), True),
                             StructField("RANDOM_STRING", StringType(), True),
                             StructField("SMALL_VC", StringType(), True),
                             StructField("PADDING", StringType(), True)
                            ])
        df = self.spark.createDataFrame(rdd, schema = Schema)
        df = df. \
             withColumn("op_type", lit(config['MDVariables']['op_type'])). \
             withColumn("op_time", current_timestamp())
        #df.printSchema()
        #df.show(100,False)
        return df

    def loadIntoBQTable(self, df2):
        # write to BigQuery table
        dataset = "test"
        tableName = "randomData"
        fullyQualifiedTableName = dataset+'.'+tableName
        print(f"""\n writing to BigQuery table {fullyQualifiedTableName}""")
        s.writeTableToBQ(df2, "append", dataset, tableName)
        print(f"""\n Populated BigQuery table {fullyQualifiedTableName}""")
        print("\n rows written is ", df2.count())
        print(f"""\n Reading from BigQuery table {fullyQualifiedTableName}\n""")
        # read data to ensure all loaded OK
        read_df = s.loadTableFromBQ(self.spark, dataset, tableName)
        new_rows = read_df.count()
        print("\n rows read in is ", new_rows)
        read_df.select("ID").show(20, False)
        ## Tally the number of rows in the BigQuery table with what is expected after adding new rows
        numRows = config['GCPVariables']['numRows']
        if (new_rows - self.values["rows"] - numRows) == 0:
            print("\nRows tally in BigQuery table")
        else:
            print("\nRows do not tally in BigQuery table")


if __name__ == "__main__":
    main()
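The helper functions imported above (spark_session, setSparkConfBQ, loadTableFromBQ, writeTableToBQ) live in the project's sparkutils module on GitHub and are not reproduced here. They wrap the spark-bigquery connector, which has to be available to the driver and executors. A minimal sketch of one way to do that, with an illustrative connector version and bucket name; in practice the connector jar can also be baked into the Docker image instead:

# Pull the BigQuery connector at submit time (the version shown is an illustrative
# choice for Spark 3.1 / Scala 2.12; pick the one matching your build)
spark-submit \
   --master k8s://https://$KUBERNETES_MASTER_IP:443 \
   --deploy-mode cluster \
   --packages com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.22.2 \
   --conf spark.kubernetes.container.image=${IMAGEDRIVER} \
   $CODE_DIRECTORY/${APPLICATION}

# Inside the application, indirect writes to BigQuery also need a temporary GCS bucket, e.g.
#   spark.conf.set("temporaryGcsBucket", "<YOUR_BUCKET>")
#   df.write.format("bigquery").option("table", "test.randomData").mode("append").save()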

Submitting the job and observing the output

The Spark job is submitted through the shell script deployment/src/scripts/gke.sh. See the GitHub link.

It is submitted by passing the mode with -M and the name of the PySpark application with -A, as below:

./gke.sh -M gcp -A RandomDataBigQuery.py        

First, we need to copy the zipped project directory spark_on_gke.zip and the application code RandomDataBigQuery.py to cloud storage:

Fri Dec 17 10:24:27 UTC 2021 , ===> creating source zip directory from spark_on_gke
Copying file://spark_on_gke.zip [Content-Type=application/zip]...
- [1 files][  1.9 MiB/  1.9 MiB]
Operation completed over 1 objects/1.9 MiB.
Copying file:///home/hduser/dba/bin/python/spark_on_gke/src/RandomDataBigQuery.py [Content-Type=text/x-python]...
/ [1 files][  4.9 KiB/  4.9 KiB]
Operation completed over 1 objects/4.9 KiB.

Then we need to resize the cluster to 3 nodes (this is needed because at the end we resize the k8s cluster back to zero nodes). From then onwards, the driver container is created, followed by the executor containers. We can track the progress of pod creation further down.
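The resize itself is a plain gcloud call; a sketch assuming the spark-on-gke cluster and zone used in this article:

# Scale the node pool up before submitting the job ...
gcloud container clusters resize spark-on-gke --num-nodes=3 --zone europe-west2-c --quiet

# ... and back down to zero afterwards so the idle cluster incurs no compute cost
gcloud container clusters resize spark-on-gke --num-nodes=0 --zone europe-west2-c --quiet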

Fri Dec 17 10:24:31 UTC 2021 , ===> Submitting spark job
Fri Dec 17 10:24:31 UTC 2021 , ===> Resizing GKE cluster with 3 nodes
Resizing spark-on-gke...done.
Updated [https://container.googleapis.com/v1/projects/<PROJECT_ID>/zones/europe-west2-c/clusters/spark-on-gke].
Updated property [compute/zone].
Fetching cluster endpoint and auth data.
kubeconfig entry generated for spark-on-gke.
Fri Dec 17 10:25:32 UTC 2021 , ===> Starting spark-submit
21/12/17 10:25:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/12/17 10:25:36 INFO SparkKubernetesClientFactory: Auto-configuring K8S client using current context from users K8S config file
21/12/17 10:25:37 INFO KerberosConfDriverFeatureStep: You have not specified a krb5.conf file locally or via a ConfigMap. Make sure that you have the krb5.conf locally on the driver image.
21/12/17 10:25:38 INFO LoggingPodStatusWatcherImpl: State changed, new state:
? ? ? ? ?pod name: sparkbq-18871d7dc7ecfa41-driver
? ? ? ? ?namespace: spark
? ? ? ? ?labels: spark-app-selector -> spark-b304270f8f6a4c99bfcdbad5a8826eb9, spark-role -> driver
? ? ? ? ?pod uid: f2b52a4a-1129-4f95-bf60-276445fd121c
? ? ? ? ?creation time: 2021-12-17T10:25:37Z
? ? ? ? ?service account name: spark-bq
? ? ? ? ?volumes: spark-sa-volume, spark-local-dir-1, spark-conf-volume-driver, kube-api-access-vktpv
? ? ? ? ?node name: gke-spark-on-gke-default-pool-75faa226-6327
? ? ? ? ?start time: 2021-12-17T10:25:37Z
? ? ? ? ?phase: Pending
? ? ? ? ?container status:
? ? ? ? ? ? ? ? ?container name: spark-kubernetes-driver
? ? ? ? ? ? ? ? ?container image: eu.gcr.io/<PROJECT_ID>/spark-py:3.1.1-scala_2.12-8-jre-slim-buster-java8PlusPackages
? ? ? ? ? ? ? ? ?container state: waiting
? ? ? ? ? ? ? ? ?pending reason: ContainerCreating
21/12/17 10:25:38 INFO LoggingPodStatusWatcherImpl: Waiting for application sparkBQ with submission ID spark:sparkbq-18871d7dc7ecfa41-driver to finish...
21/12/17 10:25:38 INFO LoggingPodStatusWatcherImpl: State changed, new state:
? ? ? ? ?pod name: sparkbq-18871d7dc7ecfa41-driver
? ? ? ? ?namespace: spark
? ? ? ? ?labels: spark-app-selector -> spark-b304270f8f6a4c99bfcdbad5a8826eb9, spark-role -> driver
? ? ? ? ?pod uid: f2b52a4a-1129-4f95-bf60-276445fd121c
? ? ? ? ?creation time: 2021-12-17T10:25:37Z
? ? ? ? ?service account name: spark-bq
? ? ? ? ?volumes: spark-sa-volume, spark-local-dir-1, spark-conf-volume-driver, kube-api-access-vktpv
? ? ? ? ?node name: gke-spark-on-gke-default-pool-75faa226-6327
? ? ? ? ?start time: 2021-12-17T10:25:37Z
? ? ? ? ?phase: Pending
? ? ? ? ?container status:
? ? ? ? ? ? ? ? ?container name: spark-kubernetes-driver
? ? ? ? ? ? ? ? ?container image: eu.gcr.io/<PROJECT_ID>/spark-py:3.1.1-scala_2.12-8-jre-slim-buster-java8PlusPackages
? ? ? ? ? ? ? ? ?container state: waiting
? ? ? ? ? ? ? ? ?pending reason: ContainerCreating
21/12/17 10:25:39 INFO LoggingPodStatusWatcherImpl: Application status for spark-b304270f8f6a4c99bfcdbad5a8826eb9 (phase: Pending)
21/12/17 10:26:17 INFO LoggingPodStatusWatcherImpl: Application status for spark-b304270f8f6a4c99bfcdbad5a8826eb9 (phase: Pending)
21/12/17 10:26:17 INFO LoggingPodStatusWatcherImpl: State changed, new state:
? ? ? ? ?pod name: sparkbq-18871d7dc7ecfa41-driver
? ? ? ? ?namespace: spark
? ? ? ? ?labels: spark-app-selector -> spark-b304270f8f6a4c99bfcdbad5a8826eb9, spark-role -> driver
? ? ? ? ?pod uid: f2b52a4a-1129-4f95-bf60-276445fd121c
? ? ? ? ?creation time: 2021-12-17T10:25:37Z
? ? ? ? ?service account name: spark-bq
? ? ? ? ?volumes: spark-sa-volume, spark-local-dir-1, spark-conf-volume-driver, kube-api-access-vktpv
? ? ? ? ?node name: gke-spark-on-gke-default-pool-75faa226-6327
? ? ? ? ?start time: 2021-12-17T10:25:37Z
? ? ? ? ?phase: Running
? ? ? ? ?container status:
? ? ? ? ? ? ? ? ?container name: spark-kubernetes-driver
? ? ? ? ? ? ? ? ?container image: eu.gcr.io/<PROJECT_ID>/spark-py:3.1.1-scala_2.12-8-jre-slim-buster-java8PlusPackages
? ? ? ? ? ? ? ? ?container state: running
? ? ? ? ? ? ? ? ?container started at: 2021-12-17T10:26:17Z
21/12/17 10:26:18 INFO LoggingPodStatusWatcherImpl: Application status for spark-b304270f8f6a4c99bfcdbad5a8826eb9 (phase: Running)
21/12/17 10:27:47 INFO LoggingPodStatusWatcherImpl: Application status for spark-b304270f8f6a4c99bfcdbad5a8826eb9 (phase: Running)
21/12/17 10:27:48 INFO LoggingPodStatusWatcherImpl: State changed, new state:
? ? ? ? ?pod name: sparkbq-18871d7dc7ecfa41-driver
? ? ? ? ?namespace: spark
? ? ? ? ?labels: spark-app-selector -> spark-b304270f8f6a4c99bfcdbad5a8826eb9, spark-role -> driver
? ? ? ? ?pod uid: f2b52a4a-1129-4f95-bf60-276445fd121c
? ? ? ? ?creation time: 2021-12-17T10:25:37Z
? ? ? ? ?service account name: spark-bq
? ? ? ? ?volumes: spark-sa-volume, spark-local-dir-1, spark-conf-volume-driver, kube-api-access-vktpv
? ? ? ? ?node name: gke-spark-on-gke-default-pool-75faa226-6327
? ? ? ? ?start time: 2021-12-17T10:25:37Z
? ? ? ? ?phase: Succeeded
? ? ? ? ?container status:
? ? ? ? ? ? ? ? ?container name: spark-kubernetes-driver
? ? ? ? ? ? ? ? ?container image: eu.gcr.io/<PROJECT_ID>/spark-py:3.1.1-scala_2.12-8-jre-slim-buster-java8PlusPackages
? ? ? ? ? ? ? ? ?container state: terminated
? ? ? ? ? ? ? ? ?container started at: 2021-12-17T10:26:17Z
? ? ? ? ? ? ? ? ?container finished at: 2021-12-17T10:27:47Z
? ? ? ? ? ? ? ? ?exit code: 0
? ? ? ? ? ? ? ? ?termination reason: Completed
21/12/17 10:27:48 INFO LoggingPodStatusWatcherImpl: Application status for spark-b304270f8f6a4c99bfcdbad5a8826eb9 (phase: Succeeded)
21/12/17 10:27:48 INFO LoggingPodStatusWatcherImpl: Container final statuses:




? ? ? ? ?container name: spark-kubernetes-driver
? ? ? ? ?container image: eu.gcr.io/<PROJECT_ID>/spark-py:3.1.1-scala_2.12-8-jre-slim-buster-java8PlusPackages
? ? ? ? ?container state: terminated
? ? ? ? ?container started at: 2021-12-17T10:26:17Z
? ? ? ? ?container finished at: 2021-12-17T10:27:47Z
? ? ? ? ?exit code: 0
? ? ? ? ?termination reason: Completed
21/12/17 10:27:48 INFO LoggingPodStatusWatcherImpl: Application sparkBQ with submission ID spark:sparkbq-18871d7dc7ecfa41-driver finished
21/12/17 10:27:48 INFO ShutdownHookManager: Shutdown hook called
21/12/17 10:27:48 INFO ShutdownHookManager: Deleting directory /tmp/spark-3c107d80-f03b-4914-9303-ed85f31626a7
Fri Dec 17 10:27:48 UTC 2021 , ===> Completed spark-submit
Name:? ? ? ? ?sparkbq-18871d7dc7ecfa41-driver
Namespace:? ? spark
Priority:? ? ?0
Node:? ? ? ? ?gke-spark-on-gke-default-pool-75faa226-6327/10.154.15.218
Start Time:? ?Fri, 17 Dec 2021 10:25:37 +0000
Labels:? ? ? ?spark-app-selector=spark-b304270f8f6a4c99bfcdbad5a8826eb9
? ? ? ? ? ? ? spark-role=driver
Annotations:? <none>
Status:? ? ? ?Succeeded
IP:? ? ? ? ? ?10.64.1.5
IPs:
? IP:? 10.64.1.5
Containers:
? spark-kubernetes-driver:
? ? Container ID:? containerd://90c63e798de626c8dad0a87004994e89c427cd64996201585dfe318fc5d4b66c
? ? Image:? ? ? ? ?eu.gcr.io/<PROJECT_ID>/spark-py:3.1.1-scala_2.12-8-jre-slim-buster-java8PlusPackages
? ? Image ID:? ? ? eu.gcr.io/<PROJECT_ID>/spark-py@sha256:680242a8def48c9b52ae29e0de586096b9b0846a6d68407679d96c3860116f35
? ? Ports:? ? ? ? ?7078/TCP, 7079/TCP, 4040/TCP
? ? Host Ports:? ? 0/TCP, 0/TCP, 0/TCP
? ? Args:
? ? ? driver
? ? ? --properties-file
? ? ? /opt/spark/conf/spark.properties
? ? ? --class
? ? ? org.apache.spark.deploy.PythonRunner
? ? ? gs://<PROJECT_ID>-spark-on-k8s/codes/RandomDataBigQuery.py
? ? State:? ? ? ? ? Terminated
? ? ? Reason:? ? ? ?Completed
? ? ? Exit Code:? ? 0
? ? ? Started:? ? ? Fri, 17 Dec 2021 10:26:17 +0000
? ? ? Finished:? ? ?Fri, 17 Dec 2021 10:27:47 +0000
? ? Ready:? ? ? ? ? False
? ? Restart Count:? 0
? ? Limits:
? ? ? memory:? 11468Mi
? ? Requests:
? ? ? cpu:? ? ?3
? ? ? memory:? 11468Mi
? ? Environment:
? ? ? SPARK_USER:? ? ? ? ? ? ? ? ? ? ? hduser
? ? ? SPARK_APPLICATION_ID:? ? ? ? ? ? spark-b304270f8f6a4c99bfcdbad5a8826eb9
? ? ? GOOGLE_APPLICATION_CREDENTIALS:? /mnt/secrets/spark-sa.json
? ? ? GCS_PROJECT_ID:? ? ? ? ? ? ? ? ? <PROJECT_ID>
? ? ? SPARK_DRIVER_BIND_ADDRESS:? ? ? ? (v1:status.podIP)
? ? ? PYSPARK_PYTHON:? ? ? ? ? ? ? ? ? /usr/bin/python3
? ? ? PYSPARK_DRIVER_PYTHON:? ? ? ? ? ?/usr/bin/python3
? ? ? SPARK_LOCAL_DIRS:? ? ? ? ? ? ? ? /var/data/spark-506aea67-1592-4fb0-9e78-2001212f490e
? ? ? SPARK_CONF_DIR:? ? ? ? ? ? ? ? ? /opt/spark/conf
? ? Mounts:
? ? ? /mnt/secrets from spark-sa-volume (rw)
? ? ? /opt/spark/conf from spark-conf-volume-driver (rw)
? ? ? /var/data/spark-506aea67-1592-4fb0-9e78-2001212f490e from spark-local-dir-1 (rw)
? ? ? /var/run/secrets/kubernetes.io/serviceaccount from kube-api-access-vktpv (ro)
Conditions:
? Type? ? ? ? ? ? ? Status
? Initialized? ? ? ?True
? Ready? ? ? ? ? ? ?False
? ContainersReady? ?False
? PodScheduled? ? ? True
Volumes:
? spark-sa-volume:
? ? Type:? ? ? ? Secret (a volume populated by a Secret)
? ? SecretName:? spark-sa
? ? Optional:? ? false
? spark-local-dir-1:
? ? Type:? ? ? ?EmptyDir (a temporary directory that shares a pod's lifetime)
? ? Medium:
? ? SizeLimit:? <unset>
? spark-conf-volume-driver:
? ? Type:? ? ? ConfigMap (a volume populated by a ConfigMap)
? ? Name:? ? ? spark-drv-41ccd77dc7ecfe59-conf-map
? ? Optional:? false
? kube-api-access-vktpv:
? ? Type:? ? ? ? ? ? ? ? ? ? Projected (a volume that contains injected data from multiple sources)
? ? TokenExpirationSeconds:? 3607
? ? ConfigMapName:? ? ? ? ? ?kube-root-ca.crt
? ? ConfigMapOptional:? ? ? ?<nil>
? ? DownwardAPI:? ? ? ? ? ? ?true
QoS Class:? ? ? ? ? ? ? ? ? ?Burstable
Node-Selectors:? ? ? ? ? ? ? <none>
Tolerations:? ? ? ? ? ? ? ? ?node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
? ? ? ? ? ? ? ? ? ? ? ? ? ? ?node.kubernetes.io/unreachable:NoExecute op=Exists for 300s
Events:
? Type? ? ?Reason? ? ? ?Age? ? From? ? ? ? ? ? ? ?Message
? ----? ? ?------? ? ? ?----? ?----? ? ? ? ? ? ? ?-------
? Normal? ?Scheduled? ? 2m12s? default-scheduler? Successfully assigned spark/sparkbq-18871d7dc7ecfa41-driver to gke-spark-on-gke-default-pool-75faa226-6327
? Warning? FailedMount? 2m13s? kubelet? ? ? ? ? ? MountVolume.SetUp failed for volume "spark-conf-volume-driver" : configmap "spark-drv-41ccd77dc7ecfe59-conf-map" not found
? Normal? ?Pulling? ? ? 2m11s? kubelet? ? ? ? ? ? Pulling image "eu.gcr.io/<PROJECT_ID>/spark-py:3.1.1-scala_2.12-8-jre-slim-buster-java8PlusPackages"
? Normal? ?Pulled? ? ? ?102s? ?kubelet? ? ? ? ? ? Successfully pulled image "eu.gcr.io/<PROJECT_ID>/spark-py:3.1.1-scala_2.12-8-jre-slim-buster-java8PlusPackages" in 29.475233796s
? Normal? ?Created? ? ? 93s? ? kubelet? ? ? ? ? ? Created container spark-kubernetes-driver
? Normal? ?Started? ? ? 93s? ? kubelet? ? ? ? ? ? Started container spark-kubernetes-driver
++ id -u
+ myuid=185
++ id -g
+ mygid=0
+ set +e
++ getent passwd 185
+ uidentry=
+ set -e
+ '[' -z '' ']'
+ '[' -w /etc/passwd ']'
+ echo '185:x:185:0:anonymous uid:/opt/spark:/bin/false'
+ SPARK_CLASSPATH=':/opt/spark/jars/*'
+ env
+ grep SPARK_JAVA_OPT_
+ sed 's/[^=]*=\(.*\)/\1/g'
+ sort -t_ -k4 -n
+ readarray -t SPARK_EXECUTOR_JAVA_OPTS
+ '[' -n '' ']'
+ '[' -z x ']'
+ export PYSPARK_PYTHON
+ '[' -z x ']'
+ export PYSPARK_DRIVER_PYTHON
+ '[' -n '' ']'
+ '[' -z ']'
+ '[' -z x ']'
+ SPARK_CLASSPATH='/opt/spark/conf::/opt/spark/jars/*'
+ case "$1" in
+ shift 1
+ CMD=("$SPARK_HOME/bin/spark-submit" --conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS" --deploy-mode client "$@")
+ exec /usr/bin/tini -s -- /opt/spark/bin/spark-submit --conf spark.driver.bindAddress=10.64.1.5 --deploy-mode client --properties-file /opt/spark/conf/spark.properties --class org.apache.spark.deploy.PythonRunner gs://<PROJECT_ID>-spark-on-k8s/codes/RandomDataBigQuery.py
21/12/17 10:26:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/12/17 10:26:24 INFO SparkContext: Running Spark version 3.1.1
21/12/17 10:26:24 INFO ResourceUtils: ==============================================================
21/12/17 10:26:24 INFO ResourceUtils: No custom resources configured for spark.driver.
21/12/17 10:26:24 INFO ResourceUtils: ==============================================================
21/12/17 10:26:24 INFO SparkContext: Submitted application: RandomDataBigQuery
21/12/17 10:26:24 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 3, script: , vendor: , memory -> name: memory, amount: 8192, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
21/12/17 10:26:24 INFO ResourceProfile: Limiting resource is cpus at 3 tasks per executor
21/12/17 10:26:24 INFO ResourceProfileManager: Added ResourceProfile id: 0
21/12/17 10:26:24 INFO SecurityManager: Changing view acls to: 185,hduser
21/12/17 10:26:24 INFO SecurityManager: Changing modify acls to: 185,hduser
21/12/17 10:26:24 INFO SecurityManager: Changing view acls groups to:
21/12/17 10:26:24 INFO SecurityManager: Changing modify acls groups to:
21/12/17 10:26:24 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(185, hduser); groups with view permissions: Set(); users with modify permissions: Set(185, hduser); groups with modify permissions: Set()
21/12/17 10:26:24 INFO Utils: Successfully started service 'sparkDriver' on port 7078.
21/12/17 10:26:24 INFO SparkEnv: Registering MapOutputTracker
21/12/17 10:26:24 INFO SparkEnv: Registering BlockManagerMaster
21/12/17 10:26:24 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
21/12/17 10:26:24 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
21/12/17 10:26:24 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
21/12/17 10:26:25 INFO DiskBlockManager: Created local directory at /var/data/spark-506aea67-1592-4fb0-9e78-2001212f490e/blockmgr-9189136d-85f6-4921-b647-1022783980df
21/12/17 10:26:25 INFO MemoryStore: MemoryStore started with capacity 4.1 GiB
21/12/17 10:26:25 INFO SparkEnv: Registering OutputCommitCoordinator
21/12/17 10:26:25 INFO Utils: Successfully started service 'SparkUI' on port 4040.
21/12/17 10:26:25 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at https://sparkbq-18871d7dc7ecfa41-driver-svc.spark.svc:4040
21/12/17 10:26:25 INFO SparkContext: Added file file:/tmp/spark-0d374b2f-af80-4bfc-bc98-453967dc64c0/spark_on_gke.zip at spark://sparkbq-18871d7dc7ecfa41-driver-svc.spark.svc:7078/files/spark_on_gke.zip with timestamp 1639736784143
21/12/17 10:26:25 INFO Utils: Copying /tmp/spark-0d374b2f-af80-4bfc-bc98-453967dc64c0/spark_on_gke.zip to /var/data/spark-506aea67-1592-4fb0-9e78-2001212f490e/spark-f7c35f69-2321-4cae-b581-13e7484e75a6/userFiles-e1dd9b3f-7688-4ae6-9775-7d7eb34add54/spark_on_gke.zip
21/12/17 10:26:25 INFO SparkKubernetesClientFactory: Auto-configuring K8S client using current context from users K8S config file
21/12/17 10:26:27 INFO Utils: Using initial executors = 2, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
21/12/17 10:26:27 INFO ExecutorPodsAllocator: Going to request 2 executors from Kubernetes for ResourceProfile Id: 0, target: 2 running: 0.
21/12/17 10:26:27 INFO BasicExecutorFeatureStep: Decommissioning not enabled, skipping shutdown script
21/12/17 10:26:27 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 7079.
21/12/17 10:26:27 INFO NettyBlockTransferService: Server created on sparkbq-18871d7dc7ecfa41-driver-svc.spark.svc:7079
21/12/17 10:26:27 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
21/12/17 10:26:27 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, sparkbq-18871d7dc7ecfa41-driver-svc.spark.svc, 7079, None)
21/12/17 10:26:27 INFO BlockManagerMasterEndpoint: Registering block manager sparkbq-18871d7dc7ecfa41-driver-svc.spark.svc:7079 with 4.1 GiB RAM, BlockManagerId(driver, sparkbq-18871d7dc7ecfa41-driver-svc.spark.svc, 7079, None)
21/12/17 10:26:27 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, sparkbq-18871d7dc7ecfa41-driver-svc.spark.svc, 7079, None)
21/12/17 10:26:27 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, sparkbq-18871d7dc7ecfa41-driver-svc.spark.svc, 7079, None)
21/12/17 10:26:27 INFO Utils: Using initial executors = 2, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
21/12/17 10:26:27 WARN ExecutorAllocationManager: Dynamic allocation without a shuffle service is an experimental feature.
21/12/17 10:26:27 INFO BasicExecutorFeatureStep: Decommissioning not enabled, skipping shutdown script
21/12/17 10:26:57 INFO KubernetesClusterSchedulerBackend: SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 30000000000(ns)
21/12/17 10:26:57 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/opt/spark/work-dir/spark-warehouse').
21/12/17 10:26:57 INFO SharedState: Warehouse path is 'file:/opt/spark/work-dir/spark-warehouse'.


Started at
17/12/2021 10:27:02.02
10
starting at ID = 1321, ending on = 1331

writing to BigQuery table test.randomData

Populated BigQuery table test.randomData

rows written is 10

Reading from BigQuery table test.randomData

rows read in is 1330
+---+
|ID |
+---+
|2  |
|1  |
|3  |
|4  |
|8  |
|7  |
|9  |
|5  |
|6  |
|10 |
|16 |
|17 |
|11 |
|12 |
|18 |
|19 |
|20 |
|13 |
|14 |
|15 |
+---+
only showing top 20 rows


Rows tally in BigQuery table


Finished at
17/12/2021 10:27:46.46
Elapsed time in seconds is 82.47383785247803
pod "sparkbq-18871d7dc7ecfa41-driver" deleted
        

Pod creation stages

The pod creation starts with the driver container being created. Once the driver container is up, the scheduler, through the kube-apiserver, starts scheduling the executors as seen below; they are created in batches of three with one second in between. Finally the driver and the two executors are created and running.
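
One way to follow these stages live is to watch the namespace with kubectl (the -w flag streams pod status changes as they happen); in the listings below, k is presumably just a shell alias for kubectl:

# Stream pod status changes in the spark namespace while the job runs
kubectl get pods -n spark -w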

hduser@ctpvm: /tmp> k get pods -n spark
NAME                              READY   STATUS              RESTARTS   AGE
--> creating the driver
sparkbq-6d33537dc950a8ae-driver   0/1     ContainerCreating   0          12s

sparkbq-6d33537dc950a8ae-driver   0/1     ContainerCreating   0          43s

--> driver running
sparkbq-6d33537dc950a8ae-driver   1/1     Running             0          45s

--> driver requests the apiserver to schedule two pods for the executors as below
randomdatabigquery-728f397dc9517c01-exec-1   0/1     ContainerCreating   0          0s
randomdatabigquery-728f397dc9517c01-exec-2   0/1     Pending             0          0s
sparkbq-6d33537dc950a8ae-driver              1/1     Running             0          55s

--> all executor pods are created and running
randomdatabigquery-728f397dc9517c01-exec-1   1/1     Running             0          32s
randomdatabigquery-728f397dc9517c01-exec-2   1/1     Running             0          32s
sparkbq-6d33537dc950a8ae-driver              1/1     Running             0          87s

randomdatabigquery-728f397dc9517c01-exec-1   1/1     Running             0          81s
randomdatabigquery-728f397dc9517c01-exec-2   1/1     Running             0          81s
sparkbq-6d33537dc950a8ae-driver              1/1     Running             0          2m16s

--> job is completed and the executor pods are terminating
randomdatabigquery-728f397dc9517c01-exec-1   0/1     Terminating   0          82s
randomdatabigquery-728f397dc9517c01-exec-2   0/1     Terminating   0          82s
sparkbq-6d33537dc950a8ae-driver              1/1     Running       0          2m17s

Eventually the Spark job completes and the 10 rows are posted to the BigQuery table.
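
As a quick cross-check, the row count in the target table can be queried directly with the bq command-line tool. This is a minimal sketch, assuming the test.randomData dataset and table used in this example and that bq is authenticated against the same project:

# Count the rows in the target table (standard SQL)
bq query --use_legacy_sql=false 'SELECT COUNT(*) AS row_count FROM test.randomData'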

Accessing Driver UI

The Spark UI can be accessed locally using kubectl port-forward.

DRIVER_POD_NAME=`kubectl get pods -n spark |grep driver|awk '{print $1}'`

kubectl port-forward $DRIVER_POD_NAME 4040:4040 -n spark

Forwarding from 127.0.0.1:4040 -> 4040

Forwarding from [::1]:4040 -> 4040        

The Spark driver UI can then be accessed at https://localhost:4040. For example, the executors page is shown below:

[Screenshot: Spark driver UI showing the executors]

Debugging

To get some basic information about the scheduling decisions made around the driver pod, you can run:

NAMESPACE="spark"
DRIVER_POD_NAME=`kubectl get pods -n $NAMESPACE |grep driver|awk '{print $1}'`
kubectl describe pod $DRIVER_POD_NAME -n $NAMESPACE
kubectl logs $DRIVER_POD_NAME -n $NAMESPACE

This is also shown in gke.sh https://github.com/michTalebzadeh/spark_on_gke/blob/main/deployment/src/scripts/gke.sh
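
The executor pods can be inspected in the same way; a minimal sketch, assuming the naming pattern seen above (substitute the actual pod name for the placeholder):

# List the executor pods and fetch the logs of one of them
kubectl get pods -n spark | grep exec
kubectl logs <executor-pod-name> -n spark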

Scaling down the number of nodes in your cluster to zero

To save on cluster costs, you can temporarily scale the number of nodes in your cluster down to zero by running the following command:

gcloud container clusters resize spark-on-gke --num-nodes=0 --zone $ZONE --quiet        

You can scale it back to the required number of nodes as below:

gcloud container clusters resize spark-on-gke --num-nodes=$NODES --zone europe-west2-c --quiet        

where $NODES could be 3 or any other node count.
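
To confirm the resize has taken effect, you can simply list the cluster nodes (assuming kubectl is still pointing at the spark-on-gke cluster):

kubectl get nodes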

Sizing the driver & executor cores and memory in Kubernetes cluster

Assume you have a three-node Kubernetes cluster in Google Cloud built on E2 standard machines with 4 GB of system memory per vCPU, giving 4 vCPUs and 16,384 MB of RAM per node.

Optimum sizing of the number of executors and of the CPU and memory allocation is important here. These are the assumptions:

  1. You want to fit exactly one Spark executor pod per Kubernetes node
  2. You should not starve the node OS, networking etc. of CPU
  3. If you have 3 nodes, one node should be allocated to the driver and two nodes to the executors
  4. Regardless, you want to execute the code in k8s as fast as possible (a sketch of the corresponding spark-submit resource settings follows this list)
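
Under these assumptions, the resource-related part of the spark-submit invocation might look like the sketch below. The core and memory numbers mirror the values used in this article (3 cores per executor, roughly half the node RAM); the master URL is a placeholder, and everything omitted here (image, service account, application file and so on) means this is not a complete command:

spark-submit \
  --master k8s://https://$KUBERNETES_MASTER_IP:443 \
  --deploy-mode cluster \
  --conf spark.driver.cores=3 \
  --conf spark.driver.memory=8192m \
  --conf spark.executor.instances=2 \
  --conf spark.executor.cores=3 \
  --conf spark.executor.memory=8192m \
  ...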

I did some tests on this, leaving 1 vCPU out of the 4 vCPUs to the OS on each node (container and executor). The RAM allocated to each node was 16 GB. I then set the driver (container) and executor memory starting from 10% of RAM, incremented them in steps of 10% up to 50%, and measured the time taken for the code to finish, from start to end. Basically, a simple Python timing in the code:

import time

start_time = time.time()                 # taken just before the Spark work starts
# ... the Spark job runs here ...
end_time = time.time()                   # taken once the job has finished
time_elapsed = (end_time - start_time)   # completion time in seconds

This measured the completion time in seconds. The systematics were kept the same for all measurements, and only one measurement was taken at each memory setting, i.e.:

--conf spark.driver.memory=<Memory in MB> \
--conf spark.executor.memory=<Memory in MB> \

The memory was set to the same value for both the driver container and the executors.
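
For reference, a minimal sketch of how the memory value at each step can be derived from the node RAM of 16,384 MB quoted above; the 50% figure matches the 8,192 MB seen in the driver log, and the 60% figure matches the 9,830 MB failing case discussed below:

# Derive the driver/executor memory (MB) at each 10% step of node RAM
NODE_RAM_MB=16384
for PCT in 10 20 30 40 50 60; do
  echo "${PCT}% of node RAM = $(( NODE_RAM_MB * PCT / 100 )) MB"
done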

The results I got were as follows:

[Chart: job completion time in seconds versus driver/executor memory allocation]

So it appears that allocating around 50% of RAM to both the driver and the executors provides the optimum value. Increasing the memory above 50% (say to 60% = 9,830 MB) results in the driver container never being created (stuck at Pending), presumably because the pod requests more memory than any node can offer, as shown below:

kubectl describe pod sparkbq-b506ac7dc521b667-driver -n spark

Events:
  Type     Reason             Age                   From                Message
  ----     ------             ----                  ----                -------
  Warning  FailedScheduling   17m                   default-scheduler   0/3 nodes are available: 3 Insufficient memory.
  Warning  FailedScheduling   17m                   default-scheduler   0/3 nodes are available: 3 Insufficient memory.
  Normal   NotTriggerScaleUp  2m28s (x92 over 17m)  cluster-autoscaler  pod didn't trigger scale-up:

Conclusion

In this article I provided the typical steps for processing your data with Spark using the container orchestration system Kubernetes. As some of you may be aware, I have previously used Spark on Infrastructure as a Service with Google Dataproc; for example, see this article https://www.dhirubhai.net/pulse/end-solution-using-google-dataproc-spark-sql-bigquery-mich/.

Touching on the unique benefits of orchestrating Spark jobs on Kubernetes compared to other cluster managers like YARN:

  • Faster scaling - scaling containers is much faster than scaling VMs (Virtual Machines)
  • Isolation - packaging job dependencies in containers provides a better way to isolate workloads
  • Portability - containerizing Spark applications gives you the ability to run them on-premise, in the cloud and across different cloud providers
  • Cost optimization - utilize an existing Kubernetes cluster to run data engineering or ML workloads alongside other applications
  • Elastic sizing - the ability to scale the number of nodes in the Kubernetes cluster up and down

Currently, the default Kubernetes scheduler only supports pod-based scheduling; it cannot provide job-based scheduling for Spark applications, so this area is still work in progress, so to speak. Having said that, Kubernetes can offer more, as it natively supports running other schedulers; for example, through support for Volcano, see SPARK-36057 and https://github.com/volcano-sh/volcano/issues/1704. No doubt you will hear more about Spark on Kubernetes in future releases of Apache Spark.

Disclaimer: Great care has been taken to make sure that the technical information presented in this article is accurate, but any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on its content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
