Relation b/w Coroutine, Thread & Thread Pool

In a coroutine-based program, multiple coroutines can run on the same thread or different threads, depending on the concurrency model and the scheduling strategy being used.

For example, in a single-threaded coroutine context like Dispatchers.Main on Android, all coroutines will be executed on the same UI thread. This ensures that the UI remains responsive and avoids race conditions and synchronization issues.

On the other hand, in a multi-threaded context like Dispatchers.IO, multiple coroutines can run on different threads in a thread pool. This allows for parallel execution of IO-bound tasks without blocking the main thread or other coroutines.

Similarly, a single coroutine can be suspended and resumed multiple times on different threads, depending on how its execution is scheduled and how long it takes to complete. This allows for efficient use of resources and improves the overall throughput of the system.

The relationship between coroutines and threads is many-to-many: one coroutine can run on multiple threads over its lifetime (though only on one at a time), and one thread can run many coroutines. Threads are scheduled by the operating system, while coroutines are scheduled onto threads by the coroutine runtime through a dispatcher.

When a coroutine is launched, the dispatcher in its context decides which thread or thread pool it runs on. When the coroutine suspends, the runtime is free to run other coroutines on that thread; which coroutine runs next depends on the dispatcher and on which coroutines are ready to resume.

When a coroutine is resumed, it does not necessarily continue on the thread it was running on before: with a pool-backed dispatcher, it continues on whichever thread of the pool is available (for example, if its original thread is busy or blocked on I/O, another thread picks it up). The decision of which thread to resume the coroutine on is made by the dispatcher.
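As a minimal sketch of this behaviour (plain kotlinx.coroutines, nothing specific to any framework), the program below launches a coroutine on Dispatchers.Default and prints the current thread name before and after a suspension point. Depending on which worker thread is free when the coroutine resumes, the two names may or may not differ:

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
    // Launch on the shared Default pool so the runtime is free to resume the
    // coroutine on any worker thread after the suspension point.
    launch(Dispatchers.Default) {
        println("Before suspension on ${Thread.currentThread().name}")
        delay(100) // suspension point: the worker thread is released
        println("After suspension on ${Thread.currentThread().name}")
    }
}
/*
Sample O/P (thread names may vary from run to run) =>
Before suspension on DefaultDispatcher-worker-1
After suspension on DefaultDispatcher-worker-2
*/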

Which thread pools do Kotlin coroutines use?

Coroutines in Kotlin are executed through a "dispatcher", which decides which thread or thread pool runs the coroutine. When you launch a coroutine, it gets scheduled onto a dispatcher, which manages the threads that execute the coroutine code. By default, Kotlin provides several built-in dispatchers that you can use without explicitly creating a thread pool.

Dispatcher? – In Kotlin, a "dispatcher" is an object responsible for scheduling the execution of coroutines and other asynchronous tasks onto threads. Loosely speaking, a dispatcher can be thought of as a "virtual thread" on which coroutines execute.

The dispatcher determines what thread or threads the corresponding coroutine uses for its execution. With the dispatcher, we can confine coroutine execution to a specific thread, dispatch it to a thread pool, or let it run unconfined. Kotlin maintains a pool of active threads and manages how coroutines are assigned to those threads. When an active coroutine is suspended, it is saved by the Kotlin runtime and another coroutine is resumed to take its place. When the coroutine is resumed, it is simply restored to an existing unoccupied thread within the pool to continue executing until it either completes or is suspended. Using this approach, a limited number of threads are used efficiently to execute asynchronous tasks, making it possible to run large numbers of concurrent tasks without the performance degradation inherent in standard multithreading.

Each dispatcher is associated with a specific thread or thread pool, and the coroutines that are run using that dispatcher will be executed on that thread or thread pool. For example, the Dispatchers.Main dispatcher is associated with the main thread of the application, and the Dispatchers.IO dispatcher is associated with a shared pool of threads that is optimized for I/O operations.

The Dispatchers.Default dispatcher is associated with a shared background pool of threads intended for CPU-bound operations. The size of the thread pool used by the Default dispatcher can be configured using the kotlinx.coroutines.default.parallelism JVM system property.

More about kotlinx.coroutines.default.parallelism

The kotlinx.coroutines.default.parallelism property controls the default level of parallelism for coroutines in Kotlin.

Parallelism refers to the number of threads that can execute coroutines simultaneously. By default, the value of kotlinx.coroutines.default.parallelism is equal to the number of available CPU cores, which means that coroutines can be executed concurrently on different threads, maximizing the use of CPU resources.

Changing the value of kotlinx.coroutines.default.parallelism can affect the performance and behavior of your coroutines. A higher value can potentially improve the throughput of your application by allowing more coroutines to execute concurrently, but it can also increase the overhead of creating and switching between threads. Conversely, a lower value can reduce the overhead of thread creation and context switching, but may result in fewer coroutines being executed concurrently and potentially lower throughput.

In general, it's a good idea to leave the default value unless you have specific performance requirements or have done careful testing to determine the optimal value for your use case.

How to set and get kotlinx.coroutines.default.parallelism?

  1. Set the system property in the command line when launching your application:

java -Dkotlinx.coroutines.default.parallelism=4 -jar your-application.jar         

2. Set the system property programmatically in your application code:

System.setProperty("kotlinx.coroutines.default.parallelism", "4")         

Note that this should be done before any coroutines are launched.

3. Set the system property in your build tool configuration. For example, if you are using Gradle, you can set the system property in the build.gradle file:

tasks.withType(Test) { systemProperty "kotlinx.coroutines.default.parallelism", "4" }         

The above will set the system property when running tests.

How to get the default parallelism programmatically

fun main() {
    // If the JVM property was set explicitly (e.g. -Dkotlinx.coroutines.default.parallelism=4),
    // read it; otherwise fall back to its documented default: the number of available CPU cores.
    val defaultParallelism = System.getProperty("kotlinx.coroutines.default.parallelism")?.toIntOrNull()
        ?: Runtime.getRuntime().availableProcessors()
    println("The default parallelism is $defaultParallelism")
}
/*
Sample O/P on a 4-core machine with the property unset =>
The default parallelism is 4
*/        

In the example above, we first check whether the kotlinx.coroutines.default.parallelism JVM system property has been set explicitly. If it has not, we fall back to Runtime.getRuntime().availableProcessors(), which is the value the property defaults to, and print the result to the console.

It's important to note that changing the default parallelism can have a significant impact on the performance and behavior of your coroutines, and it should be done carefully and with proper testing.

To run a coroutine using a specific dispatcher, you specify the dispatcher as the context parameter of the launch or async functions.
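A minimal sketch of both forms, wrapped in runBlocking so the program waits for the coroutines to finish:

import kotlinx.coroutines.*

fun main() = runBlocking {
    // Run this block on the IO thread pool
    launch(Dispatchers.IO) {
        println("launch(Dispatchers.IO) is running on ${Thread.currentThread().name}")
    }

    // Run this block on the Default thread pool and return a value
    val deferred = async(Dispatchers.Default) {
        Thread.currentThread().name
    }
    println("async(Dispatchers.Default) ran on ${deferred.await()}")
}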

When I say coroutines can run on more than one thread, how do we decide which thread they run on?

Coroutine Dispatchers come to the rescue:

Dispatchers specify where the coroutines should run, i.e., they dispatch work to the appropriate thread.

1. Dispatchers.Main: This dispatcher runs coroutines on the main (UI) thread of an Android application, making it suitable for updating the user interface or performing other light, UI-related work. It is confined to the single main thread, which guarantees that UI updates are executed on that thread. A small sketch follows below.
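The sketch assumes an environment where a main dispatcher is available (for example, kotlinx-coroutines-android on Android); loadUserName() and the CoroutineScope passed in are hypothetical placeholders of mine, not part of any specific API:

import kotlinx.coroutines.*

// Hypothetical placeholder: pretend this loads a user name from disk or the network.
suspend fun loadUserName(): String = withContext(Dispatchers.IO) { "Alice" }

fun showUserName(uiScope: CoroutineScope) {
    uiScope.launch(Dispatchers.Main) {
        // We are on the main (UI) thread here, so it is safe to touch UI state.
        val name = loadUserName()   // suspends; the main thread is not blocked while loading
        println("Show '$name' in the UI (thread: ${Thread.currentThread().name})")
    }
}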

2. Dispatchers.IO: This dispatcher uses a larger pool of threads optimized for IO-bound tasks. It is recommended for running blocking IO operations such as reading from or writing to a file, or making network requests.

  • Unlike Dispatchers.Default, which creates a thread pool optimized for CPU-bound operations, Dispatchers.IO creates a thread pool optimized for I/O-bound operations.
  • This thread pool has a larger number of threads than the default dispatcher, which allows for more concurrent I/O operations to be executed without blocking the threads.
  • In addition, Dispatchers.IO shares its threads with Dispatchers.Default and keeps idle threads around for reuse, which reduces the overhead of repeatedly creating and destroying threads.
  • When using Dispatchers.IO, keep in mind that a blocking call still occupies one of the pool's threads for its whole duration, so long blocking operations reduce how many coroutines can make progress at once. When you do need to perform a blocking I/O operation, wrap it in withContext(Dispatchers.IO); once the block completes, execution switches back to the original dispatcher automatically, as in the example below.


import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import java.io.File

suspend fun readFile(filename: String): String {
    return withContext(Dispatchers.IO) {
        File(filename).readText()
    }
}        

In the above example, the withContext function is used to switch to the Dispatchers.IO dispatcher before reading the contents of the file using the File(filename).readText() function, which is a blocking I/O operation. Once the operation is complete, the result is returned to the calling coroutine.

  • Suitable and optimized for disk and network I/O, database operations, etc.

3. Dispatchers.Default: This dispatcher uses a shared background pool of threads and is intended for CPU-intensive coroutines that do not block.

  • By default, Dispatchers.Default creates a thread pool with a number of threads equal to the number of available processors on the system. This means that it can parallelize operations across multiple threads, which can lead to significant performance improvements for computationally-intensive tasks. However, it is important to note that creating too many coroutines can lead to contention and decreased performance, so it is generally recommended to limit the number of concurrent coroutines to the default parallelism value, which can be obtained using kotlinx.coroutines.default.parallelism.
  • It is optimized for CPU-intensive work such as sorting, mathematical computations, and data processing; IO-bound tasks such as network calls, file operations, or database queries are better suited to Dispatchers.IO (see the sketch below).
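A minimal sketch of CPU-bound work on Dispatchers.Default; counting primes here is just a stand-in for any heavy computation:

import kotlinx.coroutines.*

// Deliberately CPU-heavy: count the primes below the given limit.
fun countPrimes(limit: Int): Int =
    (2 until limit).count { n ->
        (2..Math.sqrt(n.toDouble()).toInt()).none { n % it == 0 }
    }

fun main() = runBlocking {
    val primes = withContext(Dispatchers.Default) {
        countPrimes(100_000)
    }
    println("Number of primes below 100000: $primes")
}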

4. Dispatchers.Unconfined: a coroutine dispatcher that does not confine coroutines to any specific thread or context.

  • When a coroutine is launched on Dispatchers.Unconfined, it starts executing immediately in the caller thread and continues until the first suspension point. After that, it resumes in whatever thread the suspending function used to resume it, which is not necessarily the caller thread or any particular thread pool.
  • This can be useful in certain scenarios, such as when you want to perform a computation that is not bound to any specific thread or context, or when you want to perform an operation that requires a specific context that is not available on the current thread.
  • It is important to note that Dispatchers.Unconfined does not impose any thread constraints or thread safety guarantees, which means that it can be unsafe to use in certain scenarios. For example, if you are performing an I/O operation on a coroutine launched on Dispatchers.Unconfined, the operation may execute on the caller thread, which could block the thread and cause the application to become unresponsive.

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.delay

fun main() = runBlocking<Unit> {
    launch(Dispatchers.Unconfined) {
        val result = expensiveComputation()
        println("Result: $result")
    }
}

suspend fun expensiveComputation(): Int {
    // Perform a long-running computation
    delay(1000)
    return 42
}        

In the above example, the expensiveComputation function performs a long-running computation and returns a result. The main function launches a coroutine on Dispatchers.Unconfined, which starts executing the expensiveComputation function immediately in the caller thread. Because expensiveComputation suspends in delay, the code after the suspension (including the println) typically resumes on a different thread rather than on main. Once the computation is complete, the result is printed to the console.

It is important to note that Dispatchers.Unconfined should be used with caution, as it can lead to unexpected behavior if used incorrectly. In general, it is recommended to use a more specific dispatcher, such as Dispatchers.Default or Dispatchers.IO, unless you have a specific use case for Dispatchers.Unconfined.

5. You can also create your own custom dispatchers if needed, using the CoroutineDispatcher class. However, in most cases, you can rely on the built-in dispatchers provided by Kotlin.

The example below is copied from the Kotlin documentation:

import kotlinx.coroutines.*
fun main() = runBlocking<Unit> {
    launch { // context of the parent, main runBlocking coroutine
        println("main runBlocking      : I'm working in thread ${Thread.currentThread().name}")
    }

    launch(Dispatchers.Unconfined) { // not confined -- will work with main thread
        println("Unconfined            : I'm working in thread ${Thread.currentThread().name}")
    }

    launch(Dispatchers.Default) { // will get dispatched to DefaultDispatcher
        println("Default               : I'm working in thread ${Thread.currentThread().name}")
    }

    launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
        println("newSingleThreadContext: I'm working in thread ${Thread.currentThread().name}")
    }
}        

O/P:

Unconfined            : I'm working in thread main
Default               : I'm working in thread DefaultDispatcher-worker-1
newSingleThreadContext: I'm working in thread MyOwnThread
main runBlocking      : I'm working in thread main

Example 1:

import kotlinx.coroutines.*

fun main() = runBlocking {
    val deferred = GlobalScope.async(Dispatchers.IO) {
        // This coroutine will be run in the IO thread pool
        someSuspendingFunction()
    }
    // Use the result of the coroutine when it completes
    val result = deferred.await()
}

Example 2:

import kotlinx.coroutines.*

fun main() {
    GlobalScope.launch(Dispatchers.Main) {
        // This coroutine will be run in the main thread
        val result = someSuspendingFunction()
        // Use the result of the coroutine here
    }
}        

It is important to choose the appropriate dispatcher for the task at hand. Using the wrong dispatcher can degrade the performance of the application or cause the program to behave unexpectedly. For example, using the Dispatchers.IO dispatcher for CPU-bound operations can lead to thread starvation, and using the Dispatchers.Main dispatcher for blocking operations can cause the user interface to become unresponsive.

In addition to the built-in dispatchers, Kotlin also provides a way to create custom dispatchers, for example with the newSingleThreadContext function, which creates a new dispatcher backed by a single thread. This can be useful for tasks that need to be executed in a specific order, or that have thread-specific state:

Example 1:

import kotlinx.coroutines.*

fun main() = runBlocking {
    val customDispatcher = newSingleThreadContext("MyCustomDispatcher")

    val deferred = GlobalScope.async(customDispatcher) {
        // This coroutine will be run in a single, dedicated thread
        someSuspendingFunction()
    }
    // Use the result of the coroutine when it completes
    val result = deferred.await()

    customDispatcher.close() // release the dedicated thread when it is no longer needed
}

Example 2:

import kotlinx.coroutines.*

fun main() {
    val customDispatcher = newSingleThreadContext("MyCustomDispatcher")

    GlobalScope.launch(customDispatcher) {
        // This coroutine will be run in a single, dedicated thread
        val result = someSuspendingFunction()
        // Use the result of the coroutine here
    }
}        

It is important to note that a dispatcher is simply a way to schedule the execution of coroutines onto threads. Some custom dispatchers, such as one created with newSingleThreadContext, are backed by a dedicated thread, while pool-backed dispatchers may run a given coroutine on different threads of the pool over time.

Custom Dispatcher :

Below is an example that:

  1. Creates a custom thread pool
  2. Attaches it to a coroutine scope
  3. Runs multiple coroutines that can execute from different threads in the pool:

import kotlinx.coroutines.*
import java.util.concurrent.Executors

fun main() = runBlocking {
    // Create a custom thread pool with 4 threads
    val customThreadPool = Executors.newFixedThreadPool(4)

    // Create a coroutine context with the custom thread pool
    val customCoroutineContext = customThreadPool.asCoroutineDispatcher()

    // Create a coroutine scope with the custom context
    val customCoroutineScope = CoroutineScope(customCoroutineContext)

    // Launch multiple coroutines that can execute from different threads in the pool
    val jobs = List(10) {
        customCoroutineScope.launch {
            println("Coroutine $it running on thread ${Thread.currentThread().name}")
            delay(1000)
            println("Coroutine $it completed on thread ${Thread.currentThread().name}")
        }
    }

    // Wait for all coroutines to complete
    jobs.forEach { it.join() }

    // Shutdown the thread pool
    customThreadPool.shutdown()
}
/*
 O/P => 
 Coroutine 0 running on thread pool-1-thread-1
 Coroutine 2 running on thread pool-1-thread-3
 Coroutine 4 running on thread pool-1-thread-3
 Coroutine 5 running on thread pool-1-thread-3
 Coroutine 6 running on thread pool-1-thread-3
 Coroutine 7 running on thread pool-1-thread-3
 Coroutine 8 running on thread pool-1-thread-3
 Coroutine 9 running on thread pool-1-thread-3
 Coroutine 1 running on thread pool-1-thread-2
 Coroutine 3 running on thread pool-1-thread-4
 Coroutine 0 completed on thread pool-1-thread-3
 Coroutine 2 completed on thread pool-1-thread-2
 Coroutine 4 completed on thread pool-1-thread-2
 Coroutine 6 completed on thread pool-1-thread-2
 Coroutine 7 completed on thread pool-1-thread-4
 Coroutine 8 completed on thread pool-1-thread-4
 Coroutine 9 completed on thread pool-1-thread-4
 Coroutine 5 completed on thread pool-1-thread-1
 Coroutine 1 completed on thread pool-1-thread-4
 Coroutine 3 completed on thread pool-1-thread-3
*/        

In the above program,

  1. We first create a custom thread pool with 4 threads using the newFixedThreadPool method from the java.util.concurrent.Executors class.
  2. We then create a coroutine context with the custom thread pool using the asCoroutineDispatcher extension function.
  3. We create a coroutine scope with the custom context using the CoroutineScope class.
  4. We then launch multiple coroutines using the launch function from the coroutine scope. Each coroutine prints a message indicating which coroutine it is and which thread it's running on, waits for 1 second using the delay function, and then prints another message indicating that it's completed. Since we're using a custom thread pool with 4 threads, we can see that the coroutines can execute on different threads in the pool.
  5. Finally, we wait for all the coroutines to complete using the join function on each coroutine job, and then shut down the custom thread pool using the shutdown method.

Please note in the output above that a coroutine may start on one thread but complete on a different one. For example:

Coroutine 0 running on thread pool-1-thread-1
Coroutine 0 completed on thread pool-1-thread-3        

Why did the thread switch happen?

When a coroutine encounters a delay function, it suspends its execution and releases the underlying thread. The coroutine is then added back to the coroutine queue, waiting for the specified delay time to expire. Once the delay time has elapsed, the coroutine is resumed and may execute on any available thread from the thread pool.

The coroutine dispatcher decides which thread the coroutine should resume on. In this example it is the custom four-thread pool; the same applies to Dispatchers.Default, which uses a shared pool of threads. A pool-backed dispatcher may resume a coroutine on a different thread from the one it started on, so any mutable state shared between coroutines still needs proper synchronization.

In general, it is recommended to use the built-in dispatchers (such as Dispatchers.Main, Dispatchers.Default, and Dispatchers.IO) unless you have a specific need to create a custom dispatcher. This can help to ensure that the application is scalable and performs well on different devices and platforms.

Advantages of creating a custom thread pool and attaching it to a custom coroutine scope:

  1. Fine-tuned control: By creating a custom thread pool, you have fine-tuned control over the number of threads, their priorities, and other properties. This can help optimize the performance of your application.
  2. Resource management: If you have specific resource requirements, such as the need to limit the number of threads or allocate specific hardware resources, you can create a custom thread pool that meets those needs.
  3. Separation of concerns: By creating a custom coroutine scope and attaching it to a custom thread pool, you can separate the concerns of your application into different contexts. For example, you can use a custom thread pool for blocking operations and the default thread pool for non-blocking operations.
  4. Testing: By creating a custom coroutine scope and thread pool, you can easily test your application with different thread pool configurations, priorities, and thread counts.

Creating a custom thread pool and attaching it to a custom coroutine scope can help you optimize the performance of your application and manage resources more effectively.

Summary: Coroutines and threads are two abstractions that allow multiple pieces of code to be executed concurrently. Coroutines are lightweight and well-suited for use cases where large numbers of concurrent tasks need to be executed. Threads are more heavyweight but can execute code in parallel on separate cores of the CPU. Thread pools can be used to optimize the execution of concurrent tasks by allowing multiple coroutines to run on a shared pool of threads. The scheduling of coroutines and threads is complex.

I will discuss communication between coroutines in the next article.

Thanks for reading till the end. Please comment if you have any questions!
