Improve StreamSets pipeline performance with a loosely coupled architecture
'Our Data Collector engines crash with Out of Memory errors,' a customer told me on a regular cadence call, 'and even when they don't crash, the data load performance is poor.'
The customer had rather powerful VMs hosting their StreamSets Data Collectors (SDCs). Moreover, they had several SDCs, so plenty of processing power. Why the crashes, then? I asked the customer to share their pipeline, and this is how it looked:
The customer used StreamSets to extract normalized data from a REST API. In the data model of the source system, each Document (imagine a grocery receipt) had Document Items (all the things you paid for). Some Documents had fewer than 10 Items, others more than 100. The pipeline made one REST call to get the list of Documents, then, for each Document, another call to get its Items. So what went wrong?
Let's start with performance. As I mentioned, a REST call was required to fetch the Items of each Document. Besides HTTP not being the fastest protocol around, the bigger problem was that all those Item-loading calls were performed sequentially, hundreds or thousands of them, one by one. If only we could add some degree of parallelism here…
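To illustrate the difference parallelism makes, here is a minimal sketch in Python. The endpoint shape and the `fetch_items` stub are hypothetical, standing in for the real Item-loading REST call:

```python
# Sketch: fetching Items for each Document concurrently instead of one by one.
# fetch_items is a stand-in for the real HTTP call, e.g. something like
# GET https://api.example.com/documents/{doc_id}/items (hypothetical URL).
from concurrent.futures import ThreadPoolExecutor

def fetch_items(doc_id):
    # Stubbed response; a real implementation would issue the HTTP request here.
    return [f"item-{doc_id}-{n}" for n in range(3)]

def fetch_all_items(doc_ids, max_workers=8):
    # Up to max_workers Item-loading calls are in flight at any moment,
    # instead of the strictly sequential one-by-one loop.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return dict(zip(doc_ids, pool.map(fetch_items, doc_ids)))

items_by_doc = fetch_all_items(["D1", "D2", "D3"])
```

Since the calls are independent of each other, the wall-clock time drops roughly by the degree of parallelism (until the API or network becomes the bottleneck).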
But what about OOM errors and memory consumption? This may become clearer if we look closer at how an SDC pipeline works.
Say you need to move a heap of sand, large enough that you can't lift it all at once. You get a wheelbarrow and move the sand in batches. SDC does the same: it assumes the source data is potentially infinite and moves it in batches. The origin (head) of a pipeline fills the "wheelbarrow", and the rest of the pipeline moves it. The batch size is configured in the origin, because it's the origin that fills the "wheelbarrow". That size depends on available memory: if a batch is too large, the SDC may crash with an OOM error.
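The wheelbarrow idea can be sketched as a generator that hands data downstream one fixed-size batch at a time, so memory usage stays bounded no matter how large (or infinite) the source is:

```python
# Sketch of batch-at-a-time processing: the origin fills a fixed-size
# "wheelbarrow" and yields it downstream, keeping memory usage bounded.
def read_in_batches(source, batch_size):
    batch = []
    for record in source:          # source may be effectively infinite
        batch.append(record)
        if len(batch) == batch_size:
            yield batch            # hand the full wheelbarrow downstream
            batch = []
    if batch:
        yield batch                # final, partially filled batch

batches = list(read_in_batches(range(10), batch_size=4))
# → [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```

At no point do more than `batch_size` records from the origin sit in memory at once; that is the whole point of the batch-size setting.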
"But the configured batch size was not too big," said the customer. OK, let's say the HTTP Client origin of the pipeline read 1000 records. If each Item-loading call returns, say, 30 Items, the batch suddenly grows to 30,000 records, and wide records at that… and there is that crash sound.
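The arithmetic of this blow-up is worth spelling out; the numbers below are the illustrative ones from the example above, not the customer's exact figures:

```python
# The batch the origin created vs. the batch after the Items lookup step.
docs_per_batch = 1000            # Max Batch Size configured in the origin
items_per_doc = 30               # Items returned per Document (illustrative)

effective_batch = docs_per_batch * items_per_doc
# 30,000 wide records in flight at once — 30x what the origin was sized for.
```

The configured batch size only governs what the origin reads; a mid-pipeline lookup that fans out one record into many silently multiplies the real memory footprint.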
But what if the calls for Documents and for Items are executed by different pipelines? Here's an example:
There are a couple of things I don't like about this architecture, which I will cover later. But it demonstrates the idea: breaking the tight link between the two loads makes it possible to parallelise the data loading.
Let's explore how this architecture can be implemented, starting with the second pipeline. It's practically a carve-out from the original one, with an added parameter for the Document:
A pipeline is a design unit. To make it executable, a Job Template needs to be created from that pipeline.
The first pipeline, after extracting the list of Documents, uses orchestration to run ephemeral instances of the second one via the Start Jobs processor:
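Conceptually, the orchestration step does something like the sketch below. `start_job_instance` is a stand-in for the Start Jobs processor, and the template id and parameter name are hypothetical, not real StreamSets identifiers:

```python
# Sketch of the orchestration step: for each Document record, start an
# instance of the Items job template, passing the Document id as a
# runtime parameter. Names here are illustrative stand-ins.
def start_job_instance(template_id, runtime_params):
    # The real Start Jobs processor would submit this to Control Hub;
    # here we just record what would be submitted.
    return {"template": template_id, "params": runtime_params}

def orchestrate(documents):
    return [
        start_job_instance("items-load-template", {"DOCUMENT_ID": d})
        for d in documents
    ]

jobs = orchestrate(["D1", "D2"])
```

Each Document record becomes one ephemeral job instance, which is exactly what makes the Items loads independent of each other and therefore parallelisable.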
The Start Jobs processor doesn't start a job directly on a Data Collector; it submits a job instance to StreamSets Control Hub. Control Hub then picks the least busy SDC to execute the job. This way, not only can several instances of a pipeline run on the same SDC, but multiple SDCs can be utilised (if present), further distributing the load. And if all SDCs are too busy to pick up a new job, that job stays in a queue on Control Hub. The StreamSets Platform has a "safety belt" restriction on the total number of jobs, so the orchestration pipeline should limit the number of job instances it submits: since one record in this pipeline corresponds to one submitted job, the Max Batch Size setting of this pipeline's origin caps the number of jobs running in parallel.
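The "one record, one job" relationship is what makes Max Batch Size work as a throttle. A rough sketch of the effect, with hypothetical names, assuming the orchestration pipeline finishes one batch of jobs before reading the next:

```python
# Sketch: one record per submitted job, so capping the origin's batch size
# caps how many job instances are submitted per wave. Names are illustrative.
def submit_jobs(doc_ids, max_batch_size):
    waves = []
    for i in range(0, len(doc_ids), max_batch_size):
        batch = doc_ids[i:i + max_batch_size]
        # Each record in the batch triggers one Start Jobs submission;
        # the next batch is read only after this wave is processed.
        waves.append([f"job:{d}" for d in batch])
    return waves

waves = submit_jobs([f"D{n}" for n in range(7)], max_batch_size=3)
# → three waves of at most 3 jobs each
```

With a batch size of 3, no more than 3 job instances are submitted per wave, keeping the total well under any platform-level job limit.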
The advantages of this architecture are:
There are disadvantages, as mentioned:
In the next blog post I will address these drawbacks and outline an improved loosely coupled design.
Part 2 is published here: https://www.dhirubhai.net/pulse/improve-streamsets-pipelines-performance-loose-coupling-roman-bukarev-3tskc