Flow in Kotlin
Amit Nadiger
Polyglot(Rust??, Move, C++, C, Kotlin, Java) Blockchain, Polkadot, UTXO, Substrate, Sui, Aptos, Wasm, Proxy-wasm,AndroidTV, Dvb, STB, Linux, Cas, Engineering management.
Kotlin Flow is a reactive streams library introduced in Kotlin 1.3 that provides a declarative and composable way to work with asynchronous data streams. It is designed to handle asynchronous and potentially infinite sequences of values, allowing developers to write reactive, non-blocking, and asynchronous code in a more concise and intuitive manner.
Key Concepts of Kotlin Flow:
How is flow built internally?
Internally, Kotlin Flow is built on top of coroutines and leverages coroutine builders and suspending functions to achieve its functionality. Let's explore the internal components and mechanisms of Kotlin Flow:
By leveraging coroutines and suspending functions, Kotlin Flow provides a powerful and concise way to work with asynchronous data streams. It enables developers to write sequential and non-blocking code that handles backpressure, cancellation, and error handling effectively.
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
? ? // Create a flow that emits a sequence of numbers
? ? val numberFlow: Flow<Int> = flow {
? ? ? ? for (i in 1..5) {
? ? ? ? ? ? delay(1000) // Simulate some asynchronous operation
? ? ? ? ? ? emit(i) // Emit the current number
? ? ? ? }
? ? }
? ? // Collect and print the numbers emitted by the flow
? ? numberFlow.collect { number ->
? ? ? ? println("Received: $number")
? ? }
}
/*
Op =>
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
*/
In the above example, we create a Flow<Int> called numberFlow that emits a sequence of numbers from 1 to 5 with a delay of 1 second between each emission. The flow builder function is used to create the flow. Inside the flow block, we use the emit function to emit each number.
Next, we use the collect operator on the flow to consume and process the emitted values. The collect function is a suspending function that waits for each value emitted by the flow and performs the specified action. In this case, we simply print each received number.
Finally, we call the runBlocking function to start the main coroutine and wait for the flow collection to complete.
When you run the above code, you will see the numbers 1 to 5 printed with a 1-second delay between each number.
This example demonstrates the basic usage of Kotlin Flow, where you create a flow, emit values asynchronously, and collect the emitted values for further processing. You can apply various operators and transformations on the flow to manipulate the emitted values or combine multiple flows together.
Flow builders:
Let's dive into each of the Flow builders in Kotlin and explore their characteristics, usage scenarios, advantages, limitations, and more:
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
? ? val flow = flowOf(1, 2, 3, 4, 5)
? ?
? ? flow.collect { value ->
? ? ? ? println(value)
? ? }
}
2. asFlow:
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.*
fun main(){
? ? val numbers = listOf(1, 2, 3, 4, 5)
? ? val scope = CoroutineScope(Dispatchers.IO+CoroutineExceptionHandler {_,e->
? ? ? ? println("catching global exception");
? ? })
? ??
? ? val flow = numbers.asFlow()
? ??
? ? scope.launch {
? ? ? ? flow.collect { value ->
? ? ? ? ? ? println(value)
? ? ? ? }
? ? }? ?
}
3. channelFlow:
The below code creates a custom flow using channelFlow, emits values using the send() function, waits for the channel to close using awaitClose(), and collects the emitted values using collect(). It demonstrates how to create and use a custom flow with channels in Kotlin coroutines.
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
? ? val flow = channelFlow {
? ? ? ? for (i in 1..5) {
? ? ? ? ? ? send(i)
? ? ? ? }
? ? ? ? awaitClose()
? ? }
? ??
? ? flow.collect { value ->
? ? ? ? println(value)
? ? }
}
The channelFlow function is used to create a custom flow that emits values using channels. It takes a lambda expression as a parameter, which serves as the body of the flow. Inside this lambda, we can emit values using the send() function and perform other operations.
The awaitClose() function is called after emitting all the values. It suspends the flow and waits until the channel is closed. It is typically used to perform cleanup or finalize operations.
4. callbackFlow:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.callbackFlow
fun fetchUserData(callback: (Result<User>) -> Unit) {
? ? // Simulating an asynchronous API call
? ? GlobalScope.launch {
? ? ? ? delay(1000) // Simulating a delay
? ? ? ? val user = User("Amit Nadiger", 40)
? ? ? ? val result = Result.success(user)
? ? ? ? callback(result)
? ? }
}
data class User(val name: String, val age: Int)
fun main() = runBlocking {
? ? val userDataFlow = callbackFlow<Result<User>> {
? ? ? ? val callback: (Result<User>) -> Unit = { result ->
? ? ? ? ? ? // Offer the result to the flow
? ? ? ? ? ? trySend(result).isSuccess
? ? ? ? }
? ? ? ??
? ? ? ? // Register the callback
? ? ? ? fetchUserData(callback)
? ? ? ??
? ? ? ? // Unregister the callback when the flow collection is done
? ? ? ? awaitClose {
? ? ? ? ? ? // Perform any cleanup operations here
? ? ? ? ? ? // Unregister the callback
? ? ? ? }
? ? }
? ? userDataFlow.collect { result ->
? ? ? ? if (result.isSuccess) {
? ? ? ? ? ? val user = result.getOrNull()
? ? ? ? ? ? println("User: $user")
? ? ? ? } else {
? ? ? ? ? ? val exception = result.exceptionOrNull()
? ? ? ? ? ? println("Error: $exception")
? ? ? ? }
? ? }
}
Overall, these flow builders provide different approaches to create flows in Kotlin, catering to various scenarios and requirements. It's important to choose the appropriate builder based on the characteristics of your data source, the desired flow behavior, and the level of control you need over flow emissions.
What is emit ?
emit() is a suspending function provided by the Kotlin coroutines library to emit elements from a suspending function or a coroutine. It is used with a Flow object to emit values from the flow. When called, emit() sends a value to the downstream collectors of the Flow.
Here's an example:
fun getNumbers(): Flow<Int> = flow {
? ? for (i in 1..10) {
? ? ? ? emit(i)
? ? }
}
fun main() = runBlocking {
? ? getNumbers().collect { value -> println(value) }
}
In the above example, getNumbers() is a Flow that emits values from 1 to 10 using the emit() function. The collect() function is used to collect the emitted values from the flow and print them to the console. When collect() is called, it suspends the coroutine and waits for values to be emitted by the Flow. The emit() function is called within the flow builder to emit each value to the collector.
Please note that emit is a suspending function provided by the Flow class in Kotlin Coroutines. It is used to emit a single value to a downstream collector.
You cannot use emit without a Flow. However, you can use it in a regular function that is not a Flow, as long as the function is marked as suspend and you have a FlowCollector object to emit values. For example:
suspend fun printNumbers() {
? ? val numbers = listOf(1, 2, 3, 4, 5)
? ? numbers.forEach {
? ? ? ? emit(it)
? ? }
}
Here, printNumbers() is a suspending function that can emit values using emit(), but it is not a Flow. You can use this function as a source for a Flow like this:
val flow = flow {
? ? printNumbers()
}
In this way, the printNumbers() function becomes a source for the Flow and the values it emits can be collected downstream.
Example of Flow for ICC:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
val flow = flow {
for (i in 1..5) {
emit(i)
delay(1000)
}
}
launch {
flow.collect { value ->
println("Coroutine 2 received $value")
}
}
println("Coroutine 1 started")
delay(2000)
println("Coroutine 1 resuming")
flow.collect { value ->
println("Coroutine 1 received $value")
}
println("Coroutine 1 completed")
}
/*
Op =>
Coroutine 1 started
Coroutine 2 received 1
Coroutine 2 received 2
Coroutine 1 resuming
Coroutine 1 received 1
Coroutine 2 received 3
Coroutine 1 received 2
Coroutine 2 received 4
Coroutine 1 received 3
Coroutine 2 received 5
Coroutine 1 received 4
Coroutine 1 received 5
Coroutine 1 completed
*/
In the above example, we define a flow that emits the numbers 1 through 5 with a delay of 1 second between each value. We then launch two coroutines. The first coroutine collects the values emitted by the flow and prints them to the console. The second coroutine simply prints a message to indicate that it has started and resumed after a delay.
Based on the output we can see, the flow emits values to both coroutines, but they receive them in a different order. Coroutine 2 receives all values before coroutine 1, because it starts collecting before coroutine 1 resumes.
This is just a simple example, but it demonstrates the basics of using Flow to communicate between coroutines. You can use Flow to emit any kind of data stream, and you can collect the data in any way that suits your needs.
Usage of flow in Android Datastore :
DataStore in Android provides an asynchronous API using Kotlin Flow to achieve async functionality. Here's how it works:
By using Kotlin Flow with DataStore, you can benefit from the following async functionalities:
Exception handling with flow :
Exception handling in Kotlin Flow is similar to handling exceptions in regular Kotlin code. When working with Flow, you can use operators such as catch, onEach, and collect to handle exceptions and propagate them downstream. Here's an explanation of how exceptions are handled in Kotlin Flow:
Here's an example that demonstrates exception handling in Kotlin Flow:
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.*
fun main() = runBlocking {
? ? val flow = flowOf(1, 2, 3,0,8)
? ? ? ? .map { divideByZero(it) }
? ? ? ? .catch { exception ->
? ? ? ? ? ? emit(-1) // Emit fallback value on exception
? ? ? ? ? ? println("Caught exception: $exception")
? ? ? ? }
? ? launch {
? ? ? ? flow.collect { value ->
? ? ? ? ? ? println("Received value: $value")
? ? ? ? }
? ? }
}
fun divideByZero(value: Int): Int {
? ? return if (value != 0) {
? ? ? ? 10 / value
? ? } else {
? ? ? ? throw ArithmeticException("Divide by zero error")
? ? }
}
/*
Op =>
Received value: 10
Received value: 5
Received value: 3
Received value: -1
Caught exception: java.lang.ArithmeticException: Divide by zero error
*/
In this example, we have a flowOf function that emits three values: 1, 2, and 3. We use the map operator to perform a division operation, but we deliberately introduce a divide-by-zero error when the value is 0.
To handle the exception, we use the catch operator after the map operator. Inside the catch block, we emit a fallback value of -1 using the emit function. We also print the caught exception.
Finally, we collect the values emitted by the Flow using the collect function and print each received value.
When you run this code, you'll see that the exception is caught by the catch operator, and the fallback value of -1 is emitted. The flow continues to emit the remaining values, and the collected values are printed.
Note that the catch operator can be placed at different positions within the Flow pipeline based on your specific requirements. It allows you to handle exceptions at different stages and define fallback behaviors accordingly.
By using these exception handling mechanisms, you can handle exceptions gracefully in Kotlin Flow and choose how to handle errors, provide fallback values, or propagate errors to downstream components.
Advantages of Kotlin Flow:
Disadvantages of Kotlin Flow:
Suitable Scenarios for using Kotlin Flow:
Here are some of the commonly used APIs of Flow in Kotlin:
By using Kotlin Flow, you can write concise and readable code that handles asynchronous streams of data in a reactive and non-blocking manner. It provides a powerful and flexible way to work with asynchronous data, making it easier to handle complex scenarios such as data processing, network requests, and UI updates in a reactive and efficient manner.
Thanks, for reading till end, please comment if any!.