"Real-Time End-to-End Integration with Apache Kafka in Apache Spark’s Streaming"
Abhishek Singh
Technical Lead Data Engineer (Azure) at Publicis Sapient. Expertise in SQL, PySpark, and Scala with Spark, Kafka with Spark Streaming, Databricks, and tuning Spark applications at petabyte scale. Cloud: AWS, Azure, and GCP.
Structured Streaming APIs enable building end-to-end streaming applications, called continuous applications, in a consistent, fault-tolerant manner that handles all of the complexities of writing such applications. They do so without requiring you to reason about the nitty-gritty details of streaming itself, and they let you work with familiar Spark SQL concepts such as DataFrames and Datasets. All of this has led to strong interest in use cases that want to tap into it. From introductions to ETL to complex data formats, this topic has been covered widely. Structured Streaming also integrates with third-party components such as Kafka, HDFS, S3, RDBMS, etc.
Connecting to a Kafka Topic
Let’s assume you have a Kafka cluster that you can connect to, and you are looking to use Spark’s Structured Streaming to ingest and process messages from a topic. The Databricks platform already includes an Apache Kafka 0.10 connector for Structured Streaming, so it is easy to set up a stream to read messages:
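A minimal sketch of such a reader is shown below. The broker addresses and topic name are placeholders you would replace with your own; in a Databricks notebook the SparkSession is already available as spark.

```scala
import org.apache.spark.sql.SparkSession

// In a Databricks notebook `spark` already exists; it is created here only
// to keep the sketch self-contained.
val spark = SparkSession.builder.appName("KafkaStructuredStreaming").getOrCreate()

val streamingInputDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<broker1:9092,broker2:9092>")  // placeholder brokers
  .option("subscribe", "<topic>")                                    // placeholder topic
  .option("startingOffsets", "latest")
  .load()
```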
Streaming ETL
Now that the stream is set up, we can start doing the required ETL on it to extract meaningful insights. Notice that streamingInputDF is a DataFrame. Since DataFrames are essentially an untyped Dataset of rows, we can perform operations on it much as we would on a static DataFrame.
Let’s say that generic ISP hit data in JSON format is being pushed to the Kafka <topic> above. An example value might look like this:
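For illustration, a record might look like the following. The field names here are hypothetical; the important part is that each message carries a zipcode (and, for the windowing example later, a hit timestamp):

```json
{
  "isp": "ExampleNet",
  "city": "San Francisco",
  "region": "CA",
  "zipcode": "94107",
  "hittime": "2017-02-08 10:12:03"
}
```

With data in that shape, a sketch of the parsing and counting step could look like this (the JSON path assumes the sample record above):

```scala
import org.apache.spark.sql.functions.get_json_object
import spark.implicits._

// Kafka delivers the payload in the binary `value` column: cast it to a string,
// pull the zipcode out with a JSON path, then count hits per zipcode.
val streamingSelectDF = streamingInputDF
  .select(get_json_object($"value".cast("string"), "$.zipcode").alias("zipcode"))
  .groupBy($"zipcode")
  .count()

// In a Databricks notebook, display() starts the streaming query in the
// background and keeps refreshing the counts as new messages arrive.
display(streamingSelectDF)
```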
Notice that in the command above we are able to parse the zipcode out of the incoming JSON messages, group by it, and compute a count, all in real time as we read data from the Kafka topic. Once we have the count, we can display it, which starts the streaming job in the background and continuously updates the counts as new messages arrive.
Windowing
Now that we have parse, select, groupBy, and count operations executing continuously, what if we want to find the traffic per zipcode over a 10-minute window that slides every 5 minutes, starting 2 minutes past the hour?
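A sketch of such a sliding-window aggregation is shown below, assuming the hittime field from the sample record above:

```scala
import org.apache.spark.sql.functions.{get_json_object, window}
import spark.implicits._

// Group by zipcode and a 10-minute window that slides every 5 minutes,
// offset 2 minutes past the hour.
val streamingSelectDF = streamingInputDF
  .select(
    get_json_object($"value".cast("string"), "$.zipcode").alias("zipcode"),
    get_json_object($"value".cast("string"), "$.hittime").alias("hittime"))
  .groupBy(
    $"zipcode",
    window($"hittime".cast("timestamp"), "10 minutes", "5 minutes", "2 minutes"))
  .count()
```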
Output Options
So far, we have seen the end results being displayed automatically. If we want more control over the output, there are a variety of output options available. For instance, if we need to debug, we may wish to use the console output. If we need to query the dataset interactively as data is being consumed, the memory output would be an ideal choice. Similarly, the output can be written to files, external databases, or even streamed back to Kafka.
Memory
In this scenario, data is stored as an in-memory table. From there, users are able to query the dataset using SQL. The name of the table is specified via the queryName option. Note that we continue to use streamingSelectDF from the windowing example above.
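A sketch of the memory sink follows; the table name (isphits) and the trigger interval are illustrative choices:

```scala
import org.apache.spark.sql.streaming.Trigger

val memoryQuery = streamingSelectDF
  .writeStream
  .format("memory")
  .queryName("isphits")                  // in-memory table name to query with SQL
  .outputMode("complete")
  .trigger(Trigger.ProcessingTime("25 seconds"))
  .start()

// While the stream runs, the table can be queried interactively, e.g.:
// spark.sql("SELECT zipcode, window, count FROM isphits ORDER BY count DESC").show()
```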
Console
In this scenario, the output is printed to the console/stdout log.
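A sketch of the console sink for the same aggregation:

```scala
// Print each batch of windowed counts to stdout; useful for debugging.
val consoleQuery = streamingSelectDF
  .writeStream
  .format("console")
  .outputMode("complete")
  .start()
```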
Databases
Oftentimes we want to be able to write the output of streams to external databases such as MySQL. At the time of writing, the Structured Streaming API does not support external databases as sinks; however, when it does, the API option will be as simple as .format("jdbc").start("jdbc:mysql/.."). In the meantime, we can use the foreach sink to accomplish this. Let’s create a custom JDBC sink that extends ForeachWriter and implements its methods.
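A sketch of such a sink is shown below. The MySQL driver class is the standard one, while the target table name (zip_counts) and its two-column layout (zipcode, count) are assumptions you would adapt to your own schema:

```scala
import java.sql.{Connection, DriverManager, Statement}
import org.apache.spark.sql.{ForeachWriter, Row}

// Writes each (zipcode, count) row of a batch to a MySQL table.
class JDBCSink(url: String, user: String, pwd: String) extends ForeachWriter[Row] {
  val driver = "com.mysql.jdbc.Driver"
  var connection: Connection = _
  var statement: Statement = _

  def open(partitionId: Long, version: Long): Boolean = {
    // Open one connection per partition and epoch.
    Class.forName(driver)
    connection = DriverManager.getConnection(url, user, pwd)
    statement = connection.createStatement()
    true
  }

  def process(value: Row): Unit = {
    // Assumes a table zip_counts(zipcode VARCHAR, count BIGINT).
    statement.executeUpdate(
      "INSERT INTO zip_counts VALUES ('" + value.getString(0) + "', " + value.getLong(1) + ")")
  }

  def close(errorOrNull: Throwable): Unit = {
    connection.close()
  }
}
```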
We can now use the JDBCSink:
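The wiring below is a sketch; the JDBC URL, credentials, and trigger interval are placeholders:

```scala
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

// Placeholder connection details for the target MySQL instance.
val url  = "jdbc:mysql://<mysqlserver>:3306/analytics"
val user = "<username>"
val pwd  = "<password>"

val writer = new JDBCSink(url, user, pwd)

val jdbcQuery = streamingSelectDF
  .select($"zipcode", $"count")          // keep only the columns the sink expects
  .writeStream
  .foreach(writer)
  .outputMode("complete")
  .trigger(Trigger.ProcessingTime("25 seconds"))
  .start()
```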
As batches complete, the counts by zipcode can be INSERTed or UPSERTed into MySQL as needed.
Kafka
Similar to writing to databases, the current Structured Streaming API does not support the “kafka” output format, but this will be available in the next version. In the meantime, we can create a custom class named KafkaSink that extends ForeachWriter. Let’s see how that looks:
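The class below is a sketch; it assumes each incoming row is a (zipcode, count) pair and publishes it as a string key/value message:

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.sql.{ForeachWriter, Row}

// Publishes each (zipcode, count) row back to a Kafka topic.
class KafkaSink(topic: String, servers: String) extends ForeachWriter[Row] {
  val kafkaProperties = new Properties()
  kafkaProperties.put("bootstrap.servers", servers)
  kafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  kafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  var producer: KafkaProducer[String, String] = _

  def open(partitionId: Long, version: Long): Boolean = {
    // Create the producer on the executor; it is not serializable.
    producer = new KafkaProducer[String, String](kafkaProperties)
    true
  }

  def process(value: Row): Unit = {
    // Key is the zipcode, value is its latest count.
    producer.send(new ProducerRecord(topic, value.getString(0), value.getLong(1).toString))
  }

  def close(errorOrNull: Throwable): Unit = {
    producer.close()
  }
}
```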
Now we can use the writer:
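Again a sketch, with placeholder topic and broker values:

```scala
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

// Placeholder output topic and broker list.
val kafkaWriter = new KafkaSink("<topic2>", "<broker1:9092,broker2:9092>")

val kafkaQuery = streamingSelectDF
  .select($"zipcode", $"count")
  .writeStream
  .foreach(kafkaWriter)
  .outputMode("update")
  .trigger(Trigger.ProcessingTime("25 seconds"))
  .start()
```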
You can now see that we are pumping messages back to the Kafka topic <topic2>. In this case, we are pushing the updated zipcode:count pairs at the end of each batch. The other thing to note is that the streaming dashboard provides insight into the incoming message rate versus the processing rate, the batch duration, and the raw data used to generate it. This comes in very handy when debugging issues and monitoring the system.
On the Kafka consumer side, we can then watch the updated zipcode:count pairs arrive with each batch.
In this case, we are running in the “update” output mode. As messages are consumed, zipcodes whose counts were updated during that batch are pushed back to Kafka, while zipcodes that did not change are not sent. You can also run in “complete” mode, as we did in the database sink above, in which all of the zipcodes with their latest counts are sent, even if some of the counts have not changed since the last batch.
I hope this helps you learn Kafka and Spark Structured Streaming ETL with the different sink formats.
Thank you