根据维基百科的定义,协程,是指在非抢占式地处理多任务场景下,用于生成子程序的计算机程序组件,它允许在执行过程中被暂停或恢复。
从逻辑上来说,协程和线程的主要区别,在于协程是协作式处理多任务,而线程是抢占式处理多任务。协程之间的切换一般不涉及系统调用,在用户态就可以完成。
在Golang中,使用go
关键字,可以将函数立即创建为一个goroutine
。例如main.go文件内容如下
package main
func add(a, b int64) (int64, int64) {
var tmp int64 = 1
tmp = tmp + a
return a + b, a - b
}
func main() {
var c int64 = 10
var d int64 = 12
go add(c, d)
}
使用go tool
命令,可以其编译为汇编代码(部分前面的汇编与主题无关,略去),进一步查看go
关键字底层的实现机制。
go tool compile -N -l -S main.go
SP
是栈指针寄存器,一般指向局部调用栈的栈顶,也可以用来在函数调用时传参。SB
是静态区寄存器,用来获取函数指针。函数调用或创建。通过SUBQ
指令修改SP
的值,分配新的栈空间,通过ADDQ
指令修改SP
的值,回收或释放栈空间。
0x001d 00029 (main.go:10) MOVQ $10, "".c+56(SP)
0x0026 00038 (main.go:11) MOVQ $12, "".d+48(SP)
0x002f 00047 (main.go:12) MOVL $32, (SP);
0x0036 00054 (main.go:12) LEAQ "".add·f(SB), AX;将函数指针保存到AX寄存器
0x003d 00061 (main.go:12) MOVQ AX, 8(SP);将AX寄存器保存到新分配的8字节
0x0042 00066 (main.go:12) MOVQ "".c+56(SP), AX;将参数c保存到AX寄存器
0x0047 00071 (main.go:12) MOVQ AX, 16(SP);将AX寄存器保存到新分配的8字节
0x004c 00076 (main.go:12) MOVQ $12, 24(SP);将参数d保存到新分配的8字节
0x0055 00085 (main.go:12) PCDATA $1, $0;和GC相关,可忽略
0x0055 00085 (main.go:12) CALL runtime.newproc(SB);!!此处创建goroutine
0x005a 00090 (main.go:13) MOVQ 64(SP), BP;一般是接收返回值
0x005f 00095 (main.go:13) ADDQ $72, SP;回收main本地栈空间
0x0063 00099 (main.go:13) RET
可以看到源码12行的go
关键字,实际调用了runtime
的newproc
函数。
而在newproc
函数内部,先在stack
上分配了一段连续的栈空间(通常是2KB,栈帧最小值),也可以叫做栈帧(stackframe),将通过SP
传入的参数值拷贝到这个空间中,且这个空间仅属于这个goroutine
(也就是GPM模型里的g)。然后把goroutine
加入到执行队列中,供调度器去调度执行。
以上整个过程作为一个函数,运行在system stack
上,可简化视作直接调用函数。
关于函数——func systemstack(fn func()) systemstack is being called from the limited stack of an ordinary goroutine. In this case, systemstack switches to the per-OS-thread stack, calls fn, and switches back.
systemstack(func() {
newg := newproc1(fn, argp, siz, gp, pc)
_p_ := getg().m.p.ptr()
runqput(_p_, newg, true)
if mainStarted {
wakep()
}
})
如果goroutine
执行过程中,预先分配的栈空间不足,那么会分配更大的一块栈空间,并将旧的栈内容完全拷贝到新的空间中去,栈里的内容不会被其他goroutine
共享。
// Allocate a new g, with a stack big enough for stacksize bytes.
func malg(stacksize int32) *g
我们可以在add
函数中,增加以下几行代码,可以打印goroutine
调用堆栈的信息。
var (
buf [256]byte
n = runtime.Stack(buf[:], false)
stk = string(buf[:n])
)
println(stk)
输出内容内,可以看到包含了传入参数的值10和12,以及add
函数指针的值,另外,也可以通过这种方法获取goroutine
的ID。
goroutine 5 [running]:
main.add(0xa, 0xc, 0x1068800, 0xc00001c0b8)
main.go:11 +0x69
有栈协程的好处,由于栈帧可以直接完全保存运行期上下文(主要是寄存器值),因此可以在任何时刻暂停协程的运行,这就很方便地支持了抢占式的调度器。而无栈协程的上下文是一般通过类似结构体的方式保存在内存中,它依赖使用者显式地切换协程,否则协程不会主动让出执行权。
另外,有栈协程更方便将同步代码改造为异步代码,就像我们的例子一样,只需改动一行,加上go
关键字就可以了。而无栈协程,同步改造为异步则更为复杂,甚至会导致牵一发动全身(async关键字扩散问题)。
既然已经有了有栈协程,那么无栈协程是否还有优势呢。答案肯定的!
通常,无栈协程在内存空间和协程上下文切换的效率更高。值得说明的是,无栈协程并不是说不需要运行时的栈空间,而是和协程的创建者共用同一块运行时的栈空间。
如果一定要用一句话概括无栈协程,那就是:无栈协程可以看做是有状态的函数(generator),每次执行时会根据当前的状态和输入参数,得到(generate)输出,但不一定为最终结果。
在Rust中,async fn
用来定义一种可以在执行中暂停的函数,通过await将控制权转移给runtime,等一段时间之后被重新唤醒执行。
调用async fn
所产生的返回值被包在Future
中。但与其他语言不同,直接调用async fn
,异步函数不会立即被调度器调度执行,只有调用方通过future.await
才能实际触发async fn
的执行,并拿到结果。
对于Rust普通业务开发来说,一般来说只需要使用底层库提供的异步API,结合async
和await
关键字,就可以实现程序整体的异步化。
如果要搞懂异步API的实现,不断优化程序性能,那么理解Rust异步的实现机制,就是必不可少的了。
运行测试代码前,先添加如下依赖
[dependencies]
futures = "0.3"
tokio = { version = "0.3", features = ["full"] }
以及导入依赖模块
use futures::Future;
use std::pin::Pin;
use std::sync::{
atomic::{AtomicBool, AtomicU64, Ordering::SeqCst},
Arc,
};
use tokio::runtime::Runtime;
use futures::task::AtomicWaker;
use std::task::Context;
use std::task::Poll;
use std::{thread, time::Duration};
我们首先定义一个结构体Test
,用来模拟计算的过程。
struct Test {
waker: Arc<AtomicWaker>, // 用来通知Executor执行poll
result: AtomicU64, // 用来暂存每次运行的中间结果
signal: Arc<AtomicBool>, // 用来模拟事件消息
}
提到Rust协程,首先需要介绍的是Future
Trait,它是理解协程整个执行过程的关键。对于Golang使用者来说,可以将Trait看做Golang里的interface,Trait包含可以被实现的方法。
Future
Trait仅定义了一个方法,那就是poll
,顾名思义,这个方法可以被调用很多次。Future
Trait可以看做是计算过程的抽象。调用poll
方法意味着:异步计算过程由于某种原因暂停,或满足执行条件继续执行,直至计算出最终的结果。
poll
方法的签名如下,Self
可以是用户自定义的结构体,用来保存每一次调用poll
方法计算的结果。
Context
目前仅仅是对Waker
的封装,Waker
是唤起再次执行任务的结构体,后面会详细介绍。
poll
方法返回的Poll
是一个枚举值,表示本次调用后,当前协程需要暂停等待(Pending),还是已经完成(Ready)。
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>
我们的Test
实现了Future
Trait,代码如下:
impl Future for Test {
type Output = u64;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let result = self.result.load(SeqCst);
self.waker.register(cx.waker());
println!("poll called {}", result);
self.result.store(result + 1, SeqCst);
if self.signal.load(SeqCst) {
println!("poll ready");
Poll::Ready(result)
} else {
println!("poll not ready");
Poll::Pending
}
}
}
既然每次执行poll
,返回值是个枚举值,那么把协程的执行,看做状态机的状态转换过程,是不是也是自然而然了呢?答案是肯定的。
现在,我们对于Future
有了明确的概念 ,那么一个协程是如何在暂停后被唤醒的?该Waker
发挥它的作用了。
和Future
不同,Future
是一个Trait,抽象了异步的计算过程,对不同的异步执行场景,它的实现不同。而Waker
的功能就简单多了,对一些系统资源(fs, timer, channel, socket),它可以作为回调会被注册到系统的事件循环(event loop),每次事件产生后,Waker
负责告诉调度器:我收到了一个事件通知,这个Future
此时需要执行下(poll)。也可以使用代码驱动协程执行,我们的例子中就是这样实现的。
fn main() {
let a = AtomicU64::new(0);
let b = Arc::new(AtomicBool::new(false));
let waker = Arc::new(AtomicWaker::new());
let waker_clone = waker.clone();
let c = b.clone();
thread::spawn(move || {
let t = Test {
waker: waker,
result: a,
signal: b,
};
let rt = Runtime::new().unwrap();
rt.spawn(async {
let result = t.await;
println!("finally result is {}", result);
});
thread::sleep(Duration::from_secs(10));
println!("thread runtime exited");
});
thread::sleep(Duration::from_secs(2));
c.store(true, SeqCst);
println!("notify to poll");
waker_clone.wake();
thread::sleep(Duration::from_secs(2));
}
首先,我们要在实现Future
的结构体中创建一个AtomicWaker
,它的内部其实是一个自旋锁,每次调用register
其实都会更新内部Waker
。
waker: Arc<AtomicWaker>
每次调用poll
时,我们都要调用register
将Context入参中的Waker
取出,保存在waker
字段中。
self.waker.register(cx.waker());
这样,我们在main thread中,就可以通过waker
字段保存的值,唤醒Executor
再次执行任务。
let waker_clone = waker.clone();
waker_clone.wake();
我们不难发现,wake
方法执行的操作是和Executor
实现有关的,而RawWaker
是Rust标准库中就包括的结构体,且不是Trait,这是怎么做到的?答案是:通过虚拟函数指针表实现(Virtual function pointer table),简而言之就是结构体内保存的不是具体实现,而是函数指针,由不同的Executor
进行初始化创建。
// RawWakerVTable用来定义函数指针表的字段
clone: unsafe fn(*const ()) -> RawWaker
wake: unsafe fn(*const ())
wake_by_ref: unsafe fn(*const ())
drop: unsafe fn(*const ())
在tokio库中,初始化这个函数指针表,而当我们实际调用wake
的时候,调用的其实是tokio
库中的实现,
RawWakerVTable::new(
clone_arc_raw::<W>,
wake_arc_raw::<W>,
wake_by_ref_arc_raw::<W>,
drop_arc_raw::<W>,
)
当事件产生后或满足执行条件,wake
会导致Executor
去尝试执行这个异步函数(poll)。Executor
一般来说有如下几种实现思路:
Waker
修改一个全局的原子布尔值(AtomicBool)表示Executor
当前是否可执行,这种方法的缺点是Executor
同时只能运行最多一个异步函数,一般用于嵌入式平台上。Executor
使用一个map保存全部待执行的异步函数,每个函数对应一个Task ID
,Waker
将可执行的Task ID
发送给Executor
,Executor
就可以执行对应的异步函数了。Executor
使用一个或多个队列保存待执行的任务,Waker
同时是指向某个任务的带引用计数功能的指针,当Waker
发现任务可执行时,将自身放入Executor
的执行队列中即可。通常采用第三种方式实现。例如tokio
库,实现是这样的:
//harness.rs
pub(super) fn wake_by_ref(&self) {
if self.header().state.transition_to_notified() {
self.core().schedule(Notified(self.to_task()));
}
}
可以看到,wake
调用后,任务便进入了调度,Scheduler
分配线程后,由Executor
执行。
Rust的Future
仅仅定义了异步计算过程,还需要一个调度执行的角色——它实际驱动着整个异步计算流程进行。这个角色就叫做Executor
。
Executor
每时每刻可能要执行成千上万个异步任务 ,因此它需要队列来管理这些任务,使用一个/多个线程执行这些就绪的任务。
当调用Runtime
传入Future
时,tokio库会先将Future
封装成一个Task
,然后放入内部的全局队列。
// 向Runtime提交一个新的Future
let rt = Runtime::new().unwrap();
rt.spawn(async {
let result = t.await;
println!("finally result is {}", result);
});
// Runtime全局队列字段
inject: queue::Inject<Arc<Worker>>
然后,查看当前有几个空闲线程,找出一个空闲线程执行
// 查找空闲线程的过程
fn notify_parked(&self) {
if let Some(index) = self.idle.worker_to_notify() {
self.remotes[index].unpark.unpark();
}
}
而unpark
调用的实际上是parking_lot
这个库,这个库会唤醒等待的线程
fn unpark_condvar(&self) {
drop(self.mutex.lock());
self.condvar.notify_one()
}
线程启动后,从本地的工作队列获取任务,开始执行,如果本地工作队列没有任务,那么可以从其他线程的执行队列中偷取任务。
fn run( & self, mut core: Box < Core > ) - > RunResult {
while !core.is_shutdown {
// Increment the tick
core.tick();
// Run maintenance, if needed
core = self.maintenance(core);
// First, check work available to the current worker.
if let Some(task) = core.next_task( & self.worker) {
core = self.run_task(task, core) ? ;
continue;
}
// There is no more **local** work to process, try to steal work
// from other workers.
if let Some(task) = core.steal_work( & self.worker) {
core = self.run_task(task, core) ? ;
} else {
// Wait for work
core = self.park(core);
}
}
// Signal shutdown
self.worker.shared.shutdown(core, self.worker.clone());
Err(())
}
可以看到,Executor
核心逻辑其实并不复杂,单纯从队列中获取Task
然后执行,不过加入了一些从其他线程队列中偷取任务的逻辑。用户也可以实现自己的Executor。
三年前开始接触Golang开发,很快就被Golang简洁、清晰的开发模式所吸引。曾经Java里并发编程的繁琐过程,此刻只需要一个go
关键字就能代替。goroutine这种有栈协程的设计,最大程度模仿了OS Thread的执行过程,对开发人员非常友好。美中不足的是,runtime为goroutine的实现封装了大量实现细节,而这些细节对于不了解runtime的使用者来说,是很难修改和调试的。
Rust和Golang类似,也是一门非常年轻的语言,但它遵循的原则是zero-cost abstraction,目的是最终生成安全高效的程序。为了实现这个目的,Rust既有编译器的严格检查、无runtime、无GC的设计,又有对于标准库范围的严格限制。Rust并发编程的实现思路,也充分体现着上述的特点:
Future
Trait,但Executor
Reactor
Scheduler
交给第三方库实现。Golang和Rust现在都在快速发展当中,它们虽然设计思想上大相径庭,但追求更易用、更安全、更高性能的初心,是不会变的。
https://www.programmersought.com/article/8537662156/
http://www.mit.edu/afs.new/sipb/project/golang/doc/asm.html
https://golang.org/src/runtime/stubs.go
https://mthli.xyz/stackful-stackless/
https://blog.aloni.org/posts/a-stack-less-rust-coroutine-100-loc/
https://samsartor.dev/coroutines-1/
https://rust-lang.github.io/async-book/02_execution/01_chapter.html
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。