为cpu绑定任务使用Rustlang的Async东京运行时

这篇文章最初是发表在《新堆栈》杂志上将于2022年1月14日发布,并经授权在此转载。

尽管这个词异步以及它与异步网络I/O的关联,这篇博客文章认为东京运行时在Rust的核心异步生态系统也是cpu密集型工作(如分析引擎中的工作)的好选择。

东京是什么?

Rust内置了对异步(异步)编程模型,类似于JavaScript等语言。

为了充分利用多核和异步I/O,必须使用运行时,虽然Rust社区有几个可用的替代方案,但Tokio是事实上的标准。Tokio.rs的异步运行时Rust编程语言.它提供了编写网络应用程序所需的构建模块。”

虽然这个描述强调Tokio用于网络通信,但运行时也可以用于其他目的,我们将在下面探讨。

为什么在CPU任务中使用东京?

事实证明,现代分析引擎总是需要处理来自网络的客户端请求,以及使用网络与对象存储系统(如AWS S3、GCP云存储和Azure Blob存储)通信。

因此,在Rust中实现的任何这样的系统最终都将使用Tokio作为其网络和至少部分存储I/O(是的,我知道,最初Tokio异步文件IO不是真正的异步,但它是异步的未来很快)。

分析系统还进行CPU密集型计算,我将其定义为以一种消耗大量CPU的方式处理数据,用于存储重组、预计算各种索引或直接回答客户端查询。这些计算通常被分解成许多独立的块,我称之为“任务”,然后并行运行,以利用现代cpu中可用的许多内核。

确定在什么时候运行哪些任务通常是由所谓的“任务调度器”完成的,它将任务映射到可用的硬件核心/操作系统线程。

在各种类型的任务调度器、工作池、线程池等方面有多年的学术和工业工作。

根据我使用(和实现,我有点不好意思承认)几个自定义任务调度器的经验,它们最初很容易工作(99.9%的时间),但随后需要大量(大量!)时间来处理一些特殊情况(快速关闭、任务取消、耗尽等)。由于它们使用低级线程原语,并且存在大量竞态条件,因此它们也非常难以测试。我不建议你这么做。

因此,当在Rust生态系统中寻找任务调度器时,就像我们为raybet雷竞技ios下载 IOx和DataFusion,你自然会在Tokio结束,看起来很不错:

  1. 您已经有了Tokio(没有新的依赖项)。
  2. 东京实施了复杂的工作窃取调度
  3. Tokio有效地内置了对延续的语言支持(异步/等待),以及许多相对成熟的流、异步锁定、通道、取消等库。
  4. 东京以经过良好测试的并在Rust生态系统中大量使用。
  5. Tokio通常会在相同的执行器线程上运行任务和未来,这对于缓存局部性非常有利。
  6. 东京是证据确凿的,积极维护,不断变好。(东京主机发布在我写这篇博客的时候)。

因此,使用它是一件很简单的事情东京作为cpu密集型任务的任务调度器,对吗?WROOOOOOOONG !

普遍反对使用东京

事实证明,使用Tokio是一个相当热门的话题,我想说的是,仍然不是每个人都100%相信,因此这篇博文。我们担心在DataFusion和InfluxDB IOx的早期,我们讨论了很多这个问题。以下是一些常见的反对意见:

东京的医生建议不要:

旧版本的东京文档(例如,1.10.0,包括(在我脑海中)著名的忠告:

“如果你的代码是cpu限制的,你希望限制用于运行它的线程数量,你应该在另一个线程池上运行它,比如Rayon。”

我相信这种措辞在我们的团队以及更广泛的Rust社区中造成了严重的混乱。很多人把它理解为东京运行时不应该用于cpu受限的任务。关键其实是一样的运行时实例(相同的线程池)不应该同时用于I/O和CPU,而我们已经做到了随后澄清文档的意图(关于公关).

顺便说一句,东京的医生建议使用人造丝用于cpu受限的任务。对于许多应用程序来说,人造丝是一个很好的选择,但它不支持异步,所以如果你的代码必须做任何I/O,你将不得不跨越痛苦的同步/异步边界。我还发现映射一个基于拉的执行模型很有挑战性,在这个模型中,任务必须等待所有输入都准备好后才能运行到Rayon。

尾部延迟会跟踪你

明智的人会说:“使用Tokio进行cpu限制的工作将会增加您的请求尾延迟,这是不可接受的。”但是等等!你可能会说,“尾部延迟?我正在写一个数据库,这听起来像一个学术问题的web服务器下的高负载…”

其实并非如此:考虑一下活动检查,这是如今使用容器编排系统(Kubernetes)部署的系统的必备功能。检查您的进程是否运行良好通常是对以下内容的HTTP请求/健康.如果这个请求位于某个任务队列中,因为Tokio正在充分有效地使用您的CPU来处理大量的数据处理任务,Kubernetes就不会得到所需的“一切都很好”响应,从而终止您的进程。

这种推理的结果是经典的结论,因为尾部延迟很关键,所以不能将Tokio用于cpu较多的任务。

然而,正如东京文档所建议的那样,在CPU完全饱和的情况下,避免被Kubernetes和朋友们核爆的真正重要的是使用一个单独的线程池,一个用于“延迟很重要”的任务,比如响应/健康,另一个用于cpu负荷大的任务。这些线程池的最佳线程数因需要而异,这是另一篇单独文章的好话题。

也许可以考虑一下东京运行时作为复杂的线程池,使用的思路不同运行时实例似乎更容易接受,下面我们将演示如何使用专用执行程序来实现这一点。

高昂的每次请求开销

但“等等!”我听到你说(或者每个人都听到你说)黑客新闻),东京的每项任务开销很高。我一点也不惊讶人们可以创建线程池来处理比Tokio更快的小任务。

然而,我还没有看到这样一个系统,我可以信任我的生产工作负载,也没有一个有强大的生态系统支持。

值得庆幸的是,对于许多工作负载,每个任务的开销可以使用“向量化处理”来分摊。这是一种奇特的说法,表示每个任务一次处理数千行,而不是一行。当然,你不能发疯;你确实需要把你的工作分成合理大小的块,你不能摊销所有的工作负载。然而,对于我的应用程序所关心的所有实例,Tokio任务开销都消失在噪音中。

如何将Tokio用于cpu约束任务?

因此,让我们假设我已经说服了您,对于cpu较多的工作使用Tokio是可以的。你是怎么做到的?

首先,重要的是,你的代码需要遵循这句格言:“异步代码不应该花很长时间而不达到一个.await,正如《爱丽丝·赖尔》一书中解释的那样帖子.这是为了让调度程序有机会调度其他事情,窃取工作等等。

当然,“很长时间”取决于你的应用程序;Ryhl建议在优化响应尾延迟时使用10到100微秒。我认为在优化CPU时,10到100毫秒的任务也很好。然而,自从我估计每个任务东京开销在~10纳秒范围内,几乎不可能用10毫秒的任务来测量东京运行时开销。

其次,在单独的平台上运行任务运行时实例。你是怎么做到的?很高兴你发问。

专用的执行者

下面是如何在InfluxDB IOx中单独的Tokio运行时上运行任务的简化版本。(完整版本可在我们的回购,并有额外的逻辑清洁关闭和连接。)

pub struct DedicatedExecutor {state: Arc>,} ///在单独的tokio Executor struct state{///请求通道上运行期货(以及由/// them执行的' tokio::task:: '的任何'任务')——专用执行器从这里接收请求///并运行它们。requests: Option>, ///线程有一个不同的东京运行时///安装并在那里生成任务Thread: Option>,} impl DedicatedExecutor{///创建一个新的' DedicatedExecutor '与一个专用的东京///执行器是独立于通过/// ' [Tokio::main] '创建的线程池。pub fn new(thread_name: &str, num_threads: usize) -> Self {let thread_name = thread_name.to_string();让(tx, rx) = std::同步:mpsc::渠道::<任务> ();let thread = std::thread::spawn(move ||{//创建一个新的Runtime来运行任务let Runtime = Tokio:: Runtime::Builder::new_multi_thread() .enable_all() .thread_name(&thread_name) .worker_threads(num_threads) //降低工作线程的操作系统优先级,以优先考虑主Runtime .on_thread_start(move || set_current_thread_priority_low()) .build() .expect("Creating Tokio Runtime ");//从通道中提取任务请求并将它们发送到执行器运行时。block_on(async move {while let Ok(task) = rx.recv() {Tokio::task::spawn(async move {task.run().await;});} let state =状态{请求:一些(tx),线程:一些(线程),};Self {state: Arc::new(互斥::new(state)),}}

这段代码创建了一个newstd::线程,它创建了一个单独的多线程Tokio运行时来运行任务,然后从通道而且产卵让他们在新的运行时

注意:新线程是关键。如果你尝试创建一个新的运行时在主线程或东京创建的线程之一,你会得到一个错误,因为已经有一个运行时安装。

下面是发送任务到这一秒的相应代码运行时

impl DedicatedExecutor{///在' DedicatedExecutor '上运行指定的Future(以及它生成的任何任务)。pub fn spawn(&self, task: T) -> Job where T: Future + Send + 'static, T::Output: Send + 'static, {let (tx, rx) = tokio::sync::oneshot::channel();let fut = Box::pin(异步移动{let task_output = task.await;tx.send (task_output)对吧()});Let mut state = self.state.lock();let task = task {fut,};if let Some(requests) = &mut state。请求{//如果有人已经开始关闭请求将失败。send(task).ok();}其他{警告!("tried to schedule task on an executor that was shutdown"); } Job { rx, cancel } } }

工作

类的包装器未来被称为工作它处理将结果从专用执行器传输回主执行器,它看起来像:

#[pin_project(PinnedDrop)] pub struct Job {#[pin] rx: Receiver,} impl Future for Job {type Output = Result;fn poll(self: Pin<&mut self >, cx: &mut std::task::Context< ` _>,) -> std::task:: poll < self::Output> {let this = self.project();This.rx.poll (cx)}}

就是这样!你可以在这里找到所有的代码GitHub要点

下一个步骤

雷竞技能赚到钱么InfluxData是开源的忠实信徒和支持者。InfluxDB IOx充分利用了Apache的Rust实现,并对其做出了重大贡献箭头柱状内存格式和Apache ArrowDataFusion查询引擎,它使用Tokio执行查询计划。

我们喜欢社区的贡献,包括文档和代码。DataFusion和Arrow都有充满活力的社区.随时来打个招呼。