Tech Racho エンジニアの「?」を「!」に。
  • Ruby / Rails関連

turbo-railsはどのようにAction Cableを使ったリアルタイム機能を実現しているのか

hotwired/turbo-rails - GitHub

turbo-railsを使えば、リアルタイムチャットを数行で実現できます。例えば以下のようなコードを実装することで、サーバー側でチャットメッセージを保存するたびに画面を開いている全クライアントに対してidが"chat"の要素にメッセージが追加されます。

<%= turbo_stream_from chat_room, channel: ChatChannel %>

<div id="chat">
  <%= render "chat", message: message %>
</div>
class ChatMessage < ApplicationRecord
  belongs_to :chat_room

  after_create_commit :broadcast

  private

  def broadcast
    broadcast_append_later_to(chat_room, target: "chat", partial: "chat", locals: {message: self})
  end
end

この記事ではturbo-railsがどのようにリアルタイム機能を実現しているのかをソースコードを通して調べてみることにしました。

調査で使用したライブラリのバージョンは以下です。

  • Rails 7.1.3.4
  • turbo-rails 2.0.5

2つのメソッドを通してリアルタイム機能を実現している

先ほどのコード例だとturbo-railsの以下2つのメソッドを実行するだけでリアルタイム機能を実現できます。

  • Turbo::StreamsHelper#turbo_stream_fromをViewで呼び出す。
  • Turbo::Broadcastable#broadcast_append_later_toをサーバ側で実行する。

※ ここでは例としてbroadcast_append_later_toを用いますが、他のbroadcast_*系メソッドでも説明の流れは同様です。

内部ではどのような処理が行われているのか

先に調べた結果を記載すると以下のようなことが行われているようです。

※ ここではRailsアプリケーションの一部としてActionCableを実行しアダプタにRedisを使用することを想定しています。

  1. WebSocket接続開始
  2. チャネルのサブスクライブ
  3. RedisのSUBSCRIBEコマンドを実行
  4. クライアントがメッセージを送信
  5. RedisのPUBLISHコマンドを実行
  6. 指定のチャネルをサブスクライブしているActionCableへメッセージ送信
  7. クライアントへメッセージ送信

Railsガイドにも記載されている通り、ActionCableではPub/Subアプローチというものを用いています。Redisの場合は、Redis Pub/Sub が使用されています。

図の流れに従えば、ActionCableサーバーはRedisに対してSUBSCRIBEコマンドを実行して指定のチャネルをサブスクライブします。その後、RailsサーバーからPUBLISHコマンドで対象のチャネルに対してメッセージをRedisに送信すると、そのチャネルをサブスクライブしている全ActionCableサーバーがそのメッセージを受け取ることができます。

これにより、Railsサーバーが複数台あったとしてもいずれかのサーバーで受け取ったメッセージがRedisを通してそれぞれのサーバーに送信され、対象の全クライアントへメッセージを送信することができます。

それぞれソースコードを通して確認してみます。

Turbo::StreamsHelper#turbo_stream_from

クライアント側の処理

def turbo_stream_from(*streamables, **attributes)
  attributes[:channel] = attributes[:channel]&.to_s || "Turbo::StreamsChannel"
  attributes[:"signed-stream-name"] = Turbo::StreamsChannel.signed_stream_name(streamables)

  tag.turbo_cable_stream_source(**attributes)
end

このコードをViewで呼ぶことで、以下のような HTML が出力されます。

<turbo-cable-stream-source channel="Turbo::StreamsChannel" signed-stream-name="IloybGtPaTh2WVhCd0wxUnlZVzV6Y0c5eWRGSmxjWFZsYzNRdk16QSI=--48c92057aa2266c00f7c84509b07exxx"></turbo-cable-stream-source>

このコードが出力されることで/cable(デフォルト設定の場合のActionCableサーバーの接続先)へのWebSocket接続を確立しその後チャネルをサブスクライブします。

実際のコードを見てみます。turbo-railsはturbo-rails/app/assets/javascripts/turbo.js at ab9c87e18b8a600751297200364290635b3f88fe · hotwired/turbo-railsでカスタム要素を定義しています。そのため<turbo-cable-stream-source>要素がHTMLに追加されることでTurboCableStreamSourceElementconnectedCallback内のsubscribeTo が呼び出されます。その処理によりWebSocket接続の確立とチャネルのサブスクライブの2つの処理を実行します。

サーバー側の処理

クライアントから/cableへの接続が開始されるとRailsガイド にも記載されている通り、ApplicationCable::Connection#connectが実行されます。

# app/channels/application_cable/connection.rb
module ApplicationCable
  class Connection < ActionCable::Connection::Base
    identified_by :current_user

    def connect
      self.current_user = find_verified_user
    end

    private
    def find_verified_user
      if verified_user = User.find_by(id: cookies.encrypted[:user_id])
        verified_user
      else
        reject_unauthorized_connection
      end
    end
  end
end

ここでは、接続クライアントの識別を行いサーバー側で保持します。
その後、以下のコードが実行されstream_nameに対しサブスクライブします。

# app/channels/chat_channel.rb
class ChatChannel < ApplicationCable::Channel
  def subscribed
    stream_from stream_name
  end
end

#stream_fromの処理

実際のサブスクライブの処理はこのメソッドで行われています。

def stream_from(broadcasting, callback = nil, coder: nil, &block)
  broadcasting = String(broadcasting)

  # Don't send the confirmation until pubsub#subscribe is successful
  defer_subscription_confirmation!

  # Build a stream handler by wrapping the user-provided callback with
  # a decoder or defaulting to a JSON-decoding retransmitter.
  handler = worker_pool_stream_handler(broadcasting, callback || block, coder: coder)
  streams[broadcasting] = handler

  connection.server.event_loop.post do
    pubsub.subscribe(broadcasting, handler, lambda do
      ensure_confirmation_sent
      logger.info "#{self.class.name} is streaming from #{broadcasting}"
    end)
  end
end

ここで行われているのは主に以下の2つの処理です。

  • ActionCable → RedisへのSUBSCRIBEコマンドの実行(図の3の処理)
  • Redis → ActionCableにメッセージが届いた際に実行するメソッドの設定(図の7で実行されるメソッドの設定)
ActionCable → RedisへのSUBSCRIBEコマンドの実行
pubsub.subscribe(broadcasting, handler, lambda do
  ensure_confirmation_sent
  logger.info "#{self.class.name} is streaming from #{broadcasting}"
end)

ここでRedisに対しSUBSCRIBEコマンドを実行しています。

Redis → ActionCableにメッセージが届いた際に実行するメソッドの設定

#stream_from処理内でRedisからActionCableへメッセージが送信された際に実行する処理を仕込んでいます。つまりActionCableからクライアントへメッセージを送信する処理を設定しています。

具体的にはこのhandlerをサブスクライブする際に引数として渡しています。handlerはProcでクライアントへメッセージを送信する処理が組み込まれています。これにより、Redis → ActionCableにメッセージが届いた際にクライアントへそのメッセージが送信されます。

ここまでで、Turbo::StreamsHelper#turbo_stream_fromをViewで呼び出すだけで図の1から3までの処理が行われていることがわかりました。
続いて、クライアントがチャットメッセージを送信した際の処理を見ていきます。

Turbo::Broadcastable#broadcast_append_later_to

サーバ側がチャットメッセージを受け取ると、turbo-railsのbroadcast_* メソッドを実行することでクライアントへメッセージを送信することができます。broadcast_* メソッドで行われるのはRedisのPUBLISHコマンドの実行です。図で言うと5の処理が行われます。

broadcast_* メソッドでは、broadcast_stream_toが呼ばれます。そのメソッド内でActionCable.server.broadcast によって、PUBLISHコマンドが実行されます。

その後、Redis Pub/Subによってサブスクライブしている全AcitonCableにメッセージが届き、サブスクライブ時に設定したメソッドが呼ばれることでクライアントはリアルタイムでチャットメッセージを受け取ることができます。

終わりに

turbo-railsのTurbo::StreamsHelper#turbo_stream_fromTurbo::Broadcastable#broadcast_append_later_to の2つのメソッドでなぜリアルタイム機能を実現できるのかをソースコードを通して確認することができました。



CONTACT

TechRachoでは、パートナーシップをご検討いただける方からの
ご連絡をお待ちしております。ぜひお気軽にご意見・ご相談ください。