Improve Streamsets pipelines performance with loose coupling architecture

'Our Data Collector engines crash with Out Of Memory errors,' said a customer in a regular cadence call with me, 'and even when they don't crash, the data load performance is low.'

The customer had rather powerful VMs hosting their Streamsets Data Collectors (SDCs). Moreover, they had several SDCs — plenty of processing power. Then why the crashes? I asked the customer to share their pipeline, and this is how it looked:

[Image: a representative sample of the customer's pipeline]

The customer used Streamsets to extract some normalized data from a REST API. In the data model of the source system, each Document (imagine a grocery receipt) had Document Items (all the things you paid for). Some Documents had fewer than 10 Items, others had more than 100. The pipeline made the first REST call to get the list of Documents; then, for each Document, it made another call to get the list of Items. So, what went wrong?
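
To make the shape of the problem concrete, here is a rough Python sketch of that call pattern. The endpoints and field names are invented for illustration; the customer's actual API was different:

    import requests

    BASE = "https://source.example.com/api"  # hypothetical source system

    # One call to get the list of Documents...
    documents = requests.get(f"{BASE}/documents").json()

    # ...then one more call per Document to get its Items.
    all_items = []
    for doc in documents:
        items = requests.get(f"{BASE}/documents/{doc['id']}/items").json()
        all_items.extend(items)

Note the loop: with thousands of Documents, that is thousands of sequential HTTP round trips.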

Let’s start with performance. As I mentioned, a REST call was required to get the Items of each Document. Besides the HTTP protocol not being the fastest of them all, the bigger problem was that all those Item-loading calls were performed in sequence, all hundreds or thousands of them, one by one. If only we could add some degree of parallelism here…

But what about OOM errors and memory consumption? This may become clearer if we look closer at how an SDC pipeline works.

Say you need to move a heap of sand, large enough that you can’t lift it all at once. You get a wheelbarrow and move the sand in batches — SDC does the same. It assumes the source data is potentially infinite, and moves data in batches: the origin (head) of a pipeline fills the "wheelbarrow", and the rest of the pipeline moves it. The batch size is configured in the origin, because it’s the origin that fills the "wheelbarrow". That size depends on available memory: if a batch is too large, the SDC may crash with an OOM error.
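
In code terms, the batching behaviour looks roughly like the generator below. This is a simplified model of the idea, not how SDC is actually implemented:

    def batches(source, max_batch_size=1000):
        # fill a fixed-size "wheelbarrow" and hand it downstream
        batch = []
        for record in source:        # the source is potentially infinite
            batch.append(record)
            if len(batch) == max_batch_size:
                yield batch          # a full wheelbarrow goes downstream
                batch = []
        if batch:
            yield batch              # the last, partially filled one

Memory use is bounded by the batch size rather than by the size of the source, provided, of course, that the batch doesn't grow after it leaves the origin.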

“But the configured batch size was not too big”, said the customer. OK, let’s say the HTTP Client origin of the pipeline reads 1000 records. If each Item-loading call returns, say, 30 Items, the batch suddenly balloons to 30,000 records, each of them wide… and there is that crash sound.
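
The back-of-the-envelope arithmetic (the record width below is an assumption for illustration):

    docs_per_batch = 1000        # configured Max Batch Size in the origin
    items_per_doc = 30           # average Items returned per Document
    record_size_kb = 5           # assumed width of one Item record

    records_in_flight = docs_per_batch * items_per_doc   # 30,000 records
    memory_mb = records_in_flight * record_size_kb / 1024
    print(records_in_flight, "records, ~", round(memory_mb), "MB per batch")

And that is for a single pipeline; several such pipelines on one SDC multiply the pressure on the heap.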

But what if the calls for Documents and for Items are executed by different pipelines? Here’s an example:

  • the first pipeline extracts the list of Documents from the source system, then runs multiple parallel instances of the second pipeline, each instance processing one Document;
  • the second pipeline receives a single Document and extracts its Items from the source system.

There are a couple of things I don’t like about this architecture, which I will cover later. But it demonstrates the idea: breaking the tight link between the two loads makes it possible to parallelise the data loading process.
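
In plain Python, the decoupled idea looks roughly like this. Again, the endpoints are hypothetical, and in the real architecture SDC jobs rather than threads do the work, so treat it as a model only:

    from concurrent.futures import ThreadPoolExecutor

    import requests

    BASE = "https://source.example.com/api"  # hypothetical source system

    def load_items(doc_id):
        # the role of the second pipeline: one Document in, its Items out
        return requests.get(f"{BASE}/documents/{doc_id}/items").json()

    documents = requests.get(f"{BASE}/documents").json()

    with ThreadPoolExecutor(max_workers=10) as pool:  # degree of parallelism
        for items in pool.map(load_items, [d["id"] for d in documents]):
            print(f"loaded {len(items)} items")  # stand-in for the destination

Ten Documents are now processed at a time instead of one, and each unit of work holds only one Document's worth of Items in memory.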

Let’s explore how this architecture can be implemented, starting with the second pipeline. It’s practically a carve-out from the original one, with an added parameter identifying the Document.

A pipeline is a design unit; to make it executable, a Job Template based on that pipeline needs to be created.

The first pipeline, after it extracts the list of Documents, uses orchestration to run ephemeral instances of the second one via the Start Jobs processor.

The Start Jobs processor doesn’t start a job directly on a Data Collector; it submits a job instance to Streamsets Control Hub, which then picks the least busy SDC to execute the job. This way, not only can several instances of a pipeline run on the same SDC, but multiple SDCs can be utilised (if present), further distributing the data load. And if all SDCs are too busy to pick up a new job, that job waits in a queue on Control Hub.

Streamsets Platform has a “safety belt” restriction on the total number of jobs, so the orchestration pipeline should limit the number of job instances it submits. Since one record in this pipeline corresponds to one submitted job, the Max Batch Size setting of its origin effectively caps the number of jobs running in parallel.
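
Conceptually, the orchestration pipeline does something like the following. The Control Hub endpoint, authentication header and payload shape below are assumptions for illustration, not the documented API:

    import requests

    CONTROL_HUB = "https://example.streamsets.com"  # hypothetical Platform URL
    TEMPLATE_ID = "items-loader-template"           # hypothetical Job Template ID
    MAX_PARALLEL_JOBS = 50   # mirrors Max Batch Size in the orchestration origin

    doc_ids = ["doc-001", "doc-002", "doc-003"]     # example Document IDs

    def submit_job_instance(doc_id):
        # one job instance per record, parameterised with the Document ID
        payload = {"runtimeParameters": {"DOCUMENT_ID": doc_id}}
        requests.post(
            f"{CONTROL_HUB}/jobrunner/rest/v1/jobTemplate/{TEMPLATE_ID}/run",
            json=payload,
            headers={"X-SS-Auth-Token": "..."},     # token elided
        )

    # submit in waves no larger than the configured limit
    for start in range(0, len(doc_ids), MAX_PARALLEL_JOBS):
        for doc_id in doc_ids[start:start + MAX_PARALLEL_JOBS]:
            submit_job_instance(doc_id)
        # in the real pipeline, Start Jobs waits for the wave to finish
        # before the origin produces the next batch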

The advantages of this architecture are:

  • The memory footprint of each pipeline is minimal, so no OOM errors;
  • Thanks to parallel runs of ephemeral jobs, data loading performance is higher;
  • The data load can be spread across multiple SDCs, achieving horizontal scalability.

There are disadvantages, as mentioned:

  • Starting a Job in Control Hub carries some administrative overhead, and with a 1:1 ratio of Jobs to Documents that overhead adds up quickly;
  • There is not enough resilience in this architecture: if an ephemeral job fails, how do we reliably restart it?
  • Job parameters are part of the logged information stored on Control Hub. If Document IDs are used as parameter values, some of the customer’s data ends up stored on Control Hub, i.e. outside of the customer’s network. While Control Hub is quite secure, this may still violate some customers’ data sovereignty policies.

In the next post I will address these issues and outline an improved, loosely coupled design.
