What is Apache Spark?

Spark is a fast, general-purpose cluster computing system for large-scale data processing. It was developed at the AMPLab at U.C. Berkeley in 2009 and later open-sourced and donated to the Apache Software Foundation. It has a thriving open-source community and is one of the most active Apache projects.

Why Spark?

Speed: In-memory processing

Unlike Hadoop MapReduce, which writes intermediate results to disk, Spark processes data in RAM, which can make it up to 100x faster than Hadoop for in-memory workloads.
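As a minimal sketch (assuming a spark-shell session where the SparkContext sc is already available, and a hypothetical input path), caching an RDD keeps it in memory after the first computation, so subsequent actions avoid re-reading from disk:

    // Hypothetical input path; assumes `sc` (SparkContext) exists, e.g. in spark-shell.
    val events = sc.textFile("hdfs:///data/events.log")

    // cache() marks the RDD to be kept in RAM after it is first computed.
    events.cache()

    val total  = events.count()                              // first action: reads from disk, then caches
    val errors = events.filter(_.contains("ERROR")).count()  // second action: served from memory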

Support: Developer-friendly APIs

Spark provides many operations at a higher level of abstraction than MapReduce. It currently offers APIs in Scala, Python, R and Java, and it integrates with YARN, Mesos, Kafka, Cassandra, HBase, MongoDB and Amazon S3.

Simple: Lazy Evaluation

Computation in Spark doesn’t start until an action is invoked. This is called lazy evaluation, and it makes Spark faster and more resource-efficient. Spark adds transformations to a DAG (Directed Acyclic Graph) of computation, and only when the driver requests some data does this DAG actually get executed.
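A minimal sketch of lazy evaluation (again assuming sc is available, as in the Spark shell): the transformations below return immediately without touching the data, and nothing runs until the action is called.

    // Transformations only build up the DAG; they do not compute anything yet.
    val numbers = sc.parallelize(1 to 1000000)
    val doubled = numbers.map(_ * 2)
    val evens   = doubled.filter(_ % 4 == 0)

    // The action triggers execution of the whole DAG built above.
    val result = evens.count()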

How does Spark work?

Spark uses a master/slave architecture, with one master node and one or more slave (worker) nodes.

Here, the driver is the central coordinator that runs on the master node (or name node), and the executors run on the distributed worker nodes (or data nodes).

Spark has its own standalone cluster manager to run Spark applications, and it also supports other cluster managers such as YARN and Mesos.

Spark uses the RDD (Resilient Distributed Dataset), a fault-tolerant collection of elements that can be operated on in parallel. RDDs are immutable in nature.

RDDs support two types of operations:

  • Transformations: Functions that produce a new RDD from existing RDDs. There are two types of transformations, as shown below.

    1. Narrow transformations: do not require the data to be shuffled across partitions; for example, map and filter.

    2. Wide transformations: require the data to be shuffled across partitions; for example, reduceByKey and join.

  • Actions: Functions that perform some computation over the transformed RDDs and send the computed result from the executors to the driver. A small wordcount sketch after this list illustrates both kinds of operations.
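Here is a small wordcount sketch illustrating both kinds of operations (the input path is hypothetical and sc is assumed to be available):

    val lines = sc.textFile("hdfs:///data/input.txt")

    // Narrow transformations: no shuffle, each output partition depends on a single input partition.
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))

    // Wide transformation: reduceByKey shuffles data so all values for a key end up together.
    val counts = pairs.reduceByKey(_ + _)

    // Action: triggers the computation and sends the result from the executors to the driver.
    counts.collect().foreach(println)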

Spark translates the RDD transformations into a DAG of computation. When an action is called, the DAG is submitted to the DAG scheduler and execution starts.

The DAG scheduler divides the operators into stages, and each stage is comprised of units of work called tasks.

Stages are passed on to the task scheduler, which launches the tasks via the cluster manager.

The executors then execute the tasks on the worker nodes.


Entry point of Spark

Since Spark 2.0, the entry point of Spark is the SparkSession. Spark enables its users to create as many sessions as they need.

Prior to Spark 2.0, the SparkContext was the entry point of any Spark application and was used to access all Spark features; creating a SparkContext object required a SparkConf that held all the cluster configurations and parameters.

A SparkSession can be created using the builder pattern, as sketched at the end of this section.

The SparkSession builder will reuse a session if one has already been created (as in the Spark shell or Databricks); otherwise it creates a new one and assigns the newly created SparkSession as the global default.

From the SparkSession we can access the SparkContext and create our RDD objects.

Spark also gives a straightforward API to create a new session that shares the same SparkContext: spark.newSession() creates a new SparkSession object.

Before SparkSession was introduced, the Hive and SQL integrations with Spark each needed a separate context to be created. To avoid this, SparkSession was designed with well-defined APIs for the most commonly used components.
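A minimal sketch of creating a session with the builder pattern (the application name is hypothetical; on a real cluster the master is usually supplied by spark-submit rather than hard-coded):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("MyApp")            // hypothetical application name
      .master("local[*]")          // assumption: running locally; on a cluster this comes from spark-submit
      .enableHiveSupport()         // optional: Hive integration without a separate HiveContext
      .getOrCreate()               // reuses an existing session (e.g. in spark-shell) or creates a new one

    // The underlying SparkContext is available from the session and can be used to create RDDs.
    val sc  = spark.sparkContext
    val rdd = sc.parallelize(Seq(1, 2, 3))

    // A new session sharing the same SparkContext but with its own configuration and catalog state.
    val another = spark.newSession()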

RDD lineage

Once we apply transformations to RDDs, we create an RDD lineage.

When we apply a transformation on an existing RDD, it creates a new child RDD, and this child RDD carries a pointer to the parent RDD along with metadata about what type of relationship it has with the parent RDD.

These dependencies are recorded as a graph, which is called the RDD lineage or RDD dependency graph.

You can inspect an RDD lineage graph using the RDD.toDebugString method, which gives an output like the one below.

We can see the RDDs created at each transformation for this wordcount example:

HadoopRDD (parent RDD) => MapPartitionsRDD => MapPartitionsRDD (flatMap) => MapPartitionsRDD (map) => ShuffledRDD (child RDD)
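As a sketch, the lineage above corresponds to a wordcount like the following; calling toDebugString on the final RDD prints the chain of parent RDDs, with indentation marking the shuffle boundary (the exact output varies by Spark version):

    val counts = sc.textFile("hdfs:///data/input.txt")   // HadoopRDD -> MapPartitionsRDD
      .flatMap(_.split(" "))                             // MapPartitionsRDD
      .map(word => (word, 1))                            // MapPartitionsRDD
      .reduceByKey(_ + _)                                // ShuffledRDD

    println(counts.toDebugString)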

A lineage keeps track of all the transformations that have to be applied to that RDD, including the location from which it has to read the data. It is used to derive the logical execution plan.

This RDD lineage is used to recompute the data if there are any faults, since it contains the full pattern of the computation; this is the source of Spark’s resilience and fault tolerance.

DAG scheduler

The DAG Scheduler is the scheduling layer of Apache Spark that implements stage-oriented scheduling. It transforms a logical execution plan into a physical execution plan (using stages).

The DAG Scheduler does three things in Spark:

  • Computes an execution DAG or Physical execution plan, i.e. DAG of stages, for a job.

  • Determines the Preferred locations to run each task on.

  • Handles failures due to shuffle output files being lost.

We will cover how the physical plan is created in this blog; the other two will be discussed in the upcoming posts in the series.

DAGScheduler uses an event queue architecture to process incoming events, which is implemented by the DAGSchedulerEventProcessLoop class. The purpose of the DAGSchedulerEventProcessLoop is to have a separate thread that processes events asynchronously and serially, i.e. one by one, and lets DAGScheduler do its work on the main thread.

Whenever an action is called on an RDD, the SparkContext submits it to the DAGScheduler as an event of type DAGSchedulerEvent, specifically as a JobSubmitted case.

The first thing the DAGScheduler does is create a ResultStage, which will provide the result of the submitted Spark job.

Now, to execute the submitted job, we need to find out which operations our RDD is based on, so backtracking begins.

In backtracking, we find the current operation and the type of RDD it creates. If the current operation produces a ShuffledRDD, then a shuffle dependency is detected, which creates a ShuffleMapStage.

The DAG scheduler creates a shuffle boundary when it encounters a shuffle dependency (a wide transformation) and creates a new stage. This new stage’s output will be the input to our ResultStage.

If we find another shuffle operation happening, then again a new ShuffleMapStage will be created and placed before the current stage (also a ShuffleMapStage), and the newly created ShuffleMapStage will provide the input to the current ShuffleMapStage.

Hence all the intermediate stages will be ShuffleMapStages and the last one will always be a ResultStage.
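As a rough sketch, a job with two shuffles (the two reduceByKey calls below) is broken into two ShuffleMapStages followed by a ResultStage; the stage comments are an assumption about how the DAG scheduler would typically cut this particular job:

    val words = sc.textFile("hdfs:///data/input.txt")
      .flatMap(_.split(" "))
      .map(word => (word, 1))

    // Stage 1 (ShuffleMapStage): the narrow transformations above plus the map side of this shuffle.
    val counts = words.reduceByKey(_ + _)

    // Stage 2 (ShuffleMapStage): reads the first shuffle, re-keys by word length, writes the second shuffle.
    val byLength = counts.map { case (word, n) => (word.length, n) }.reduceByKey(_ + _)

    // Stage 3 (ResultStage): reads the second shuffle and returns the result to the driver.
    byLength.collect()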

The DAG scheduler pipelines operators together: narrow transformations on an RDD produce RDDs without a shuffle dependency, and these are collected together into a single stage by the DAG scheduler.

The DAG also determines the execution order of stages. While building the DAG, Spark can understand which parts of your pipeline can run in parallel, and it optimises for the minimal number of stages needed to run the job or action.

The final result of the DAG scheduler is a set of stages, which it hands over to the task scheduler for execution; the task scheduler then does the rest of the computation.

Task Scheduler

Stages consist of sets of tasks, which are given as input to the task scheduler. When a Spark application starts, a TaskSchedulerImpl is created and started along with a SchedulerBackend and a DAGScheduler.

TaskSchedulerImpl is the default task scheduler in Spark. It uses the SchedulerBackend, which schedules tasks on the cluster manager, and it uses the SchedulableBuilder to build and manage the Pool.

TaskSchedulerImpl submits the tasks using the SchedulableBuilder via submitTasks.

submitTasks registers a new TaskSetManager (for the given TaskSet) and requests the SchedulerBackend to handle resource allocation offers (from the scheduling system) using the reviveOffers method.

submitTasks also requests the SchedulableBuilder to add the TaskSetManager to the schedulable pool.

The schedulable pool can be a

  • single flat linked queue (in FIFO scheduling mode)

  • hierarchy of pools of Schedulables (in FAIR scheduling mode)

Whenever a task either finishes successfully or fails, the TaskSetManager gets notified; it also has the power to abort a TaskSet if the number of failures of a task is greater than spark.task.maxFailures.
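Both the scheduling mode and the task failure limit are ordinary configuration properties; here is a minimal sketch of setting them when building the session (the application name is hypothetical):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("SchedulingDemo")                // hypothetical application name
      .config("spark.scheduler.mode", "FAIR")   // FIFO (default, flat queue) or FAIR (hierarchy of pools)
      .config("spark.task.maxFailures", "4")    // a TaskSet is aborted once a single task fails this many times
      .getOrCreate()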

Executors

Tasks are then scheduled onto the acquired executors according to resource and locality constraints. Executors live on the worker (slave) nodes and execute the tasks.

The ExecutorBackend uses launchTask to send a task to the Executor to be executed, where a TaskRunner manages the execution of a single task.

There are two kinds of tasks:

  • ResultTask: computes the result stage and sends the result back to the driver.

  • ShuffleMapTask: divides the elements of an RDD into multiple buckets based on the partitioner specified in the ShuffleDependency.

ShuffleMap Task

The ShuffleMapTask is involved with the shuffling of data, and the steps involved in it are:

  • Shuffle write

  • Shuffle read

The components that play a valuable part in the shuffling process are:

  1. Shuffle Manager: manages the shuffle-related components. It is created inside SparkEnv. (Since version 1.2 the default is the sort-based shuffle manager.)

  2. Shuffle Writer: handles the shuffle data output logic. It returns a MapStatus, tracked by the MapOutputTracker, once it writes data into buckets. Currently there are three shuffle writers, as mentioned below.

    • SortShuffleWriter is requested to write records into a shuffle-partitioned file in the disk store.

    • BypassMergeSortShuffleWriter is requested to write records into a single shuffle block data file.

    • UnsafeShuffleWriter is requested to close the internal resources and write out merged spill files.

  3. Shuffle Reader: fetches data from the buckets.

  4. Shuffle Block Manager: manages the mapping between the buckets and the data blocks written to disk.

When executed, a ShuffleMapStage saves map output files from the mapper (ShuffleMapTask) into buckets, using the BlockManager via the Shuffle Writer; these can later be fetched by the Shuffle Reader and given to the reducer (ReduceTask).

ShuffleMapStage uses outputLocs and _numAvailableOutputs internal registries to track how many shuffle map outputs are available.

All the intermediate stages are ShuffleMapStages; once they are computed, the final ResultStage is computed by a TaskRunner and the result is sent back to the driver to be displayed to the user.


Footnotes

[DAG]  A DAG is a set of vertices and edges, where the vertices represent RDDs and the edges represent the operations to be applied to them.
[MapReduce]  A processing technique and programming model for distributed computing, based on Java. The MapReduce algorithm contains two important tasks, namely Map and Reduce.
[ShuffledRDD]  An RDD created when the shuffle flag (and, optionally, the map-side combine flag) is set to true.
[preferred locations]  Determined using the getPreferredLocs(stage.rdd) function.
[pool]  A recursive data structure for prioritising TaskSets.
[ExecutorBackend]  A pluggable interface that TaskRunners use to send task status updates to the scheduler.