深入理解Kotlin协程(二)——复合协程

本篇文章我们将基于简单协程实现几种复合协程。

序列生成器

仿Python的Generator实现

Python中的Generator可以在函数中调用yield将当前函数挂起并返回yield的参数,效果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
val nums: (Int) -> Generator<Int> = generator { start: Int ->
for (i in 0..5) {
yield(start + i)
}
}

val gen: Generator<Int> = nums(10)

for (j in gen) {
println(j)
}

// 运行结果
10
11
12
13
14
15

我们通过generator函数来得到一个新的函数nums,通过调用这个函数我们可以得到一个序列生成器Generator,Generator需要重载iterator操作符并且返回一个迭代器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
interface Generator<T> {
operator fun iterator(): Iterator<T>
}

class GeneratorImpl<T>(
private val block: suspend GeneratorScope<T>.(T) -> Unit,
private val parameter: T
) : Generator<T> {
override fun iterator(): Iterator<T> = GeneratorIterator(block, parameter)
}

/**
* 构建一个序列生成器
*/
fun <T> generator(block: suspend GeneratorScope<T>.(T) -> Unit): (T) -> Generator<T> {
return { parameter: T ->
GeneratorImpl(block, parameter)
}
}

我们来思考一下这个GeneratorIterator迭代器要怎么写。当GeneratorIterator#next被调用时,我们需要获取到yield函数返回的数据,同时yield函数将会挂起协程,当我们下一次获取数据时恢复协程执行,获取到下一个数据后再挂起。也就是说next函数除了获取下一个数据之外,还要配合yield函数恢复协程的执行,我们根据当前迭代器的状态总结出下面几种情况:

  • 下个数据未准备完成。此状态表示协程尚未启动或者协程挂起后尚未恢复,此时调用next我们需要恢复协程的执行
  • 下个数据准备完成。恢复执行后,yield函数被调用,此时我们拿到了下一个数据,协程需要被挂起等待恢复执行。
  • 生成器执行完毕。无更多数据产生。

根据上面的分析我们定义出几种状态:

1
2
3
4
5
sealed class State {
class NotReady(val continuation: Continuation<Unit>) : State()
class Ready<T>(val continuation: Continuation<Unit>, val nextValue: T) : State()
object Done : State()
}

序列生成时,next函数和yield相互配合实现状态的流转,next函数需要恢复协程的执行,yield函数需要挂起协程同时将产生的数据传递给next函数返回,因此这里我们将等待恢复的continuation实例和返回的数据作为入参添加到状态中方便两个函数获取。

这几种状态的状态流转关系图如下:

状态流转图

根据状态流转图编写GeneratorIterator的部分代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* 获取下一个元素
* 当状态为[State.NotReady]时,表示协程未启动或已经挂起,调用[resume]函数启动或者恢复协程
* 当状态为[State.Ready]时,返回yield携带过来的value,并且将当前状态流转为[State.NotReady]状态
*/
override fun next(): T {
return when (val currentState = state) {
is State.NotReady -> {
resume()
return next()
}
is State.Ready<*> -> {
state = State.NotReady(currentState.continuation)
(currentState as State.Ready<T>).nextValue
}
State.Done -> {
throw IndexOutOfBoundsException("No value left.")
}
}
}

/**
* 使用suspendCoroutine挂起当前协程
* 并将当前状态流转为[State.Ready]状态,返回value
*/
override suspend fun yield(value: T) = suspendCoroutine<Unit> { continuation ->
state = when (state) {
is State.NotReady -> State.Ready(continuation, value)
is State.Ready<*> -> throw IllegalStateException("cannot yield while ready.")
is State.Done -> throw IllegalStateException("cannot yield while done.")
}
}

在yield函数的实现中,我们对当前的状态进行了判断,这是复合协程的一个核心逻辑:状态机。无论是何种场景下的协程,都会有挂起、恢复、结束等相应的状态需要维护,同时在有对应的事件到达时也需要完成状态的转移。状态转移务必考虑原子性,kotlin核心库中的状态转移都通过CAS操作来更新,由于本例仅限于单线程中使用,所以这里没有考虑线程安全问题。yield函数定义在协程作用域中:

1
2
3
interface GeneratorScope<T> {
suspend fun yield(value: T)
}

yield函数处理了挂起事件,对应的我们也需要合适时候处理协程的恢复、完成等事件,下面函数都完成了部分时间的状态转移:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
override fun hasNext(): Boolean {
resume()
return state != State.Done
}

/**
* 启动或者恢复协程
* 假设事实上的挂起发生了n次,那么[resume]将被调用n+1次
*/
private fun resume() {
when (val currentState = state) {
is State.NotReady -> currentState.continuation.resume(Unit)
}
}

/**
* 创建协程的completion回调
* 协程执行完成后回调
*/
override fun resumeWith(result: Result<Any?>) {
state = State.Done
println(result.getOrThrow())
}

现在我们已经完成比较关键的状态流转代码,但我们还没有创建协程,我们在GeneratorIterator构造函数中创建一个协程:

1
2
3
4
5
6
7
init {
val coroutineBlock: suspend GeneratorScope<T>.() -> Unit = { block(parameter) }
// 带Receiver的方式创建协程
val start = coroutineBlock.createCoroutine(this, this)
// 初始化状态
state = State.NotReady(start)
}

使用带Receiver的方式创建协程的好处是我们可以使用作用域中的方法,也就是yield函数。这里createCoroutine的两个个参数我们都传入了this,即GeneratorIterator同时实现了GeneratorScope接口和Continuation接口,如下所示。

1
2
3
4
class GeneratorIterator<T>(
private val block: suspend GeneratorScope<T>.(T) -> Unit,
private val parameter: T
) : GeneratorScope<T>, Iterator<T>, Continuation<Any?>

实现GeneratorScope接口我们可以复写或者实现yield函数,而实现Continuation的好处是GeneratorIterator自身可以监听到协程的执行结束,例如上面resumeWith函数使得我们可以在执行结束后将状态置为State.Done结束迭代。

接下来就是generator函数了,它接收一个参数即协程体,返回一个函数用来创建迭代器/序列生成器,如下所示。

1
2
3
4
5
fun <T> generator(block: suspend GeneratorScope<T>.(T) -> Unit): (T) -> Generator<T> {
return { parameter: T ->
GeneratorImpl(block, parameter)
}
}

现在我们完成了一个序列生成器,该序列生成器比较具备代表性,具有以下代表结构:

  • 返回值:GeneratorIterator的泛型参数T即为元素类型。对于存在结果的协程,一定存在相应的泛型参数生命。
  • 状态机:GeneratorIterator实现Continuation接口之后,自身即可作为协程执行完成后的回调completion参数传入,进而监听协程的完成情况。
  • 作用域:GeneratorIterator实现GeneratorScope接口之后,可以作为协程体的Receiver,这样即可令协程体获得相应的扩展函数,如本例中的yield函数。

kotlin标准库中的序列生成器

Kotlin标准库中提供了类似的生成器实现,通常我们称它为“懒序列生成器”。序列使用方法如下所示。

1
2
3
4
5
6
7
8
9
10
val sequence = sequence {
yield(1)
yield(2)
yield(3)
yieldAll(listOf(1, 2, 3, 4))
}

for (num in sequence) {
println(num)
}

sequence函数接收一个函数作为参数,结合我们自己实现的generator,这个函数实际上就是协程体。除了提供yield函数外,sequence还支持批量生产元素yieldAll。与generator不同的是,这里sequence {}的返回值直接就是迭代器。

我们可以通过sequence来获取一个斐波那契数列,如下所示。

1
2
3
4
5
6
7
8
9
10
11
12
val fibonacci = sequence {
yield(1L)
var current = 1L
var next = 1L
while (true) {
yield(next)
next += current
current = next - current
}
}

fibonacci.take(10).forEach(::println) // 使用take(10)获取前10个元素

Promise 模型

Promise模型又叫async/await,是目前最常见也最容易理解和上手的协程实现。

async/await的设计可以在async函数内部对复合Promise协议的异步回调进行await,使得异步逻辑变成了同步代码。这是目前主流的协程实现,它的关键在于将函数分为两种:

  • 普通函数:只能够调用普通函数,不存在协程的挂起和恢复逻辑。
  • async函数:既可以调用普通函数,也可以调用async函数,且可以将回调通过await同步化。

async和wait各司其职,分别实现协程的挂起和恢复的逻辑,上手几乎没有成本。

仿JavaScript的async/await实现

利用kotlin协程实现一个async/await的复合协程,效果如下所示。

1
2
3
4
5
6
7
async {
val user = await { gitHubApi.getUserCallback("sukaidev") }
println(user)
}

// 控制台输出
User(userName=sukaidev, blog=https://www.sukaidev.top, location=GuangZhou,China, bio=Focus on Jetpack)

先思考下async如何实现。async接收一个函数作为参数,这个函数即协程体,必然是个挂起函数。async创建成功后协程即启动,并且内部可以使用await方法来挂起协程。async函数实现如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
fun async(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend AsyncScope.() -> Unit
) {
val completion = AsyncCoroutine(context)
block.startCoroutine(completion, completion)
}

class AsyncCoroutine(override val context: CoroutineContext = EmptyCoroutineContext) :
Continuation<Unit>, AsyncScope {
override fun resumeWith(result: Result<Unit>) {
result.getOrThrow()
}
}

由于async启动的协程不需要返回值,所以作为completion存在的AsyncCoroutine没有泛型参数。

await函数定义在AsyncScope中,使得它可以在async构造的协程中调用,它主要的作用就是将网络请求的回调转为协程,使用suspendCoroutine实现即可,如下所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
interface AsyncScope {
suspend fun <T> await(block: () -> Call<T>) = suspendCoroutine<T> { continuation ->
val call = block()
call.enqueue(object : Callback<T> {
override fun onResponse(call: Call<T>, response: Response<T>) {
if (response.isSuccessful) {
response.body()?.let(continuation::resume)
?: continuation.resumeWithException(NullPointerException())
return
}
continuation.resumeWithException(HttpException(response))
}

override fun onFailure(call: Call<T>, t: Throwable) {
continuation.resumeWithException(t)
}
})
}
}

async/await的状态比较简单,只有挂起以及被封装的回调完成状态,状态之间的流转由await单独来完成,因此不需要额外的状态机实现。当我们引入了协程的取消处理、异常处理等逻辑后,状态机的存在就至关重要了。

Luna风格的协程API

我们在使用Kotlin协程时,总是说创建了一个协程,不像创建线程会有一个对应的类或者对象。在之前的复合协程中,我们总是把协程的状态机封装在协程的完成回调Continuation实例competition中,由于这个实例提供了各种的协程能力,所以我们一般它当做复合协程本身。

使用Luna API创建协程与创建线程一样,只需提供一个函数,返回一个协程的控制类来控制协程的执行,我们可以基于Kotlin的简单协程来实现这样的一套API。

非对称API实现

非对称协程的主要特点就是在协程挂起时,协程控制权返回给了调用者,我们先看下实现效果,如下所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
val producer = Coroutine.create<Unit, Int> {
for (i in 0..3) {
println("send $i")
yield(i)
}
200
}

val consumer = Coroutine.create<Int, Unit> { parameter: Int ->
println("start $parameter")
for (i in 0..3) {
val value = yield(Unit)
println("receive $value")
}
}

while (producer.isActive && consumer.isActive) {
val result = producer.resume(Unit)
consumer.resume(result)
}

通过Coroutine的伴生对象来创建协程,参数为协程体,协程体的参数类型和返回值类型由泛型参数指定。create的返回值用来控制协程的执行,结合前面几篇文章,我们知道这个返回值其实就是封装了协程状态机的实例,与SafeContinuation的作用十分类似,我们也习惯于将这个实例作为协程的完成回调,即competition。yield函数类似于序列生成器中的yield的作用,将当前协程挂起并将它的参数作为协程这一次resume调用的返回值。

状态机的状态设计是关键点,有如下几个状态。

1
2
3
4
5
6
sealed class Status {
class Created(val continuation: Continuation<Unit>) : Status()
class Yielded<P>(val continuation: Continuation<P>) : Status()
class Resumed<R>(val continuation: Continuation<R>) : Status()
object Dead : Status()
}
  • Created:协程处于刚刚被创建状态,需要等待resume函数的调用来启动协程。
  • Yielded:协程内部调用yield函数后挂起,泛型参数P表示协程的参数类型。
  • Resumed:协程外部调用resume函数之后协程恢复执行,泛型参数R表示协程的返回值类型。
  • Dead:表示协程已经执行完毕。

状态之间的转移如下所示。

状态流转图

创建一个CoroutineScope来约束yield的调用范围:

1
2
3
4
interface CoroutineScope<P, R> {
val parameter: P? // 协程体启动时的参数
suspend fun yield(value: R): P
}

由于Coroutine的对象需要返回给调用者,所以我们这里不能让Coroutine来直接实现CoroutineScope,而是在内部创建了一个CoroutineScope的匿名内部类。协程描述类的部分代码如下所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class Coroutine<P, R> private constructor(
override val context: CoroutineContext = EmptyCoroutineContext,
private val block: suspend CoroutineScope<P, R>.(P) -> R
) : Continuation<R> {

companion object {
fun <P, R> create(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope<P, R>.(P) -> R
): Coroutine<P, R> {
return Coroutine(context, block)
}
}

private val scope = object : CoroutineScope<P, R> {
override var parameter: P? = null

override suspend fun yield(value: R): P = suspendCoroutine { continuation ->
... ...
}
}

private val status: AtomicReference<Status>

val isActive: Boolean
get() = status.get() != Status.Dead

init {
val coroutineBlock: suspend CoroutineScope<P, R>.() -> R = { block(parameter!!) }
val start = coroutineBlock.createCoroutine(scope, this)
status = AtomicReference(Status.Created(start))
}
... ....
}

代码结构与之前的序列生成器的迭代器类似,不过有两点区别:

  1. 提供了一个工厂方法create函数,用于手动创建Coroutine对象,同时返回该对象,使得外部可以控制Coroutine对象的恢复逻辑。
  2. 状态机status我们使用了AtomicReference,这是为了确保状态机的流转在多线程情况下仍然能够保证原子性。

我们再来看下yield的实现,如下。

1
2
3
4
5
6
7
8
9
10
11
override suspend fun yield(value: R): P = suspendCoroutine { continuation ->
val previousStatus = status.getAndUpdate {
when (it) {
is Status.Created -> throw IllegalStateException("Never started!")
is Status.Yielded<*> -> throw IllegalStateException("Already yielded!")
is Status.Resumed<*> -> Status.Yielded(continuation)
Status.Dead -> throw IllegalStateException("Already dead!")
}
}
(previousStatus as? Status.Resumed<R>)?.continuation?.resume(value) // 恢复外部协程
}

yield函数使用suspendCoroutine来挂起函数,这个我们已经很熟悉了。与序列生成器的yield实现不同的是,这里yield是有返回值的,为P类型也就是协程的入参类型,这使得yield同时也可以作为消费者来使用。

status.getAndUpdate接收一个参数为上一个状态,并要求返回新的状态,这个函数可能会被执行多次。当previousStatus确实是Resumed,就调用它的continuation.resume来恢复此前恢复执行当前协程的协程,这里有点绕,其实就是恢复外部协程的执行,外部协程指我们创建producer和consumer的协程。外部协程为什么会挂起跟我们的resume函数实现有关,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
suspend fun resume(value: P): R = suspendCoroutine { continuation ->  // 挂起外部协程
val previousStatus = status.getAndUpdate {
when (it) {
is Status.Created -> {
scope.parameter = value
Status.Resumed(continuation)
}
is Status.Yielded<*> -> Status.Resumed(continuation)
is Status.Resumed<*> -> throw IllegalStateException("Already resumed!")
Status.Dead -> throw IllegalStateException("Already dead!")
}
}

when (previousStatus) { // 恢复当前协程
is Status.Created -> previousStatus.continuation.resume(Unit)
is Status.Yielded<*> -> (previousStatus as Status.Yielded<P>).continuation.resume(value)
else -> {
}
}
}

当外部调用resume恢复该协程时,当前状态可能为:

  • Created,协程只是创建,并未启动。
  • Yielded,协程已执行处于挂起状态。

resume函数的作用是挂起外部协程,并且启动或恢复当前协程的执行。外部协程挂起后,我们将其continuation实例存入到状态机中,当当前协程yield函数被调用时,又会将外部协程恢复,使得外部协程可以继续执行后续工作,例如调用另一个协程的resume函数。文字说起来可能比较抽象,这里画了一个时序图,如下。

非对称API时序图

最后就是resumeWith的实现了,它的调用表示该协程已经执行完毕,需要将状态流转为State.Dead,同时不要忘记恢复外部协程执行,如下所示。

1
2
3
4
5
6
7
8
9
10
11
override fun resumeWith(result: Result<R>) {
val previousStatus = status.getAndUpdate {
when (it) {
is Status.Created -> throw IllegalStateException("Never started!")
is Status.Yielded<*> -> throw IllegalStateException("Already yielded!")
is Status.Resumed<*> -> Status.Dead
Status.Dead -> throw IllegalStateException("Already dead!")
}
}
(previousStatus as? Status.Resumed<R>)?.continuation?.resumeWith(result)
}

至此,Lua风格的非对称协程API完成。

对称API实现

对称协程意味着协会可以任意、平等地传递调度权。在传递过程过,调度权转出的协程需要提供目标协程的对象及参数,目标协程应处于挂起状态等待接收调度权,中间应当有一个控制中心来协助完成调度权的转移。控制中心需要具备以下能力:

  • 在当前协程挂起时接收调度权。
  • 根据目标协程对象来完成调度权的最终转移。

这个控制中心显然是一个可以恢复(当前协程挂起)和挂起(传递调度权给目标协程)的协程,实际上我们之前的非对称API中的外部协程已经具备了一些控制中心的能力,我们只需要对其进行改造即可。效果如下所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
object SymCoroutines {
val coroutine0: SymCoroutine<Int> = SymCoroutine.create { params: Int ->
println("coroutine-0 $params")
var result = transfer(coroutine2, 0)
println("coroutine-0 1 $result")
result = transfer(SymCoroutine.main, Unit)
println("coroutine-0 1 $result")
}

val coroutine1: SymCoroutine<Int> = SymCoroutine.create { param: Int ->
println("coroutine-1 $param")
val result = transfer(coroutine0, 1)
println("coroutine-1 1 $result")
}

val coroutine2: SymCoroutine<Int> = SymCoroutine.create { param: Int ->
println("coroutine-2 $param")
var result = transfer(coroutine1, 2)
println("coroutine-2 1 $result")
result = transfer(coroutine0, 2)
println("coroutine-2 2 $result")
}
}

SymCoroutine.main {
println("main 0")
val result = transfer(SymCoroutines.coroutine2, 3)
println("main end $result")
}

我们使用SymCoroutine.create创建了多个协程,并且使用SymCoroutine.main创建了一个外部协程,这个外部协程就是控制中心,后文简称为 Main协程。Main协程通过transfer函数将调度权转移给coroutine2,从而开始了对称协程的调度权转移过程。

调度权转移时序图

与之前一样,我们定义一个作用域接口来提供transfer函数,如下所示。

1
2
3
interface SymCoroutineScope<T> {
suspend fun <P> transfer(symCoroutine: SymCoroutine<P>, value: P): T
}

这里需要注意的是,泛型参数T为对称协程的参数类型,而transfer函数的泛型参数P表示目标协程的参数类型,对称协程自身的定义决定了它不存在返回值,这点与非对称协程不同。

接下来就是协程描述类SymCoroutine以及它的create和main函数的定义。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class SymCoroutine<T>(
override val context: CoroutineContext = EmptyCoroutineContext,
private val block: suspend SymCoroutineScope<T>.(T) -> Unit
) : Continuation<T> {

companion object {
lateinit var main: SymCoroutine<Any?>

suspend fun main(
block: suspend SymCoroutineScope<Any?>.() -> Unit
) {
SymCoroutine<Any?> {
block()
}.also {
main = it
}.start(Unit)
}

fun <T> create(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend SymCoroutineScope<T>.(T) -> Unit
): SymCoroutine<T> = SymCoroutine(context, block)
}

private val body: SymCoroutineScope<T> = object : SymCoroutineScope<T> {
... ...
}

val isMain: Boolean
get() = this == main
... ...
}

main函数的作用是创建Main协程,同时会将其赋值给伴生对象属性main,方便其他协程将调度权归还给控制中心。

接下来我们思考下当前协程如何将调度权转出。由于当前协程本质上是由Main协程启动的协程,因此只要调用内部的非对称协程的yield函数将自身挂起,调度权自然就交回到了Main协程手中。Main协程只需要读取它自己的resume的返回值即可得到目标协程对象及参数。因此yield的参数类型定义如下:

1
class Parameter<T>(val coroutine: SymCoroutine<T>, val value: T)

SymCoroutin内部的非对称协程的定义如下所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class SymCoroutine<T>(
override val context: CoroutineContext = EmptyCoroutineContext,
private val block: suspend SymCoroutineScope<T>.(T) -> Unit
) : Continuation<T> {
... ...
private val coroutine = Coroutine.create<T, Parameter<*>>(context) {
Parameter(this@SymCoroutine, suspend {
block(body, it)
if (this@SymCoroutine.isMain) Unit
else throw IllegalStateException("SymCoroutine cannot be dead.")
}() as T)
}

override fun resumeWith(result: Result<T>) {
throw IllegalStateException("SymCoroutine cannot be dead.")
}

suspend fun start(value: T) {
coroutine.resume(value)
}
... ...
}

对于内部的非对称协程而言,yield函数的参数类型**Parameter**自然就是它的返回值类型,因此我们看到携程体内构造了一个Parameter的实例。不过我们在传入Parameter参数的时候,应传入目标协程和目标协程的参数,但这里我们传入了this,这是为什么呢?因为这是协程执行完后的最后一行代码,由于对称协程在执行完成之前必须交出调度权,所以这段代码只会被特权协程执行。第二个参数也是如此,创建了一个Lambda表达式并且理解调用了它,并在其中执行了block触发协程体的执行,普通的对称协程在block内部就会通过调用transfer交出调度权。

接下来是最关键的transfer函数的实现,如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class SymCoroutine<T>(
override val context: CoroutineContext = EmptyCoroutineContext,
private val block: suspend SymCoroutineScope<T>.(T) -> Unit
) : Continuation<T> {
private val body: SymCoroutineScope<T> = object : SymCoroutineScope<T> {
private tailrec suspend fun <P> transferInner(
symCoroutine: SymCoroutine<P>,
value: Any?
): T {
if (this@SymCoroutine.isMain) { // Main协程在转移调度权
return if (symCoroutine.isMain) {
value as T // ... ③
} else {
val parameter = symCoroutine.coroutine.resume(value as P) // ... ①
transferInner(parameter.coroutine, parameter.value)
}
} else { // 对称协程转移调度权到其他协程或者Main协程
coroutine.run {
return yield(Parameter(symCoroutine, value as P)) // ... ②
}
}
}

override suspend fun <P> transfer(symCoroutine: SymCoroutine<P>, value: P): T {
return transferInner(symCoroutine, value)
}
}
... ...
}

我们按照之前代码的执行逻辑来分析一下transfer的调用逻辑。

  • 程序开始执行时,调度权最开始在特权协程手中,调用transfer将调度权转给coroutine2,transferInner入参symCoroutine为coroutine2,在①处调用coroutine2的resume挂起Main协程,coroutine2开始执行。
  • 接下来coroutine2调用transfer函数转给coroutine1时,先将调度权交出,实际上就是在②处调用yield将自己挂起,此时接收调度权的Main协程在①处的resume函数返回,parameter中携带的其实就是coroutine1和它的参数。
  • 此时Main协程中递归调用transferInner并再次进入①处挂起自己,由于coroutine1尚未启动,因此直接开始执行,直到调用transfer转给coroutine0。
  • 最终,在coroutine()中将调度权归还给Main协程,transferInner落入③处分支直接返回。

至此,基于非对称协程API实现的对称协程API原理分析完毕。

复合协程的实现模式

结合本篇几个案例的实现,我们可以把复合协程实现模式归纳如下。

  • 协程的构造器。我们总是需要一套更好更简便的API来创建协程,例如async{ … }或者Coroutine.create{ … }。
  • 协程的返回值。协程可以有返回值,这一点主要是由协程完成时对completion的调用来保证的。
  • 协程的状态机。在Kotlin协程的基础设施中,协程本身已经存在创建、执行、挂起、完成等状态了,我们通常需要对这些状态进行管理以控制协程的执行逻辑。状态流转过程在并发环境下需要考虑并发安全问题,我们可以在状态流转时通过加锁来确保这一点,也可以采用更高效的CAS算法来确保状态流转的原子性。
  • 协程的作用域。作用域主要用作协程体的Receiver,从而令协程体能够方便地获得协程自身的信息或者调用协程体专属的函数(例如yield)。

参考