WHEN DOES SHUFFLE OCCUR?
A shuffle can occur when the resulting RDD from a transformation depends on other elements from the same or another RDD.
That statement is not the whole story, though. Take a map operation as an example:
val r1 = sc.parallelize(0 to 9)
// generate key-value pairs by attaching the value 1 to every element of r1
val r3 = r1.map((_, 1))
The map operation above produces a pair RDD in which 1 is attached to each element of the parent ParallelCollectionRDD.
Here each partition of the child RDD depends fully on one or more parent partitions. This is called a narrow dependency.
No shuffle occurs here, because each child partition depends fully on one parent partition [1:1] or on several parent partitions [N:1] that are on the same machine.
Now take groupByKey as an example.
groupByKey() aggregates records with the same key by means of a shuffle. The compute() function in ShuffledRDD fetches the data needed for its partitions, then performs a mapPartitions() operation in OneToOneDependency style. Finally, the ArrayBuffer type in the value is cast to Iterable.
As we can see, one child partition depends on parts of one or more parent partitions, which may be on different machines and must be fetched for the transformation. That fetch is the shuffle. This is called a wide dependency.
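The difference between the two dependency types can be sketched without Spark at all. The following is a minimal plain-Scala simulation, not Spark's implementation: an "RDD" is modeled as a Vector of partitions, and the names DependencySketch, mapPartitionsNarrow and groupByKeyWide are hypothetical helpers introduced only for illustration. Routing a key to a partition by hash code modulo the partition count is an assumption that mirrors the usual hash partitioning idea.

```scala
// Plain-Scala simulation (no Spark needed): an "RDD" is a Vector of partitions.
object DependencySketch {
  type Partition[A] = Vector[A]

  // Two parent partitions, as if they lived on two different machines.
  val parent: Vector[Partition[Int]] =
    Vector(Vector(0, 1, 2, 3, 4), Vector(5, 6, 7, 8, 9))

  // Narrow dependency: child partition i is computed from parent partition i alone.
  def mapPartitionsNarrow[A, B](rdd: Vector[Partition[A]])(f: A => B): Vector[Partition[B]] =
    rdd.map(partition => partition.map(f))

  // Wide dependency: every child partition may need records from every parent
  // partition, so records are routed to a target partition by key and then grouped.
  def groupByKeyWide[K, V](rdd: Vector[Partition[(K, V)]],
                           numPartitions: Int): Vector[Map[K, Vector[V]]] = {
    def target(key: K): Int = Math.floorMod(key.hashCode, numPartitions)
    Vector.tabulate(numPartitions) { p =>
      // "Fetch" the matching records from all parent partitions.
      val fetched = rdd.flatMap(_.filter { case (k, _) => target(k) == p })
      fetched.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    }
  }

  // Key each element by (value % 3), then group across partitions.
  val pairs = mapPartitionsNarrow(parent)(x => (x % 3, 1))
  val grouped = groupByKeyWide(pairs, numPartitions = 2)
}
```

In this sketch the map step never looks outside its own partition, while the grouping step reads from both parent partitions to build each child partition, which is exactly the data movement a shuffle has to perform.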
As you might know, several shuffle implementations are available in Spark. Which one is used in your particular case is determined by the value of the spark.shuffle.manager parameter. The three possible options are hash, sort, and tungsten-sort; “sort” is the default starting from Spark 1.2.0.
A BRIEF HISTORY ON SHUFFLE WITH THE SPARK VERSIONS:
- Spark 0.6-0.7: same code path as the RDD persist method; can choose MEMORY_ONLY or DISK_ONLY (default).
- Shuffle code path separated from the BlockManager (BM), with ShuffleBlockManager and BlockObjectWriter created only for shuffle; shuffle data can now only be written to disk.
- Shuffle optimization: consolidated shuffle write.
- Spark 1.0: pluggable shuffle framework.
- Spark 1.1: sort-based shuffle implementation.
- Spark 1.2: Netty transfer service reimplementation; sort-based shuffle by default.
- Spark 1.2+: external shuffle service etc.
- Spark 2+ on the go: SparkSession, a new entry point that replaces the old SQLContext and HiveContext for the DataFrame and Dataset APIs.
Prior to Spark 1.2.0, hash shuffle was the default shuffle option (spark.shuffle.manager = hash).
But it has many drawbacks, mostly caused by the number of files it creates: each mapper task creates a separate file for each reducer.
This results in M * R total files on the cluster, where M is the number of “mappers” and R is the number of “reducers”.
With a large number of mappers and reducers, this causes big problems with the output buffer size, the number of open files on the filesystem, and the speed of creating and dropping all these files.
There is an optimization implemented for this shuffler, controlled by the parameter “spark.shuffle.consolidateFiles” (default is “false”).
If your cluster has E executors (“--num-executors” for YARN), each with C cores (“spark.executor.cores” or “--executor-cores” for YARN), and each task asks for T CPUs (“spark.task.cpus”), then the number of execution slots on the cluster is E * C / T, and the number of files created during the shuffle with this optimization enabled is E * C / T * R.
Instead of creating a new file for each of the reducers, it creates a pool of output files. When a map task starts outputting data, it requests a group of R files from this pool.
When it finishes, it returns this group of R files to the pool. Because each executor can run only C / T tasks in parallel, it creates only C / T groups of output files, each containing R files.
After the first C / T parallel “map” tasks have finished, each subsequent “map” task reuses an existing group from the pool.
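To get a feel for the two formulas, M * R without consolidation and E * C / T * R with it, here is a small arithmetic sketch. The object name ShuffleFileCount and the cluster sizes used in the usage note are hypothetical, and C / T is treated as integer division on the assumption that a fractional task slot cannot run a task.

```scala
object ShuffleFileCount {
  // Plain hash shuffle: every map task writes one file per reducer.
  def hashShuffleFiles(mappers: Long, reducers: Long): Long =
    mappers * reducers

  // With spark.shuffle.consolidateFiles enabled: one group of R files
  // per execution slot, and there are E * (C / T) slots on the cluster.
  def consolidatedFiles(executors: Long,
                        coresPerExecutor: Long,
                        cpusPerTask: Long,
                        reducers: Long): Long = {
    val slots = executors * (coresPerExecutor / cpusPerTask)
    slots * reducers
  }
}
```

For an illustrative job with 1000 map tasks and 200 reducers on 10 executors with 4 cores each and one CPU per task, hashShuffleFiles(1000, 200) gives 200000 files, while consolidatedFiles(10, 4, 1, 200) gives 8000.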
But a huge number of files written to the filesystem causes IO to skew towards random IO, which is in general up to 100x slower than sequential IO.
In the sort-based shuffle, each map task generates 1 shuffle data file + 1 index file. This way you can easily fetch the chunk of data related to “reducer x” by looking up the position of the related data block in the file and doing a single fseek before fread.
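The data-file-plus-index-file idea can be illustrated with ordinary file IO. This is a simplified sketch, not Spark's on-disk format: the object IndexedShuffleFile and its write and read helpers are hypothetical, the "index" is kept as an in-memory array of cumulative byte offsets instead of a separate file, and each reducer's segment is just a string.

```scala
import java.io.{File, RandomAccessFile}
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.Files

object IndexedShuffleFile {
  // Writes all reducer segments back to back into one data file and returns
  // the cumulative byte offsets, which play the role of the index file here.
  def write(segments: Seq[String]): (File, Array[Long]) = {
    val dataFile = Files.createTempFile("shuffle-sketch", ".data").toFile
    dataFile.deleteOnExit()
    val bytes = segments.map(_.getBytes(UTF_8))
    Files.write(dataFile.toPath, bytes.flatMap(_.toSeq).toArray)
    val offsets = bytes.scanLeft(0L)((offset, segment) => offset + segment.length).toArray
    (dataFile, offsets)
  }

  // Reads only the segment of `reducerId`: one seek, then one contiguous read.
  def read(dataFile: File, offsets: Array[Long], reducerId: Int): String = {
    val start = offsets(reducerId)
    val length = (offsets(reducerId + 1) - start).toInt
    val buffer = new Array[Byte](length)
    val file = new RandomAccessFile(dataFile, "r")
    try {
      file.seek(start)
      file.readFully(buffer)
    } finally file.close()
    new String(buffer, UTF_8)
  }
}
```

Writing the segments Seq("aaa", "bb", "cccc") yields the offsets 0, 3, 5, 9, and reading reducer 1 touches only bytes 3 to 4 of the file. The reader never scans the other reducers' data, which is why a single file per map task is enough.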
If map-side combine is required, data will be sorted by key and partition for aggregation. Otherwise, data will only be sorted by partition.
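The two sort orders can be compared on a handful of records. The sketch below follows the description above rather than Spark's actual sorter: MapSideSortSketch, Record and partitionOf are hypothetical names, partitions are assigned by hash code modulo the partition count, and summing values stands in for whatever combine function the job would supply.

```scala
object MapSideSortSketch {
  final case class Record(key: String, value: Int)

  def partitionOf(key: String, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)

  // No map-side combine: ordering by partition id alone is enough. sortBy is
  // stable, so records keep their arrival order within a partition.
  def sortByPartition(records: Seq[Record], numPartitions: Int): Seq[Record] =
    records.sortBy(r => partitionOf(r.key, numPartitions))

  // Map-side combine: order by (partition, key) so equal keys sit next to each
  // other, then merge each run of equal keys into one record (here by summing).
  def sortAndCombine(records: Seq[Record], numPartitions: Int): Seq[Record] =
    records
      .sortBy(r => (partitionOf(r.key, numPartitions), r.key))
      .foldLeft(Vector.empty[Record]) {
        case (acc :+ last, next) if last.key == next.key =>
          acc :+ Record(last.key, last.value + next.value)
        case (acc, next) => acc :+ next
      }
}
```

Sorting by key inside each partition is what makes the combine step a single linear pass; without a combine step that extra ordering buys nothing, so only the partition id is used.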
But of course, for a small number of “reducers”, hashing to separate files works faster than sorting.
This is controlled by spark.shuffle.sort.bypassMergeThreshold (default 200): if there are at most that many reduce partitions and no map-side aggregation is needed, the SortShuffleManager opts for the BypassMergeSortShuffleHandle.
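The bypass decision boils down to a small predicate. The sketch below is an illustration, not Spark's source: BypassDecision and useBypassMergeSort are hypothetical names, and it assumes the comparison is inclusive (at most the threshold) and that the bypass path is never taken when map-side combine is required, as Spark's configuration documentation describes the setting.

```scala
object BypassDecision {
  // Default value of spark.shuffle.sort.bypassMergeThreshold.
  val DefaultBypassMergeThreshold = 200

  // True when the sort-based shuffle may skip sorting and write one file per
  // reduce partition, concatenating them at the end of the map task.
  def useBypassMergeSort(numReducePartitions: Int,
                         mapSideCombine: Boolean,
                         threshold: Int = DefaultBypassMergeThreshold): Boolean =
    !mapSideCombine && numReducePartitions <= threshold
}
```

With the default threshold, a groupByKey-style shuffle into 100 partitions would take the bypass path, while the same shuffle into 201 partitions, or any shuffle that needs map-side combine, would go through the regular sort path.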
TUNGSTEN – SORT
Project Tungsten is an umbrella effort within Apache Spark to improve Spark's execution engine.
Tungsten is a Spark SQL component that provides more efficient Spark operations by working directly at the byte level.
Studies have shown that in Spark’s shuffle subsystem, serialization and hashing (which are CPU bound) are the key bottlenecks, rather than the raw network throughput of the underlying hardware. This is what motivated Project Tungsten.
Some of the main features of the Tungsten execution engine are listed below.
- Memory management and binary processing: Tungsten handles Java object overhead and garbage-collector overhead by using sun.misc.Unsafe, a facility provided by the JVM that exposes C-style memory access (off-heap).
- Cache-aware computation: Project Tungsten designs cache-friendly algorithms and data structures so that Spark applications spend less time waiting to fetch data from memory and more time doing useful work.
- Code generation: Spark dynamically generates bytecode for evaluating expressions, rather than stepping through a slower interpreter for each row.
Tungsten-sort is similar to the sort-based shuffle, except that it uses both on-heap and off-heap memory through sun.misc.Unsafe and relies on Tungsten data structures written specifically to reduce Java object overhead and garbage-collection cost.
From Spark 1.2 onward, SortShuffleManager is the default ShuffleManager, and in later versions it is the only built-in one, registered under the short names sort and tungsten-sort (unless you plug in another implementation through the spark.shuffle.manager property).
EXTERNAL SHUFFLE SERVICE
From version 1.2, the external shuffle service is included in Spark core.
When you run Spark on YARN or Mesos, dynamic resource allocation, when enabled, can free executors that have no tasks running on them. It is also available in standalone mode.
Once the map side of a shuffle has run, the intermediate shuffle files are normally stored on the executor, which serves them to the reducers of the shuffle.
Since dynamic allocation allows an executor to be removed when no task is running on it, the shuffle files needed to complete the shuffle may be lost.
The external shuffle service prevents this: a long-running process outside the executor (the worker JVM in standalone mode, or the NodeManager on YARN) serves the shuffle files to the reducers. The executor writes the shuffle files to local disk and then leaves serving them to that process.
For dynamic allocation to work, both the external shuffle service (spark.shuffle.service.enabled) and dynamic allocation itself (spark.dynamicAllocation.enabled) must be set to true.
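As a small illustration, the two properties can be collected in one place and rendered as spark-submit arguments. This is only a sketch: DynamicAllocationSettings and toSubmitArgs are hypothetical helpers, and the same key-value pairs could just as well be placed in spark-defaults.conf or set on a SparkConf. It also assumes the shuffle service itself is already running on every node, which these two flags alone do not arrange.

```scala
object DynamicAllocationSettings {
  // The two properties named above, both switched on.
  val settings: Map[String, String] = Map(
    "spark.dynamicAllocation.enabled" -> "true",
    "spark.shuffle.service.enabled"   -> "true"
  )

  // Renders the settings as "--conf key=value" pairs for spark-submit,
  // sorted by key so the output is deterministic.
  def toSubmitArgs(conf: Map[String, String]): Seq[String] =
    conf.toSeq.sortBy(_._1).flatMap { case (key, value) =>
      Seq("--conf", s"$key=$value")
    }
}
```

Calling DynamicAllocationSettings.toSubmitArgs(DynamicAllocationSettings.settings).mkString(" ") produces a string that can be pasted into a spark-submit command line.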
SHUFFLE RELATED PARAMETER TUNING
The following are some of the main parameters of the shuffle process. For each one, the function, the default value, and tuning recommendations based on practical experience are explained in detail.