Introduction
Hey data enthusiasts! This week, we're exploring one of the most powerful features of Apache Spark: Caching. We'll see various caching strategies, from basic DataFrame caching to advanced techniques like using persist with customizable storage levels. We'll walk through practical use cases, analyze performance implications, and demonstrate how to leverage Spark UI to monitor and optimize caching operations.
Caching dataframes
Caching is used to read and save frequently used data in-memory, thus reducing the need for accessing the disk for loading the data into RDDs or dataframes.
In case of caching, the results need to be checked while the application is still running. Spark UI helps us in achieving this.
- Consider the following example dataset.
- By default, 1 driver and 2 executors are allocated upon the starting of the job.
- We invoke an action on the dataframe.
- For this action, two stages get created because it involves shuffling of data.
- We cache the results into a dataframe as follows. Note that cache being lazy, gets executed only upon an action i.e. show() in this case.
- Spark being smart, caches only the required data. In our case, it knows that it only needs the first partition to give the results, hence it caches the first partition only.
- The memory deserialized storage format is computation optimized because of its object format. Note that in memory, data is always stored in this form.
- Subsequent executions are much faster as the data is accessed directly from the cache, without hitting the disk.
- Note that in this case, we require the entire data. So, all the 9 partitions will be cached.
- Notice the difference of execution durations between first execution and second execution.
- Storage formats for the 9 cached partitions.
- The executors get dynamically added or removed based on the requirement.
Cache use case 1
- We cache the following dataframe.
- We perform the filter transformation, forcing a scan of the entire dataset for considering the worst case scenario.
- Under the SQL tab, we get the following query flow diagram. In Memory table scan indicates that the data is getting accessed from the cache.
- As an extension of this use case, consider that we cache only two columns.
- Since we are using filters, only the required data / subset of the entire data is cached.
- Now, we call an action, so that caching takes place.
- Note that less data is cached because of filter operations.
- In memory table scan indicates that the cache gets hit.
- Note the analyzed plan for the above query (select, filter). Here an optimization of Predicate Push Down is performed, whereby filters are moved ahead for processing as less records as possible.
- Now, we modify the query by changing the order of select and filter transformations.
- In this query, cache does not get hit and data is fetched from the disk, indicated by 'scan csv'.
- The reason for different behaviors by both these queries is different analyzed plans. The analyzed plan for this query (filter, select) is this.
Cache use case 2
- For the same orders dataset, we cache the dataframe with the following transformations.
- For the following two actions, let us see if we hit the cache or not.
- As we can see, we did not hit the cache (for both the actions), as indicated by scan csv and the number of output rows.
- Consider the following action where only the second column is selected.
- Here also, the data is getting scanned from the disk.
- In order to hit the cache, it is recommended to store the cached results in some other dataframe.
- Now, if we perform the count action again, we will hit the cache, indicated by in memory table scan.
Cache use case 3
- Consider the following example where we apply a transformation distinct(), followed by an action count() on the dataframe.
- Each executor will perform local distinct results, which will be shuffled to another machine.
- Node_local indicates that data gets scanned from the local disk, not from the cache.
- Whenever we invoke a wide transformation, 200 partitions get created by default.
- Now, we cache the results.
- The transformed results get stored in cache, which can be used by subsequent actions.
- The locality level of process_local also reaffirms the fact that cache was hit in order to perform the above operations.
Caching of Spark tables
- spark.write is used to write the results of a dataframe into a table on the disk.
- The data gets stored as follows.
- Note that this is a managed table.
- We invoke the count(*) for checking the number of records. The data gets scanned from the disk.
- In Spark SQL, caching is not lazy, unlike in RDDs and dataframes.
- Now, if we call the query again, the data will be scanned from the cached in memory table.
- Consider that we execute the distinct SQL query on the cached data as follows.
- The in memory table will be scanned, indicating a cache hit.
- On similar lines, we count the number of distinct order_status.
- Since it involves a wide transformation, 200 tasks get created.
- For clearing the cache for only a specific table, we use uncache.
- For emulating the lazy behavior of cache, we can add the lazy keyword explicitly.
- Now, the table gets cached only when a query gets executed upon it.
- Interestingly, when data gets inserted into the spark table, it automatically keeps track of the changes and refreshes the invalidated cache for subsequent actions.
- However, the cache gets invalidated when the data is changed at the backend.
Caching performance
- Parquet format is the most compatible with Spark and it is the default format as well.
- If we don't specify the format explicitly, the data gets stored in parquet format as follows.
- Consider that we perform count(*) query on the un-cached table stored in parquet format.
- Note that only 17.5KB of data is provided as input, indicating good performance by cache.
- Now, let's consider that the same table is cached and the same query is performed on the cached table.
- Note that now the input data is as much as 269.4MB, indicating a bad performance by cache.
- Hence, in case of parquet file format, caching acts as a bottleneck and should be avoided.
Persist
Persist is equivalent to cache, but with an additional flexibility of changing storage levels with the following optional parameters.
Disk: whether the data has to be persisted in Disk or not.
Memory: whether the data has to be persisted in Memory or not.
Off heap: whether the data has to be persisted Off heap or not.
Deserialized: whether the data is deserialized or not.
Number of cache replicas.
- The corresponding results can be seen in the Storage section.
- The whole data gets partitioned upon the calling of an action (count) because persist is also lazy.
Conclusion
In this week's blog, we delved into the intricacies of caching in Apache Spark. We saw how caching improves performance by storing frequently used data in memory, thus reducing the need for disk access. We demonstrated how to cache DataFrames and Spark SQL tables, handled different storage formats, and optimized data transformations.
That's it for Week 4! Keep experimenting, exploring, and pushing the boundaries of what's possible with Spark!