Spark: Under the Hood - Part 2
Abhishek Mazumdar
Senior Machine Learning Engineer | building intelligent applications
This article is a continuation of Spark: Under the Hood - Part 1, and it is highly recommended to read Part 1 before continuing with Part 2.
In Part 1 we went over how the Application Master, cluster resource manager, and executors are provisioned, followed by the Spark code execution sequence, which covered topics like transformation types, actions, blocks, stages, exchange buffers, and tasks.
In this article, we will look into Spark's memory allocation process.
Spark Memory Allocation
When a Spark job is submitted to a cluster, the spark-submit command can be used to configure its memory requirements.
Driver Node Container
This is the container where the Application Master runs. The YARN resource manager allocates it based on the parameters below:
- spark.driver.memory = 4g (in gigabytes)
- spark.driver.memoryOverhead = 10% of spark.driver.memory by default, with a minimum of 384 MB
Below is a sample spark-submit command:
spark-submit --master yarn \
  --jars <comma-separated-jars> \
  --conf <spark-properties> \
  --name <job_name> <python_file> <argument 1> <argument 2>
By default, the memory overhead is the larger of 10% of the driver's main memory and 384 MB. For a 4g driver, 10% is about 409 MB, which is above the 384 MB floor, so the container request is roughly 4.4 GB in total.
Driver main memory is used to execute Spark workloads; in other words, this is the JVM heap. The overhead memory, on the other hand, is used for container processes and all non-JVM workloads. The driver JVM itself can only use the heap memory, not the overhead.
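The overhead rule is easy to check with a few lines of arithmetic. The sketch below is only an illustration: the function names are made up for this article, and it assumes the default factor of 10% and the 384 MiB floor, with no explicit spark.driver.memoryOverhead set.

```python
# Sketch: estimate the memory requested from YARN for the driver container.
# Assumption: default overhead rule, max(10% of main memory, 384 MiB).

MIN_OVERHEAD_MB = 384    # floor applied to the overhead
OVERHEAD_FACTOR = 0.10   # default fraction of the main memory


def overhead_mb(memory_mb: int, factor: float = OVERHEAD_FACTOR) -> int:
    """Overhead in MiB: the larger of factor * memory and the 384 MiB floor."""
    return max(int(memory_mb * factor), MIN_OVERHEAD_MB)


def container_mb(memory_mb: int) -> int:
    """Main (JVM heap) memory plus overhead."""
    return memory_mb + overhead_mb(memory_mb)


driver_memory_mb = 4 * 1024             # spark.driver.memory = 4g
print(overhead_mb(driver_memory_mb))    # 409
print(container_mb(driver_memory_mb))   # 4505
print(overhead_mb(1024))                # 384, the floor applies to a 1g driver
```

The same rule applies to executors, so the helper can be reused with spark.executor.memory as input.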
Executor Memory Allocation
In order for YARN to provision executor containers, the following spark-configuration parameters are required:
- Heap Memory (spark.executor.memory)
- Overhead Memory (spark.executor.memoryOverhead)
- Off-Heap Memory (spark.memory.offHeap.size)
- Pyspark Memory (spark.executor.pyspark.memory)
Additionally, Executor cores (spark.executor.cores)
Note: YARN, or any cluster manager, can never allocate memory and compute beyond the physical limits of the worker nodes. For example, if a worker node has 8GB of RAM and 4 cores, a request for 12GB and 8 cores can never be satisfied. Hence, it helps to check the configuration parameters below before sizing executors:
- yarn.scheduler.maximum-allocation-mb
- yarn.nodemanager.resource.memory-mb
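As a rough sanity check, the executor request can be compared against these limits before submitting a job. The sketch below is hypothetical: the class and function names are invented for illustration, the limit values are example numbers rather than real cluster settings, and it assumes the container size is the sum of heap, overhead, off-heap, and PySpark memory.

```python
from dataclasses import dataclass


@dataclass
class ExecutorRequest:
    heap_mb: int          # spark.executor.memory
    overhead_mb: int      # spark.executor.memoryOverhead
    offheap_mb: int = 0   # spark.memory.offHeap.size (0 when disabled)
    pyspark_mb: int = 0   # spark.executor.pyspark.memory (0 when unset)
    cores: int = 1        # spark.executor.cores

    def total_mb(self) -> int:
        """Total memory asked of YARN for one executor container."""
        return self.heap_mb + self.overhead_mb + self.offheap_mb + self.pyspark_mb


def fits(request: ExecutorRequest, max_allocation_mb: int,
         node_memory_mb: int, node_cores: int) -> bool:
    """True if one container can be placed on a node within both YARN limits."""
    limit_mb = min(max_allocation_mb, node_memory_mb)
    return request.total_mb() <= limit_mb and request.cores <= node_cores


# Example values only: an 8GB, 4-core node as in the note above.
too_big = ExecutorRequest(heap_mb=12 * 1024, overhead_mb=1228, cores=8)
small = ExecutorRequest(heap_mb=4096, overhead_mb=409, cores=2)
print(fits(too_big, max_allocation_mb=8192, node_memory_mb=8192, node_cores=4))  # False
print(fits(small, max_allocation_mb=8192, node_memory_mb=8192, node_cores=4))    # True
```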
Heap is the main memory for JVM workloads, and the overhead memory defaults to the larger of 10% of the executor memory and 384 MB, the same rule as for the driver. For explanation purposes, let's imagine the executor requires 12GB of heap memory. That heap is divided into three areas:
- Reserved Memory: a fixed 300 MB that Spark reserves for its own internal processes.
- Spark Memory: the memory space where the main operations of Spark take place.
- Leftover Memory (User Memory): used for user workloads such as:
  1. User-defined data structures
  2. Spark's internal metadata
  3. User Defined Functions (UDFs)
  4. RDD conversion operations, RDD lineage and dependency
Spark Memory is 60% of the heap that remains after Reserved Memory by default, and it can be changed through spark.memory.fraction. The remaining 40% is the Leftover Memory.
Spark Memory is further broken down into two separate pools:
- Storage Memory Pool (50% of Spark Memory), which holds cached data
- Executor Memory Pool (50% of Spark Memory), called execution memory in Spark's documentation, which holds the working data of running tasks
By default each pool gets 50% of Spark Memory, and the split can be changed through spark.memory.storageFraction. The Executor Memory Pool is shared by the threads running inside the executor, one per provisioned core. In our case, we requested 4 cores.
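To make the split concrete, here is a small sketch of the arithmetic for the 12GB example. It assumes the 12GB is the executor heap (spark.executor.memory), a fixed 300 MB reserve, and the defaults spark.memory.fraction = 0.6 and spark.memory.storageFraction = 0.5; the function name is made up for illustration.

```python
RESERVED_MB = 300  # fixed amount Spark sets aside for itself


def heap_breakdown(heap_mb: float,
                   memory_fraction: float = 0.6,
                   storage_fraction: float = 0.5) -> dict:
    """Split an executor heap into the areas described above (values in MB)."""
    usable = heap_mb - RESERVED_MB        # what is left after the reserve
    spark = usable * memory_fraction      # Spark Memory (storage + executor pools)
    storage = spark * storage_fraction    # Storage Memory Pool
    return {
        "reserved": RESERVED_MB,
        "spark": round(spark, 1),
        "storage": round(storage, 1),
        "execution": round(spark - storage, 1),
        "user": round(usable - spark, 1),  # Leftover Memory
    }


for area, size_mb in heap_breakdown(12 * 1024).items():
    print(f"{area:>10}: {size_mb} MB")
```

For a 12GB heap this gives roughly 7,193 MB of Spark Memory, split into two pools of about 3,596 MB each, and about 4,795 MB of Leftover Memory.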
Cores are simply threads within a single executor JVM. Parallel execution happens within these threads (also known as slots), so with 4 cores up to 4 tasks share the Executor Memory Pool at the same time.
Executor memory allocation is dynamic: memory is handed out to tasks as they need it rather than in fixed per-thread shares. Unused threads are not allocated any memory (as of Spark versions > 1.2).
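One way to picture the dynamic sharing is as a range per running task. The sketch below rests on an assumption that is not stated in this article: that with N active tasks, each task can obtain between 1/(2N) and 1/N of the Executor Memory Pool. The function name and the pool size are illustrative only.

```python
def per_task_bounds(execution_pool_mb: float, active_tasks: int) -> tuple:
    """Return (min_mb, max_mb) a single task can hold, given N active tasks.

    Assumption: each task gets at least pool / (2 * N) and at most pool / N.
    """
    if active_tasks < 1:
        raise ValueError("active_tasks must be at least 1")
    return (execution_pool_mb / (2 * active_tasks),
            execution_pool_mb / active_tasks)


pool_mb = 3596.4  # example Executor Memory Pool size for a 12GB heap
for n in (1, 2, 4):
    low, high = per_task_bounds(pool_mb, n)
    print(f"{n} active task(s): {low:.1f} MB to {high:.1f} MB each")
```

The point of the range is that a single running task can use the whole pool, while four concurrent tasks each settle somewhere between an eighth and a quarter of it.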
The boundary between the two pools is also flexible. When the caching requirement is small and the storage pool has free space, the executor pool can borrow it. The reverse is also true: if the caching requirement exceeds the storage pool and the executor pool has spare capacity, the storage pool can borrow memory from the executor pool.
Demand can change while operations are running. If, for instance, the storage pool has borrowed space from the executor pool and the executor pool now needs it back, Spark evicts those cached dataframes to disk (or drops them, depending on the storage level) and returns the space to the executor pool.
However, if the executor pool needs still more memory when the storage pool is full and nothing further can be evicted, an Out of Memory (OOM) exception will be encountered.
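The borrowing behaviour can be illustrated with a toy model. This is a deliberately simplified sketch, not Spark's implementation: the class name is invented, sizes are plain integers in MB, eviction is just a counter change, and a MemoryError stands in for the OOM case.

```python
class ToyUnifiedMemory:
    """Toy model of a storage pool and an executor pool sharing one region."""

    def __init__(self, total_mb: int, storage_fraction: float = 0.5):
        self.total = total_mb
        # Cached data up to this size is protected from eviction.
        self.storage_region = int(total_mb * storage_fraction)
        self.storage_used = 0
        self.execution_used = 0

    def free(self) -> int:
        return self.total - self.storage_used - self.execution_used

    def cache(self, mb: int) -> bool:
        """Storage may use any free memory but never evicts execution."""
        if mb <= self.free():
            self.storage_used += mb
            return True
        return False

    def execute(self, mb: int) -> int:
        """Grant execution memory; return how many MB of cache were evicted."""
        # Only cache that was borrowed beyond the storage region can be evicted.
        evictable = max(self.storage_used - self.storage_region, 0)
        if mb > self.free() + evictable:
            raise MemoryError("not enough memory even after eviction")
        evicted = max(mb - self.free(), 0)
        self.storage_used -= evicted
        self.execution_used += mb
        return evicted


mem = ToyUnifiedMemory(total_mb=1000)
print(mem.cache(700))    # True: storage borrows 200 MB beyond its 500 MB share
print(mem.execute(300))  # 0: served from free memory
print(mem.execute(200))  # 200: the borrowed cache is evicted
try:
    mem.execute(100)     # nothing left to evict
except MemoryError as err:
    print("OOM:", err)
```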
This is where off-heap memory comes into the picture. Off-heap memory is additional space outside the JVM heap into which in-memory cached data can overflow, so that heap space becomes available for the high-priority operation that needs it.
The JVM heap is subject to garbage collection, an automatic process that identifies objects no longer in use at run time and cleans them up. Off-heap memory is not managed by the garbage collector. By adding off-heap memory, we indirectly enlarge the executor and storage pools. This feature is disabled by default; it can be enabled with spark.memory.offHeap.enabled = true and sized with spark.memory.offHeap.size = **.
While developing with PySpark, there can be scenarios where we need additional Python libraries beyond what PySpark offers. The Python worker processes that run this code are not part of the JVM, so their memory comes from outside the heap. It can be configured using spark.executor.pyspark.memory = **.
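These settings are passed to spark-submit as --conf key=value pairs. The sketch below only builds the command line as a string; the helper name is made up, and the sizes (2g and 1g) are placeholder values chosen for illustration, not recommendations.

```python
def to_submit_args(conf: dict) -> list:
    """Turn a dict of Spark properties into spark-submit --conf arguments."""
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


conf = {
    "spark.memory.offHeap.enabled": "true",
    "spark.memory.offHeap.size": "2g",       # placeholder size
    "spark.executor.pyspark.memory": "1g",   # placeholder size
}

command = ["spark-submit", "--master", "yarn", *to_submit_args(conf)]
print(" ".join(command))
```

Keeping the properties in one dict makes it easier to add up what each executor container will request before the job is submitted.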
That's all for this article. The aim was to summarize learnings from different resources and arrange them as a sequence of articles for quick revision and interview preparation.