跳至内容

并发与异步编程

这篇讲 Rust 的并发与异步编程——从 std::thread 到 Tokio 的 async/await。Rust 的类型系统(Send + Sync trait)让「无畏并发」不只停留在口号。

Send 与 Sync——并发安全的基石

Rust 用两个 trait 在编译期保证线程安全:

Trait含义
Send类型的所有权可以在线程间传递
Sync类型的不可变引用可以在线程间安全共享

大多数 Rust 类型自动实现了这两个 trait。但如果用了 Rc<T>(非原子引用计数)、RefCell<T>*const T(裸指针),编译器会拒绝跨线程共享——这杜绝了数据竞争的可能。

use std::thread;
use std::sync::{Arc, Mutex};

fn main() {
    // Arc = Atomic Rc(线程安全的引用计数)
    // Mutex = 互斥锁(保证同时只有一个线程访问)
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());  // 10
}

多线程

use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..=5 {
            println!("子线程: {i}");
            thread::sleep(Duration::from_millis(10));
        }
    });

    for i in 1..=3 {
        println!("主线程: {i}");
        thread::sleep(Duration::from_millis(10));
    }

    handle.join().unwrap();  // 等待子线程结束
    println!("所有线程结束");
}

通道(Channel)

Rust 使用 MPSC(多生产者单消费者)Channel——类似 Go 的 channel:

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    // 多个生产者
    let tx1 = tx.clone();
    thread::spawn(move || {
        tx1.send("来自线程 1 的消息").unwrap();
    });

    thread::spawn(move || {
        tx.send("来自线程 2 的消息").unwrap();
    });

    // 消费者
    for received in rx {
        println!("收到: {received}");
    }
}

异步编程(Tokio)

Tokio 是 Rust 的异步运行时,类似于 Node.js 的事件循环或 Go 的 goroutine 调度器:

[dependencies]
tokio = { version = "1", features = ["full"] }
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    // 并发执行两个异步任务
    let handle1 = tokio::spawn(async {
        for i in 1..=5 {
            println!("任务 1: {i}");
            sleep(Duration::from_millis(10)).await;
        }
    });

    let handle2 = tokio::spawn(async {
        for i in 1..=5 {
            println!("任务 2: {i}");
            sleep(Duration::from_millis(10)).await;
        }
    });

    // 等待两个任务都完成
    let _ = tokio::join!(handle1, handle2);
    println!("所有任务完成");
}
async 函数在被 .await 调用之前不会执行任何代码——它们是惰性的。这与 Go 的 goroutine(创建即执行)不同。Rust 的 tokio::spawn 才真正启动一个异步任务,类似于 Go 的 go func()

异步 I/O

use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    println!("监听 8080 端口...");

    loop {
        let (mut socket, addr) = listener.accept().await?;
        println!("新连接来自: {addr}");

        tokio::spawn(async move {
            let mut buf = [0; 1024];
            loop {
                match socket.read(&mut buf).await {
                    Ok(0) => return,  // 连接关闭
                    Ok(n) => {
                        if socket.write_all(&buf[..n]).await.is_err() {
                            return;  // 写入失败
                        }
                    }
                    Err(_) => return,
                }
            }
        });
    }
}

线程 vs 异步

场景推荐方案
I/O 密集型(网络、文件)async/await(Tokio)
CPU 密集型(计算、加密)thread::spawnrayon
混合场景Tokio + tokio::task::spawn_blocking

一句话小结

Send/Sync 在编译期保证线程安全,Arc<Mutex<T>> 跨线程共享数据,mpsc::channel 线程间通信。异步用 Tokio,async fn + .await 写异步代码,tokio::spawn 启动并发任务。更多 Rust 高级特性详见 智能指针

练习

  1. 使用多线程计算 1 到 1,000,000 的和:分成 4 个线程,每个线程计算 250,000 个数字的和,最后汇总。
  2. 用 Tokio 实现一个异步定时器:每隔 1 秒打印一次 “tick”,总共打印 5 次。
参考答案
// 1. 多线程求和
use std::thread;
use std::sync::{Arc, Mutex};

fn main() {
    let total = Arc::new(Mutex::new(0u64));
    let mut handles = vec![];

    for chunk in 0..4 {
        let total = Arc::clone(&total);
        let handle = thread::spawn(move || {
            let start = chunk * 250_000 + 1;
            let end = start + 249_999;
            let sum: u64 = (start..=end).sum();
            let mut total = total.lock().unwrap();
            *total += sum;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("总和: {}", *total.lock().unwrap());
}

// 2. 异步定时器
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    for i in 1..=5 {
        println!("tick {i}");
        sleep(Duration::from_secs(1)).await;
    }
}
最后更新于