In the world of distributed computing, we are bound to write efficient programs to reduce latency, which can be achieved by following certain best practices. Let's see how data partitioning reduces latency and improves parallelism.
We can see the difference in the time taken by tasks like reading 1 MB sequentially from memory versus reading the same from disk/SSD, or sending bytes across the network.
Let's look at another chart of the same latency numbers in humanized form to get a feel for the time differences.
Now that we have better intuition about latency, let's relate the two: what is the link between latency and partitioning, and how are they connected?
What is Partitioning?
Simply put, the data within an RDD is split into many partitions, and these partitions are distributed across the worker nodes.
Note that a data partition never spans across nodes. If a worker node holding a partition is lost, the data is recovered by recomputing that RDD from its parent RDDs (that's where the resilient part of the RDD comes from).
Custom partitioning is only possible when working with pair RDDs, since partitioning is done based on keys.
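As a quick illustration (a minimal sketch with made-up values), partitionBy is only defined for RDDs of key/value pairs:

```scala
import org.apache.spark.HashPartitioner

// partitionBy is available on pair RDDs (via PairRDDFunctions)
val pairRdd = sc.parallelize(Seq((1, "a"), (2, "b")))
pairRdd.partitionBy(new HashPartitioner(2))      // OK: RDD[(Int, String)]

// A plain RDD has no keys to partition on, so this does not compile:
// sc.parallelize(Seq(1, 2, 3)).partitionBy(new HashPartitioner(2))
```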
Types of Partitioning
By default, when a Spark job starts, the number of partitions is equal to the total number of cores on all executor nodes.
For example, if every machine in your cluster has 4 cores and you have 6 worker nodes, then the default number of partitions you start with may be 24.
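A quick way to check this value is through the SparkContext:

```scala
// Default number of partitions Spark uses for operations like sc.parallelize
sc.defaultParallelism
```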
```scala
val defaultPartRdd = sc.parallelize(data)

[out] defaultPartRdd = ParallelCollectionRDD[79] at parallelize at <console>:35
```
```scala
// Number of partitions made by Spark
defaultPartRdd.partitions.size

[out] 4
```
```scala
// To get the number of cores of the machine
Runtime.getRuntime().availableProcessors()

[out] 4
```
When Spark reads data from HDFS, the number of partitions is determined by the number of input splits for the file in HDFS. If there are N input splits, there will be N partitions. If we are reading a text file, the input splits are set by TextInputFormat.
```scala
val textfileRDD = sc.textFile("path of your file")
textfileRDD.getNumPartitions

[out] Int = 3413
```
My input file size is about 114.5 GB, which is roughly 114500 MB; divided by 32 (the fs.local.blocksize default value is 32 MB), that gives around 3579 partitions. But if the lines in your file are too long (longer than the block size), the number of partitions will be smaller.
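If the split-based default is not enough, sc.textFile also accepts a minimum number of partitions as its second argument (the path and count below are just placeholders):

```scala
// Ask Spark for at least 8000 partitions when reading the file;
// this is treated as a hint for the minimum number of input splits.
val textfileRDD = sc.textFile("path of your file", 8000)
textfileRDD.getNumPartitions
```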
The three types of partitioning are
- Hash partitioning
- Range partitioning
- Custom partitioning
Hash Partitioning
Hash partitioning uses Java's Object.hashCode method to determine the partition, as partition = key.hashCode() % numPartitions.
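As a quick sanity check of this formula (plain Scala, no cluster needed; Spark's HashPartitioner additionally guards against negative hash codes):

```scala
// An Int's hashCode is the value itself, so with 4 partitions the keys land as:
Seq(1, 2, 3, 4, 6).map(k => k -> (k.hashCode % 4))
// List((1,1), (2,2), (3,3), (4,0), (6,2))
```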
We will go through an example: take some tuples (data), convert them to an RDD (hashpartrdd), and then partition it.
```scala
import org.apache.spark.HashPartitioner

val data = Vector((1,1), (1,2), (1,3), (1,4), (3,1), (2,2), (2,3), (3,1), (4,2), (3,3), (4,1), (6,2), (4,3))

[out] data = Vector((1,1), (1,2), (1,3), (1,4), (3,1), (2,2), (2,3), (3,1), (4,2), (3,3), (4,1), (6,2), (4,3))
```
We can specify the kind of partitioning we want by calling the .partitionBy() method on the RDD. The number of partitions we want is passed to the HashPartitioner() constructor.
```scala
var numPartitions = 4

[out] numPartitions = 4
```
Note that partitionBy() is a transformation, so it always returns a new RDD. Therefore it is important to persist the result; otherwise, the partitioning is applied again, which involves a shuffle, each time the RDD is used.
```scala
val hashpartrdd = sc.parallelize(data).partitionBy(new HashPartitioner(numPartitions)).persist

[out] hashpartrdd = ShuffledRDD[26] at partitionBy at <console>:32
```
```scala
// To know the type of partitioner used, use the .partitioner method on the RDD
hashpartrdd.partitioner

[out] Some(org.apache.spark.HashPartitioner@4)

// To know the number of partitions
hashpartrdd.partitions.size

[out] 4
```
Hash partitioning tries to spread the data as evenly as possible over all of the partitions based on the keys. We can see below which partition each key falls into.
```scala
var partition = hashpartrdd.keys.map(_.hashCode() % numPartitions).collect

[out] partition = Array(0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 3, 3, 3)
```
Let's assume a distributed system where the data in partitions 0, 1, 2, 3 is sent to worker nodes 1, 2, 3, 4. With partition = key.hashCode() % 4, the layout is:
- The partition (0) consists of the tuples (4,2), (4,1), (4,3) —-> worker node 1
- The partition (1) consists of the tuples (1,1), (1,2), (1,3), (1,4) —-> worker node 2
- The partition (2) consists of the tuples (2,2), (2,3), (6,2) —-> worker node 3
- The partition (3) consists of the tuples (3,1), (3,1), (3,3) —-> worker node 4
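We can verify this layout by tagging each tuple with its partition index, the same mapPartitionsWithIndex trick used below for the range and custom partitioners; the expected grouping follows directly from key.hashCode() % 4:

```scala
// Tag every tuple with the index of the partition it lives in
hashpartrdd.mapPartitionsWithIndex((index, iterator) =>
  iterator.toList.map(x => x + "->" + index).iterator).collect
// Expected: keys 4 -> partition 0, keys 1 -> partition 1,
//           keys 2 and 6 -> partition 2, keys 3 -> partition 3
```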
If we want to group the elements by key across the worker nodes, we call the groupByKey method.
```scala
hashpartrdd.groupByKey.collect

[out] Array((4,CompactBuffer(2, 1, 3)), (1,CompactBuffer(1, 2, 3, 4)), (6,CompactBuffer(2)), (2,CompactBuffer(2, 3)), (3,CompactBuffer(1, 1, 3)))
```
Notice that the groupByKey method groups the data by key; when the RDD is not already partitioned on its keys, this moves data over the network across the worker nodes (shuffling), which adds to the time taken to compute the resulting grouped RDD.
Range Partitioning
To cut down such shuffling across the network, we can use range partitioning, where the keys of our pair RDDs are segregated into contiguous ranges. Let's consider the previous example data.
RangePartitioner sorts the records based on the key and then divides them into the given number of partitions.
```scala
import org.apache.spark.RangePartitioner

data

[out] Vector((1,1), (1,2), (1,3), (1,4), (3,1), (2,2), (2,3), (3,1), (4,2), (3,3), (4,1), (6,2), (4,3))
```
```scala
val datardd = sc.parallelize(data)

[out] datardd = ParallelCollectionRDD[39] at parallelize at <console>:32
```
```scala
val numPartitions = 4
val rpartrdd = new RangePartitioner(numPartitions, datardd)

[out] rpartrdd = org.apache.spark.RangePartitioner@e9c6e
```
```scala
val rangepartrdd = datardd.partitionBy(rpartrdd)

[out] rangepartrdd = ShuffledRDD[44] at partitionBy at <console>:32
```
```scala
rangepartrdd.mapPartitionsWithIndex((index, iterator) =>
  iterator.toList.map(x => x + "->" + index).iterator).collect

[out] Array((1,1)->0, (1,2)->0, (1,3)->0, (1,4)->0, (3,1)->1, (2,2)->1, (2,3)->1, (3,1)->1, (3,3)->1, (4,2)->2, (4,1)->2, (4,3)->2, (6,2)->3)
```
```scala
rangepartrdd.groupByKey.collect

[out] Array((1,CompactBuffer(1, 2, 3, 4)), (3,CompactBuffer(1, 1, 3)), (2,CompactBuffer(2, 3)), (4,CompactBuffer(2, 1, 3)), (6,CompactBuffer(2)))
```
- The partition (0) consists of the tuples (1,1), (1,2), (1,3), (1,4) —-> worker node 1
- The partition (1) consists of the tuples (3,1), (2,2), (2,3), (3,1), (3,3) —-> worker node 2
- The partition (2) consists of the tuples (4,2), (4,1), (4,3) —-> worker node 3
- The partition (3) consists of the tuple (6,2) —-> worker node 4
When run on a cluster, we can see the difference in the time taken by groupByKey on the hash- and range-partitioned RDDs.
We can notice that the range-partitioned data is more skewed, while the hash-partitioned data is more evenly distributed across the nodes; yet the compute time is lower for the range partitioner because it avoids the data shuffle between nodes.
Spark's Java and Python APIs benefit from partitioning in the same way as the Scala API. However, in Python, you cannot pass a HashPartitioner object to partitionBy; instead, you just pass the number of partitions desired (e.g., rdd.partitionBy(100)).
Custom Partitioning
To implement a custom partitioner, we need to extend the Partitioner class and implement three methods:
- numPartitions : Int : returns the number of partitions
- getPartition(key : Any) : Int : returns the partition ID (0 to numPartitions - 1) for the given key
- equals() : the standard Java equality method. Spark needs to test our partitioner object against other instances of itself when it decides whether two of our RDDs are partitioned the same way.
```scala
import org.apache.spark.Partitioner

class CustomPartitioner(override val numPartitions: Int) extends Partitioner {

  // The partition number is calculated as modulo division of the key by numPartitions
  override def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[Int]
    k % numPartitions
  }

  // Spark compares partitioner objects to decide whether two RDDs
  // are partitioned the same way
  override def equals(other: scala.Any): Boolean = other match {
    case obj: CustomPartitioner => obj.numPartitions == numPartitions
    case _ => false
  }
}
```
We have defined a class named CustomPartitioner; let's partition our data using this class.
```scala
val customrdd = sc.parallelize(data)

[out] customrdd = ParallelCollectionRDD[66] at parallelize at <console>:35
```
```scala
val customPartRdd = customrdd.partitionBy(new CustomPartitioner(4)).persist

[out] customPartRdd = ShuffledRDD[67] at partitionBy at <console>:34
```
```scala
customPartRdd.collect

[out] Array((4,2), (4,1), (4,3), (1,1), (1,2), (1,3), (1,4), (2,2), (2,3), (6,2), (3,1), (3,1), (3,3))
```
```scala
customPartRdd.mapPartitionsWithIndex((index, iterator) =>
  iterator.toList.map(x => x + "->" + index).iterator).collect

[out] Array((4,2)->0, (4,1)->0, (4,3)->0, (1,1)->1, (1,2)->1, (1,3)->1, (1,4)->1, (2,2)->2, (2,3)->2, (6,2)->2, (3,1)->3, (3,1)->3, (3,3)->3)
```
The custom partitioner has partitioned the data as below.
- 4 % 4 = 0 ————-> The partition (0) consists of the tuples (4,2), (4,1), (4,3)
- 1 % 4 = 1 ————-> The partition (1) consists of the tuples (1,1), (1,2), (1,3), (1,4)
- 2 % 4 = 2 and 6 % 4 = 2 –> The partition (2) consists of the tuples (2,2), (2,3), (6,2)
- 3 % 4 = 3 ————-> The partition (3) consists of the tuples (3,1), (3,1), (3,3)
Thus we can customize the condition on which the data is partitioned.
Invoking Partitions
- By calling the partitionBy method on the RDD
- Certain transformation functions partition the data under the hood.
For example, the join function invokes hash partitioning by default, while the sortByKey function invokes range partitioning by default.
```scala
val dataone = Seq((1,2), (2,3), (3,4), (4,5))
val datatwo = Seq((1,1), (2,2), (3,3), (4,4))

[out] dataone = List((1,2), (2,3), (3,4), (4,5))
      datatwo = List((1,1), (2,2), (3,3), (4,4))
```
```scala
val dataonerdd = sc.parallelize(dataone)
val datatwordd = sc.parallelize(datatwo)

[out] dataonerdd = ParallelCollectionRDD[47] at parallelize at <console>:33
      datatwordd = ParallelCollectionRDD[48] at parallelize at <console>:34
```
```scala
val resultingRdd = dataonerdd.join(datatwordd).persist

[out] resultingRdd = MapPartitionsRDD[57] at join at <console>:31
```
```scala
// We can see that under the hood the join invokes the HashPartitioner
resultingRdd.partitioner

[out] Some(org.apache.spark.HashPartitioner@4)
```
Let's see how sortByKey invokes partitioning.
```scala
val sortedRdd = resultingRdd.sortByKey()

[out] sortedRdd = ShuffledRDD[63] at sortByKey at <console>:30
```
```scala
sortedRdd.collect

[out] Array((1,(2,1)), (2,(3,2)), (3,(4,3)), (4,(5,4)))
```
```scala
// We can see that under the hood the sortByKey invokes the RangePartitioner
sortedRdd.partitioner

[out] Some(org.apache.spark.RangePartitioner@e988e)
```
Functions that can invoke some kind of partitioning under the hood:
- join
- leftOuterJoin
- rightOuterJoin
- groupByKey
- reduceByKey
- cogroup
- foldByKey
- combineByKey
- mapValues (if parent has a partitioner)
- flatMapValues (if parent has a partitioner)
- filter (if parent has a partitioner)
- sort
- partitionBy
- groupWith
Notice that the map and flatMap functions are not listed above, but mapValues and flatMapValues are. This is because map and flatMap can change the keys of the RDD, so the resulting RDD cannot be assumed to keep the parent's partitioning.
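A minimal sketch of that difference (the pair RDD below is illustrative):

```scala
import org.apache.spark.HashPartitioner

val byKey = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")))
  .partitionBy(new HashPartitioner(2))

// mapValues leaves the keys untouched, so the partitioner is kept
byKey.mapValues(_.toUpperCase).partitioner   // Some(HashPartitioner)

// map could have changed the keys, so Spark drops the partitioner
byKey.map { case (k, v) => (k, v.toUpperCase) }.partitioner   // None
```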
NOTE: Before using a map or flatMap over a large transformed RDD, it is good to cache that RDD.
Partitioning will not be helpful in all applications; for example, if a given RDD is scanned only once, there is no point in partitioning it in advance. It is useful only when a dataset is reused multiple times in key-oriented operations such as joins.
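As a sketch of that reuse pattern (the RDD names and contents are hypothetical), partition the large, reused dataset once, persist it, and let the repeated joins reuse its layout:

```scala
import org.apache.spark.HashPartitioner

// Partition and persist the dataset that is joined repeatedly
val userData = sc.parallelize(Seq((1, "alice"), (2, "bob")))
  .partitionBy(new HashPartitioner(8))
  .persist()

// Each join can now reuse userData's partitioning; only the smaller,
// unpartitioned RDD needs to be shuffled to the matching partitions.
val eventsA = sc.parallelize(Seq((1, "click"), (2, "view")))
val eventsB = sc.parallelize(Seq((2, "purchase")))

userData.join(eventsA).count()
userData.join(eventsB).count()
```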
By default, when a Spark job starts, the number of partitions is equal to the total number of cores on all executor nodes.
Can we validate the above statement?