こんにちは。yoshiです。
TechRachoアドベントカレンダー2024ということで、クリスマスとはなんの関係もない話題をやっていこうと思います。
今回扱うのはRustの非同期ランタイムです。Rustの非同期ランタイムというと、 tokio
とか async-std
とか他にもありますが、今回はそれらと同じようなものを手動で作っていくことで理解して行こうという趣旨です。
非同期とは何か
実際に作り始める前に、非同期とは何か軽く説明しておくことにします。そんなのは分かっているよ、という方は次の節まで飛ばして構いません。
非同期の話をするには、まず同期の話をしておいた方が良いでしょう。コード上ではそれぞれ async
と sync
という文言で表されることが多いです。それぞれ asynchronous
と synchronous
の略ですね。ところで asynchronous
は「エイシンクロナス」みたいに発音する単語ですが、 async
という文字列はどうしても「アシンク」と読んでしまう人が多いのではないでしょうか。
「同期」には色々な意味がありますが(例えばNTPで時刻同期する、とかだと全く別の話になってしまいます)、本記事の文脈はプログラムがどのような順で処理を行うかについての話です。と言っても、非同期が特殊なケースで、同期が一般的なケースだと考えておけばよいでしょう。つまり、私達は普通にプログラムを書くとき、意識せずとも同期的なコードを書いているはずです。
処理A、B、Cがあったとき、実行時間を気にする必要がなければ、Aを処理して、それが終わったらBを処理して、それが終わったらCを処理する、といったコードを書くのが普通だと思います。
fn some_function() {
a();
b();
c();
}
コードで書くとこんな感じ。特段珍しいものではありませんね。こういうやり方を同期的と言います。
シングルスレッドプログラミングでは、同期的なコードはスレッドをブロックします。つまり、その処理に割り込んで他の操作を行うことができません。例えばGUIスレッドはユーザーの操作を即座に反映させたいのですが、時間のかかる処理を同期的に行うとGUIスレッドがブロックされ、いわゆる「フリーズ」などと呼ばれるようなGUIの応答がない状態になってしまいます。
これを避けるために、マルチスレッドプログラミングにして時間のかかる処理を別のスレッドで実行するという方法がありますが、処理の終わりを待機する必要があるのであれば同じようにスレッドがブロックされてしまいます。
そこで、実際には以下のようなアプローチを採用することになります。
- 待機関数にタイムアウト値を設定し、タイムアウトごとに溜まったイベントを処理する
- 待機する代わりに「処理が終わったかどうか」を定期的に問い合わせる
このうち、1のタイムアウト値付き待機関数の厄介な点は、待機する対象が増えるとどんどん複雑になっていく、という点です。特に、待機する対象が異なる場合、そもそも同時に待機する関数がないようなこともあります。タイムアウトがない場合は個別に待機関数を呼ぶだけで良い(待機関数は処理が終わっていれば即座に帰るので、どういう順に呼んでも一番時間のかかった処理に律速される)のですが、タイムアウト値がある場合は一つの関数で処理を行わなければなりません。
そこで、2のようにスレッドがアクティブになった時に問い合わせを行う方式が考えられます。これであれば、GUIスレッドでもイベントループ内にイベント処理に加えて問い合わせを行う処理を追加するだけで良い、ということになります。待機関数を使って処理の終了に同期する必要がないので、これが非同期、という訳です。
※実際の問い合わせは複数の処理に対してほぼ時間差なく行います。図中の矢印が3本重なっているような形になりますが、分かりづらいので分けています。
この問い合わせは実際のコードで書くとイベントループの中で毎回実行するような形になるでしょう。Rustを含めネイティブコードを書くような言語では、GUIアプリのイベントループは言語機能には含まれていないので自分で実装するか、もしくはGUIライブラリが用意したものを使うことになります。一方で、JavaScriptなどではイベントループは言語機能の内部に隠蔽されていて、実装を弄ることはできません。そのため、このような定期的な問い合わせ自体は見えることがなく、代わりに、問い合わせで処理が終了したことが分かったらコールバック関数が呼び出されることになります。JavaScriptの Promise
が then
にコールバック関数を渡していく形になっているのはこのような理由からですね。
このように、非同期ランタイムはイベントループなどと密接に関わるため、実装方法が一通りではなく、Rustでは公式で非同期ランタイムが用意されていません。非同期ランタイムを提供するクレートがいくつもあるのもそういった理由によります。
非同期処理の基本機能
Rustでは公式に非同期ランタイムは用意されていませんが、言語機能と標準ライブラリには非同期のための機能がいくつか存在します。その中でも基本的な機能について紹介します。
std::future::Future
トレイト
非同期処理は Future
トレイトを実装した型によって表現されます。 Future
トレイトの定義は、
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
となっています。 poll
の戻り値である std::task::Poll
は
pub enum Poll<T> {
Ready(T),
Pending,
}
となっていて、処理が終わった( Ready
)か実行中( Pending
)かを表します。
Pin
や Context
については後述します。
Future
トレイトを実装すれば非同期処理を自力で作ることができます。とはいえ、実際にはライブラリ作成者でなければこのトレイトを直接実装するケースは少ないでしょう。後述の async
ブロックや async
関数を使うと Future
トレイトが実装された匿名型が作成されます。
async
キーワード
Future
トレイトを実装した型に対して定期的に poll
を呼び出すことで非同期処理が終わったかどうかを確認できるのですが、実際に手動で poll
を呼び出すような処理を書くのは面倒です。そこで登場するのが async
キーワードです。 async
キーワードは async
ブロックと async
関数の2箇所で登場し、ともに Future
トレイトが実装された匿名型を生成するのに使われます。
async
ブロックは
async {
// ここに非同期処理を書く
}
という形の式で、ブロック内の処理は後から非同期ランタイムにより実行させることができます。コードを書いた場所の外側の値などを取り込むこともでき、引数のないクロージャと似たようなものと言えます。クロージャに move
キーワードを使って外側の値を参照ではなくムーブすることができるように、 async move {}
とすることもできます。
async
ブロックの戻り値の型は、自動実装される Future
トレイトの Output
関連型になります。
let a = async { /*非同期にする意味がないけど*/ 42i64 };
// aの型は `impl Future<Output = i64>` になる
async
関数は
async fn some_async_function() -> T {
// ここに非同期処理を書く
todo!()
}
のような関数です。通常の関数の頭に async
をつけるだけで文法上大きな違いはありません。
これは実際には、
fn some_async_function() -> impl Future<Output = T> {
async move {
// ここに非同期処理を書く
todo!()
}
}
と書くのとほぼ同じことです。つまり async
関数は async
ブロックを返す関数のシンタックスシュガーと考えて良いでしょう。
なお、クロージャに対する特殊な文法はありません(RFCはあるので将来実装されるかもしれない)が、大抵の場合は || async {}
とすれば問題ないでしょう。
await
キーワード
await
は Future<Output = T>
を T
に変換する操作を表現するキーワードです。
Rustでは、 await
キーワードは async
ブロックまたは async
関数の中でのみ使用可能で、
async {
let future = some_async_function();
future.await
}
というように、フィールドへのアクセスと同じような形で .await
をつけますが、フィールドではなく文法的に await
式として用意されています。
async
ブロックは await
の前後で処理が分割され、 await
式の Future
オブジェクトが Ready(T)
を返すまで後続の処理は行われません。逆に async
関数の内部でも、 await
を含まない部分は同期的に実行されます。async
ブロックは一回の poll
の呼び出しで、必ず await
の位置かコード末尾もしくは return
の位置に 到達することになります。 await
の位置に到達した async
ブロックは、内部の await
対象のオブジェクトが Ready
になるまで、 poll
の呼び出しを対象オブジェクトに移譲することになります。
こうして、 await
を随所に挟むことで、実際には非同期的な poll
の呼び出しによって進行する一連の処理が、見た目上同期的な処理と同じように記述することができるようになります。
block_on
の実装
一番単純な非同期ランタイムの実装として、 block_on
を作ってみようと思います。
例えば async_std
には async_std::task::block_on
が用意されていたり、 tokio
の場合は tokio::runtime::Runtime
型のメソッドとして block_on
が用意されていたりします。
この関数は単純に、 Future
トレイトが実装された型の値を受け取って、処理が終わったら Output
の値を返すものです。つまり、非同期処理を同期的な待機関数にしてしまう訳ですね。
とりあえずシグネチャだけ定義すると、こうなります。
use std::future::Future;
pub fn block_on<F, T>(future: F) -> T
where F: Future<Output = T> {
todo!()
}
続いて、中身を実装する訳ですが、この関数は定期的に poll
を呼び出して、結果が Ready
だったら値を返したい訳です。とりあえず、ループの中でpollを呼び出せると良さそうですね。
use std::future::Future;
pub fn block_on<F, T>(future: F) -> T
where F: Future<Output = T> {
loop {
future.poll()
}
}
はい、とりあえず書いてみましたが、エラーになりますね。これも当然で、 Future::poll
の self
の型は Self
ではなく Pin<&mut Self>
になっていますし、第二引数に &mut Context
もあります。 Pin
や Context
についてはまだ説明していませんでしたね。という訳で、ようやく「作って学ぶ」に入り始めたところですが、一旦これらの説明をしようと思います。
std::pin::Pin
Pin
という構造体についていまいち良く分かっていない人も多いのではないでしょうか。私も理解するのに時間がかかりました。Pin
の由来は、画鋲など何かを留めるのに使うあのピンです。では何を留めるのかという話になりますよね。
そもそもなぜ Pin
が必要かというと、これはRustの言語機能であるムーブが関わっています。
Rustでは、 Copy
が実装されていない型の値の変数を他の場所に代入すると、元の変数の寿命が尽きて、代入先が所有権を引き継ぐことになります。この時の操作が、メモリレベルではC言語の memcpy
と同じ、つまり単純にコピーを行うことで実現されます。元の変数があった位置には値が残りますが、それはアクセスしない限りは問題なく、いずれメモリ領域が再利用されて上書きされたりするかもしれません。まあここらへんは今回の話とはあまり関係ありませんが、重要なのは言語機能としてムーブが用意され、単純なコピーによる実装になっているということです。
これ自体はとても良くできていて、C++のようにプログラマーがムーブの内容に介入する必要もなく、ほとんどの場合でうまく行きます。ただ、「自己参照構造体が作れない」という弱点があります。
自己参照構造体というのは、自分自身のアドレスをメンバーに持つ構造体のことです。もちろん、安全性を考えなければそのような構造体を作るのは難しくありません。例えば、以下のようにできます。
struct SelfRef {
ptr: *const Self,
}
let mut self_ref = SelfRef { ptr: std::ptr::null(); }
self_ref.ptr = &self_ref as _;
ですが、ここで SelfRef::new()
を作ることを考えてみましょう。
struct SelfRef {
ptr: *const Self,
}
impl SelfRef {
fn new() -> Self {
let mut self_ref = SelfRef { ptr: std::ptr::null(); }
self_ref.ptr = &self_ref;
self_ref
}
}
ここで、 self_ref
はムーブされ、 new
関数の戻り値になりました。すると、ムーブ後のアドレスはムーブ前と異なる可能性があります。Rustではムーブに介入できないので、ムーブ時点でアドレスの書き換えを行うことができません。したがって、このような構造体は安全に作ることができません(でも上のコードは unsafe
じゃないのでは、と思うかもしれませんが、実際はポインタにアクセスする時に unsafe
な操作を行う必要があり、その時点で安全性を担保できないということになります)。
実際、このような構造体を作る機会はあまりないのですが、 async
ブロックについて考えると話は変わります。
async
ブロック内の変数は、スタック上の自動変数ではなく async
ブロックが生成する匿名型のメンバだと考えることができます。例えば、
async {
let a = 42i64;
let b = &a;
some_async_function().await
println!("{b}");
}
という async
ブロックは、仮に構造体として定義するなら、
struct UnnamedAsyncBlock {
a: MaybeUninit<i64>,
b: MaybeUninit<*const i64>,
future: MaybeUninit<ReturnTypeOfSomeAsyncFunction>,
address: *const (),
}
こんな感じの構造になっているでしょう( future
は some_async_function()
の戻り値、 address
はどこから処理を再開するかの情報が入るものとします)。
1回目の poll
の呼び出しまでは、上記の構造体の中身は address
以外未初期化です。 poll
が呼び出されると、 a
が 42i64
で初期化され、 b
には a
のアドレスが入ります。 a
のアドレスはこの構造体の内部のアドレスなので、この構造体は自己参照することになります。したがって、一度 poll
を呼び出したこの構造体をムーブするのは危険、ということになります。
ムーブされると危険なので、処理が終わるまではムーブできないようにしておきたいですよね。それが Pin
の役目です。 Pin
は構造的には内部に可変借用を持つだけの構造体で、借用されている間はムーブできないという言語仕様により、Pin
の寿命の間は「ピン留め」されていることが保証され、安全に poll
を呼び出すことができます。
スタック上の値をピン留めするときは、 std::pin::pin!
マクロを使います。 pin!
マクロの実装は以下のようになっていて、
// https://doc.rust-lang.org/1.82.0/src/core/pin.rs.html#2015
$crate::pin::Pin::<&mut _> { __pointer: &mut { $value } }
引数である $value
をブロック式の中にいれることで強制的にムーブし、その値の可変参照をメンバに入れています。ブロック式の値は一時変数になり、借用されることで寿命が Pin
の生存期間に伸びますが、名前がついていないので Pin
を介さずにアクセスすることはできなくなります。
と、実はここまでは Pin<&mut T>
の説明で、他にも Pin<Box<T>>
や Pin<Rc<T>>
などポインタライクな型を持つ形で Pin
が現れることがあります。これらの型の内部の値はスタックではなくヒープ上に作られるため、ムーブされても安全です。例えば Box<T>
をムーブしても実際にはポインタのコピーと同じことが起きるだけで、自己参照構造があってもヒープ上のアドレスは移動しないためです。
ただし、 &mut Box<T>
から &mut T
を取り出す操作は安全ではありません。なぜなら、 &mut T
はスワップすることができるためです。スワップもムーブと同じくメモリ上の値をそのまま交換することで実装できるため、自己参照構造体をスワップすると壊れてしまいます。そのため、 Pin<Box<T>>
などの形で扱うとそのような操作は unsafe
メソッドを使わずには行えないようになります。
std::marker::Unpin
トレイト
さて、とはいえ、 Pin
はあくまで自己参照構造体を安全に扱うためのラッパーで、ほとんどのRustの型はムーブしても安全になっています。つまり、わざわざ Pin
を介さなくても良いのですが、型の一貫性のためだけに Pin
を使う必要があるのです。そういう型は Unpin
トレイト境界が実装された型として表現されます。 Unpin
、つまり「ピン留めを外す」ことができる型という意味です。
T
に Unpin
トレイト境界が実装されているとき、 Pin<&mut T>
や Pin<Box<T>>
から &mut T
を安全に取り出すことができます。 DerefMut
なども実装されるので、意識せずとも透過的に T
のメソッドを利用することが可能です。逆に言うと、 Unpin
が実装されていない型に対してはこのような操作は unsafe
メソッドを介さないといけないので、安全性が保たれるという訳です。
Unpin
は Auto traits という少し特殊なトレイトで、構造体のフィールドの内容に応じてトレイト境界が自動的に実装されます。具体的には、フィールドがすべて Unpin
なら実装され、そうでなければ実装されないということになります。
std::marker::PhantomPinned
をフィールドに入れることで明示的に Unpin
を実装させないようにもできます。
std::task::Context
とstd::task::Waker
poll
の第二引数である Context
は、内部に Waker
を持ちます。 Waker
が何かというと、処理が終了した時に wake()
を呼ぶと、非同期ランタイム側に通知することができる型です。
最初の節で「非同期は定期的に問い合わせを行う」と説明しましたが、実際にはそれだと無駄があったり動作が遅くなったりします。問い合わせの間隔が長いと、処理が終わっていても次の問い合わせまで後続の処理が開始できず、全体の動作時間が長くなってしまいます。かといって間隔を短くすると問い合わせの処理が多くなってCPU時間を浪費することになります。極端な話、スリープを挟まずに問い合わせを続けると処理の終了に即座に対応できることになりますが、それはビジーループになってしまいますよね。という訳で、メインスレッドはやることがない時は処理が終了するまで停止して、処理が終わった瞬間に再開できると都合が良い訳です。そのために使われるのが Waker
です。
Context
は今のところ Waker
を返すだけのオブジェクトなのですが、拡張性を残すために Waker
を直接渡さないようにしているのだと思います。
Waker
の仮実装
実際のランタイムは Waker::wake
の呼び出しに応じて再び poll
を呼び出すように作る訳ですが、とりあえず最初はそこらへんは無視して定期的な問い合わせ方式を使うことにしましょう。
とはいえ、 poll
の引数に Context
がいる以上、なんらかの形で Context
を作る必要があります。
Context
には from_waker
というメソッドがあり、 Waker
から構築することができます。なので Waker
さえ作れてしまえば良いということになりますね。
ですが、 Waker
の構築用メソッドは from_raw
という unsafe
関数です。これを使うのは難しいので、代わりに std::task::Wake
トレイトを使いましょう。 Wake
トレイトは必ず Ark<Self>
を引数に取るようになる(場合によっては無駄がある)代わりに安全に Waker
を作ることができるように用意されたトレイトです。
Wake
トレイトを使って Waker
と Context
を作るとこうなります。
use std::{
sync::Ark,
task::Context,
};
struct MyWaker {}
impl MyWaker {
fn new() -> Arc<Self> {
Arc::new(Self {})
}
}
impl Wake for MyWaker {
fn wake(self: Arc<Self>) {
// TODO
}
}
let waker = MyWaker::new().into();
let cx = Context::from_waker(&waker);
この MyWaker
は今のところ何もしない Waker
で、上記コードの // TODO
の部分にランタイムに対する通知などのコードを入れることができるのですが、とりあえずは定期的な問い合わせを行うので問題ありません。
これでとりあえず block_on
がちゃんと(とりあえず動くように)実装できるようになりました。
use std::{
future::Future,
pin::pin,
task::{Context, Poll},
thread::sleep,
time::Duration,
};
pub fn block_on<F, T>(future: F) -> T
where F: Future<Output = T> {
let waker = MyWaker::new().into();
let mut cx = Context::from_waker(&waker);
let mut future = pin!(future);
loop {
match future.as_mut().poll(&mut cx) {
Poll::Ready(v) => return v,
Poll::Pending => sleep(Duration::from_millis(1)),
}
}
}
やっていることは単純で、 Ready
になるまで1msごと(OSの分解能などに依存するので実際は1msとは限りません)に poll
を呼び出しているだけです。
さて、これが実際に動くかどうか試してみるには、何らかの Future
が必要です。
そこで、単なるタイムアウトを行う関数を実装してみます。
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
time::{Duration, Instant},
};
fn timeout(dur: Duration) -> impl Future<()> {
struct Timeout(Instant);
impl Future for Timeout {
type Output = ();
fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
if Instant::now() >= self.0 {
return Poll::Ready(())
}
Poll::Pending
}
}
Timeout(Instant::now() + dur)
}
この Timeout
構造体は Waker::wake
を呼び出さないので Future
の実装としては不完全なのですが(ランタイムによってはこれは poll
が呼び出されずにブロックし続けます)、とりあえずさきほどの block_on
の実装では問題なく動作するはずです。
use std::time::Duration;
fn main() {
block_on(async {
timeout(Duration::from_secs(5)).await;
println!("Hello, async world!");
});
}
ちゃんと実装ができていれば、起動から5秒後に Hello, async world!
と標準出力に出力するプログラムになるはずです。
MyWaker::wake
の実装
wake
の中身はランタイムが自由に決めることができますが、まずは簡単に std::thread::park
と std::thread::Thread::unpark
を使ってスレッドの停止・再開を行う仕組みにしようと思います。
まずは MyWaker
の構造をこのようにして、内部に block_on
を行うスレッドの情報を持てるようにします。
use std::thread::{current, Thread};
struct MyWaker {
thread: Thread,
}
impl MyWaker {
fn new() -> Arc<Self> {
Arc::new(Self { thread: current() })
}
}
続いて、 wake
の中身を実装します。
impl Wake for MyWaker {
fn wake(self: Arc<Self>) {
self.thread.unpark();
}
}
block_on
の方も変更します。
use std::{
future::Future,
pin::pin,
task::{Context, Poll},
thread::park,
time::Duration,
};
pub fn block_on<F, T>(future: F) -> T
where F: Future<Output = T> {
let waker = MyWaker::new().into();
let mut cx = Context::from_waker(&waker);
let mut future = pin!(future);
loop {
match future.as_mut().poll(&mut cx) {
Poll::Ready(v) => return v,
Poll::Pending => park(),
}
}
}
sleep
の代わりに park
を使うことで、無駄な定期的な poll
の呼び出しをしなくて良くなりました。
しかし、この実装だと他のスレッドから block_on
を行っているスレッドを起こしてやる必要が生まれるため、先ほどの timeout
がうまく動かなくなります。こちらもスレッドを使うように変更しましょう。
今回は、今まで無視していた Context
からちゃんと Waker
を取り出して内部状態に入れる必要があります。
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
time::{Duration, Instant},
thread::{sleep, spawn},
sync::Mutex,
};
fn timeout(dur: Duration) -> impl Future<()> {
struct Timeout(Arc<Mutex<TimeoutState>>);
struct TimeoutState {
completed: bool,
waker: Option<Waker>,
}
impl Future for Timeout {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let mut state = self.0.lock().unwrap();
if state.completed {
return Poll::Ready(());
}
// `waker` は `poll` の呼び出しごとに更新する。呼び出しごとに別の `Waker` 実装に変えても良いため
state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
let state = Arc::new(Mutex::new(TimeoutState{
completed: false,
waker: None,
}));
spawn({
let state = state.clone();
move || {
sleep(dur);
let mut state = state.lock().unwrap();
state.completed = true;
if let Some(waker) = state.waker.take() {
waker.wake();
}
}
});
Timeout(state)
}
毎回時間を確認する代わりに、別スレッドで sleep
を呼ぶようにしました。これだと timeout
を複数呼ぶと毎回OSスレッドが作られることになりオーバーヘッドが大きいですが、そこは実装次第です。今回の記事ではこれで良しとしましょう。
終わりに
ここまでで一応「非同期ランタイムが作れた」と言っても良いかなと思います。もちろん、 block_on
しかないランタイムは不便で、実際にはOSのイベントを処理しつつ非同期タスクを待機するなどの要求があったりするのですが、そろそろ文字数もけっこうな量になってますし、締切も近いので一旦ここで筆を置こうと思います。
もう少し発展的な内容を「作って学ぶRustの非同期ランタイム 2」などと銘打って、近い内に(まあ……せめて来年のTechrachoの夏イベントくらいには……)書こうかなと思います。
それではまた。メリークリスマス!