Flow in Kotlin

Kotlin Flow is an asynchronous stream API that ships with the kotlinx.coroutines library and has been stable since kotlinx.coroutines 1.3.0. It provides a declarative and composable way to work with asynchronous, potentially infinite sequences of values, so that reactive, non-blocking code can be written in a concise, sequential style.

Key Concepts of Kotlin Flow:

  1. Asynchronous Streams: Kotlin Flow models asynchronous data streams that emit multiple values over time. It can represent both finite and infinite sequences of values.
  2. Suspending Functions: Flow is built on top of suspending functions, which are functions that can be paused and resumed. This allows developers to write non-blocking code without using callbacks or explicit threading.
  3. Cold Streams: Flow is a cold stream, which means that the data stream is only activated when a terminal operator is applied. It starts emitting values when a collector is connected to it.
  4. Flow Builders: Kotlin Flow provides various flow builders to create data streams, including flowOf, asFlow, channelFlow, and more. These builders allow you to create flows from different data sources or define custom flow behavior.
  5. Operators and Transformations: Flow provides a rich set of operators that transform the emitted values or combine flows. Commonly used ones include map, filter, flatMapConcat, flatMapMerge, zip, merge, and more.
  6. Flow Context: Flow has the context preservation property: the code inside a flow runs in the coroutine context of its collector, and a flow is not allowed to emit from a different context (for example from inside withContext). To run the upstream part of a flow on another dispatcher, use the flowOn operator.
  7. Flow Cancellation: Flow supports cancellation, allowing you to cancel the data stream at any point. This helps in managing resources and stopping unnecessary computation when it's no longer needed.
  8. Error Handling: Flow provides operators such as catch, retry, and onCompletion to handle exceptions thrown during emission or upstream processing. An exception terminates the upstream flow; catch lets you handle it gracefully, for example by emitting a fallback value before the flow completes.
  9. Integration with Coroutines: Kotlin Flow is tightly integrated with coroutines, allowing you to seamlessly combine flows with other coroutine features like async, await, and withTimeout. This integration provides a unified programming model for both synchronous and asynchronous code.
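
To make the cold-stream and cancellation points above concrete, here is a minimal sketch. The names coldFlowDemo, coldNumbers, and log are made up for illustration; only flow, take, and toList come from kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records what happens, in order, so the behaviour is visible.
fun coldFlowDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()

    val coldNumbers: Flow<Int> = flow {
        log += "flow body started"
        for (i in 1..5) {
            log += "emitting $i"
            emit(i)
        }
    }

    // Creating the flow has not executed its body yet.
    log += "flow created, nothing has run yet"

    // take(2) cancels the upstream flow after two values,
    // so "emitting 3" is never reached.
    val firstTwo = coldNumbers.take(2).toList()
    log += "collected $firstTwo"

    log
}

fun main() {
    coldFlowDemo().forEach(::println)
}
```

The log starts with "flow created, nothing has run yet", which shows that the flow body only runs once a terminal operator (here toList) is applied.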


How is flow built internally?

Internally, Kotlin Flow is built on top of coroutines and leverages coroutine builders and suspending functions to achieve its functionality. Let's explore the internal components and mechanisms of Kotlin Flow:

  1. Flow Interface: The Flow interface is the core component of Kotlin Flow. It represents a stream of values that are produced and consumed asynchronously, and it declares a single suspending function, collect, which takes a FlowCollector. Operators for transforming, combining, and collecting values are defined as extension functions on Flow.
  2. Flow Builders: Kotlin Flow provides various flow builders to create flows from different data sources. Some common builders include flowOf, asFlow, channelFlow, and callbackFlow. These builders encapsulate the logic for emitting values and provide an interface for collecting the emitted values.
  3. Suspending Functions: Flow uses suspending functions (suspend fun) to define the transformation operations. These functions can perform asynchronous operations, such as making network requests or querying databases, and emit values using the emit function. Suspending functions allow Flow to suspend and resume execution, making it efficient and non-blocking.
  4. Operators: Kotlin Flow provides a set of operators, such as map, filter, transform, zip, and many more, to transform and combine flows. These operators allow you to apply various transformations to the emitted values, merge multiple flows, and control the flow of values.
  5. Coroutine Scope: A flow does not run on its own. collect is a suspending function, so it must be called from a coroutine, and it suspends that coroutine until the flow completes. The scope of the collecting coroutine therefore determines the lifecycle of the collection.
  6. Coroutine Context: By default a flow runs in the coroutine context of its collector. The flowOn operator changes the context of the upstream part of the flow, for example to Dispatchers.Default, Dispatchers.IO, or a custom dispatcher, which controls the threads on which values are produced.
  7. Flow Operators: Flow operators, such as collect, transform, map, and reduce, are implemented using suspending functions and coroutine builders. These operators allow for composing and chaining multiple flow operations in a declarative and sequential manner.
  8. Cancellation: Flows are cancellable, which means you can cancel the collection of values at any point using the coroutine cancellation mechanism. When a flow is cancelled, the execution of the flow is terminated, and any ongoing operations within the flow are cancelled, freeing up resources.

By leveraging coroutines and suspending functions, Kotlin Flow provides a powerful and concise way to work with asynchronous data streams. It enables developers to write sequential and non-blocking code that handles backpressure, cancellation, and error handling effectively.
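
The structure described above can be modelled in a few lines. The sketch below is a simplified stand-in, not the actual kotlinx.coroutines source: MiniFlow, MiniCollector, miniFlow, and mapValues are hypothetical names that mirror the roles of Flow, FlowCollector, the flow builder, and the map operator.

```kotlin
import kotlinx.coroutines.runBlocking

// Simplified stand-in for FlowCollector: something that can receive values.
fun interface MiniCollector<T> {
    suspend fun emit(value: T)
}

// Simplified stand-in for Flow: a single suspending collect function.
fun interface MiniFlow<T> {
    suspend fun collect(collector: MiniCollector<T>)
}

// Builder: the block only runs when collect() is called, which makes the flow cold.
fun <T> miniFlow(block: suspend MiniCollector<T>.() -> Unit): MiniFlow<T> =
    MiniFlow<T> { collector -> collector.block() }

// Operator: returns a new flow that collects the upstream and re-emits transformed values.
fun <T, R> MiniFlow<T>.mapValues(transform: suspend (T) -> R): MiniFlow<R> =
    miniFlow<R> { this@mapValues.collect { value -> emit(transform(value)) } }

fun miniFlowDemo(): List<String> = runBlocking {
    val received = mutableListOf<String>()
    val numbers = miniFlow<Int> { for (i in 1..3) emit(i) }
    numbers.mapValues { "value $it" }.collect { received += it }
    received
}

fun main() {
    println(miniFlowDemo()) // prints [value 1, value 2, value 3]
}
```

Because every operator is just a function that wraps collect, a chain of operators is a chain of nested suspending calls, which is why a plain flow runs sequentially in the collector's coroutine.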

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    // Create a flow that emits a sequence of numbers
    val numberFlow: Flow<Int> = flow {
        for (i in 1..5) {
            delay(1000) // Simulate some asynchronous operation
            emit(i) // Emit the current number
        }
    }

    // Collect and print the numbers emitted by the flow
    numberFlow.collect { number ->
        println("Received: $number")
    }
}
/*
Output:
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
*/

In the above example, we create a Flow<Int> called numberFlow that emits a sequence of numbers from 1 to 5 with a delay of 1 second between each emission. The flow builder function is used to create the flow. Inside the flow block, we use the emit function to emit each number.

Next, we use the collect operator on the flow to consume and process the emitted values. The collect function is a suspending function that waits for each value emitted by the flow and performs the specified action. In this case, we simply print each received number.

Finally, we call the runBlocking function to start the main coroutine and wait for the flow collection to complete.

When you run the above code, you will see the numbers 1 to 5 printed with a 1-second delay between each number.

This example demonstrates the basic usage of Kotlin Flow, where you create a flow, emit values asynchronously, and collect the emitted values for further processing. You can apply various operators and transformations on the flow to manipulate the emitted values or combine multiple flows together.
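
As a small illustration of applying operators to such a flow, the sketch below chains filter and map before a terminal operator. The function names numbers and evenSquares are made up for this example, and the delay is shortened so that it finishes quickly.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Same shape as numberFlow above, with a shorter delay.
fun numbers(): Flow<Int> = flow {
    for (i in 1..5) {
        delay(100)
        emit(i)
    }
}

fun evenSquares(): List<Int> = runBlocking {
    numbers()
        .filter { it % 2 == 0 } // keep 2 and 4
        .map { it * it }        // square them
        .toList()               // terminal operator: collects everything into a list
}

fun main() {
    println(evenSquares()) // prints [4, 16]
}
```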

Flow builders:

Let's dive into each of the Flow builders in Kotlin and explore their characteristics, usage scenarios, advantages, limitations, and more:

  1. flowOf:

  • Usage: flowOf is used to create a flow from a fixed set of values.
  • Scenario: It is suitable when you have a predefined set of values that you want to emit as a flow.
  • Advantage: It provides a convenient way to create a flow with a fixed set of values.
  • Limitation: Since the values are predetermined, it may not be suitable for scenarios where the values are dynamic or fetched asynchronously.

import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val flow = flowOf(1, 2, 3, 4, 5)

    flow.collect { value ->
        println(value)
    }
}


  2. asFlow:

  • Usage: asFlow converts a collection or an iterable into a flow.
  • Scenario: It is useful when you have a collection or iterable and you want to process its elements asynchronously using flows.
  • Advantage: It allows you to easily convert existing collections or iterables into flows, enabling asynchronous processing.
  • Limitation: asFlow does not copy the data, but a collection is already fully held in memory before it is converted, so the flow itself does not help with very large data sets. For those, a lazily evaluated Sequence (which also has asFlow) or the flow builder is usually a better fit.


import kotlinx.coroutines.*
import kotlinx.coroutines.flow.asFlow

fun main() = runBlocking {
    val numbers = listOf(1, 2, 3, 4, 5)
    // Scope with a handler for uncaught exceptions thrown by its coroutines
    val scope = CoroutineScope(Dispatchers.IO + CoroutineExceptionHandler { _, e ->
        println("Caught uncaught exception: $e")
    })

    val flow = numbers.asFlow()

    // Keep the Job so that main can wait for the collection to finish
    val job = scope.launch {
        flow.collect { value ->
            println(value)
        }
    }
    job.join() // Without this, main could return before anything is printed
}


  3. channelFlow:

  • Usage: channelFlow is used to create a flow using a Channel for more complex scenarios.
  • Scenario: It is suitable when you need fine-grained control over flow emissions and want to use channels to handle custom flows.
  • Advantage: It provides flexibility and control over flow emissions using channels, enabling more complex flow scenarios.
  • Limitation: It requires a deeper understanding of channels and may introduce more complexity compared to other flow builders.

The code below creates a flow using channelFlow, emits values using the send() function, and collects them using collect(). It demonstrates the basic shape of a channel-backed flow in Kotlin coroutines.

import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val flow = channelFlow {
        for (i in 1..5) {
            send(i)
        }
        // No awaitClose() here: the flow completes when this block returns.
        // Calling awaitClose() without anyone closing the channel would keep
        // collect() below suspended forever.
    }

    flow.collect { value ->
        println(value)
    }
}

The channelFlow function creates a flow whose values are sent through a channel. It takes a lambda expression as a parameter, which serves as the body of the flow. Inside this lambda we emit values with send(), and, unlike in the plain flow builder, we may also launch child coroutines that send values concurrently.

The awaitClose() function suspends the block until the channel is closed or the collector is cancelled, and then runs its optional cleanup lambda. It is needed when values arrive from outside the block, for example from a registered callback, and the block must stay alive in the meantime. In the example above all values are sent from within the block, so awaitClose() is not needed.
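
channelFlow is most useful when values are produced by several coroutines at once, which the plain flow builder does not allow. The following is a minimal sketch of that case; mergedSources and the "A"/"B" labels are hypothetical names chosen for the illustration.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun mergedSources(): List<String> = runBlocking {
    val merged = channelFlow<String> {
        // Two producers run concurrently; send() may be called from both.
        launch {
            for (i in 1..3) { delay(10); send("A$i") }
        }
        launch {
            for (i in 1..3) { delay(15); send("B$i") }
        }
        // The flow completes once the block and both child coroutines have finished.
    }
    merged.toList()
}

fun main() {
    // The interleaving depends on timing, so sort before printing to get a stable result.
    println(mergedSources().sorted()) // prints [A1, A2, A3, B1, B2, B3]
}
```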


  4. callbackFlow:

  • Usage: callbackFlow creates a flow that emits values using callbacks.
  • Scenario: It is useful when you need to integrate with APIs that use callbacks and you want to transform them into flows.
  • Advantage: It allows you to convert callback-based APIs into flows, enabling a more streamlined and coroutine-friendly programming model.
  • Limitation: It requires manual handling of callbacks and may introduce additional complexity in dealing with asynchronous callback-based APIs.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.callbackFlow

data class User(val name: String, val age: Int)

// Simulates a callback-based API that delivers a single result asynchronously
fun fetchUserData(callback: (Result<User>) -> Unit) {
    GlobalScope.launch {
        delay(1000) // Simulating a delay
        val user = User("Amit Nadiger", 40)
        val result = Result.success(user)
        callback(result)
    }
}

fun main() = runBlocking {
    val userDataFlow = callbackFlow<Result<User>> {
        val callback: (Result<User>) -> Unit = { result ->
            // Offer the result to the flow
            trySend(result)
            // This API delivers exactly one result, so close the flow;
            // otherwise collect() below would never return
            close()
        }

        // Register the callback
        fetchUserData(callback)

        // Suspend until the flow is closed or the collector is cancelled
        awaitClose {
            // Perform any cleanup operations here
            // Unregister the callback
        }
    }

    userDataFlow.collect { result ->
        if (result.isSuccess) {
            val user = result.getOrNull()
            println("User: $user")
        } else {
            val exception = result.exceptionOrNull()
            println("Error: $exception")
        }
    }
}


Overall, these flow builders provide different approaches to create flows in Kotlin, catering to various scenarios and requirements. It's important to choose the appropriate builder based on the characteristics of your data source, the desired flow behavior, and the level of control you need over flow emissions.

What is emit?

emit() is a suspending function declared on the FlowCollector interface in the kotlinx.coroutines library. Inside a flow builder, the block has a FlowCollector as its receiver, so calling emit() there sends a value to the downstream collector of the flow.

Here's an example:

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun getNumbers(): Flow<Int> = flow {
    for (i in 1..10) {
        emit(i)
    }
}

fun main() = runBlocking {
    getNumbers().collect { value -> println(value) }
}

In the above example, getNumbers() returns a Flow that emits values from 1 to 10 using the emit() function. The collect() function is used to collect the emitted values from the flow and print them to the console. When collect() is called, it suspends the coroutine and waits for values to be emitted by the Flow. The emit() function is called within the flow builder to emit each value to the collector.

Please note that emit is a member of FlowCollector, not of Flow. Each call emits a single value to the downstream collector.

You cannot call emit without a FlowCollector. However, you can call it from a regular suspending function that is not itself a flow builder, as long as that function has a FlowCollector to emit into, for example as its extension receiver:

suspend fun FlowCollector<Int>.printNumbers() {
    val numbers = listOf(1, 2, 3, 4, 5)
    numbers.forEach {
        emit(it)
    }
}

Here, printNumbers() is a suspending extension function on FlowCollector<Int> that can emit values using emit(), but it is not a Flow. You can use this function as a source for a Flow like this:

val flow = flow<Int> {
    printNumbers()
}

In this way, the printNumbers() function becomes a source for the Flow and the values it emits can be collected downstream.

Example of Flow for inter-coroutine communication (ICC):

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    val flow = flow {
        for (i in 1..5) {
            emit(i)
            delay(1000)
        }
    }

    launch {
        flow.collect { value ->
            println("Coroutine 2 received $value")
        }
    }

    println("Coroutine 1 started")
    delay(2000)
    println("Coroutine 1 resuming")

    flow.collect { value ->
        println("Coroutine 1 received $value")
    }

    println("Coroutine 1 completed")
}
/*
Output:
Coroutine 1 started
Coroutine 2 received 1
Coroutine 2 received 2
Coroutine 1 resuming
Coroutine 1 received 1
Coroutine 2 received 3
Coroutine 1 received 2
Coroutine 2 received 4
Coroutine 1 received 3
Coroutine 2 received 5
Coroutine 1 received 4
Coroutine 1 received 5
Coroutine 1 completed
*/        

In the above example, we define a flow that emits the numbers 1 through 5 with a delay of 1 second after each value. Two coroutines then collect it. Coroutine 2 is started with launch and begins collecting immediately. Coroutine 1, the main coroutine, prints a message, waits 2 seconds, and only then starts its own collection.

As the output shows, both coroutines receive all five values, and coroutine 1 starts again from 1 even though it begins collecting two seconds later. This is because the flow is cold: each call to collect runs the flow block from the beginning, independently of any other collector. The values are not shared between the two coroutines, and the output lines interleave only because both collections run concurrently.

This is a simple example, but it demonstrates how several coroutines can consume the same flow definition. To broadcast a single stream of values to several coroutines, use a hot flow such as SharedFlow or StateFlow.
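
When one stream of values really has to reach several coroutines, a hot flow is the usual tool. The sketch below uses MutableSharedFlow from kotlinx.coroutines; the names sharedFlowDemo, events, collectorA, and collectorB are made up for the illustration, and take(3) is only there so that the demo terminates.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

fun sharedFlowDemo(): Pair<List<Int>, List<Int>> = runBlocking {
    // Hot flow: each emitted value is delivered to all current subscribers.
    val events = MutableSharedFlow<Int>()

    // Each collector stops after three values so that the demo terminates.
    val collectorA = async { events.take(3).toList() }
    val collectorB = async { events.take(3).toList() }

    // Wait until both collectors are subscribed; values emitted earlier would be missed.
    events.subscriptionCount.first { it == 2 }

    for (i in 1..3) events.emit(i)

    collectorA.await() to collectorB.await()
}

fun main() {
    println(sharedFlowDemo()) // prints ([1, 2, 3], [1, 2, 3])
}
```

Unlike the cold flow above, the values here are produced once and both coroutines observe the same emissions.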

Usage of Flow in Android DataStore:

DataStore in Android provides an asynchronous API using Kotlin Flow to achieve async functionality. Here's how it works:

  1. Flow-based API: DataStore exposes stored data as a Kotlin Flow through its data property, and it writes data through suspending functions such as updateData (or edit for Preferences DataStore). Both reading and writing are therefore asynchronous and non-blocking.
  2. Flow emission: When you read data from DataStore, it returns a Flow that emits the current stored value and continues to emit any future updates to that value. The Flow is cold, meaning it doesn't start emitting values until it is collected by a collector.
  3. Collecting Flow: To consume the emitted values, you collect the Flow from a coroutine, for example one launched in a lifecycle-aware scope. The collecting coroutine suspends while it waits for values, so no thread is blocked.
  4. Asynchronous updates: When you write data to DataStore, it updates the stored value asynchronously. Any observers that are collecting the corresponding Flow will be notified of the updated value through emission. This allows you to react to data changes in an asynchronous and non-blocking way.

By using Kotlin Flow with DataStore, you can benefit from the following async functionalities:

  • Non-blocking operations: Kotlin Flow allows you to perform non-blocking operations on the data stored in DataStore. This means you can read and write data without blocking the main thread, ensuring smooth app performance and responsiveness.
  • Asynchronous updates: When the data stored in DataStore is modified, the corresponding Flow emits the updated value asynchronously. This allows you to observe and react to data changes in real-time without blocking the UI thread.
  • Concurrent data access: Kotlin Flow with DataStore supports concurrent data access. Multiple observers can collect the Flow simultaneously, allowing different parts of your application to independently react to data changes.

Exception handling with Flow:

Exception handling in Kotlin Flow builds on regular Kotlin exception handling. An exception thrown in a flow propagates downstream to the collector, and you can handle it with the catch operator, with try-catch inside an operator such as onEach, or with try-catch around collect. Here's an explanation of how exceptions are handled in Kotlin Flow:

  1. catch operator: The catch operator handles exceptions thrown upstream of it, that is, during emission or in operators placed before it. It does not see exceptions thrown by downstream operators or by the collect block. Inside catch you can log the exception, emit a fallback value or error state, or rethrow. Once an exception reaches catch, the upstream flow has already terminated, so the resulting flow completes after the catch block finishes.
  2. onEach operator: The onEach operator allows you to perform some action on each emitted element of the Flow. Inside the onEach block, you can use try-catch to handle exceptions from the processing of an individual element, so that one failing element does not terminate the flow.
  3. collect function: The collect function is used to collect and consume elements emitted by the Flow. It is typically used at the end of the Flow chain. When using collect, you can wrap the call in a try-catch block to handle exceptions that occur anywhere in the flow or in the collection code itself.

Here's an example that demonstrates exception handling in Kotlin Flow:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// runBlocking<Unit> keeps main's return type Unit even though the last
// expression (launch) returns a Job
fun main() = runBlocking<Unit> {
    val flow = flowOf(1, 2, 3, 0, 8)
        .map { divideByZero(it) }
        .catch { exception ->
            emit(-1) // Emit fallback value on exception
            println("Caught exception: $exception")
        }

    launch {
        flow.collect { value ->
            println("Received value: $value")
        }
    }
}

// Divides 10 by the given value; throws when the value is 0
fun divideByZero(value: Int): Int {
    return if (value != 0) {
        10 / value
    } else {
        throw ArithmeticException("Divide by zero error")
    }
}

/*
Output:
Received value: 10
Received value: 5
Received value: 3
Received value: -1
Caught exception: java.lang.ArithmeticException: Divide by zero error
*/

In this example, flowOf emits five values: 1, 2, 3, 0, and 8. We use the map operator to divide 10 by each value, and divideByZero deliberately throws an ArithmeticException when the value is 0.

To handle the exception, we use the catch operator after the map operator. Inside the catch block, we emit a fallback value of -1 using the emit function. We also print the caught exception.

Finally, we collect the values emitted by the Flow using the collect function and print each received value.

When you run this code, you'll see that the first three results are collected, then the exception is caught by the catch operator and the fallback value of -1 is emitted. The flow then completes: the exception has terminated the upstream flow, so the remaining value 8 is never processed.

Note that the catch operator can be placed at different positions within the Flow pipeline based on your specific requirements. It allows you to handle exceptions at different stages and define fallback behaviors accordingly.

By using these exception handling mechanisms, you can handle exceptions gracefully in Kotlin Flow and choose how to handle errors, provide fallback values, or propagate errors to downstream components.
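
Because catch only sees upstream exceptions, where the processing code sits matters. The sketch below moves the per-element work into onEach, ahead of catch, so that a failure in that work is handled too. The function name catchPlacementDemo and the log messages are made up for the illustration.

```kotlin
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

fun catchPlacementDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()

    flowOf(1, 2, 3)
        .onEach { value ->
            // Processing placed upstream of catch, so its failures are handled there.
            check(value != 2) { "cannot process $value" }
            log += "processed $value"
        }
        .catch { e -> log += "caught: ${e.message}" }
        .onCompletion { log += "done" }
        .collect() // terminal operator without a lambda: just runs the flow

    log
}

fun main() {
    println(catchPlacementDemo())
    // prints [processed 1, caught: cannot process 2, done]
}
```

The value 3 never appears in the log, which matches the behaviour described above: after the exception, the upstream flow is finished.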

Advantages of Kotlin Flow:

  1. Asynchronous Stream Processing: Kotlin Flow provides a convenient way to work with asynchronous streams of data. It allows you to handle asynchronous operations in a sequential and non-blocking manner.
  2. Coroutines Integration: Kotlin Flow is built on top of coroutines, which provides structured concurrency and allows for easy integration with existing coroutine-based code.
  3. Backpressure Support: Flow handles backpressure through suspension. Because emit and collect are suspending functions, a producer that emits faster than the consumer can process is simply suspended until the consumer is ready. Operators such as buffer and conflate adjust this behaviour. This helps in preventing resource exhaustion and maintaining optimal performance.
  4. Error Handling: Flow provides built-in error handling mechanisms that allow you to handle exceptions and errors emitted by the flow in a structured way.
  5. Operator and Transformation Support: Kotlin Flow offers a wide range of operators and transformations that allow you to manipulate, combine, and transform the emitted values in a declarative and functional programming style.
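
The backpressure point can be observed directly. In this sketch a fast emitter is paired with a slow collector, and the order of events is recorded. backpressureDemo and its buffered switch are hypothetical names for this demo, not part of the Flow API.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Runs a fast emitter against a slow collector and records the order of events.
fun backpressureDemo(buffered: Boolean): List<String> = runBlocking {
    val log = mutableListOf<String>()

    val source = flow {
        for (i in 1..3) {
            log += "emit $i"
            emit(i)
        }
    }
    // buffer() decouples emitter and collector through a channel.
    val upstream = if (buffered) source.buffer() else source

    upstream.collect { value ->
        delay(50) // slow consumer
        log += "done $value"
    }
    log
}

fun main() {
    // Without buffer(), emit() suspends until the collector has finished with the value:
    println(backpressureDemo(buffered = false))
    // prints [emit 1, done 1, emit 2, done 2, emit 3, done 3]

    // With buffer(), the emitter is allowed to run ahead of the collector:
    println(backpressureDemo(buffered = true))
}
```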

Disadvantages of Kotlin Flow:

  1. Learning Curve: Kotlin Flow introduces some new concepts and syntax compared to traditional synchronous programming. It may require some learning and understanding of coroutines and flow operators to effectively use and work with Kotlin Flow.
  2. Requires Coroutine Support: Kotlin Flow relies on coroutines, so it requires the use of coroutines in your project. If you're not already using coroutines, you need to set up and understand coroutines before working with Kotlin Flow.
  3. Library Version Requirement: Flow is part of kotlinx.coroutines and has been stable since version 1.3.0 of that library. A project on an older version of Kotlin or kotlinx.coroutines needs to upgrade before it can use Flow.

Suitable Scenarios for using Kotlin Flow:

  1. Asynchronous Data Streams: Kotlin Flow is well-suited for handling asynchronous data streams such as network requests, database queries, and event-driven systems where data is produced and consumed asynchronously.
  2. Reactive Programming: If you're following a reactive programming style, Kotlin Flow provides a convenient and expressive way to handle and process streams of data in a functional and declarative manner.
  3. Concurrency and Parallelism: A flow is sequential by default, but combined with coroutines and operators such as flowOn, buffer, and flatMapMerge it can run stages concurrently. This simplifies coordinating multiple asynchronous operations.
  4. Data Transformation and Filtering: Flow's operators and transformations make it easy to transform, filter, and combine data streams. This is useful for scenarios where you need to perform data manipulation or implement complex data processing pipelines.

Here are some of the commonly used APIs of Flow in Kotlin:

  1. flowOf: This is used to create a flow that emits a fixed set of values.
  2. asFlow: This is used to convert any iterable or sequence into a flow.
  3. map: This is used to transform the emitted values of a flow by applying a function to each value.
  4. filter: This is used to filter the emitted values of a flow based on a predicate.
  5. transform: This is a general-purpose operator that calls a block for each upstream value; the block can emit zero, one, or several values downstream.
  6. onEach: This is used to perform a side effect on each value emitted by a flow.
  7. catch: This is used to handle exceptions thrown upstream of the operator, during emission or earlier processing.
  8. collect: This is used to start collecting values from a flow and perform some action on each emitted value.
  9. flowOn: This is used to change the coroutine context in which the upstream part of the flow runs.
  10. zip: This is used to combine two flows into a single flow of pairs, where each pair contains the corresponding values emitted by the two input flows.
  11. combine: This is used to combine two flows into a single flow of values, where each value is computed by applying a given function to the most recent values emitted by the two input flows.
  12. flattenConcat: This is used to flatten a flow of flows into a single flow, where each value is emitted in the order it appears in the input flows.
  13. flattenMerge: This is used to flatten a flow of flows into a single flow, where values are emitted as soon as they become available from any of the input flows.
  14. buffer: This is used to run the emitter and the collector in separate coroutines connected by a channel, so that a slow collector does not hold up emission.
  15. distinctUntilChanged: This is used to suppress consecutive duplicates emitted by a flow.
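
A few of the operators listed above are easiest to understand by looking at what they produce. The sketch below is illustrative only; the function names are made up, and flowOnDemo assumes the JVM, since it reads the current thread name.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlocking

// zip pairs values by position and stops when the shorter flow completes.
fun zipDemo(): List<String> = runBlocking {
    flowOf(1, 2, 3)
        .zip(flowOf("a", "b")) { number, letter -> "$number$letter" }
        .toList()
}

// distinctUntilChanged drops only consecutive duplicates.
fun distinctDemo(): List<Int> = runBlocking {
    flowOf(1, 1, 2, 2, 1).distinctUntilChanged().toList()
}

// transform may emit any number of values per input.
fun transformDemo(): List<String> = runBlocking {
    flowOf(1, 2)
        .transform<Int, String> { value ->
            emit("start $value")
            emit("end $value")
        }
        .toList()
}

// flowOn: the flow body runs on Dispatchers.Default, collection stays in the caller's context.
fun flowOnDemo(): String = runBlocking {
    flow { emit(Thread.currentThread().name) }
        .flowOn(Dispatchers.Default)
        .first()
}

fun main() {
    println(zipDemo())       // prints [1a, 2b]
    println(distinctDemo())  // prints [1, 2, 1]
    println(transformDemo()) // prints [start 1, end 1, start 2, end 2]
    println(flowOnDemo())    // name of a Dispatchers.Default worker thread
}
```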


By using Kotlin Flow, you can write concise and readable code that handles asynchronous streams of data in a reactive and non-blocking manner. It provides a powerful and flexible way to work with asynchronous data, making it easier to handle complex scenarios such as data processing, network requests, and UI updates in a reactive and efficient manner.


Thanks for reading to the end. Please leave a comment if you have any questions or feedback.
