Tech Racho エンジニアの「?」を「!」に。
  • Ruby / Rails関連

Ruby: Ractorによる安全な非同期通信の実験(翻訳)

概要

原著者の許諾を得て翻訳・公開いたします。

Ruby: Ractorによる安全な非同期通信の実験(翻訳)

Ractorとは、アクターモデルからヒントを得たRuby 3.0の新しいコンカレンシー抽象化です。

ある情報を他の場所に送信する処理をRactor目線で見ると、通信を以下のいずれかに分類できます。

非同期(ノンブロッキング)
RactorはRactor#sendで他のRactorに情報を送信できます。その情報は送信先のRactorからRactor.receiveで読み取り可能な無限のキューに入れられます。
同期(ブロッキング)
RactorはRactor.yieldを使って、他のRactorがRactor#takeを呼び出すまでブロックできます。

非同期の場合を考えてみましょう。他のRactorに情報を送りたいが、処理が終わるまでブロックしたくないとします。受信側のRactorの処理速度が遅すぎる場合はどうなるでしょうか?

receiver_ractor = Ractor.new do
  loop do
    message = Ractor.receive
    sleep 1
    puts "Processed #{message}"
  end
end

counter = 0
while true
  counter += 1
  receiver_ractor.send(counter)
end

ractor unbounded memory

案の定、受信側が送信側に追いつけなくなるとメモリ使用量が増加し続け、最終的にシステムメモリが枯渇してアプリケーションがクラッシュします。

RactorのAPIを見てみると、受信側が遅れているかどうかを送信者が確認する方法が組み込まれていないので、次のような方法を考えてみました。

receiver_ractor = Ractor.new do
  processing_queue = Queue.new

  Thread.new do
    sleep(1) # このスレッドで遅い開始をシミュレート

    loop do
      message = processing_queue.pop
      puts "Processed from queue: #{message}"
    end
  end

  loop do
    queue_size = processing_queue.size
    sender, message = Ractor.select(Ractor.current, yield_value: queue_size)

    if sender != :yield
      processing_queue << message
      puts "Added message to queue: #{message}"
    else
      puts "Sent queue status: #{queue_size}"
    end
  end
end

receiver_ractor.send(1)
receiver_ractor.send(2)
receiver_ractor.send(3)
puts "Finished submissions"

sleep(0.5)

receiver_ractor.take # ステータスを強制リフレッシュ
puts "Receiver queue length: #{receiver_ractor.take}"

sleep(1)

receiver_ractor.take # ステータスを強制リフレッシュ
puts "Receiver queue length: #{receiver_ractor.take}"

実行結果は以下のようになります。

<internal:ractor>:267: warning: Ractor is experimental, and the behavior may change in future versions of Ruby! Also there are many implementation issues.
Finished submissions
Added message to queue: 1
Added message to queue: 2
Added message to queue: 3
Sent queue status: 3
Sent queue status: 3
Receiver queue length: 3
Processed from queue: 1
Processed from queue: 2
Processed from queue: 3
Sent queue status: 3
Sent queue status: 0
Receiver queue length: 0

コードは以下のように進行します。

  1. メインのRactorは、1, 2, 3を含む3つのメッセージをreceiver_ractorに送信する

  2. メインのRactorがsleepする

  3. 受信側のRactorが起動して3つのメッセージを読み取り、processing_queueにリダイレクトする

  4. メインのRactorが動き出してqueue_sizeを強制リフレッシュする(詳しくは後述)

  5. メインのRactorがtakeを呼び出し、queue_sizeが3であることを検出する

  6. メインのRactorがsleepする

  7. 受信側のRactorの2つ目のスレッドが起動して 1, 2, 3 の3つのメッセージを処理する

  8. メインのRactorが起動して、再度queue_sizeを強制リフレッシュする

  9. メインのRactorが、クエリが空であることを検出する

受信側Ractorの内部では、この戦略は次のように機能します。現在2つのスレッドがあり、スレッドのひとつは Ractor.selectを用いて2つのことを同時に行います。処理のために新しいアイテムを受け取り、それらを通常のスレッドセーフなキューに入れるか、あるいはキューの現在のサイズを返します。2番目のスレッドは、スレッドセーフなキューから受け取るものを単に処理します。

これで送信側はsendでアイテムを送信したり、takeを2回呼び出してキューのサイズを取得したりできるようになりました。2回呼び出す理由がおわかりでしょうか?この値はselectが呼ばれる前にしか更新されないため、selectが入力されてからsendtakeが呼び出されるまでに時間がかかると、上の例のように値が古くなってしまいます。しかしtakeを2回連続して呼び出せば「フレッシュな」値が得られることが保証されるので、2回目のtakeで値が更新されたことがわかります。

この構造を土台として、2つのRactor間での通信を改善するさまざまな戦略を実装できます。たとえば元のコード例を以下のように見直すと、送信側が受信側より決して「先走りすぎない」ようにできます。

receiver_ractor = Ractor.new do
  processing_queue = Queue.new

  Thread.new do
    loop do
      message = processing_queue.pop
      sleep(1)
      puts "Processed #{message}"
    end
  end

  loop do
    queue_size = processing_queue.size
    sender, message = Ractor.select(Ractor.current, yield_value: queue_size)

    if sender != :yield
      processing_queue << message
      puts "Added message to queue: #{message}"
    else
      puts "Sent queue status: #{queue_size}"
    end
  end
end

counter = 0
while true
  counter += 1
  receiver_ractor.send(counter)

  if counter % 10 == 0
    receiver_ractor.take # ステータスを強制リフレッシュ
    queue_size = receiver_ractor.take
    if queue_size > 5
      puts "Ractor is falling behind (#{queue_size} elements unprocessed); sleeping for a while"
      sleep(1) while (receiver_ractor.take && receiver_ractor.take > 1)
    end
  end
end

実行結果は以下のような感じになります。

<internal:ractor>:267: warning: Ractor is experimental, and the behavior may change in future versions of Ruby! Also there are many implementation issues.
Added message to queue: 1
Added message to queue: 2
Added message to queue: 3
Added message to queue: 4
Added message to queue: 5
Added message to queue: 6
Added message to queue: 7
Added message to queue: 8
Added message to queue: 9
Added message to queue: 10
Sent queue status: 10
Sent queue status: 9
Ractor is falling behind (9 elements unprocessed); sleeping for a while
Sent queue status: 9
Sent queue status: 9
Processed 1
Sent queue status: 9
Sent queue status: 8
Processed 2
Sent queue status: 8
Sent queue status: 7
Processed 3
Sent queue status: 7
Sent queue status: 6
Processed 4
Sent queue status: 6
Sent queue status: 5
Processed 5
Sent queue status: 5
Sent queue status: 4
Processed 6
Sent queue status: 4
Sent queue status: 3
Processed 7
Sent queue status: 3
Sent queue status: 2
Processed 8
Sent queue status: 2
Sent queue status: 1
Added message to queue: 11
Added message to queue: 12
Added message to queue: 13
Added message to queue: 14
Added message to queue: 15
Added message to queue: 16
Added message to queue: 17
Added message to queue: 18
Added message to queue: 19
Added message to queue: 20
Sent queue status: 11
Sent queue status: 11
Ractor is falling behind (11 elements unprocessed); sleeping for a while

sleepは非常に安直なソリューションですが、ちゃんと動いています。他にも、同期通信に切り替えたり、別のRactorやコードパスに作業を振り分けたりすることも考えられます。

以上、Ractorで初めて行った実験でした。

お気づきの点はこちらへivo@元記事サイトのドメイン、または twitter
当ブログに関心のある方へ新着記事をメール通知で受け取る

関連記事

Rubyのスケール時にGVLの特性を効果的に活用する(翻訳)


CONTACT

TechRachoでは、パートナーシップをご検討いただける方からの
ご連絡をお待ちしております。ぜひお気軽にご意見・ご相談ください。