Coroutines are not just about concurrency
Vadym Yaroshchuk
Kotlin Multiplatform Developer ? Android Specialist ? Experienced in Kotlin, Java, PHP ? Tutor, Mentor, and Course Creator
Every time you hear about Kotlin Coroutines, you probably think about an easy, concise, and performant solution for handling asynchronous tasks like, for example, network requests. But is that their only purpose? Let's consider the usages of Kotlin Coroutines beyond concurrency.
Kotlin Coroutines Primitives
Let's start by understanding, how Coroutines underlying mechanism works. If we take a look at Kotlin Coroutines Primitives, it's only about a few classes and functions:
That's all that we have in our kotlin-stdlib. But what are they used for? Let's dive deeper into how Kotlin Coroutines are designed to work.
Continuation
Continuation is just an interface with two members: context and resumeWith:
public interface Continuation<in T> {
public val context: CoroutineContext
public fun resumeWith(result: Result<T>)
}
What does it do and what its purpose? Continuation – is literally Coroutine in primary. It's like a buffed callback with ability to propagate additional information and provide useful result of execution to coroutine.
We haven't talked about CoroutineContext yet, let's consider only resumeWith for now.
resumeWith
Like any other callback, this is a function that is called when coroutine finishes its work. It uses kotlin.Result to propagate any exceptions that occur inside coroutine in a safe way.
So, we can literally create our concurrency logic using Continuation in the next way:
suspend fun executeNetworkRequest(): String =
suspendCoroutine { continuation ->
thread {
continuation.resumeWith(someBlockingRequest())
}
}
suspendCoroutine – is a bridge between kotlin coroutines code and non-coroutine-based code.
Or use existing asynchronous (pseudo-code):
suspend fun executeNetworkRequest(): String =
suspendCoroutine { continuation ->
apiService.getSomething()
// onSuccess & onFailure is a callback
.onSuccess(continuation::resume)
.onFailure(continuation::resumeWithException)
.executeAsync()
}
Continuation<in T>.resume(..) is the extension to avoid passing kotlin.Result every time.
So, we can not only implement our concurrency logic but use existing and make it work with Kotlin Coroutines.
startCoroutine
Also, we can start suspend functions from non-suspend contexts using startCoroutine. In Kotlin it's always used in the end if your main function is suspend.
kotlinx.coroutines also uses it to run coroutines, but the mechanism there is much harder, of course.
import kotlin.coroutines.*
fun main() {
val operation: suspend () -> Unit = ::a
operation.startCoroutine(
object : Continuation<Unit> {
// we will talk about it lower
override val coroutineContext = EmptyCoroutineContext
// called when coroutine finished its work
override fun resumeWith(result: Result<Unit>) {
println(
if(result.isSuccess)
"Successfully done"
else "Error happenned: ${result.exceptionOrNull()}"
)
}
}
)
}
suspend fun a() {...}
But, of course, you can't just call suspend functions from kotlinx.coroutines when executing coroutines in such way.
CoroutineContext
Now we came to another member of Continuation – CoroutineContext. What is it for?
CoroutineContext is a provider that propagates needed data to the coroutine. In the real world, it's usually about passing parameters across complex chain of coroutines.
To be more clear, CoroutineContext in kotlinx.coroutines stands for structured concurrency .
Simple example
Let's create from our previous example for startCoroutine code with the ability to retrieve values from CoroutineContext:
import kotlin.coroutines.*
// define our container for data we need inside coroutine
// it should always inherit 'CoroutineContext.Element'
data class ExecutionContext(val someInt: Int) : CoroutineContext.Element {
override val key = EXECUTION_CONTEXT_KEY
}
// define type-safe key using which we will get our value
val EXECUTION_CONTEXT_KEY = object : CoroutineContext.Key<ExecutionContext> {}
// define coroutine context that we will pass into Continuation
private val myCoroutineContext = object : CoroutineContext {
private val values = mapOf<CoroutineContext.Key<*>, CoroutineContext.Element>(
EXECUTION_CONTEXT_KEY to ExecutionContext(10000)
)
override operator fun <E : CoroutineContext.Element> get(key: CoroutineContext.Key<E>): E? {
return values[key] as? E
}
// .. we omit other functions for simplicity
}
suspend fun a() {
// here we retrieve value from coroutine context
// coroutineContext is compiler intrinsic, can be called
// only from suspend world
val executionContext = coroutineContext[EXECUTION_CONTEXT_KEY]!!
println(executionContext.someInt!!)
}
fun main() {
val operation: suspend () -> Unit = ::a
operation.startCoroutine(
object : Continuation<Unit> {
override val context: CoroutineContext = myCoroutineContext
override fun resumeWith(result: Result<Unit>) {
println(
if(result.isSuccess)
"Successfully done"
else "Error happenned: ${result.exceptionOrNull()}"
)
}
}
)
}
CoroutineContext.Element – is the abstract that is used for storing elements inside CoroutineContext
You can play around with this code here .
Real-project example
Let's imagine that we have our API service. Usually, we need to have some layer of authorization, so let's consider next example (as for such, I took gRPC):
// let's define an element that will persist in `CoroutineContext`
data class AuthorizationContext(
val accessHash: String?,
val provider: AuthorizationProvider,
) : CoroutineContext.Element {
companion object Key : CoroutineContext.Key<AuthorizationContext>
override val key: CoroutineContext.Key<*> = Key
}
// now we define our interceptor
class AuthorizationInterceptor(
private val authorizationProvider: AuthorizationProvider,
) : CoroutineContextServerInterceptor() {
companion object {
private val ACCESS_TOKEN_METADATA_KEY: Metadata.Key<String> =
Metadata.Key.of("access-token", Metadata.ASCII_STRING_MARSHALLER)
}
override fun coroutineContext(call: ServerCall<*, *>, headers: Metadata): CoroutineContext {
return AuthorizationContext(
accessHash = headers.get(ACCESS_TOKEN_METADATA_KEY),
provider = authorizationProvider,
)
}
}
CoroutineContext in kotlinx.coroutines is literally just a Map<K : CoroutineContext.Key, V : CoroutineContext.Element> (to be more accurate, it's ConcurrentHashMap on JVM, for example), same as it was in our example above. But, if we talk about kotlinx.coroutines, it's propagated to all children coroutines within the desired coroutine (we didn't have such a mechanism).
So, now we can get it in children coroutines:
suspend inline fun provideAuthorization(block: (UserId) -> Unit) {
val authContext = coroutineContext[AuthorizationContext]
authContext.accessHash ?: throw StatusException(Status.UNAUTHORIZED)
val userId = authContext.provider.provide(authContext.accessHash)
return block(userId)
}
Interesting fact: coroutineContext is the only property in Kotlin that has suspend modifier. ??
For gRPC, we also need to register our Interceptor and write our RPCs. But the idea of this solution for gRPC is simple – decouple logic and simplify developer experience.
领英推荐
For Java, gRPC uses ThreadLocal , so we can also consider CoroutineContext as an alternative to ThreadLocal. We cannot use ThreadLocal within coroutines, because usually coroutine is not linked to a specific thread (especially, when we talk about withContext). Coroutines are more likely to be resumed on another thread (in addition, different coroutines can run on a single thread).
But, doesn't it mean that the only reason why coroutines exist – is concurrency? Let me explain.
Sequence
One of the most common-place examples is the – kotlin.sequences.Sequence<T>. In brief, it's a lazy collection that iterates only when you start consuming elements. You can read about them more here .
If you ever looked at SequenceScope sources, it uses suspend functions under the hood:
@RestrictSuspension
public abstract class SequenceScope<in T> internal constructor() {
/**
* Yields a value to the [Iterator] being built and suspends
* until the next value is requested.
*/
public abstract suspend fun yield(value: T)
// ... other
}
@RestrictSuspension disallows consumers from calling non-members suspend functions.
So, the idea is elements are consumed lazily. You can use them as regular collections and take advantage of lazy iteration.
But how does it work under the hood? Let's take a look at implementation sources:
private typealias State = Int
private const val State_NotReady: State = 0
private const val State_ManyNotReady: State = 1
private const val State_ManyReady: State = 2
private const val State_Ready: State = 3
private const val State_Done: State = 4
private const val State_Failed: State = 5
private class SequenceBuilderIterator<T> : SequenceScope<T>(), Iterator<T>, Continuation<Unit> {
private var state = State_NotReady
private var nextValue: T? = null
private var nextIterator: Iterator<T>? = null
var nextStep: Continuation<Unit>? = null
override fun hasNext(): Boolean {
while (true) {
when (state) {
State_NotReady -> {}
State_ManyNotReady ->
if (nextIterator!!.hasNext()) {
state = State_ManyReady
return true
} else {
nextIterator = null
}
State_Done -> return false
State_Ready, State_ManyReady -> return true
else -> throw exceptionalState()
}
state = State_Failed
val step = nextStep!!
nextStep = null
step.resume(Unit) // IMPORTANT: it starts executing next yield
}
}
override fun next(): T {
when (state) {
State_NotReady, State_ManyNotReady -> return nextNotReady()
State_ManyReady -> {
state = State_ManyNotReady
return nextIterator!!.next()
}
State_Ready -> {
state = State_NotReady
@Suppress("UNCHECKED_CAST")
val result = nextValue as T
nextValue = null
return result
}
else -> throw exceptionalState()
}
}
private fun nextNotReady(): T {
if (!hasNext()) throw NoSuchElementException() else return next()
}
private fun exceptionalState(): Throwable = when (state) {
State_Done -> NoSuchElementException()
State_Failed -> IllegalStateException("Iterator has failed.")
else -> IllegalStateException("Unexpected state of the iterator: $state")
}
override suspend fun yield(value: T) {
nextValue = value
state = State_Ready
return suspendCoroutineUninterceptedOrReturn { c ->
nextStep = c
COROUTINE_SUSPENDED
}
}
override fun resumeWith(result: Result<Unit>) {
result.getOrThrow() // just rethrow exception if it is there
state = State_Done
}
override val context: CoroutineContext
get() = EmptyCoroutineContext
}
COROUTINE_SUSPENDED is a special constant used internally by the Kotlin compiler to manage coroutine suspension and resumption. It's not something developers typically interact with directly, but rather, it serves as an internal signal within the coroutine machinery.
Looks a bit hard to read, isn't it? Let's go step by step:
To finish my explanation, let's just end with how could we make the same functionality just by using callbacks:
In the end, it doesn't look that complex, am I right?
DeepRecursiveScope
Now, let's discuss another case for Kotlin Coroutines – DeepRecursiveScope. As you probably know, usually when specific function call itself, we have a probability of running into StackOverflowError as every call contributes it to our stack.
For the same purpose, for example, also exists tailrec language construction. The difference is that tailrec cannot have branching (conditional checks) with calls to other functions.
So, DeepRecursiveScope doesn't rely on traditional stack flow, but uses all the features Coroutines offer. To understand it more, let's consider classic example with fibonacci numbers:
val fibonacci = DeepRecursiveFunction<Int, Int> { x ->
when {
x < 0 -> 0
x == 1 -> 1
else -> callRecursive(x - 1) + callRecursive(x - 2)
}
}
println(fibonacci(12)) // output: 144
For more complex example, you can refer to the kdoc .
We won't stop on exact implementation details of DeepRecursiveScope (you can take a look at it here ) as it has the same idea to Sequence with additional behaviour to support mechanisms that is provided, but let's discuss how Kotlin Coroutines solves this particular problem. Furthermore, there's very good article about it from Roman Elizarov.
Coroutines internally
How exactly it solves the problem? As I mentioned before, Coroutines are inspired by CPS (Continuation Passing Style) , but it's not exactly what Kotlin Compiler does to handle coroutines that efficiently.
Kotlin Compiler uses a combination of optimizations to manage coroutines stack and execution efficiently. Let's check what exactly it does:
The exact mechanism of coroutines is next:
Stack restoring also helps us with resuming on different threads as they don't know anything about our coroutine's stack.
So, from now we can understand how coroutines internally work. Let's move on to other examples where Kotlin coroutines are used beyond concurrency.
Jetpack Compose
If you ever worked with Compose and, for example, handling pointer events, you have probably mentioned that there're some hacks used from Coroutines for listening to updates:
@RestrictsSuspension // <---
@JvmDefaultWithCompatibility
interface AwaitPointerEventScope : Density {
// ...
suspend fun awaitPointerEvent(
pass: PointerEventPass = PointerEventPass.Main
): PointerEvent
suspend fun <T> withTimeoutOrNull(
timeMillis: Long,
block: suspend AwaitPointerEventScope.() -> T
): T? = block()
suspend fun <T> withTimeout(
timeMillis: Long,
block: suspend AwaitPointerEventScope.() -> T
): T = block()
// ...
}
So, as you see, scope that is used to handle pointer events is marked with @RestrictsSuspension. If we come to the documentation provided, we'll see next:
This is a restricted suspension scope. Code in this scope is
always called un-dispatched and may only suspend for calls
to [awaitPointerEvent]. These functions resume synchronously and the caller may mutate the result
**before** the next await call to affect the next stage of the input processing pipeline.
awaitPointerEvent is handled using kotlin primitives, without kotlinx.coroutines. As in such situation we don't need any kotlinx.coroutines logic (it's fairly just a callback that is called from Main-looper thread after user's action).
Conclusion
In conclusion, this article has explored various facets of Kotlin Coroutines, emphasizing their versatility beyond traditional concurrency tasks. We've delved into the inner workings of coroutines primitives, discussed their use in Sequences and complex problem-solving scenarios like deep recursion, and examined real-world examples that showcase their broad applicability. The title, "Coroutines are not just about concurrency," aptly reflects the diverse capabilities that Kotlin coroutines offer in modern software development.
Feel free to casually drop your expertise in the water cooler chat!