"Real-Time End-to-End Integration with Apache Kafka in Apache Spark’s Streaming"?

"Real-Time End-to-End Integration with Apache Kafka in Apache Spark’s Streaming"

Structured Streaming APIs enable building end-to-end streaming applications called?continuous applications ?in a consistent, fault-tolerant manner that can handle all of the complexities of writing such applications. It does so without having to reason about the nitty-gritty details of streaming itself and by allowing the usage of familiar concepts within Spark SQL such as DataFrames and?Datasets . All of this has led to a high interest in use cases wanting to tap into it. From?introduction , to?ETL , to?complex data formats , there has been a wide coverage of this topic. Structured Streaming is also integrated with third party components such as Kafka, HDFS, S3, RDBMS, etc

No alt text provided for this image

Connecting to a Kafka Topic

Let’s assume you have a?Kafka cluster ?that you can connect to and you are looking to use Spark’s Structured Streaming to ingest and process messages from a topic. The Databricks platform already includes an Apache Kafka 0.10 connector for Structured Streaming, so it is easy to set up a stream to read messages:

No alt text provided for this image
No alt text provided for this image

Streaming ETL

Now that the stream is set up, we can start doing the required ETL on it to extract meaningful insights. Notice that?streamingInputDF?is a DataFrame. Since DataFrames are essentially an untyped Dataset of rows, we can perform similar operations to them.

Let’s say that the generic ISP hit JSON data is being pushed to the Kafka?<topic>?above. An example value would look like this:

No alt text provided for this image
No alt text provided for this image

Notice in the command above, we are able to parse the zipcode out of incoming JSON messages, group them and do a count, all in real-time as we are reading data from the Kafka topic. Once we have the count, we can display it, which fires the streaming job in the background and continuously updates the counts as new messages arrive.

Windowing

Now that we have parse, select, groupBy and count queries continuously executing, what if we want to find out traffic per zip code for a 10 minute window interval, with sliding duration of 5 minutes starting 2 minutes past the hour?

No alt text provided for this image

Output Options

So far, we have seen the end results being displayed automatically. If we want more control in terms output options, there are a variety of output modes available. For instance, if we need to debug, you may wish to select the console output. If we need to be able to query the dataset interactively as data is being consumed, the memory output would be an ideal choice. Similarly, the output can be written to files, external databases, or even streamed back to Kafka.


Memory

In this scenario, data is stored as an in-memory table. From here, users are able to query the dataset using SQL. The name of the table is specified from the?queryName?option. Note we continue to use?streamingSelectDF?from the above windowing example.

No alt text provided for this image

Console

In this scenario, output is printed to console/stdout log.

No alt text provided for this image

Databases

Often times we want to be able to write output of streams to external databases such as MySQL. At the time of writing, the Structured Streaming API does not support external databases as sinks; however, when it does, the API option will be as simple as?.format("jdbc").start("jdbc:mysql/.."). In the meantime, we can use the foreach sink to accomplish this. Let’s create a custom JDBC Sink that extends?ForeachWriter?and implements its methods.

No alt text provided for this image

We can now use the?JDBCSink

No alt text provided for this image

As batches are complete, counts by zip could be INSERTed/UPSERTed into MySQL as needed.

No alt text provided for this image

Kafka

Similar to writing to databases, the current Structured Streaming API does not support the “kafka” format, but this will be available in the next version. In the meantime, we can create a custom class named?KafkaSink` which extends _ForeachWriter. Let’s see how that looks:

No alt text provided for this image

Now we can use the writer:

No alt text provided for this image

You can now see that we are pumping messages back to Kafka topic?<topic2>. In this case we are pushing updated?zipcode:count?at the end of each batch. The other thing to note is that streaming Dashboard provides insights into incoming messages versus processing rate, batch duration and raw data that is used to generate it. This comes in very handy when debugging issues and monitoring system.

On the Kafka consumer side, we can see:

No alt text provided for this image

In this case, we are running in “update” output mode. As messages are being consumed, zipcodes that are getting updated during that batch are being pushed back to Kafka. Zipcodes that do not get updated are not being sent. You can also run in “complete” mode, as we did in the database sink above, in which all of the zipcodes with latest count will be sent, even if some of the zipcode counts did not change since the last batch.

I hope this will help you to learn kafka and spark streaming ETL with sink formats.

Thank you






要查看或添加评论,请登录

社区洞察

其他会员也浏览了