I spent 4 hours learning Apache Spark Resource Allocation
Spark's resource allocation mechanism and the two scheduling modes.
My name is Vu Trinh, and I am a data engineer.
I’m trying to make my life less dull by spending time learning and researching “how it works” in the data engineering field.
Here is a place where I share everything I’ve learned.
Intro
This week, we will explore Spark's resource allocation mechanism and the two scheduling modes: FIFO and Fair scheduling.
Resource Allocation
As you might know, when running on a physical cluster, a Spark application gets an isolated set of executors (JVM processes) that are only responsible for processing and storing data for that application.
Spark provides two ways of allocating resources for Spark applications: static allocation and dynamic allocation.
Static allocation: Each application is allocated a finite maximum amount of resources on the cluster, which stays reserved for as long as its SparkContext is running. Users define this resource configuration up front (for example, the number of executors and each executor's cores and memory).
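Here is a minimal sketch of static allocation in Scala. The application name and all sizes are illustrative placeholders, not recommendations, and the exact properties can depend on the cluster manager:

```scala
import org.apache.spark.sql.SparkSession

// Static allocation sketch: ask for a fixed set of executors up front.
// These resources stay reserved for as long as the SparkContext lives.
val spark = SparkSession.builder()
  .appName("static-allocation-example")     // placeholder name
  .config("spark.executor.instances", "4")  // fixed number of executors
  .config("spark.executor.cores", "2")      // cores per executor
  .config("spark.executor.memory", "4g")    // heap memory per executor
  .getOrCreate()
```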
Dynamic allocation (enabled by setting spark.dynamicAllocation.enabled to true): Since version 1.2, Spark offers dynamic resource allocation. The application may return resources to the cluster if they are no longer used and request them again later when there is demand. Let’s dive into this approach in more detail.
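Turning it on is just a matter of configuration. A minimal sketch (the bounds below are illustrative only; depending on the deployment, dynamic allocation also needs a way to preserve shuffle data, such as the external shuffle service discussed later in this article):

```scala
import org.apache.spark.sql.SparkSession

// Dynamic allocation sketch: let Spark grow and shrink the executor pool
// between the configured lower and upper bounds.
val spark = SparkSession.builder()
  .appName("dynamic-allocation-example")    // placeholder name
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.dynamicAllocation.minExecutors", "1")
  .config("spark.dynamicAllocation.maxExecutors", "20")
  .config("spark.dynamicAllocation.initialExecutors", "2")
  .getOrCreate()
```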
Dynamic allocation
Spark should generally release executors when no longer needed and acquire new executors when necessary. Because it's impossible to predict whether an executor about to be removed will soon run a task or whether a newly added executor will remain idle, Spark uses a set of heuristics to decide when to remove and request executors.
Request Policy
An application with dynamic allocation enabled requests more executors when pending tasks are waiting to be scheduled. It only requests when the tasks have been pending for a defined interval.
The request is triggered when there have been pending tasks for spark.dynamicAllocation.schedulerBacklogTimeout seconds, and then triggered again every spark.dynamicAllocation.sustainedSchedulerBacklogTimeout seconds thereafter if the queue of pending tasks persists. (From the Spark job scheduling documentation [1].)
The requests are made in rounds, and the number of executors in each round increases exponentially from the previous round. For example, an application will request 1 executor in the first round and then 2, 4, 8, etc.
There are two reasons behind this approach:
First, the application should start with a small request so that, if a few additional executors turn out to be enough, it does not occupy more than it needs.
Second, the application should still be able to ramp up its resource usage quickly when many executors are actually needed.
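The two timeouts are ordinary Spark configuration properties. A minimal sketch (the 1s values are only illustrative):

```scala
import org.apache.spark.SparkConf

// Request-policy sketch: start requesting executors after tasks have been
// pending for 1s, then keep requesting (1, 2, 4, 8, ... executors per round)
// every 1s while the backlog persists.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.schedulerBacklogTimeout", "1s")
  .set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", "1s")
```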
Remove Policy
The policy for removing executors is straightforward: a Spark application removes an executor when it has been idle for a predefined interval (spark.dynamicAllocation.executorIdleTimeout).
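Again, a single configuration property controls this. A minimal sketch (60s is an illustrative value, not a recommendation):

```scala
import org.apache.spark.SparkConf

// Remove-policy sketch: with dynamic allocation on, an executor that has
// been idle longer than the timeout is released back to the cluster.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")
```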
Graceful Decommission of Executors
With static resource allocation, an executor only exits when its application has finished, so its state can be safely discarded.
With dynamic allocation, however, the application is still running when an executor is removed. If the application later needs data stored in or written by that executor, it has to recompute the data.
Thus, Spark provides a mechanism to remove an executor gracefully by preserving its state first; in other words, it tries to decouple the executor's data from the executor's lifetime.
During a shuffle, the executor writes its map outputs locally to disk and then serves as the entry point for other executors to fetch those files.
The solution is to use an external shuffle service. This service runs as a long-running process on each cluster node, independently of Spark applications and their executors.
Spark executors fetch shuffle files from the service rather than from each other. This means that shuffle data produced by an executor can continue to be served even after the executor has been terminated.
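On the application side, the service is enabled with one more property. A minimal sketch (the shuffle service process itself must also be started on each worker node, and how to do that depends on the cluster manager):

```scala
import org.apache.spark.SparkConf

// Shuffle-service sketch: shuffle files are served by the node-local
// external service, so they stay available after their executor is removed.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
```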
Besides producing shuffle data, executors also cache data in memory or on disk. When an executor is removed, however, all of its cached data is gone. For this reason, executors containing cached data are never removed by default; users can change this behavior via spark.dynamicAllocation.cachedExecutorIdleTimeout.
In the future, cached data may be stored off-heap and managed independently from the executor's lifetime. This is similar to how Spark handles shuffle data with the external service.
Scheduling Modes
First In First Out (FIFO)
Note: The above image is just meant to convey the general idea. It may not reflect the exact implementation.
By default, jobs run in FIFO order: the first submitted job gets priority on all available resources, then the next job, and so on. If the first job doesn’t need the whole cluster, later jobs can start running immediately. However, if the jobs at the head of the queue consume all the resources, later jobs may stay pending for a long time.
Fair
Since Spark 0.8, users can configure fair scheduling between jobs. In this mode, Spark assigns tasks to jobs in a round-robin fashion so that all jobs get a roughly equal share of cluster resources. This means that short jobs submitted while a long job is running can start receiving resources right away instead of waiting for the long job to finish.
Note: The above image is just meant to convey the general idea. It may not reflect the exact implementation.
The approach is modeled after the Hadoop Fair Scheduler. The fair scheduler supports grouping jobs into pools and setting various scheduling options for each pool, such as the weight.
This helps isolate workloads so that critical jobs can run in a pool with more resources. (Users can configure which jobs run in which pool; a minimal sketch follows.)
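Here is a minimal sketch of routing jobs to a pool (the pool name "critical" and the local master are made up for illustration). The pool is a thread-local property, so every job submitted from that thread lands in the chosen pool:

```scala
import org.apache.spark.sql.SparkSession

// Fair-scheduling sketch: enable the fair scheduler and route this
// thread's jobs to a specific pool.
val spark = SparkSession.builder()
  .appName("fair-pools-example")   // placeholder name
  .master("local[*]")              // local mode just for illustration
  .config("spark.scheduler.mode", "FAIR")
  .getOrCreate()

// Jobs submitted from this thread now run in the "critical" pool.
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "critical")
val rowCount = spark.range(0, 1000000).count()

// Setting the property back to null sends later jobs to the default pool.
spark.sparkContext.setLocalProperty("spark.scheduler.pool", null)
```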
Here are some pool properties that users can configure (see the allocation-file sketch after this list):
Scheduling Mode (default is FIFO): This option controls the scheduling behavior in a specific pool. This can be FIFO or FAIR.
Weight (default is 1): This controls the pool’s cluster share relative to other pools. By default, all pools have a weight of 1, which means they will have the same amount of resources. However, if a pool has a weight of 2, it will have double the resources of other pools.
minShare (default is 0): Each pool can be given a minimum share (e.g., minimum number of CPU cores). The scheduler always tries to meet all active pools’ minimum shares before redistributing extra resources according to the weights.
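These properties live in a fair-scheduler allocation XML file that Spark reads via spark.scheduler.allocation.file. A minimal sketch (the pool names, weights, and minShare values are made up, and in practice the XML would usually be shipped as a file with the application rather than written at runtime):

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Pool-properties sketch: define two pools and register the allocation file.
val poolsXml =
  """<?xml version="1.0"?>
    |<allocations>
    |  <pool name="critical">
    |    <schedulingMode>FAIR</schedulingMode>
    |    <weight>2</weight>      <!-- twice the share of a weight-1 pool -->
    |    <minShare>4</minShare>  <!-- aim to give this pool at least 4 cores -->
    |  </pool>
    |  <pool name="batch">
    |    <schedulingMode>FIFO</schedulingMode>
    |    <weight>1</weight>
    |    <minShare>0</minShare>
    |  </pool>
    |</allocations>
    |""".stripMargin

val xmlPath = Files.createTempFile("fairscheduler", ".xml")
Files.write(xmlPath, poolsXml.getBytes("UTF-8"))

val spark = SparkSession.builder()
  .appName("fair-pool-properties-example")  // placeholder name
  .master("local[*]")                       // local mode just for illustration
  .config("spark.scheduler.mode", "FAIR")
  .config("spark.scheduler.allocation.file", xmlPath.toString)
  .getOrCreate()
```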
Outro
Thank you for reading this far.
In this article, we explored Spark's resource allocation behavior and its scheduling modes, including FIFO and Fair scheduling.
Now it’s time to say goodbye. See you in my next blog!
Reference
[1] Apache Spark Job Scheduling
[2] Holden Karau, Rachel Warren, High Performance Spark: Best Practices for Scaling and Optimizing Apache Spark (2017)
Before you leave
If you want to discuss this further, please comment or contact me via LinkedIn, Email, or Twitter.
It might take you five minutes to read, but it took me more than five days to prepare, so it would greatly motivate me if you considered subscribing to receive my writing.