Tech Racho エンジニアの「?」を「!」に。
  • Ruby / Rails以外の開発一般

作って学ぶRustの非同期ランタイム

こんにちは。yoshiです。
TechRachoアドベントカレンダー2024ということで、クリスマスとはなんの関係もない話題をやっていこうと思います。
今回扱うのはRustの非同期ランタイムです。Rustの非同期ランタイムというと、 tokio とか async-std とか他にもありますが、今回はそれらと同じようなものを手動で作っていくことで理解して行こうという趣旨です。

非同期とは何か

実際に作り始める前に、非同期とは何か軽く説明しておくことにします。そんなのは分かっているよ、という方は次の節まで飛ばして構いません。

非同期の話をするには、まず同期の話をしておいた方が良いでしょう。コード上ではそれぞれ asyncsync という文言で表されることが多いです。それぞれ asynchronoussynchronous の略ですね。ところで asynchronous は「エイシンクロナス」みたいに発音する単語ですが、 async という文字列はどうしても「アシンク」と読んでしまう人が多いのではないでしょうか。

「同期」には色々な意味がありますが(例えばNTPで時刻同期する、とかだと全く別の話になってしまいます)、本記事の文脈はプログラムがどのような順で処理を行うかについての話です。と言っても、非同期が特殊なケースで、同期が一般的なケースだと考えておけばよいでしょう。つまり、私達は普通にプログラムを書くとき、意識せずとも同期的なコードを書いているはずです。

処理A、B、Cがあったとき、実行時間を気にする必要がなければ、Aを処理して、それが終わったらBを処理して、それが終わったらCを処理する、といったコードを書くのが普通だと思います。

fn some_function() {
    a();
    b();
    c();
}

コードで書くとこんな感じ。特段珍しいものではありませんね。こういうやり方を同期的と言います。

シングルスレッドプログラミングでは、同期的なコードはスレッドをブロックします。つまり、その処理に割り込んで他の操作を行うことができません。例えばGUIスレッドはユーザーの操作を即座に反映させたいのですが、時間のかかる処理を同期的に行うとGUIスレッドがブロックされ、いわゆる「フリーズ」などと呼ばれるようなGUIの応答がない状態になってしまいます。
これを避けるために、マルチスレッドプログラミングにして時間のかかる処理を別のスレッドで実行するという方法がありますが、処理の終わりを待機する必要があるのであれば同じようにスレッドがブロックされてしまいます。
そこで、実際には以下のようなアプローチを採用することになります。

  1. 待機関数にタイムアウト値を設定し、タイムアウトごとに溜まったイベントを処理する
  2. 待機する代わりに「処理が終わったかどうか」を定期的に問い合わせる

このうち、1のタイムアウト値付き待機関数の厄介な点は、待機する対象が増えるとどんどん複雑になっていく、という点です。特に、待機する対象が異なる場合、そもそも同時に待機する関数がないようなこともあります。タイムアウトがない場合は個別に待機関数を呼ぶだけで良い(待機関数は処理が終わっていれば即座に帰るので、どういう順に呼んでも一番時間のかかった処理に律速される)のですが、タイムアウト値がある場合は一つの関数で処理を行わなければなりません。
そこで、2のようにスレッドがアクティブになった時に問い合わせを行う方式が考えられます。これであれば、GUIスレッドでもイベントループ内にイベント処理に加えて問い合わせを行う処理を追加するだけで良い、ということになります。待機関数を使って処理の終了に同期する必要がないので、これが非同期、という訳です。

※実際の問い合わせは複数の処理に対してほぼ時間差なく行います。図中の矢印が3本重なっているような形になりますが、分かりづらいので分けています。

この問い合わせは実際のコードで書くとイベントループの中で毎回実行するような形になるでしょう。Rustを含めネイティブコードを書くような言語では、GUIアプリのイベントループは言語機能には含まれていないので自分で実装するか、もしくはGUIライブラリが用意したものを使うことになります。一方で、JavaScriptなどではイベントループは言語機能の内部に隠蔽されていて、実装を弄ることはできません。そのため、このような定期的な問い合わせ自体は見えることがなく、代わりに、問い合わせで処理が終了したことが分かったらコールバック関数が呼び出されることになります。JavaScriptの Promisethen にコールバック関数を渡していく形になっているのはこのような理由からですね。

このように、非同期ランタイムはイベントループなどと密接に関わるため、実装方法が一通りではなく、Rustでは公式で非同期ランタイムが用意されていません。非同期ランタイムを提供するクレートがいくつもあるのもそういった理由によります。

非同期処理の基本機能

Rustでは公式に非同期ランタイムは用意されていませんが、言語機能と標準ライブラリには非同期のための機能がいくつか存在します。その中でも基本的な機能について紹介します。

std::future::Future トレイト

非同期処理は Future トレイトを実装した型によって表現されます。 Future トレイトの定義は、

pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

となっています。 poll の戻り値である std::task::Poll

pub enum Poll<T> {
    Ready(T),
    Pending,
}

となっていて、処理が終わった( Ready )か実行中( Pending )かを表します。

PinContext については後述します。

Future トレイトを実装すれば非同期処理を自力で作ることができます。とはいえ、実際にはライブラリ作成者でなければこのトレイトを直接実装するケースは少ないでしょう。後述の async ブロックや async 関数を使うと Future トレイトが実装された匿名型が作成されます。

async キーワード

Future トレイトを実装した型に対して定期的に poll を呼び出すことで非同期処理が終わったかどうかを確認できるのですが、実際に手動で poll を呼び出すような処理を書くのは面倒です。そこで登場するのが async キーワードです。 async キーワードは async ブロックasync 関数の2箇所で登場し、ともに Future トレイトが実装された匿名型を生成するのに使われます。

async ブロックは

async {
    // ここに非同期処理を書く
}

という形の式で、ブロック内の処理は後から非同期ランタイムにより実行させることができます。コードを書いた場所の外側の値などを取り込むこともでき、引数のないクロージャと似たようなものと言えます。クロージャに move キーワードを使って外側の値を参照ではなくムーブすることができるように、 async move {} とすることもできます。

async ブロックの戻り値の型は、自動実装される Future トレイトの Output 関連型になります。

let a = async { /*非同期にする意味がないけど*/ 42i64 };
// aの型は `impl Future<Output = i64>` になる

async 関数は

async fn some_async_function() -> T {
    // ここに非同期処理を書く
    todo!()
}

のような関数です。通常の関数の頭に async をつけるだけで文法上大きな違いはありません。
これは実際には、

fn some_async_function() -> impl Future<Output = T> {
    async move {
        // ここに非同期処理を書く
        todo!()
    }
}

と書くのとほぼ同じことです。つまり async 関数は async ブロックを返す関数のシンタックスシュガーと考えて良いでしょう。

なお、クロージャに対する特殊な文法はありません(RFCはあるので将来実装されるかもしれない)が、大抵の場合は || async {} とすれば問題ないでしょう。

await キーワード

awaitFuture<Output = T>T に変換する操作を表現するキーワードです。
Rustでは、 await キーワードは async ブロックまたは async 関数の中でのみ使用可能で、

async {
    let future = some_async_function();
    future.await
}

というように、フィールドへのアクセスと同じような形で .await をつけますが、フィールドではなく文法的に await 式として用意されています。

async ブロックは await の前後で処理が分割され、 await 式の Future オブジェクトが Ready(T) を返すまで後続の処理は行われません。逆に async 関数の内部でも、 await を含まない部分は同期的に実行されます。async ブロックは一回の poll の呼び出しで、必ず await の位置かコード末尾もしくは return の位置に 到達することになります。 await の位置に到達した async ブロックは、内部の await 対象のオブジェクトが Ready になるまで、 poll の呼び出しを対象オブジェクトに移譲することになります。

こうして、 await を随所に挟むことで、実際には非同期的な poll の呼び出しによって進行する一連の処理が、見た目上同期的な処理と同じように記述することができるようになります。

block_on の実装

一番単純な非同期ランタイムの実装として、 block_on を作ってみようと思います。
例えば async_std には async_std::task::block_on が用意されていたり、 tokio の場合は tokio::runtime::Runtime 型のメソッドとして block_on が用意されていたりします。
この関数は単純に、 Future トレイトが実装された型の値を受け取って、処理が終わったら Output の値を返すものです。つまり、非同期処理を同期的な待機関数にしてしまう訳ですね。

とりあえずシグネチャだけ定義すると、こうなります。

use std::future::Future;

pub fn block_on<F, T>(future: F) -> T
where F: Future<Output = T> {
    todo!()
}

続いて、中身を実装する訳ですが、この関数は定期的に poll を呼び出して、結果が Ready だったら値を返したい訳です。とりあえず、ループの中でpollを呼び出せると良さそうですね。

use std::future::Future;

pub fn block_on<F, T>(future: F) -> T
where F: Future<Output = T> {
    loop {
        future.poll()
    }
}

はい、とりあえず書いてみましたが、エラーになりますね。これも当然で、 Future::pollself の型は Self ではなく Pin<&mut Self> になっていますし、第二引数に &mut Context もあります。 PinContext についてはまだ説明していませんでしたね。という訳で、ようやく「作って学ぶ」に入り始めたところですが、一旦これらの説明をしようと思います。

std::pin::Pin

Pin という構造体についていまいち良く分かっていない人も多いのではないでしょうか。私も理解するのに時間がかかりました。Pin の由来は、画鋲など何かを留めるのに使うあのピンです。では何を留めるのかという話になりますよね。

そもそもなぜ Pin が必要かというと、これはRustの言語機能であるムーブが関わっています。
Rustでは、 Copy が実装されていない型の値の変数を他の場所に代入すると、元の変数の寿命が尽きて、代入先が所有権を引き継ぐことになります。この時の操作が、メモリレベルではC言語の memcpy と同じ、つまり単純にコピーを行うことで実現されます。元の変数があった位置には値が残りますが、それはアクセスしない限りは問題なく、いずれメモリ領域が再利用されて上書きされたりするかもしれません。まあここらへんは今回の話とはあまり関係ありませんが、重要なのは言語機能としてムーブが用意され、単純なコピーによる実装になっているということです。

これ自体はとても良くできていて、C++のようにプログラマーがムーブの内容に介入する必要もなく、ほとんどの場合でうまく行きます。ただ、「自己参照構造体が作れない」という弱点があります。
自己参照構造体というのは、自分自身のアドレスをメンバーに持つ構造体のことです。もちろん、安全性を考えなければそのような構造体を作るのは難しくありません。例えば、以下のようにできます。

struct SelfRef {
    ptr: *const Self,
}

let mut self_ref = SelfRef { ptr: std::ptr::null(); }
self_ref.ptr = &self_ref as _;

ですが、ここで SelfRef::new() を作ることを考えてみましょう。

struct SelfRef {
    ptr: *const Self,
}

impl SelfRef {
    fn new() -> Self {
        let mut self_ref = SelfRef { ptr: std::ptr::null(); }
        self_ref.ptr = &self_ref;
        self_ref
    }
}

ここで、 self_ref はムーブされ、 new 関数の戻り値になりました。すると、ムーブ後のアドレスはムーブ前と異なる可能性があります。Rustではムーブに介入できないので、ムーブ時点でアドレスの書き換えを行うことができません。したがって、このような構造体は安全に作ることができません(でも上のコードは unsafe じゃないのでは、と思うかもしれませんが、実際はポインタにアクセスする時に unsafe な操作を行う必要があり、その時点で安全性を担保できないということになります)。

実際、このような構造体を作る機会はあまりないのですが、 async ブロックについて考えると話は変わります。

async ブロック内の変数は、スタック上の自動変数ではなく async ブロックが生成する匿名型のメンバだと考えることができます。例えば、

async {
    let a = 42i64;
    let b = &a;
    some_async_function().await
    println!("{b}");
}

という async ブロックは、仮に構造体として定義するなら、

struct UnnamedAsyncBlock {
    a: MaybeUninit<i64>,
    b: MaybeUninit<*const i64>,
    future: MaybeUninit<ReturnTypeOfSomeAsyncFunction>,
    address: *const (),
}

こんな感じの構造になっているでしょう( futuresome_async_function() の戻り値、 address はどこから処理を再開するかの情報が入るものとします)。

1回目の poll の呼び出しまでは、上記の構造体の中身は address 以外未初期化です。 poll が呼び出されると、 a42i64 で初期化され、 b には a のアドレスが入ります。 a のアドレスはこの構造体の内部のアドレスなので、この構造体は自己参照することになります。したがって、一度 poll を呼び出したこの構造体をムーブするのは危険、ということになります。

ムーブされると危険なので、処理が終わるまではムーブできないようにしておきたいですよね。それが Pin の役目です。 Pin は構造的には内部に可変借用を持つだけの構造体で、借用されている間はムーブできないという言語仕様により、Pin の寿命の間は「ピン留め」されていることが保証され、安全に poll を呼び出すことができます。

スタック上の値をピン留めするときは、 std::pin::pin! マクロを使います。 pin! マクロの実装は以下のようになっていて、

// https://doc.rust-lang.org/1.82.0/src/core/pin.rs.html#2015
$crate::pin::Pin::<&mut _> { __pointer: &mut { $value } }

引数である $value をブロック式の中にいれることで強制的にムーブし、その値の可変参照をメンバに入れています。ブロック式の値は一時変数になり、借用されることで寿命が Pin の生存期間に伸びますが、名前がついていないので Pin を介さずにアクセスすることはできなくなります。

と、実はここまでは Pin<&mut T> の説明で、他にも Pin<Box<T>>Pin<Rc<T>> などポインタライクな型を持つ形で Pin が現れることがあります。これらの型の内部の値はスタックではなくヒープ上に作られるため、ムーブされても安全です。例えば Box<T> をムーブしても実際にはポインタのコピーと同じことが起きるだけで、自己参照構造があってもヒープ上のアドレスは移動しないためです。
ただし、 &mut Box<T> から &mut T を取り出す操作は安全ではありません。なぜなら、 &mut T はスワップすることができるためです。スワップもムーブと同じくメモリ上の値をそのまま交換することで実装できるため、自己参照構造体をスワップすると壊れてしまいます。そのため、 Pin<Box<T>> などの形で扱うとそのような操作は unsafe メソッドを使わずには行えないようになります。

std::marker::Unpin トレイト

さて、とはいえ、 Pin はあくまで自己参照構造体を安全に扱うためのラッパーで、ほとんどのRustの型はムーブしても安全になっています。つまり、わざわざ Pin を介さなくても良いのですが、型の一貫性のためだけに Pin を使う必要があるのです。そういう型は Unpin トレイト境界が実装された型として表現されます。 Unpin、つまり「ピン留めを外す」ことができる型という意味です。
TUnpin トレイト境界が実装されているとき、 Pin<&mut T>Pin<Box<T>> から &mut T を安全に取り出すことができます。 DerefMut なども実装されるので、意識せずとも透過的に T のメソッドを利用することが可能です。逆に言うと、 Unpin が実装されていない型に対してはこのような操作は unsafe メソッドを介さないといけないので、安全性が保たれるという訳です。

Unpin は Auto traits という少し特殊なトレイトで、構造体のフィールドの内容に応じてトレイト境界が自動的に実装されます。具体的には、フィールドがすべて Unpin なら実装され、そうでなければ実装されないということになります。
std::marker::PhantomPinned をフィールドに入れることで明示的に Unpin を実装させないようにもできます。

std::task::Contextstd::task::Waker

poll の第二引数である Context は、内部に Waker を持ちます。 Waker が何かというと、処理が終了した時に wake() を呼ぶと、非同期ランタイム側に通知することができる型です。
最初の節で「非同期は定期的に問い合わせを行う」と説明しましたが、実際にはそれだと無駄があったり動作が遅くなったりします。問い合わせの間隔が長いと、処理が終わっていても次の問い合わせまで後続の処理が開始できず、全体の動作時間が長くなってしまいます。かといって間隔を短くすると問い合わせの処理が多くなってCPU時間を浪費することになります。極端な話、スリープを挟まずに問い合わせを続けると処理の終了に即座に対応できることになりますが、それはビジーループになってしまいますよね。という訳で、メインスレッドはやることがない時は処理が終了するまで停止して、処理が終わった瞬間に再開できると都合が良い訳です。そのために使われるのが Waker です。
Context は今のところ Waker を返すだけのオブジェクトなのですが、拡張性を残すために Waker を直接渡さないようにしているのだと思います。

Waker の仮実装

実際のランタイムは Waker::wake の呼び出しに応じて再び poll を呼び出すように作る訳ですが、とりあえず最初はそこらへんは無視して定期的な問い合わせ方式を使うことにしましょう。
とはいえ、 poll の引数に Context がいる以上、なんらかの形で Context を作る必要があります。
Context には from_waker というメソッドがあり、 Waker から構築することができます。なので Waker さえ作れてしまえば良いということになりますね。
ですが、 Waker の構築用メソッドは from_raw という unsafe 関数です。これを使うのは難しいので、代わりに std::task::Wake トレイトを使いましょう。 Wake トレイトは必ず Ark<Self> を引数に取るようになる(場合によっては無駄がある)代わりに安全に Waker を作ることができるように用意されたトレイトです。

Wake トレイトを使って WakerContext を作るとこうなります。

use std::{
    sync::Ark,
    task::Context,
};

struct MyWaker {}

impl MyWaker {
    fn new() -> Arc<Self> {
        Arc::new(Self {})
    }
}

impl Wake for MyWaker {
    fn wake(self: Arc<Self>) {
        // TODO
    }
}

let waker = MyWaker::new().into();
let cx = Context::from_waker(&waker);

この MyWaker は今のところ何もしない Waker で、上記コードの // TODO の部分にランタイムに対する通知などのコードを入れることができるのですが、とりあえずは定期的な問い合わせを行うので問題ありません。
これでとりあえず block_on がちゃんと(とりあえず動くように)実装できるようになりました。

use std::{
    future::Future,
    pin::pin,
    task::{Context, Poll},
    thread::sleep,
    time::Duration,
};

pub fn block_on<F, T>(future: F) -> T
where F: Future<Output = T> {
    let waker = MyWaker::new().into();
    let mut cx = Context::from_waker(&waker);
    let mut future = pin!(future);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(v) => return v,
            Poll::Pending => sleep(Duration::from_millis(1)),
        }
    }
}

やっていることは単純で、 Ready になるまで1msごと(OSの分解能などに依存するので実際は1msとは限りません)に poll を呼び出しているだけです。

さて、これが実際に動くかどうか試してみるには、何らかの Future が必要です。
そこで、単なるタイムアウトを行う関数を実装してみます。

use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
    time::{Duration, Instant},
};

fn timeout(dur: Duration) -> impl Future<()> {
    struct Timeout(Instant);
    impl Future for Timeout {
        type Output = ();

        fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
            if Instant::now() >= self.0 {
                return Poll::Ready(())
            }
            Poll::Pending
        }
    }
    Timeout(Instant::now() + dur)
}

この Timeout 構造体は Waker::wake を呼び出さないので Future の実装としては不完全なのですが(ランタイムによってはこれは poll が呼び出されずにブロックし続けます)、とりあえずさきほどの block_on の実装では問題なく動作するはずです。

use std::time::Duration;

fn main() {
    block_on(async {
        timeout(Duration::from_secs(5)).await;
        println!("Hello, async world!");
    });
}

ちゃんと実装ができていれば、起動から5秒後に Hello, async world! と標準出力に出力するプログラムになるはずです。

MyWaker::wake の実装

wake の中身はランタイムが自由に決めることができますが、まずは簡単に std::thread::parkstd::thread::Thread::unpark を使ってスレッドの停止・再開を行う仕組みにしようと思います。

まずは MyWaker の構造をこのようにして、内部に block_on を行うスレッドの情報を持てるようにします。

use std::thread::{current, Thread};

struct MyWaker {
    thread: Thread,
}

impl MyWaker {
    fn new() -> Arc<Self> {
        Arc::new(Self { thread: current() })
    }
}

続いて、 wake の中身を実装します。

impl Wake for MyWaker {
    fn wake(self: Arc<Self>) {
        self.thread.unpark();
    }
}

block_on の方も変更します。

use std::{
    future::Future,
    pin::pin,
    task::{Context, Poll},
    thread::park,
    time::Duration,
};

pub fn block_on<F, T>(future: F) -> T
where F: Future<Output = T> {
    let waker = MyWaker::new().into();
    let mut cx = Context::from_waker(&waker);
    let mut future = pin!(future);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(v) => return v,
            Poll::Pending => park(),
        }
    }
}

sleep の代わりに park を使うことで、無駄な定期的な poll の呼び出しをしなくて良くなりました。
しかし、この実装だと他のスレッドから block_on を行っているスレッドを起こしてやる必要が生まれるため、先ほどの timeout がうまく動かなくなります。こちらもスレッドを使うように変更しましょう。
今回は、今まで無視していた Context からちゃんと Waker を取り出して内部状態に入れる必要があります。

use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
    time::{Duration, Instant},
    thread::{sleep, spawn},
    sync::Mutex,
};

fn timeout(dur: Duration) -> impl Future<()> {
    struct Timeout(Arc<Mutex<TimeoutState>>);
    struct TimeoutState {
        completed: bool,
        waker: Option<Waker>,
    }
    impl Future for Timeout {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
            let mut state = self.0.lock().unwrap();
            if state.completed {
                return Poll::Ready(());
            }
            // `waker` は `poll` の呼び出しごとに更新する。呼び出しごとに別の `Waker` 実装に変えても良いため
            state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
    let state = Arc::new(Mutex::new(TimeoutState{
        completed: false,
        waker: None,
    }));
    spawn({
        let state = state.clone();
        move || {
            sleep(dur);
            let mut state = state.lock().unwrap();
            state.completed = true;
            if let Some(waker) = state.waker.take() {
                waker.wake();
            }
        }
    });
    Timeout(state)
}

毎回時間を確認する代わりに、別スレッドで sleep を呼ぶようにしました。これだと timeout を複数呼ぶと毎回OSスレッドが作られることになりオーバーヘッドが大きいですが、そこは実装次第です。今回の記事ではこれで良しとしましょう。

終わりに

ここまでで一応「非同期ランタイムが作れた」と言っても良いかなと思います。もちろん、 block_on しかないランタイムは不便で、実際にはOSのイベントを処理しつつ非同期タスクを待機するなどの要求があったりするのですが、そろそろ文字数もけっこうな量になってますし、締切も近いので一旦ここで筆を置こうと思います。

もう少し発展的な内容を「作って学ぶRustの非同期ランタイム 2」などと銘打って、近い内に(まあ……せめて来年のTechrachoの夏イベントくらいには……)書こうかなと思います。

それではまた。メリークリスマス!


BPSアドベントカレンダー2024


CONTACT

TechRachoでは、パートナーシップをご検討いただける方からの
ご連絡をお待ちしております。ぜひお気軽にご意見・ご相談ください。