Relation between Coroutines, Threads, and Thread Pools
Amit Nadiger
Polyglot (Rust, Move, C++, C, Kotlin, Java), Blockchain, Polkadot, UTXO, Substrate, Sui, Aptos, Wasm, Proxy-wasm, AndroidTV, Dvb, STB, Linux, Cas, Engineering management.
In a coroutine-based program, multiple coroutines can run on the same thread or different threads, depending on the concurrency model and the scheduling strategy being used.
For example, in a single-threaded coroutine context like Dispatchers.Main on Android, all coroutines are executed on the same UI thread. Because only one thread touches UI state, this avoids race conditions and synchronization issues, and the UI stays responsive as long as those coroutines suspend rather than block.
On the other hand, in a multi-threaded context like Dispatchers.IO, multiple coroutines can run on different threads in a thread pool. This allows for parallel execution of IO-bound tasks without blocking the main thread or other coroutines.
Similarly, a single coroutine can be suspended and resumed multiple times on different threads, depending on how its execution is scheduled and how long it takes to complete. This allows for efficient use of resources and improves the overall throughput of the system.
The relationship between coroutines and threads is many-to-many: one coroutine can run on multiple threads over its lifetime, and one thread can run multiple coroutines. Threads are scheduled by the operating system, while coroutines are scheduled onto threads by the coroutine library through its dispatchers.
When a coroutine is launched, it is handed to the dispatcher in its context, which decides which thread runs it. If the context contains no dispatcher, Dispatchers.Default is used. Scheduling is cooperative: a coroutine gives up its thread only at a suspension point, and there are no coroutine priorities or time slices. When a coroutine suspends, its thread is free to pick up the next coroutine queued on the dispatcher.
When a coroutine is resumed, its dispatcher again decides where it runs. With a single-threaded dispatcher it continues on the same thread. With a pool-backed dispatcher such as Dispatchers.Default or Dispatchers.IO it may continue on any free thread in the pool, which need not be the thread it was running on before.
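The thread a coroutine runs on before and after a suspension point can be observed directly. The following is a minimal sketch, assuming kotlinx-coroutines-core is on the classpath; the helper name threadsAroundSuspension is made up for illustration. The two names may or may not differ on a given run, because the dispatcher is free to pick any worker thread.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical helper: records the thread name before and after a suspension point.
suspend fun threadsAroundSuspension(): Pair<String, String> =
    withContext(Dispatchers.Default) {
        val before = Thread.currentThread().name
        delay(100) // suspension point: the worker thread is released here
        val after = Thread.currentThread().name
        before to after
    }

fun main() = runBlocking {
    val (before, after) = threadsAroundSuspension()
    println("Before delay: $before")
    println("After delay : $after")
}
```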
Which thread pools do coroutines in Kotlin use?
Coroutines in Kotlin are executed through a "dispatcher". When you launch a coroutine, it is scheduled on a dispatcher, which is usually backed by a thread or a pool of threads that executes the coroutine code. The kotlinx.coroutines library provides several built-in dispatchers, so you rarely need to create a thread pool explicitly.
Dispatcher: In Kotlin, a "dispatcher" is an object that is responsible for scheduling the execution of coroutines. It is not itself a thread; it decides which thread a coroutine runs on.
A dispatcher determines what thread or threads the corresponding coroutine uses for its execution. With a dispatcher, we can confine coroutine execution to a specific thread, dispatch it to a thread pool, or let it run unconfined.
The library maintains a pool of threads and manages how coroutines are assigned to them. When a running coroutine suspends, its state is saved and the thread is released so that another coroutine can run in its place. When the coroutine is resumed, it is handed to a free thread in the pool and continues executing until it either completes or suspends again. In this way a limited number of threads can serve a large number of concurrent tasks without the performance degradation that would occur if each task had its own thread.
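To see how few threads a large number of coroutines needs, here is a small sketch. It assumes kotlinx-coroutines-core is available; the helper threadsUsedBy and its parameters are hypothetical names chosen for this illustration.

```kotlin
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors

// Hypothetical helper: runs `coroutineCount` coroutines on a pool of `poolSize`
// threads and returns the threads that actually executed them.
fun threadsUsedBy(coroutineCount: Int, poolSize: Int): Set<Thread> {
    val seen = ConcurrentHashMap.newKeySet<Thread>()
    // use {} closes the dispatcher, which shuts down the underlying pool
    Executors.newFixedThreadPool(poolSize).asCoroutineDispatcher().use { dispatcher ->
        runBlocking {
            repeat(coroutineCount) {
                launch(dispatcher) {
                    seen.add(Thread.currentThread())
                    delay(10) // suspends without holding a thread
                    seen.add(Thread.currentThread())
                }
            }
        } // runBlocking returns once all child coroutines have completed
    }
    return seen
}

fun main() {
    val threads = threadsUsedBy(coroutineCount = 10_000, poolSize = 4)
    println("10000 coroutines ran on ${threads.size} thread(s)")
}
```

Because delay suspends instead of blocking, the thread count stays within the pool size no matter how many coroutines are launched.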
Each dispatcher is associated with a specific thread or thread pool, and the coroutines that are run using that dispatcher are executed on that thread or thread pool. For example, the Dispatchers.Main dispatcher is associated with the main thread of the application, and the Dispatchers.IO dispatcher is associated with a shared pool of threads intended for blocking I/O operations.
The Dispatchers.Default dispatcher is associated with a shared background pool of threads intended for CPU-bound work; blocking operations belong on Dispatchers.IO instead. The size of the thread pool used by the Default dispatcher can be configured using the kotlinx.coroutines.default.parallelism JVM system property (support for this property depends on the kotlinx.coroutines version, so confirm it against the documentation for the release you use).
More about kotlinx.coroutines.default.parallelism
The kotlinx.coroutines.default.parallelism property controls the default level of parallelism for coroutines in Kotlin.
Parallelism refers to the number of threads that can execute coroutines simultaneously. By default, the value is equal to the number of available CPU cores (but at least two), which means that coroutines can be executed in parallel on different threads, maximizing the use of CPU resources.
Changing the value of kotlinx.coroutines.default.parallelism can affect the performance and behavior of your coroutines. A higher value can potentially improve the throughput of your application by allowing more coroutines to execute concurrently, but it can also increase the overhead of creating and switching between threads. Conversely, a lower value can reduce the overhead of thread creation and context switching, but may result in fewer coroutines being executed concurrently and potentially lower throughput.
In general, it's a good idea to leave the default value unless you have specific performance requirements or have done careful testing to determine the optimal value for your use case.
How to set and get kotlinx.coroutines.default.parallelism?
1. Pass the system property as a JVM argument on the command line:
java -Dkotlinx.coroutines.default.parallelism=4 -jar your-application.jar
2. Set the system property programmatically in your application code:
System.setProperty("kotlinx.coroutines.default.parallelism", "4")
Note that this should be done before any coroutines are launched.
3. Set the system property in your build tool configuration. For example, if you are using Gradle, you can set the system property in the build.gradle file:
tasks.withType(Test) { systemProperty "kotlinx.coroutines.default.parallelism", "4" }
The above will set the system property when running tests.
How to get the default parallelism
fun main() {
    // Use the override if one was set; otherwise fall back to the documented default
    // for Dispatchers.Default: the number of CPU cores, but at least two.
    val defaultParallelism = System.getProperty("kotlinx.coroutines.default.parallelism")?.toIntOrNull()
        ?: Runtime.getRuntime().availableProcessors().coerceAtLeast(2)
    println("The default parallelism is $defaultParallelism")
}
/*
The printed value depends on the machine and on whether the property was set.
*/
In the example above, we read the kotlinx.coroutines.default.parallelism system property with System.getProperty. If the property has not been set (or is not a number), we fall back to the number of available processors reported by the JVM, with a minimum of two, which is the documented default size for Dispatchers.Default. The result is printed to the console.
It's important to note that changing the default parallelism can have a significant impact on the performance and behavior of your coroutines, and it should be done carefully and with proper testing.
To run a coroutine using a specific dispatcher, you can pass the dispatcher as the context parameter of the launch or async functions.
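As a minimal sketch of passing a dispatcher to launch and async (assuming kotlinx-coroutines-core; sumOfSquares is a hypothetical stand-in for real work):

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: stands in for any CPU-bound calculation.
fun sumOfSquares(n: Int): Long = (1..n).sumOf { it.toLong() * it }

fun main() = runBlocking {
    // launch: work without a result, dispatcher passed as the context argument
    val job = launch(Dispatchers.IO) {
        println("launch runs on ${Thread.currentThread().name}")
    }

    // async: same idea, but the coroutine produces a result
    val deferred = async(Dispatchers.Default) { sumOfSquares(10) }

    job.join()
    println("Sum of squares: ${deferred.await()}")
}
```

When no dispatcher is passed, launch and async inherit the one from the enclosing scope, which here would be the runBlocking thread.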
If coroutines can run on more than one thread, how do we decide which thread they run on?
Coroutine dispatchers come to the rescue:
A dispatcher specifies where a coroutine should run, i.e. it dispatches the work to the appropriate thread.
1. Dispatchers.Main: This dispatcher runs coroutines on the main (UI) thread on Android, making it suitable for updating the user interface and other light, UI-related work. It is confined to that single thread, which ensures that UI updates are executed on the main thread.
2. Dispatchers.IO: This dispatcher uses a larger shared pool of threads intended for blocking I/O. It is recommended for blocking operations such as reading from or writing to a file, or making network requests.
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import java.io.File

suspend fun readFile(filename: String): String {
    return withContext(Dispatchers.IO) {
        File(filename).readText()
    }
}
In the above example, the withContext function is used to switch to the Dispatchers.IO dispatcher before reading the contents of the file using the File(filename).readText() function, which is a blocking I/O operation. Once the operation is complete, the result is returned to the calling coroutine.
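3. Dispatchers.Default: This dispatcher uses a shared background pool of threads to execute CPU-bound work, such as sorting a large list or parsing data. It is the dispatcher used when no other dispatcher is specified in the context.
4. Dispatchers.Unconfined: A dispatcher that does not confine the coroutine to any specific thread. The coroutine starts in the caller's thread and runs there until the first suspension point; after that it resumes on whatever thread the suspending function resumes it on.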
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.delay

fun main() = runBlocking<Unit> {
    launch(Dispatchers.Unconfined) {
        val result = expensiveComputation()
        println("Result: $result")
    }
}

suspend fun expensiveComputation(): Int {
    // Perform a long-running computation
    delay(1000)
    return 42
}
In the above example, the expensiveComputation function stands in for a long-running operation (simulated here with delay) and returns a result. The main function launches a coroutine on Dispatchers.Unconfined, which starts executing expensiveComputation immediately in the caller thread. After the delay, the coroutine resumes on the thread used by the delay mechanism rather than on the main thread, and the result is printed to the console.
It is important to note that Dispatchers.Unconfined should be used with caution, as it can lead to unexpected behavior if used incorrectly. In general, it is recommended to use a more specific dispatcher, such as Dispatchers.Default or Dispatchers.IO, unless you have a specific use case for Dispatchers.Unconfined.
5. You can also create your own custom dispatchers if needed, by extending the CoroutineDispatcher abstract class. However, in most cases, you can rely on the built-in dispatchers provided by Kotlin.
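As a rough sketch of what a hand-written dispatcher can look like, the class below extends CoroutineDispatcher and overrides its dispatch function. CountingDispatcher and runOnCountingDispatcher are hypothetical names invented for this illustration; the dispatcher only counts dispatches and forwards the work to a delegate, so it is a toy rather than a recommended design.

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.CoroutineContext

// Hypothetical dispatcher: counts every dispatch and forwards it to a delegate.
class CountingDispatcher(
    private val delegate: CoroutineDispatcher = Dispatchers.Default
) : CoroutineDispatcher() {
    val dispatchCount = AtomicInteger(0)

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        dispatchCount.incrementAndGet()
        delegate.dispatch(context, block) // the delegate picks the actual thread
    }
}

// Hypothetical helper: launches `tasks` coroutines and reports the dispatch count.
fun runOnCountingDispatcher(tasks: Int): Int {
    val dispatcher = CountingDispatcher()
    runBlocking {
        repeat(tasks) {
            launch(dispatcher) { /* work would go here */ }
        }
    }
    return dispatcher.dispatchCount.get()
}

fun main() {
    println("Dispatches observed: ${runOnCountingDispatcher(tasks = 5)}")
}
```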
Below is copied from the Kotlin site:
import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
    launch { // context of the parent, main runBlocking coroutine
        println("main runBlocking      : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(Dispatchers.Unconfined) { // not confined -- will work with main thread
        println("Unconfined            : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(Dispatchers.Default) { // will get dispatched to DefaultDispatcher
        println("Default               : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
        println("newSingleThreadContext: I'm working in thread ${Thread.currentThread().name}")
    }
}
O/P:
Unconfined            : I'm working in thread main
Default               : I'm working in thread DefaultDispatcher-worker-1
newSingleThreadContext: I'm working in thread MyOwnThread
main runBlocking      : I'm working in thread main
Example 1:
import kotlinx.coroutines.*

fun main() = runBlocking {
    val deferred = async(Dispatchers.IO) {
        // This coroutine runs in the IO thread pool
        someSuspendingFunction()
    }
    // await() is a suspending call, so it must itself be made from a coroutine
    val result = deferred.await()
    println(result)
}

// Placeholder for real suspending work
suspend fun someSuspendingFunction(): String {
    delay(100)
    return "done"
}
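Example 2:
import kotlinx.coroutines.*

// Dispatchers.Main needs a platform with a main/UI thread (for example Android with the
// kotlinx-coroutines-android artifact); in a plain JVM program it fails at runtime.
// loadOnMainThread is a placeholder name for a function called from UI code.
fun loadOnMainThread() {
    GlobalScope.launch(Dispatchers.Main) {
        // This coroutine runs on the main thread
        val result = someSuspendingFunction()
        // Use the result of the coroutine here, e.g. to update the UI
    }
}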
It is important to choose the appropriate dispatcher for the task at hand. Using the wrong dispatcher can degrade the performance of the application or cause the program to behave unexpectedly. For example, running CPU-bound work on Dispatchers.IO can oversubscribe the CPU with far more threads than cores, running blocking calls on Dispatchers.Default can starve other coroutines of its few threads, and performing blocking operations on Dispatchers.Main can cause the user interface to become unresponsive.
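The sketch below pairs each kind of work with the dispatcher that suits it. It assumes kotlinx-coroutines-core; countPrimes and writeAndReadBack are hypothetical functions written only for this illustration.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.io.File

// CPU-bound: counting primes keeps a core busy, so it goes to Dispatchers.Default.
suspend fun countPrimes(limit: Int): Int = withContext(Dispatchers.Default) {
    (2..limit).count { n -> (2..n / 2).none { n % it == 0 } }
}

// Blocking I/O: the thread mostly waits on the file system, so it goes to Dispatchers.IO.
suspend fun writeAndReadBack(text: String): String = withContext(Dispatchers.IO) {
    val file = File.createTempFile("dispatcher-demo", ".txt")
    try {
        file.writeText(text)
        file.readText()
    } finally {
        file.delete() // clean up the temporary file
    }
}

fun main() = runBlocking {
    println("Primes up to 100: ${countPrimes(100)}")
    println("Read back: ${writeAndReadBack("hello")}")
}
```

Wrapping the dispatcher choice inside the suspending function keeps callers free to invoke it from any context, including Dispatchers.Main.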
In addition to the built-in dispatchers, kotlinx.coroutines provides the newSingleThreadContext function, which creates a new dispatcher backed by a single dedicated thread. This can be useful for tasks that need to be executed in a specific order, or that have thread-specific state. A dedicated thread is a relatively expensive resource, so the dispatcher should be closed when it is no longer needed:
Example 1:
import kotlinx.coroutines.*

// someSuspendingFunction is a placeholder for your own suspending work
fun main() = runBlocking {
    // use {} closes the dispatcher, which stops the dedicated thread when we are done
    newSingleThreadContext("MyCustomDispatcher").use { customDispatcher ->
        val deferred = async(customDispatcher) {
            // This coroutine runs on a single, dedicated thread
            someSuspendingFunction()
        }
        // Use the result of the coroutine when it completes
        val result = deferred.await()
        println(result)
    }
}
Example 2:
import kotlinx.coroutines.*

// someSuspendingFunction is a placeholder for your own suspending work
fun main() = runBlocking {
    newSingleThreadContext("MyCustomDispatcher").use { customDispatcher ->
        val job = launch(customDispatcher) {
            // This coroutine runs on a single, dedicated thread
            val result = someSuspendingFunction()
            // Use the result of the coroutine here
        }
        job.join() // wait for the coroutine before the dispatcher is closed
    }
}
It is important to note that a dispatcher created with newSingleThreadContext always runs its coroutines on its one dedicated thread. Dispatchers backed by a pool of threads are different: they are simply a way to schedule the execution of coroutines, and the actual thread used to execute a given coroutine may vary over time.
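One practical use of a single-thread dispatcher is confining mutable state to one thread so that no locks are needed. The following is a minimal sketch under that assumption; countConfined and the thread name "CounterThread" are hypothetical, and the opt-in annotation is there because newSingleThreadContext is marked as a delicate, experimental API.

```kotlin
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.launch
import kotlinx.coroutines.newSingleThreadContext
import kotlinx.coroutines.runBlocking

// Hypothetical helper: increments a plain, unsynchronized counter from many
// coroutines, all confined to one dedicated thread.
@OptIn(DelicateCoroutinesApi::class, ExperimentalCoroutinesApi::class)
fun countConfined(coroutines: Int, incrementsEach: Int): Int {
    var counter = 0 // only ever modified on the single dispatcher thread
    newSingleThreadContext("CounterThread").use { confined ->
        runBlocking {
            repeat(coroutines) {
                launch(confined) {
                    repeat(incrementsEach) { counter++ }
                }
            }
        } // returns after every child coroutine has finished
    }
    return counter
}

fun main() {
    println("Counter: ${countConfined(coroutines = 100, incrementsEach = 1_000)}")
}
```

The same loop on a multi-threaded dispatcher could lose updates, since counter++ is not atomic.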
Custom dispatcher:
Below is an example of a custom dispatcher backed by a fixed thread pool:
import kotlinx.coroutines.*
import java.util.concurrent.Executors

fun main() = runBlocking {
    // Create a custom thread pool with 4 threads
    val customThreadPool = Executors.newFixedThreadPool(4)
    // Create a coroutine context with the custom thread pool
    val customCoroutineContext = customThreadPool.asCoroutineDispatcher()
    // Create a coroutine scope with the custom context
    val customCoroutineScope = CoroutineScope(customCoroutineContext)
    // Launch multiple coroutines that can execute from different threads in the pool
    val jobs = List(10) {
        customCoroutineScope.launch {
            println("Coroutine $it running on thread ${Thread.currentThread().name}")
            delay(1000)
            println("Coroutine $it completed on thread ${Thread.currentThread().name}")
        }
    }
    // Wait for all coroutines to complete
    jobs.forEach { it.join() }
    // Shutdown the thread pool
    customThreadPool.shutdown()
}
/*
O/P =>
Coroutine 0 running on thread pool-1-thread-1
Coroutine 2 running on thread pool-1-thread-3
Coroutine 4 running on thread pool-1-thread-3
Coroutine 5 running on thread pool-1-thread-3
Coroutine 6 running on thread pool-1-thread-3
Coroutine 7 running on thread pool-1-thread-3
Coroutine 8 running on thread pool-1-thread-3
Coroutine 9 running on thread pool-1-thread-3
Coroutine 1 running on thread pool-1-thread-2
Coroutine 3 running on thread pool-1-thread-4
Coroutine 0 completed on thread pool-1-thread-3
Coroutine 2 completed on thread pool-1-thread-2
Coroutine 4 completed on thread pool-1-thread-2
Coroutine 6 completed on thread pool-1-thread-2
Coroutine 7 completed on thread pool-1-thread-4
Coroutine 8 completed on thread pool-1-thread-4
Coroutine 9 completed on thread pool-1-thread-4
Coroutine 5 completed on thread pool-1-thread-1
Coroutine 1 completed on thread pool-1-thread-4
Coroutine 3 completed on thread pool-1-thread-3
*/
Note in the above output that coroutines start on one thread but may complete on a different one. For example:
Coroutine 0 running on thread pool-1-thread-1
Coroutine 0 completed on thread pool-1-thread-3
Why did the thread switch happen?
When a coroutine calls the delay function, it suspends its execution and releases the underlying thread. Once the delay time has elapsed, the coroutine is handed back to its dispatcher and may resume on any available thread from the thread pool.
The coroutine dispatcher decides which thread the coroutine resumes on. In this example that is the custom dispatcher backed by the four-thread pool; the built-in Dispatchers.Default behaves the same way with its shared pool. As a result, a coroutine can continue on a different thread from the one it started on, which matters for code that relies on thread-local or otherwise thread-specific state.
In general, it is recommended to use the built-in dispatchers (such as Dispatchers.Main, Dispatchers.Default, and Dispatchers.IO) unless you have a specific need to create a custom dispatcher. This can help to ensure that the application is scalable and performs well on different devices and platforms.
Advantages of creating a custom thread pool and attaching it to a custom coroutine scope
Creating a custom thread pool and attaching it to a custom coroutine scope can help you optimize the performance of your application and manage resources more effectively, for example by isolating one workload from the shared pools and capping the number of threads it may use.
Summary: Coroutines and threads are two abstractions that allow multiple pieces of code to be executed concurrently. Coroutines are lightweight and well-suited for use cases where large numbers of concurrent tasks need to be executed. Threads are more heavyweight but can execute code in parallel on separate cores of the CPU. Thread pools can be used to optimize the execution of concurrent tasks by allowing multiple coroutines to run on a shared pool of threads. The scheduling of coroutines and threads is complex.
I will discuss communication between coroutines in the next article.
Thanks for reading till the end. Please comment if you have any questions!