Parallel execution in Spark
On reading the title, I am sure the first reaction will be, 'What is he talking about?' As we all know, Spark is a framework meant for parallel / distributed data processing. That is the very premise on which it is built, and it makes the task of parallel data processing relatively easy. Why relatively? Because when push comes to shove, many factors and settings have to be considered before the application runs as fast as we need it to.
The story this time is about parallel processing / execution on Spark (on Databricks, to be precise). It deals with data, but not with data processing as such. It deals with data ingestion.
A couple of months ago, we were re-writing the ingestion portion of our application. Earlier, this portion was implemented using MuleSoft. After a year in production, the customer decided to stop using MuleSoft as part of an enterprise-wide decision. We, the Databricks team, were given the task of re-implementing the MuleSoft functionality in Databricks. What was the functionality? We had to connect to a REST API endpoint and fetch data for the last six hours (the job ran four times a day). From the data fetched, we had to extract ID information and then fetch the details for each ID using another REST API call.
After we implemented this functionality in Databricks, our runtime increased by 15 minutes. This was infuriating because I had spent quite a lot of effort reducing the total run time from four hours per batch to two hours per batch. After this implementation, the run time went up to two hours and 15 minutes.
The reason for the time increase was clear - sequential execution. After downloading the initial data set, we iterated over it and extracted the required ID information. Then we fetched additional information for each ID using another REST API call. This fetching of information for each ID was performed sequentially, so the time taken was linearly proportional to the number of IDs for which details had to be fetched.
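For context, here is a minimal sketch of that sequential pattern. The endpoint, field names and the fetch_recent_records / fetch_details helpers are illustrative assumptions, not the actual application code.

import requests

BASE_URL = "https://api.example.com"  # placeholder endpoint, not the real one

def fetch_recent_records(hours=6):
    # Initial call: fetch the records created in the last N hours
    resp = requests.get(f"{BASE_URL}/records", params={"last_hours": hours}, timeout=60)
    resp.raise_for_status()
    return resp.json()["records"]

def fetch_details(record_id):
    # One REST call per ID - this is the part that ran sequentially
    resp = requests.get(f"{BASE_URL}/records/{record_id}", timeout=60)
    resp.raise_for_status()
    return resp.json()

records = fetch_recent_records()
ids = [r["id"] for r in records]

# Sequential loop: total time grows linearly with the number of IDs
details = [fetch_details(record_id) for record_id in ids]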
To reduce the execution time, the only option was to run the downloads in parallel. Using threads for this purpose was the most obvious way to go. A few years ago, I had implemented parallel processing using threads in a Spark application, but this time I decided against threads.
Why?
For simplicity, clarity and scalability. Surprised? If not threads, what did I use? I implemented parallel processing using multiple tasks in the same Databricks job. To make this work, I stored the IDs to be downloaded in a table, along with a batch number for each ID. To begin with, I decided to use five batches. Then I updated the Databricks job and added five tasks - one task per batch.
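A minimal PySpark sketch of how the batch assignment could look, assuming a simple modulo split into five batches and a table named ingestion.ids_to_download (both the table and column names are illustrative, and spark refers to the SparkSession that Databricks provides in a notebook):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

NUM_BATCHES = 5  # one Databricks task per batch

# One row per ID extracted from the initial REST response (sample data shown)
ids_df = spark.createDataFrame([("id-001",), ("id-002",), ("id-003",)], ["record_id"])

# Assign a stable batch number 0..4 using a row number modulo the batch count
w = Window.orderBy("record_id")
batched = (ids_df
           .withColumn("row_num", F.row_number().over(w))
           .withColumn("batch_number", F.col("row_num") % NUM_BATCHES)
           .drop("row_num"))

# Persist so that each of the five job tasks can pick up only its own batch
batched.write.mode("overwrite").saveAsTable("ingestion.ids_to_download")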
If I had implemented parallel processing using threads, the threads would have executed on the driver - a single VM. This puts a limit on scalability: to scale, we would need to increase the number of cores on that VM. But in a Spark cluster, we cannot increase the number of cores for a single node; we have to make the change for every node in the cluster, which can be costly. Re-startability and transparency were the other reasons I avoided a Python-level parallel implementation.
Let us assume that I had used threads and that one of the threads failed during ingestion. On failure, the support team goes through the application logs and performs a repair run of the job. Without extensive logs, it would be difficult for the support team to understand the reason for the failure and to predict the behaviour of the application on the repair run. When using multiple threads, we typically divide the list of IDs to be downloaded into batches of a pre-determined size, create that many threads and give one batch to each thread. Unless we keep track of which batch - and hence which IDs - failed during download, we would end up downloading all the IDs all over again. Even if we keep track of the IDs already downloaded, on a repair run the remaining IDs could once again be split across multiple threads and sent for download.
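For contrast, this is roughly what the thread-based version described above could look like on the driver, using Python's concurrent.futures; the batch size is illustrative, and fetch_details and ids are the hypothetical names from the first sketch. Note that without extra bookkeeping, a failed batch leaves no durable record of which IDs still need to be downloaded.

from concurrent.futures import ThreadPoolExecutor, as_completed

BATCH_SIZE = 100  # illustrative batch size, not from the original post

def download_batch(batch):
    # Runs inside one thread on the driver VM
    return [fetch_details(record_id) for record_id in batch]

batches = [ids[i:i + BATCH_SIZE] for i in range(0, len(ids), BATCH_SIZE)]

results = []
with ThreadPoolExecutor(max_workers=len(batches)) as executor:
    futures = [executor.submit(download_batch, b) for b in batches]
    for future in as_completed(futures):
        # If a thread fails here, only the application logs record which
        # batch (and therefore which IDs) were affected.
        results.extend(future.result())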
The most important consideration for me was visibility. By using parallel tasks, the parallel execution was obvious - no one needed to guess what was happening under the hood. With the multiple-task implementation in Databricks, each task can run to completion or fail independently of the other tasks in the job. As each task has its own set of IDs, when a repair run is performed, the other tasks will not try to download IDs that do not belong to their batch. In fact, when a task completes its batch, it is marked complete and Databricks does not even try to execute it again if a repair run is performed on the job. The drawback of this approach is that we have to decide the degree of parallelism upfront, as the tasks have to be added to the Databricks job.
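A sketch of what each of the five tasks could run, assuming the job passes a batch_number parameter (0..4) to the same notebook via a widget; the table name, column names and fetch_details helper are the illustrative ones used above. Because each task reads only its own slice of the IDs table, a repair run re-executes only the failed tasks and never touches another batch's IDs.

from pyspark.sql import functions as F

# Each of the five tasks runs this same notebook, each with a different
# batch_number parameter supplied through the task configuration.
dbutils.widgets.text("batch_number", "0")
batch_number = int(dbutils.widgets.get("batch_number"))

# Read only the IDs that belong to this task's batch
my_ids = [row["record_id"]
          for row in spark.table("ingestion.ids_to_download")
                          .where(F.col("batch_number") == batch_number)
                          .select("record_id")
                          .collect()]

# fetch_details is the same hypothetical REST helper as in the first sketch
details = [fetch_details(record_id) for record_id in my_ids]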
By using multiple tasks, I was able to implement parallel processing in a transparent manner. I was also able to reduce the time taken from 15 minutes to three minutes.
#databricks #spark #ingestion #parallel_processing #thread