Lithium: Dynamic, Self Hosted, and Distributed Ephemeral Streaming Pipelines


There are numerous use cases that require moving large amounts of data between different systems while validating and transforming it in-flight. Frameworks such as Apache Flink can be excellent choices for moving and transforming data at scale, effectively through streaming ETL. However, certain use cases within Atlassian, such as migrations and scheduled backups, present unique challenges because they require stream processing pipelines to be provisioned entirely at runtime, including dedicated Kafka topics, parallelism, and the selection of stream processors with appropriate and available compute resources. Stream processors must be embedded directly in product and platform services to enable in-process access to service context, in order to meet functional requirements along with throughput goals. In addition, our pipelines require coordination amongst sources, transforms, and sinks that live in different product and infrastructure services. This led us to build the Lithium Platform: it is 100% event-driven and is built on Kafka and Kafka Streams.

Lithium Streaming Data Pipelines: ETL (extract, transform, load)++

At its heart, Lithium is a streaming ETL pipeline. We have all seen or worked with traditional ETL systems before. However, Lithium is more than just a traditional ETL pipeline, and the ++ part is what we are going to discuss in this article. Some of the salient features of the platform are:

  • Dynamic and ephemeral pipelines
  • Sidelining and in-progress remediation capabilities
  • Tenant level isolation
  • Proximity to data and domain logic

Workplans: Dynamic and Ephemeral Streaming Pipelines

A data pipeline in Lithium is called a workplan, and it contains all the information about the ETL components (and other information we will see later) required to run the pipeline. A workplan is dynamically provisioned to serve a data movement request and is torn down as soon as the movement finishes (hence, an ephemeral pipeline).


Anatomy of a workplan

Everything in yellow in the above diagram is dynamically provisioned and ephemeral: the Source, Transform, and Sink processors and the Kafka topics between them. Workplans have three main types of processors, plus an optional fourth type we will discuss later (a small sketch of these roles follows the list):

  • Source (a.k.a. Extract) - The component responsible for extracting data from the source system and ingesting it into the pipeline
  • Transform - The component responsible for transforming and/or validating the data for the destination system
  • Sink (a.k.a. Load) - The component responsible for loading the data into the destination system
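To make these roles concrete, here is a minimal sketch of what the three processor roles could look like to a consumer. The real Lithium SDK interfaces are not shown in this article, so the names and payload types below are illustrative only (a String payload stands in for the SDK's message type).

```java
// Hypothetical shapes for the three processor roles; illustrative only.
public interface SourceProcessor {
    // Pull the next batch of records from the owning service's data store;
    // the SDK would publish them into the workplan's pipeline.
    java.util.List<String> extractNextBatch();
}

public interface TransformProcessor {
    // Transform and/or validate one message for the destination system.
    // An empty Optional could signal a validation failure to be sidelined.
    java.util.Optional<String> transform(String message);
}

public interface SinkProcessor {
    // Write one transformed message into the destination system.
    void load(String message);
}
```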

But how are these workplan processors hosted/deployed? Let’s see in the next section.

BYOH (Bring your own Host)

Source, Transform and Sink running on some service instance

The Lithium platform uses a Bring Your Own Host model: the Lithium processors live inside services. One may ask, which services? Who owns those services? The services are owned by the consumers of Lithium (e.g. Jira, Confluence), that is, the use cases which want to leverage Lithium for data movement. They just need to integrate the Lithium SDK and write their own custom processors to be provisioned on demand. The Lithium SDK abstracts away all the Kafka-related processing (discovering Kafka clusters, creating Kafka topics, publishing to and consuming from the topics, authentication, etc.) and provides very simple APIs for consumers to interact with the platform.

Consumers of the platform just focus on writing their processing logic and do not need to worry about how the underlying platform works. This gives them the ability to host their processors close to the data and the domain logic, one of the key advantages of the Bring Your Own Host model.

Lithium data streaming pipeline heterogeneous distribution

Source, Transform and Sink running on separate services

Each processor of a Lithium workplan can live in a different kind of service, and potentially in different instances of a service. Each processor type lives in an appropriate service instance based on the type of processor, business logic, and data locality. This results in a workplan where different processors are hosted and owned by different services.

Data Pipeline Isolation


Since all Lithium workplans are dynamic and ephemeral, data pipelines operate in isolation from each other. Although they may share local service resources, they DO NOT share processor instances or Kafka topics. Each data pipeline has its own set of Kafka topics, processor types, and runtime parameters. This isolation is also the enabler for core capabilities such as pause, resume, rewind, and remediation: pipelines can be paused and resumed at will without impacting other workplans. It also allows us to guarantee that the data of each tenant is isolated in its own set of Kafka topics and is never mixed.

So far we have covered workplans briefly, but how do we define a workplan and submit it for execution? Let's discuss that next.

Workplan Spec

Sample workplan specification

Every workplan requires a workplan specification, which defines all the parameters for its execution. A specification contains the details about the source, sink, and data processors (or transformations), along with other parameters such as parallelism, retry attempts, etc.

Each processor section contains the processor definition as well as the number of workers required to run this workplan. The parallelism defined in the workplan spec is used to decide the number of partitions of the workplan's Kafka topics. This, together with the number of workers defined in the spec, allows a workplan to process data in parallel.
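The actual specification format is shown only as a figure, so the following is just a hedged Java sketch of the kind of fields described above: per-processor definitions with min/max worker counts, parallelism, and retry attempts. All field names and values are assumptions.

```java
import java.util.List;

public class WorkplanSpecExample {

    // Illustrative field names only; the real spec format is not reproduced here.
    record ProcessorSpec(String type, int minWorkers, int maxWorkers) {}

    record WorkplanSpec(
            String workplanId,
            int parallelism,      // drives the partition count of the workplan's Kafka topics
            int retryAttempts,    // how often provisioning is retried if resources are short
            ProcessorSpec source,
            List<ProcessorSpec> transforms,
            List<ProcessorSpec> sinks) {}

    public static void main(String[] args) {
        // Roughly mirrors the auctioning walkthrough later in the article:
        // one sourceB worker, up to two transE workers, four sinkY workers.
        WorkplanSpec spec = new WorkplanSpec(
                "001-002", 4, 3,
                new ProcessorSpec("sourceB", 1, 1),
                List.of(new ProcessorSpec("transE", 1, 2)),
                List.of(new ProcessorSpec("sinkY", 4, 4)));
        System.out.println(spec);
    }
}
```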

Lithium Control Plane and Data Plane

The Lithium platform can be divided into two logical boundaries: a control plane and a data plane. The control plane can be considered the nervous system of the platform and handles several responsibilities (covered in the next section). The data plane, on the other hand, is a logical boundary containing all the service instances integrating with the Lithium control plane using the Lithium SDK. These service instances are called resource providers because they are responsible for providing resources for a workplan. The data plane is also responsible for the actual data movement within a pipeline, using the provisioned Kafka topics and the different types of processors hosted by the data plane resource providers.

Lithium Control Plane and Data Plane

As can be seen in the above diagram, the control plane and data plane communicate with each other using a set of Kafka topics:

  • control-plane-event-topic - This is used as a broadcast channel by the control plane to announce important updates such as workplan creation requests, provisioning requests, termination requests, etc. Each message in this topic is received by every resource provider instance in the data plane.
  • data-plane-event-topic - This is used by the data plane components (resource providers) to send events to the control plane.

It is worth noting that since the Lithium SDK abstracts all the Kafka processing from its consumers, they are not required to write any implementation related to control plane and data plane event handling. It all comes bundled with the SDK.

All instructions from the clients of Lithium are sent to the control plane through its input Kafka topic, and the control plane sends updates to the external world using the control plane output topic.

Control Plane


Let's dive deeper into the control plane and understand three of its major responsibilities:

  • Workplan management: The control plane tracks the status of each workplan running on the platform or queued for execution, and manages the state and status transitions for each of these workplans. We do that using a Kafka Streams aggregator: it is a reliable way of taking the data coming out of a changelog topic in Kafka, manipulating pieces of that data based on messages arriving through the data plane event topic, and handing it back to Kafka through a compacted workplan topic. This compacted topic is the source of truth for workplan state and status, and is completely managed using Kafka (a sketch of this pattern follows this list).
  • Resource provider management: The other responsibility of the control plane is to manage all the registered resource providers. As shown in the above diagram, resource provider management looks exactly the same as workplan management. You may also notice that we materialize this information into a Postgres database, which is purely designed for handling complex queries on workplans and resource providers. The source of truth for both workplans and resource providers, however, remains in the Kafka topics; we don't rely on the information in the materialized store, it is just for query purposes.
  • Topic management: This part manages the creation and deletion of the Kafka topics used by workplans, along with the roles required to connect to these topics. We will discuss later in this article when these topics are created for a workplan.
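As a flavour of the workplan-management pattern described in the first bullet, here is a minimal Kafka Streams aggregation sketch: data plane events keyed by workplan id are folded into a workplan view and written back to a compacted topic. Topic names, the value format, and the merge logic are placeholders, not Lithium's actual code.

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

public class WorkplanAggregatorSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        builder.stream("data-plane-event-topic",
                        Consumed.with(Serdes.String(), Serdes.String()))
               // Fold each data plane event (bid, heartbeat, provisioned, ...)
               // into the current view of its workplan.
               .groupByKey()
               .aggregate(
                       () -> "CREATED",
                       (workplanId, event, currentState) -> applyEvent(currentState, event),
                       Materialized.with(Serdes.String(), Serdes.String()))
               // The compacted topic becomes the source of truth for workplan state.
               .toStream()
               .to("workplan-state-topic", Produced.with(Serdes.String(), Serdes.String()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "workplan-management");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        new KafkaStreams(builder.build(), props).start();
    }

    // Placeholder state machine; the real transition logic lives in the control plane.
    static String applyEvent(String currentState, String event) {
        return event.isBlank() ? currentState : event;
    }
}
```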


Anatomy of a Data plane resource provider

Let's dig a little deeper into what's inside a resource provider. As discussed previously, a resource provider is in constant communication with the control plane: it receives all of the events from the control plane and emits events back to it through the data plane event topic. It is also responsible for provisioning in-process instances of the processors for a workplan, if the host running the resource provider is participating in that workplan.

The resource provider and the processors follow an actor model, enabling them to use asynchronous messages internally and thus mirroring the way we do things externally using Kafka. The processors do not talk to the control plane at all; they only talk to the resource provider and to the topics that were provisioned for a particular workplan.

Workplan Auctioning and Provisioning

Okay, so we have discussed the various platform components and the workplan specification, but how do we go from the definition of a workplan to deciding which resource providers should participate in it? For some of our use cases, client products have hundreds, if not thousands, of nodes participating. So which one gets to actually do the provisioning? Which has the capacity to do all this? The model the Lithium platform uses to manage this is auctioning. As the name suggests, it works exactly the way an auction works: multiple participants bid and the winners are chosen.

The way this works is that the control plane is not responsible for deciding that a particular service host should provision or participate. That knowledge is only held by the service hosts themselves: Does the host have capacity? Does it know about the tenant ID we are moving data for? Does it handle the data for that tenant? Does it know about the processors described in the workplan? All of that information is known locally by the data plane hosts. So whenever a workplan request is submitted on the control plane event topic, each resource provider evaluates the workplan; if it sees that the workplan is in the auctioning phase and determines it can participate, it sends a positive bid back to the control plane through the data plane event topic. Let's walk through an example, as shown in the next diagram.


Workplan in auctioning phase

In the above example, a workplan is created with a parallelism of 4. It requires a source of type sourceB, a transform of type transE, and a sink of type sinkY, with their configured min and max workers. Please note that there is no inherent benefit in defining a max worker count higher than the configured parallelism. On the right side of the diagram we can see multiple resource provider instances for each processor type, e.g. a few hosts hosting sourceA, a few hosting sourceB, etc. When the control plane puts workplan 001-002 onto the control plane event topic in the auctioning status, all the resource providers receive this event, but only the resource providers which can participate in this workplan (marked in red) send a positive bid back to the control plane. After the auction window closes, the control plane aggregates all the positive bid events for this workplan and adds them to the bidders section of the workplan. At this point, these are just indications, not commitments, from the resource providers.
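For illustration, here is a hedged sketch of the kind of local check a resource provider might run when it sees a workplan enter the auctioning status. The checks (capacity, tenant ownership, known processor types) come from the description above; all names are hypothetical.

```java
import java.util.Set;

public class BidDecisionSketch {

    record WorkplanAuction(String workplanId, String tenantId, Set<String> requiredProcessorTypes) {}

    // The processor types this host's service has registered with the SDK.
    static final Set<String> HOSTED_PROCESSOR_TYPES = Set.of("sourceB");

    static boolean hasSpareCapacity() { return true; }               // e.g. CPU / memory headroom
    static boolean ownsTenantData(String tenantId) { return true; }  // does this host serve the tenant?

    // Returns true when this host should send a positive bid on the
    // data-plane-event-topic; otherwise the auction event is simply ignored.
    static boolean shouldBid(WorkplanAuction auction) {
        boolean knowsAProcessor = auction.requiredProcessorTypes().stream()
                .anyMatch(HOSTED_PROCESSOR_TYPES::contains);
        return knowsAProcessor
                && ownsTenantData(auction.tenantId())
                && hasSpareCapacity();
    }
}
```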

Please note that many resource providers can bid for a workplan, whereas only a few may be required to run it. For example, the control plane received two source bids but needs only one source resource for this workplan. The control plane then reconciles the bids and chooses the winning bidders depending on their individual capacities. After getting sufficient winning bidders, the control plane adds these processors to the resources section of the workplan. Note that the provisioned flag is set to false, as the resources are not provisioned yet.

But before we ask anyone to provision anything, we need topics. So the control plane moves the workplan into the topic provisioning status, where the workplan topics are created.

Workplan in topic provisioning phase

The control plane's Topic Management module then communicates with the Kafka cluster to create workplan-specific topics, as shown in the above diagram, and adds them to the topics section of the workplan. At this point, any processor is free to subscribe or publish to these topics, as they now exist. And that's what happens next, in the resource provisioning phase.
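Topic creation itself is standard Kafka administration. The sketch below uses Kafka's AdminClient to create workplan-scoped topics whose partition count follows the workplan's parallelism; the topic names and replication factor are illustrative, not Lithium's actual naming scheme.

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class WorkplanTopicProvisioner {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        int parallelism = 4;          // from the workplan spec
        short replicationFactor = 3;  // cluster-dependent

        try (Admin admin = Admin.create(props)) {
            admin.createTopics(List.of(
                    new NewTopic("workplan-001-002-input", parallelism, replicationFactor),
                    new NewTopic("workplan-001-002-output", parallelism, replicationFactor),
                    new NewTopic("workplan-001-002-state-events", 1, replicationFactor)))
                 .all().get();   // block until the topics exist
        }
    }
}
```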

Workplan in resource provisioning phase

We can see that each of the service hosts that won the bidding process has now provisioned an instance of the appropriate stream processor, and the provisioned flag for these resources is set to true. Now that we have acquired all the resources required for this workplan, we can move on and start running it. Please note that if any of the above phases fails to acquire a sufficient number of resources, the workplan will be retried after some delay, as defined in the deploy section of the workplan spec.

Here is a simplified view of this workplan after going through the provisioning process.

A simplified view of the workplan

On the left side, we have one instance of the source because we asked for a minimum and maximum of one source worker. This source extracts data from some data source that it understands and feeds the data as messages into the input Kafka topic. In the second step, we see that we asked for two transformers and were able to get them. The workload is split between these two processors by assigning an equal number of topic partitions to each. One thing worth mentioning is that transformers and validators are built using Kafka Streams topologies. One of the really important things about living in the middle, as we move data from one side to the other, is that the platform should do its best not to produce duplicates along the way. And so we use Kafka Streams with its transactional guarantees and exactly-once semantics.
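The exactly-once behaviour mentioned above is a standard Kafka Streams setting. This minimal configuration sketch shows it enabled for a transform topology; the application id, topic names, and the trivial transform are placeholders.

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class TransformTopologySketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "workplan-001-002-transform");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Transactional, exactly-once processing from input topic to output topic,
        // so a transformer crash does not produce duplicates mid-pipeline.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("workplan-001-002-input", Consumed.with(Serdes.String(), Serdes.String()))
               .mapValues(value -> value.toUpperCase())   // stand-in for the real transform chain
               .to("workplan-001-002-output", Produced.with(Serdes.String(), Serdes.String()));

        new KafkaStreams(builder.build(), props).start();
    }
}
```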

The messages then go to the output topic of the workplan and are received by four hosts which have provisioned the sink processors for this workplan. This is because we requested four sink workers to distribute the work equally across them. The sinks then write the data into the destination data storage.

In the above example we were able to acquire the maximum number of workers for each type, but it's worth discussing what happens when that's not the case. What happens if we can only partially obtain the resources, or if some of these processor nodes (resource provider hosts) come and go, leaving workplans with less than the optimal number of workers?

In the above example we requested two transformers and the workplan was able to acquire them; this workplan is called a Fully Provisioned workplan. Now assume that one of the hosts crashes, leaving the workplan with only one transformer. Since we defined a minimum of one transformer worker for this workplan, it can still run with a Nominally Provisioned status. In this case, Kafka partition assignment kicks in and all the partitions of the input topic are assigned to the one available host. The control plane detects this through the processor heartbeats sent by the resource providers and sets the status of the workplan appropriately. If the remaining transformer also goes down, the workplan is considered unviable; the control plane stops the workplan and sets its status to Under Provisioned. This instructs all the processors to pause themselves and wait for resources to become available again. The workplan then enters what is called a secondary auction and tries to acquire new resources to revive itself. This auction works exactly the same way as the initial auction: resource providers bid if they have the capacity to take up the work.
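As a small illustration of the statuses above, the classification could be expressed as a function of live workers (derived from heartbeats) against the spec's min/max worker counts; the enum names simply mirror the article's terminology.

```java
public class ProvisioningStatusSketch {

    enum ProvisioningStatus { FULLY_PROVISIONED, NOMINALLY_PROVISIONED, UNDER_PROVISIONED }

    // liveWorkers would be derived from processor heartbeats seen by the control plane.
    static ProvisioningStatus statusFor(int liveWorkers, int minWorkers, int maxWorkers) {
        if (liveWorkers >= maxWorkers) return ProvisioningStatus.FULLY_PROVISIONED;
        if (liveWorkers >= minWorkers) return ProvisioningStatus.NOMINALLY_PROVISIONED;
        return ProvisioningStatus.UNDER_PROVISIONED;  // workplan pauses; a secondary auction starts
    }

    public static void main(String[] args) {
        // The transformer example above: min 1, max 2 workers.
        System.out.println(statusFor(2, 1, 2)); // FULLY_PROVISIONED
        System.out.println(statusFor(1, 1, 2)); // NOMINALLY_PROVISIONED
        System.out.println(statusFor(0, 1, 2)); // UNDER_PROVISIONED
    }
}
```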

A closer look at the Source Processor

We discussed earlier that the source puts messages immediately into the input topic of the workplan, but there is another detail worth covering now. The source processor collects data from its data source but actually sends it to what we call an ingress topic. A Kafka Streams topology called the usher picks up the data from the ingress topic and ushers it into the input topic. Why is this extra processing required, you might ask? It seems like an unnecessary extra step. This extra step enables one of the core features of Lithium workplans: the ability to rewind. In a lot of cases, data extraction from the data source is expensive, and we want to avoid repeating it if we just want to start over and rerun the workplan. In Lithium, the extracted data remains in the ingress topic, and when a rewind is requested, the usher stream reads messages from the beginning of the ingress topic and redelivers them to the input topic with a different stream generation number. All the existing data in the pipeline with the previous stream generation number is then filtered out by each of the processors.
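Here is a hedged sketch of the usher idea: replay the ingress topic into the input topic while stamping each record with the current stream generation, so downstream processors can filter out records from earlier generations after a rewind. The generation encoding, topic names, and offset handling are simplified assumptions, not Lithium's implementation.

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class UsherSketch {
    public static void main(String[] args) {
        int currentGeneration = 2;   // bumped by the control plane on every rewind

        StreamsBuilder builder = new StreamsBuilder();
        // The usher replays the ingress topic into the input topic, stamping each
        // record with the current generation (a real implementation would more
        // likely carry this in record headers or an envelope field).
        builder.stream("workplan-001-002-ingress", Consumed.with(Serdes.String(), Serdes.String()))
               .mapValues(value -> currentGeneration + ":" + value)
               .to("workplan-001-002-input", Produced.with(Serdes.String(), Serdes.String()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "workplan-001-002-usher");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        new KafkaStreams(builder.build(), props).start();
    }

    // Downstream processors apply a check like this to drop records that
    // belong to a previous generation after a rewind.
    static boolean isCurrentGeneration(String value, int currentGeneration) {
        return value.startsWith(currentGeneration + ":");
    }
}
```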

The Transform Processor Chain

One Transformer running on two instances of the same service

We discussed earlier that a workplan can have a transformer and/or validator, but this is actually a chain of transformers, also called a processor chain. Each element in this chain is a fine-grained transform or validate function, and the elements can be specified in the order in which they are meant to pass data down the line. All of these processors live in one host and are implemented as a single Kafka Streams topology; the wiring of these processors is done under the covers by the Lithium SDK. Outside the Lithium platform it is simply messages in and messages out, but inside there are multiple functions chained together.
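A minimal sketch of such a chain compiled into one Kafka Streams topology is shown below: each step is a small transform or validation function applied in order. The step contents and topic names are illustrative.

```java
import java.util.List;
import java.util.Properties;
import java.util.function.UnaryOperator;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class ProcessorChainSketch {
    public static void main(String[] args) {
        // Ordered chain of fine-grained transform steps, as declared in the spec.
        List<UnaryOperator<String>> chain = List.of(
                String::trim,                 // step 1: normalise
                value -> value.toLowerCase(), // step 2: transform
                value -> value + "|checked"); // step 3: stand-in for a validation step

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream =
                builder.stream("workplan-001-002-input", Consumed.with(Serdes.String(), Serdes.String()));

        // The SDK wires the declared steps into a single topology; from the
        // outside it is just messages in, messages out.
        for (UnaryOperator<String> step : chain) {
            stream = stream.mapValues(step::apply);
        }
        stream.to("workplan-001-002-output", Produced.with(Serdes.String(), Serdes.String()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "workplan-001-002-transform-chain");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        new KafkaStreams(builder.build(), props).start();
    }
}
```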

Multiple transformers running on different services

There might be use cases where different transformation functions need to live inside different services, because of their proximity to the required context or business domain logic. Lithium workplans support the notion of multiple transformation chains which can live on multiple services; the Lithium control plane chains them together by creating all the intermediate Kafka topics.

Multiple Sink Processors

Lithium supports the definition of multiple sink processors in the workplan specification. This means that the data generated on the output topic can be utilized by various services employing different types of sinks. In the provided example, the workplan specification outlines two types of sinks along with their respective worker configurations. Within the pipeline, two types of services are involved in this workplan, each providing one of the specified sinks: sinkA operates on two hosts, as its maximum worker configuration is set to 2, while sinkB runs on a single host per its own configuration. Both sets of sinks receive the same data and can either process it, if deemed relevant, or disregard it.

Internally, multiple sinks are managed through distinct consumer groups assigned to each sink type. In the aforementioned scenario, all data from the output topic is distributed among the two sink hosts of type sinkA due to their membership in the same consumer group. Similarly, all data is directed to the singular instance of sinkB since there is only one consumer within this consumer group.
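The consumer-group mechanics here are standard Kafka. The sketch below shows a sinkA worker: each sink type uses its own group id on the workplan's output topic, so every type sees the full stream while partitions are shared among the workers of one type. Group and topic names are illustrative.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SinkConsumerSketch {
    public static void main(String[] args) {
        // sinkA instances share one group id; sinkB uses a different group id
        // and therefore also receives every record from the output topic.
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "workplan-001-002-sinkA");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("workplan-001-002-output"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // Load into the destination system, or ignore records this
                    // sink type does not care about.
                    System.out.printf("sinkA loading %s%n", record.value());
                }
            }
        }
    }
}
```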

Workplan State Processor

Up until now, the data pipeline processors have been entirely stateless. However, many use cases benefit from the ability to render state information. This is achieved by an additional data plane processor called a state processor, the only stateful processor in the workplan. State processors are provisioned in the same manner as any other type of processor. Each of the other processor types can publish custom state messages to the data pipeline's state event topic, which is once again a provisioned topic for the workplan. The state processor's custom state functions process those messages and maintain an aggregated state. This is accomplished using Kafka Streams aggregators, providing reliable data pipeline state processing. The custom state functions are defined by the state processor owners and declared in the workplan spec, which outlines the aggregation logic for the Kafka stream. It is important to note that only one state processor worker is required for any workplan's data pipeline; therefore, the state event topic is a single-partition topic. The state processor uses a punctuator to periodically send the entire rendered state to the Lithium control plane, where workplan management uses it to update the workplan's custom state. This in turn triggers the workplan event to be broadcast to all data plane resource providers.
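Here is a hedged sketch of that shape: a processor that aggregates custom state events for the workplan and periodically pushes the rendered state out via a wall-clock punctuator. The aggregate is kept in memory and printed for brevity; the real state processor reports to the control plane and its aggregation logic comes from the workplan spec.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;

public class StateProcessorSketch {

    static class CustomStateProcessor implements Processor<String, String, Void, Void> {
        private final Map<String, Long> counters = new HashMap<>();

        @Override
        public void init(ProcessorContext<Void, Void> context) {
            // Periodically ship the rendered state; the real processor would send
            // it to the control plane, which updates the workplan's custom state.
            context.schedule(Duration.ofSeconds(30), PunctuationType.WALL_CLOCK_TIME,
                    timestamp -> System.out.println("rendered state: " + counters));
        }

        @Override
        public void process(Record<String, String> record) {
            // e.g. "extracted" / "loaded" events emitted by the source and sink.
            counters.merge(record.value(), 1L, Long::sum);
        }
    }

    public static void main(String[] args) {
        ProcessorSupplier<String, String, Void, Void> supplier = CustomStateProcessor::new;

        StreamsBuilder builder = new StreamsBuilder();
        // The state event topic is single-partition, so one worker sees everything.
        builder.stream("workplan-001-002-state-events", Consumed.with(Serdes.String(), Serdes.String()))
               .process(supplier);

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "workplan-001-002-state");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        new KafkaStreams(builder.build(), props).start();
    }
}
```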

There are various potential use cases for a custom state processor, one of which is progress tracking and marking a workplan as Done. Lithium cannot detect on its own when a workplan is complete, because completion is entirely controlled by the consumers; for example, the platform cannot know whether the source has extracted all necessary information. The state processor can therefore be used to track progress, such as monitoring how many entities have been extracted versus how many have been loaded into the destination, determining the percentage of entities that have been processed, or marking the workplan as done when all entities have been extracted (a source can produce a state) and the sink has loaded all the data (a sink can emit a state).

Sidelining and Remediation

Data sidelining refers to a process where data that fails validation in the pipeline is diverted to a dedicated "sideline topic" within a workplan. Valid data continues downstream while failed data is held at a gate within the sideline topic. Initially, the gate is closed, keeping the data in the sideline topic. This gate is implemented as a Kafka Streams topology, and pausing the topology effectively closes the gate. If the number of failed entities exceeds a limit set in the workplan specification, the control plane halts the workplan. In this scenario, all processors pause, awaiting the control plane to resume the workplan.
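A minimal sketch of the sidelining split is shown below: records that fail validation are routed to the workplan's sideline topic while valid records continue downstream. The validation rule and topic names are illustrative, and the pausable gate topology is not shown.

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class SideliningSketch {

    static boolean isValid(String value) {
        return value != null && !value.isBlank();   // stand-in for the real validation
    }

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input =
                builder.stream("workplan-001-002-input", Consumed.with(Serdes.String(), Serdes.String()));

        // Valid data continues downstream.
        input.filter((key, value) -> isValid(value))
             .to("workplan-001-002-output", Produced.with(Serdes.String(), Serdes.String()));

        // Failed data is parked in the sideline topic; a separate, pausable
        // "gate" topology (not shown) decides when to feed it back in.
        input.filterNot((key, value) -> isValid(value))
             .to("workplan-001-002-sideline", Produced.with(Serdes.String(), Serdes.String()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "workplan-001-002-sidelining");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        new KafkaStreams(builder.build(), props).start();
    }
}
```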

The question arises: when does the control plane restart the workplan? The key lies in one of Lithium's core features, known as "In Progress Remediation." Platform users can examine failed data and rectify issues by introducing new transformers to the active workplan (hence the term "in progress remediation"). This action leads to the addition of new transformers to the existing processing chain, triggering a workplan update. Consequently, the update opens the feedback gate, allowing both sidelined entities and new data to undergo processing using the updated transformations.

Bring Your Own Topic (External Topics)

Lithium workplans integrating with external topics

There may be use cases where the consumers of workplans need to send some data beyond the workplan boundaries, to Kafka topics which do not share the lifespan of the workplan's topics. The data might need to be sent to some long-lived topic for further processing by a downstream system. One such example is custom error reporting, in which consumers want to generate an error report based on errors produced in the pipeline. Lithium provides the concept of external topic integration, which allows consumers to inject their own topics into the workplan processors, and the Lithium SDK provides very simple APIs to publish to these topics. These topics aren't owned by the Lithium platform and are managed by the services requiring the integration.
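As a simple illustration, the sketch below publishes an error record to a consumer-owned, long-lived topic. The article notes that the Lithium SDK wraps this behind a simple API; a plain Kafka producer stands in for it here, and the topic name is hypothetical.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ExternalTopicPublisher {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // This topic outlives the workplan and is owned by the consuming service.
            producer.send(new ProducerRecord<>("migration-error-reports",
                    "workplan-001-002", "validation failed for entity 42"));
        }
    }
}
```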

Summary

At its core, Lithium is a streaming ETL platform, but what makes Lithium truly unique is its niche set of features. Its dynamic and ephemeral nature gives consumers the flexibility to create ETL pipelines only when they are required. The hosted data plane gives consumers the ability to write their data processing logic close to their data and business domain logic, helping them achieve maximum throughput. Sidelining and remediation open up powerful capabilities for monitoring data validation failures and fixing them in flight.

It has been a truly rewarding journey to work on this platform from its inception to having it running in production, working with some of the greatest engineers I have ever met.

