Apache Spark: 5 Performance Optimization Tips

Interesting and specific lessons learned from experience

Recently my team and I started writing down lessons and conclusions from every issue we had with Spark. In this post I’m going to share 5 of them. They are quite specific, but if you run into one of these issues, knowing the tip can solve the problem. I hope you’ll find them valuable.

1) Parquet schema Vs. Hive Metastore in SparkSQL

When reading a Hive table made of Parquet files, keep in mind that Spark has its own way of resolving the table’s schema. As you may know, the Parquet format stores the table schema in the file footer. That’s why the Parquet schema can sometimes conflict with the Hive schema (in the Metastore).

When Spark reads a Hive/Parquet table, it generates a new schema by the following rules:

  1. Columns that have the same name in both the Parquet schema and the Hive Metastore, but not the same datatype, get the Parquet datatype. This is very important to know, and it’s the reason I’m writing this tip.
    A few weeks ago, we had an issue reading a column whose type was timestamp in the Hive Metastore but string in the Parquet schema. Because the code performed a cast to long, the whole process didn’t work. It took us some time to recognize the conflict.
  2. The generated SparkSQL schema contains only the columns that appear in the Hive Metastore. Any Parquet columns that don’t appear there are dropped.
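
To make the two rules concrete, here is a minimal pure-Python sketch that mimics them on plain dictionaries. It is not Spark’s implementation: the function names reconcile_schema and find_type_conflicts, and the example columns, are made up for illustration.

```python
def reconcile_schema(hive_schema, parquet_schema):
    """Mimic the two rules on {column_name: type_name} dicts (illustration only)."""
    reconciled = {}
    for name, hive_type in hive_schema.items():
        # Rule 1: on a name match, the Parquet data type wins.
        # Rule 2: only Hive Metastore columns are iterated, so Parquet-only
        # columns never enter the result.
        reconciled[name] = parquet_schema.get(name, hive_type)
    return reconciled


def find_type_conflicts(hive_schema, parquet_schema):
    """Return {column: (hive_type, parquet_type)} for same-name columns whose types differ."""
    return {
        name: (hive_type, parquet_schema[name])
        for name, hive_type in hive_schema.items()
        if name in parquet_schema and parquet_schema[name] != hive_type
    }


# Hypothetical table resembling the issue described above.
hive = {"id": "bigint", "event_time": "timestamp"}
parquet = {"id": "bigint", "event_time": "string", "debug_info": "string"}

print(reconcile_schema(hive, parquet))
print(find_type_conflicts(hive, parquet))
```

A check in the spirit of find_type_conflicts, run before any cast, would have pointed us straight at the timestamp/string mismatch.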

Further reading: http://spark.apache.org/docs/latest/sql-data-sources-parquet.html#hive-metastore-parquet-table-conversion

2) JDBC fetch size in SparkSQL

SparkSQL can read from a lot of databases through JDBC, and it supports many of the features that JDBC offers. One of them is the fetchsize, which is the subject of this tip.

This parameter is very important because of 2 cases:

  1. If the fetch size is too big, we’ll try to process too much data at once and we may face some GC problems or Out-Of-Memory errors. But that’s actually not the common case.
  2. If the fetch size is too small and we need to fetch a massive amount of data, the number of network round trips will be high and will dramatically slow down the process. That’s actually something we faced, because the default value of the parameter for Oracle is 10. We raised it to 50,000 and the process ran 5 times faster.
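
Here is a rough sketch of how the fetch size could be set in PySpark, together with the round-trip arithmetic behind the second case. The connection URL, the table name and the 10 million row count are placeholders I made up, and the helper names (round_trips, jdbc_options, read_table) are hypothetical. Only the option keys url, dbtable and fetchsize come from Spark’s JDBC data source.

```python
import math


def round_trips(total_rows, fetch_size):
    """Number of fetch round trips needed to pull total_rows rows."""
    return math.ceil(total_rows / fetch_size)


def jdbc_options(url, table, fetch_size):
    """Build the option dict for Spark's JDBC data source."""
    return {
        "url": url,
        "dbtable": table,
        # "fetchsize" controls how many rows are fetched per round trip.
        "fetchsize": str(fetch_size),
    }


def read_table(spark, options):
    """Read a table through JDBC; `spark` is an existing SparkSession."""
    return spark.read.format("jdbc").options(**options).load()


# Placeholder connection details, not a real database.
opts = jdbc_options("jdbc:oracle:thin:@//db-host:1521/SERVICE", "SALES.ORDERS", 50000)

# Assumed table size of 10 million rows, for illustration only.
print(round_trips(10_000_000, 10))      # Oracle's default fetch size
print(round_trips(10_000_000, 50_000))  # the value we raised it to
```

For the assumed 10 million rows, that is 1,000,000 round trips with a fetch size of 10 versus 200 with a fetch size of 50,000. The actual speed-up depends on row width and network latency, so treat the numbers as an intuition, not a benchmark.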

Further reading: http://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

3) spark.sql.shuffle.partitions

Shuffling in Spark happens after a groupby, a join, or any similar operation. The number of partitions after a shuffle is determined by the parameter spark.sql.shuffle.partitions, whose default value is 200. This means that if you didn’t change that value, you have 200 partitions after every shuffle, which can be quite a lot in some cases.

You should adjust this parameter to your specific case:

  • The number of partitions should be at least the number of cores running your code (and you should actually set it to 2–3 times more than that).
  • The maximum size of a single partition is limited by the memory of a single executor, so make sure to set enough partitions and avoid oversized ones.
  • Too few partitions can lead to unused cores (wasted resources). On the other hand, too many partitions weigh heavily on task scheduling and management (because there will be a lot of tasks), which adds overhead.
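
The guidelines above can be combined into a small helper. This is only a sketch under my own assumptions: the function names are hypothetical, the 128 MB target partition size is an arbitrary starting point and not a Spark default, and the shuffle size is something you would have to estimate yourself (for example from the Spark Web UI).

```python
import math


def suggest_shuffle_partitions(total_cores, shuffle_size_mb,
                               target_partition_mb=128, cores_multiplier=3):
    """Pick a partition count that satisfies both the core and the size guideline."""
    # Guideline 1: 2-3 partitions per core keeps every core busy.
    by_cores = total_cores * cores_multiplier
    # Guideline 2: enough partitions so that none exceeds the target size.
    by_size = math.ceil(shuffle_size_mb / target_partition_mb)
    return max(by_cores, by_size)


def apply_shuffle_partitions(spark, partitions):
    """Set the value on an existing SparkSession."""
    spark.conf.set("spark.sql.shuffle.partitions", str(partitions))


print(suggest_shuffle_partitions(total_cores=16, shuffle_size_mb=2_000))
print(suggest_shuffle_partitions(total_cores=16, shuffle_size_mb=100_000))
```

With 16 cores, a 2 GB shuffle is bounded by the core guideline (48 partitions), while a 100 GB shuffle is bounded by the size guideline (782 partitions). Both are well away from the default of 200, which is the point of this tip.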

Further reading: https://www.talend.com/blog/2018/03/05/intro-apache-spark-partitioning-need-know/

4) Join a small DataFrame with a big one

To improve performance when joining a small DF with a large one, you should broadcast the small DF to all the other nodes. This is done by hinting Spark with the function sql.functions.broadcast(). Before that, it is advisable to coalesce the small DF to a single partition.

  • Notice that we use coalesce instead of repartition, because coalesce doesn’t invoke a full shuffle.

PySpark example:

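from pyspark.sql.functions import broadcast
smallDF = smallDF.coalesce(1)  # single partition, without a full shuffle
# Hint Spark to broadcast smallDF to all the nodes for the join
largeDF.join(broadcast(smallDF), largeDF.id == smallDF.id)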

5) “Container killed by YARN for exceeding memory limits…”

This tip is written thanks to the profound research of my colleague (and dear friend) Shani Alisar.

If you’ve been using Spark on YARN for more than a day, I’m sure you have come across the following errors:

Container killed by YARN for exceeding memory limits ... of ... physical memory used. Consider boosting spark.yarn.executor.memoryOverhead

Container killed on request. Exit code is ...

Those are very common errors, which basically say that your app used too much memory. More precisely, the driver or the executors reached the maximum container memory and got killed by YARN.

How to avoid it

  1. Repartition: Make sure to spread your data among enough partitions, and that every partition holds a roughly equal amount of data (see tip #3 above).
  2. Set a lower number of cores in spark.executor.cores. Fewer cores mean fewer parallel tasks under the same executor. Make sure the number of cores is sensibly matched to the number of partitions (again, tip #3). The default values are: YARN mode: 1, Standalone: all available cores.
  3. Boosting the memory overhead: The overhead is the gap between the container memory and the process memory (executor JVM max heap). We define this gap in order to keep the process running during the peaks, so it doesn’t get killed because of the container memory limit. Therefore, if we increase the parameters spark.yarn.driver.memoryOverhead and spark.yarn.executor.memoryOverhead (named spark.driver.memoryOverhead and spark.executor.memoryOverhead since Spark 2.3), we will be better prepared for the peaks, but the trade-off will be some performance loss, because the GC will work harder. The default value for those parameters is 10% of the defined memory (spark.executor.memory or spark.driver.memory), with a minimum of 384 MB.
  4. GC Tuning: You should check the GC time per Task or Stage in the Spark Web UI. If you think the GC doesn’t run at the optimal frequency, you can try playing with the values of the related Java parameters in spark.executor.extraJavaOptions: -XX:OldSize={SIZE} -XX:MaxNewSize={SIZE} -XX:NewSize={SIZE}
    Further reading: https://databricks.com/blog/2015/05/28/tuning-java-garbage-collection-for-spark-applications.html
  5. For PySpark developers: Try setting a lower value for the spark.executor.memory parameter. The reason is that running PySpark involves 2 processes: an on-heap JVM process and an off-heap Python process. The parameter defines the size of the heap, and both processes are limited by the container memory. Therefore, the larger the heap, the less memory the Python process will have, and you may reach the container’s limit faster.
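
To get a feeling for the numbers behind items 2 and 3, here is a small arithmetic sketch. The function names are hypothetical and the executor sizes are made-up examples. The 10% factor is the default mentioned above, and the 384 MB minimum comes from the Spark configuration documentation. YARN may additionally round the request up to its own allocation increment, which this sketch ignores.

```python
MIN_OVERHEAD_MB = 384    # minimum overhead applied by Spark
OVERHEAD_FACTOR = 0.10   # default: 10% of the configured memory


def default_overhead_mb(memory_mb):
    """Default memory overhead for a given spark.executor.memory (in MB)."""
    return max(int(memory_mb * OVERHEAD_FACTOR), MIN_OVERHEAD_MB)


def container_memory_mb(memory_mb, overhead_mb=None):
    """Memory requested from YARN for one container: heap plus overhead."""
    if overhead_mb is None:
        overhead_mb = default_overhead_mb(memory_mb)
    return memory_mb + overhead_mb


def heap_per_task_mb(memory_mb, executor_cores):
    """Rough heap share of each task running in parallel in one executor."""
    return memory_mb // executor_cores


# Example: an 8 GB executor (spark.executor.memory=8g).
print(container_memory_mb(8192))                    # default overhead
print(container_memory_mb(8192, overhead_mb=2048))  # boosted overhead
print(heap_per_task_mb(8192, executor_cores=4))
print(heap_per_task_mb(8192, executor_cores=2))     # fewer cores, more heap per task
```

The sketch treats the heap as evenly split between tasks, which is a simplification, but it shows the direction: halving spark.executor.cores roughly doubles the memory available to each task, and boosting the overhead raises the container limit without touching the heap.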

I hope you found these tips useful and informative. I tried to keep it short. There is really a lot more to say about some of these topics, but for the purpose of this post I think this level of detail is fine.

As always, I encourage you to correct me, ask questions and generally discuss the post in the comments.
