Spark 3.0 released while ago. A bunch of new features are added in this new version of spark. One of the interesting features added is Adaptive Query Execution. Let’s look into AQE optimizations in this post.

  • Dynamic Join Strategy
  • Handling Skew Data
  • Adaptive shuffle partitions

Let’s look into each feature in detail.

Dynamic Join Strategy:

Below is the strategy followed in Spark to generate the optimized physical plan. Once the optimized physical plan generated, it will be executed, with no further changes to the plan.

As part of Spark 3.0, Optimized physical plan will be further fine-tuned based on the latest stats. If one of the tables going to be used in join in less than 10 MB after getting the run time stats of the table, instead of using Sort Merge Join, automatically it will be converted into Broadcast Join. This will improve the performance of the query, as we are avoiding the data shuffle.

Handling Skew Data:

While handling a larger data set, if one of the keys is skewed then that task will run quite a long time compared to other tasks and this will reduce the overall performance of the application. In Spark 2.0 we need to salt the key so that multiple tasks will be created for that key and will run in parallel. For the same, we need to aware of the key and salt it. In Spark 3.0, Spark will detect the skew data and automatically will start multiple tasks. We can enable this feature by setting spark.sql.adaptive.skewJoin.enabled to true.

Adaptive shuffle partitions:

One of the issues I faced multiple times while handling different sizes of the data sets is shuffle partitions. By default whenever shuffle happens 200 partitions will be created. In scenarios like where we are joining two tables each around 1TB, 200 shuffle partitions will be too less. On the other hand whenever we are joining two tables each 100 MB, 200 shuffle partitions will be too high. We need to come up with a workaround solution like setting the shuffle partitions table-wise to avoid this. With AQE in pace, Spark will automatically adjust the shuffle partitions.

Note:To enable AQE we need to set spark.sql.adaptive.enabled to true.