Ruby: Ractorによる安全な非同期通信の実験(翻訳)
Ractorとは、アクターモデルからヒントを得たRuby 3.0の新しいコンカレンシー抽象化です。
- APIドキュメント: Ractor | Ruby API (v3.0)
- 設計ドキュメント: ruby/ractor.md at master · ruby/ruby
ある情報を他の場所に送信する処理をRactor目線で見ると、通信を以下のいずれかに分類できます。
- 非同期(ノンブロッキング)
- Ractorは
Ractor#send
で他のRactorに情報を送信できます。その情報は送信先のRactorからRactor.receive
で読み取り可能な無限のキューに入れられます。 - 同期(ブロッキング)
- Ractorは
Ractor.yield
を使って、他のRactorがRactor#take
を呼び出すまでブロックできます。
非同期の場合を考えてみましょう。他のRactorに情報を送りたいが、処理が終わるまでブロックしたくないとします。受信側のRactorの処理速度が遅すぎる場合はどうなるでしょうか?
receiver_ractor = Ractor.new do
loop do
message = Ractor.receive
sleep 1
puts "Processed #{message}"
end
end
counter = 0
while true
counter += 1
receiver_ractor.send(counter)
end
案の定、受信側が送信側に追いつけなくなるとメモリ使用量が増加し続け、最終的にシステムメモリが枯渇してアプリケーションがクラッシュします。
RactorのAPIを見てみると、受信側が遅れているかどうかを送信者が確認する方法が組み込まれていないので、次のような方法を考えてみました。
receiver_ractor = Ractor.new do
processing_queue = Queue.new
Thread.new do
sleep(1) # このスレッドで遅い開始をシミュレート
loop do
message = processing_queue.pop
puts "Processed from queue: #{message}"
end
end
loop do
queue_size = processing_queue.size
sender, message = Ractor.select(Ractor.current, yield_value: queue_size)
if sender != :yield
processing_queue << message
puts "Added message to queue: #{message}"
else
puts "Sent queue status: #{queue_size}"
end
end
end
receiver_ractor.send(1)
receiver_ractor.send(2)
receiver_ractor.send(3)
puts "Finished submissions"
sleep(0.5)
receiver_ractor.take # ステータスを強制リフレッシュ
puts "Receiver queue length: #{receiver_ractor.take}"
sleep(1)
receiver_ractor.take # ステータスを強制リフレッシュ
puts "Receiver queue length: #{receiver_ractor.take}"
実行結果は以下のようになります。
<internal:ractor>:267: warning: Ractor is experimental, and the behavior may change in future versions of Ruby! Also there are many implementation issues.
Finished submissions
Added message to queue: 1
Added message to queue: 2
Added message to queue: 3
Sent queue status: 3
Sent queue status: 3
Receiver queue length: 3
Processed from queue: 1
Processed from queue: 2
Processed from queue: 3
Sent queue status: 3
Sent queue status: 0
Receiver queue length: 0
コードは以下のように進行します。
- メインのRactorは、
1
,2
,3
を含む3つのメッセージをreceiver_ractor
に送信する -
メインのRactorが
sleep
する -
受信側のRactorが起動して3つのメッセージを読み取り、
processing_queue
にリダイレクトする -
メインのRactorが動き出して
queue_size
を強制リフレッシュする(詳しくは後述) -
メインのRactorが
take
を呼び出し、queue_size
が3であることを検出する -
メインのRactorが
sleep
する -
受信側のRactorの2つ目のスレッドが起動して
1
,2
,3
の3つのメッセージを処理する -
メインのRactorが起動して、再度
queue_size
を強制リフレッシュする -
メインのRactorが、クエリが空であることを検出する
受信側Ractorの内部では、この戦略は次のように機能します。現在2つのスレッドがあり、スレッドのひとつは Ractor.select
を用いて2つのことを同時に行います。処理のために新しいアイテムを受け取り、それらを通常のスレッドセーフなキューに入れるか、あるいはキューの現在のサイズを返します。2番目のスレッドは、スレッドセーフなキューから受け取るものを単に処理します。
これで送信側はsend
でアイテムを送信したり、take
を2回呼び出してキューのサイズを取得したりできるようになりました。2回呼び出す理由がおわかりでしょうか?この値はselect
が呼ばれる前にしか更新されないため、select
が入力されてからsend
やtake
が呼び出されるまでに時間がかかると、上の例のように値が古くなってしまいます。しかしtake
を2回連続して呼び出せば「フレッシュな」値が得られることが保証されるので、2回目のtake
で値が更新されたことがわかります。
この構造を土台として、2つのRactor間での通信を改善するさまざまな戦略を実装できます。たとえば元のコード例を以下のように見直すと、送信側が受信側より決して「先走りすぎない」ようにできます。
receiver_ractor = Ractor.new do
processing_queue = Queue.new
Thread.new do
loop do
message = processing_queue.pop
sleep(1)
puts "Processed #{message}"
end
end
loop do
queue_size = processing_queue.size
sender, message = Ractor.select(Ractor.current, yield_value: queue_size)
if sender != :yield
processing_queue << message
puts "Added message to queue: #{message}"
else
puts "Sent queue status: #{queue_size}"
end
end
end
counter = 0
while true
counter += 1
receiver_ractor.send(counter)
if counter % 10 == 0
receiver_ractor.take # ステータスを強制リフレッシュ
queue_size = receiver_ractor.take
if queue_size > 5
puts "Ractor is falling behind (#{queue_size} elements unprocessed); sleeping for a while"
sleep(1) while (receiver_ractor.take && receiver_ractor.take > 1)
end
end
end
実行結果は以下のような感じになります。
<internal:ractor>:267: warning: Ractor is experimental, and the behavior may change in future versions of Ruby! Also there are many implementation issues.
Added message to queue: 1
Added message to queue: 2
Added message to queue: 3
Added message to queue: 4
Added message to queue: 5
Added message to queue: 6
Added message to queue: 7
Added message to queue: 8
Added message to queue: 9
Added message to queue: 10
Sent queue status: 10
Sent queue status: 9
Ractor is falling behind (9 elements unprocessed); sleeping for a while
Sent queue status: 9
Sent queue status: 9
Processed 1
Sent queue status: 9
Sent queue status: 8
Processed 2
Sent queue status: 8
Sent queue status: 7
Processed 3
Sent queue status: 7
Sent queue status: 6
Processed 4
Sent queue status: 6
Sent queue status: 5
Processed 5
Sent queue status: 5
Sent queue status: 4
Processed 6
Sent queue status: 4
Sent queue status: 3
Processed 7
Sent queue status: 3
Sent queue status: 2
Processed 8
Sent queue status: 2
Sent queue status: 1
Added message to queue: 11
Added message to queue: 12
Added message to queue: 13
Added message to queue: 14
Added message to queue: 15
Added message to queue: 16
Added message to queue: 17
Added message to queue: 18
Added message to queue: 19
Added message to queue: 20
Sent queue status: 11
Sent queue status: 11
Ractor is falling behind (11 elements unprocessed); sleeping for a while
sleep
は非常に安直なソリューションですが、ちゃんと動いています。他にも、同期通信に切り替えたり、別のRactorやコードパスに作業を振り分けたりすることも考えられます。
以上、Ractorで初めて行った実験でした。
お気づきの点はこちらへ: ivo@元記事サイトのドメイン
、または twitter
当ブログに関心のある方へ: 新着記事をメール通知で受け取る
概要
原著者の許諾を得て翻訳・公開いたします。