Stateful transformations in Spark Streaming - Part 1

In the previous article of this series, Spark Streaming in layman's terms, we understood the following things:

  1. Different streaming sources
  2. Stateful vs Stateless transformation

For those who are reading this article without having read the previous one, I recommend reading that article or watching the video first. It will give you a better understanding of all the Spark Streaming concepts used here.

I have also created a YouTube video covering the same concepts. Please follow the links below.

Useful links:

In this article, we will mainly focus on the following stateful transformations:

  • updateStateByKey (This Article)
  • reduceByKeyAndWindow (Next Article)

These functions will help you understand stateful transformations in an easy way.

So let's start and discuss these concepts in layman's terms.


Before jumping into the examples, let's quickly recall the meaning of stateful transformations.

Stateful Transformation

Stateful transformation is a property of Spark Streaming that enables us to maintain state between micro-batches. In other words, it maintains state across a period of time, which can be as long as the entire session of a streaming job.

We mostly achieve this by enabling checkpointing in our streaming applications.

In simple terms, a stateful transformation lets us use data from previous batches while generating the results for a new batch.


Let me try to explain the above concept with one simple example.

  • Let's consider that you have a streaming application which runs continuously for 12 hours.
  • Suppose our batch interval is 5 minutes. It means that a new RDD will be created every 5 minutes.
  • So during the entire stream, we will have 144 RDDs.
  • It's simple mathematics:
  • 12 hours = 12 * 60 = 720 minutes.
  • 720 minutes / 5 minutes = 144 RDDs.
  • Now suppose we want to compute a running sum. For that, we also need information about the previous states, because this is a streaming application: at any point in time, we only have part of the data. Some of it has already arrived, and some will arrive in the coming batch intervals.

There can be two scenarios here.

  • We might be interested in calculating the running sum over the entire stream, i.e. 12 hours.
  • Or only over the last 30 minutes, which means we are always interested in the last 6 RDDs. For this type of requirement, we use the concepts of sliding interval and window size. Please check my previous article to understand sliding interval and window size.


Let's try to solve the first scenario i.e. finding the running sum of the entire stream. Here we will be calculating the frequency of each word across the entire stream.

We will be using the updateStateByKey transformation function for this.



updateStateByKey

updateStateByKey is a stateful transformation and it requires 2 steps.

  1. Define a state to start with - the state can be an arbitrary data type.
  2. Define a function to update the state - in this function, you specify how to update the state using the previous state and the new values received from the stream (a minimal sketch combining both steps follows this list).
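Putting those two steps together, a minimal sketch could look like this (assuming a DStream of (word, 1) pairs like the one we build later in this article; the state here is simply an Int holding the running count per key):

import org.apache.spark.streaming.dstream.DStream

// Step 1: the state is just an Int (the running count for each key).
// Step 2: the update function merges this batch's values with the old state.
def runningCount(pairs: DStream[(String, Int)]): DStream[(String, Int)] =
  pairs.updateStateByKey[Int] { (newValues: Seq[Int], previousState: Option[Int]) =>
    Some(previousState.getOrElse(0) + newValues.sum)
  }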

Do you remember that we wrote a word count example in our previous article, where we read a stream of data coming from the console?

Just follow that article and you will find the word count code snippet; a minimal version is also sketched below.
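For reference, a minimal sketch of that stateless word count could look like this (the port and batch interval here are just placeholders; the exact code is in the previous article):

import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Stateless word count: every 15-second batch is counted independently,
// and no state is carried over between batches.
def statelessWordCount(sc: SparkContext): Unit = {
  val ssc = new StreamingContext(sc, Seconds(15))
  val lines = ssc.socketTextStream("localhost", 9000)

  val counts = lines
    .flatMap(_.split(" "))
    .map(word => (word, 1))
    .reduceByKey(_ + _) // counts only within the current batch

  counts.print()

  ssc.start()
  ssc.awaitTermination()
}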


Now let's suppose we want to count the occurrences of a particular word in the entire stream of data. For example, we would like to check how many times the word "Data" occurs in the entire stream.


I hope the problem statement is clear: we want to find the number of occurrences of one particular word across the entire stream.


In this example too, we will be using a socket as the source, or producer, of the data. We will simply use the `nc -lk 9000` command, which lets us produce data on port 9000.


Let's suppose that, for the first time, we enter the following line in the terminal.

The Big Data Show is a great channel for learning Data Engineering concepts.

So after applying the following code

val sc: SparkContext = spark.sparkContext 

// Initiate the streaming context
val ssc: StreamingContext = new StreamingContext(
  sc,
  batchDuration = Seconds(15)
)

// Here we set the checkpoint directory. It is required for
// performing stateful operations.
ssc.checkpoint("./checkpointing")


val lines: ReceiverInputDStream[String] = ssc.socketTextStream(
    "localhost", 9000
 )

val words: DStream[String] = lines.flatMap(x => x.split(" "))
 
val wordsFrequencyMap = words.map(w => {
   (w, 1)
})        

We will get output something like this:

(The, 1)
(Big, 1)
(Data, 1)
(Show, 1)
(is, 1)
(a, 1)
(great, 1)
(channel, 1)
(for, 1)
(learning, 1)
(Data, 1)
(Engineering, 1)
(concepts, 1)

Now we will use the updateStateByKey function to apply our first stateful operation.

We have just read above that updateStateByKey works on the entire stream of data and accepts a function as a parameter. Let's look at an example.


def stateFullEntireStream(wordsFrequencyMap: DStream[(String, Int)]): Unit = {
 
 // Keep only the words that start with "Data"
 val filterMap: DStream[(String, Int)] = wordsFrequencyMap.filter(x => {
    x._1.startsWith("Data")
  })
  
  val result = filterMap.updateStateByKey(updateFunc)

  result.print()
}        


Here you can see that the first thing we do is keep only the words in the RDD that start with "Data". Then we call updateStateByKey on the filtered DStream and pass it updateFunc. Let's check the updateFunc code, and things will become clearer.


def updateFunc(newValues:Seq[Int], previousState:Option[Int]): Option[Int] = {
  
  val newCount = previousState.getOrElse(0) + newValues.sum

  Some(newCount)

}        

Here you can see the parameter values that will be passed the first time:

(Data, {1, 1}) → newValues = Seq(1, 1), previousState = None

But since we use getOrElse(0) on previousState, whose type is Option[Int], the value None is treated as 0 in the code.
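If Scala's Option type is new to you, getOrElse simply supplies a default when the Option is empty. A tiny illustration:

// getOrElse returns the wrapped value if present, otherwise the default.
val noPreviousState: Option[Int] = None
val someState: Option[Int] = Some(2)

val a = noPreviousState.getOrElse(0) // 0 -> start counting from zero
val b = someState.getOrElse(0)       // 2 -> keep accumulating on top of 2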


After doing the calculation, updateFunc will return 2 (wrapped in Some) for the two occurrences of the word "Data".


Now let's suppose we enter the following line in the terminal, which acts as the producer for us.

The Big Data Show is an interesting channel on Youtube but there are other channels too which are equally good for learning Data Engineering.

Now, once we enter this line, the following filter statement is applied again.

....filter(x=> {
    x._1.startsWith("Data")
  })        

We will get the following parameters in the updateFunc function:

(Data, {1, 1}) → newValues = Seq(1, 1), previousState = Some(2)

If you are observing closely, you will notice that previousState is no longer None but Some(2).

After this batch, the code will return 4 as the output.
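To make the state transitions concrete, here is the same sequence written out by hand as plain function calls (this just applies updateFunc directly to illustrate the arithmetic; it is not what Spark literally executes internally):

// Batch 1: "Data" appears twice and there is no previous state yet.
val afterBatch1: Option[Int] = updateFunc(Seq(1, 1), None)        // Some(2)

// Batch 2: "Data" appears twice again; the previous (checkpointed) state is 2.
val afterBatch2: Option[Int] = updateFunc(Seq(1, 1), afterBatch1) // Some(4)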

So now you have seen how updateStateByKey can be used to calculate running aggregations over the entire stream of data.

I hope you have noticed that we are using a checkpoint directory in our code; that is where all the state is stored.



But frankly speaking, there will be very few scenarios where you are really interested in calculating a running aggregation over the entire stream of data. It is also technically impractical: if we keep maintaining state for the entire stream, the size of the checkpoint directory keeps growing, and this creates a lot of problems.

I have personally not seen or implemented an aggregation of streaming data over the entire duration of a stream.

One also needs to understand that this state has to be made available to the executors, and if the checkpointed state keeps growing, there is a greater chance of running into out-of-memory errors.

So how can we actually apply aggregations or other operations on streaming data?

In production, we usually do not apply an operation over the entire stream; instead, we apply aggregations over, say, the last 30 minutes or the last 15 minutes of data.

This is where window size and sliding interval come in very handy.
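As a small preview of the next article, a windowed count that only looks at the last 30 minutes, sliding forward every 5 minutes, could look roughly like this (the durations are illustrative and must be multiples of the batch interval; the inverse-reduce variant shown here also requires checkpointing):

import org.apache.spark.streaming.Minutes
import org.apache.spark.streaming.dstream.DStream

// Counts each word over the last 30 minutes only, re-evaluated every 5 minutes.
// The inverse function (a - b) lets Spark subtract the batch that slides out of
// the window instead of recomputing the whole window from scratch.
def last30MinutesCount(pairs: DStream[(String, Int)]): DStream[(String, Int)] =
  pairs.reduceByKeyAndWindow(
    (a: Int, b: Int) => a + b, // add new batches entering the window
    (a: Int, b: Int) => a - b, // remove old batches leaving the window
    Minutes(30),               // window size
    Minutes(5)                 // sliding interval
  )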

We will understand this concept in the next article, where we will explore it with the reduceByKeyAndWindow function. Till then, keep learning and goodbye. Cheers to learning.

Please find the whole code snippet of the updateStateByKey demo below.

package sparkStreaming

import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import utils.{SparkIOUtil => SU}

object WordSearching {

  val WORDS_TO_SEARCH = "Data"

  def updateFunc(newValues:Seq[Int], previousState:Option[Int]): Option[Int] = {
    val newCount = previousState.getOrElse(0) + newValues.sum

    Some(newCount)
  }


  def stateFullEntireStream(wordsFrequencyMap: DStream[(String, Int)]): Unit = {
    val filterMap: DStream[(String, Int)] = wordsFrequencyMap.filter(x=> {
      x._1.startsWith(WORDS_TO_SEARCH)
    })

    val result = filterMap.updateStateByKey(updateFunc)

    result.print()
  }
  

  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SU.getSparkSession
    spark.sparkContext.setLogLevel("ERROR")
    val sc: SparkContext = spark.sparkContext
    val ssc: StreamingContext = new StreamingContext(
      sc,
      batchDuration = Seconds(15)
    )

    ssc.checkpoint("./checkpointing")

    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9000)

    val words: DStream[String] = lines.flatMap(w => w.split(" "))

    val wordsFrequencyMap: DStream[(String, Int)] = words.map(w => (w, 1))


    stateFullEntireStream(wordsFrequencyMap)



    ssc.start()
    ssc.awaitTermination()


  }
}        
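The helper utils.SparkIOUtil is not shown in this article. A minimal sketch of what it could look like, assuming it simply builds a local SparkSession (the real utility may differ):

package utils

import org.apache.spark.sql.SparkSession

// Hypothetical helper, assumed to simply create a local SparkSession.
object SparkIOUtil {

  def getSparkSession: SparkSession =
    SparkSession.builder()
      .appName("updateStateByKey-demo")
      .master("local[*]")
      .getOrCreate()
}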


Feel free to subscribe to my YouTube channel, The Big Data Show. I might upload a more detailed discussion of the above concepts in the coming days.

More so, thank you for giving me the most precious gift a writer can receive, i.e. your time.
