Data Stream Processing: Kafka vs Flink Streams.
Stream processing and the pipelines built around it are crucial in modern industry domains and disciplines such as Telecom, NLP, Online Analytics, IoT, and others. Stream data differs in its data source, throughput, unit size, ordering criteria, and the nature of keys and values.
Additionally, the processing models have diverse characteristics and parameters, which are essential for selecting the appropriate processing technology and framework. A set of factors impacts technology selection in a concrete case: the size of intermediate and state data, the subject of aggregation and map-reduce, the size of aggregation storage, and the size of the time window. A sound engineering decision also considers the technological stack already in use in the current architecture, the CI/CD stack, and the expected growth of processing complexity, data volume, and throughput.
Among competitive stream processing technologies, Kafka Streams, Apache Flink Streams, and Spark are widely used.
Flink is a big-data-oriented framework with both batch and stream processing layers.
We will concentrate on Flink Streams, as it is the most similar to Kafka Streams from the point of view of the DSL API.
Kafka Streams and Flink Streams are proper stream processing engines: both are widely used, have very similar interfaces, and are attractive choices.
Kafka Streams and Flink.
What is common in the architecture and engineering of the frameworks?
What are the principal differences between Streams and the choreographic model of event-driven distributed computing in microservices?
Architectural and Engineering aspects of Apache Kafka Streams and Flink Streams.
The fact that the client APIs of Flink and Kafka Streams are very similar is not surprising: both are Apache Software Foundation projects, and part of Kafka's ideology carried over to Apache Flink, especially under the influence of Jay Kreps, who proposed their common processing architecture, the Kappa architecture.
Kafka Streams and Flink Streams have similar DSL APIs; this is natural, given their shared Apache lineage.
As mentioned above, their common architectural pattern, Kappa, is well suited to real-time data processing and has distinct strengths, such as stateful processing running in dedicated threads with cross-cluster replication. (Stateful means that the pipeline's state is kept locally, not in external databases.)
What is different in the architecture and engineering of Kafka Streams and Flink Streams?
Looking ahead, I would say: ALMOST EVERYTHING, from engineering to CI/CD aspects.
Kafka Streams Architecture and Engineering.
Kafka Streams is a framework supplied as a library that leverages the standard Kafka API on top of the Kafka client. The framework therefore runs inside the client service's process. Events, their data, and event logs are backed by topics on a cluster of Kafka brokers. By default, there is no special infrastructure for Kafka Streams on a Kafka cluster.
Internal architecture and engineering.
The processing mechanism provides durability by creating stateful processing pipelines and logging them using the event sourcing pattern.
Kafka Streams exposes a low-level Processor API and a DSL API, which are transformed into a chain of specialized processors that execute the code the developer provides. Kafka Streams builds a reliable, durable, and resilient processing pipeline as a topology of processors, executed inside tasks.
Kafka Streams also relies on the Kafka Consumer of the Kafka API, which polls key-value message/event data from a topic's partitions as part of a stream. The stream is executed as a composite consumer that handles the incoming event data by applying the sequence of processors of the pipeline. The topology of processors is the execution model; the Kafka Consumer backs the one processor that is indispensable in any topology, named the "source processor". This processor supplies data downstream to the next processor.
Normally a stream processing topology also has a "sink processor", which has no downstream processors and just pushes the processing results to the next topic.
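To make the topology concrete, here is a minimal sketch using the Kafka Streams DSL (the topic names are hypothetical, and the pipeline only uppercases values):
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class UppercaseTopology {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        // Source processor: polls key-value records from the input topic partitions
        builder.stream("input-events", Consumed.with(Serdes.String(), Serdes.String()))
               // Stateless processor: transforms each value on the fly
               .mapValues(value -> value.toUpperCase())
               // Sink processor: pushes the results to the output topic
               .to("output-events", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }
}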
Internal engineering.
So, at the bottom level, Kafka Streams turns the business logic of a stream into a topology of processors, realized as a pipeline of operations (such as aggregation, map-reduce, and windowing).
All the processing is executed as usual Kafka message processing: a consumer polls data from a topic, and then the processors are called in sequence in a thread. This is the job of StreamTasks, which receive the built topology of processors. The whole chain is built inside a specific StreamThread, which is bound to a real JVM thread where the tasks run.
Because of the specifics of the Kafka Consumer and the traits of Kafka, the correct time order of delivery is guaranteed only within a partition.
At the same time, operations such as stream joining, grouping, map-reduce, and aggregation require data shuffling, reorganization, and keeping data in memory for a period of time; this requires states, which are kept in local memory and used by the processing units.
In more detail, the creational aspects of Kafka Streams are highlighted here.
Stateful and stateless operations in Kafka Streams. The performance factor.
Kafka Streams has low-level processing nodes (processors), which can be stateful or stateless.
Simple mapping operations that transform a key-value pair of a Kafka message have no state, of course, and transform data on the fly.
Operations of processing pipelines such as stream joins, grouping, map-reduce, and aggregation require data shuffling, reorganization, and computations within a time window, and therefore require keeping data and intermediate results.
The states are stored using the internal mechanism of Kafka Streams known as State Stores. Kafka Streams uses the standard tool that other Apache frameworks (like Flink) also use locally: RocksDB.
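As a short sketch (topic and store names are hypothetical), a keyed count materialized into a named state store looks like this; the store is RocksDB-backed by default:
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

StreamsBuilder builder = new StreamsBuilder();
// Stateful operation: the per-key counters live in a local State Store (RocksDB by default)
KTable<String, Long> counts = builder
        .stream("click-events", Consumed.with(Serdes.String(), Serdes.String()))
        .groupByKey()
        .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("click-counts-store")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Long()));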
This realization of the Kappa architecture is one of the essential factors of stream performance: intermediate data of any task are accessible by the StreamThread and passed to the processors without any additional requests or synchronization.
Distributed execution and scalability. Kafka Streams tasks can be executed on an unlimited (theoretically; in practice it depends on the topic configuration) number of machines (JVMs) and consumers.
Scalability is achieved by running stream processing (StreamThread -> StreamTasks) in parallel.
Because any stream task is sourced from a Kafka Consumer, Kafka Streams parallelism inherits all the traits of the internal mechanism of the Kafka API and the rules of the Kafka cluster.
The number of Kafka Consumers is the factor that defines the parallelism aspect of scalability, while the number of JVMs relates to the resources aspect.
Any thread of the parallel mechanism pumps data from a Kafka Consumer, so the number of Kafka Consumers, rather than the number of JVMs and virtual machines, defines the number of tasks executed in parallel. Increasing the number of JVMs and virtual machines only adds CPU and memory resources.
Limitations of Kafka Parallelism and Scalability.
In Kafka Streams, the number of threads that defines parallelism is regulated by the num.stream.threads configuration property. But this is only a formal upper bound.
Because a Kafka Consumer is not a thread-safe object, only one source processor, belonging to only one StreamTask in only one StreamThread (with its single backing JVM thread), can use a given consumer. So the number of tasks running in parallel cannot be greater than the number of consumers.
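A minimal configuration sketch (the application id and broker address are placeholders); note that threads beyond the partition count will simply stay idle:
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "orders-enrichment-app"); // placeholder application id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // placeholder broker address
// Number of StreamThreads in this instance; effective parallelism is still capped by the partition count
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
KafkaStreams streams = new KafkaStreams(topology, props); // topology built with the DSL as shown earlier
streams.start();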
When a Kafka Consumer is registered in a cluster to poll events from a topic, the cluster assigns it a set of partitions, which will be polled only by this consumer. The assignment is rebalanced when consumers are registered or die.
But in any case, the number of consumers concurrently consuming a topic cannot be more than the number of partitions in the topic.
In terms of Kafka Streams, this means that the number of consumers and running parallel tasks (StreamThread -> StreamTasks) is limited by the number of partitions of the source topic.
Sure, we can run more consumers/tasks in extra threads, but those tasks will sit in idle threads until the next rebalancing, triggered by a change in the number of consumers and/or partitions.
So, the number of tasks running in parallel across the whole cluster of Kafka Streams services cannot be more than:
Sum(NPartitions[topic]) over all source topics, where NPartitions[topic] is the number of partitions in a topic. For example, a topology consuming two source topics with 8 and 4 partitions can have at most 12 active tasks.
Therefore, increasing the number of actually running parallel tasks is possible only by increasing the number of partitions in the topic.
Otherwise, additional threads only work for redundancy: if a consumer dies, the partition assignment is rebalanced, and a StreamThread -> StreamTasks instance that was idle gets control.
Durability and the Stateful Model Factor.
The system's resilience and durability require restarting and continuing tasks on another machine. When a server machine (or a local consumer) fails, the partitions are rebalanced among the consumers, the partition of an interrupted task is assigned to another consumer on another machine, and the task must be continued there.
As remarked above, tasks can be stateless or stateful, depending on the operations in the stream's pipeline.
If all processors of a task are stateless, then durability is supported by native Kafka: if a consumer dies, the related partition is reassigned to another consumer, the Kafka log is read from the same offset in the same partition, and a new instance of the task on another machine performs the same stateless operations.
Stateful stream operations require restoring the state of the task after consumers are rebalanced; otherwise, computations will be lost even if reading restarts from the same offsets of the partitions.
Kafka Streams has no specialized replication/snapshot mechanism, because it uses the infrastructure of the Kafka cluster and because of the specifics of Kafka messaging: it is not designed for transferring large messages.
Kafka Streams task durability is based on the event sourcing pattern, which is inherent to Kafka.
Change Log and standby.
Kafka Streams provides durability by means of the Change Log mechanism. It logs changes of a task's state as events into the Kafka cluster. Kafka Streams creates an internal changelog topic per state store and produces task state change events into that topic. The logs of this topic make it possible to restore the task state history, within a reasonable retention time, on any machine. Once a task related to a partition is delegated to another machine upon a cluster rebalance, the changelog topic is consumed on the JVM where the reassigned partitions will be consumed, the state of the related task is reconstructed, and the task continues from the same point.
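Changelog topics are created by Kafka Streams automatically; their topic-level settings can also be tuned when materializing a store, roughly as follows (the retention value is only an illustration):
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

Map<String, String> changelogConfig = new HashMap<>();
changelogConfig.put("retention.ms", "86400000"); // keep state change events for 24 hours
// Changes of this store will be logged to an internal changelog topic with the given settings
Materialized<String, Long, KeyValueStore<Bytes, byte[]>> materialized =
        Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")
                .withLoggingEnabled(changelogConfig);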
Downsides of the Change Log. The standby mechanism for recovery time improvement.
The Change Log mechanism of Kafka Streams is durable and reliable, but it requires reading data from the related topics to restore a task's state. That can take significant time when a task's processing pipeline is complex, and cause a significant delay.
Kafka Streams provides an alternative: the standby tasks mechanism. It does almost the same as the Change Log, but in online mode: it mirrors the state of a running task into the state of a related standby task, which is ready to run but stays idle until it is activated after the failure of the consumer that previously handled the related partition.
This feature is switched on by setting the num.standby.replicas property to a non-zero value.
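Extending the configuration sketch shown earlier, it is a one-line change (the value is illustrative):
// Keep one warm replica of each task's state stores on another instance of the application
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);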
Yes, this feature provides the desired number of replicas over the cluster of servers where Kafka Streams instances are launched. But it can consume too much memory when computations are complex and require keeping copies of states on the machines for a long time.
Message size is a downside of Kafka Streams that is inherited from Kafka.
In addition, Kafka Streams is backed by Kafka brokers and inherits their constraints and features. Although Kafka is a throughput champion, the optimal message size in Kafka is roughly 1 to 10 KB; larger messages lead to performance degradation, and Kafka loses its advantage as the highest-throughput messaging tool at message sizes of 100 KB and above. As a result, Kafka Streams should not be used for aggregations and joins where data of large window intervals are collected and streamed to an output queue. Also, states must not be large, because they are kept locally and their changes are logged to internal Kafka topics. Because states are kept locally, growth of the time window leads to growth of the allocated memory.
Language compatibility.
The Kafka Streams API is a JVM-compatible API.
Strengths of Kafka Streams.
Downsides of Kafka Streams.
Where Kafka Streams is welcome for use.
Where Kafka Streams is not a good idea.
Remarks.
Here, we considered the pure Kafka Streams architecture and the client library. Important aspects such as memory management and aggregations over large time windows can be improved by combining Kafka Streams with other frameworks and services, and by additional engineering of Kafka applications.
I will return to this interesting topic a bit later.
Flink: a terabyte-scale data stream processing cluster.
Apache Flink is a dedicated platform, framework, and distributed data flow processing engine, where stream and batch processing jobs run under the control of the system.
Remark: Apache Flink can also be leveraged as an in-process library, but we consider its industrial usage, that is, as a cluster.
Flink can be integrated with common cluster resource managers (such as Hadoop YARN or Kubernetes).
Flink is a job runner
In the Flink flow-processing engine, a client's job does not run in the client service's JVM; it is delivered to the Apache Flink cluster and launched through the Client API.
The Flink cluster executes the client's Data Stream Job, which the client develops using the Flink DataStream API. This can be a JVM, Python, or SQL job.
A client's Data Stream Job is delivered to a Flink cluster, loaded, and run via the REST-based RestClusterClient API, for example. Cloud providers such as AWS Managed Service for Apache Flink have their own specific delivery mechanisms.
The Flink server cluster (JobManager + JobMaster + JobGraph) transforms the client's code into a graph of operations inside the cluster, composes jobs, and then runs and schedules them optimally.
Apache Flink inherits engineering ideas from both Kafka and Spark and was designed from the start for continuous (Stream) and batch (DataSet) processing. Data are ingested from any source and streamed downstream to any destination, such as JDBC, Hadoop, HBase, or Kafka. Flink uses Connectors (the adapter pattern), which make it possible to poll data from any source that has a supporting Connector.
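For instance, ingesting events from Kafka goes through the Kafka connector; a sketch for recent connector versions follows (broker address, topic, and group id are placeholders):
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Connector-based source: polls records from a Kafka topic into the Flink job
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("localhost:9092")
        .setTopics("input-events")
        .setGroupId("flink-ingest")
        .setStartingOffsets(OffsetsInitializer.latest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
DataStream<String> events = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source");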
Flink provides a multi-lingual API for stream and batch processing development, including widely used languages such as Java, Python, and Scala.
The Apache Flink cluster takes care not only of building a pipeline of operations, but also of resource allocation across the machines in the cluster (managed and unmanaged memory, threading) and of job placement for execution.
Developers can concentrate just on the business logic. (This is the primary advantage of any data flow processing engine that handles a client's job processing script/component.)
Internal Architecture and Engineering of the Apache Flink Stream processing.
As mentioned, a Flink cluster has the functions of job building, pipeline execution, and resource management.
The JobManager is the main, leading component, running on a leading machine. It faces client requests for job creation and, using a dispatcher, solves the tasks that are indispensable for running, optimizing, and managing a stream processing job.
Considering the crucial objects of Flink's architecture, we can separate them into two main groups:
Job builders and schedulers;
Resource allocators and task runners.
The first group is responsible for the creation and execution of the business logic, using the memory, threading, JVM processes, and networking facilities provided by the second group.
These are important winning features of the Flink cluster: distributed resource management, fine-tuned parallelism, control over JVM instances and networking, rational CPU and memory distribution, and acceptance of fine-grained hints from developers.
The client's job code is parsed into a JobGraph and transformed into Task instances, which wrap the business code operations and execute them in a chain.
The instances of the main actor of the resource management group, the ResourceManager, provide the allocation of memory, threads, and TCP connections.
The internal flow of Flink scheduling involves the main actors as follows. The JobManager manages the whole job handling process, leveraging an internal dispatcher. The JobGraph, an executable graph of the job's operations, is transferred to the control of the JobMaster. The ResourceManager creates TaskSlots and updates the TaskManager with the slots where tasks will run; it decides on which VM of the cluster the tasks will run under the TaskManager's control. Each TaskManager is a separate JVM process, where Tasks are executed in TaskSlots.
A task is not atomic; it consists of subtasks, a graph of low-level operations. Each task slot has assigned resources (memory and a thread pool), which are shared by several subtasks. Normally, each subtask is executed in one thread of the pool in the TaskSlot, where the subtasks are called in a chain, sharing the resources.
Parallelism means the number of threads, or how many task instances can run concurrently. Several subtasks can share the same TaskSlot, which sustains high performance and avoids bottlenecks in pipelines.
Parallelism can be set in the client code, using the API.
For example
DataStream<Tuple2<String, Integer>> wordCounts = text
.flatMap(new LineSplitter())
.keyBy(value -> value.f0)
.window(TumblingEventTimeWindows.of(Duration.ofSeconds(5)))
.sum(1).setParallelism(5);
For more details, please visit this source.
The number of JVMs, which means the number of TaskManagers per job, is a configurable factor; however, new versions of Flink can select this parameter automatically. The number of TaskManagers is purely a resource management factor, because each JVM has its own memory heap, system memory reserve, and additional system threads.
Multiple JVMs per job and flexible, practically unlimited parallelism depend only on the developer's will and are not limited by any factor inherent to the technology.
Stream states.
Flink stream processing adheres to the Kappa architecture as well. The stateful design for the running tasks is supported by various mechanisms, including RocksDB.
States are saved in keyed storage, using standard and custom serializers.
Also, "in memory" does not mean only the managed JVM heap; it can include Flink's own memory resources, which are allocated off-heap (unsafe memory).
A cluster can include hundreds of machines and use memory-mapped files on volumes, including cloud-provider-specific volumes (such as EBS from AWS). Flink can work with stream states totaling terabytes of data, handle huge windows, and process a practically unlimited number of tasks.
Durability.
Flink realizes resilience and failover recovery using the checkpoint concept (an example), which performs periodic saving of task state into durable storage with configurable operation parameters.
Task state saving happens in the background, and there is a wide spectrum of persistent storage mechanisms where states can be saved: standard, Flink-native, and from custom Flink providers.
Failover recovery. Upon a failure in a TaskManager JVM, the supervising JobMaster tries to restart the failed TaskManager and its tasks on the available resources.
The task states are loaded by job and task IDs from the durable store, and execution continues.
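A minimal sketch of enabling checkpoints in the client job code (for recent Flink versions; the interval and the storage URI are placeholders):
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Snapshot the state of all tasks every 30 seconds with exactly-once guarantees
env.enableCheckpointing(30_000, CheckpointingMode.EXACTLY_ONCE);
// Durable location for checkpoints; in production this is a distributed file system or object store
env.getCheckpointConfig().setCheckpointStorage("s3://my-bucket/flink-checkpoints");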
Technical Consequences of the Flink Architecture.
Strengths of Flink
Downsides of Flink.
When Flink Streams are welcome to use
Where Flink Streams is not a good idea.
You already have Kafka in your stack. Can Kafka memory management and the load/persist mechanism be improved for large-size data?
The internal mechanism of Kafka Streams is open to involving additional frameworks, native/memory-mapped memory tools, and blob/file storage of cloud providers. We consider these possibilities as add-ins, which require writing additional libraries and/or involving additional APIs.
Thus, the stream building API provides a method to add Global State Stores to a stream by providing a custom StoreBuilder. Also, a stream that has been transformed into a KGroupedStream can provide a Materialized instance with a store supplier when calling any of the overloads of the aggregate method. This is the opportunity to involve various mechanisms for persisting and loading large data (such as collections).
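As a hedged sketch, a built-in window store supplier can be passed to an aggregation through Materialized; a custom WindowBytesStoreSupplier backed by another storage engine could be plugged in the same way (the store name and durations are illustrative):
import java.time.Duration;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;

// A persistent (RocksDB-backed) window store supplier; replaceable by a custom implementation
WindowBytesStoreSupplier storeSupplier =
        Stores.persistentWindowStore("large-agg-store", Duration.ofHours(6), Duration.ofHours(2), false);
// The Materialized instance is then passed to an aggregate(...) overload
Materialized<String, Long, WindowStore<Bytes, byte[]>> materialized = Materialized.as(storeSupplier);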
There are known methods of working with large data chunks in Kafka, such as using external data storage for the real data collections and transmitting in events/messages only the metadata that enables retrieval of these data from the external storage. The Kafka Streams mechanism provides graceful instruments for such transformations.
Often, Redis, Hazelcast, or another fast-access data store is used as such external storage for keeping intermediate data.
Suppose we need to group data by keys and collect the data within a time window for further analysis.
The aggregate method is defined in TimeWindowedKStream:
<VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator);
Collecting values is one of the simplest operations that an aggregation can perform.
The Aggregator interface represents a developer-provided callback that encapsulates and controls the aggregation operation for a time interval and returns the aggregation result. The result can be of any type that we need in order to propagate and use the aggregation result later. At this point, we can provide an aggregator that uses any tool to keep the data in any external storage and returns an anchor object that points to the data.
As a simple example, we have a stream transformation
RedisBackedAggregator<K, V> redisBacked = new RedisBackedAggregator<>(redisson, streamId, 2 * 3600 * 1000);
return stream.groupBy(groupKeyProducer)
        .windowedBy(TimeWindows.of(Duration.ofHours(2)).advanceBy(Duration.ofMinutes(90)))
        .aggregate(redisBacked::init, redisBacked);
Here the RedisBackedAggregator knows how to manage data save operations to Redis via the Redisson API, and it serves as both the initializer and the aggregator. The initializer just generates aggregating objects, which are updated as the stream processing progresses.
class RedisBackedAggregator<K, V> implements Aggregator<K, V, RedisBackedAggregation<V>> {

    /** latest aggregation */
    RedisBackedAggregation<V> currentAggregation;
    /** identifier of the stream */
    private final String streamId;
    /** size of the window in milliseconds */
    private final long winSize;
    /** the Redisson client reference */
    private final Redisson redisson;

    RedisBackedAggregator(Redisson redisson, String streamId, long winSize) {
        this.streamId = streamId;
        this.redisson = redisson;
        this.winSize = winSize;
    }

    /** Initializer callback: creates a fresh aggregation object for a new window, linked to the previous one */
    public RedisBackedAggregation<V> init() {
        RedisBackedAggregation<V> prev = currentAggregation;
        currentAggregation = new RedisBackedAggregation<V>(
                winSize,
                System.currentTimeMillis(),
                streamId + "." + UUID.randomUUID().toString(),
                (prev == null) ? null : prev.collectionKey);
        return currentAggregation;
    }

    /** Aggregator callback: appends the incoming value to the Redis-backed collection of the current window */
    @Override
    public RedisBackedAggregation<V> apply(K key, V value, RedisBackedAggregation<V> aggregate) {
        aggregate.<V>getListAccess(redisson).add(value);
        return aggregate;
    }
}
The RedisBackedAggregator is built quite primitively: it can only generate a new instance of the aggregating class for each window, connect this new instance to the previous one (which represents the collection of data related to the previous window), and update the instance with newly incoming data.
The RedisBackedAggregation is persistable and knows how to put and extract data using Redis resources, by way of the Redisson API.
After the aggregation over a window, the aggregating objects hold references to the external resources and can extract the data from those resources when needed.
The objects can be streamed from the KTable as a stream using the KTable#toStream method and then directed to a queue for further processing, as sketched below.
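For instance (the output topic name and the serde for the anchor objects are assumptions of this sketch):
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Produced;

// 'aggregated' is the KTable<Windowed<K>, RedisBackedAggregation<V>> produced by the aggregation above;
// 'anchorSerde' is an assumed custom serde for the anchor objects
aggregated.toStream()
        .map((windowedKey, anchor) -> KeyValue.pair(windowedKey.key().toString(), anchor))
        .to("aggregated-windows", Produced.with(Serdes.String(), anchorSerde));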
The logic is straightforward:
The stream aggregates the streamed data for a time window, where the aggregating structure contains the parameters of the window and a reference to the external storage (a collection in the Redis cache) where the real data are placed.
The reading stream can just read the references to the persisted data and extract the values.
The example is merely intended to show the possibilities of combining various tools that are simple, well documented, and do not require a steep learning curve and/or serious changes to the CI/CD processes.
In any case, if you already have Kafka in your stack and face challenges such as data size or memory usage, there is still a great chance to solve your problems and organize a good processing topology by combining Kafka Streams with other services and tools.