rust 2024年12月28日

Async Rust

异步Rust:释放无畏并发能力
书籍封面

Async Rust

作者:Maxwell Flitton 出版日期:2024-12-17 出版社:O'Reilly Media, Inc.
本书详细讲解了异步Rust编程,涵盖了异步编程基础概念(例如Future、Task、async/await),自定义异步运行时构建,网络编程集成,Actor模型,反应式编程以及测试策略等方面。 它通过具体的代码示例和设计模式,帮助读者理解和掌握异步Rust编程的技巧,并探讨了并发编程中常见问题的解决方法,例如死锁和回压。书中还介绍了如何构建高效、可扩展的异步系统。

好的,我将按照您的要求,对每个章节的内容进行总结,并尽可能地包含关键代码,使其内容详实且易于理解。

第一章:异步简介

本章主要介绍了异步编程的概念以及在Rust中的应用。

  • 什么是异步? 异步编程允许任务并发执行,特别适用于处理I/O密集型操作,如网络请求或文件处理。与传统的同步编程不同,异步编程可以更有效地利用系统资源,提高应用程序的性能,而无需额外的CPU核心。
  • 进程和线程: 本章还介绍了进程和线程的基本概念。虽然线程可以实现并发,但当任务可以等待时,异步编程可以释放CPU资源,避免阻塞。
  • 为何需要异步? 随着硬件性能提升的放缓,以及微服务等系统对I/O网络调用的需求增加,我们需要更有效地利用资源。异步编程提供了一种解决方案,可以在不增加线程的情况下提高程序速度。
  • 异步示例
    • 同步代码示例: 通过连续发送四个相同的网络请求,演示了同步代码的执行方式。 每个请求完成后才会开始下一个请求。
    let first = reqwest::get(url);
    let second = reqwest::get(url);
    let third = reqwest::get(url);
    let fourth = reqwest::get(url);
    
    let first = first.await?;
    let second = second.await?;
    let third = third.await?;
    let fourth = fourth.await?;
    • 异步代码示例: 使用 tokio::join! 宏同时运行多个任务,展示了异步编程的优势,使程序速度提升了4.7倍,而无需增加线程。
let (_, _, _, _) = tokio::join!(
    reqwest::get(url),
    reqwest::get(url),
    reqwest::get(url),
    reqwest::get(url),
);

第二章:基础异步Rust

本章深入探讨了Rust中异步编程的基础概念,包括Future、Waker以及如何创建自定义的Future。

  • Future trait: Future trait是Rust异步编程的核心,代表一个可能尚未完成的异步计算。Future通过poll方法进行轮询,以检查是否已完成。poll方法返回Poll::PendingPoll::Ready
    • 自定义Future示例: CounterFuture 结构体演示了如何实现 Future trait,每次轮询时增加计数,并在计数达到5时返回 Poll::Ready.
    impl Future for CounterFuture {
        type Output = u32;
        fn poll(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>
        ) -> Poll<Self::Output> {
            self.count += 1;
            println!("polling with result: {}", self.count);
            std::thread::sleep(Duration::from_secs(1));
            if self.count < 5 {
                cx.waker().wake_by_ref();
                Poll::Pending
            } else {
                Poll::Ready(self.count)
            }
        }
    }
  • Pin和Context
    • Pin: 为了防止Future在执行过程中被移动到内存中的其他位置,使用了Pin,保证了自引用结构的安全性。
    • Context: Context 结构体包含了 Waker,用于通知执行器在Future准备好继续执行时唤醒任务。
  • Waker: Waker 用于在Future等待某些外部事件(如I/O操作完成)时,唤醒Future。
    • 远程唤醒示例: 通过通道模拟外部调用,展示了如何从外部唤醒Future,以及使用Waker
  • 共享数据: 介绍了如何在异步任务之间共享数据,例如使用 Arc<Mutex<>> 来保护共享的可变数据,并使用 Condvar 进行条件通知。
    • 共享数据竞争示例: 展示了使用 Mutex 保护共享数据时可能出现的竞争条件,以及如何使用更高级的抽象(如 async 函数)来简化实现。
  • AsyncWriteFuture: 一个实现了Future trait的结构体,展示了如何在异步环境中进行写入操作,其中使用 try_lock 来尝试获取锁,并在无法立即获取锁时返回 Poll::Pending

第三章:构建我们自己的异步队列

本章介绍了如何构建自定义的异步运行时,包括任务队列和线程池,以及如何使用通道在线程之间传递任务。

  • 异步运行时: 运行时负责执行异步任务,包括任务调度、事件循环和资源管理。
    • 运行时比喻: 将运行时比作干洗店,Future是衣服上的标签,描述清洗方法,而Task是带有号码的取衣凭证。
  • 任务队列: 使用 flume::unbounded 创建无界通道作为任务队列,并通过多个线程消费队列中的任务。
  • 多队列和优先级: 使用枚举 FutureType 表示任务优先级,并使用不同的队列处理不同优先级的任务。
    • 自定义任务调度: 可以通过为高优先级和低优先级任务设置不同的队列来实现任务调度。
  • spawn_task! 宏: 创建一个宏来简化任务的生成,并允许指定任务的优先级。
  • 自定义 join 宏: 创建一个宏来等待多个任务完成并返回结果,避免重复阻塞主函数。
  • 配置运行时: 介绍了如何配置自定义运行时,例如通过设置环境变量来定义高优先级和低优先级队列的线程数。
  • Tokio运行时: 展示了如何使用Tokio库配置运行时,并与自定义运行时进行比较,同时也指出Tokio也使用了宏来设置运行时。

第四章:将网络集成到我们自己的异步运行时

本章介绍了如何将网络功能集成到自定义的异步运行时中,包括使用 hyper 库进行HTTP请求,并实现 AsyncReadAsyncWrite trait。

  • HTTP客户端: 使用 hyper 库创建HTTP客户端,发送HTTP和HTTPS请求。
  • Executor和Connector: 介绍了 ExecutorConnector 的概念。其中 Connector 用于处理连接, Executor 用于执行异步任务。
  • 自定义Stream: 创建一个名为 CustomStream 的枚举,支持HTTP和HTTPS连接,使用 Async<TcpStream>TlsStream<Async<TcpStream>>
  • Service Trait: 为自定义的连接器结构体实现hyper::service::Service trait。
  • AsyncRead Trait: 介绍了 AsyncRead trait,并通过实现 poll_read 方法将 CustomStream 集成到异步任务系统中。
  • AsyncWrite Trait: 介绍了 AsyncWrite trait,并通过实现 poll_writepoll_flushpoll_shutdown 方法来异步写入和关闭连接。
  • 连接和运行客户端: 展示如何连接到服务器并发送请求,测试 CustomStream 的实现。
  • mio crate: 使用 mio crate 创建TCP服务器,并使用token来区分不同的socket。
  • Future 和 Mio Poll:mio 的socket轮询功能集成到Future中,允许异步地等待socket事件。
  • 客户端发送数据: 创建客户端代码以发送数据到服务器,并使用 ErrorKind::WouldBlock 处理阻塞情况。

第五章:使用协程进行异步编程

本章介绍了如何使用 Rust 的协程(coroutines)进行异步编程,并使用 rand 创建随机数。

  • 协程基础: 协程是一种轻量级的并发机制,可以通过 Coroutine trait 实现。 协程可以暂停和恢复执行,这使得它适用于异步编程。
  • 协程和 Future 接口: 将协程的输出映射到等价的异步输出,实现了协程和 Future 的结合。
 impl Future for MutexCoRoutine {
     type Output = ();
     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
         -> Poll<Self::Output> {
         match Pin::new(&mut self).resume(()) {
             CoroutineState::Complete(_) => Poll::Ready(()),
             CoroutineState::Yielded(_) => {
                 cx.waker().wake_by_ref();
                 Poll::Pending
             },
         }
     }
 }
  • 基本测试: 构建了一个简单的测试,演示了如何使用协程进行文件写入。
  • 反应式编程:提到了反应式编程,这会在第六章中详细介绍。

第六章:反应式编程

本章介绍了反应式编程的概念,以及如何使用异步编程来构建一个基本的反应式系统。

  • 反应式编程: 是一种编程范式,其中代码对数据值或事件的变化作出反应。反应式编程允许构建实时动态响应变化的系统。
  • 观察者模式: 通过实现观察者模式来构建一个简单的加热系统。其中,温度和期望温度是主题,加热器和显示器是观察者。
  • 主题定义: 使用 LazyLock 和原子变量 (AtomicI16AtomicBool) 定义温度、期望温度和加热器开关状态等主题。
  • 显示器 Future: 实现了一个 DisplayFuture,它通过不断轮询温度主题来更新显示,并在温度变化时触发加热器开关。
    • DisplayFuture 通过比较当前温度和快照的温度来决定是否更新显示,以及是否开启或关闭加热器。
  • 加热器 Future: 实现了一个 HeaterFuture,它根据 HEAT_ON 标志增加温度,并在加热器打开时持续加热。
  • 热损耗 Future: 实现了一个 HeatLossFuture,它持续减少温度,模拟热损耗。
  • 使用 compare_exchange 使用 compare_exchange 函数来确保原子操作的原子性,并避免数据竞争。
  • 事件循环和回调: 介绍了事件循环和回调的概念,通过传递事件到线程池,在方便的时候将数据发送回事件的源头。
  • 输入处理: 使用原子变量、Mutex和队列来处理输入事件并更新显示。
  • 事件总线: 创建一个事件总线结构体 EventBus,可以发布事件到多个订阅者,并使用 AsyncMutexHashMap 来管理订阅者。 * EventHandleEventBus 创建了一个句柄 EventHandle,使订阅者可以轮询事件。 * send 函数向所有订阅者发送事件。
  • 背压: 提到了事件的创建速率可能大于处理速率,导致背压,并指出一些解决背压的方法。
  • 垃圾回收器: 创建一个 garbage_collector 函数来清理不再使用的句柄,以防止资源泄漏。

第七章:自定义Tokio

本章深入探讨了如何自定义 Tokio 运行时,并控制其配置和行为。

  • Tokio运行时配置: 使用 tokio::runtime::Builder 构建和配置 Tokio 运行时,包括设置线程数、线程名称、时间驱动等。
  • 使用 LazyLock 延迟初始化: 通过 LazyLock 实现运行时的延迟初始化,确保运行时只创建一次。
  • spawn_task 函数: 定义 spawn_task 函数来将任务生成到运行时,与第三章自定义的 spawn_task 函数类似,但返回的是 tokio::task::JoinHandle
  • 多运行时: 在一个程序中创建多个不同的 Tokio 运行时,并演示了如何将不同的任务发送到不同的运行时。
    • 高优先级和低优先级运行时:通过创建高优先级和低优先级运行时来模拟具有不同优先级的任务队列。
  • 本地池: 介绍了本地池(local pools)的概念,并展示了如何使用 spawn_pinned 在本地池中处理任务。
    • 使用本地池可以避免使用原子操作等手段来保护共享数据,因为在同一个线程内访问的数据始终是安全的。
  • 优雅关闭: 介绍了如何优雅地关闭 Tokio 运行时,包括使用 tokio::signal::ctrl_c 监听 Ctrl-C 事件,并使用 std::process::exit 关闭程序。
  • 总结: 回顾了本章的主要内容,强调了对 Tokio 运行时的控制,包括线程数、阻塞线程数和任务轮询。

第八章:Actor 模型

本章介绍了 Actor 模型,以及如何使用 Rust 的异步特性来实现一个简单的 Actor 系统。

  • Actor 模型: 是一种并发模型,其中 Actor 是独立的执行单元,通过消息传递进行通信。每个 Actor 都有自己的状态和行为,并且是隔离的。
  • 基本 Actor: 演示了如何使用通道 (channel) 来创建基本的 Actor,并通过通道传递消息。
  • 路由器 Actor: 创建了一个路由器 Actor,它接收消息并将其路由到相应的 Actor,以实现系统模块化。
  • 全局 Sender: 使用 OnceLock 创建了一个全局的 Sender 以便在程序的任何地方发送消息给 Actor。
  • 键值存储 Actor: 创建了一个键值存储 Actor,可以设置、获取和删除键值对。
  • Writer Actor: 添加了一个 Writer Actor,负责将键值对操作写入文件,以实现数据备份。
  • Actor 监督: 介绍了如何通过心跳机制来监控 Actor,并在 Actor 失败时自动重启它们。
  • 消息枚举: 定义了 RoutingMessage 枚举来区分不同类型的消息,并将消息路由到不同的 Actor。
  • 重置 Actor: 演示了如何重置 Actor,并确保Actor系统在重置后依然可以正常工作。
  • 总结: 强调了本章中构建的 Actor 系统,包括键值存储、数据备份和心跳监控,以及路由器模式的简化作用。

第九章:设计模式

本章介绍了一些常用的异步设计模式,以及如何在 Rust 中实现它们。

  • 异步模块: 展示了如何创建具有最小占用空间的异步模块,并管理异步任务的执行。
  • 瀑布模式: 又称责任链模式,将异步任务串联在一起,前一个任务的输出作为后一个任务的输入。
    • 利用 Rust 的错误处理机制简化了链式调用。
  • 装饰器模式: 通过包装结构体来扩展现有功能,并且不需要修改原始结构体的代码,可以通过实现相同的 trait 来实现。
    • 可以在不更改原有代码的前提下,动态地添加或修改现有类的功能。
  • 状态机模式: 使用枚举和模式匹配来管理状态转换,通过不同的事件来改变状态,并将异步代码集成到状态机中。
    • 可以通过状态机来实现不同状态下的异步任务调度,使代码更加清晰和可维护。
  • 重试模式: 使用循环和 tokio::time::sleep 实现任务的自动重试,并设置最大重试次数和退避策略。
  • 断路器模式: 当错误超过阈值时,阻止新的任务执行,避免系统崩溃。
    • 利用原子变量来记录错误次数,并控制断路器的状态。
  • 总结: 强调了在应用设计模式时需要注意适度,避免过度工程。

第十章:使用标准库构建异步服务器

本章介绍了如何仅使用 Rust 标准库构建一个异步 TCP 服务器,而不依赖任何外部库。

  • 数据层逻辑: 构建了一个用于序列化和反序列化数据的模块,并将 Data 结构体公开。
  • 原始 Waker: 创建自定义的 RawWaker,以更精细地控制任务的执行过程。
    • 定义 RawWakerTable 中的函数,如克隆、唤醒和删除 RawWaker
  • 执行器: 构建了一个简单的执行器(Executor),使用 VecDeque 作为任务队列,并通过轮询执行任务。
    • 执行器负责轮询任务并执行Future。
  • 自定义 Future: 创建了一个简单的 CountingFuture,展示了如何定义自定义的Future。
  • 使用标准库实现异步运行时: 使用标准库中的 FutureContext 来实现一个简单的异步运行时,并验证其正常工作。
  • TCP发送器: 构建了一个 TcpSender 结构体,用于异步发送数据到 TCP 连接。
    • TcpSender 实现了 Future trait,使用 try_lock 获取锁,并在无法立即写入时返回 Pending
  • TCP接收器: 构建了一个 TcpReceiver 结构体,用于异步接收来自 TCP 连接的数据。
    • TcpReceiver 实现了 Future trait, 使用 try_lock 获取锁,并在没有数据可读取或阻塞时返回 Pending
  • 休眠 Future: 构建了一个 Sleep 结构体,用于模拟异步休眠。
  • 多线程处理: 使用多线程处理传入的 TCP 请求,并使用 AtomicBool 控制线程的挂起和唤醒。
  • 宏的使用: 使用宏来简化线程的创建和任务分发过程。
  • 请求处理: 处理传入的 TCP 请求,并将请求分发到不同的线程,并在每个线程中执行异步任务。

第十一章:测试

本章介绍了如何测试异步代码,包括同步测试、模拟异步代码、测试死锁、测试竞争条件以及测试网络交互。

  • 测试概述: 介绍了异步测试的概念,包括隔离测试和模拟依赖项。
  • 模拟异步代码: 使用 mockall crate 模拟异步代码,并验证函数的调用和参数。
    • 使用 mock! 宏创建模拟对象,并指定模拟对象的行为。
  • 通道容量测试: 使用 tokio::sync::mpsc::channel 创建通道,并测试通道的容量限制。
    • 使用 timeout 来测试通道阻塞的情况,以及防止死锁的发生。
  • 网络交互测试: 使用 mockito crate 模拟网络服务器,并测试客户端的网络交互。
    • 使用 mockito crate 创建模拟服务器,并指定服务器的返回状态码和响应体。
  • 细粒度的 Future 测试: 使用 tokio_test crate 更细致地测试 Future,包括轮询 Future 并断言其状态。
    • 使用 assert_pending! 来断言 future 处于 pending 状态,从而测试 Future 的执行细节。