Tech Racho エンジニアの「?」を「!」に。
  • Ruby / Rails関連

Ruby: Rackの仕組みとWebSocketやSSEとの組み合わせを詳しく理解する(翻訳)

概要

元サイトの許諾を得て翻訳・公開いたします。

日本語タイトルは内容に即したものにしました。

rack/rack - GitHub

Ruby: Rackの仕組みとWebSocketやSSEとの組み合わせを詳しく理解する(翻訳)

Rackは、現在のあらゆる著名なRuby製Webフレームワークの基盤であり、RubyアプリケーションとWebサーバーの間のインターフェイスを標準化します。

このメカニズムによって、Rack準拠のWebサーバー(Puma、Unicorn、Falconなど)とRack準拠のWebフレームワーク(Rails、Sinatra、Roda、Hanamiなど)を組み合わせ可能になります。

このように関心を分離すると、組み合わせを大幅に柔軟にできるため、非常に強力です。

ただし制約もあります。Rack 2では、あらゆるリクエストに対してレスポンスを返すことと、コネクションをクローズすることが前提となっていました。しかしWebSocketなどを経由可能にする永続的なコネクションを提供する機能はありませんでした。このため、WebSocketなどの永続的コネクションを実装するには、Rackからのコネクションを引き継ぐ脱出口的なハックが必要でした。

Rack 3では、これがすべて変更されます。

訳注

Rack 3については以下の記事もどうぞ。

Rack 2-> Rack 3アップグレードガイド(翻訳)

🔗 Rackの基本

しくみを詳しく見ていく前に、Rack自体をもう少しおさらいしておきましょう。

🔗 素のRackアプリ

基本的なRackアプリは以下のようになります。

class App
  def call(env)
    [200, { "Content-Type" => "text/plain" }, ["Hello World"]]
  end
end

run App.new

上のenvは、HTTPヘッダーなどのリクエスト固有の情報を含むハッシュです。リクエストが行われると、callメソッドが呼び出され、レスポンスを表す配列が返されます。

  • 配列の第1要素はHTTPレスポンスコードで、この場合は200です。
  • 配列の第2の要素は、送信するRackおよびHTTPレスポンスヘッダーを含むハッシュです。
  • 配列の最後の要素は、レスポンスのbodyを表す文字列の配列です。

このアプリをフォルダ内に配置して実行してみましょう。

$ mkdir rack-demo
$ cd rack-demo
$ bundle init
$ bundle add rack rackup webrick
$ touch app.rb
$ touch config.ru

訳注

原文では上の4行目はbundle add rack rackupとなっていましたが、webrick gemはデフォルトではインストールされないため、bundle add rack rackup webrickに修正しました。

app.rbファイルに以下を書きます。

class App
  def call(env)
    [200, { "content-type" => "text/plain" }, ["Hello World"]]
  end
end

config.ruには以下を書きます。

require_relative "app"

run App.new

以下のコマンドを実行すると、このアプリをデフォルトのWEBrickサーバーで実行できます。

$ bundle exec rackup

サーバーは9292ポートで実行されます。以下のcurlコマンドで動作を確認できます。

$ curl localhost:9292
Hello World

これで基本的なアプリが動くようになりました!WEBrickは開発専用のWebサーバーなので、Pumaに差し替えましょう。

🔗 Webサーバーを差し替える

$ bundle add puma

上を実行してから、rackupを再度実行してみましょう。
バンドルされたPumaが自動的に検出され、WEBrickの代わりにPumaが起動します。

$ bundle exec rackup
Puma starting in single mode...
* Puma version: 6.4.2 (ruby 3.2.2-p53) ("The Eagle of Durango")
*  Min threads: 0
*  Max threads: 5
*  Environment: development
*          PID: 45877
* Listening on http://127.0.0.1:9292
* Listening on http://[::1]:9292
Use Ctrl-C to stop

pumaコマンドには必要に応じて以下のように設定引数を渡せるので、rackupよりもpumaコマンドで直接起動することをおすすめします。

$ bundle exec puma -w 4
[45968] Puma starting in cluster mode...
[45968] * Puma version: 6.4.2 (ruby 3.2.2-p53) ("The Eagle of Durango")
[45968] *  Min threads: 0
[45968] *  Max threads: 5
[45968] *  Environment: development
[45968] *   Master PID: 45968
[45968] *      Workers: 4
[45968] *     Restarts: (✔) hot (✔) phased
[45968] * Listening on http://0.0.0.0:9292
[45968] Use Ctrl-C to stop
[45968] - Worker 0 (PID: 45981) booted in 0.0s, phase: 0
[45968] - Worker 1 (PID: 45982) booted in 0.0s, phase: 0
[45968] - Worker 2 (PID: 45983) booted in 0.0s, phase: 0
[45968] - Worker 3 (PID: 45984) booted in 0.0s, phase: 0

この基本的なアプリは、Rackインターフェイスをデモンストレーションします。
受信したHTTPリクエストは解析されてenvハッシュとしてアプリケーションに提供されます。アプリケーションはこのリクエストを処理し、サーバーがクライアントに送信する形にフォーマットした配列をレスポンスとして返します。

🔗 フレームワークをRackに準拠させる

Rack準拠のWebフレームワークは、すべて内部的にRackの仕様に準拠しており、このレベルまで下がれるアクセスポイントを提供しています。

Railsの場合、以下のようにコントローラでRackレスポンスを直接送信できます。

class HomeController
  def index
    self.response = [200, {}, ["I'm Home!"]]
  end
end

Rodaでも同様のことができます。

route do |r|
  r.on "home" do
    r.halt [200, {}, ["I'm Home!"]]
  end
end

これを実現するための構文はRack準拠のフレームワークごとに少しずつ異なりますが、内部でRackレスポンスを送信しているのはどれも同じであるため、開発者がそれらのレスポンスにアクセスするためのAPIが用意されています。

Rackの完全な技術仕様については以下のGitHubリポジトリ上で公開されているので、比較的手軽にアクセスできます。

参考: rack/SPEC.rdoc at main · rack/rack

このデモで示したように、Rackは「リクエストを受け取る」「Webアプリケーションで処理する」「レスポンスを返す」という前提で動作します。ここに永続的コネクションを投入すると、このモデルは完全に崩壊してしまいますが、にもかかわらずRailsなどのRack準拠フレームワークの中にはWebSocketを実装しているものもあります。

🔗 ソケットのハイジャック

ここまでは、可能な限り最も基本的なRackアプリをセットアップすることで、リクエストを処理してレスポンスを返す方法を学びました。

今度は、Rackのコネクションを引き継ぐことで、WebSocketなどを経由して永続的なコネクションを維持できるようにする方法を学びます。

まず、実際のHTTPコネクションがどのようなしくみになっているかを見てみましょう。

HTTPシーケンスダイアグラム

上図に示したように、まずTCPソケットをオープンし、次にリクエストをサーバーに送信します。
サーバーはレスポンスを返し、コネクションをクローズします。
すべての通信は平文テキストで行われます。

ここで「ソケットハイジャック(socket hijacking)」という手法を用いることで、リクエストを受け取ったときにRackからソケットを制御できるようになります。Rackは、ソケットハイジャックを行うために以下の2つの手法を提供しています。

  • 部分ハイジャック(partial hijack)
    RackはHTTPレスポンスヘッダを送信してから、コネクションをアプリケーションに渡します。

  • 完全ハイジャック(full hijack)
    Rackはソケットに何も書き込まず、単にコネクションをクライアントに渡します。

🔗 部分ハイジャック

部分ハイジャックは以下のように行います。

class App
  def call(env)
    body = proc do |stream|
      5.times do
        stream.write "#{Time.now}\n\n"
        sleep 1
      end
    ensure
      stream.close
    end

    [200, { "content-type" => "text/plain", "rack.hijack" => body }, []]
  end
end

上のRackアプリを実行してから以下のようにcurlでアクセスすると、1秒おきに時刻が書き込まれることがわかります。

$ curl -i localhost:9292

🔗 完全ハイジャック

完全ハイジャックは以下のように行います。

class App
  def call(env)
    headers = [
      "HTTP/1.1 200 OK",
      "Content-Type: text/plain"
    ]

    stream = env["rack.hijack"].call
    stream.write(headers.map { |header| header + "\r\n" }.join)
    stream.write("\r\n")
    stream.flush

    begin
      5.times do
        stream.write "#{Time.now}\n\n"
        sleep 1
      end
    ensure
      stream.close
    end

    [-1, {}, []]
  end
end

ただし、このコードは悪例につき、こういう書き方をしてはいけません。繰り返しますが、自分がやっていることを正しく理解できないうちは絶対やらないでください。この書き方には、落とし穴やおかしな振る舞いがあふれかえっています。

🔗 streaming body

完全ハイジャックはひどいアイデアですが、部分ハイジャックは有用なツールです。しかしそれでもハックっぽさがつきまとうため、Rack 3の仕様では「streaming body」という概念を導入する形で、部分ハイジャックを正式に採用しました。

class App
  def call(env)
    body = proc do |stream|
      5.times do
        stream.write "#{Time.now}\n\n"
        sleep 1
      end
    ensure
      stream.close
    end

    [200, { "content-type" => "text/plain" }, body]
  end
end

ここでは、配列ではなくブロックをレスポンスのbodyとして返すようにしています。ブロックの実行が完了するまで、コネクションはオープンされたままになります。

しかし、Pumaを使う場合は大きな問題が1つあります。Pumaはマルチスレッドのサーバーであり、受信リクエストごとにスレッドを1つ割り当てるようになっています。私たちはRackからのソケットを引き継いでいますが、コネクションがオープン中である間はPumaスレッドを拘束し続けることになります。

Pumaではコンカレンシー数を設定可能ですが、スレッド数には上限があるため、スレッドを長時間拘束するのは得策ではありません。これについて実際の動作を見てみましょう。

$ bundle exec puma -w 1 -t 1:1

上とは別にターミナルウィンドウを2つ開いて、それぞれのウィンドウで以下のコマンドを同時に実行します。

$ curl localhost:9292

リクエストの一方はただちに処理されますが、他方のリクエストは最初のリクエストの処理が完了するまで待たされます。ここではPumaをシングルワーカーかつシングルスレッドで起動したため、1度に1件のリクエストしか処理できなくなっています。

これは、以下のように独自のスレッドを作成することで回避可能です。

class App
  def call(env)
    body = proc do |stream|
      Thread.new do
        5.times do
          stream.write "#{Time.now}\n\n"
          sleep 1
        end
      ensure
        stream.close
      end
    end

    [200, { "content-type" => "text/plain" }, body]
  end
end

上の実験を繰り返すと、今度はPumaスレッドを占有していないため、curlリクエストがコンカレントに処理されていることがわかります。

繰り返しますが、自分がやっていることを正しく理解していないうちは、この方法は避けること。システムプログラミングは奥の深い複雑なトピックです。ここで行っているデモは主に学術的な目的のためであり、実用目的ではありません。

🔗 Falcon Webサーバーの場合

スレッドの問題はPuma Webサーバー固有の問題なので、今度は別のオプションとしてFalconを見ていきましょう。

socketry/falcon - GitHub

Falconはasync gem上に構築された、高度にコンカレントなRack準拠Webサーバーであり、スレッドの代わりにRubyのFiber(作成コストが低く、オーバーヘッドがはるかに小さい)を使っています。

socketry/async - GitHub

async gemは、RubyのIOや待機操作(sleepなど)にフックをかけて、それらをさまざまなFiber間で切り替えることで、プログラムが何もせずに立ち止まることのないようにします。

アプリを、スレッドを生成しない元のバージョンに戻します。

class App
  def call(env)
    body = proc do |stream|
      5.times do
        stream.write "#{Time.now}\n\n"
        sleep 1
      end
    ensure
      stream.close
    end

    [200, { "content-type" => "text/plain" }, body]
  end
end

次にPumaを削除してFalconをインストールします。

$ bundle remove puma
$ bundle add falcon

以下を実行してFalconサーバーを起動します。
Falconはデフォルトではhttpsトラフィックだけを配信するので、ここでは明示的にバインドを指定する必要があります。

$ bundle exec falcon serve -n 1 -b http://localhost:9292

このサーバーは単一のスレッドだけを利用します。これは以下のコマンドで確認できます(<pid>はFalconのログから拾ったpidに置き換えてください)。

$ top -pid <pid> -stats pid,th

MRIは内部でスレッドを利用するため、上のコマンドで出力されるスレッド数(#th)は2になります。

先ほどの実験を繰り返してみましょう。ターミナルウィンドウを2つ開いて、それぞれで以下のcurlリクエストを同時に行います。

$ curl localhost:9292

RubyのFiberのおかげで、2つのリクエストが同時に処理されることがわかります。

Falconは比較的新しいWebサーバーです。RubyにFiberが最初に導入されたのは3.0のときでした1。FalconはRack準拠なので、Railsでも利用可能ですが、FalconのドキュメントではRails 7.1以上での利用を推奨しています。
そういうわけで、Falconをproduction環境で使うには少々リスクがありますが、この発展はRuby世界においてとてもエキサイティングだと思います。Falconの今後数年の進歩が楽しみです。

ここまでは、Rackで永続的コネクションを作成する方法と、他のリクエストをブロックせずに実行する方法を学びました。しかしここまで見てきたユースケースは実用的ではなく、しかもわざとらしいものでした。今度は、この手法を実用に供する方法について見ていきましょう。

🔗 SSEとWebSocket

Webには、永続的なコネクション上で通信するための正式な仕様が2つあります。

WebSocketは非常に広く用いられていて人気も高いのですが、SSEはそこまで知られていないので、まずはこれについて詳しく見ていきましょう。

🔗 Server Sent Events(SSE)

SSEを使うと、クライアントでサーバーとのコネクションをオープンしたままにできますが、クライアントにメッセージをパブリッシュできるのはサーバーだけです。SSEは双方向プロトコルでは「ありません」。

SSEはJavaScript APIなので、必要なスクリプトを含むHTMLページを配信するようにアプリを変更してみましょう。

class App
  def call(env)
    req = Rack::Request.new(env)
    path = req.path_info

    case path
    when "/"
      sse_js(env)
    end
  end

  private

    def sse_js(env)
      body = <<~HTML
        <!DOCTYPE html>
        <html>
          <head>
            <meta charset="utf-8">
            <meta name="viewport" content="width=device-width, initial-scale=1">
            <title>SSE - Demo</title>

            <script type="text/javascript">
              const eventSource = new EventSource("/sse")
              eventSource.addEventListener("message", event => {
                document.body.insertAdjacentHTML(
                  "beforeend",
                  `<p>${event.data}</p>`
                )
              })
            </script>
          </head>
          <body>
          </body>
        </html>
      HTML

      [200, { "content-type" => "text/html" }, [body]]
    end
end

このAPIはEventSourceクラスにカプセル化されており、サーバーからの新しいメッセージは、私たちがリッスンしているイベントをトリガーします。

次に、イベントを送信するためのエンドポイントを構築する必要があります。

class App
  def call(env)
    req = Rack::Request.new(env)
    path = req.path_info

    case path
    when "/"
      sse_js(env)
    when "/sse"
      sse(env)
    end
  end

  private

    def sse_js(env)
      # (略)
    end

    def sse(env)
      body = proc do |stream|
        Thread.new do
          5.times do
            stream.write "data: #{Time.now}!\n\n"
            sleep 1
          end
        ensure
          stream.close
        end
      end

      [200, { "content-type" => "text/event-stream" }, body]
    end
end

サーバーの立場から見ると、これは前述のstreaming bodyとかなり似ています。ここで注目して欲しいのは、クライアントに返送されるcontent-typeヘッダーと文字列の形式です。

サーバーを実行します(以下のように明示的にPumaに戻すこと)。

$ bundle add puma
$ bundle exec puma

訳注

上のbundle add pumaは原文にありませんが追加しました。

Webブラウザでlocalhost:9292を開くと、1秒おきに現在時刻が5回書き込まれます。

この手法は、サーバーから更新をクライアントに通知したいだけの場合に有用です。上の例ではループで回しているだけなのでかなり不自然ですが、現実のアプリケーションでの用例を見てみましょう。

🔗 RubyのQueue

Rubyは、スレッド間通信用のQueueというデータ構造を提供しています。これを使ってデータをクライアントに「パブリッシュ」できます。

今度も現在時刻を5回パブリッシュするという例を使いますが、今回はバックグラウンドのスレッドからパブリッシュします。

class App
  def call(env)
    # (略)
  end

  private

    def sse_js(env)
      # (略)
    end

    def sse(env)
      queue = Queue.new
      trigger_background_loop(queue)

      body = proc do |stream|
        Thread.new do
          loop do
            data = queue.pop
            stream.write "data: #{data}!\n\n"
          end
        ensure
          stream.close
        end
      end

      [200, { "content-type" => "text/event-stream" }, body]
    end

    def trigger_background_loop(queue)
      Thread.new do
        5.times do
          queue.push(Time.now)
          sleep 1
        end
      end
    end
end

上の例では、別のバックグラウンドスレッドを生成して、現在の時刻を1秒おきにキューにプッシュします。SSEスレッドで呼び出されるqueue.popは、キューに何かが追加されるまでブロックします。

この手法を使うと、Redisなどのpub/subシステムを利用する形で、バックグラウンドスレッドからqueueにデータを追加し、それをクライアントにパブリッシュできるようになります。

次は、WebSocketを見てみましょう。

🔗 WebSocket

WebSocketは、バイナリ形式によるクライアント・サーバー間の双方向通信用プロトコルです。WebSocketは現代のWebで広く使われており、RailsのAction CableフレームワークもWebSocketを基盤としています。

WebSocketの作成にはHTTPコネクションが使われますが、WebSocketはHTTPから完全に独立したプロトコルです。

WebSocketコネクションを作成するには、以下のヘッダーを含むHTTPリクエストを行う必要があります。

Connection: Upgrade
Upgrade: websocket

これに対してサーバーは、Switching Protocolsを意味する101レスポンスを返します。HTTPリクエストで使われたのと同じTCPコネクションが、WebSocketコネクションに昇格します。

バイナリプロトコルであるWebSocketはかなり込み入っているので、本記事ではWebSocketの細かな点には立ち入りません。ご興味がおありの方は、Starr Horneによる以下の記事もどうぞ。

参考: Building a simple websockets server from scratch in Ruby - Honeybadger Developer Blog

次は、TCPソケットがWebSocketコネクションに昇格する方法を見てみましょう。

🔗 HTTPをWebSocketに昇格する

既に説明したように、通信が成立するためには、まず101レスポンスを返してから、WebSocketのバイナリプロトコルを用いてソケットに書き込む必要があります。

require 'digest/sha1'

class App
  def call(env)
    req = Rack::Request.new(env)

    key = req.get_header("HTTP_SEC_WEBSOCKET_KEY")
    response_key = Digest::SHA1.base64digest([key, "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"].join)

    body = proc do |stream|
      response = "Hello world!"
      output = [0b10000001, response.size, response]
      stream.write output.pack("CCA#{ response.size }")
    ensure
      stream.close
    end

    [101, { "Upgrade" => "websocket", "Connection" => 'upgrade', "Sec-WebSocket-Accept" => response_key }, body]
  end
end

セキュアなコネクションを作成するには、レスポンスキー(response key)を作成する必要があります。レスポンスキーの生成に使われるUUIDは、仕様に記載されているグローバルな定数です。
WebSocketコネクションに書き込む文字列のバイナリ形式については、ここでは解説しませんが、Starr Horneによる以下の記事で一通り解説されていますので、ご興味がおありの方はどうぞ。

参考: Building a simple websockets server from scratch in Ruby - Honeybadger Developer Blog

🔗 デモ

サーバーを実行します。

$ bundle exec puma

コネクションを作成するには、WebSocketクライアントを使うのが最も手軽です。websocatがおすすめです。

$ websocat ws://127.0.0.1:9292/

vi/websocat - GitHub

上のwebsocatを実行すると、Hello world!が出力されます。これでコネクションがアクティブになりました。理論上はこのソケットでメッセージを書き込んだり受け取ったりできますが、今はサーバーでメッセージの受信やパブリッシュ機能を何も実装していないので、実用的な意味では動いていません。

🔗 まとめ

Rackについてのくわしい解説は以上でおしまいです。
Rackとは何か、および基本的なRackアプリのセットアップ方法を学びました。次に、ソケットハイジャックによる永続的コネクションを維持する方法を学びました。

最後に、永続的コネクション上で通信するために、Webプラットフォームで提供されるSSE(Server Sent Events)とWebSocketという2つの仕様を利用しました。

  • SSE: サーバーからクライアントにデータを送信する単方向プロトコルで、HTTPと同様にプレーンテキストを用います。
  • WebSocket: サーバーとクライアントの間で通信する双方向プロトコルであり、バイナリ形式を用います。

Pumaのようなスレッド化されたWebサーバーを使う場合、永続的コネクションで困難が伴うことを常に覚えておきましょう。永続的コネクションはスレッドを拘束し、重大なパフォーマンスの問題を引き起こす可能性があります(スレッドメカニズムを独自に実装しない限り解決は難しいでしょう)。

本記事は最初にAppSignal Blogの記事として公開されました。

関連記事

Rack 2-> Rack 3アップグレードガイド(翻訳)

Rails: TurboのAction Cableコネクションに潜む脆弱性を事前に修正しよう(翻訳)


  1. 訳注: Ruby 3.0で導入されたのはFiber Schedulerであり、Fiberはもっと前からあります。 

CONTACT

TechRachoでは、パートナーシップをご検討いただける方からの
ご連絡をお待ちしております。ぜひお気軽にご意見・ご相談ください。