并发与异步编程
这篇讲 Rust 的并发与异步编程——从 std::thread 到 Tokio 的 async/await。Rust 的类型系统(Send + Sync trait)让「无畏并发」不只停留在口号。
Send 与 Sync——并发安全的基石
Rust 用两个 trait 在编译期保证线程安全:
| Trait | 含义 |
|---|---|
Send | 类型的所有权可以在线程间传递 |
Sync | 类型的不可变引用可以在线程间安全共享 |
大多数 Rust 类型自动实现了这两个 trait。但如果用了 Rc<T>(非原子引用计数)、RefCell<T>、*const T(裸指针),编译器会拒绝跨线程共享——这杜绝了数据竞争的可能。
use std::thread;
use std::sync::{Arc, Mutex};
fn main() {
// Arc = Atomic Rc(线程安全的引用计数)
// Mutex = 互斥锁(保证同时只有一个线程访问)
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap()); // 10
}多线程
use std::thread;
use std::time::Duration;
fn main() {
let handle = thread::spawn(|| {
for i in 1..=5 {
println!("子线程: {i}");
thread::sleep(Duration::from_millis(10));
}
});
for i in 1..=3 {
println!("主线程: {i}");
thread::sleep(Duration::from_millis(10));
}
handle.join().unwrap(); // 等待子线程结束
println!("所有线程结束");
}通道(Channel)
Rust 使用 MPSC(多生产者单消费者)Channel——类似 Go 的 channel:
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
// 多个生产者
let tx1 = tx.clone();
thread::spawn(move || {
tx1.send("来自线程 1 的消息").unwrap();
});
thread::spawn(move || {
tx.send("来自线程 2 的消息").unwrap();
});
// 消费者
for received in rx {
println!("收到: {received}");
}
}异步编程(Tokio)
Tokio 是 Rust 的异步运行时,类似于 Node.js 的事件循环或 Go 的 goroutine 调度器:
[dependencies]
tokio = { version = "1", features = ["full"] }use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
// 并发执行两个异步任务
let handle1 = tokio::spawn(async {
for i in 1..=5 {
println!("任务 1: {i}");
sleep(Duration::from_millis(10)).await;
}
});
let handle2 = tokio::spawn(async {
for i in 1..=5 {
println!("任务 2: {i}");
sleep(Duration::from_millis(10)).await;
}
});
// 等待两个任务都完成
let _ = tokio::join!(handle1, handle2);
println!("所有任务完成");
}async 函数在被 .await 调用之前不会执行任何代码——它们是惰性的。这与 Go 的 goroutine(创建即执行)不同。Rust 的 tokio::spawn 才真正启动一个异步任务,类似于 Go 的 go func()。异步 I/O
use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("监听 8080 端口...");
loop {
let (mut socket, addr) = listener.accept().await?;
println!("新连接来自: {addr}");
tokio::spawn(async move {
let mut buf = [0; 1024];
loop {
match socket.read(&mut buf).await {
Ok(0) => return, // 连接关闭
Ok(n) => {
if socket.write_all(&buf[..n]).await.is_err() {
return; // 写入失败
}
}
Err(_) => return,
}
}
});
}
}线程 vs 异步
| 场景 | 推荐方案 |
|---|---|
| I/O 密集型(网络、文件) | async/await(Tokio) |
| CPU 密集型(计算、加密) | thread::spawn 或 rayon |
| 混合场景 | Tokio + tokio::task::spawn_blocking |
一句话小结
Send/Sync 在编译期保证线程安全,Arc<Mutex<T>> 跨线程共享数据,mpsc::channel 线程间通信。异步用 Tokio,async fn + .await 写异步代码,tokio::spawn 启动并发任务。更多 Rust 高级特性详见 智能指针。
练习
- 使用多线程计算 1 到 1,000,000 的和:分成 4 个线程,每个线程计算 250,000 个数字的和,最后汇总。
- 用 Tokio 实现一个异步定时器:每隔 1 秒打印一次 “tick”,总共打印 5 次。
参考答案
// 1. 多线程求和
use std::thread;
use std::sync::{Arc, Mutex};
fn main() {
let total = Arc::new(Mutex::new(0u64));
let mut handles = vec![];
for chunk in 0..4 {
let total = Arc::clone(&total);
let handle = thread::spawn(move || {
let start = chunk * 250_000 + 1;
let end = start + 249_999;
let sum: u64 = (start..=end).sum();
let mut total = total.lock().unwrap();
*total += sum;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("总和: {}", *total.lock().unwrap());
}
// 2. 异步定时器
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
for i in 1..=5 {
println!("tick {i}");
sleep(Duration::from_secs(1)).await;
}
}最后更新于