In the world of distributed computing, we need to write efficient programs that keep latency low, which can be achieved by following certain best practices. Let's see how data partitioning reduces latency and improves parallelism.

 

We can see the difference in the time taken by tasks such as reading 1 MB sequentially from memory versus reading the same from disk/SSD, or sending those bytes across the network.

Let's look at another chart of the same latency numbers in humanized form to get a feel for the time differences.

Now that we have a better intuition about latency, let's look at the link between latency and partitioning and how they are connected.

What is Partitioning?

Simply put, the data within an RDD is split into many partitions and distributed across the worker nodes.

Note that a data partition never spans across nodes. If a worker node holding a partition is lost, the data is recovered by recomputing that RDD from its parent RDDs (that's where the resilient part of the RDD comes from).

Custom partitioning is only possible when working with pair RDDs, since partitioning is done based on keys.

Types of Partitioning

By default, when a Spark job starts, the number of partitions is equal to the total number of cores across all executor nodes.

So, for example, if each machine in your cluster has 4 cores and you have 6 worker nodes, the default number of partitions you start with is 24.
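As a quick sanity check, the default parallelism and an RDD's partition count can be inspected directly. This is a minimal sketch, assuming a running SparkContext named sc:

```scala
// Default number of partitions Spark uses for operations like parallelize;
// on 6 worker nodes with 4 cores each this is typically 24
println(sc.defaultParallelism)

// An RDD created from a collection picks this default up unless we override it
val numbers = sc.parallelize(1 to 1000)
println(numbers.getNumPartitions)
```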

When Spark reads data from HDFS, the number of partitions is determined by the number of input splits for the file in HDFS: if there are N input splits, there will be N partitions. If we are reading a text file, the input splits are set by TextInputFormat.

My input file is about 114.5 GB, i.e. roughly 114,500 MB; divided by 32 (the fs.local.blocksize default is 32 MB), that gives around 3,579 partitions. But if the lines in your file are too long (longer than the block size), the number of partitions will be smaller.
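As a rough check (the HDFS path below is purely illustrative), the partition count of a freshly read text file can be inspected like this:

```scala
// The number of partitions follows the input splits computed by TextInputFormat,
// roughly file size / block size for ordinary text files
val lines = sc.textFile("hdfs:///path/to/input.txt")
println(lines.getNumPartitions)
```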

The three types of partitioning are

  • Hash partitioning
  • Range partitioning
  • Custom partitioning

Hash Partitioning

Uses Java’s Object.hashCode method to determine the partition as partition = key.hashCode() % numPartitions.

We will go with an example: take some tuples (data), convert them to an RDD (hashpartrdd), and then partition it.

We specify the kind of partitioning we want on the data with the .partitionBy() method. The number of partitions we want is set by passing it to the HashPartitioner constructor.

Note that partitionBy() is a transformation, so it always returns a new RDD. It is therefore important to persist the result; otherwise, the partitioning is reapplied (which involves shuffling) each time the RDD is used.
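A minimal sketch of this example, assuming an existing SparkContext sc (the tuple values mirror the partition listing below):

```scala
import org.apache.spark.HashPartitioner

// Sample (key, value) tuples
val data = sc.parallelize(Seq(
  (1, 1), (1, 2), (1, 3), (1, 4),
  (2, 2), (2, 3),
  (3, 1), (3, 1), (3, 3),
  (4, 1), (4, 2), (4, 3),
  (6, 2)
))

// Hash partition into 4 partitions and persist so the shuffle happens only once
val hashpartrdd = data.partitionBy(new HashPartitioner(4)).persist()

hashpartrdd.partitioner       // Some(HashPartitioner)
hashpartrdd.getNumPartitions  // 4
```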

Hash partitioning tries to spread the data as evenly as possible over all of the partitions based on the keys. Here we can see the data entering the partitions as below.

Let's assume this runs on a distributed system; then the data in partitions 0, 1, 2, 3 is sent to worker nodes 1, 2, 3, 4.

  • The partition (0) consists of the tuples (1,1), (1,2), (1,3) —-> worker node 1
  • The partition (1) consists of the tuples (1,4), (3,1), (2,2), (2,3) —-> worker node 2
  • The partition (2) consists of the tuples (3,1), (4,2), (3,3) —-> worker node 3
  • The partition (3) consists of the tuples (4,1), (6,2), (4,3) —-> worker node 4

If we want to group the elements by key across the worker nodes, we call the groupByKey method, as shown below.
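For example, reusing the hashpartrdd sketched above:

```scala
// Group all values with the same key together; when the keys are not already
// co-located on the same node, this moves data across the network (a shuffle)
val grouped = hashpartrdd.groupByKey()
grouped.collect().foreach { case (k, vs) => println(s"$k -> ${vs.mkString(", ")}") }
```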

Notice that the groupByKey call above actually moves data over the network across the worker nodes (shuffling). This increases the time taken to compute the resulting grouped RDD.

Range Partitioning

To eliminate the above shuffling across the network, we can use range partitioning, where the keys of our pair RDDs are segregated into certain ranges. Let's consider the previous example "data".

RangePartitioner sorts the records based on the key and then divides them into the given number of partitions.
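A sketch of applying a range partitioner to the same data RDD (the name rangepartrdd is illustrative):

```scala
import org.apache.spark.RangePartitioner

// RangePartitioner samples and sorts the keys, then assigns each partition
// a contiguous range of keys
val rangepartrdd = data.partitionBy(new RangePartitioner(4, data)).persist()

rangepartrdd.partitioner      // Some(RangePartitioner)
```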

  • The partition (0) consists of the tuples (1,1), (1,2), (1,3), (1,4) —-> worker node 1
  • The partition (1) consists of the tuples (3,1), (2,2), (2,3), (3,1), (3,3) —-> worker node 2
  • The partition (2) consists of the tuples (4,1), (4,2), (4,3) —-> worker node 3
  • The partition (3) consists of the tuple (6,2) —-> worker node 4

When run on a cluster, we can see the difference in time taken when comparing groupByKey on the hash-partitioned and range-partitioned RDDs.

Notice that the range-partitioned data is more skewed while the hash-partitioned data is more evenly distributed across the nodes, yet the compute time is lower for the range partitioner because it eliminates the data shuffle between nodes.

Spark’s Java and Python APIs benefit from partitioning in the same way as the Scala API. However, in Python, you cannot pass a HashPartitioner object to partitionBy; instead, you just pass the number of partitions desired (e.g., rdd.partitionBy(100)).

Custom Partitioning

To implement a custom partitioner, we need to extend the Partitioner class and implement these three methods:

  • numPartitions: Int : returns the number of partitions
  • getPartition(key: Any): Int : returns the partition ID (0 to numPartitions - 1) for the given key
  • equals() : the standard Java equality method. Spark needs to test our partitioner object against other instances of itself when it decides whether two of our RDDs are partitioned the same way.

We have defined a class named CustomPartitioner; let's partition our data using this class.
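A minimal sketch of what that class might look like, partitioning integer keys by modulo so it matches the results listed below:

```scala
import org.apache.spark.Partitioner

class CustomPartitioner(numParts: Int) extends Partitioner {
  // Total number of partitions
  override def numPartitions: Int = numParts

  // Partition ID for a given key; assumes the keys are non-negative integers
  override def getPartition(key: Any): Int =
    key.asInstanceOf[Int] % numParts

  // Two CustomPartitioners are equal if they create the same number of partitions
  override def equals(other: Any): Boolean = other match {
    case cp: CustomPartitioner => cp.numPartitions == numPartitions
    case _                     => false
  }
}

// Partition the example data with the custom partitioner
val custompartrdd = data.partitionBy(new CustomPartitioner(4)).persist()
```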

The custom partitioner has partitioned the data as below.

  • 4 % 4 = 0 —-> The partition (0) consists of the tuples (4,2), (4,1), (4,3)
  • 1 % 4 = 1 —-> The partition (1) consists of the tuples (1,1), (1,3), (1,4)
  • 2 % 4 = 2 and 6 % 4 = 2 —-> The partition (2) consists of the tuples (2,2), (2,3), (6,2)
  • 3 % 4 = 3 —-> The partition (3) consists of the tuples (3,1), (3,1), (3,3)

Thus we can customize the condition on which the partition of data happens.

Invoking Partitions

  • By calling the partitionBy method on the RDD
  • Certain transformation functions will partition the data under the hood.

For example, the join function invokes hash partitioning by default, while the sortByKey function invokes range partitioning by default.
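For instance, joining our data RDD with another pair RDD (the lookup RDD here is purely illustrative) produces a hash-partitioned result:

```scala
// Neither input has a partitioner, so join falls back to hash partitioning
val lookup = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")))
val joined = data.join(lookup)
joined.partitioner    // Some(HashPartitioner)
```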

Let's see how sortByKey invokes partitioning.
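A sketch, again using the data RDD from earlier:

```scala
// sortByKey samples the keys and range partitions the records before sorting,
// so the resulting RDD carries a RangePartitioner
val sorted = data.sortByKey()
sorted.partitioner    // Some(RangePartitioner)
```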

Functions that can invoke some kind of partitioning under the hood

  • join
  • leftOuterJoin
  • rightOuterJoin
  • groupByKey
  • reduceByKey
  • cogroup
  • foldByKey
  • combineByKey
  • mapValues (if parent has a partitioner)
  • flatMapValues (if parent has a partitioner)
  • filter (if parent has a partitioner)
  • sort
  • partitionBy
  • groupWith

Notice that the map and flatMap functions aren't listed above, but mapValues and flatMapValues are. This is because map and flatMap can change the keys of the RDD, so Spark cannot guarantee that the result is still partitioned the same way and drops the partitioner.
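A quick way to see the difference, reusing hashpartrdd from earlier:

```scala
// mapValues cannot touch the keys, so the parent's partitioner is preserved
val bumped = hashpartrdd.mapValues(_ + 1)
bumped.partitioner   // Some(HashPartitioner)

// map could change the keys, so Spark drops the partitioner on the result,
// even if we happen to leave the keys untouched
val mapped = hashpartrdd.map { case (k, v) => (k, v + 1) }
mapped.partitioner   // None
```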

NOTE: Before using a map or flatMap over a large transformed RDD, it's good to cache that RDD.

Partitioning will not be helpful in all applications. For example, if a given RDD is scanned only once, there is no point in partitioning it in advance. It is useful only when a dataset is reused multiple times in key-oriented operations such as joins.