Tech Racho エンジニアの「?」を「!」に。
  • Ruby / Rails関連

Rails: マルチテナントでSidekiqジョブを公平に優先順位付けするsidekiq-fair-tenant gem(翻訳)

概要

元サイトの許諾を得て翻訳・公開いたします。

日本語タイトルは内容に即したものにしました。

Rails: マルチテナントでSidekiqジョブを公平に優先順位付けするsidekiq-fair-tenant gem(翻訳)

はじめに

多くのバックエンドアプリケーションのマルチテナントシステムは、特に複雑なものになるとバックグラウンドジョブでのキュー管理が難しくなることがあります。これは、小規模な販売業者と大規模な販売業者が同居する可能性があるeコマースアプリのような、多様なユーザーベースに対応するアプリケーションを考慮したときに特に当てはまります。これを「公平に」管理するのは難題です。
本記事では、RailsとSidekiqを利用する場合に可能な1つのソリューションと、それを実装する新しいRuby gemを紹介します。
それがsidekiq-fair-tenantです。

Envek/sidekiq-fair_tenant - GitHub

しかしその前に、もう少し背景情報について説明します。
このコンテキストにおける「バックグラウンドジョブ」とは、トランザクション処理、在庫の更新、レポート生成といった作業単位を指します。こうしたジョブは、UI操作やアプリケーションサーバーの処理がブロックされないようジョブキューに配置して、後で処理するのが普通です。

しかし、そこに優先順位付けのためのメカニズムがなければ、1件のテナントから大量のジョブバッチが発生したときにリソースを専有されてしまう可能性があります。そうなると、キューの後ろで順番待ちしている他のユーザーのジョブに数分〜数時間の遅延が発生して、パフォーマンスが大幅に悪化することがあります(サービスの利用規約次第でSLOやSLAに違反すれば、非常に深刻な事態となります)。

たとえば、上述のeコマースプラットフォームの例に話を戻すと、ある大規模な販売業者が商品の在庫レベルを更新する数十万件のリクエストを送信すれば、適切な優先順位付けがされないまま数十万件ものジョブがエンキューされる可能性がありえます。すると、この膨大なジョブバッチによってシステムが詰まり、小規模な販売業者が送信したジョブの処理が遅れてしまう可能性があります。この状況は明らかに不公平であり、解決しなければなりません。

私たちが編み出したソリューションを、sidekiq-fair-tenantというRuby gemに切り出しました。これは、RailsとSidekiqを利用するマルチテナントアプリケーションで公平な優先順位付けを実装するために設計されました。

  • 1: 自分のジョブクラス内のsidekiq_optionsに以下のfair_tenant_queuesセクションを追加します。
class SomeJob
  sidekiq_options
    queue: 'default',
+    fair_tenant_queues: [
+     { queue: 'throttled_2x', threshold: 100, per: 1.hour },
+     { queue: 'throttled_4x', threshold:  10, per: 1.minute },
+    ]
end
  • 2: ジョブクラスにテナント検出ロジックを追加します。
class SomeJob
+  def self.fair_tenant(*_perform_arguments)
+    # テナント名として使われる何らかの文字列を返す
+    "tenant_1"
+  end
end

以上で準備は完了です(詳しくはgemのREADMEをご覧ください)。

このgemの本質を理解するために、このgemが誕生したときの経緯を見てみることにしましょう。

私たちのソリューションを解説するために、私たちの顧客であるCoveralls様との経験についてお話しいたします。同社は、テストスイートでカバーできていないコードをあぶりだす形でテストカバレッジを監視するアプリケーションを提供しています。


Coveralls様と取り組んだ内容について詳しくは以下の記事をどうぞ。カバレッジファイルを解析するCLIツールをCrystal言語で作成して、解析データをCoverallsに送信するようにしました。

From Ruby to Crystal? Writing and distributing a CLI tool—Martian Chronicles, Evil Martians’ team blog

このプラットフォームの顧客は、扱うリポジトリ数またはGitHub Organizationの数に応じて料金を支払いますが、リポジトリ数やアクティビティのレベルは顧客によって大きく異なる可能性があります(小規模で非常に活発なリポジトリで生成される「軽量な」タスクの件数が、大規模だがあまり活発でないリポジトリで生成される比較的小数の「重量級」タスクの件数を上回る可能性もあります)。

その結果、支払い額が小さい層に属する顧客(ただしコードベースは大規模かつ非常に活発)がバックグラウンドジョブを専有してしまい、その結果他のすべての層に属する顧客の待ち時間が長くなってしまう可能性がありえます。

🔗 問題の発見と取り組み

このような問題は、バックグラウンドジョブのバックアップ中にサービス速度が低下していることを監視ツールが通知したときに判明するのが典型的です。さらに詳しく調査すると、この事態につながった顧客は1つしかバックログに見当たらなかったということもざらにあります。

Ruby on RailsとSidekiqに関連する(つまり基礎となる技術スタックがRubyとRedisに関連している)技術ソリューションで課題となるのは、パフォーマンスやストレージを犠牲にすることなくキューを管理する方法です。何度か議論を重ねた後、私たちはSidekiqに適した方法を見出すために「公平なキュー」を詳しく実験し始めました。

「公平さ」のさまざまな戦略については、以下のリポジトリをご覧ください。
palkan/faqueue - GitHub

主な課題は、ジョブ作成につきものの「予測不可能性」に根ざしていました。test環境におけるさまざまなユーザーからのジョブバッチは事前に定まっていますが、現実のジョブは時間ウィンドウの中でも分散するうえに、次のバッチがいつどんなサイズでやってくるかは予測しようがないので、「静的な」実験ではあまり役に立ちません。全ユーザーのジョブ履歴を洗い出せるようにする必要があります。

バックグラウンドジョブツールによっては優先順位付きのキューを利用できるものもあります。Solid Queueのジョブ優先順位をご覧ください。
それでも次のタスクで適切な優先順位を選択するロジックを実装する必要があります(これは私たちのソリューションと近いものになる可能性があります)。

さらに別の技術的制限は、Sidekiqのキューは厳密に順序付けられていることです。同一キュー内では、あるジョブを他のジョブより優先することはできません。そのため、欲しい効果を達成するためには、キューを複数使うか、スケジューリングされたジョブをハックする必要があります。
本記事では、「補助キュー(auxiliary queue)」の手法を実装することにします。「極端に重たい」ジョブは「遅い」キューに回されることになります。

🔗 ソリューション

私たちが提案した戦略は、「貪欲なテナント」のジョブをスロットリング(throttling: 抑制)して、他のテナントに道を譲らせるというものです。
最初のステップは、この値を超えたら貪欲とみなすしきい値を定義することでした。そして、すべてのユーザーについてジョブは1日100件までという暫定的なしきい値を設定しました。

しかしこれを実装するために、Redisでこのスライディングウィンドウ(sliding window: このウィンドウは「時間枠」を表します)を作成する方法と、既存のSidekiqアーキテクチャ内でのスロットリングメカニズムを実装する方法を慎重に検討しなければなりませんでした。

スライディングウィンドウを実装する方法の1つは、Redisのsorted sets(ソート済みセット)データ構造を活用するというものです(このデータ構造のユースケースには、何とスライディングウィンドウのレートリミッターも含まれていました!)。

この方法の長所:

  • ジョブのリトライを適切に処理できる(失敗したジョブのためにクォータを使わずに済む)。

この方法の短所:

  • 実行するジョブの個数に応じて必要なスペースが増加する。
    この要件は、1日あたり数十億件のジョブを処理するアプリケーションでは厳しすぎる可能性があります。

別の方法として、leaky buckets(水漏れバケツ)というアルゴリズムを使う手法があります1。これは保存場所がずっとコンパクトになります(必要なスペースはジョブ数ではなくテナント数に比例します)が、時間ウィンドウの長さを変更したり、1つのテナントに複数の時間ウィンドウを設定するのが難しいため、ここではより堅牢なsorted set実装で進めることにし、leaky bucketsについては読者の演習とします。

Redisのsorted setsでソリューションを実装する方法を詳しく見てみましょう。作成するテナントごとに、「Sidekiqのジョブ識別子(jid)」を要素として、「ジョブのエンキュー時刻」を要素の重み付けとして含むsorted setをRedisに保持します。これで、任意の時間ウィンドウ内に含まれるエンキューされたジョブの個数を簡単にカウントできるようになり、1個のsorted setで複数の時間ウィンドウをトラッキングできるようになります。

アプリケーション開発者向けのAPIは以下のような感じになります。

class HeavyJob
  include Sidekiq::Job

  sidekiq_options
    queue: 'default',
    fair_tenant_queues: [
      { threshold: 100, per: 1.day,  queue: 'default_throttled' },
      { threshold:  40, per: 1.hour, queue: 'default_superslow' },
    ]

  def self.fair_tenant(klass, id) # same arguments as in `perform`
    record = (klass.is_a?(String) ? klass.safe_constantize : klass).find(id)
    record.user_id
  end

  def perform(klass, id)
    # ジョブの実装
  end
end

ジョブを別のキューに「再ルーティング」するには、Sidekiqの"client"ミドルウェアを実装する必要があります。この実装では、エンキューされたすべてのジョブについてしきい値を超えていないかどうかをチェックし、しきい値違反を検出した場合はジョブをメインキューではなく低速キューに入れて実行します。

Sidekiqのミドルウェアは、"client"と"server"の2種類があります。"client"ミドルウェアはジョブがエンキューされたタイミングで実行され、"server"ミドルウェアはジョブが実行されるタイミングで実行されます。

module Sidekiq::FairTenant
  TENANT_ENQUEUES_KEY = "sidekiq-fair_tenant:enqueued:%<job_class>s:tenant:%<fair_tenant>s".freeze
  MAX_THROTTLING_WINDOW = 1.day

  class ClientMiddleware
    def call(worker, job, queue, redis_pool)
      # ここに実装する
    end
  end
end

最初に、現在のジョブがスロットリング対象かどうかをチェックする必要があります(不要な場合はスキップします)。

return yield unless job["fair_tenant_queues"]&.any? # このジョブにスロットリングルールがないかをチェック
return yield if queue != job["queue"] # 既に誰かがこのジョブを再ルーティング済みかをチェック

対象となるテナントにスロットリングルールが定義済みかどうかをチェックする必要があります。ミドルウェアのcalljob引数には、sidekiq_optionsヘルパーにあるものがすべて指定できるので、これを利用してjob["fair_tenant_queues"]配列が存在するかどうかをチェックできます。配列が存在しない場合や空配列の場合はジョブのスロットリングが不要なので、後はyieldを実行すればスタック上(またはジョブ自身)の次のミドルウェアに制御が移動します。

次に、ジョブがそのジョブクラス内で定義されたキュー内にまだ残っているか、つまりジョブがスタック上のミドルウェアによって既に再ルーティングされていないかをチェックして、他のSidekiqプラグインとうまく連携させたいと思います。

次に、テナント識別子が判明していることをチェックする必要があります(まだの場合はテナント識別子を探索します)。

worker = worker.is_a?(Class) ? worker : worker.constantize
job["fair_tenant"] ||= worker.fair_tenant(*job["args"]) if worker.respond_to?(:fair_tenant)
if job["fair_tenant"].blank?
  Rails.logger.warn "#{worker} with args #{job["args"].inspect} won't be throttled due to missing fair_tenant (#{job["fair_tenant"].inspect})"
  return yield
end

テナント識別子は複数の方法で指定できます。
MyJob.set(fair_tenant: "tenant_1").perform_asyncのようにジョブエンキュー時に明示的に指定することも、クラスレベルのfair_tenantフックで算出することも可能です。ただしテナント識別子が存在しない場合は、そのジョブのスロットリングをスキップして警告をログ出力します(おそらくバグ)。

すべてのチェックがパスしたら、ジョブをスライディングウィンドウに「登録」し、必要に応じて別キューに再ルーティングして、最終的にエンキュー可能になります。

redis_pool.then do |redis|
  register_job(worker, job, queue, redis)
  job["queue"] = assign_queue(worker, job, queue, redis)
end

yield

手短に言うと、登録は(1つのトランザクション内で)同時に実行されるわずか3つのコマンドです。

def register_job(worker, job, _queue, redis)
  fair_tenant = job["fair_tenant"]
  tenant_enqueues_key = TENANT_ENQUEUES_KEY % { job_class: worker, fair_tenant: fair_tenant }
  redis.multi do |tx|
    tx.zadd(tenant_enqueues_key, Time.current.to_i, "jid:#{job["jid"]}")
    tx.zremrangebyscore(tenant_enqueues_key, "-inf", MAX_THROTTLING_WINDOW.ago.to_i)
    tx.expire(tenant_enqueues_key, MAX_THROTTLING_WINDOW)
  end
end

登録の前に、そのテナントのsorted setで使うキー名を取得する必要があります。私たちの場合は、Rubyの文字列フォーマットを利用することで今後キー名のフォーマットを変更しやすくしています。

次に、以下のコマンドを順に実行します。

  • MULTIコマンドでトランザクションをオープンします。
    これによって、すべてのコマンドが一度に実行されます。
  • ジョブ識別子をZADDコマンドでsorted setに追加します。
    sorted setが存在しない場合はRedisが自動的に作成します。

  • 次に、ZREMRANGEBYSCOREコマンドでsorted setをトリミングし、最大時間ウィンドウよりも古いジョブを削除します。

  • 次にEXPIREコマンドでキーに有効期限を追加します。
    こうすると、テナントがアクティブでなくなったタイミングで、エンキュー済みの古いジョブIDを含むsorted set全体が、最大時間経過後にRedisによって自動削除され、以後スペースを消費しなくなります。

  • 最後は、ブロックが終了したタイミングで、Redis Rubyクライアントがトランザクション内のEXECコマンドをすべて実行します。

以上で完了です!

その後は、ジョブを再ルーティングすべきかどうかを決定する必要があるので、ジョブがしきい値に違反していないかどうかをチェックし、違反している場合はジョブを別キューに割り当てます。

# 制限(しきい値/時間)最もが迫っているルールに該当するもののうち最後のキューを選択する
# 最も制限が迫っている最も遅いキューが`fair_tenant_queues`配列の末尾に来ることが前提
def choose_queue(worker, job, queue, redis)
  tenant_enqueues_key = TENANT_ENQUEUES_KEY % { job_class: worker, fair_tenant: job["fair_tenant"] }
  job["fair_tenant_queues"].map(&:symbolize_keys).filter do |threshold:, per: MAX_THROTTLING_WINDOW, **|
    threshold < redis.zcount(tenant_enqueues_key, per.ago.to_i, Time.current.to_i)
  end.last&.[](:queue) || queue
end

ここでの主な作業はRedisのZCOUNTコマンドによって行われます。
ZCOUNTコマンドは、perで示した秒数前の時刻から現在時刻までの時間ウィンドウ内にエンキューされたジョブの個数をカウントします。ジョブの個数がしきい値を超えるとルールに一致したとみなし、最後に一致したルールに書き込まれているキュー名を返します。ジョブの個数がしきい値を超えない場合は、元のキュー名を返します。
選択されるルールは常に最後に一致したルールなので、ルールのリストが最も制限に余裕があるものから最も制限が迫っているものへと順序付けられ、構成がより堅牢になります。
既存のスロットリングが十分に厳密でない場合は、単に新しいルールを配列の末尾に追加します。新しいルールの時間ウィンドウはより狭くなり、新しいルールが指すキューのスロットリングは強められます。

最後に、"client"ミドルウェアをクライアント(ジョブを実行しないプロセス)とサーバー(Sidekiqのワーカープロセス)の両方で有効にする必要があります。このサーバーは、Sidekiqの他のジョブ内からエンキューされたジョブを再ルーティングする必要もあります。

# config/initializers/sidekiq.rb
Sidekiq.configure_client do |config|
  config.client_middleware do |chain|
    chain.add Sidekiq::FairTenant::ClientMiddleware
  end
end
Sidekiq.configure_server do |config|
  config.client_middleware do |chain|
    chain.add Sidekiq::FairTenant::ClientMiddleware
  end
end

🔗 キューをセットアップする

このソリューションが機能するための主要なメカニズムは、Sidekiqの「重み付きキュー(weighted queues)」です(Sidekiq wikiのAdvenced options for queuesを参照)。重み付けを行うことで、高速キューは低速キューの数倍高速になります(例: 高速キューを4、低速キューを1に重み付けすると、低速キューの約4倍高速になる)。

:queues:
  - [general_queue, 6]
  - [general_queue_throttled, 3]
  - [general_queue_superslow, 1]

上の設定例では、general_queue内のジョブはgeneral_queue_throttled内のジョブよりも2倍の頻度で実行され、general_queue_superslow内のジョブよりも6倍の頻度で実行されるチャンスを得られます。

Sidekiqの内部については、Sidekiq::BasicFetchで行われるキューのポーリング(polling)順序算出部分のコードを見てみると、Sidekiqワーカーが次のジョブを実行用に取得しようとするたびに、キューの重み付けに沿ってジョブをポーリングするためにキューをshuffleしていることがわかります。
つまり、最初にgeneral_queueからジョブを取得しようとした場合は時間の60%、最初にgeneral_queue_throttledからジョブを取得しようとした場合は時間の30%、最初にgeneral_queue_superslowからジョブを取得しようとした場合は時間の10%となります。
つまり、たとえgeneral_queueに多数のジョブが待機していても、general_queue_throttledのジョブもペースが落ちるだけで引き続き処理が進められます。

この方法の長所:

  • 高速キューが空の場合、低速キューはフルスピードで処理されます(人工的な遅延は生じません)。
  • 高速キューがいっぱいになっていても、低速キューは遅くなるだけで処理は止まらずに進められます(設定で変更可能)。
    このため、スロットリングされたユーザーから見てアプリが「詰まる」こともありません。

この方法の短所:

  • Sidekiqは「順序付きキュー」モードと「重み付きキュー」モードの混合をサポートしていません(Sidekiq wikiのqueue configurationに記載されています)。
    このため、同じワーカーで極めて重要度の高いキューを優先的に実行させるために他のキューを無視させようとしても、同じワーカーで他のキューを無視させることはできません。

Sidekiq 7では、Capsulesを使えば実現できます。
それより前のバージョンでは、別のワーカープロセスのセットを起動しておく必要があります。

  • すべてのキューとその重み付けを常にトラッキングしなければならず、production環境用のSidekiq設定とdevelopment環境用のSidekiq設定が、ジョブクラス内の設定と常に一致するよう手動で同期する必要があります。

🔗 導入の結果と反響

実装後、全般に好評をいただきました!
今回の主な目的は「小規模」な顧客が遅延に遭遇しないようにすることでしたが、この目標は達成され、処理の遅延に関するクレームもなくなりました。

同時に、今後もっと大規模な顧客がビルド時間の遅さを気にして「もっと速くして欲しい」という要望を出すことがあっても、通常なら「より強力なプラン」を、場合によっては顧客が専用で動かすインスタンスをセットアップするプラン」を提示することも可能になってきました。

この方法なら、アプリはすべての顧客にえこひいきなしで良好なユーザーエクスペリエンスを保証できるようになります。

🔗 gemの方もお忘れなく!

念のため申し添えておくと、面倒な作業は私たちが何もかも片付けたので、本記事のサンプルコードを頑張ってコピーする必要はありません。ぜひsidekiq-fair-tenant gemをチェックしてください!

Envek/sidekiq-fair_tenant - GitHub

sidekiq-fair-tenant gemは、マルチテナントアプリケーションでジョブの公平な優先順位付けを実装するために設計されています。しかも、素のSidekiqジョブだけでなく、何とActive Jobもサポートしています。


Evil Martiansは、成長段階のスタートアップ企業をユニコーン企業に飛躍させるためにサポートいたします。開発ツールの構築やオープンソース製品の開発も行っています。ワープの準備が整ったお客様、ぜひフォームまでご相談をお寄せください!

関連記事

Solid Queue README -- DBベースのActive Jobバックエンド(翻訳)

Rails: Active Jobスタイルガイド(翻訳)


CONTACT

TechRachoでは、パートナーシップをご検討いただける方からの
ご連絡をお待ちしております。ぜひお気軽にご意見・ご相談ください。