Ruby: Rack PushでリアルタイムWebアプリをWebから切り離す(翻訳)

概要

原著者の許諾を得て翻訳・公開いたします。

Ruby: Rack PushでリアルタイムWebアプリをWebから切り離す(翻訳)

原注: (更新情報: プルリクが更新されたので本記事に反映しました)

何だか面白くなってきました。

最近はWebSocketや、その年上の従兄弟であるEventSourceやらServer-Sent Events(SSE)の話題でもちきりです。Fayeや(Railsの)ActionCableも大流行で、リアルタイム更新処理がかつてないほどやりやすくなりました。

しかしリアルタイム更新処理は混乱の極みであり、セットアップもメンテナンスも困難です。パフォーマンスはこの際横に置いておくとしても、要するに既存の設計のコストが増大します。開発工数もハードウェアコストも高くつきます。

しかしRackリポジトリに出現した新しいPR #1272は、これらが今後すべて変革されることを約束しています(訳注: 同PRは翻訳公開時点でopenの状態です)。

このPRは、私たちのコードベースを短縮し、実時間のパフォーマンスを改善してリアルタイムWebアプリの総コストを削減するうえで大きな一歩となります。

これはWebアプリをWebから切り離すための重要な一歩です。

RackはRailsやSinatraなどのRubyフレームワークのインターフェイスであり、Webアプリはこれを用いてRubyアプリケーションサーバーとやりとりすることを思い出しましょう。Rackは広く使われているので影響力は小さくありません。

問題点のまとめ

現状の標準的なアプローチの問題点を簡単にまとめると、リアルタイム機能のサポートのためにリアルタイムアプリの各プロセスがサーバーを2つずつ実行しなければならないという点です。

2つのサーバーは同じポートをリッスンする可能性もあれば、何らかのgemによって隠蔽される可能性もありますが、最終的には2つの異なるI/Oイベントハンドリングの単位が同時に走っていなければならないのです。

「それはまたどうして?」とお思いの方向けにお答えいたします(気にならない方は問題解決のセクションまでスキップしてください)。

一時的なhijackの話

Rackでここ5年間唯一利用可能な「標準」ソリューションである、一時的な間に合わせのソリューションについてお話しします。

Rackの歴史上のある時点で、long pollingなどのHTTPテクニックをサポートする方法が仕様上必要とされていました。特にRails 4.0は「ライブストリーミング」機能的なものを必要としていました。

そのためにRackチームはhijack APIアプローチを採用しました。

このアプローチは迫りくるニーズのための間に合わせの修正で、Rack 2.0がリリースされるためのあくまで一時的な措置でした(Rackプロトコルのバージョンは5年経った今も1.3です)。

アプリはhijack APIを用いることでソケットを完全に制御できます。サーバーのソケットをハイジャックするだけで、たちまちlong pollingやSSEなどをサポートできました。

そしてここから問題がややこしくなっていったのです。

当時はソケットを扱うために大量のネットワークロジックをサーバー層からアプリ層にコピーしなければなりませんでした(write呼び出しのバッファリング、着信データ、プロトコル管理、タイムアウト処理など)。

これは明らかにSOLIDの「S」(単一責任の原則)に違反しています。アプリやフレームワークにI/Oハンドリングの責務を押し付けているからです。

そしてDRY原則にも違反しています。I/Oハンドリングのロジックが複製されてしまうからです(1つはサーバー内、もう1つはアプリ/フレームワーク)。

さらにこのアプローチはネットワークプロトコルとアプリが複雑に絡み合うため、HTTP/2接続でいくつもの問題が生じます。

hijackの明らかなコスト

hijackアプローチは多大なコストを要します。隠れたコストもありますが、目に見えてわかるコストはそれを上回ります。

最も目に付きやすいコストはメモリ、パフォーマンス、そして開発工数です。

hijackベースのソリューションではコードが重複して余分な処理が発生するため、メモリ消費量が増加し、パフォーマンスも低下します(システムコール数の増加やコンテキストスイッチの増加など)。

require 'faye'でアプリにWebSocketが追加されますが、このgemの読み込みだけで9Mbを要します(しかもサーバーが実際に動き出す前の時点です)。

一方、agoo HTTPサーバーまたはiodine HTTPサーバーを用いれば、WebSocketとSSEの両方がアプリに追加され、しかも余分なメモリを消費しません。

具体的には、iodineのメモリ消費量は2MbでPumaをわずかに下回り、しかもHTTPとリアルタイム機能の両方を提供してくれます。

hijackの隠れたコスト

さらに微妙なコストは、ハードウェアが高価なことと、hijack時の1台のマシンで扱えるクライアント数が少ないことです。

どうしてこうなるのでしょうか。

hijackアプローチはパフォーマンス面で劣るのみならず、一部のHTTPサーバーがselectシステムコールに依存してしまいます(前回ざっと見たときにはPumaがselectを使っていました)。

このselectシステムコールを用いると、オープンファイル数の上限が1024にまで落ちてしまい、プロセスあたりのオープン接続数が最大1024に制限されると見込まれます。

ある接続がハイジャックされると、そのソケットはWebサーバーが期待するほど速く閉じられず、最終的にオープンファイル数が1024を超えたときに破損またはクラッシュの可能性があります。

解決方法: コールバックとイベント

Rackで提案された新しいPR #1272は、WebSocketやSSEを効果的に実装する素晴らしい方法を提供し、かつアプリがサーバーのことを一切意識せずに済むようになります。

新しい提案ではネットワークやI/Oのハンドリングはサーバー側の責務のままとしたことで、アプリのコードベースがシンプルになり、ネットワークプロトコルから切り離されています。

アプリはあらゆるイベントの通知をコールバックオブジェクトを用いて受け取ります。これによりアプリがネットワーク層から解放され、データに専念できるようになります。

コールバックオブジェクトは、アプリを実行するサーバーや背後のプロトコルについて一切関知する必要がありません。

コールバックオブジェクトは、Rubyのextendアプローチを用いて正しいAPIに自動リンクされるので、アプリはサーバーについて何も気にする必要がありません(原注: PRはその後更新され、extendアプローチはclientオブジェクトをさらに追加する方法に置き換えられました)。

しくみ

あらゆるRackサーバーは、ハッシュ型オブジェクトを用いてRackアプリとやりとりします。

RailsやSinatra、そしてあらゆるRackアプリやフレームワークはこのしくみの上に構築されています。これはRackの現在の仕様に記載されています。

Rackを用いたシンプルなhello worldは次のようになります(config.ruというファイルに保存します)。

# 通常の HTTP response
RESPONSE = [200, { 'Content-Type' => 'text/html',
          'Content-Length' => '12' }, [ 'Hello World!' ] ]
# `env`変数を取り出す
APP = Proc.new {|env| RESPONSE }
# アプリの実行に使うRack DSL
run APP

新しい提案では、env['rack.upgrade?']という変数が導入されます。

通常、この変数はnilに設定されます(envハッシュに含まれません)。

しかしWebSocket接続の場合はenv['rack.upgrade?']:websocketに設定され、EventSource(SSE)接続の場合は:sseに設定されます。

コールバックオブジェクトを設定するには、env['rack.upgrade']を導入します(メソッド名に?がない点にご注意)。

設計は次のような感じになります。

# config.ruに配置
RESPONSE = [200, { 'Content-Type' => 'text/html',
          'Content-Length' => '12' }, [ 'Hello World!' ] ]

# Callbackクラスの例
class MyCallbacks
  def on_open client
    puts "* Push connection opened."
  end
  def on_message client, data
    puts "* Incoming data: #{data}"
    client.write "Roger that, \"#{data}\""
  end
  def on_close client
    puts "* Push connection closed."
  end
end

# `env`変数を取り出す
APP = Proc.new do |env|
  if(env['rack.upgrade?'])
    env['rack.upgrade'] = MyCallbacks.new
    [200, {}, []]
  else
    RESPONSE
  end
end
# アプリの実行に使うRack DSL
run APP

このアプリをagooサーバーまたはiodineサーバーで実行し、マジックを花開かせます。

たとえばiodineを使う場合は以下のようにします。

# iodineをインストール(バージョン0.6.0以降)
gem install iodine
# シングルスレッドモードで起動
iodine -t 1

localhost:3000をブラウザで開き、ブラウザコンソールでJavaScriptを少々テストしてみましょう。

まずはEventSource(SSE)接続から(以下をブラウザコンソールで実行)。

// SSE実行例
var source = new EventSource("/");
source.onmessage = function(msg) {
  console.log(msg.id);
  console.log(msg.data);
};

もちろん通知を送信してませんのでこれだけでは何にも起きませんが、これでSSE接続がオープンしました。

WebSocketでもやってみましょう(以下をブラウザコンソールで実行)。

// WebSocket実行例
ws = new WebSocket("ws://localhost:3000/");
ws.onmessage = function(e) { console.log(e.data); };
ws.onclose = function(e) { console.log("closed"); };
ws.onopen = function(e) { e.target.send("Hi!"); };

おお!Rubyコンソールを見てみるとWebSocketが確かに動いています。実に簡単ですね。

同じコード例はagooサーバーでも完全に動作します(agooもiodimeもRack Push提案を既にサポートしています)。

以下をお試しください。

# agooサーバーをインストール(バージョン2.1.0以降)
gem install agoo
# 起動
rackup -s agoo -p 3000

余分なgemも、余分なコードも、大量のメモリも使わずに、Rubyサーバーと生Rackだけでできるのです(あえてフレームワークを用いませんでした)。

素晴らしいPushを味わう

シンプルさはこれでわかりますが、まだどれだけ強力なのかがピンときません。

株式相場表示やタイマーを実装するとしましょう。ここではタイマーで考えます。

# config.ruに配置
RESPONSE = [200, { 'Content-Type' => 'text/html',
          'Content-Length' => '12' }, [ 'Hello World!' ] ]

# グローバルなライブ接続ストレージ
module LiveList
  @list = []
  @lock = Mutex.new
  def <<(connection)
    @lock.synchronize { @list << connection }
  end
  def >>(connection)
    @lock.synchronize { @list.delete connection }
  end
  def any?
    # 「live list」への接続を削除
    @lock.synchronize { @list.any? }
  end
  # 同じプロセスを共有する全接続にメッセージを1件送信する
  # (クラスタモードでは部分的なブロードキャストのみとなるのでこの方法はスケールしない)
  def broadcast(data)
    # リストをコピーして、クリティカルセクションを長時間実行しないようにする
    tmp = nil # スコープのこの部分をtmpに置く
    @lock.synchronize do
      tmp = @list.dup # リストをtmpにコピー
    end
    # クリティカルセクションの外でリストを反復
    tmp.each {|c| c.write data }
  end
  extend self
end

# まさにこの瞬間に時刻をブロードキャストするが...
# クラスタモードではスレッドが「中断する」
@thread = Thread.new do
  while(LiveList.any?) do
    sleep(1)
    LiveList.broadcast "The time is: #{Time.now}"
  end
end

# 静的なCallbackモジュールの例
module MyCallbacks
  def on_open client
    # 「live list」への接続を追加
    LiveList << client
  end
  def on_message(client, data)
    # ブロードキャストの単なる例
    LiveList.broadcast "Special Announcement: #{data}"
  end
  def on_close client
    # 「live list」への接続を削除
    LiveList >> client
  end
  extend self
end

# Rackアプリ
APP = Proc.new do |env|
  if(env['rack.upgrade?'])
    env['rack.upgrade'] = MyCallbacks
    [200, {}, []]
  else
    RESPONSE
  end
end
# アプリの実行に使うRack DSL
run APP

iodineサーバーをシングルプロセスモードで起動する(iodine -w 1)と、小さなタイマーが点滅します。

正直、上で私が書いたサンプルコードは好きになれません。何だか長くて頼りないし、iodineをクラスタモードで使えません。

そこで次のコード例として、(コメント含みで)32行のチャットルームをこしらえました。

LiveListモジュールやタイマースレッドを回避するため、iodineのpub/sub拡張APIを使うことにします。タイマーは別に欲しくないのでIodine.run_everyメソッドはスキップします。

また、やり取りの相手はWebSocketクライアントに限定します。私が動かせることを示すためです。

こちらの方が新しいenv['rack.upgrade']アプローチがもたらす真価がよくわかりますし、クラスタモードでもちゃんと動作します。

残念ながら、今度のコード例はagooサーバーでは今のところ動作しません。

# config.ruに配置
RESPONSE = [200, { 'Content-Type' => 'text/html',
          'Content-Length' => '12' }, [ 'Hello World!' ] ]
CHAT = "chat".freeze
# Callbackクラス
class MyCallbacks
  def initialize env
     @name = env["PATH_INFO"][1..-1]
     @name = "unknown" if(@name.length == 0)
  end
  def on_open client
    client.subscribe CHAT
    client.publish CHAT, "#{@name} joined the chat."
  end
  def on_message client, data
    client.publish CHAT, "#{@name}: #{data}"
  end
  def on_close client
    client.publish CHAT, "#{@name} left the chat."
  end
end
# 実際のRackアプリ
APP = Proc.new do |env|
  if(env['rack.upgrade?'] == :websocket)
    env['rack.upgrade'] = MyCallbacks.new(env)
    [200, {}, []]
  else
    RESPONSE
  end
end
# アプリの実行に使うRack DSL
run APP

コマンドライン(ターミナル)でアプリを起動します。

iodine

ブラウザコンソールで以下を実行します。

ws = new WebSocket("ws://localhost:3000/Mitchel");
ws.onmessage = function(e) { console.log(e.data); };
ws.onclose = function(e) { console.log("Closed"); };
ws.onopen = function(e) { e.target.send("Yo!"); };

原注: agoo 2.1.0からpub/sub拡張が実装されましたが、使われているセマンティクスが若干異なります。同じコード例がどちらのサーバーでも動くよう手を尽くしてみました。

なぜ今まで誰も思いつかなかったのか

実は、これはまったく新しいアイデアというわけではありません。

hijack API自体が提案されたときですら、別のアプローチが提案されていました

数年前にも別の提案が試みられていました。

しかし最終的に状況が変わり、agooサーバーとiodineサーバーがどちらも既に新しいアプローチをサポートするようになりました。

期待できそうです。

原注: 本記事のコード例は、Rack仕様へのPR変更に基づいて更新しました。

関連記事

Railsで学ぶSOLID(1): 単一責任の原則(翻訳)

技術的負債を調査する10のポイント(翻訳)

デザインも頼めるシステム開発会社をお探しならBPS株式会社までどうぞ 開発エンジニア積極採用中です! Ruby on Rails の開発なら実績豊富なBPS

この記事の著者

hachi8833

Twitter: @hachi8833、GitHub: @hachi8833 コボラー、ITコンサル、ローカライズ業界、Rails開発を経てTechRachoの編集・記事作成を担当。 これまでにRuby on Rails チュートリアル第2版の半分ほど、Railsガイドの初期翻訳ではほぼすべてを翻訳。その後も折に触れてそれぞれ一部を翻訳。 かと思うと、正規表現の粋を尽くした日本語エラーチェックサービス enno.jpを運営。 実は最近Go言語が好き。 仕事に関係ないすっとこブログ「あけてくれ」は2000年頃から多少の中断をはさんで継続、現在はnote.muに移転。

hachi8833の書いた記事

週刊Railsウォッチ

インフラ

ActiveSupport探訪シリーズ