更易使用的并发编程工具——协程

协程最早诞生于1958年,被应用于汇编语言中,对它的完整定义发表于1963 年,协程是一种通过代码执行的恢复与暂停来实现协作式的多任务的程序组件。而线程的出现则要晚一些,伴随着操作系统的出现,线程大概在1967年被提出。

时代在变化,协程的内涵也在发生变化,目前有关协程的定义和分类依旧存在不少争议。

笔者认为,协程是一种在程序层面支持异步操作的抽象。与线程不同,协程不需要依赖于操作系统的线程调度,而是通过程序间的协作来实现异步操作。

在现代化编程中,为了突出协程的优点,我们可以将协程看作一种轻量级的并发执行单元,协程交由应用自身管理,其调度不依靠底层系统,更加轻量和易于使用。协程不会占用和浪费过高的系统资源,这样可以提高CPU和内存的利用效率,同时在某些高并发情况下带来更好的性能表现。

对比:协程与线程

线程(thread)是一种用来支持异步操作的抽象,一个应用可以拥有多个线程,通过实现抢占式的多任务来进行异步操作。目前操作系统底层一般使用线程模型,将线程作为由操作系统调度最小执行组件。直接使用线程模型,需要向操作系统申请线程,并交由操作系统直接调度。

协程(Coroutine)与线程的相似之处在于,协程也是一种用来支持异步操作的抽象,但是协程的切换由应用程序自身控制,可以不依赖于多线程实现异步操作。

抢占式调度,抢占式执行的任务的执行是连续的,并且在任务执行期间不会主动让出执行权。抢占式调度下,最高优先级的任务一旦就绪,总能获得CPU的执行权。这种调度方式适用于实时系统和需要快速响应的环境,比如计算密集型任务。

协作式调度,协作式执行的任务支持主动让出执行权给其他任务。任务自己规划运行时间,适用于资源紧张的环境。比如 I/O 密集型任务,可以在等待 I/O 操作时切换到其他协程,从而充分利用CPU。

对比:协程与子程序

子程序(Subroutine)是一段封装了特定功能的代码,可以通过函数调用来执行。子程序的执行是顺序的,从调用点开始执行,然后按照定义的逻辑顺序执行,直到遇到返回语句或结束标记。

协程(Coroutine)是一种特殊的程序执行方式,可以与其他协程或子程序交替执行

总结

协程是一种在程序层面支持异步操作的抽象,但不一定只被用来实现异步。使用协程就可以加简单理解为,使程序的执行权多个协程中进行流转,这需要两个关键的操作:协程的挂起和恢复

两种异步抽象

系统级的线程

在现代编程环境中,线程往往由操作系统管理,所以线程一般使系统级别的资源。大多数现代操作系统采用优先级算法和抢占式调度来管理线程。这意味着系统会根据线程的优先级来分配CPU时间,优先级高的线程会抢占CPU资源,以确保实时任务能够快速响应。

线程由系统管理,代表着线程的创建和管理需要额外的开销。操作系统必须进行一系列工作保证线程的安全和稳定运行,例如权限验证,为线程分配内存,并在适当的时候为其分配CPU时间片。

当线程因为I/O操作或其他原因被阻塞时,操作系统会执行上下文切换,将该线程的状态保存到内存中,并挂起该线程。当线程再次就绪时,系统会恢复其状态,使线程可以继续执行。线程间的上下文切换非常占用内存和浪费时间。

应用程序要尽量减少线程的数量,用更少的资源完成更多的任务。

用户级的协程

协程是应用级别的抽象,并不会被操作系统感知。

与线程相比,协程具体的调度方式有很多种,可以通过程序员的控制或应用程序的调度来进行切换。这种切换不需要操作系统的介入,因此开销比线程切换要小得多。

协程间轻量的切换不涉及操作系统级别的上下文切换,从而避免了系统线程切换的开销,因此更加轻量和高效。

协程不是银弹

协程 != 异步 && 协程 != 无阻塞

协程本身只是一种异步抽象,它可以用于实现异步操作,但并不仅限于异步操作。

如果一个协程在没有进行特殊设计的情况下进行了阻塞I/O操作,它仍然会阻塞线程。

没有协程同样可以使用多线程模型完成优秀的并发设计,但通常更加复杂和难以理解。协程主要的优点是减少了多线程编程可能带来的大量上下文切换,同时带来另一种异步任务派发机制。

典型协程实现

每种语言对协程的支持区别较大,但我们可以通过以下两种方式大致对其进行分类。

  1. 有栈与无栈(Stackful vs Stackless Coroutine),协程需要支持主动挂起和恢复来控制和切换任务。而常见的切换的方式有两种,及通过开辟相应的调用栈来保存,或使用闭包或其他数据结构进行保存。按照是否开辟相应的调用栈,我们可以将协程分为有栈协程与无栈协程。
  2. 对称协程与非对称协程(Symmetric vs Asymmetric Coroutine),根据协程之间的关系,我们又可以将协程分为对称协程与非对称协程。对称协程间是相互独立且平等的,其调度权可以在任意协程之间转移。非对称协程出让调度权的目标只能是它的调用者,即协程之间存在调用和被调用关系。实际上两者并不是完全对立的关系,对称协程的底层一般是由非对称协程实现的,而使用对称协程也可以实现非对称协程。

有栈实现

有栈协程的实现,在内存中给每个协程开辟一个栈内存,当协程挂起时会将它的运行时上下文(即栈空间)从系统栈中保存至其所分配的栈内存中,当协程恢复时会将其运行时上下文从栈内存中恢复至系统栈中(这里也存在一定的开销)。

当涉及到有栈协程的协作时,通常会使用特定的挂起函数来实现。这些调用函数后系统会保存当前协程的上下文(包括栈和寄存器状态),并切换到另一个协程的上下文。

具体的挂起函数的名称和实现方式会因编程语言、协程库或框架而异。常见的有suspend()yield()await()park()等,和直接直接操作线程挂起非常类似。

因为涉及到系统栈的操作,通常需要编译器或运行时环境的支持。

有栈协程的具体实现会应语言和库的不同产生差异,接下来我们以目前较为典型的有栈协程(Lua,Java,Golang)为例:

Lua Coroutine

Lua 是一种轻量小巧的脚本语言,支持面向过程编程和函数式编程。由于单个 Lua 虚拟机只能工作在一个线程下,所以为了支持异步编程,Lua 必须另辟蹊径,Lua 的协同程序便是为了在脚本语言中进行异步编程而诞生的。

在 Lua 语言的视角下,协同程序可以理解为一种特殊的”线程“,它可以暂停和恢复其执行,从而允许非抢占式的多任务处理。每个 Lua 协同程序都拥有独立的堆栈,独立的局部变量,独立的指令指针,同时又与其它协同程序共享全局变量和其它大部分东西。

使用 Lua 协同程序和直接操作线程很类似,但允许两段程序互相交换执行权,我们可以使用以下 API:

  1. 使用 coroutine.create 创建一个新的协同程序对象,并使用 coroutine.resume 启动它的执行。
  2. 通过调用 coroutine.yield 来主动暂停自己的执行,并将控制权交还给调用者。

具体可以通过以下一段代码来理解其使用:

1
2
3
4
5
6
7
8
9
local thread = coroutine.create(function(param)
print("foo"..param)
local value = coroutine.yield("yield")
print(tostring(value))
end)

local status, result = coroutine.resume(thread," ..with param foo")
print(result)
local status, result = coroutine.resume(thread,"bar")

当使用 coroutine.create 函数创建一个新的协程时,实际上是创建了一个独立的执行环境,包括了一份独立的栈空间。每个协程都有自己的栈,用于存储函数调用的参数、局部变量和中间结果。

当调用 coroutine.resume 函数来恢复一个协程时,实际上是将当前执行环境切换到了目标协程的执行环境。这个切换过程涉及到栈的交换。具体来说,将当前协程的栈保存起来,然后将目标协程的栈恢复到当前执行环境中,使得目标协程可以继续执行。

在协程执行的过程中,可以通过调用 coroutine.yield 函数来主动让出执行权,将当前协程挂起。这个过程也涉及到栈的交换。当协程调用 coroutine.yield 时,会将当前协程的栈保存起来,并将执行权返回给调用者。接着,可以通过调用 coroutine.resume 函数来恢复协程的执行,再次进行栈的交换,将之前保存的栈恢复到当前执行环境中。

通过 Lua 的 Coroutine ,其实我们现在已经能充分理解有栈协程的底层原理,实际上就是栈的保存与切换。

Java Virtual Thread

在理解 Java 协程之前,我们先来看一下 Java 历史上的异步的发展:

引入函数式编程后,Java 团队使用了很多的纯函数式概念对 Java 进行改造,首先便是使用函数式语言中类似的策略去对待一个非确定性的计算:如果一个非确定性的计算可以得到一个非确定性的结果,我们使用同一种类型对这种结果的类型进行控制,便可以得到一个单子。

Java 团队在 Java 8 中引入的 CompletableFuture 模仿了单子(Monad),用来处理执行的不确定性,我们可以从一个 Future 返回另一个 Future ,所有的 CompletableFuture 可以自由编排并串联组合起来,同时保持了异步操作的上下文。但有一些很明显的缺陷:

  1. 首先,这些 CompletableFuture 依旧使用线程模型并会阻塞线程。
  2. 其次,CompletableFuture 中两个操作在命令式编程来看并不算完全连续,他们拥有不同的操作栈,如果第二个 CompletableFuture 中发生异常,上一个 CompletableFuture 并不能很直观的感知到,这时依旧需要我们手动处理异步操作异常抛出后的遗留现场。
  3. 最后,CompletableFuture 是一个子程序,并不能和协程一样支持随时挂起和恢复。

Java 8 后,Java 团队使用了更多的函数式概念对 Java 进行改造,为了解决 CompletableFuture 的缺陷,Java 团队选择使用一个新的模型来代表操作,Continuation 便是 Java 团队在编程哲学上跨出的又一步。

Continuation 最初常见于于函数式语言中,是一种表示程序剩余部分的数据结构,也被称为计算续体,或者被称为一个延续,这种方法允许程序在某个点暂停执行,并在稍后的某个时刻从同一点继续执行,通常用于实现高级控制结构和异常处理。(稍后我们在讲某些无栈协程实现的时候会讲到)

在命令式编程中模拟函数式编程的 continuation 确实可以通过使用栈来保存和恢复状态来实现。

在 Java 中,Continuation 类可以看作一个可以支持实现的协程的子结构,每个延续都可以在适当时机挂起和恢复。

Continuation 类通过提供一种方式来保存和恢复执行状态(通过类似于之前讲的类似 Lua 有栈协程的机制),使得可以实现一次性多分段截断延续(One-Shot multi-prompt delimited continuations),即:

  • One-Shot:表示一个 continuation 只能被调用一次。一旦 continuation 被调用,它就会被消耗掉,不能再次使用。
  • Multi-prompt:表示支持多个 continuation point(即 prompt)。通常情况下,continuation 是从某个程序点(prompt)开始计算的。
  • Delimited continuations:是一种 continuation 的抽象,它只包含了从当前程序点开始的计算上下文,而不是整个程序的计算状态。这种限定的 continuation 在某个程序点之前截断了控制流,只允许在该点之后的计算进行。

实际上,当我们每次开始执行一段逻辑时,都会创建一个新的延续,并在被挂起时将当前执行栈保存下来,当恢复时,会从之前保存的执行栈位置继续执行。这种机制使得 Java 能够实现轻量级的线程切换,而不需要操作系统级别的线程切换开销。当执行完毕后当前,最终虚拟线程任务执行完成,标记 Continuation 终结,标记虚拟线程为终结状态,清空一些上下文变量,运载线程返还到调度器(线程池)中作为平台线程等待处理下一个任务。

Continuation 类是底层的结构,被封装在虚拟线程内部,不允许外部直接访问,并通过 Scheduler (一般是一个使用 Fork Join Pool 的 Executor) 作为调度器挂载到平台线程完成执行,这意味着虚拟线程使用了抢占式调度。虚拟线程是 Continuation 的封装,可以被归类到有栈和对称协程。

当一个虚拟线程需要挂起时,它会将当前的执行上下文保存,并将 Continuation 交给调度器(这里的 Continuation 可能是一个新的,也可能被复用,取决于具体的实现和性能优化需求)。当恢复执行时,调度器会将之前保存的 Continuation 恢复,并将执行权交还给相应的虚拟线程。

虚拟线程为我们带来了以下几种可能的未来:

  1. 从一个方面来讲,所有阻塞操作全部堆栈连续,并且前后异步进行,阻塞不会占用底层线程时间,不会产生上下文切换。
  2. 从另一个方面讲,阻塞的代价被减小了一个数量级,这意味着阻塞几乎不可能成为性能瓶颈。就算使用原有的线程阻塞模型的企业级服务器,也可以获得较大的性能提升。
  3. 最后,Continuation 类允许我们模拟函数式编程构造出各种真正的流式处理,诞生真正的异步流,提供给数据绑定等功能。

Golang Goroutine

Go 语言中,所有的程序都直接运行在协程上,使用go关键字可以简单地开启一个 Goroutine,由 Go 运行时进行管理。

Goroutine 是一种轻量级的线程,Go 运行时采用了 M:N 调度模型,将M个用户级线程映射到 N 个内核级线程,使得Go语言能够高效地进行并发处理。由于脱离了系统管理,Goroutine的创建和销毁成本非常低。

每个 Goroutine 都有自己的栈空间,用于保存调用栈。当 Goroutine 被执行时,会将其栈恢复到系统栈上执行;而在挂起时,会将当前的调用栈保存到 Goroutine 上,并切换到下一个 Goroutine 执行。

协程间可以通过通道传递执行结果,也可以使用无缓冲的 Channel 实现类似的挂起操作。

调度方面,Go 采用了 GMP 模型,其中 M(Machine)代表 Go 语言运行时系统中的线程,负责执行 Goroutine ,P(Processor)是用于执行Goroutine 的资源池,负责管理和调度 Goroutine 的执行。

在 GMP 模型中,P 的数量通常由 Go 运行时根据可用的 CPU 核心数来设置。每个 P 都会尝试维护一个本地的 Goroutine 队列,而 M 则从这个队列中取出 Goroutine 来执行。如果一个 M 没有关联的 P 或者它的本地队列为空,它会尝试从其他 P 的队列中偷取 Goroutine 来执行。(与 Java 封装好的 Fork Join Pool 非常类似,是更加轻量和底层的实现)

无栈实现

无栈协程通常使用特定的语法标记,由协程自身显式地让出执行权实现协作,不依赖于调用栈的概念。

无栈协程通过状态机,闭包等数据结构,保存协程的执行状态(包括寄存器值等)来实现协程的挂起和恢复,其挂起和恢复操作完全在代码层面完成。这样可以减少内存消耗和协程创建销毁的开销,并减少了内存占用和栈溢出的风险。由于没有独立的栈空间,协程之间的数据共享和状态管理可能更加复杂。

不同的无栈协程实现的差别也很大,接下来我们举几个例子无栈协程的例子(Python,JavaScript,C#,Kotlin):

Python

在 Python 中,一种简单的协程是生成器。当使用 yield 语句时,函数会返回一个生成器。

1
2
3
4
5
6
7
8
def generat():
for i in range(1,10):
yield i

gen = generat()

for i in range(1,10):
print(next(gen))

当生成器函数执行到 yield 语句时,会返回一个值给调用者,并在将执行权交还给调用者后挂起。生成器的状态保存在生成器对象中,包括当前的执行位置、局部变量等。当调用 next() 时,生成器会将之前保存的状态重新加载到当前执行环境中,并从暂停的位置继续执行,直到再次挂起或执行结束。

生成器是一种能够产生多个值的迭代器对象。生成器是一种特殊的迭代器对象,它可以产生多个值,并且在生成值的过程中可以挂起和恢复,这使得它非常适合实现协程和状态机的逻辑。

Python 3.5 后,Python基于 Event Loop 实现了自己的 async/await 语法:

1
2
3
4
5
6
7
8
9
10
11
12
13
import asyncio

async def async_operation():
print("async_operation start")
await asyncio.sleep(1)
print("await & async_operation end")
return 1

async def main():
result = await async_operation()
print("await res is", result)

asyncio.run(main()) #run with event loop

首先我们通过 asyncio 引入事件循环,我们可以使用 async def 返回一个协程对象,然后使用 await 等待,await会将协程对象的调度和自身的控制权交给事件循环,当任务完成,事件循环会恢复 await 后续逻辑的执行。

Python 中 await 是用于等待异步可等待对象的完成,这里的可等待对象指 Future 对象、Task 对象、协程对象或其他实现了异步协议的对象。

这里定义的协程对象本身包含了协程的定义和执行逻辑,并且保存了挂起点的相关信息以恢复执行。其存在不依赖于状态机的概念,它只是一个用于表示异步任务的对象,包含了协程的定义和执行逻辑。当事件循环调度执行一个协程对象时,它会直接执行协程对象所对应的异步函数,并根据并根据 await 的出现来决定是否挂起执行。

JavaScript

与Python非常类似,但是出现相较Python更早,首先来看生成器语法。

1
2
3
4
5
6
7
8
9
const iterator = (function* () {
for (let index = 0; index < 10; index++) {
yield index;
}
})();

for (let index = 0; index <= 10; index++) {
console.log(iterator.next());
}

首先使用 function* () { … } 定义生成器函数,该函数返回一个生成器对象,函数内部通过 yield 返回值并挂起,然后通过生成器的next()方法恢复。

除了 yield 以外,在 ES6 后,JavaScript 定义了 async/await 语法。但在了解 JavaScript 中的 async/await 前,我们需要先了解一个异步编程工具 Promise。

Promise 是一个常见的异步编程工具,用于表示一个异步操作的最终完成 (或失败),及其结果值。与 Java 的 CompletableFuture 非常类似,支持代表一个操作并返回一个结果,同时支持将操作串联起来,所以我们可以很方便的通过回调派发任务,JavaScript 运行时可以借助事件循环最为调度器实现异步。

最简单的写法便是串联回调,在 async/await 语法推出前,实际上我们都是按照以下标准书写代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
function asyncOperation() {
console.log("asyncOperation start");
return new Promise((resolve) => setTimeout(resolve, 1000));
}

asyncOperation()
.then(() => {
console.log("await & asyncOperation end");
return 1;
})
.then((result) => {
console.log("await res is", result);
});

在 ES6 后,定义了 async/await 语法,我们可以使用 async function(){ … } 定义异步函数,该函数返回一个被包装好的 Promise 对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
async function asyncOperation() {
console.log("asyncOperation start")
await new Promise(resolve => setTimeout(resolve, 1000))
console.log("await & asyncOperation end")
return 1
}

async function main() {
const result = await asyncOperation()
console.log("await res is", result)
}

main();

以上代码可以在运行前被编译为(如果有多个 await ,会在 Promise 后进行多个 then 语句的串联):

1
2
3
4
5
6
7
async function asyncOperation() {
console.log("asyncOperation start");
return new Promise(resolve => setTimeout(resolve, 1000)).then(() => {
console.log("await & asyncOperation end");
return 1;
});
}
1
2
3
4
5
async function main() {
return asyncOperation().then(result => {
console.log("await res is", result);
});
}

从语义层面来看,运行时,遇到 await 语句,函数会暂停执行,并等待 Promise 对象的解决。一旦 Promise 对象解决,函数会恢复执行,并返回 Promise 对象的解决值,这样便实现了代码层面类似于暂停和恢复的操作。

这样通过编译,将剩余代码注册为 await 的回调,避免大量手写回调造成回调地狱,并在原本的代码层面,实现了暂停和恢复操作。回到我们最初对协程的定义来看,代码确实在这里实现了挂起和恢复两个操作,所以 async/await 也可以被看作是一种协程。

直接依靠编译和状态机完成异步,使得暂停和挂起具有一定的传染性:由于 await 的存在会改变整个异步方法的执行流程,在 await 后的代码,都会在异步操作执行完毕之前被挂起,这些代码都会受到 await 的影响。

JavaScript 对 async/await 的支持非常轻量,使其在使用上更像是一种语法糖,但这种风格大大减少了回调次数,值得推广。

C#

C# 最早开始使用 async 和 await 模型,使用 async 修饰函数表示异步函数,返回一个对异步结果的封装。await 代表挂起,并在异步结果返回后恢复执行。

与其他使用 async/await 的概念类似,C# 中使用 async 标注的方法被称为异步方法。

编译时,编译器会将包含 await 的异步方法进行转换,将其转换为状态机,其中每个 await 语句都会成为状态机中的一个状态,具体转换过程如下:

  1. 在异步方法中遇到第一个 await 语句时,编译器会生成一个状态机类,并将当前方法的剩余部分作为该状态机类的一个方法(通常是 MoveNext() 方法)的一部分。
  2. 编译器将在 await 语句之前的代码生成为状态机类的构造函数。每个 await 语句后面的代码都会成为状态机类的一个状态,当执行到某个状态时,状态机会在异步操作完成后继续执行下一个状态。

通过状态机的方式,编译器能够以一种可靠和高效的方式管理异步方法的执行流程,使得多个异步操作能够按顺序执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
using System;
using System.Net.Http;
using System.Threading.Tasks;

class Program
{
static async Task Main(string[] args)
{
string result = await GetWebContentAsync("http://example.com");
Console.WriteLine(result);
}

static async Task<string> GetWebContentAsync(string url)
{
//...
}
}

接下来编译器会将其变为以下状态机的模式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
using System;
using System.Net.Http;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

class Program
{
static Task Main(string[] args)
{
return new Maind__async(0).MoveNext();
}

class Maind__async : IAsyncStateMachine
{
public int <>1__state;
public AsyncTaskMethodBuilder <>t__builder;
private TaskAwaiter<string> <>u__1; // awaiter, used to save the async task

public Maind__async(int state)
{
<>1__state = state;
<>t__builder = AsyncTaskMethodBuilder.Create();
}

public void MoveNext()
{
try
{
if (<>1__state == -1) // start
{
Task<string> task = GetWebContentAsync("http://example.com");
if (!task.IsCompleted)
{
<>1__state = 0; // set next state
<>u__1 = task.GetAwaiter();
<>t__builder.AwaitUnsafeOnCompleted(ref <>u__1, ref this);
return;
}
}
else if (<>1__state == 0) // resume
{
<>1__state = -2; // set state end
}

string result = <>u__1.GetResult(); // get async result
Console.WriteLine(result);
}
catch (Exception e)
{
<>1__state = -2;
<>t__builder.SetException(e);
return;
}
<>1__state = -2;
<>t__builder.SetResult();
}

public void SetStateMachine(IAsyncStateMachine stateMachine) { }
}

static async Task<string> GetWebContentAsync(string url)
{
//...
}
}

上述代码是反编译的结果(很难看懂),但其中代码的大致执行过程如下:

  1. 调用协程方法:当主线程调用一个协程方法后,就会进入编译生成的状态机,将任务交给协程调度器,并保存局部变量等状态信息。然后,主线程会直接跳出协程方法,继续执行剩余的代码。
  2. 执行任务:调度器负责调度并执行状态机某状态下的代码片段,
  3. 执行完毕跳转状态:当前一个状态内的代码段执行完毕后,状态机跳转到下一个状态,将下一次任务交给协程调度器。
  4. 下一次执行开始前进行检查:检查上一次执行的结果,如果没有异常,就通过之前保存的局部变量等状态信息恢复现场,并继续执行下一个状态或结束状态机(下一状态跳转到 2 或 5)
  5. 结束挂起状态并返回结果:结束状态机后,检查是否发生异常并返回结果。

最终,所有的操作最终被交由调度器管理,实际上 async/await 代码可能运行在两个不同的线程上,相当于派发了一个异步操作。为了解决两个线程内堆栈不连续的问题,C# 人为地生成一个堆栈跟踪,用以捕获实际上下文,这样另外一个线程内发生的异常依旧直接使用 try/catch 捕获到。

Kotlin

Kotlin 使用 suspend 修饰函数,并通过编译和框架来共同完成协程的实现,我们可以将 Kotlin 协程理解为一个完整的协程框架,Kotlin 为了兼容不同平台,为各种平台提供了丰富的协程底层实现。

在 Kotlin 中,协程通过 suspend 函数来定义一个挂起函数,它标志着一个挂起点,表示在该点可以挂起执行(实际是否挂起是由Kotlin决定的)。当确定挂起时,Kotlin 的编译器会对协程进行 CPS 变换,将代码转换为基于在方法中传入延续的形式。

在 CPS 变换中,函数的执行结果传递给延续函数,协程在执行到某个挂起点时,会将控制权交给其他协程或主程序,并注册一个延续函数,当协程被恢复时,延续函数会被调用,协程继续执行。

Kotlin 实际上是一门函数式风格比较强烈的语言,使用了非常多函数式的概念来实现其底层类库。

如果你对纯函数式语言非常熟悉,就应该知道,CPS 变换实际上是支持 first-class continuation 从而实现 call/cc(call with current continuation)的一种方式,而 call/cc 允许程序保存当前的计算状态(continuation),这样就可以在未来的某个时刻恢复和继续执行。通过这种方式,就可以模拟挂起和恢复,从而实现协程。除此之外,continuation 还可以用来实现高级控制结构,例如异常处理和回溯。

Java 中也有类似的无栈协程框架,比如 Quasar 框架,实际上就是利用 CPS 技术和字节码增强技术,先对代码进行 CPS 转换得到中间代码,然后对经过 CPS 转换的代码进行修改,以插入必要的字节码指令和数据结构,从而实现协程的状态管理、调度和异常处理等功能,所有协程会交由某个调度器进行管理。

举一个例子让我们更好的理解什么是CPS变换:

1
2
3
4
5
6
7
8
suspend fun add(a: Int, b: Int): Int {
return a + b
}

fun main(){
val result = add(2,3)
println("The result is: $result")
}

我们将 Continuation 作为函数类型抽象,然后传递剩余的计算作为其延续,并将延续作为参数的一部分进行传递:

1
2
3
4
5
6
7
8
9
10
11
12
typealias Continuation<T> = (T) -> Unit

fun add(a: Int, b: Int, cont: Continuation<Int>) {
val result = a + b
cont(result)
}

fun main(){
add(2,3) { result ->
println("The result is: $result")
}
}

以上代码实际上已经是CPS变换的结果了,我们将 Continuation 作为结尾的参数传入函数,并在最后调用。

但实际上为了实现一个协程框架,只是用 CPS 变换是远远不够的,我们需要的最终结果是延续可以交由调度器进行调度,为了达到这样的效果,Kotlin 首先自己实现了 CPS 变换结果中的 Continuation:

1
2
3
4
5
@SinceKotlin("1.3")
public interface Continuation<in T> {
public val context: CoroutineContext
public fun resumeWith(result: Result<T>)
}

调用与之关联的 Continuation 对象的 resumeWith() ,就可以使挂起函数恢复执行,并将结果传递给调用方。实际上我们创建的挂起函数最后会被编译为:

1
2
3
4
fun add(a: Int, b: Int, cont: Continuation<Int>) {
val result = a + b
cont.resumeWith(result)
}

这里的剩余的计算部分会被编译为一个 lambda,并作为 Continuation 的实现传递进来,实际上的效果和上述相同。

同 C# 一样,如果有多个挂起点,就会有多个延续,Kotlin 也会在此之上将其编译并状态机用以更好的控制程序流程。

最后,也是整个链路中最核心的一部分,Kotlin 要求我们必须在协程构建器内调用 suspend 函数,因为 Kotlin 会在协程构建器内拦截所有的 Continuation 并将其交给协程调度器调度,Continuation 作为编译结果可以与底层框架结合起来,共同完成剩余的调度过程。

调度器的具体实现我们可以自己选择,根据平台的不同我们可以拥有不同的实现。(例如,在安卓平台我们可以利用安卓提供的调度模式以获取最好的性能收益,或者可以将调度交由 Event Loop 实现,此时协程摇身一变,成为了响应式)

1
2
3
4
5
6
7
8
9
10
11
12
13
import kotlinx.coroutines.*

fun main() {
runBlocking {
CoroutineScope(Dispatchers.Default).launch {
test()
}.join()
}
}

suspend fun test(){
delay(1000)
}

总结一下,我们可以将 Kotlin 协程理解为一个完整的协程框架,由以下部分实现:

  1. suspend,编译器进行 CPS 处理。
  2. 协程构建器,例如 launch、async 等,用于创建协程。这些构建器函数会接收一个挂起函数作为参数,并创建一个协程来执行该挂起函数。
  3. 协程调度器,用于调度和管理协程的执行。通过调度器,可以将协程的执行从一个线程切换到另一个线程,从而实现线程的灵活管理。

无栈协程的实现更加轻量,但在底层需要配合特殊的实现,例如特定的异步实现进行承接才能得到性能的提升,如果直接在无栈协程上使用含有大量阻塞 I/O 的实现,大部分基于编译时期实际上更加类似于语法糖的协程实现,依旧会阻塞线程,使得协程无法发挥减低线程开销的作用。

总结

  1. 协程是一种用来实现异步的抽象,但其实现不一定只用来实现异步。
  2. 需要进行协程之间的跳转来实现异步,而跳转的实现主要依靠两个操作:挂起和恢复。
  3. 我们可以将协程按照实现挂起和恢复底层的原理,分为有栈和无栈协程。
  4. 协程的对称与非对称不是绝对的,可以利用语言特性互相转化和实现。
  5. 由于不同语言需要协程的目的不同,且平台生态和能力有较大差别,所以对协程的实现差别较大。

附录

结构化并发

和结构化编程类似,利用结构化并发来组织并发任务的执行,使得并发编程和异步任务处理更加可控、可组织和可靠。从而避免非结构化的,例如:滥用回调函数,直接使用线程抽象等,带来的类似 goto 的问题。

常见的async/await,suspend,go/channel,consumer/producer,yield,Future/Promise,Task,CoroutineScope 等抽象可以在任务之间建立明确的调用者关系,从而更好地控制执行流程,支持我们建立父子任务间的关系,更方便地实现并行执行,支持和传递异常或错误,为结构化并发提供了基础。

单子

在函数式编程中,”单子”(Monad)是一种代表计算过程的抽象概念。它被广泛应用于处理非确定性计算、副作用、异步操作等场景。下面解释为什么单子被用来处理非确定性的计算:

  1. 抽象非确定性: 在非确定性的计算中,结果可能不止一个,并且计算的顺序和结果可能取决于外部因素(例如并行计算或随机性)。单子提供了一种抽象,能够描述这种非确定性计算的结构和行为,使得代码更加清晰和可维护。
  2. 提供了组合操作: 单子通过定义一组操作(例如mapflatMap等),使得对计算过程进行组合和转换变得更加容易。通过这些组合操作,可以将复杂的非确定性计算拆分为简单的组件,并且可以灵活地组合这些组件来构建更复杂的计算过程。
  3. 处理副作用: 在函数式编程中,副作用(side effect)是指与函数执行结果无关的行为,例如修改全局变量、打印输出等。单子可以用来封装具有副作用的计算,从而使得程序更加纯粹(pure),避免了副作用对程序的影响。
  4. 错误处理: 单子还可以用来处理错误。通过定义适当的单子类型和操作,可以使得错误处理变得更加统一和方便。例如,Either单子可以表示计算结果要么是一个值,要么是一个错误。