前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >浅谈Rust和Golang协程设计

浅谈Rust和Golang协程设计

原创
作者头像
云云众生
修改2020-12-04 16:32:05
7.8K1
修改2020-12-04 16:32:05
举报
文章被收录于专栏:巫山跬步

浅谈Rust和Golang协程设计

go_rust.jpg
go_rust.jpg

何为有栈协程——以goroutine为例

根据维基百科的定义,协程,是指在非抢占式地处理多任务场景下,用于生成子程序的计算机程序组件,它允许在执行过程中被暂停或恢复。

从逻辑上来说,协程和线程的主要区别,在于协程是协作式处理多任务,而线程是抢占式处理多任务。协程之间的切换一般不涉及系统调用,在用户态就可以完成。

在Golang中,使用go关键字,可以将函数立即创建为一个goroutine。例如main.go文件内容如下

代码语言:txt
复制
package main

func add(a, b int64) (int64, int64) {
	var tmp int64 = 1
	tmp = tmp + a
	return a + b, a - b
}

func main() {
	var c int64 = 10
	var d int64 = 12
	go add(c, d)
}

使用go tool命令,可以其编译为汇编代码(部分前面的汇编与主题无关,略去),进一步查看go关键字底层的实现机制。

代码语言:txt
复制
go tool compile -N -l -S main.go

SP是栈指针寄存器,一般指向局部调用栈的栈顶,也可以用来在函数调用时传参。SB是静态区寄存器,用来获取函数指针。函数调用或创建。通过SUBQ指令修改SP的值,分配新的栈空间,通过ADDQ指令修改SP的值,回收或释放栈空间。

代码语言:txt
复制
        0x001d 00029 (main.go:10)       MOVQ    $10, "".c+56(SP)
        0x0026 00038 (main.go:11)       MOVQ    $12, "".d+48(SP)
        0x002f 00047 (main.go:12)       MOVL    $32, (SP);
        0x0036 00054 (main.go:12)       LEAQ    "".add·f(SB), AX;将函数指针保存到AX寄存器
        0x003d 00061 (main.go:12)       MOVQ    AX, 8(SP);将AX寄存器保存到新分配的8字节
        0x0042 00066 (main.go:12)       MOVQ    "".c+56(SP), AX;将参数c保存到AX寄存器
        0x0047 00071 (main.go:12)       MOVQ    AX, 16(SP);将AX寄存器保存到新分配的8字节
        0x004c 00076 (main.go:12)       MOVQ    $12, 24(SP);将参数d保存到新分配的8字节
        0x0055 00085 (main.go:12)       PCDATA  $1, $0;和GC相关,可忽略
        0x0055 00085 (main.go:12)       CALL    runtime.newproc(SB);!!此处创建goroutine
        0x005a 00090 (main.go:13)       MOVQ    64(SP), BP;一般是接收返回值
        0x005f 00095 (main.go:13)       ADDQ    $72, SP;回收main本地栈空间
        0x0063 00099 (main.go:13)       RET

可以看到源码12行的go关键字,实际调用了runtimenewproc函数。

而在newproc函数内部,先在stack上分配了一段连续的栈空间(通常是2KB,栈帧最小值),也可以叫做栈帧(stackframe),将通过SP传入的参数值拷贝到这个空间中,且这个空间仅属于这个goroutine(也就是GPM模型里的g)。然后把goroutine加入到执行队列中,供调度器去调度执行。

以上整个过程作为一个函数,运行在system stack上,可简化视作直接调用函数。

关于函数——func systemstack(fn func()) systemstack is being called from the limited stack of an ordinary goroutine. In this case, systemstack switches to the per-OS-thread stack, calls fn, and switches back.

代码语言:txt
复制
systemstack(func() {
		newg := newproc1(fn, argp, siz, gp, pc)

		_p_ := getg().m.p.ptr()
		runqput(_p_, newg, true)

		if mainStarted {
			wakep()
		}
	})

如果goroutine执行过程中,预先分配的栈空间不足,那么会分配更大的一块栈空间,并将旧的栈内容完全拷贝到新的空间中去,栈里的内容不会被其他goroutine共享。

代码语言:txt
复制
// Allocate a new g, with a stack big enough for stacksize bytes.
func malg(stacksize int32) *g

我们可以在add函数中,增加以下几行代码,可以打印goroutine调用堆栈的信息。

代码语言:txt
复制
var (
		buf [256]byte
		n   = runtime.Stack(buf[:], false)
		stk = string(buf[:n])
	)
	println(stk)

输出内容内,可以看到包含了传入参数的值10和12,以及add函数指针的值,另外,也可以通过这种方法获取goroutine的ID。

代码语言:txt
复制
goroutine 5 [running]:
main.add(0xa, 0xc, 0x1068800, 0xc00001c0b8)
        main.go:11 +0x69

有栈协程的好处,由于栈帧可以直接完全保存运行期上下文(主要是寄存器值),因此可以在任何时刻暂停协程的运行,这就很方便地支持了抢占式的调度器。而无栈协程的上下文是一般通过类似结构体的方式保存在内存中,它依赖使用者显式地切换协程,否则协程不会主动让出执行权。

另外,有栈协程更方便将同步代码改造为异步代码,就像我们的例子一样,只需改动一行,加上go关键字就可以了。而无栈协程,同步改造为异步则更为复杂,甚至会导致牵一发动全身(async关键字扩散问题)。

Rust无栈协程

既然已经有了有栈协程,那么无栈协程是否还有优势呢。答案肯定的!

通常,无栈协程在内存空间和协程上下文切换的效率更高。值得说明的是,无栈协程并不是说不需要运行时的栈空间,而是和协程的创建者共用同一块运行时的栈空间。

如果一定要用一句话概括无栈协程,那就是:无栈协程可以看做是有状态的函数(generator),每次执行时会根据当前的状态和输入参数,得到(generate)输出,但不一定为最终结果

Async-await

在Rust中,async fn用来定义一种可以在执行中暂停的函数,通过await将控制权转移给runtime,等一段时间之后被重新唤醒执行。

调用async fn所产生的返回值被包在Future中。但与其他语言不同,直接调用async fn,异步函数不会立即被调度器调度执行,只有调用方通过future.await才能实际触发async fn的执行,并拿到结果。

对于Rust普通业务开发来说,一般来说只需要使用底层库提供的异步API,结合asyncawait关键字,就可以实现程序整体的异步化。

1.png
1.png

如果要搞懂异步API的实现,不断优化程序性能,那么理解Rust异步的实现机制,就是必不可少的了。

运行前准备

运行测试代码前,先添加如下依赖

代码语言:txt
复制
[dependencies]
futures = "0.3"
tokio = { version = "0.3", features = ["full"] }

以及导入依赖模块

代码语言:txt
复制
use futures::Future;
use std::pin::Pin;
use std::sync::{
    atomic::{AtomicBool, AtomicU64, Ordering::SeqCst},
    Arc,
};
use tokio::runtime::Runtime;
use futures::task::AtomicWaker;
use std::task::Context;
use std::task::Poll;
use std::{thread, time::Duration};

我们首先定义一个结构体Test,用来模拟计算的过程。

代码语言:txt
复制
struct Test {
    waker: Arc<AtomicWaker>, // 用来通知Executor执行poll
    result: AtomicU64,       // 用来暂存每次运行的中间结果
    signal: Arc<AtomicBool>, // 用来模拟事件消息
}

Future

提到Rust协程,首先需要介绍的是Future Trait,它是理解协程整个执行过程的关键。对于Golang使用者来说,可以将Trait看做Golang里的interface,Trait包含可以被实现的方法。

Future Trait仅定义了一个方法,那就是poll,顾名思义,这个方法可以被调用很多次。Future Trait可以看做是计算过程的抽象。调用poll方法意味着:异步计算过程由于某种原因暂停,或满足执行条件继续执行,直至计算出最终的结果。

poll方法的签名如下,Self可以是用户自定义的结构体,用来保存每一次调用poll方法计算的结果。

Context目前仅仅是对Waker的封装,Waker是唤起再次执行任务的结构体,后面会详细介绍。

poll方法返回的Poll是一个枚举值,表示本次调用后,当前协程需要暂停等待(Pending),还是已经完成(Ready)。

代码语言:txt
复制
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>

我们的Test实现了Future Trait,代码如下:

代码语言:txt
复制
impl Future for Test {
    type Output = u64;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let result = self.result.load(SeqCst);
        self.waker.register(cx.waker());
        println!("poll called {}", result);
        self.result.store(result + 1, SeqCst);
        if self.signal.load(SeqCst) {
            println!("poll ready");
            Poll::Ready(result)
        } else {
            println!("poll not ready");
            Poll::Pending
        }
    }
}

既然每次执行poll,返回值是个枚举值,那么把协程的执行,看做状态机的状态转换过程,是不是也是自然而然了呢?答案是肯定的。

image.png
image.png

现在,我们对于Future有了明确的概念 ,那么一个协程是如何在暂停后被唤醒的?该Waker发挥它的作用了。

Waker

Future不同,Future是一个Trait,抽象了异步的计算过程,对不同的异步执行场景,它的实现不同。而Waker的功能就简单多了,对一些系统资源(fs, timer, channel, socket),它可以作为回调会被注册到系统的事件循环(event loop),每次事件产生后,Waker负责告诉调度器:我收到了一个事件通知,这个Future此时需要执行下(poll)。也可以使用代码驱动协程执行,我们的例子中就是这样实现的。

代码语言:txt
复制
fn main() {
    let a = AtomicU64::new(0);
    let b = Arc::new(AtomicBool::new(false));
    let waker = Arc::new(AtomicWaker::new());
    let waker_clone = waker.clone();
    let c = b.clone();
    thread::spawn(move || {
        let t = Test {
            waker: waker,
            result: a,
            signal: b,
        };
        let rt = Runtime::new().unwrap();
        rt.spawn(async {
            let result = t.await;
            println!("finally result is {}", result);
        });
        thread::sleep(Duration::from_secs(10));
        println!("thread runtime exited");
    });
    thread::sleep(Duration::from_secs(2));
    c.store(true, SeqCst);
    println!("notify to poll");
    waker_clone.wake();
    thread::sleep(Duration::from_secs(2));
}

首先,我们要在实现Future的结构体中创建一个AtomicWaker,它的内部其实是一个自旋锁,每次调用register其实都会更新内部Waker

代码语言:txt
复制
waker: Arc<AtomicWaker>

每次调用poll时,我们都要调用register将Context入参中的Waker取出,保存在waker字段中。

代码语言:txt
复制
self.waker.register(cx.waker());

这样,我们在main thread中,就可以通过waker字段保存的值,唤醒Executor再次执行任务。

代码语言:txt
复制
let waker_clone = waker.clone();
waker_clone.wake();

我们不难发现,wake方法执行的操作是和Executor实现有关的,而RawWaker是Rust标准库中就包括的结构体,且不是Trait,这是怎么做到的?答案是:通过虚拟函数指针表实现(Virtual function pointer table),简而言之就是结构体内保存的不是具体实现,而是函数指针,由不同的Executor进行初始化创建。

代码语言:txt
复制
// RawWakerVTable用来定义函数指针表的字段
clone: unsafe fn(*const ()) -> RawWaker
wake: unsafe fn(*const ())
wake_by_ref: unsafe fn(*const ())
drop: unsafe fn(*const ())

在tokio库中,初始化这个函数指针表,而当我们实际调用wake的时候,调用的其实是tokio库中的实现,

代码语言:txt
复制
RawWakerVTable::new(
    clone_arc_raw::<W>,
    wake_arc_raw::<W>,
    wake_by_ref_arc_raw::<W>,
    drop_arc_raw::<W>,
)
image.png
image.png

当事件产生后或满足执行条件,wake会导致Executor去尝试执行这个异步函数(poll)。Executor一般来说有如下几种实现思路:

  1. Waker修改一个全局的原子布尔值(AtomicBool)表示Executor当前是否可执行,这种方法的缺点是Executor同时只能运行最多一个异步函数,一般用于嵌入式平台上。
  2. Executor使用一个map保存全部待执行的异步函数,每个函数对应一个Task IDWaker将可执行的Task ID发送给ExecutorExecutor就可以执行对应的异步函数了。
  3. Executor使用一个或多个队列保存待执行的任务,Waker同时是指向某个任务的带引用计数功能的指针,当Waker发现任务可执行时,将自身放入Executor的执行队列中即可。

通常采用第三种方式实现。例如tokio库,实现是这样的:

代码语言:txt
复制
//harness.rs
pub(super) fn wake_by_ref(&self) {
    if self.header().state.transition_to_notified() {
        self.core().schedule(Notified(self.to_task()));
    }
}

可以看到,wake调用后,任务便进入了调度,Scheduler分配线程后,由Executor执行。

Executor

Rust的Future仅仅定义了异步计算过程,还需要一个调度执行的角色——它实际驱动着整个异步计算流程进行。这个角色就叫做Executor

Executor每时每刻可能要执行成千上万个异步任务 ,因此它需要队列来管理这些任务,使用一个/多个线程执行这些就绪的任务。

当调用Runtime传入Future时,tokio库会先将Future封装成一个Task,然后放入内部的全局队列。

代码语言:txt
复制
// 向Runtime提交一个新的Future
let rt = Runtime::new().unwrap();
rt.spawn(async {
    let result = t.await;
    println!("finally result is {}", result);
});
// Runtime全局队列字段
inject: queue::Inject<Arc<Worker>>

然后,查看当前有几个空闲线程,找出一个空闲线程执行

代码语言:txt
复制
// 查找空闲线程的过程
fn notify_parked(&self) {
    if let Some(index) = self.idle.worker_to_notify() {
        self.remotes[index].unpark.unpark();
    }
}

unpark调用的实际上是parking_lot这个库,这个库会唤醒等待的线程

代码语言:txt
复制
fn unpark_condvar(&self) {
    drop(self.mutex.lock());
    self.condvar.notify_one()
}

线程启动后,从本地的工作队列获取任务,开始执行,如果本地工作队列没有任务,那么可以从其他线程的执行队列中偷取任务。

代码语言:txt
复制
fn run( & self, mut core: Box < Core > ) - > RunResult {
    while !core.is_shutdown {
        // Increment the tick
        core.tick();

        // Run maintenance, if needed
        core = self.maintenance(core);

        // First, check work available to the current worker.
        if let Some(task) = core.next_task( & self.worker) {
            core = self.run_task(task, core) ? ;
            continue;
        }

        // There is no more **local** work to process, try to steal work
        // from other workers.
        if let Some(task) = core.steal_work( & self.worker) {
            core = self.run_task(task, core) ? ;
        } else {
            // Wait for work
            core = self.park(core);
        }
    }

    // Signal shutdown
    self.worker.shared.shutdown(core, self.worker.clone());
    Err(())
}

image.png
image.png

可以看到,Executor核心逻辑其实并不复杂,单纯从队列中获取Task然后执行,不过加入了一些从其他线程队列中偷取任务的逻辑。用户也可以实现自己的Executor。

小结

三年前开始接触Golang开发,很快就被Golang简洁、清晰的开发模式所吸引。曾经Java里并发编程的繁琐过程,此刻只需要一个go关键字就能代替。goroutine这种有栈协程的设计,最大程度模仿了OS Thread的执行过程,对开发人员非常友好。美中不足的是,runtime为goroutine的实现封装了大量实现细节,而这些细节对于不了解runtime的使用者来说,是很难修改和调试的。

Rust和Golang类似,也是一门非常年轻的语言,但它遵循的原则是zero-cost abstraction,目的是最终生成安全高效的程序。为了实现这个目的,Rust既有编译器的严格检查、无runtime、无GC的设计,又有对于标准库范围的严格限制。Rust并发编程的实现思路,也充分体现着上述的特点:

  1. 标准库直接支持Native Thread。
  2. 语言支持关键字Async-await和Future Trait,但Executor Reactor Scheduler交给第三方库实现。
  3. 第三库对协程的实现细节用户完全可见,因为无栈协程的结构简单,代码可读性要远好于golang runtime相关的代码。

Golang和Rust现在都在快速发展当中,它们虽然设计思想上大相径庭,但追求更易用、更安全、更高性能的初心,是不会变的。

参考资料

https://www.programmersought.com/article/8537662156/

http://www.mit.edu/afs.new/sipb/project/golang/doc/asm.html

https://golang.org/src/runtime/stubs.go

https://mthli.xyz/stackful-stackless/

https://blog.aloni.org/posts/a-stack-less-rust-coroutine-100-loc/

https://samsartor.dev/coroutines-1/

https://rust-lang.github.io/async-book/02_execution/01_chapter.html

https://boats.gitlab.io/blog/post/wakers-i/

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 浅谈Rust和Golang协程设计
    • 何为有栈协程——以goroutine为例
      • Rust无栈协程
        • Async-await
        • 运行前准备
        • Future
        • Waker
        • Executor
      • 小结
        • 参考资料
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档