
Send 与 Sync 可能是 Rust 多线程以及异步代码种最常见到的约束。在前面一篇讨论多线程的文章中介绍过这两个约束的由来。但是,真正书写比较复杂的代码时,还是会经常遇到编译器的各种不配合。这里借用我的同事遇到的一个问题再次举例谈一谈 Send 与 Sync 的故事。
基本场景
C/C++中不存在 Send/Sync 的概念,数据对象可以任意在多线程中访问,只不过需要程序员保证线程安全,也就是所谓“加锁”。而在 Rust 中,由于所有权的设计,不能直接将一个对象分成两份或多份,每个线程都放一份。一般地,如果一份数据仅仅子线程使用,我们会将数据的值转移至线程中,这也是 Send 的基础含义。因此,Rust 代码经常会看到将数据 clone(),然后 move 到线程中:
let b = aa.clone();
thread::spawn(move || {
b...
})假如,数据需要在多线程共享,情况会复杂一些。我们一般不会在线程中直接使用外部环境变量引用。原因很简单,生命周期的问题。线程的闭包要求 ‘static,这会与被借用的外部环境变量的生命周期冲突,错误代码如下:
let bb = AA::new(8);
thread::spawn( || {
let cc = &bb; //closure may outlive the current function, but it borrows `bb`, which is owned by the current function
});包裹一个 Arc 可以解决这个问题,Arc 恰好就是用来管理生命周期的,改进后的代码如下:
let b = Arc::new(aa);
let b1 = b.clone();
thread::spawn(move || {
b1...
})Arc 提供了共享不可变引用的功能,也就是说,数据是只读的。如果我们需要访问多线程访问共享数据的可变引用,即读写数据,那么还需要在原始数据上先包裹 Mutex<T>,类似于 RefCell<T>,提供内部可变性,因此我们可以获取内部数据的 &mut,修改数据。当然,这需要通过 Mutex::lock() 来操作。
let b = Arc::new(Mutex::new(aa));
let b1 = b.clone();
thread::spawn(move || {
let b = b1.lock();
...
})为什么不能直接使用 RefCell 完成这个功能?这是因为 RefCell 不支持 Sync,没办法装入 Arc。注意 Arc 的约束:
unsafe impl<T: ?Sized + Sync + Send> Send for Arc<T> {}若 Arc<T>是Send,条件是 T:Send+Sync。RefCell 不满足 Sync,因此 Arc> 不满足 Send,无法转移至线程中。错误代码如下:
let b = Arc::new(RefCell::new(aa));
let b1 = b.clone();
thread::spawn(move || {
^^^^^^^^^^^^^ `std::cell::RefCell<AA<T>>` cannot be shared between threads safely
let x = b1.borrow_mut();
})异步代码:跨越 await 问题
如上所述,一般地,我们会将数据的值转移入线程,这样只需要做正确的 Send 和Sync 标记即可,很直观,容易理解。典型的代码如下:
fn test1<T: Send + Sync + 'static>(t: T) {
let b = Arc::new(t);
let bb = b.clone();
thread::spawn( move|| {
let cc = &bb;
});
}根据上面的分析,不难推导出条件 T: Send + Sync + 'static 的来龙去脉:Closure: Send + 'static ⇒ Arc<T>: Send + ’static ⇒ T: Send + Sync + 'static。
然而,在异步协程代码中有一种常见情况,推导过程则显得比较隐蔽,值得说道说道。考察以下代码:
struct AA<T>(T);
impl<T> AA<T> {
async fn run_self(self) {}
async fn run(&self) {}
async fn run_mut(&mut self) {}
}
fn test2<T: Send + 'static>(mut aa: AA<T>) {
let ha = async_std::task::spawn(async move {
aa.run_self().await;
});
}test2 中,限定 T: Send + ‘static,合情合理。async fn 生成的 GenFuture 要求 Send + ‘static,因此被捕获置入 GenFuture 匿名结构中的 AA 也必须满足 Send + ‘static,进而要求 AA 泛型参数也满足Send + ‘static。
然而,类似的方式调用 AA::run() 方法,编译失败,编译器提示 GenFuture 不满足 Send。代码如下:
fn test2<T: Send + 'static>(mut aa: AA<T>) {
let ha = async_std::task::spawn(async move {
^^^^^^^^^^^^^^^^^^^^^^ future returned by `test2` is not `Send`
aa.run().await;
});
}原因在于,AA::run() 方法的签名是 &self,所以 run() 是通过 aa 的不可变借用 &AA 来调用。而 run() 又是一个异步方法,执行了 await,也就是所谓的 &aa 跨越了 await,故而要求 GenFuture 匿名结构除了生成 aa 之外,还需要生成 &aa,示意代码如下:
struct {
aa: AA
aa_ref: &AA
}正如之前探讨过,生成的 GenFuture 需要满足 Send,因此 AA 以及 &AA 都需要满足 Send。而 &AA 满足 Send,则意味着 AA 满足 Sync。这也就是各种 Rust 教程中都会提到的那句话的真正含义:
对于任意类型 T,如果 &T是 Send ,T 就是 Sync 的
之前出错的代码修改为如下形式,增加 Sync 标记,编译通过。
fn test2<T: Send + Sync + 'static>(mut aa: AA<T>) {
let ha = async_std::task::spawn(async move {
aa.run().await;
});
}另外,值得指出的是上述代码中调用 AA::run_mut(&mut self) 不需要 Sync 标记:
fn test2<T: Send + 'static>(mut aa: AA<T>) {
let ha = async_std::task::spawn(async move {
aa.run_mut().await;
});
}这是因为 &mut self 并不要求 T: Sync。参见以下标准库中关于 Sync 定义代码就明白了:
mod impls {
#[stable(feature = "rust1", since = "1.0.0")]
unsafe impl<T: Sync + ?Sized> Send for &T {}
#[stable(feature = "rust1", since = "1.0.0")]
unsafe impl<T: Send + ?Sized> Send for &mut T {}
}可以看到,&T: Send 要求 T: Sync,而 &mut T 则 T: Send 即可。
总 结
总而言之,Send 约束在根源上是由 thread::spawn() 或是 task::spawn() 引入的,因为两个方法的闭包参数必须满足 Send。此外,在需要共享数据时使用 Arc<T>会要求 T: Send + Sync。而共享可写数据,需要 Arc<Mutex<T>>,此时 T: Send 即可,不再要求 Sync。
异步代码中关于 Send/Sync 与同步多线程代码没有不同。只是因为 GenFuture 的特别之处使得跨越 await 的变量必须是 T: Send,此时需要注意通过 T 调用异步方法的签名,如果为 &self,则必须满足 T:Send + Sync。
最后,一点经验分享:关于 Send/Sync 的道理并不复杂,更多时候是因为代码中层次比较深,调用关系复杂,导致编译器的错误提示很难看懂,某些特定场合编译器可能还会给出完全错误的修正建议,这时候需要仔细斟酌,追根溯源,找到问题的本质,不能完全依靠编译器提示。
作
者
谢敬伟,江湖人称“刀哥“,20年IT老兵,数据通信网络专家,电信网络架构师,目前任 Netwarps 技术总监。刀哥在操作系统、网络编程、高并发、高吞吐、高可用性等领域有多年的实践经验,并对网络及编程等方面的新技术有浓厚的兴趣。