Boost Real-time Processing with Spark Structured Streaming
Patrick Nicolas
Director of Data Engineering @ aidéo technologies | software & data engineering, operations, and machine learning.
Conventional distributed batch processing systems fall short in supporting applications like social media platforms, the Internet of Things, or business-to-consumer online transactions. Fortunately, Apache Spark Structured Streaming equips software developers with the necessary tools for large-scale, real-time stream processing.
This article delves into how the classic Extract-Transform-Load (ETL) pipeline is implemented within the realm of real-time streaming data.
What you will learn: How to implement real-time ETL using Spark Structured Streaming.
Notes: The examples use Apache Spark 3.4.0 with Scala 2.12, built with Maven.
Introduction
Apache Spark
Apache Spark is an open-source distributed computing system designed for handling large-scale data processing [ref 1]. It leverages in-memory caching and refined query execution strategies to deliver rapid analytic responses, regardless of the data's size.
Unlike disk-based MapReduce pipelines that chain multiple read-process-write stages, Spark streamlines the process into a single step: data is loaded into memory, operations are executed, and the outcomes are written back, leading to significantly faster execution. Additionally, Spark speeds up iterative machine learning algorithms by caching data in memory, allowing repeated function calls on the same dataset without re-reading it from storage.
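As an illustration, here is a minimal sketch of this caching behavior; the HDFS path and the age column are placeholders, not part of the original example.
import org.apache.spark.sql.SparkSession

// Minimal caching sketch: the dataset is loaded once, cached in memory,
// and then reused by several computations without re-reading from storage.
// The path 'hdfs://data/records.json' and the 'age' column are placeholders.
val spark = SparkSession.builder.appName("caching-sketch").getOrCreate()
val df = spark.read.json("hdfs://data/records.json").cache()

val total = df.count()                        // first action materializes the cache
val adults = df.filter("age > 18").count()    // subsequent actions are served from memory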
Structured streaming
Apache Spark Structured Streaming is a stream processing framework that's both scalable and resilient to faults, built atop the Spark SQL engine. Its approach to streaming computation mirrors the batch processing model applied to static datasets. The Spark SQL engine manages the task of executing this process incrementally and perpetually, updating the outcomes as new streaming data flows in [ref 2].
In contrast with Spark's original streaming library that relied on RDDs, Structured Streaming facilitates processing based on event time, incorporates watermarking features, and utilizes the DataFrame API that is a part of Spark SQL.
Spark Streaming processes incoming data by splitting it into small batches, which are executed as Resilient Distributed Datasets (RDDs). On the other hand, Structured Streaming operates on a DataFrame linked to an unbounded table, using an API that's fine-tuned for enhanced performance [ref 3].
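To make these ideas concrete, here is a minimal, self-contained sketch that treats the stream as an unbounded table and applies an event-time window with a watermark. The built-in rate source, the window duration, and the watermark delay are chosen purely for illustration and are not part of the pipeline built later in this article.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, window}
import org.apache.spark.sql.streaming.OutputMode

// The 'rate' test source produces rows with (timestamp, value) columns
val spark = SparkSession.builder.appName("structured-streaming-sketch").getOrCreate()

val events = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .load()

// Count events per 30-second event-time window, tolerating up to 1 minute of late data
val windowedCounts = events
  .withWatermark("timestamp", "1 minute")
  .groupBy(window(col("timestamp"), "30 seconds"))
  .count()

// Write the continuously updated counts to the console sink
windowedCounts.writeStream
  .outputMode(OutputMode.Update())
  .format("console")
  .start()
  .awaitTermination()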
Streaming components
In this section, we'll provide a concise overview of the essential elements of Spark Structured Streaming that are employed in any Extract-Transform-Load (ETL) process.
Setup
To develop a structured streaming application, at least three Spark libraries, in the form of jar files, are essential: Core, SQL, and Streaming. The Maven pom.xml snippet provided below demonstrates how to set up these three libraries:
<properties>
  <spark.version>3.4.0</spark.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>${spark.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>${spark.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>${spark.version}</version>
  </dependency>
</dependencies>
Important note: For Spark Structured Streaming with input from Kafka, the required artifact is spark-sql-kafka-0-10_2.12 (the spark-streaming-kafka-0-10_2.12 artifact targets the older DStream-based API).
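As a sketch, the corresponding Maven dependency follows the same pattern as above; the version is assumed to reuse the spark.version property defined earlier.
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
  <version>${spark.version}</version>
</dependency>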
Our use case utilizes Spark's transformations and actions to construct an ETL (Extract, Transform, Load) pipeline.
Transformation
The class STransform defines the data transformation of a DataFrame (map function) using SQL syntax. Its attributes are the fields of the SQL select statement (selectFields), the conditions of the where clause (whereConditions), the transformation function (transformFunc), and an optional descriptor.
The selectFields and whereConditions are concatenated to build the SQL statement. Note that the generated SQL query is not validated prior to execution.
import org.apache.spark.sql.DataFrame

type TransformFunc = (DataFrame, String) => DataFrame
class STransform(
selectFields: Seq[String],
whereConditions: Seq[String],
transformFunc: TransformFunc,
descriptor: String = ""
){
def apply(df: DataFrame): DataFrame =
transformFunc(df, queryStmt)
def queryStmt: String = {
val whereStr =
if (whereConditions.nonEmpty)
s"WHERE ${whereConditions.mkString("AND ")}"
else ""
s"SELECT ${selectFields.mkString(",")} FROM temptable $whereStr"
}
}
Action
The class SAggregator wraps the groupBy operation with an aggregation function.
import org.apache.spark.sql.{Column, DataFrame}

type AggrFunc = Column => Column
class SAggregator(
groupByCol: Column,
aggrCol: Column,
aggrFunc: AggrFunc,
aggrAliasName: String
){
def apply(inputDF: DataFrame): DataFrame = inputDF
.groupBy(groupByCol)
.agg(aggrFunc(aggrCol).alias(aggrAliasName))
}
Streams wrapper
SparkStructStreams defines the generic wrapper trait for structured streaming with the minimum set of required attributes to describe any ETL-based pipeline.
Each specific ETL pipeline has to override the following variables:
import org.apache.spark.sql.streaming.OutputMode

trait SparkStructStreams {
val outputMode: OutputMode
val outputFormat: String
val outputColumn: String
val isConsoleSink: Boolean
val transform: Option[STransform]
val aggregator: Option[SAggregator]
}
ETL
Streaming pipeline
Our data pipeline implements the conceptual Extract-Transform-Load pattern.
The extraction consists of reading the data stream from HDFS in JSON format. The two fundamental types of data processing tasks in Apache Spark are transformations (map) and actions (reduce); they implement the transform stage of the pipeline.
Finally, the data stream is written into a sink as a CSV file, implementing the load task.
We wrap the streaming pipeline into a class, SparkStructStreamsFromFile, which inherits from SparkStructStreams and adds the path of the source, folderPath, and an implicit reference to the SparkSession.
As the transform and aggregation tasks rely on SQL statements, we need to extract the schema from the data source. The data source consists of JSON files so we infer the schema from the first record.
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.types.StructType

class SparkStructStreamsFromFile (
folderPath: String, // Absolute path for the source file
override val outputMode: OutputMode, // Mode for writer stream (i.e. Append, Update, ...)
override val outputFormat: String, // Format used by the stream writer (json, console, csv, ...)
override val outputColumn: String, // Name of the aggregated column
override val isConsoleSink: Boolean, // If true, also write the stream to the console sink for debugging
override val transform: Option[STransform], // Transformation (DataFrame, SQL) => DataFrame
override val aggregator: Option[SAggregator] // groupBy (single column) + sql.functions._
)(implicit sparkSession: SparkSession) extends SparkStructStreams {
// Extract schema from files
lazy val schema: StructType =
sparkSession.read.json(s"hdfs://${folderPath}").head().schema
def execute(): Unit = {
// -------------------- EXTRACT ------------------------
// Step 1: Stream reader from a file 'folderPath'
val readDF: DataFrame = sparkSession
.readStream
.schema(schema)
.json(s"hdfs://$folderPath")
assert(readDF.isStreaming)
// ----------------- TRANSFORM ---------------------
// Step 2: Transform
val transformedDF: DataFrame = transform.map(_(readDF)).getOrElse(readDF)
// Step 3: Aggregator
val aggregatedDF = aggregator.map(_(transformedDF)).getOrElse(transformedDF)
// Step 4: Optional debugging output to the console sink
if (isConsoleSink)
  aggregatedDF.writeStream.outputMode(OutputMode.Complete()).format("console").start()
//-------------------- LOAD ---------------------------
// Step 5: Stream writer into a table
val query = aggregatedDF
.writeStream
.outputMode(OutputMode.Update())
.foreachBatch{ (batchDF: DataFrame, batchId: Long) =>
batchDF.persist()
batchDF.select(outputColumn)
.write
.mode(SaveMode.Overwrite)
.format(outputFormat)
.save(path = s"hdfs://output/$outputFormat")
batchDF.unpersist()
}
// Step 6: Initiate streaming
.trigger(Trigger.ProcessingTime("4 seconds"))
.start()
query.awaitTermination()
}
}
The method execute implements the logic of the streaming pipeline in six steps, grouped into the extract, transform, and load stages described below.
Extract
The extraction of data consists of loading the JSON data into a partitioned data frame, df, through the API method readStream.
val df = sparkSession.readStream.schema(mySchema).json(path)
Transform
The transformation, myTransformFunc, converts the data frame of extracted data, readDF, by executing the SQL query sqlStatement: SELECT age, gender FROM temptable WHERE age > 18. The data frame is first registered as a temporary view, 'temptable', against which the query runs.
def myTransformFunc(
readDF: DataFrame,
sqlStatement: String
)(implicit sparkSession: SparkSession): DataFrame = {
readDF.createOrReplaceTempView("temptable")
sparkSession.sql(sqlStatement)
}
val myTransform = new STransform(
Seq[String]("age","gender"),
Seq[String]("age > 18"),
myTransformFunc,
"Filter by age"
)
The second step is to compute the average age of the grouped data, conceptually: SELECT gender, avg(age) FROM temptable GROUP BY gender;
def aggregatedFunc(inputColumn: Column): Column = {
import org.apache.spark.sql.functions._
avg(inputColumn)
}
val myAggregator = new SAggregator(
new Column("gender"),
new Column("age"),
aggregatedFunc,
"avg_age"
)
Load
The final task is to write the data stream into a CSV file sink.
df.writeStream
  .outputMode(OutputMode.Update())
  .foreachBatch{ (batchDF: DataFrame, batchId: Long) =>
    batchDF.persist()
    batchDF.select("avg_age")
      .write
      .mode(SaveMode.Overwrite)
      .format("csv")
      .save(path)
    batchDF.unpersist()
  }
  .trigger(Trigger.ProcessingTime("4 seconds"))
  .start()
The foreachBatch function enables developers to define a specific operation to be applied to the output data of each micro-batch within a streaming query. However, this function cannot be used in continuous processing mode, where foreach would be the suitable alternative.
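For reference, here is a minimal, illustrative sketch of the foreach alternative. It assumes a hypothetical non-aggregated streaming DataFrame, streamingDF, read from a source that supports continuous processing (such as Kafka); continuous mode supports neither aggregations nor the file source used in our pipeline.
import org.apache.spark.sql.{DataFrame, ForeachWriter, Row}
import org.apache.spark.sql.streaming.Trigger

// Hypothetical row-level writer, for illustration only: prints each row as comma-separated text
class ConsoleRowWriter extends ForeachWriter[Row] {
  override def open(partitionId: Long, epochId: Long): Boolean = true
  override def process(row: Row): Unit = println(row.mkString(","))
  override def close(errorOrNull: Throwable): Unit = ()
}

// 'streamingDF' is a placeholder for a non-aggregated streaming DataFrame
// read from a continuous-capable source (e.g., Kafka)
def writeContinuously(streamingDF: DataFrame): Unit =
  streamingDF.writeStream
    .foreach(new ConsoleRowWriter)
    .trigger(Trigger.Continuous("1 second"))
    .start()
    .awaitTermination()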
The output mode defines how the unbounded result table is updated: Append writes only the new rows added since the last trigger, Update writes only the rows that changed since the last trigger, and Complete rewrites the entire result table after every trigger.
Putting it all together
new SparkStructStreamsFromFile(
  path,
  OutputMode.Update(),
  outputFormat = "csv",
  outputColumn = "avg_age",
  isConsoleSink = true,
  transform = Some(myTransform),
  aggregator = Some(myAggregator)
).execute()
The output of the streaming pipeline in CSV format is:
gender, avg(age)
male,36
female,33
Thank you for reading this article. For more information ...
References
---------------------------
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design, and end-to-end deployment and support, with extensive knowledge in machine learning. He has been director of data engineering at Aideo Technologies since 2017 and is the author of "Scala for Machine Learning", Packt Publishing, ISBN 978-1-78712-238-3.
#spark #streaming #databricks #scala #etl #kafka #structuredstreaming