深入理解Kotlin协程(一)——基本概念与原理

协程的基本概念

在当前主流的计算机操作系统中,进程和线程是我们比较熟悉的东西。在早期的计算机系统中,并没有线程的概念,而进程是操作系统进行资源分配和调度的基本单位,也是程序执行的最小单位,每个进程都有其单独的内存空间,使得进程之间的内存相互独立。随着计算机的发展,越来越多的场景需要多任务并发执行,而进程的创建、切换、销毁开销大,且进程之间内存无法共享等问题无法满足复杂的场景需求,这些问题促使了线程的诞生。

线程是cpu调度和分派的基本单位,它可以理解为进程的一条执行路径。一个进程可以有多个线程,线程之间共享内存空间,同时线程存在自己的私有工作空间以及上下文,极大的缩小了进程创建和切换带来的开销。而线程之间通过时间片轮转来分配cpu时间片,使得单核cpu也能做到“并发”的效果,这使得多任务并发的性能得到了很大的提高。

协程是很早就存在的概念,但近几年被广泛的使用。协程出现最初是为了实现与线程的“抢占式调度”不同的“协同式调度”多任务并发程序。与线程的抢占式调度不同,协同式调度讲究的是任务完成后主动通知cpu任务已经执行完成并交回cpu使用权,但随着主流操作系统都采用抢占式调度后,协程也被人们遗忘。

随着对程序性能的追求,减少线程上下文切换,不少语言都实现了自己的协程,例如Golang、Python、C++、Lua、Kotlin等。Kotlin的协程由语言层面提供,不少人对它的解释是“线程框架”,实际上它的功能就是在当前以抢占式线程的多任务并发机制为主的操作系统,以协同式的调度思想来解决多任务问题,并且尽量的减少线程之间的切换开销,提升程序在高并发时的性能。

说了那么多,那么到底什么是协程?

  1. 挂起和恢复
  2. 程序自行处理挂起恢复
  3. 程序自行处理挂起恢复来实现程序执行流程的协作调度

协程的分类

按调用栈分类

我们知道,java程序之所以能实现方法内部调用方法的功能,是因为Jvm中维护了一个栈,当方法被调用时,其相关的数据与属性都会被栈中,也就是我们常说的JVM中的Java栈。这个栈就是函数调用栈,是用来保存函数调用时的状态信息的数据结构。

由于协程需要挂起和恢复,因此对于挂起点的状态保存就显得很重要。按照是否开辟了函数调用栈我们可以对协程进行分类。

  • 有栈协程(Stackful Coroutine):每一个协程都有自己的调用栈,类似于线程的调用栈
  • 无栈协程(Stackless Coroutine):协程没有自己的调用栈,挂起点的状态由状态机或者闭包等语法来实现

有栈协程的优点是可以在任意函数调用层级的任意位置挂起,并转移调度权。无栈协程的有点是不需要开辟栈空间,因此在内存紧张的程序上有优势。

Kotlin的协程通常被认为是一种无栈协程的实现,它的控制流转依靠对协程体本身编译生成的状态机的状态流转来实现,变量保存也是通过闭包语法来实现。不过,kotlin协程可以在挂起函数范围内的任意调用层次挂起,这也是有栈协程的一个重要特性之一。

按调度方式分类

调度过程中,根据协程调度权的转移目标的不同又可将协程分为对称协程非对称协程

  • 对称协程(Symmetric Coroutine):任何一个协程都是相互独立且平等的,调度权可以在任意协程之间转移
  • 非对称协程(Asymmetric Coroutine):协程出让调度权的目标只能是它的的调用者,即协程之间存在调用和被调用关系

对称协程的概念跟线程十分类似,例如go routine可以通过读写不同的channel来实现控制权的自由转移。常见语言的协程大多是非对称实现。实际上在非对称的基础上,我们只需要添加一个中立的第三方作为协程调度权的分发中心,所有协程在挂起时都将调度权转移给分发中心,分发中心根据参数来决定将调度权转移给哪个协程,即可实现对称协程。

协程基础

协程的创建

我们可以通过createCoroutine方法快速创建一个协程:

1
2
3
4
public fun <T> (suspend () -> T).createCoroutine(
completion: Continuation<T>
): Continuation<Unit> =
SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED)

可以看到该方法是个扩展方法,Receiver类型为suspend ()->T,是一个挂起函数。completion是协程完成后的回调,有点类似于我们常写的Callback。方法返回一个Continuation对象, 拿到了这个对象后我们就可以随时的启动协程了。

1
2
3
4
5
6
7
8
9
10
11
val continuation = suspend {
println("Coroutine Start.")
"return value."
}.createCoroutine(object : Continuation<String> {
override val context: CoroutineContext
get() = EmptyCoroutineContext

override fun resumeWith(result: Result<String>) {
println("Coroutine End : $result")
}
})

协程的启动

上面代码中我们顺利创建一个协程,那么如何启动这个协程呢?Continuation中并没有类型startXXX()的方法,而是调用Continuation#resume来启动一个协程。

1
2
3
4
5
continuation.resume(Unit)   // 启动协程

// 控制台输出:
Coroutine Start.
Coroutine End : Success(return value.)

当然我们也可通过startCoroutine来直接创建和启动一个协程:

1
2
3
4
5
public fun <T> (suspend () -> T).startCoroutine(
completion: Continuation<T>
) {
createCoroutineUnintercepted(completion).intercepted().resume(Unit)
}

上面有个非常令人疑惑的点是,为什么调用返回的Continuation对象就可以启动协程呢?按照我们上面的写的,continuation的resumeWith方法应该作为回调成功的方法才对。

通过上面createCoroutine方法源码我们得知返回给我们的是一个SafeContinuation对象,SafeContinuation其实只是个”马甲“,它的所有操作均交由其私有属性delegate来执行,当然delegate也是一个Continuation对象。当我们执行了

1
continuation.resume(Unit)

这行代码时,实际上是执行了delegate.resumeWith方法。那么新问题来了,delegate对象又是哪来的呢?通过查阅字节码和反编译文件我们发现,我们写的协程体也就是suspend修饰的这个lambda编译后实际上变成了一个匿名内部类,而我们的协程体中的代码实际被包装在了其invokeSuspend方法中。而当我们调用Suspend Lambda的扩展方法createCoroutine创建一个协程时,我们的Suspend Lambda也就是协程体实际被传入SafeContinuation的构造方法中,也就是说SafeContinuation的delegate属性其实就是我们的协程体。

这样看来就比较清晰了,创建协程返回的Continuation实例其实就是套了几层马甲的协程体,协程体先被编译器封装到一个匿名内部类内部的invokeSuspend方法中,再传入SafeContinuation充当其代理,当我们调用SafeContinuation#resume的时候,我们的协程体自然就得到了执行。

协程体的Receiver

与协程创建和启动相关的API有两组,现在看下第二组:

1
2
3
4
5
6
7
8
9
public fun <R, T> (suspend R.() -> T).createCoroutine(
receiver: R,
completion: Continuation<T>
)

public fun <R, T> (suspend R.() -> T).startCoroutine(
receiver: R,
completion: Continuation<T>
)

两组区别仅仅在于这一组协程体多了一个Receiver类型R。协程体的Receiver可以为协程提供一个作用域,使得我们可以在协程体内使用作用域提供的函数或者状态等。

由于Kotlin本身没有提供带有Receiver的Lambda表达式的语法,这里我们自己实现一个:

1
2
3
4
5
6
7
8
9
10
fun <R, T> launchCoroutine(receiver: R, block: suspend R.() -> T) {
block.startCoroutine(receiver, object : Continuation<T> {
override val context: CoroutineContext
get() = EmptyCoroutineContext

override fun resumeWith(result: Result<T>) {
println("Coroutine End : $result")
}
})
}

使用时首先需要一个作用域,这里我们来手动模拟一个协程作用域:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class MainScope<T> {
suspend fun print(value: T) {
println(value.toString())
}

suspend fun hashCode(value: T) = value.hashCode() xor (value.hashCode() ushr 16)
}

fun callLaunchCoroutine() {
launchCoroutine(MainScope<Int>()) {
println("Start Coroutine.")
print(1000)
delay(1000L)
hashCode(1000)
}
}

可以看到我们可以直接使用作用域提供的函数,也可以调用作用域外部定义的挂起函数。

除了可以提供函数支持外,作用域也可以用来增加限制,例如RestrictsSuspension注解,为作用域添加这个注解后,使用作用域构造的协程体将无法调用外部的挂起函数,例如上面代码中的delay将会报红。

可挂起的main函数

从Kotlin Version 1.3开始,我们main函数可以直接被声明为suspend:

1
2
3
suspend fun main(){
... ...
}

这意味着我们可以在JVM启动的时候直接获得一个协程。首先可以确定的是这个可挂起的main函数肯定不会是真正的程序入口,因为JVM根本不会知道什么是协程,实际上我们反编译字节码后发现main方法是这样的:

1
2
3
public static void main(String[] var0) {
RunSuspendKt.runSuspend(new CoroutineKt$$$main(var0));
}

我们在suspend main()中写的可挂起的代码实际上由RunSuspendKt#runSuspend来执行,来看看源码:

1
2
3
4
5
internal fun runSuspend(block: suspend () -> Unit) {
val run = RunSuspend()
block.startCoroutine(run)
run.await()
}

可以看到这里用我们的suspend main()来启动一个协程。

这里还有一个RunSuspend类,它也是一个Continuation,作为我们整个程序运行完成的回调,这里我们可以关注下await函数的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
override fun resumeWith(result: Result<Unit>) = synchronized(this) {
this.result = result
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN") (this as Object).notifyAll()
}

fun await() = synchronized(this) {
while (true) {
when (val result = this.result) {
null -> @Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN") (this as Object).wait()
else -> {
result.getOrThrow() // throw up failure
return
}
}
}
}

可以看到当我们调用了block.startCoroutine(run)启动了协程之后,main()方法所在线程实际会进入阻塞状态,当协程执行完毕,run的resumeWith方法会被调用,main()方法所在线程从而被唤醒,使得JVM可以正常退出。

函数的挂起

挂起函数

用suspend修饰的函数称为挂起函数。挂起函数只能在协程体内或者另一个挂起函数内部调用,这样kotlin的函数就分为了两种:普通函数挂起函数。其中挂起函数可以调用任何函数,而普通函数只能调用普通函数。

1
2
3
4
5
6
7
8
9
suspend fun suspendFunc01(a: Int) {
return
}

suspend fun suspendFunc02(a: String, b: String) = suspendCoroutine<Int> { continuation ->
thread {
continuation.resumeWith(Result.success(5))
}
}

可以看到,挂起函数既可以像普通函数一样同步返回,也可以处理异常逻辑。suspendFunc02中使用suspendCoroutine来获取当前所在协程体的Continuation的实例作为参数将挂起函数当成异步函数里来处理,内部新建一个线程来执行Continuation.resultWith操作,因此协程调用suspendFunc02后会进入挂起状态,直到结果返回。所谓挂起其实就是指当前的程序执行流程发生了异步调用,执行流程进入等待状态。

挂起点

在前面的suspendFunc02中我们发现,一个挂起函数想要挂起,所需要的无非是一个Continuation实例,我们可以通过suspendCoroutine函数来获取到它。而协程内部挂起函数的调用处被称为挂起点,挂起点如果出现异步调用,那么当前协程就会被真正挂起,直到对应的Continuation#resume函数被调用才会恢复执行。

我们已经知道suspendCoroutine函数可以获得当前协程的Continuation实例,结合我们通过suspend{}创建的协程体,不难得出这个Continuation实例其实是一个SafeContinuation的实例。SafeContinuation的作用是确保协程只有在发生异步调用时才会挂起,例如下方代码虽然也有发生resume函数的调用,但协程并不会真正挂起:

1
2
3
suspend fun notSuspend() = suspendCoroutine<Int> { continuation ->
continuation.resume(100)
}

而异步调用是否发生,取决于resume函数与其对应的挂起函数调用是否在相同的调用栈上。这里列举两个函数调用栈发生了切换的场景:

  1. 发生了线程切换
  2. 操作被post到了事件循环队列中等待执行

CPS变换

CPS变换全名叫Continuation-Passing-Style Transformation,即连续传递样式变换。CPS是一种编程风格,用来将内部要执行的逻辑封装到一个闭包里面,然后再返回给调用者,这就将它的程序流程显式的暴露给程序员。而CPS变换就是将原本不是CPS风格的代码转变为CPS风格,通常由编译器来对代码进行优化,而Kotlin协程能够采用同步的方式书写异步代码的原理正是由于编译器使用了CPS变换

Kotlin协程在挂起时,最关键的是要保存挂起点。挂起点的信息被保存在Continuation对象中,Continuation携带了协程继续执行所需要的上下文,恢复执行的时候只需要执行它的恢复调用并且把需要的参数或者异常传入即可。

我们前面讲到,挂起函数如果需要挂起,则需通过suspendCoroutine来获取Continuation,问题是这个Continuation是怎么传入suspendCoroutine里来的呢?

我们通过反编译上一节的notSuspend函数发现,notSuspend函数实际上有一个入参为Continuation实例,结合挂起函数必须在协程体中执行这个特点,不难得出这个实例由外部协程传入。而编译器将我们的挂起函数编译成需要一个Continuation实例入参的这种行为,就是CPS变换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Nullable
public static final Object notSuspend(@NotNull Continuation $completion) {
boolean var1 = false;
boolean var2 = false;
boolean var3 = false;
SafeContinuation var4 = new SafeContinuation(IntrinsicsKt.intercepted($completion));
Continuation continuation = (Continuation)var4;
int var6 = false;
Integer var8 = Boxing.boxInt(100);
boolean var9 = false;
Companion var10 = Result.Companion;
boolean var11 = false;
continuation.resumeWith(Result.constructor-impl(var8));
Object var10000 = var4.getOrThrow();
if (var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
DebugProbesKt.probeCoroutineSuspended($completion);
}
return var10000;
}

我们仔细观察这段代码,发现由外部传入的$completion的resume方法实际上并没有得到调用,而是使用其创建了一个SafeContinuation对象,并最终调用了SafeContinuation#resumeWith。关于SafeContinuation我们上文说过,用来保证结果的正常返回。

上述反编译代码还有一个特别的地方在于,我们写的notSuspend函数本来是没有返回值的,但编译后返回了Object。而这个Object对象是通过SafeContinuation#getOrThrow函数来获得,我们看看源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
@PublishedApi
internal actual fun getOrThrow(): Any? {
var result = this.result // atomic read
if (result === UNDECIDED) {
if (RESULT.compareAndSet(this, UNDECIDED, COROUTINE_SUSPENDED)) return COROUTINE_SUSPENDED
result = this.result // reread volatile var
}
return when {
result === RESUMED -> COROUTINE_SUSPENDED // already called continuation, indicate COROUTINE_SUSPENDED upstream
result is Result.Failure -> throw result.exception
else -> result // either COROUTINE_SUSPENDED or data
}
}

SafeContinuation#getOrThrow方法会根据resumeWith被调用后的结果来返回,除了返回结果外,它还要可能会返回COROUTINE_SUSPENDED标志位或者一个Exception。

COROUTINE_SUSPENDED标志位用来表示返回这个标志的挂起函数已经发生了事实上的挂起。什么叫事实上的挂起呢?上文我们已经说到了主要resume函数与其对应挂起函数不在同一个函数调用栈,那么就说明它发生了事实上的挂起。这里要说明的一点是,如果在调用挂起函数时,协程虽然有异步调用但实际上已经得到结果,那么这里就没有发生事实上的挂起,例如:

1
2
3
4
5
6
7
8
9
10
11
launch {
val deferred = async {
// 发起了一个网络请求
......
}
// 做了一些操作
......
deferred.await() // 假设在这里deferred已经执行完毕
// 后续的一些操作
......
}

假设我们在调用deferred.await()时,deferred对应协程已经执行完毕,那么这里就没有发生事实上的挂起。

我们来总结一下挂起函数的返回情况:

  • 同步返回。作为参数的Continuation的resumeWith不会被调用,函数的返回值就是它作为挂起函数的返回值。
  • 返回挂起标志。当挂起函数发生了事实上的挂起时,返回挂起标志表示挂起点进入挂起状态,等待异步调用结束时调用resumeWith再执行恢复操作。
  • 返回Exception。当函数执行报错,返回Exception。

协程上下文

上下文的概念很容易理解,如Android中的Context、Spring中的ApplicationContext,它们在各自的场景下主要承载了资源获取、配置管理等工作,是执行环境相关的通用数据资源的统一提供者。

前面说到,Continuation除了可以通过恢复调用来控制执行流程的异步返回外,还有一个重要的属性就是协程上下文

协程上下文的集合特征

协程上下文的数据结构特征特别明显,类似一个集合。跟创建集合一样,我们也可以创建一个空的协程上下文:

1
2
var list: List<Int> = emptyList()
var coroutineContext: CoroutineContext = EmptyCoroutineContext

EmptyCoroutineContext是标准库自带的object,里面没有数据。

类似的,我们可以往协程上下文中添加数据:

1
2
list += 0
coroutineContext += Dispatchers.IO // Dispatchers.IO实现了Element接口

我们来看下协程上下文中的元素类型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public interface Element : CoroutineContext {
/**
* A key of this coroutine context element.
*/
public val key: Key<*>

public override operator fun <E : Element> get(key: Key<E>): E? =
@Suppress("UNCHECKED_CAST")
if (this.key == key) this as E else null

public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
operation(initial, this)

public override fun minusKey(key: Key<*>): CoroutineContext =
if (this.key == key) EmptyCoroutineContext else this
}

Element定义在CoroutineContext内部,重点有两个:

  1. Element本身也实现了CoroutineContext。
  2. Element接口中有一个属性key,表示元素在上下文中的索引。

协程上下文元素的实现

Element有一个子接口AbstractCoroutineContextElement,能让我们在实现协程上下文的元素时更加方便:

1
public abstract class AbstractCoroutineContextElement(public override val key: Key<*>) : Element

这样我们只需要提供一个Key就可以创建自己的Element了,例如下面CoroutineName可以为协程绑定一个名字,CoroutineExceptionHandler可以为协程绑定一个全局错误处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
class CoroutineName(val name: String) : AbstractCoroutineContextElement(Key) {
companion object Key : CoroutineContext.Key<CoroutineName>
}

class CoroutineExceptionHandler(val onErrorAction: (Throwable) -> Unit) :
AbstractCoroutineContextElement(Key) {
companion object Key : CoroutineContext.Key<CoroutineExceptionHandler>

fun onError(error: Throwable) {
error.printStackTrace()
onErrorAction(error)
}
}

协程上下文的使用

把上面定义好的元素添加到协程上下文中,并将其绑定到协程上:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
coroutineContext += CoroutineName("Download Coroutine")
coroutineContext += CoroutineExceptionHandler{
// ... ...
}

suspend { ... }.startCoroutine(object : Continuation<Unit> {
override val context: CoroutineContext = coroutineContext

override fun resumeWith(result: Result<Unit>) {
println(context[CoroutineName]?.name)
result.onFailure {
context[CoroutineExceptionHandler]?.onError(it)
}
}
})

可以看到我们使用对应的Key就能获取到Element,从而使用其属性/方法。

协程的拦截器

协程标准库中提供了一个叫作拦截器的组件,它允许我们拦截协程异步回调时的恢复调用。

拦截的位置

我们来看下方代码,这个过程发生了几次恢复调用呢?

1
2
3
4
5
6
suspend {
suspendFunc02("Hello", "World")
suspendFunc02("Hello", "Coroutine")
}.startCoroutine(object : Continuation<Int>) {
// ...
}

我们在启动协程时,通过恢复调用来执行协程,这是一次,而这里suspendFunc02发生了事实上的挂起,那么这里就发生了两次恢复调用。也就是说,如果协程体内发生了n次事实上的挂起操作,那么恢复调用总共会执行n+1次。这n+1次恢复执行的位置都可以通过添加拦截器来实现一些AOP操作。

拦截器的使用

我们可以通过实现ContinuationInterceptor接口来快速实现一个协程拦截器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class LogInterceptor : ContinuationInterceptor {

override val key = ContinuationInterceptor

override fun <T> interceptContinuation(continuation: Continuation<T>) =
LogContinuation(continuation)
}

class LogContinuation<T>(private val continuation: Continuation<T>) :
Continuation<T> by continuation {
override fun resumeWith(result: Result<T>) {
println("before resumeWith : $result")
continuation.resumeWith(result)
println("after resumeWith.")
}
}

拦截的关键函数是interceptContinuation,根据需要返回一个新的Continuation实例来实现拦截。拦截器本身也是一个协程上下文的元素类型,因此可以直接添加到协程上下文中。拦截后的协程执行情况如下:

1
2
3
4
5
6
before resumeWith : Success(kotlin.Unit)
after resumeWith.
before resumeWith : Success(5)
after resumeWith.
before resumeWith : Success(5)
after resumeWith.

可以看到两次挂起函数的恢复调用处都执行了一次拦截。

拦截器的执行细节

在上一篇文章我们提到,SafeContinuation其内部有个delegate,我们称其为协程体。实际上这是在没有拦截器进行拦截的情况下,当添加了拦截器后,delegate就是拦截器拦截后返回的Continuation实例了,例如我们上面例子中的LogContinuation。

我们在上面分析suspendFunc02的字节码反编译代码时,提到协程体传入挂起函数的Continuation实例$completion实际上会包装成SafeContinuation,代码如下:

1
SafeContinuation(IntrinsicsKt.intercepted($completion));

$completion并没有简单的传入SafeContinuation的构造方法中,而是使用了IntrinsicsKt.intercepted($completion)的返回值,来看看调用链:

1
2
3
4
5
6
7
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
(this as? ContinuationImpl)?.intercepted() ?: this

public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }

可以看到IntrinsicsKt.intercepted($completion)的返回值其实就是调用拦截器的interceptContinuation函数的返回,协程体在挂起点处先被拦截器拦截,再被SafeContinuation保护了起来。

除了打印日志外,拦截器最常见的作用就是线程调度,这个我们在后续文章中讨论。

参考