turbo-railsを使えば、リアルタイムチャットを数行で実現できます。例えば以下のようなコードを実装することで、サーバー側でチャットメッセージを保存するたびに画面を開いている全クライアントに対してidが"chat"の要素にメッセージが追加されます。
<%= turbo_stream_from chat_room, channel: ChatChannel %>
<div id="chat">
<%= render "chat", message: message %>
</div>
class ChatMessage < ApplicationRecord
belongs_to :chat_room
after_create_commit :broadcast
private
def broadcast
broadcast_append_later_to(chat_room, target: "chat", partial: "chat", locals: {message: self})
end
end
この記事ではturbo-railsがどのようにリアルタイム機能を実現しているのかをソースコードを通して調べてみることにしました。
調査で使用したライブラリのバージョンは以下です。
- Rails 7.1.3.4
- turbo-rails 2.0.5
2つのメソッドを通してリアルタイム機能を実現している
先ほどのコード例だとturbo-railsの以下2つのメソッドを実行するだけでリアルタイム機能を実現できます。
Turbo::StreamsHelper#turbo_stream_from
をViewで呼び出す。Turbo::Broadcastable#broadcast_append_later_to
をサーバ側で実行する。
※ ここでは例としてbroadcast_append_later_to
を用いますが、他のbroadcast_*
系メソッドでも説明の流れは同様です。
内部ではどのような処理が行われているのか
先に調べた結果を記載すると以下のようなことが行われているようです。
※ ここではRailsアプリケーションの一部としてActionCableを実行しアダプタにRedisを使用することを想定しています。
- WebSocket接続開始
- チャネルのサブスクライブ
- RedisのSUBSCRIBEコマンドを実行
- クライアントがメッセージを送信
- RedisのPUBLISHコマンドを実行
- 指定のチャネルをサブスクライブしているActionCableへメッセージ送信
- クライアントへメッセージ送信
Railsガイドにも記載されている通り、ActionCableではPub/Subアプローチというものを用いています。Redisの場合は、Redis Pub/Sub が使用されています。
図の流れに従えば、ActionCableサーバーはRedisに対してSUBSCRIBEコマンドを実行して指定のチャネルをサブスクライブします。その後、RailsサーバーからPUBLISHコマンドで対象のチャネルに対してメッセージをRedisに送信すると、そのチャネルをサブスクライブしている全ActionCableサーバーがそのメッセージを受け取ることができます。
これにより、Railsサーバーが複数台あったとしてもいずれかのサーバーで受け取ったメッセージがRedisを通してそれぞれのサーバーに送信され、対象の全クライアントへメッセージを送信することができます。
それぞれソースコードを通して確認してみます。
Turbo::StreamsHelper#turbo_stream_from
クライアント側の処理
def turbo_stream_from(*streamables, **attributes)
attributes[:channel] = attributes[:channel]&.to_s || "Turbo::StreamsChannel"
attributes[:"signed-stream-name"] = Turbo::StreamsChannel.signed_stream_name(streamables)
tag.turbo_cable_stream_source(**attributes)
end
このコードをViewで呼ぶことで、以下のような HTML が出力されます。
<turbo-cable-stream-source channel="Turbo::StreamsChannel" signed-stream-name="IloybGtPaTh2WVhCd0wxUnlZVzV6Y0c5eWRGSmxjWFZsYzNRdk16QSI=--48c92057aa2266c00f7c84509b07exxx"></turbo-cable-stream-source>
このコードが出力されることで/cable
(デフォルト設定の場合のActionCableサーバーの接続先)へのWebSocket接続を確立しその後チャネルをサブスクライブします。
実際のコードを見てみます。turbo-railsはturbo-rails/app/assets/javascripts/turbo.js at ab9c87e18b8a600751297200364290635b3f88fe · hotwired/turbo-railsでカスタム要素を定義しています。そのため<turbo-cable-stream-source>
要素がHTMLに追加されることでTurboCableStreamSourceElement
の connectedCallback
内のsubscribeTo が呼び出されます。その処理によりWebSocket接続の確立とチャネルのサブスクライブの2つの処理を実行します。
サーバー側の処理
クライアントから/cable
への接続が開始されるとRailsガイド にも記載されている通り、ApplicationCable::Connection#connect
が実行されます。
# app/channels/application_cable/connection.rb
module ApplicationCable
class Connection < ActionCable::Connection::Base
identified_by :current_user
def connect
self.current_user = find_verified_user
end
private
def find_verified_user
if verified_user = User.find_by(id: cookies.encrypted[:user_id])
verified_user
else
reject_unauthorized_connection
end
end
end
end
ここでは、接続クライアントの識別を行いサーバー側で保持します。
その後、以下のコードが実行されstream_name
に対しサブスクライブします。
# app/channels/chat_channel.rb
class ChatChannel < ApplicationCable::Channel
def subscribed
stream_from stream_name
end
end
#stream_fromの処理
実際のサブスクライブの処理はこのメソッドで行われています。
def stream_from(broadcasting, callback = nil, coder: nil, &block)
broadcasting = String(broadcasting)
# Don't send the confirmation until pubsub#subscribe is successful
defer_subscription_confirmation!
# Build a stream handler by wrapping the user-provided callback with
# a decoder or defaulting to a JSON-decoding retransmitter.
handler = worker_pool_stream_handler(broadcasting, callback || block, coder: coder)
streams[broadcasting] = handler
connection.server.event_loop.post do
pubsub.subscribe(broadcasting, handler, lambda do
ensure_confirmation_sent
logger.info "#{self.class.name} is streaming from #{broadcasting}"
end)
end
end
ここで行われているのは主に以下の2つの処理です。
- ActionCable → RedisへのSUBSCRIBEコマンドの実行(図の3の処理)
- Redis → ActionCableにメッセージが届いた際に実行するメソッドの設定(図の7で実行されるメソッドの設定)
ActionCable → RedisへのSUBSCRIBEコマンドの実行
pubsub.subscribe(broadcasting, handler, lambda do
ensure_confirmation_sent
logger.info "#{self.class.name} is streaming from #{broadcasting}"
end)
ここでRedisに対しSUBSCRIBEコマンドを実行しています。
Redis → ActionCableにメッセージが届いた際に実行するメソッドの設定
#stream_from
処理内でRedisからActionCableへメッセージが送信された際に実行する処理を仕込んでいます。つまりActionCableからクライアントへメッセージを送信する処理を設定しています。
具体的にはこのhandlerをサブスクライブする際に引数として渡しています。handlerはProcでクライアントへメッセージを送信する処理が組み込まれています。これにより、Redis → ActionCableにメッセージが届いた際にクライアントへそのメッセージが送信されます。
ここまでで、Turbo::StreamsHelper#turbo_stream_from
をViewで呼び出すだけで図の1から3までの処理が行われていることがわかりました。
続いて、クライアントがチャットメッセージを送信した際の処理を見ていきます。
Turbo::Broadcastable#broadcast_append_later_to
サーバ側がチャットメッセージを受け取ると、turbo-railsのbroadcast_*
メソッドを実行することでクライアントへメッセージを送信することができます。broadcast_*
メソッドで行われるのはRedisのPUBLISHコマンドの実行です。図で言うと5の処理が行われます。
broadcast_*
メソッドでは、broadcast_stream_toが呼ばれます。そのメソッド内でActionCable.server.broadcast によって、PUBLISHコマンドが実行されます。
その後、Redis Pub/Subによってサブスクライブしている全AcitonCableにメッセージが届き、サブスクライブ時に設定したメソッドが呼ばれることでクライアントはリアルタイムでチャットメッセージを受け取ることができます。
終わりに
turbo-railsのTurbo::StreamsHelper#turbo_stream_from
と Turbo::Broadcastable#broadcast_append_later_to
の2つのメソッドでなぜリアルタイム機能を実現できるのかをソースコードを通して確認することができました。