Apache Spark Structured Streaming

Introduction

Apache Spark Structured Streaming is a scalable, fault-tolerant stream processing engine built on top of Spark SQL. Unlike traditional stream processing frameworks, it treats streams as incremental tables and processes data using declarative SQL-style queries.

Why Spark Structured Streaming?

  • End-to-end exactly-once processing (no duplicate records).
  • Scalable: Handles millions of events per second.
  • Integrated with batch & ML pipelines.
  • Supports stateful aggregations & windowing functions.


Input Sources:

  • Kafka, Kinesis, Socket, Files, Delta Lake, S3, etc.

Sink Targets:

  • Kafka, S3, Delta, MySQL, PostgreSQL, ElasticSearch, etc.


Operation:

The input data stream flows into Spark and is treated as an unbounded table: each time new data arrives, it is appended as new rows to that table. Spark incrementally runs the query over the input table as it grows, producing a result table that is written to the sink according to the configured output mode.
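A minimal sketch of this flow, assuming a Kafka broker at localhost:9092, a hypothetical sensor_data topic, and the spark-sql-kafka connector on the classpath; the console sink is used purely for illustration:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("structured-streaming-demo").getOrCreate()

# Each new Kafka record is appended as a row to the unbounded input table.
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "sensor_data") \
    .load()

# The query runs incrementally over the input table; the result table is
# written to the sink (console here) in the chosen output mode.
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("console") \
    .start() \
    .awaitTermination()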


Output Modes in Structured Streaming

Append: This is the default output mode. Only rows added since the last trigger are written to the sink. Suited for queries without aggregations.

Update: Only rows that have changed since the last trigger (newly added or updated aggregate values) are written to the sink.

Complete: The entire result table is rewritten to the sink on every trigger. Requires an aggregation in the query.
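A sketch of setting the output mode on the writer, assuming df is a streaming DataFrame with a hypothetical device_id column; the console sink is used for illustration:

from pyspark.sql.functions import col

# Complete mode: the full aggregated result table is emitted on every trigger.
# outputMode("update") would instead emit only the counts that changed in this trigger.
df.groupBy(col("device_id")).count() \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()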


Fault Tolerance: Checkpoint Directory

A checkpoint directory is a storage location (e.g., HDFS, S3, DBFS) where Spark persists metadata and intermediate states for a streaming job. If a failure occurs, Spark can restart from the last successful state rather than reprocessing all data from the beginning.

Key Components Stored in the Checkpoint Directory

  1. Metadata Logs – Stores information about the executed queries.
  2. Offsets – Keeps track of processed data offsets (e.g., Kafka topic partitions).
  3. State Store – Stores intermediate aggregation results for stateful operations like windowed aggregations.
  4. Committed Files – Tracks successfully written files to prevent duplication.

# df is assumed to contain a string/binary 'value' column (and optionally 'key'),
# as required by the Kafka sink; the sink topic is set with the 'topic' option.
df.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "sensor_data") \
    .option("checkpointLocation", "hdfs:///user/spark/checkpoints/") \
    .start()


Triggers

Triggers in Spark Structured Streaming define how frequently the streaming query processes new data. They control the batching behavior of micro-batches, influencing latency and throughput.

1. Default Trigger (Micro-Batch Mode)

  • Spark automatically starts the next micro-batch as soon as the previous one completes.
  • It runs as fast as possible with minimum latency.
  • This is the default behavior if no trigger is specified.
  • Best for: Low-latency applications where throughput is not a major concern.

df.writeStream \
    .format("console") \
    .start()

2. Processing Time Trigger (Trigger.ProcessingTime)

  • Runs the query at a fixed interval, e.g., every 10 seconds.
  • Reduces resource usage by avoiding continuous execution.
  • Ensures periodic data processing.
  • Suited to use cases where a slight processing delay is acceptable (e.g., dashboard updates).
  • Helps control resource usage in cost-sensitive environments.

# In PySpark the trigger is set with a keyword argument
# (Trigger.ProcessingTime is the Scala/Java API).
df.writeStream \
    .format("console") \
    .trigger(processingTime="10 seconds") \
    .start()

3. One-Time Trigger (Trigger.Once)

  • The query processes all available data only once and then stops.
  • Useful for batch-like processing of streaming data.
  • Works well when using structured streaming for ETL.
  • Best for: scheduled batch jobs that run periodically.
  • Best for: streaming data ingestion pipelines into a data lake.

# trigger(once=True) is the PySpark equivalent of Trigger.Once;
# newer Spark versions also offer trigger(availableNow=True).
df.writeStream \
    .format("parquet") \
    .option("path", "s3://output-data/") \
    .trigger(once=True) \
    .start()

4. Continuous Processing (Trigger.Continuous)

  • Enables low-latency processing (milliseconds-level).
  • No micro-batches; each record is processed as it arrives.
  • Useful for real-time applications like fraud detection.
  • Best for: Ultra-low latency use cases (e.g., real-time fraud detection, IoT streaming). Applications where delays of seconds are not acceptable.
  • Limitations: supports only map-like operations such as select and filter (no aggregations), provides at-least-once guarantees, and works with a limited set of sources and sinks (e.g., Kafka).

Example: Continuous Processing (Low Latency Mode)

# continuous="1 second" is the PySpark equivalent of Trigger.Continuous;
# the interval is the checkpoint interval, not a batching interval.
df.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "alerts") \
    .trigger(continuous="1 second") \
    .start()

Watermarking & Late Data Handling

Watermarking allows Structured Streaming to handle late events while maintaining performance and accuracy.

from pyspark.sql.functions import col, window

df.withWatermark("timestamp", "10 minutes") \
    .groupBy(window(col("timestamp"), "5 minutes")) \
    .count()

How It Works

  • If an event arrives within 10 minutes of the maximum event time seen so far → it is included in its window.
  • If it arrives more than 10 minutes late → it is considered too late and may be dropped from the result.


Stateful Aggregations & Windowing in Streaming

Structured Streaming allows stateful processing, maintaining aggregates over a time window.

Tumbling Windows (Fixed Intervals)

  • Tumbling windows are a series of fixed-sized, non-overlapping, contiguous time intervals.
  • Each input record belongs to exactly one window.
df.groupBy(window(col("timestamp"), "5 minutes")).count()        

Sliding Windows (Overlapping Intervals)

  • Sliding windows are fixed-sized like tumbling windows, but they overlap when the slide duration is shorter than the window duration, so a single input record can belong to multiple windows.

  • In the example below, 5-minute windows are computed every 2 minutes, giving more frequent updates over recent data.

df.groupBy(window(col("timestamp"), "5 minutes", "2 minutes")).count()        

Session Windows (User Activity-Based Windows)

  • A session window has a dynamic length that depends on the input: it starts with the first input and extends itself whenever another input arrives within the gap duration. With a static gap duration, the session window closes once no input is received within the gap after the latest input.

  • Windows are therefore created dynamically per key based on activity (e.g., user sessions).

from pyspark.sql.functions import session_window

df.groupBy(session_window(col("timestamp"), "10 minutes")).count()


Join Operations in Structured Streaming

Structured Streaming supports stream-static and stream-stream joins.

Stream-Static Joins

  • Useful when you need to enrich streaming data with a lookup table.

static_df = spark.read.format("jdbc").option("url", "jdbc:mysql://db").load()
df.join(static_df, "id", "left")

Stream-Stream Joins

  • Requires watermarking for state management.

df1.join(df2, expr("df1.id = df2.id AND df1.timestamp >= df2.timestamp - interval 10 minutes"))


Optimizations in Spark Structured Streaming

RocksDB-based Stateful Processing

  • Spark 3.x introduced RocksDB for better memory efficiency in stateful processing.
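A minimal sketch of switching the state store to RocksDB (available since Spark 3.2); the configuration should be set before the streaming query starts:

# Use RocksDB instead of the default in-memory, HDFS-backed state store.
spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"
)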


Kafka Source Partitioning

  • Spark maps each Kafka topic partition to a separate task, so input ingestion scales with the number of partitions.

# startingOffsets is a Kafka source option, set on the stream reader:
spark.readStream.format("kafka").option("startingOffsets", "earliest")


Adaptive Query Execution (AQE)

  • Dynamically optimizes join strategies & partitioning based on runtime stats.

spark.conf.set("spark.sql.adaptive.enabled", "true")        


Conclusion

Apache Spark Structured Streaming bridges the gap between batch and streaming processing, offering a declarative, SQL-driven approach. With support for exactly-once semantics, stateful aggregations, and integrations with Kafka, Delta Lake, and cloud storage, it is one of the most powerful streaming engines available today.
