I spent 8 hours learning the details of the Apache Spark scheduling process.
Anatomy of a Spark job and the typical scheduling process.
My name is Vu Trinh, and I am a data engineer.
I’m trying to make my life less dull by spending time learning and researching “how it works” in the data engineering field.
Here is a place where I share everything I’ve learned.
Intro
This week, we resume the Apache Spark series by exploring how Spark schedules data processing for us.
The article begins with the anatomy of a Spark job. Then, we will walk through a typical scheduling process and its related components and concepts.
Jobs, Stages, and Tasks
Job: In Spark, a job represents a series of transformations applied to data. It encompasses the entire workflow from start to finish. A single Spark application can have more than one Spark job.
Stage: A stage is a job segment executed without data shuffling. Spark splits the job into different stages when a transformation requires shuffling data across partitions. Speaking of transformations, there are two categories we need to explore:
Transformations with narrow dependencies are those where each partition in the child RDD has a limited number of dependencies on partitions in the parent RDD. These partitions may depend on a single parent (e.g., the map operator) or a specific subset of parent partitions known beforehand (such as with coalesce). This means that operations like map and filter do not require data shuffling. RDD operations with narrow dependencies are pipelined into one set of tasks in each stage.
Transformations with wide dependencies require the data to be repartitioned: a single partition of the child RDD may depend on data spread across many partitions of the parent RDD. This typically occurs with operations like groupByKey, reduceByKey, or join, which involve shuffling data. Consequently, wide dependencies result in stage boundaries in Spark's execution plan.
Task: A task is the smallest unit of execution within Spark. Each stage is divided into multiple tasks, which execute processing in parallel across different data partitions.
DAG: In Spark, the DAGScheduler (more on this later) uses RDD dependencies to build a Directed Acyclic Graph (DAG) of stages for a Spark job. The DAG ensures that stages are scheduled in topological order.
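To make these terms concrete, here is a minimal sketch (the input and output paths are placeholders) showing how narrow transformations, a wide transformation, and actions map to stages and jobs:

```scala
import org.apache.spark.sql.SparkSession

object JobAnatomySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("job-anatomy-sketch")
      .master("local[4]")                  // local mode with 4 threads
      .getOrCreate()
    val sc = spark.sparkContext

    // Narrow transformations (flatMap, filter, map): no shuffle needed,
    // so Spark pipelines them into the tasks of a single stage.
    val pairs = sc.textFile("input.txt")   // placeholder path
      .flatMap(_.split("\\s+"))
      .filter(_.nonEmpty)
      .map(word => (word, 1))

    // Wide transformation (reduceByKey): requires a shuffle,
    // so the DAGScheduler places a stage boundary here.
    val counts = pairs.reduceByKey(_ + _)

    // Each action triggers one job, so this application runs two jobs.
    println(counts.count())                // job 1
    counts.saveAsTextFile("output")        // job 2 (placeholder path)

    spark.stop()
  }
}
```

Each of those stages is then broken into one task per partition, which is what the executors ultimately run.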
The following section will provide an overview of the scheduling process.
The scheduling process
When we submit a Spark application, the SparkContext is initialized first.
SparkContext is the entry point to all Spark’s components.
SparkContext
The SparkContext then initializes the TaskScheduler (for task-oriented scheduling) and the SchedulerBackend (which interacts with the cluster manager and provides resources to the TaskScheduler). After that, the DAGScheduler (for stage-oriented scheduling) is created.
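As a rough sketch of where this happens in user code (the application name and master URL below are placeholders), constructing a SparkContext, whether directly or through a SparkSession, is what triggers that wiring:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("scheduling-demo")   // placeholder application name
  .setMaster("local[4]")           // placeholder master URL

// Constructing the SparkContext creates the TaskScheduler, the
// SchedulerBackend, and the DAGScheduler behind the scenes.
val sc = new SparkContext(conf)
```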
DAGScheduler
The scheduling process begins with the DAGScheduler building the DAG based on the dependencies between RDD objects.
The DAGScheduler traverses the RDD lineage from the final RDD (with action) back to the source RDD, building up a DAG of stages based on shuffle boundaries. Stages are formed where wide dependencies (shuffle boundaries) exist. Each stage consists of parallel tasks that can be executed on different partitions. Stages are created as ResultStages (final stage) or ShuffleMapStages (intermediate stages that perform shuffles).
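If you want to see where the shuffle boundaries fall, calling toDebugString on the final RDD prints its lineage. The snippet below reuses the sc from the earlier sketch; the printed output is abbreviated and only illustrative:

```scala
val counts = sc.textFile("input.txt")     // placeholder path
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

println(counts.toDebugString)
// Roughly:
// (4) ShuffledRDD[4] at reduceByKey ...        <- read side of the shuffle (ResultStage)
//  +-(4) MapPartitionsRDD[3] at map ...        <- pipelined narrow ops (ShuffleMapStage)
//     |  MapPartitionsRDD[2] at flatMap ...
//     |  input.txt MapPartitionsRDD[1] at textFile ...
```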
The stages are then scheduled according to the DAG's topological order: each stage is submitted once all of its parent stages (upstream dependencies) are completed. The DAGScheduler handles failures caused by lost shuffle output files, in which case previous stages may need to be resubmitted. Failures within a stage that are not caused by shuffle file loss are handled by the TaskScheduler, which retries each task a limited number of times before canceling the whole stage.
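For reference, this retry behavior is governed by configuration. A small sketch, with values matching the usual defaults (shown only for illustration):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // TaskScheduler: how many times a single task may fail before the stage is aborted.
  .set("spark.task.maxFailures", "4")
  // DAGScheduler: how many consecutive attempts a stage gets (e.g., after lost shuffle files).
  .set("spark.stage.maxConsecutiveAttempts", "4")
```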
The DAGScheduler creates a TaskSet for each stage. A TaskSet contains the stage's not-yet-computed tasks, which are fully independent of one another.
The TaskSet is then sent to the TaskScheduler for execution. In addition, the DAGScheduler determines the preferred locations to run each task based on the current cache status and sends these to the TaskScheduler.
Cache tracking: the DAGScheduler tracks which RDDs are cached to avoid recomputing them, and it remembers which shuffle map stages have already produced output files so that work is not duplicated.
Preferred locations: the DAGScheduler also computes where to run each task in a stage based on the preferred locations of its underlying RDDs, or the location of cached or shuffle data.
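Here is a small sketch of how caching feeds into this (the file path is a placeholder): once an RDD's partitions are cached, later jobs over that RDD get the cached block locations as preferred task locations.

```scala
val parsed = sc.textFile("events.log")    // placeholder path
  .map(_.split(","))
  .cache()                                // ask Spark to keep the computed partitions in memory

parsed.count()   // job 1 computes the partitions and caches them on the executors
parsed.count()   // job 2: the DAGScheduler prefers to place tasks where the blocks are cached
```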
TaskScheduler
Next, the process continues with the TaskScheduler. The TaskScheduler is responsible for scheduling tasks (in the TaskSet received from the DAGScheduler) on available executors. It interacts with a SchedulerBackend to schedule tasks across various cluster types. The SchedulerBackend is started and stopped as part of the TaskScheduler’s initialization and stopping process.
In more detail, when the DAGScheduler submits a TaskSet to the TaskScheduler, the TaskScheduler registers a new TaskSetManager and requests the SchedulerBackend to handle resource allocation offers. (More on this later when we discuss the SchedulerBackend.)
The TaskSetManager schedules the tasks in a single TaskSet. It keeps track of each task, retries tasks if they fail (up to a limited number of times), and handles locality-aware scheduling using each task’s locality preference obtained from the DAGScheduler. The TaskSetManager tries to assign tasks to executors as close to the data as possible. There are several data locality levels (from nearest to farthest):
PROCESS_LOCAL: Task runs on the same executor where the data resides.
NODE_LOCAL: The task runs on the same node as the data but on a different executor.
NO_PREF: The data is accessed equally quickly from anywhere and has no locality preference.
RACK_LOCAL: The task runs on the same rack but on a different node.
ANY: The task can run on any executor when no locality preferences are satisfied.
The TaskSetManager tries to achieve locality-aware scheduling for a TaskSet by leveraging delay scheduling. This optimization technique has a simple idea: if a task cannot be scheduled on an executor with the desired locality level, the TaskSetManager waits a short period before scheduling it.
When a task is ready to be scheduled, the TaskSetManager first checks whether an available executor satisfies the task's desired locality level. If none does, it does not immediately assign the task. Instead, it delays the task for a short, configurable amount of time, hoping that a suitable executor will become available during that delay. If, after the delay, no such executor appears, the TaskSetManager launches the task on an executor with a more “relaxed” data locality level.
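How long the TaskSetManager waits at each locality level is configurable. A sketch with the usual defaults (values shown for illustration only):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.locality.wait", "3s")            // base wait before relaxing to the next level
  .set("spark.locality.wait.process", "3s")    // PROCESS_LOCAL -> NODE_LOCAL
  .set("spark.locality.wait.node", "3s")       // NODE_LOCAL    -> RACK_LOCAL
  .set("spark.locality.wait.rack", "3s")       // RACK_LOCAL    -> ANY
```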
There are cases where some tasks take much longer than others (e.g., due to hardware problems). The TaskSetManager has a health-check procedure called speculative execution of tasks (enabled by setting spark.speculation = true) that checks for such slow tasks and re-submits them to another executor, so slowness caused by hardware problems can be mitigated.
The TaskSetManager does not stop the slow task; instead, it launches a copy of it in parallel. The result of whichever copy completes successfully first is used, and the remaining copies are killed.
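Speculation is tuned with a handful of settings. A sketch with illustrative values close to the defaults:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.speculation", "true")            // enable speculative execution
  .set("spark.speculation.interval", "100ms")  // how often to check for slow tasks
  .set("spark.speculation.multiplier", "1.5")  // "slow" = this many times the median task duration
  .set("spark.speculation.quantile", "0.75")   // fraction of tasks that must finish before checking
```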
SchedulerBackend
As mentioned, the SchedulerBackend is started and stopped as part of the TaskScheduler’s initialization and stopping process. At first, the SchedulerBackend requests executors from the cluster manager, which then launches executors based on the application's requirements. Once started, the executors attempt to register with the SchedulerBackend through an RPC endpoint. If successful, the SchedulerBackend receives a list of the application's desired executors.
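What "the application's requirements" usually boils down to is configuration like the following (the values are only illustrative); this is roughly what the SchedulerBackend asks the cluster manager to provide:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.instances", "4")   // how many executors to request (static allocation)
  .set("spark.executor.cores", "2")       // cores per executor
  .set("spark.executor.memory", "4g")     // heap memory per executor
```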
When the TaskScheduler creates the TaskSetManager, it requests resources from the SchedulerBackend to schedule the tasks. Based on the list of active executors, the SchedulerBackend retrieves WorkerOffers, each representing an executor's available resources.
Based on the Spark source code, active executors are those that are registered and are not pending removal, have not been lost without reason, and are not being decommissioned.
Then, the SchedulerBackend passes the WorkerOffers back to the TaskScheduler. The TaskScheduler assigns tasks from the TaskSet to the resources from the WorkerOffers, resulting in a list of task descriptions. These task descriptions are then returned to the SchedulerBackend, which launches tasks based on this task description list.
For each entry in this list, the SchedulerBackend serializes the task description. Additionally, it pulls the executor ID assigned to the task from the entry and uses this ID to retrieve information for that executor (e.g., hostname, cores, executor address, executor endpoint).
Finally, the SchedulerBackend sends the serialized task descriptions to the executor endpoints.
Task Execution on Executors
When receiving a serialized task description from the SchedulerBackend, the executor deserializes the task description and begins launching the task using the information provided.
During its lifecycle, the executor runs user-defined code, reads data from local or remote storage, performs computations, and writes out intermediate results, such as shuffle data.
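A small sketch to make that concrete: the closure below is serialized by the driver and executed on the executors, once per partition (it again assumes the sc from the earlier sketch):

```scala
// Four partitions -> four tasks; each task runs inside an executor JVM
// and reports which host it ran on.
val hosts = sc.parallelize(1 to 8, 4)
  .mapPartitions { iter =>
    val host = java.net.InetAddress.getLocalHost.getHostName   // runs on the executor
    iter.map(n => (host, n))
  }

hosts.collect().foreach(println)   // results are sent back to the driver
```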
Things go on
The process continues until all stage tasks are finished, with stages being processed in the DAG order. A Spark job is considered complete when all stages have finished.
Before we exit, let's review the typical end-to-end Spark scheduling process: the DAGScheduler splits the job into stages at shuffle boundaries and submits each stage's TaskSet to the TaskScheduler; the TaskScheduler, using resource offers from the SchedulerBackend, assigns tasks to executors; and the executors run the tasks, with stages proceeding in topological order until the job completes.
Outro
Thank you for reading this far.
We’ve just explored the anatomy of a Spark job and walked through the Spark scheduling process, from RDD dependencies to how tasks are assigned to the executors.
I hope I can bring you some value.
If you notice I missed something, feel free to let me know.
Now it’s time to say goodbye; see you on my next blog!
Before you leave
If you want to discuss this further, please comment or contact me via LinkedIn, Email, or Twitter.
It might take you five minutes to read, but it took me more than five days to prepare, so it would greatly motivate me if you considered subscribing to receive my writing.