Rust和Go语言类似,通过 Channel(标准库提供)来进行线程间消息传递。
介绍
什么是消息传递:线程通过彼此通过发送消息(数据)来进行通信
Go 语言:不要用共享内存来通信,要用通信来共享内存。参考:Go channel 信号量
Rust 通过 Channel 在线程间消息传递
Channel
- Channel 包括:
- 发送端:
send
方法
- 参数:要发送的数据
- 返回:
Result<T, E>
- 接收端:
recv
方法:阻止当前线程的执行,直到 channel 中有数据被传送来
- 收到数据,返回
Result<T, E>
- 发送端关闭,返回错误
try_recv
方法:不阻塞线程,通常使用循环来检查 try_recv
的结果
- 立即返会
Result<T, E>
- 收到数据,返回
Ok(data)
- 未收到数据,返回错误
- 调用发送端的方法,发送数据
- 接收端会检查和接收到达的数据
- 如果发送端或接收端任意一端断开,该 Channel 关闭
创建 Channel
- 使用
mpsc::channel
(mpsc: multiple producer, single consumer, 多个生产者,一个消费者
)函数创建
- 返回一个
tuple(元组)
:(发送端, 接收端)
发送一个值示例
use std::{thread, sync::mpsc};
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let v = String::from("hello");
tx.send(v).unwrap(); // v 的所有权 move,后边不能在使用
});
let received = rx.recv().unwrap();
println!("received: {}", received);
}
连续发送多个值示例
use std::{thread, sync::mpsc, time::Duration};
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("h"),
String::from("e"),
String::from("l"),
String::from("l"),
String::from("o"),
];
for v in vals {
tx.send(v).unwrap();
thread::sleep(Duration::from_millis(10));
}
});
for received in rx {
println!("received: {}", received);
}
}
多个发送者示例
use std::{thread, sync::mpsc, time::Duration};
fn main() {
let (tx, rx) = mpsc::channel();
let tx1 = mpsc::Sender::clone(&tx);
thread::spawn(move || {
let vals = vec![
String::from("tx1: h"),
String::from("tx1: e"),
String::from("tx1: l"),
String::from("tx1: l"),
String::from("tx1: o"),
];
for v in vals {
tx1.send(v).unwrap();
thread::sleep(Duration::from_millis(10));
}
});
thread::spawn(move || {
let vals = vec![
String::from("h"),
String::from("e"),
String::from("l"),
String::from("l"),
String::from("o"),
];
for v in vals {
tx.send(v).unwrap();
thread::sleep(Duration::from_millis(10));
}
});
for received in rx {
println!("received: {}", received);
}
}
共享状态的并发
- Golang 语言(建议不要使用共享内存来通信)不同的做法,Rust 支持通过共享状态实现并发。
- Channel 类似于单所有权,一旦将值的所有权转移至 Channel,就无法使用它
- 共享内存并发类似于多所有权,多个线程可以同时访问同一块内存
互斥锁
Rust 中使用 Mutex(mutual exclusion,互斥锁)
来保证每次只允许一个线程访问数据
- 线程通过获取互斥锁的 lock 数据结构来跟踪谁对数据拥有独占访问权
- mutex 通过锁来保护它所持有的锁具
- mutex 使用规则
- 使用数据前,必须尝试获取锁(lock)
- 阻塞当前线程
- lock 可能失败,继续等待
- 返回
MutexGuard
(智能指针,实现了 Deref 和 Drop)
- 使用完 mutex 所保护的数据,必须对数据进行解锁,以便其它线程可以访问锁
Mutex<T>
- 通过
Mutex::new(保护的数据)</T>
创建示例:
use std::sync::{Mutex, MutexGuard};
fn main() {
let m: Mutex<i32> = Mutex::new(8);
{
let mut num: MutexGuard<i32> = m.lock().unwrap();
*num = 6;
}
println!("m = {:?}", m);
}
- 多线程共享
Mutex<T>
:Mutex<T>
可以改变 Arc<T>
里的内容
use std::thread;
use std::sync::Mutex;
use std::sync::MutexGuard;
use std::sync::Arc;
use std::thread::JoinHandle;
fn main() {
let c: Arc<Mutex<i32>> = Arc::new(Mutex::new(0));
let mut handles: Vec<JoinHandle<()>> = vec![];
for _ in 0..10 {
let c: Arc<Mutex<i32>> = Arc::clone(&c);
let handle: JoinHandle<()> = thread::spawn(move || {
let mut num: MutexGuard<i32> = c.lock().unwrap();
*num +=1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("c = {:?}", *c.lock().unwrap());
}
并发
Rust 语言的并发有个便签 Trait:
std::marker::Sync
:允许从多线程安全的访问
std::marker::Send
:允许线程间转移所有权,几乎所有的基础类型都实现该 Trait,但 Rc<T>
未实现,Arc
比 Rc<T>
多实现该 Trait
但是,手动实现 Send
和 Sync
是不安全的。