Stateful transformations in Spark Streaming - Part 1
In the previous article of this series, i.e. Spark Streaming in layman's terms, we understood the basic concepts of Spark Streaming.
If you are reading this article without having read the previous one, I recommend reading it or watching the video first. It will give you a better understanding of all the Spark Streaming concepts.
I have also created a YouTube video covering the same concepts. Please follow these links.
Useful links:
Here in this session, we will mainly focus on the updateStateByKey stateful transformation (reduceByKeyAndWindow will follow in the next part).
This function will help you understand stateful transformations in an easy way.
So let's start and discuss these concepts in layman's terms.
Before going through the examples, let's quickly recall what a stateful transformation means.
Stateful Transformation
Stateful transformation is a particular property of Spark Streaming; it enables us to maintain state between micro-batches. In other words, it maintains state across a period of time, which can be as long as the entire session of the streaming job.
We mostly achieve this by enabling checkpointing in the streaming application.
In simple terms, a stateful transformation lets us use some data from previous batches to generate the results for a new batch.
Let me try to explain the above concept with one simple example.
There can be two scenarios here: maintaining state across the entire stream, or maintaining it only over a recent window of data.
Let's try to solve the first scenario, i.e. computing a running aggregate over the entire stream. Here we will be calculating the frequency of each word across the entire stream.
We will be using the updateStateByKey transformation function for this.
updateStateByKey
updateStateByKey is a stateful transformation, and using it requires two steps: define the state (here, the running count of a word) and define the state update function that combines the new values from a batch with the previous state.
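As a quick preview, here is a minimal sketch of that pattern. The DStream name pairs is just a placeholder; the concrete version for our word count appears further down.
// Generic shape of updateStateByKey: for every key, Spark passes in the new
// values from the current micro-batch plus the previously stored state, and
// our function returns the new state.
def update(newValues: Seq[Int], previousState: Option[Int]): Option[Int] =
  Some(previousState.getOrElse(0) + newValues.sum)

val runningCounts: DStream[(String, Int)] = pairs.updateStateByKey(update)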
Do you remember the word count example we wrote in our previous article, where we read a stream of data coming from the console?
Just follow that article and you will find the word count code snippet.
Now let's suppose we want to count the occurrences of some word in the entire stream of data. For example, we would like to check how many times the word "Data" occurs across the entire stream.
I hope the above problem statement of finding the occurrences of one particular word in the entire stream is clear.
In this example too, we will be using a socket as the source, or producer, of the data. For that, we will simply use the `nc -lk 9000` command, which lets us send data on port 9000.
Let's suppose that, in the first batch, we enter the following line in the terminal.
The Big Data Show is a great channel for learning Data Engineering concepts.
So after applying the following code:
val sc: SparkContext = spark.sparkContext

// Initiate the streaming context with a 15-second batch interval
val ssc: StreamingContext = new StreamingContext(
  sc,
  batchDuration = Seconds(15)
)

// Set the checkpoint directory. It is required for stateful operations,
// because this is where Spark persists the state between micro-batches.
ssc.checkpoint("./checkpointing")

// Read lines from the socket we feed with `nc -lk 9000`
val lines: ReceiverInputDStream[String] = ssc.socketTextStream(
  "localhost", 9000
)

// Split every line into words and pair each word with the count 1
val words: DStream[String] = lines.flatMap(x => x.split(" "))
val wordsFrequencyMap: DStream[(String, Int)] = words.map(w => (w, 1))
We will get output something like this:
(The, 1)
(Big, 1)
(Data, 1)
(Show, 1)
(is, 1)
(a, 1)
(great, 1)
(channel, 1)
(for, 1)
(learning, 1)
(Data, 1)
(Engineering, 1)
(concepts, 1)
Now we will use the updateStateByKey function to apply our first stateful operation.
We have just read above that updateStateByKey works on the entire stream of data and that it accepts a function as a parameter. Let's look at an example.
def stateFullEntireStream(wordsFrequencyMap: DStream[(String, Int)]): Unit = {
  // Keep only the pairs whose word starts with "Data"
  val filterMap: DStream[(String, Int)] = wordsFrequencyMap.filter(x => {
    x._1.startsWith("Data")
  })

  // Maintain a running count for each remaining key across batches
  val result = filterMap.updateStateByKey(updateFunc)
  result.print()
}
Here you can see that the first thing we are doing is filtering the DStream to keep only the pairs whose word starts with "Data". Then we call updateStateByKey on our filtered DStream and pass it an updateFunc. Let's check the updateFunc code, and things will become clearer.
def updateFunc(newValues: Seq[Int], previousState: Option[Int]): Option[Int] = {
  val newCount = previousState.getOrElse(0) + newValues.sum
  Some(newCount)
}
For the first batch, the parameters passed to updateFunc will have the following values.
(Data, {1, 1}) newValues = Seq(1, 1) previousState = None
But because previousState has the data type Option[Int] and we call getOrElse(0) on it, a value of None is treated as 0 in the calculation.
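If getOrElse is new to you, here is a tiny illustration of how it turns a missing state into 0 (plain Scala, independent of Spark):
val noState: Option[Int] = None
noState.getOrElse(0)   // 0 -> a brand-new key starts counting from zero
Some(2).getOrElse(0)   // 2 -> an existing key carries its previous count forward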
After doing the calculation, updateFunc will return Some(2) for the two occurrences of the word "Data".
Now let's suppose we enter the following line in the terminal, which acts as the producer for us.
The Big Data Show is an interesting channel on Youtube but there are other channels too which are equally good for learning Data Engineering.
Now, once we enter this line, our filter statement is applied again:
....filter(x=> {
x._1.startsWith("Data")
})
We will then get the following parameters in the updateFunc function.
(Data, {1, 1}) newValues = Seq(1, 1) previousState = Some(2)
If you are observing closely, you will notice that previousState is no longer None but Some(2).
After this batch runs, the code will print the count for "Data" as 4.
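Putting the two batches together, this is roughly how the calls to updateFunc play out for the key "Data" (a sketch of what Spark does for us internally, not code we write ourselves):
// First batch: words starting with "Data" appear twice, and there is no previous state yet.
updateFunc(Seq(1, 1), None)     // returns Some(2)

// Second batch: words starting with "Data" appear twice again, previous state is Some(2).
updateFunc(Seq(1, 1), Some(2))  // returns Some(4)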
So now you have understood how updateStateByKey can be used to calculate aggregations over the entire stream of data.
I hope you also noticed that we are using a checkpoint directory in our code; that is where all the state is stored.
But frankly speaking, there will be very few scenarios where you will really be interested in calculating a running aggregation over the entire stream of data. It is also not very practical: if we keep maintaining state for the entire stream, the size of the checkpoint directory keeps increasing, and that creates a lot of problems.
I have rarely seen or implemented an aggregation of stream data that runs for the entire duration of the stream.
One also needs to understand that this state has to be held by the executors and checkpointed, and if it keeps on growing, there is a much greater chance of running into out-of-memory errors.
So how can we actually apply aggregations or other operations to streaming data in practice?
In production, we usually do not apply an operation over the entire stream; instead, we apply the aggregation over, say, the last 30 minutes or the last 15 minutes of data.
For this, the window size and sliding interval come in very handy.
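As a small teaser, the windowed version of our word count could look roughly like this (a minimal sketch; the 30-minute window and 5-minute slide are just example values I picked, and the slide must be a multiple of the 15-second batch interval):
import org.apache.spark.streaming.Minutes

// Count word occurrences over the last 30 minutes, recomputed every 5 minutes.
val windowedCounts: DStream[(String, Int)] =
  wordsFrequencyMap.reduceByKeyAndWindow(
    (a: Int, b: Int) => a + b, // how counts inside the window are combined
    Minutes(30),               // window size
    Minutes(5)                 // sliding interval
  )
windowedCounts.print()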
We will understand this concept in detail in the next article, where we will explore it through the reduceByKeyAndWindow function. Till then, keep learning and goodbye. Cheers to learning.
Please find the whole code snippet of the updateStateByKey demo below.
package sparkStreaming

import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import utils.{SparkIOUtil => SU}

object WordSearching {

  val WORDS_TO_SEARCH = "Data"

  // Combine the counts from the current batch with the previous state
  def updateFunc(newValues: Seq[Int], previousState: Option[Int]): Option[Int] = {
    val newCount = previousState.getOrElse(0) + newValues.sum
    Some(newCount)
  }

  // Filter the words we are interested in and maintain a running count across batches
  def stateFullEntireStream(wordsFrequencyMap: DStream[(String, Int)]): Unit = {
    val filterMap: DStream[(String, Int)] = wordsFrequencyMap.filter(x => {
      x._1.startsWith(WORDS_TO_SEARCH)
    })
    val result = filterMap.updateStateByKey(updateFunc)
    result.print()
  }

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SU.getSparkSession
    spark.sparkContext.setLogLevel("ERROR")
    val sc: SparkContext = spark.sparkContext

    // Streaming context with a 15-second batch interval
    val ssc: StreamingContext = new StreamingContext(
      sc,
      batchDuration = Seconds(15)
    )

    // Checkpoint directory, required for the stateful updateStateByKey operation
    ssc.checkpoint("./checkpointing")

    // Read from the same socket we feed with `nc -lk 9000`
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9000)
    val words: DStream[String] = lines.flatMap(w => w.split(" "))
    val wordsFrequencyMap: DStream[(String, Int)] = words.map(w => (w, 1))

    stateFullEntireStream(wordsFrequencyMap)

    ssc.start()
    ssc.awaitTermination()
  }
}
Feel free to subscribe to my YouTube channel, i.e. The Big Data Show. I might upload a more detailed discussion of the above concepts in the coming days.
More so, thank you for giving me, as a writer, that most precious gift: your time.