Ruby: Rackの仕組みとWebSocketやSSEとの組み合わせを詳しく理解する(翻訳)
Rackは、現在のあらゆる著名なRuby製Webフレームワークの基盤であり、RubyアプリケーションとWebサーバーの間のインターフェイスを標準化します。
このメカニズムによって、Rack準拠のWebサーバー(Puma、Unicorn、Falconなど)とRack準拠のWebフレームワーク(Rails、Sinatra、Roda、Hanamiなど)を組み合わせ可能になります。
このように関心を分離すると、組み合わせを大幅に柔軟にできるため、非常に強力です。
ただし制約もあります。Rack 2では、あらゆるリクエストに対してレスポンスを返すことと、コネクションをクローズすることが前提となっていました。しかしWebSocketなどを経由可能にする永続的なコネクションを提供する機能はありませんでした。このため、WebSocketなどの永続的コネクションを実装するには、Rackからのコネクションを引き継ぐ脱出口的なハックが必要でした。
Rack 3では、これがすべて変更されます。
🔗 Rackの基本
しくみを詳しく見ていく前に、Rack自体をもう少しおさらいしておきましょう。
🔗 素のRackアプリ
基本的なRackアプリは以下のようになります。
class App
def call(env)
[200, { "Content-Type" => "text/plain" }, ["Hello World"]]
end
end
run App.new
上のenv
は、HTTPヘッダーなどのリクエスト固有の情報を含むハッシュです。リクエストが行われると、call
メソッドが呼び出され、レスポンスを表す配列が返されます。
- 配列の第1要素はHTTPレスポンスコードで、この場合は
200
です。 - 配列の第2の要素は、送信するRackおよびHTTPレスポンスヘッダーを含むハッシュです。
- 配列の最後の要素は、レスポンスのbodyを表す文字列の配列です。
このアプリをフォルダ内に配置して実行してみましょう。
$ mkdir rack-demo
$ cd rack-demo
$ bundle init
$ bundle add rack rackup webrick
$ touch app.rb
$ touch config.ru
訳注
原文では上の4行目はbundle add rack rackup
となっていましたが、webrick gemはデフォルトではインストールされないため、bundle add rack rackup webrick
に修正しました。
app.rb
ファイルに以下を書きます。
class App
def call(env)
[200, { "content-type" => "text/plain" }, ["Hello World"]]
end
end
config.ru
には以下を書きます。
require_relative "app"
run App.new
以下のコマンドを実行すると、このアプリをデフォルトのWEBrickサーバーで実行できます。
$ bundle exec rackup
サーバーは9292
ポートで実行されます。以下のcurl
コマンドで動作を確認できます。
$ curl localhost:9292
Hello World
これで基本的なアプリが動くようになりました!WEBrickは開発専用のWebサーバーなので、Pumaに差し替えましょう。
🔗 Webサーバーを差し替える
$ bundle add puma
上を実行してから、rackup
を再度実行してみましょう。
バンドルされたPumaが自動的に検出され、WEBrickの代わりにPumaが起動します。
$ bundle exec rackup
Puma starting in single mode...
* Puma version: 6.4.2 (ruby 3.2.2-p53) ("The Eagle of Durango")
* Min threads: 0
* Max threads: 5
* Environment: development
* PID: 45877
* Listening on http://127.0.0.1:9292
* Listening on http://[::1]:9292
Use Ctrl-C to stop
puma
コマンドには必要に応じて以下のように設定引数を渡せるので、rackup
よりもpuma
コマンドで直接起動することをおすすめします。
$ bundle exec puma -w 4
[45968] Puma starting in cluster mode...
[45968] * Puma version: 6.4.2 (ruby 3.2.2-p53) ("The Eagle of Durango")
[45968] * Min threads: 0
[45968] * Max threads: 5
[45968] * Environment: development
[45968] * Master PID: 45968
[45968] * Workers: 4
[45968] * Restarts: (✔) hot (✔) phased
[45968] * Listening on http://0.0.0.0:9292
[45968] Use Ctrl-C to stop
[45968] - Worker 0 (PID: 45981) booted in 0.0s, phase: 0
[45968] - Worker 1 (PID: 45982) booted in 0.0s, phase: 0
[45968] - Worker 2 (PID: 45983) booted in 0.0s, phase: 0
[45968] - Worker 3 (PID: 45984) booted in 0.0s, phase: 0
この基本的なアプリは、Rackインターフェイスをデモンストレーションします。
受信したHTTPリクエストは解析されてenv
ハッシュとしてアプリケーションに提供されます。アプリケーションはこのリクエストを処理し、サーバーがクライアントに送信する形にフォーマットした配列をレスポンスとして返します。
🔗 フレームワークをRackに準拠させる
Rack準拠のWebフレームワークは、すべて内部的にRackの仕様に準拠しており、このレベルまで下がれるアクセスポイントを提供しています。
Railsの場合、以下のようにコントローラでRackレスポンスを直接送信できます。
class HomeController
def index
self.response = [200, {}, ["I'm Home!"]]
end
end
Rodaでも同様のことができます。
route do |r|
r.on "home" do
r.halt [200, {}, ["I'm Home!"]]
end
end
これを実現するための構文はRack準拠のフレームワークごとに少しずつ異なりますが、内部でRackレスポンスを送信しているのはどれも同じであるため、開発者がそれらのレスポンスにアクセスするためのAPIが用意されています。
Rackの完全な技術仕様については以下のGitHubリポジトリ上で公開されているので、比較的手軽にアクセスできます。
参考: rack/SPEC.rdoc at main · rack/rack
このデモで示したように、Rackは「リクエストを受け取る」「Webアプリケーションで処理する」「レスポンスを返す」という前提で動作します。ここに永続的コネクションを投入すると、このモデルは完全に崩壊してしまいますが、にもかかわらずRailsなどのRack準拠フレームワークの中にはWebSocketを実装しているものもあります。
🔗 ソケットのハイジャック
ここまでは、可能な限り最も基本的なRackアプリをセットアップすることで、リクエストを処理してレスポンスを返す方法を学びました。
今度は、Rackのコネクションを引き継ぐことで、WebSocketなどを経由して永続的なコネクションを維持できるようにする方法を学びます。
まず、実際のHTTPコネクションがどのようなしくみになっているかを見てみましょう。
上図に示したように、まずTCPソケットをオープンし、次にリクエストをサーバーに送信します。
サーバーはレスポンスを返し、コネクションをクローズします。
すべての通信は平文テキストで行われます。
ここで「ソケットハイジャック(socket hijacking)」という手法を用いることで、リクエストを受け取ったときにRackからソケットを制御できるようになります。Rackは、ソケットハイジャックを行うために以下の2つの手法を提供しています。
- 部分ハイジャック(partial hijack)
RackはHTTPレスポンスヘッダを送信してから、コネクションをアプリケーションに渡します。 -
完全ハイジャック(full hijack)
Rackはソケットに何も書き込まず、単にコネクションをクライアントに渡します。
🔗 部分ハイジャック
部分ハイジャックは以下のように行います。
class App
def call(env)
body = proc do |stream|
5.times do
stream.write "#{Time.now}\n\n"
sleep 1
end
ensure
stream.close
end
[200, { "content-type" => "text/plain", "rack.hijack" => body }, []]
end
end
上のRackアプリを実行してから以下のようにcurl
でアクセスすると、1秒おきに時刻が書き込まれることがわかります。
$ curl -i localhost:9292
🔗 完全ハイジャック
完全ハイジャックは以下のように行います。
class App
def call(env)
headers = [
"HTTP/1.1 200 OK",
"Content-Type: text/plain"
]
stream = env["rack.hijack"].call
stream.write(headers.map { |header| header + "\r\n" }.join)
stream.write("\r\n")
stream.flush
begin
5.times do
stream.write "#{Time.now}\n\n"
sleep 1
end
ensure
stream.close
end
[-1, {}, []]
end
end
ただし、このコードは悪例につき、こういう書き方をしてはいけません。繰り返しますが、自分がやっていることを正しく理解できないうちは絶対やらないでください。この書き方には、落とし穴やおかしな振る舞いがあふれかえっています。
🔗 streaming body
完全ハイジャックはひどいアイデアですが、部分ハイジャックは有用なツールです。しかしそれでもハックっぽさがつきまとうため、Rack 3の仕様では「streaming body」という概念を導入する形で、部分ハイジャックを正式に採用しました。
class App
def call(env)
body = proc do |stream|
5.times do
stream.write "#{Time.now}\n\n"
sleep 1
end
ensure
stream.close
end
[200, { "content-type" => "text/plain" }, body]
end
end
ここでは、配列ではなくブロックをレスポンスのbodyとして返すようにしています。ブロックの実行が完了するまで、コネクションはオープンされたままになります。
しかし、Pumaを使う場合は大きな問題が1つあります。Pumaはマルチスレッドのサーバーであり、受信リクエストごとにスレッドを1つ割り当てるようになっています。私たちはRackからのソケットを引き継いでいますが、コネクションがオープン中である間はPumaスレッドを拘束し続けることになります。
Pumaではコンカレンシー数を設定可能ですが、スレッド数には上限があるため、スレッドを長時間拘束するのは得策ではありません。これについて実際の動作を見てみましょう。
$ bundle exec puma -w 1 -t 1:1
上とは別にターミナルウィンドウを2つ開いて、それぞれのウィンドウで以下のコマンドを同時に実行します。
$ curl localhost:9292
リクエストの一方はただちに処理されますが、他方のリクエストは最初のリクエストの処理が完了するまで待たされます。ここではPumaをシングルワーカーかつシングルスレッドで起動したため、1度に1件のリクエストしか処理できなくなっています。
これは、以下のように独自のスレッドを作成することで回避可能です。
class App
def call(env)
body = proc do |stream|
Thread.new do
5.times do
stream.write "#{Time.now}\n\n"
sleep 1
end
ensure
stream.close
end
end
[200, { "content-type" => "text/plain" }, body]
end
end
上の実験を繰り返すと、今度はPumaスレッドを占有していないため、curl
リクエストがコンカレントに処理されていることがわかります。
繰り返しますが、自分がやっていることを正しく理解していないうちは、この方法は避けること。システムプログラミングは奥の深い複雑なトピックです。ここで行っているデモは主に学術的な目的のためであり、実用目的ではありません。
🔗 Falcon Webサーバーの場合
スレッドの問題はPuma Webサーバー固有の問題なので、今度は別のオプションとしてFalconを見ていきましょう。
Falconはasync
gem上に構築された、高度にコンカレントなRack準拠Webサーバーであり、スレッドの代わりにRubyのFiber(作成コストが低く、オーバーヘッドがはるかに小さい)を使っています。
async
gemは、RubyのIOや待機操作(sleep
など)にフックをかけて、それらをさまざまなFiber間で切り替えることで、プログラムが何もせずに立ち止まることのないようにします。
アプリを、スレッドを生成しない元のバージョンに戻します。
class App
def call(env)
body = proc do |stream|
5.times do
stream.write "#{Time.now}\n\n"
sleep 1
end
ensure
stream.close
end
[200, { "content-type" => "text/plain" }, body]
end
end
次にPumaを削除してFalconをインストールします。
$ bundle remove puma
$ bundle add falcon
以下を実行してFalconサーバーを起動します。
Falconはデフォルトではhttps
トラフィックだけを配信するので、ここでは明示的にバインドを指定する必要があります。
$ bundle exec falcon serve -n 1 -b http://localhost:9292
このサーバーは単一のスレッドだけを利用します。これは以下のコマンドで確認できます(<pid>
はFalconのログから拾ったpidに置き換えてください)。
$ top -pid <pid> -stats pid,th
MRIは内部でスレッドを利用するため、上のコマンドで出力されるスレッド数(#th
)は2
になります。
先ほどの実験を繰り返してみましょう。ターミナルウィンドウを2つ開いて、それぞれで以下のcurl
リクエストを同時に行います。
$ curl localhost:9292
RubyのFiberのおかげで、2つのリクエストが同時に処理されることがわかります。
Falconは比較的新しいWebサーバーです。RubyにFiberが最初に導入されたのは3.0のときでした1。FalconはRack準拠なので、Railsでも利用可能ですが、FalconのドキュメントではRails 7.1以上での利用を推奨しています。
そういうわけで、Falconをproduction環境で使うには少々リスクがありますが、この発展はRuby世界においてとてもエキサイティングだと思います。Falconの今後数年の進歩が楽しみです。
ここまでは、Rackで永続的コネクションを作成する方法と、他のリクエストをブロックせずに実行する方法を学びました。しかしここまで見てきたユースケースは実用的ではなく、しかもわざとらしいものでした。今度は、この手法を実用に供する方法について見ていきましょう。
🔗 SSEとWebSocket
Webには、永続的なコネクション上で通信するための正式な仕様が2つあります。
WebSocketは非常に広く用いられていて人気も高いのですが、SSEはそこまで知られていないので、まずはこれについて詳しく見ていきましょう。
🔗 Server Sent Events(SSE)
SSEを使うと、クライアントでサーバーとのコネクションをオープンしたままにできますが、クライアントにメッセージをパブリッシュできるのはサーバーだけです。SSEは双方向プロトコルでは「ありません」。
SSEはJavaScript APIなので、必要なスクリプトを含むHTMLページを配信するようにアプリを変更してみましょう。
class App
def call(env)
req = Rack::Request.new(env)
path = req.path_info
case path
when "/"
sse_js(env)
end
end
private
def sse_js(env)
body = <<~HTML
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>SSE - Demo</title>
<script type="text/javascript">
const eventSource = new EventSource("/sse")
eventSource.addEventListener("message", event => {
document.body.insertAdjacentHTML(
"beforeend",
`<p>${event.data}</p>`
)
})
</script>
</head>
<body>
</body>
</html>
HTML
[200, { "content-type" => "text/html" }, [body]]
end
end
このAPIはEventSource
クラスにカプセル化されており、サーバーからの新しいメッセージは、私たちがリッスンしているイベントをトリガーします。
次に、イベントを送信するためのエンドポイントを構築する必要があります。
class App
def call(env)
req = Rack::Request.new(env)
path = req.path_info
case path
when "/"
sse_js(env)
when "/sse"
sse(env)
end
end
private
def sse_js(env)
# (略)
end
def sse(env)
body = proc do |stream|
Thread.new do
5.times do
stream.write "data: #{Time.now}!\n\n"
sleep 1
end
ensure
stream.close
end
end
[200, { "content-type" => "text/event-stream" }, body]
end
end
サーバーの立場から見ると、これは前述のstreaming bodyとかなり似ています。ここで注目して欲しいのは、クライアントに返送されるcontent-type
ヘッダーと文字列の形式です。
サーバーを実行します(以下のように明示的にPumaに戻すこと)。
$ bundle add puma
$ bundle exec puma
訳注
上のbundle add puma
は原文にありませんが追加しました。
Webブラウザでlocalhost:9292
を開くと、1秒おきに現在時刻が5回書き込まれます。
この手法は、サーバーから更新をクライアントに通知したいだけの場合に有用です。上の例ではループで回しているだけなのでかなり不自然ですが、現実のアプリケーションでの用例を見てみましょう。
🔗 RubyのQueue
Rubyは、スレッド間通信用のQueue
というデータ構造を提供しています。これを使ってデータをクライアントに「パブリッシュ」できます。
今度も現在時刻を5回パブリッシュするという例を使いますが、今回はバックグラウンドのスレッドからパブリッシュします。
class App
def call(env)
# (略)
end
private
def sse_js(env)
# (略)
end
def sse(env)
queue = Queue.new
trigger_background_loop(queue)
body = proc do |stream|
Thread.new do
loop do
data = queue.pop
stream.write "data: #{data}!\n\n"
end
ensure
stream.close
end
end
[200, { "content-type" => "text/event-stream" }, body]
end
def trigger_background_loop(queue)
Thread.new do
5.times do
queue.push(Time.now)
sleep 1
end
end
end
end
上の例では、別のバックグラウンドスレッドを生成して、現在の時刻を1秒おきにキューにプッシュします。SSEスレッドで呼び出されるqueue.pop
は、キューに何かが追加されるまでブロックします。
この手法を使うと、Redisなどのpub/subシステムを利用する形で、バックグラウンドスレッドからqueue
にデータを追加し、それをクライアントにパブリッシュできるようになります。
次は、WebSocketを見てみましょう。
🔗 WebSocket
WebSocketは、バイナリ形式によるクライアント・サーバー間の双方向通信用プロトコルです。WebSocketは現代のWebで広く使われており、RailsのAction CableフレームワークもWebSocketを基盤としています。
WebSocketの作成にはHTTPコネクションが使われますが、WebSocketはHTTPから完全に独立したプロトコルです。
WebSocketコネクションを作成するには、以下のヘッダーを含むHTTPリクエストを行う必要があります。
Connection: Upgrade
Upgrade: websocket
これに対してサーバーは、Switching Protocols
を意味する101
レスポンスを返します。HTTPリクエストで使われたのと同じTCPコネクションが、WebSocketコネクションに昇格します。
バイナリプロトコルであるWebSocketはかなり込み入っているので、本記事ではWebSocketの細かな点には立ち入りません。ご興味がおありの方は、Starr Horneによる以下の記事もどうぞ。
参考: Building a simple websockets server from scratch in Ruby - Honeybadger Developer Blog
次は、TCPソケットがWebSocketコネクションに昇格する方法を見てみましょう。
🔗 HTTPをWebSocketに昇格する
既に説明したように、通信が成立するためには、まず101
レスポンスを返してから、WebSocketのバイナリプロトコルを用いてソケットに書き込む必要があります。
require 'digest/sha1'
class App
def call(env)
req = Rack::Request.new(env)
key = req.get_header("HTTP_SEC_WEBSOCKET_KEY")
response_key = Digest::SHA1.base64digest([key, "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"].join)
body = proc do |stream|
response = "Hello world!"
output = [0b10000001, response.size, response]
stream.write output.pack("CCA#{ response.size }")
ensure
stream.close
end
[101, { "Upgrade" => "websocket", "Connection" => 'upgrade', "Sec-WebSocket-Accept" => response_key }, body]
end
end
セキュアなコネクションを作成するには、レスポンスキー(response key)を作成する必要があります。レスポンスキーの生成に使われるUUIDは、仕様に記載されているグローバルな定数です。
WebSocketコネクションに書き込む文字列のバイナリ形式については、ここでは解説しませんが、Starr Horneによる以下の記事で一通り解説されていますので、ご興味がおありの方はどうぞ。
参考: Building a simple websockets server from scratch in Ruby - Honeybadger Developer Blog
🔗 デモ
サーバーを実行します。
$ bundle exec puma
コネクションを作成するには、WebSocketクライアントを使うのが最も手軽です。websocat
がおすすめです。
$ websocat ws://127.0.0.1:9292/
上のwebsocat
を実行すると、Hello world!
が出力されます。これでコネクションがアクティブになりました。理論上はこのソケットでメッセージを書き込んだり受け取ったりできますが、今はサーバーでメッセージの受信やパブリッシュ機能を何も実装していないので、実用的な意味では動いていません。
🔗 まとめ
Rackについてのくわしい解説は以上でおしまいです。
Rackとは何か、および基本的なRackアプリのセットアップ方法を学びました。次に、ソケットハイジャックによる永続的コネクションを維持する方法を学びました。
最後に、永続的コネクション上で通信するために、Webプラットフォームで提供されるSSE(Server Sent Events)とWebSocketという2つの仕様を利用しました。
- SSE: サーバーからクライアントにデータを送信する単方向プロトコルで、HTTPと同様にプレーンテキストを用います。
- WebSocket: サーバーとクライアントの間で通信する双方向プロトコルであり、バイナリ形式を用います。
Pumaのようなスレッド化されたWebサーバーを使う場合、永続的コネクションで困難が伴うことを常に覚えておきましょう。永続的コネクションはスレッドを拘束し、重大なパフォーマンスの問題を引き起こす可能性があります(スレッドメカニズムを独自に実装しない限り解決は難しいでしょう)。
本記事は最初にAppSignal Blogの記事として公開されました。
関連記事
- 訳注: Ruby 3.0で導入されたのはFiber Schedulerであり、Fiberはもっと前からあります。 ↩
概要
元サイトの許諾を得て翻訳・公開いたします。
日本語タイトルは内容に即したものにしました。