By: Kash Sabba
Databricks is an advanced analytics platform that supports data engineering, data science, and machine learning use cases, from data ingestion to model deployment in production. It provides cloud compute integrated with Apache Spark through an easy-to-use interface.
Databricks supports notebook-style coding in Python, SQL, Scala, and R. Notebooks attach to compute clusters, which execute user queries in parallel using Spark's distributed computing engine.
When working with large datasets, the following rules can help speed up queries. They are based on the Spark DataFrame and Spark SQL APIs.
1. Partitions on Shuffle
Understanding how data is partitioned is important when working with large tables or many large files. During transformations such as a group by or a join on large data, Spark shuffles the data between executors on the cluster's worker nodes (each node is a virtual machine in the cloud).
A shuffle is an expensive operation, but it can be tuned according to the size of the data. By default, Spark uses 200 partitions for shuffles (the spark.sql.shuffle.partitions setting). For small data, 200 partitions may be too many: each task processes very little data, and the scheduling overhead slows the query down. Conversely, for big data, 200 partitions may be too few, which leaves each partition very large.
Users can change the default of 200 shuffle partitions by running the command below at the beginning of a notebook cell. Depending on the table sizes and the complexity of the transformations, a user can then approximate a suitable number of shuffle partitions. Finding the right number usually takes some testing and an understanding of both the transformations and the table sizes.
For example, assume a user is joining a few dataframes and then aggregating the result. If the joins and aggregations produce 150 GB of shuffle read input (this number can be found in the Spark UI) and the target is about 200 MB per shuffle partition, the number of shuffle partitions can be estimated roughly as (150 GB × 1024 MB/GB) / 200 MB = 768 partitions.
The calculated number of partitions can then be set in a notebook cell with the command below before running the joins and aggregations.
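The rule of thumb above is simple arithmetic, so it can be wrapped in a small helper. This is a minimal sketch: the function name estimate_shuffle_partitions is hypothetical, and the 200 MB default simply mirrors the target size used in the example. Rounding up avoids ending with partitions larger than the target.

```python
import math


def estimate_shuffle_partitions(shuffle_read_gb: float, target_partition_mb: float = 200) -> int:
    """Rough shuffle partition count: total shuffle read divided by a target partition size."""
    if shuffle_read_gb <= 0 or target_partition_mb <= 0:
        raise ValueError("sizes must be positive")
    shuffle_read_mb = shuffle_read_gb * 1024  # convert GB to MB
    # Round up so no partition is expected to exceed the target size.
    return math.ceil(shuffle_read_mb / target_partition_mb)


print(estimate_shuffle_partitions(150))  # 768
```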
spark.conf.set("spark.sql.shuffle.partitions", 768)
2. Cache Dataframes
Spark can also cache large intermediate dataframes that are reused. For instance, if a dataframe is created by joining several other dataframes and is then used by several downstream queries in a notebook, it can be cached in memory. This can significantly improve performance when several queries run against the same dataframe.
For instance, the dataframe df_intermediate below is used in several downstream queries in the notebook. It is marked for caching with cache() and then materialized with count(), which persists it in memory.
df_intermediate = df_table1.join(df_table2, on="column", how="inner").cache()
df_intermediate.count()  # action that runs the join once and fills the cache
Caching and counting df_intermediate up front speeds up the following queries, each of which reuses it.
df_query1 = df_intermediate.filter("column1 = 'x'")
df_query1.show()
df_query2 = df_intermediate.groupBy("column1", "column4").count()
df_query2.show()
df_query3 = df_intermediate.select("column5").distinct()
df_query3.show()
For example, if df_table1 and df_table2 are large, the cache and count step might run for 30 minutes to create df_intermediate. However, the three downstream queries (df_query1, df_query2, and df_query3) might then each take only 2 seconds to show their results.
Conversely, without cache and count, defining df_intermediate might take only 5 seconds, but each of the three downstream queries could take up to 30 minutes (90 minutes in total). Spark evaluates transformations lazily, so nothing runs until an action such as count() or show() is called, and without a cache each action recomputes the joins from scratch. cache() itself is also lazy: it only marks the dataframe for caching, and the first action fills the cache.
Using cache and count can significantly improve query times. Once the downstream queries have finished, it's best practice to release the cached dataframe from memory with the unpersist() method.
3. Actions on Dataframes
It's best to minimize the number of collect operations on a large dataframe. When a user calls the collect() method on an entire dataframe or a subset of it, Spark must move that data from the distributed worker nodes to the driver node, which can be an expensive operation.
If the collected data is larger than the driver node's memory, it will likely cause an out-of-memory error. Similarly, the toPandas() method collects a Spark dataframe into a pandas dataframe by moving the data from the worker nodes to the driver node, so it can also cause an out-of-memory error when called on too much data.
It's best practice to minimize collect operations where possible, or to collect only a small subset of the data during data exploration.
4. Writing Data
It's good practice to write out interim tables that several users or queries rely on regularly. If a dataframe is created from several table joins and aggregations, it might make sense to write it as a managed table under “Data” (the Hive metastore in Databricks) or to store it in Parquet format. Spark offers the repartition() and coalesce() methods, which control how a dataframe is partitioned in memory, and partitionBy(), a DataFrameWriter method that partitions the written output by column values.
The repartition() method performs a full shuffle and creates partitions of roughly equal size. This can be expensive, since all the data must be reshuffled, but it can improve performance for later reads because the data is split evenly. By contrast, coalesce() reduces the number of partitions by merging existing ones without a full shuffle; it is cheaper, but it can produce partitions of unequal size.
Parquet files and managed tables both provide faster reads and writes in Spark than text formats such as CSV, whether plain or gzip-compressed. Within Databricks, it's best to use managed tables when possible. If the data must be written to data lake storage instead, Parquet is a good choice.
5. Monitor the Spark UI
It's good practice to check the Spark UI periodically while a Spark job is running on a cluster. Users can open the Spark UI from the Spark job details shown under the notebook cell that is running, or directly from the cluster page under “Spark UI”.
Within the UI, the “Storage” tab shows cache statistics, including how much memory and disk the cached data uses on each worker node. It's also important to reduce the amount of data spilled to disk during shuffle operations. Spark spills data to disk when a task runs out of memory on a worker node.
Spilling to disk significantly slows down Spark jobs, and the Spark UI shows how much data each stage spilled. Spills can often be reduced by tuning spark.sql.shuffle.partitions and spark.sql.files.maxPartitionBytes (the maximum number of bytes packed into a single partition when reading files), so that each task handles less data.
Databricks delivers fast performance on large datasets and tables. However, there is no one-size-fits-all solution. Each use case should go through a careful discovery process that clearly defines the problem statement, and then use Databricks and Spark to fine-tune the solution.