Solid Queue README -- DBベースのActive Jobバックエンド(翻訳)
Solid Queueは、Active Jobで利用できるDBベースのキューバックエンドであり、シンプルさとパフォーマンスを念頭に置いて設計されています。
通常のジョブエンキューや処理に加えて、ジョブの延期、コンカレンシー制御、キューの一時停止、数値によるジョブ単位の優先度指定、キュー順序に基づいた優先度指定、バルクエンキュー(Active Jobのperform_all_later
で使われるenqueue_all
)もサポートしています。
(ログ出力、instrumentation(計測)、CLIツールの改良、既存プロセス内での"async"モード実行方法、一意のジョブを指定する何らかの方法といった機能も間もなく登場予定です)
Solid Queueは、MySQL、PostgreSQL、SQLなどのSQLデータベースで利用可能で、FOR UPDATE SKIP LOCKED
句でジョブポーリング時のブロッキングやロック待機を回避します(利用可能な場合)。Solid Queueのジョブリトライ、ジョブ破棄、エラー処理、シリアライズ、ジョブの延期はActive Jobに依存しており、Ruby on Railsのマルチスレッドと互換性があります。
🔗 インストールと利用方法
アプリケーションのGemfileに以下の行を追加します。
gem "solid_queue"
続いて以下を実行します。
$ bundle
または以下のように手動インストールもできます。
$ gem install solid_queue
次に、必要なマイグレーションをインストールしてActive Jobのアダプタを設定する必要があります。提供されている以下のジェネレータを実行すれば、この2つを一度に行えます。
$ bin/rails generate solid_queue:install
これで、solid_queue
がproduction環境でActive Jobのデフォルトのアダプタとして設定され、必要なマイグレーションがアプリにコピーされます。
以下を実行すれば、マイグレーションだけをアプリに追加することも可能です。
$ bin/rails solid_queue:install:migrations
アプリのconfig/environments/ディレクトリで以下を手動で設定すれば、Solid QueueをActive Jobのバックエンドにできます。
# config/environments/production.rb
config.active_job.queue_adapter = :solid_queue
別のアダプタから移行するときに、ジョブを段階的にSolid Queueに移動したい場合は、以下を設定することで特定のジョブだけがSolid Queueをバックエンドとして利用するようにできます。
# app/jobs/my_job.rb
class MyJob < ApplicationJob
self.queue_adapter = :solid_queue
# ...
end
設定が終わったら、最後にマイグレーションを実行する必要があります。
$ bin/rails db:migrate
これでSolid Queueでジョブをエンキューする準備が整います。ただし、ジョブを実行するには以下のようにSolid Queueのスーパバイザを起動しておく必要があります。
$ bundle exec rake solid_queue:start
これで、キュー内にあるすべてのジョブがデフォルト設定で処理を開始します。Solid Queueの設定方法について詳しくは後述します。
小規模プロジェクトであれば、Solid QueueをWebサーバーと同一のマシン上で実行できます。Solid Queueはすぐに利用可能な水平スケーリングをサポートしているので、スケーリングの準備が整い次第使えるようになります。
Solid QueueはWebサーバーと別のマシン上で実行することも、複数マシン上で同時にbundle exec rake solid_queue:start
を実行することも可能です。特定のマシンをディスパッチャ専用にしたい場合はbundle exec rake solid_queue:dispatch
を、ワーカー専用にしたい場合はbundle exec rake solid_queue:work
を利用します。
🔗 要件
Solid Queueを利用するには、Rails 7.1に加えて、FOR UPDATE SKIP LOCKED
をサポートしているデータベース(MySQL 8以上、またはPostgreSQL 9.5以上)で利用するのがベストです。これより前のバージョンのデータベースでもSolid Queueを利用できますが、同一のキューを複数のワーカーが実行するとロック待機が発生する可能性があります。
🔗 設定
🔗 ワーカーとディスパッチャ
Solid Queueには3種類のプロセスがあります。
- ワーカー(worker)
- 実行可能になったジョブをキューから選択して処理する役目を担います。ワーカーは
solid_queue_ready_executions
テーブルで動作します。 - ディスパッチャ(dispatcher)
- 今後実行するようスケジューリングされたジョブを選択してディスパッチする(dispatch: 振り分ける)役目を担います。ディスパッチ作業は、
solid_queue_scheduled_executions
テーブルにあるジョブをsolid_queue_ready_executions
テーブルに移動してワーカーが拾い上げられるようにするというシンプルなものです。また、定期的なタスクの管理も担当しており、タスクを処理するためのジョブをスケジュールに沿ってディスパッチします。コンカレンシー制御に関連するメンテナンス作業もいくつか行います。 - スーパバイザ(supervisor)
- 設定に基づいてワーカーやディスパッチャをforkし、ハートビートを制御し、必要に応じて停止や開始のシグナルを送信します。
Solid Queueは、デフォルトではconfig/solid_queue.yml
にある設定ファイルの探索を試みますが、SOLID_QUEUE_CONFIG
環境変数で探索パスを変更することも可能です。設定ファイルは以下のような感じになります。
production:
dispatchers:
- polling_interval: 1
batch_size: 500
concurrency_maintenance_interval: 300
workers:
- queues: "*"
threads: 3
polling_interval: 2
- queues: [ real_time, background ]
threads: 5
polling_interval: 0.1
processes: 3
必須項目はなく、すべての項目はオプションです。設定が指定されていない場合、Solid Queueはディスパッチャ1個とワーカー1個のデフォルト設定で実行されます。
polling_interval
- ワーカーとディスパッチャが追加ジョブをチェックする前に待機する時間を指定します(単位は秒)。デフォルト値は、ディスパッチャが
1
秒、ワーカーが0.1
秒です。 batch_size
- ディスパッチャがジョブをディスパッチするときのバッチサイズを指定します。デフォルトは500です。
concurrency_maintenance_interval
- ブロック中のジョブがブロック解除可能になったかどうかをディスパッチャがチェックするまで待機する時間を指定します(単位は秒)。詳しくはコンカレンシー制御を参照してください。デフォルト値は
600
秒です。 queues
-
ワーカーがジョブを選択するキューのリストを指定します。
*
を指定すると、すべてのキューが対象となります(無指定の場合はこれがデフォルトです)。このオプションには単一のキューを指定することも、キューのリストを配列で指定することも可能です。
ジョブはこれらのキューから順にポーリングされるので、たとえば[ real_time, background ]
を指定すると、real_time
で待機中のジョブがすべてなくなるまでbackground
のジョブはワーカーに拾われなくなります。以下のようにワイルドカードを使って特定のキューにマッチするプレフィックスを指定することも可能です。staging: workers: - queues: staging* threads: 3 polling_interval: 5
上の設定にすると、
staging
で始まるすべてのキューからジョブをフェッチするワーカーが作成されます。このワイルドカード*
は、キュー名の末尾に1個しか書けません。つまり*_some_queue
のようなキュー名は指定できません(指定しても無視されます)。
最後に、[ staging*, background ]
のように名前のプレフィックスと正確な名前を組み合わせることも可能です。順序に関する振る舞いは、正確な名前のみを組み合わせた場合と同じになります。 threads
- 各ワーカーがジョブを実行するのに必要な最大スレッドプール数です。個別のワーカーはこの個数以下のジョブをフェッチし、スレッドプールに配置して実行します。この設定はワーカーだけにあり、デフォルトは
3
です。 processes
- 指定の設定でスーパバイザによってforkされるワーカープロセス数の個数を指定します。デフォルトは
1
です(シングルプロセス)。この設定は、複数のCPUコアを1個のキュー(または同じ設定の複数のキュー)専用にしたい場合に便利です。この設定はワーカーだけにあります。 concurrency_maintenance
- ディスパッチャでコンカレンシーのメンテナンスを行うかどうかを指定します(デフォルトでは
true
)。この設定は、コンカレンシー制御を完全に無効にしたい場合や、ディスパッチャを複数実行している状態で一部のディスパッチャについては他の作業を行わずにディスパッチに専念させたい場合に便利です。 recurring_tasks
- ディスパッチャで管理する定期的なタスクのリストを指定します。このタスクについて詳しくは定期的なタスクセクションをお読みください。
🔗 キューの順序と優先度
上述のように、あるワーカーでキューのリストをreal_time,background
のように指定すると、その順序でポーリングされます。この場合、real_time
に待機中のジョブがなくなるまでbackground
のジョブはフェッチされなくなります。
Active Jobでは、ジョブをエンキューするときの優先度を正の整数で指定できます。Solid Queueは値が小さいほど優先度が高くなります。優先度のデフォルトは0
です。
この設定は、重要度や緊急度が異なるジョブを同一のキューで実行する場合に便利です。
ただし優先度に基づいてジョブが選択されるのは同一キュー内の場合です。先ほどのreal_time,background
の例で説明すると、background
キューの優先度をより高くしても、real_time
キュー内のジョブは引き続きbackground
キュー内のジョブよりも先にフェッチされます。
キューの順序と優先度を取り違えないためにも、どちらか一方のみを指定することをおすすめします。これにより、ジョブの実行順序が理解しやすくなります。
🔗 スレッド、プロセス、シグナル
Solid Queueのワーカーは、スレッドプールを用いて複数スレッドでの作業を実行します。これは上述のthreads
パラメータで設定できます。その他に、1台のマシン上の複数プロセス(ワーカーごとにprocesses
パラメータで設定可能)や水平スケーリングによってパラレリズム(並列処理)を実現できます。
スーパバイザはこれらのプロセスを管理しており、以下のシグナルに応答します。
TERM
、INT
- graceful shutdowを開始します。スーパバイザは監視対象プロセスに
TERM
シグナルを送信し、SolidQueue.shutdown_timeout
で指定された期間までプロセス終了を待機します。期間を過ぎても監視対象プロセスが残っている場合は、終了の必要があることを示すQUIT
シグナルを残りのプロセスに送信します。 QUIT
- 即時終了を開始します。スーパバイザは監視対象プロセスに
QUIT
シグナルを送信し、プロセスをただちに終了させます。
QUIT
シグナルを受信したときに、ワーカーに実行中のジョブがまだ残っている場合、これらのジョブはプロセスが登録解除されるときにキューに戻されます。
プロセス終了前にクリーンアップする機会がなかった場合(ケーブルが抜けてしまったなど)、 実行中のプロセスによってジョブの実行がリクエスト中のままになる可能性があります。プロセスはハートビートを送信し、スーパバイザはハートビートが期限切れのプロセスをチェックしてクリーンアップし、リクエスト中のジョブをキューに戻します。ハートビートの送信頻度と、プロセス停止を判断するしきい値を設定できます。次のセクションを参照してください。
🔗 その他の設定
注意: 本セクションの設定は、config/application.rb
ファイルまたは個別の環境設定ファイルでconfig.solid_queue.silence_polling = true
のように設定する必要があります。
Solid Queueの動作を制御する以下の設定も利用できます。
logger
- 利用するロガーを指定します。デフォルトはアプリのロガーです。
app_executor
- 非同期操作をラップするのに使われるRails executorです。デフォルトではアプリのexecutorが使われます。
on_thread_error
-
発生した例外を引数として受け取るスレッド内でエラーが発生したときに呼び出す、カスタムlambda/Procを指定します。デフォルトは以下です。
-> (exception) { Rails.error.report(exception, handled: false) }
connects_to
-
Active Recordの
SolidQueue::Record
抽象モデルで利用されるカスタムのデータベース設定です。これは以下のように、メインアプリとは別のデータベースを利用するのに必要となります。# Solid Queue用に別のDBを使う設定 config.solid_queue.connects_to = { database: { writing: :solid_queue_primary, reading: :solid_queue_replica } }
use_skip_locked
- ロック読み取りの実行時に
FOR UPDATE SKIP LOCKED
を利用するかどうかを指定します。この設定は将来自動検出される予定です。現在は、FOR UPDATE SKIP LOCKED
をサポートしていないデータベース(MySQL 8未満、PostgreSQL 9.5未満)では、この設定をfalse
にするだけで済みます。SQLiteはシーケンシャル書き込みを行うので、この設定は無効です。
process_heartbeat_interval
: すべてのプロセスが従うべきハートビート間隔です。デフォルトは60
秒です。
process_alive_threshold
- 直近のハートビート後にプロセス停止とみなすまでの待ち時間です。デフォルトは
5
分です。 shutdown_timeout
TERM
を送信したスーパバイザが、QUIT
を送信して即時終了を要求するまでの待ち時間です。デフォルトは5
秒です。silence_polling
- ワーカーやディスパッチャをポーリングするときにActive Recordのログに出力するかどうかを指定します。デフォルトは
true
です。 supervisor_pidfile
- 起動時にスーパバイザが作成するpidfileへのパスです。起動時に同じホスト内で複数のスーパバイザが実行されないようにする場合や、ヘルスチェック用途に利用できます。デフォルトは
nil
です。 preserve_finished_jobs
- 完了したジョブを
solid_queue_jobs
テーブルに残すかどうかを指定します。デフォルトはtrue
です。 clear_finished_jobs_after
preserve_finished_jobs
がtrueの場合に、完了したジョブを保持する期間を指定します。注意: 現時点では完了したジョブの自動クリーンアップ機能はありません。ジョブをクリーンアップするにはSolidQueue::Job.clear_finished_in_batches
を定期的に呼び出す必要がありますが、これは近い将来自動化される予定です。default_concurrency_control_period
- コンカレンシー制御パラメータのデフォルト値として使われます。デフォルト値は
3.minutes
です。
🔗 コンカレンシー制御
Solid Queueは、Active Jobをコンカレンシー制御で拡張することで、特定の型や特定の引数を持つジョブの最大同時実行可能数を制限できます。この方法で制限をかけるとジョブの実行はブロックされ、別のジョブが終了してブロックが解除されるか、設定されている有効期間(コンカレンシー制限のduration
)を過ぎるまで、ブロックされたままになります。ジョブはブロックされることはあっても、破棄されたり失われたりすることはありません。
class MyJob < ApplicationJob
limits_concurrency to: max_concurrent_executions, key: ->(arg1, arg2, **) { ... }, duration: max_interval_to_guarantee_concurrency_limit, group: concurrency_group
# ...
key
- 唯一の必須パラメータであり、ジョブ引数をパラメータとして受け取ります。渡すジョブ引数は、制限する必要のあるジョブを識別するのに用いられ、「シンボル」「文字列」「proc」のいずれかを渡すことが可能です。procがActive Recordのレコードを返す場合、キーはそのクラス名と
id
からビルドされます。 to
- デフォルト値は
1
です。 duration
- はデフォルトで
SolidQueue.default_concurrency_control_period
の値(デフォルト値は3.minutes
)が設定されますが、設定の変更も可能です。 group
- 異なるジョブクラスをグループ化してコンカレンシーを制御するのに使います。デフォルトではジョブクラス名が使われます。
これらの制御がジョブに含まれている場合、同じkey
をyieldするジョブの個数(to
で指定される)を最大としてコンカレント実行されるようになり、これはエンキューされた各ジョブのduration
の間持続します。
ただし「実行順序」については何も保証されません。保証されるのは、同時に(=実行時間が重なり合う形で)実行される点だけです。
以下の例を見てみましょう。
class DeliverAnnouncementToContactJob < ApplicationJob
limits_concurrency to: 2, key: ->(contact) { contact.account }, duration: 5.minutes
def perform(contact)
# ...
上のcontact
とaccount
はActiveRecord
のレコードです。この場合、同じaccountに対するDeliverAnnouncementToContact
という種類のジョブが最大2個コンカレントに実行されるようになります。
何らかの理由でジョブの1つが5分以上かかった場合や、ジョブを取得してから5分以内にコンカレンシーロックが解放されなかった場合は、同じキーを持つ別の新しいジョブがロックを取得する可能性があります。
group
の別の利用例を見てみましょう。
class Box::MovePostingsByContactToDesignatedBoxJob < ApplicationJob
limits_concurrency key: ->(contact) { contact }, duration: 15.minutes, group: "ContactActions"
def perform(contact)
# ...
class Bundle::RebundlePostingsJob < ApplicationJob
limits_concurrency key: ->(bundle) { bundle.contact }, duration: 15.minutes, group: "ContactActions"
def perform(bundle)
# ...
上の場合、contactレコード(idは123
)用のBox::MovePostingsByContactToDesignatedBoxJob
というジョブと、bundleレコード(contactレコード123
を参照している)用のBundle::RebundlePostingsJob
という別のジョブが同時にエンキューされると、処理の続行を許されるジョブはどちらか1つだけとなります。続行を許されなかった方のジョブは、最初のジョブが完了するまで(または最初に何が起ころうと15分経過するまで)ブロックされたままになります。
duration
の設定は、ディスパッチャで設定したconcurrency_maintenance_interval
の値に間接的に依存している点に注意が必要です(この値は、ブロックされたジョブをチェックしてブロックを解除する作業の頻度を指定するからです)。duration
の値は、一般に「すべてのジョブがその期間内で適切に完了する」ように設定すべきであり、コンカレンシーのメンテナンスタスクは、あくまで「何か異常が発生したときのフェイルセーフ」として考える必要があります。
最後に、自動または手動で再試行されるようになっているジョブが失敗すると、その失敗したジョブは、新たにエンキューされる別のジョブと同じように扱われます。つまり、どちらもエンキューされてロックを取得しようとし、ロックを取得すれば実行されます。ジョブが過去にロックを既に取得していたかどうかは関係ありません。
🔗 失敗したジョブとリトライ
Solid Queueには自動リトライのしくみはなく、自動リトライについてはActive Jobに依存しています。「失敗した実行」はシステム内で保持され、そうしたジョブ用にsolid_queue_failed_executions
テーブルのレコードが作成されます。このジョブは、手動で破棄するか再度エンキューするまで残り続けます。
ジョブの破棄やリトライは以下のようにコンソールで実行できます。
failed_execution = SolidQueue::FailedExecution.find(...) # 自分のジョブに関連する失敗した実行を検索する
failed_execution.error # エラーを調べる
failed_execution.retry # 初回と同様にジョブを新たにエンキューする
failed_execution.discard # ジョブをシステムから削除する
ただし、失敗したジョブを調べてリトライや破棄を実行できるmission_control-jobsというダッシュボードがあるので、チェックすることをおすすめします。
🔗 Pumaプラグイン
Solid QueueのスーパバイザをPumaと一緒に実行して、Pumaで監視や管理を行いたい場合に利用できるPumaプラグインを提供しています。puma.rb
設定ファイルに以下を追加するだけで利用できます。
plugin :solid_queue
🔗 ジョブとトランザクションの整合性について
警告: ジョブをアプリケーションデータと同じACID準拠のデータベースに配置すると、強力なツールが有効になります。つまり、トランザクション整合性を利用して、ジョブがコミットされない限りアプリ内の特定のアクションもコミットされないようにすることが可能になります。これは非常に強力かつ有用ですが、将来Active Jobのバックエンドを別のものに差し替えたり、Solid Queueを単に専用のデータベースに移動して振る舞いが急に変更されたりすると、逆効果になる可能性があります。
これに依存したくない場合や、誤って依存することを避けたい場合は、以下の注意を守る必要があります。
- 特定レコードに依存するジョブは、常に
after_commit
コールバックでエンキューするか、ジョブが使うデータが「ジョブがエンキューされる前に」確実にコミットされる場所でエンキューすること。 -
この振る舞いを完全にオプトアウトしたい場合は、Solid Queueがアプリと同じデータベースを使う場合であっても、アプリケーションのリクエストやジョブ実行を処理するスレッド上で「別のコネクションが使われる」ようにSolid Queueのデータベースを設定します。以下は設定例です。
class ApplicationRecord < ActiveRecord::Base
self.abstract_class = true
connects_to database: { writing: :primary, reading: :replica }
config.solid_queue.connects_to = { database: { writing: :primary, reading: :replica } }
🔗 定期的なタスク
Solid Queueは、将来の特定の時刻になると定期的に実行される、cronジョブのような繰り返しタスクの定義をサポートします。これらのタスクはディスパッチャのプロセスによって管理されるので、以下のようにディスパッチャのコンフィグで定義できます。
dispatchers:
- polling_interval: 1
batch_size: 500
recurring_tasks:
my_periodic_job:
class: MyJob
args: [ 42, { status: "custom_status" } ]
schedule: every second
recurring_tasks
はハッシュ(辞書)形式であり、キーは内部のタスクキーとなります。個別のタスクには、キューイングするジョブクラスと、スケジュール指定が必要です。スケジュールの解析はFugitで行われるので、Fugitにcronとして渡せる形式であれば任意のスケジュールを受け取れます。また、ジョブに渡す引数も指定できます。渡せる引数は、「単一の引数」、「引数のハッシュ」または「引数の配列(配列の最後の要素にキーワード引数を含めることも可能)」です。
上のコンフィグ例では以下のようなジョブが毎秒(every second)エンキューされます。
MyJob.perform_later(42, status: "custom_status")
タスクは、対応する時刻になると、そのタスクを所有するディスパッチャによってエンキューされ、各タスクは次のタスクをスケジューリングします。この手法は、good_job gemの機能から多くのインスピレーションを得ています。
同一のrecurring_tasks
を利用して複数のディスパッチャを実行することも可能です。重複したタスクが同時にエンキューされるのを防ぐために、ジョブがエンキューされるのと同じトランザクション内に新しくsolid_queue_recurring_executions
テーブルが作成されます。個別のタスクで1度に1個のエントリだけが作成されることを保証するために、このテーブルのtask_key
とrun_at
にunique indexが設定されます。この機能は、preserve_finished_jobs
をtrue
に設定した場合にのみ有効で(デフォルトではtrue
)、この保証はジョブが維持されている限り適用されます。
最後に、Solid Queueで処理しないジョブを構成することも可能です。これは、アプリで以下のような形でジョブを実行するだけでできます。
class MyResqueJob < ApplicationJob
self.queue_adapter = :resque
def perform(arg)
# ..
end
end
Solid Queueで以下のコンフィグを利用することも可能です。
dispatchers:
- recurring_tasks:
my_periodic_resque_job:
class: MyResqueJob
args: 22
schedule: "*/5 * * * *"
この場合、ジョブはperform_later
によってエンキューされるので、Resqueで実行されます。ただし、このジョブはどのsolid_queue_recurring_execution
レコードでもトラッキングされなくなるので、ジョブのエンキューが1回限りであることは保証されなくなります。
🔗 インスパイアされたプロジェクト
Solid Queueは、resqueとGoodJobにインスパイアされています。これらのプロジェクトは私たちが多くを学んだ素晴らしい事例なので、ぜひチェックしてみてください。
🔗 License
このgemは、MIT Licenseに基づいてオープンソースとして利用可能です。
概要
MITライセンスに基づいて翻訳・公開いたします。
日本語タイトルは内容に即したものにしました。