Location>code7788 >text

vernacular kotlin coroutine

Popularity:410 ℃/2025-04-01 19:53:30

The article is published simultaneously on the official account: What's going on with mobile developmentvernacular kotlin coroutine

1 What is a coroutine

Kotlin coroutine(Coroutine) is a lightweight thread management framework that allows developers to handle asynchronous operations in a more concise and efficient way, avoiding callback hell and thread blocking. It has several core features:

  • Suspend and restoresuspend: Can be suspended during time-consuming operation, release thread resources, and automatically restored after completion;
  • Non-blocking concurrency: enable task switching through collaborative scheduling;
  • Structured concurrency: automatically manage coroutine life cycles through scope;

2. Use of coroutines

2.1 Starting and scheduling of coroutines

Generally, a coroutine scope is needed to start and manage coroutines, and then in the scope
uselaunchorasyncFunction starts coroutine, usesuspendTo suspend coroutines:

  • launch: Used for non-blocking asynchronous tasks;
  • async: For asynchronous tasks that may return results
fun main() = runBlocking {
	 // Start coroutines in the default environment
     val scope = CoroutineScope()
      {
         // Execute asynchronous tasks here
         val result = requestData();
     }
     val result = {
    	 // Execute asynchronous tasks that can return results

     }
     // () Get specific results

 }
 // suspend keyword means that this method is called within the coroutine domain and will not block threads
 suspend fun requestData():Map {
	 // Actual execution
 }

And during the use of coroutines, it can beDispatchersTo specify which environment the coroutine runs in (the environment here mainly refers to which thread pool it runs in. Different thread pools have different processing strategies):

  • Main: Main thread
  • IO: Network/file operation
  • Default: CPU-intensive computing
    example:
// Start coroutine in IO environment
 val result = async() {
	 Perform asynchronous operations
 }

2.2 Cancel of coroutines

Cancel of coroutines is availablecanclekeywords to process. When starting the coroutine, aJobObject, can be called when it needs to be cancelledto achieve this goal;

val job: Job = {
     // Perform asynchronous operations
 }
 // Cancel
 ()

2.3 Exception handling of coroutines

Exception handling can be customizedCoroutineExceptionHandlerortry-catchblock to implement;

val task1 = launch(CoroutineExceptionHandler { _, e ->
         logError(e)
     }) { /* ... */ }
    
     try {
         ()
     } catch (e: CancellationException) {
         // Handle cancel exception
     }

3 The underlying implementation principle of coroutines

The underlying coroutine is processed based on the lifecycle state of the coroutine, and the lifecycle of the coroutine includes:

  • New: The initial state when it was created but not started;
  • Active
    • Running: Execution of code will occupy thread resources
    • Suspended: implementsuspendWhen the function is used, the thread is released and the recovery is waited;
  • Completed:Complete status (normal completion or abnormal completion)
  • Cancelling: Cancel state, enter the resource cleanup stage, but can be executedfinallyCode block;

3.1 Core Principle

Its core principle is based on a state machine. When a suspended function is encountered, the code of the coroutine will be replaced with a state machine. For example, there is a suspended function in the code:

suspend fun doSomething() {
     delay(1000) // suspend function
     println("Something done")
 }

 fun main() = runBlocking {
     launch {
         doSomething()
     }
     println("Main function continues")
 }

The compiler will compile the above code into:

// Simplified state machine code diagram
 class DoSomethingCoroutine : Continuation<Unit> {
	 // Current status
     var state = 0
     override fun resumeWith(result: Result<Unit>) {
         When (state) {
             0 -> {
                 state = 1
                 // Call the delay function and pass the current coroutine as a continuation
                 // After the delay function is executed, the resumeWith method will be called
                 delay(1000, this)
             }
             1 -> {
                 println("Something done")
             }
         }
     }
 }

After the suspended asynchronous operation is completed, the coroutine will be calledresumeWithMethod, pass the result to the coroutine, which will resume execution from the paused position and continue to execute subsequent code according to the state of the state machine;

3.2 Scheduler (CoroutineDispatcher)principle

The coroutine scheduler is responsible for determining which thread or thread pool the coroutine is used on;

  • Default: used for CPU-intensive, using a thread pool by default
  • IO: Special thread pool
  • Main: is used to execute coroutines on the main thread, usually used to update the UI;

3.2.1 Default

Used a basedForkJoinPoolthread pool.ForkJoinPoolIt is a special thread pool introduced in Java 7. It uses a Work-Stealing Algorithm and can efficiently handle a large number of small tasks.
When a coroutine passesWhen scheduling execution,dispatchMethods encapsulate coroutine tasks into oneRunnableobject and submit it toForkJoinPoolmiddle. ForkJoinPool selects an idle thread from the thread pool to perform the task.

3.2.2 IO

A thread pool is also used, but the size of this thread pool can be dynamically adjusted according to system resources. Its purpose is to handle a large number of I/O blocking operations and avoid blocking the execution of other coroutines.
When a coroutine passesWhen scheduling execution,dispatch Methods encapsulate coroutine tasks into oneRunnableobject and submit it to the IO thread pool. Since I/O operations usually block threads, the IO thread pool will have enough threads to handle these blocking operations, so that other coroutines can continue to execute.

3.2.3 Main

When a coroutine passesWhen scheduling execution,dispatch Methods encapsulate coroutine tasks into oneRunnableobject, and passHandlerSend it to the message queue of the main thread. The message loop of the main thread will take out the tasks in the message queue in turn and execute them.

4 Flow

FlowIt is a responsive programming in kotlin coroutine, built on coroutines, mainly used to process asynchronous data streams, and is a cold stream, and elements will only start to be sent when collected (at the same time,FlowWith backpressure mechanisms used to deal with the problem of speed mismatch between producers and consumers), it has several key components:

  • Flow interface: represents a cold stream that starts transmitting elements only when collected, and provides a series of operators for conversion and processing of data streams;
  • FlowCollector: used for collectionFlowThe element of the launch;
  • FlowBuilder: used to build Flow objects. Common construction methods are:flow,flowOf, asFlow

4.1 Common application scenarios

  • Asynchronous data flow
  • UI data update (data is exposed to the UI in the form of Flow, and the UI can be automatically updated when the data changes)
  • Event processing (convert various events to Flow for processing)
val dataFlow:Flow<String> = flow {
	 emit("data")
 }
 // The backpressure mechanism is not used by default. When the speed does not match, the producer will first pause and wait for the previous data to be processed before continuing to produce the data.
 {data ->
	 .....
 }

4.2 Backpressure mechanism

Backpressure is a feedback mechanism used to process situations in which producers generate data faster than consumers process data. When consumers have limited ability to process data, if the producer continues to generate data quickly, it may cause consumer memory to overflow or exhaust system resources. The backpressure mechanism allows consumers to feedback their processing capabilities to the producer, allowing producers to adjust the speed of data generation to achieve a balance between producers and consumers.
Several common backpressure operators:

  • buffer: Create a buffer, and the contents of the message will be placed in the buffer;
  • conflate: Unprocessed data in the buffer will be discarded and only the latest data will be retained.
  • collectLatest: When there is new data, the data currently being processed will be cancelled and only the latest data will be processed.
val dataFlow:Flow<String> = flow {
	 emit("data")
 }

 // Specify the backpressure strategy using buffer
 ()
	 .collect{data ->
	 .....
 }

5 Channel

ChannelIt is a tool in the Kotlin coroutine library for communicating between coroutines. It is similar to a queue, which supports one or more coroutines to send elements to it, and also supports one or more coroutines to receive elements from it. It can be used to implement the producer-consumer pattern, which has different ways of creating:

  • Channel<Type>(10): Create a fixed-size buffered channel
  • Channel<type>(): Create an unbuffered channel, sending and receiving should be synchronized
  • Channel<type>(): Create an infinite buffered channel,

A simple useChannelExample:

fun main() = runBlocking {
	 // Initialize a default size channel;
     val channel = Channel<Int>()

     // Producer Coroutine
     launch {
         for (i in 1..5) {
            
             // Send elements through send. If the buffer is full, the operation will hang until there is room for it
             (i)
             println("Sent $i")
         }
         // Close the Channel. After closing, you cannot send data, but you can continue to receive data.
         ()
     }

     // Consumer coroutine
     launch {
    	 // When the channel is closed and there are no more elements, the loop will automatically end
         for (element in channel) {
        	 // Elements will be received through the receive() method
             println("Received $element")
         }
         println("Channel closed")
     }
 }

5.1 Channel type

5.1.3 Buffered Channel with Capacity Limit

Specifies a fixed capacity, and the sending operation will hang when the buffer is full.

// Create a Channel with a capacity of 10
 val channel = Channel<Int>(10)

5.1.3 Unbuffered Channel()

The sender and the receiver must be prepared at the same time, and the sending operation will hang until there is a receiver receiving element; the receiving operation will hang until there is a sender sending element. This type is suitable for scenarios that require strict synchronization.

// Create an unbuffered channel
 val channel = Channel<Int>()

5.1.3 Unlimited buffered Channel()

(Created by default) The buffer can hold any number of elements, and the sending operation will not hang. However, it should be noted that if the producer speed is much higher than the consumer speed, it may lead to excessive memory usage.

// Create a Channel with no capacity limits. The following two methods are equivalent
 val channel = Channel<Int>()
 // val channel = Channel<Int>())

5.2 Channel underlying principle

ChannelThe core of the underlying layer is the queue:

  • Unbuffered Channel: Use special queues, which do not store elements themselves, but coordinate the synchronization of sender and receiver;
  • Buffered Channel (specified capacity): Use normalArrayDeque
  • Unlimited buffered Channel(): Use unbounded queues, such asLinkedList;

6 References

  • kotlin coroutine