Solid Queue README -- DBベースのActive Jobバックエンド(翻訳)
Solid Queueは、Active Jobで利用できるDBベースのキューバックエンドであり、シンプルさとパフォーマンスを念頭に置いて設計されています。
通常のジョブエンキューや処理に加えて、ジョブの延期、コンカレンシー制御、ジョブの定期実行、キューの一時停止、数値によるジョブ単位の優先度指定、キュー順序に基づいた優先度指定、バルクエンキュー(Active Jobのperform_all_laterで使われるenqueue_all)もサポートしています。
Solid Queueは、MySQL、PostgreSQL、SQLiteなどのSQLデータベースで利用可能で、FOR UPDATE SKIP LOCKED句でジョブポーリング時のブロッキングやロック待機を回避します(利用可能な場合)。Solid Queueのジョブリトライ、ジョブ破棄、エラー処理、シリアライズ、ジョブの延期はActive Jobに依存しており、Ruby on Railsのマルチスレッドと互換性があります。
🔗 インストール方法
Rails 8の新規アプリケーションでは、デフォルトでSolid Queueが設定されます。
Rails 8より前のバージョンのRailsを実行している場合は、以下の手順に沿うことで手動で追加できます。
bundle add solid_queuebin/rails solid_queue:install
(注: サポートされる最小バージョンはRails 7.1とRuby 3.1.6です)
上のコマンドを実行することで、Solid Queueがproduction向けのバックエンドとして設定され、config/queue.ymlとconfig/recurring.ymlが作成されます。また、bin/jobs実行可能ラッパーも作成され、ここからSolid Queueを起動できます。
次に、config/database.ymlファイルにキュー用のデータベース設定を追加する必要があります。SQLiteを使っている場合は、以下のような設定になります。
production:
primary:
<<: *default
database: storage/production.sqlite3
queue:
<<: *default
database: storage/production_queue.sqlite3
migrations_paths: db/queue_migrate
MySQL/PostgreSQL/Trilogyを使っている場合は、以下のような設定になります。
production:
primary: &primary_production
<<: *default
database: app_production
username: app
password: <%= ENV["APP_DATABASE_PASSWORD"] %>
queue:
<<: *primary_production
database: app_production_queue
migrations_paths: db/queue_migrate
設定が終わったら、db:prepareを実行して、データベースの作成とスキーマ読み込みが行われるようにしてください。
これで、作業しているサーバーでbin/jobsを実行すれば、ジョブの処理を開始する準備が整います。これにより、デフォルト設定を用いてすべてのキューのジョブの処理が開始されます。Solid Queueの設定について詳しくは後述します。
小規模プロジェクトの場合は、Webサーバーと同じマシンでSolid Queueを実行できます。スケーリングの準備ができたら、Solid Queueはすぐに利用可能な水平スケーリングをサポートします。
Solid QueueをWebサーバーと別のサーバーで実行することも、複数のマシンで同時にbin/jobsを実行することも可能です。設定を変更すれば、一部のマシンのみをディスパッチャ専用やワーカー専用として実行することも可能です。詳しくは、後述の設定セクションを参照してください。
注: 今後のスキーマ変更は、通常のマイグレーションで行うことになります。
🔗 development環境およびその他非production環境での利用法
bin/rails solid_queue:installを呼び出すと、config.solid_queue.connects_to = { database: { writing: :queue } }がconfig/environments/production.rbファイルに自動的に追加されます。Solid Queueを他の環境(developmentやstagingなど)で使うには、同様の設定を追加する必要があります。
たとえば、SQLiteをdevelopment環境で使っている場合は、database.ymlを以下のように更新します。
development:
+ primary:
<<: *default
database: storage/development.sqlite3
+ queue:
+ <<: *default
+ database: storage/development_queue.sqlite3
+ migrations_paths: db/queue_migrate
次に、development.rbに以下を追加します。
# Solid Queueを開発に使う
config.active_job.queue_adapter = :solid_queue
config.solid_queue.connects_to = { database: { writing: :queue } }
追加したら、bin/rails db:prepareを実行してSolid Queueデータベースを作成し、スキーマを読み込みます。
最後に、ジョブを処理するためにSolid Queueを起動する必要があります。development環境では、Pumaプラグインでも実行できます。puma.rbファイルの以下の行を更新します。
# 環境変数でもenv.development?でもチェックできる
plugin :solid_queue if ENV["SOLID_QUEUE_IN_PUMA"] || Rails.env.development?
bin/jobsだけを使うことも可能ですが、その場合は以下のようにSolid Queueに別のロガーを設定するとよいでしょう。デフォルトのロガーはlog/development.logにログを出力するので、bin/jobsを実行しても何も表示されません。
config.solid_queue.logger = ActiveSupport::Logger.new(STDOUT)
Action Cableに関するインポートの注意: Action Cable(またはAction Cableに依存するTurbo Streamsなど)を使う場合は、データベースを使うように更新する必要もあります。
config/cable.ymlを以下のように変更します。
development:
- adapter: async
+ adapter: solid_cable
+ connects_to:
+ database:
+ writing: cable
+ polling_interval: 0.1.seconds
+ message_retention: 1.day
config/database.ymlを以下のように変更します。
development:
primary:
<<: *default
database: storage/development.sqlite3
+ cable:
+ <<: *default
+ database: storage/development_cable.sqlite3
+ migrations_paths: db/cable_migrate
🔗 単一データベースによる設定方法
Solid Queueは別のデータベースで実行することが推奨されていますが、1個のデータベースをアプリとキューの両方で利用することも可能です。その場合は以下の手順を実行します。
db/queue_schema.rbの内容を通常のマイグレーションファイルにコピーし、db/queue_schema.rbを削除します。production.rbのconfig.solid_queue.connects_toを削除します。- データベースのマイグレーションを実行することで、
bin/jobsを実行可能になります。
データベースが複数ではないため、database.ymlにプライマリデータベースとキューデータベースを設定する必要はありません。
🔗 Solid Queueを段階的に導入する場合
ジョブを1件ずつ切り替える形でSolid Queueを段階的に導入する計画がある場合は、既存のバックエンドにあるconfig.active_job.queue_adapterは変更せずに、移行するジョブで以下のようにqueue_adapterを直接指定することで可能になります。
# app/jobs/my_job.rb
class MyJob < ApplicationJob
self.queue_adapter = :solid_queue
# ...
end
🔗 高パフォーマンスで必要な要件
Solid QueueはFOR UPDATE SKIP LOCKEDをサポートしているため、MySQL 8以降やPostgreSQL 9.5以降で利用した場合に最大のスループットが得られるように設計されています。これらより古いバージョンでも利用可能ですが、その場合は同じキューに対して複数のワーカーを実行するとロック待機が発生する可能性があります。小規模アプリケーションであればSQLiteでも利用可能です。
🔗 設定
🔗 ワーカー、ディスパッチャ、スケジューラ
Solid Queueには以下のようなアクターがあります。
- ワーカー(worker)
- 実行可能になったジョブをキューから選択して処理する役目を担います。ワーカーは
solid_queue_ready_executionsテーブルで動作します。 - ディスパッチャ(dispatcher)
- 今後実行するようスケジューリングされたジョブを選択してディスパッチする(dispatch: 振り分ける)役目を担います。ディスパッチ作業は、
solid_queue_scheduled_executionsテーブルにあるジョブをシンプルにsolid_queue_ready_executionsテーブルに移動して、ワーカーが処理できるようにします。コンカレンシー制御に関連するメンテナンス作業もいくつか行います。 - スケジューラ(scheduler)
- 定期的なタスクを管理し、時間になったらジョブをエンキューします。
- スーパバイザ(supervisor)
- 設定に基づいてワーカーやディスパッチャをforkし、ハートビートを制御し、必要に応じて停止や開始のシグナルを送信します。
Solid Queueのスーパーバイザは、監視対象のワーカー/ディスパッチャ/スケジューラーごとに個別のプロセスをforkします。
Solid Queueは、デフォルトではconfig/queue.ymlにある設定ファイルの探索を試みますが、SOLID_QUEUE_CONFIG環境変数や-c/--config_fileオプションで探索パスを変更することも可能です。
bin/jobs -c config/calendar.yml
環境変数SOLID_QUEUE_SKIP_RECURRING=trueを設定することで、定期的なタスクをすべてスキップすることも可能です。これは、staging環境やreview環境、development環境のような定期タスクを実行したくない環境で有用です。bin/jobsコマンドに--skip-recurringオプションを指定しても同じ効果を得られます。
この設定ファイルは以下のような感じになります。
production:
dispatchers:
- polling_interval: 1
batch_size: 500
concurrency_maintenance_interval: 300
workers:
- queues: "*"
threads: 3
polling_interval: 2
- queues: [ real_time, background ]
threads: 5
polling_interval: 0.1
processes: 3
必須項目はなく、すべての項目はオプションです。設定がまったく指定されていない場合、Solid Queueはディスパッチャ1個とワーカー1個のデフォルト設定で実行されます。ディスパッチャやワーカーの一方だけを実行する場合は、以下のようにそのセクションのみを設定に含める必要があります。
production:
dispatchers:
- polling_interval: 1
batch_size: 500
concurrency_maintenance_interval: 300
上の設定ではディスパッチャが1個だけ実行され、ワーカーは実行されません。
さまざまなオプションの概要を以下に示します。
polling_interval- ワーカーとディスパッチャが追加ジョブをチェックする前に待機する時間を指定します(単位は秒)。デフォルト値は、ディスパッチャが
1秒、ワーカーが0.1秒です。 batch_size- ディスパッチャがジョブをディスパッチするときのバッチサイズを指定します。デフォルトは500です。
concurrency_maintenance_interval- ブロック中のジョブがブロック解除可能になったかどうかをディスパッチャがチェックするまで待機する時間を指定します(単位は秒)。詳しくはコンカレンシー制御を参照してください。デフォルト値は
600秒です。 queues-
ワーカーがジョブを選択するキューのリストを指定します。
*を指定すると、すべてのキューが対象となります(無指定の場合はこれがデフォルトです)。このオプションには単一のキューを指定することも、キューのリストを配列で指定することも可能です。
ジョブはこれらのキューから順にポーリングされるので、たとえば[ real_time, background ]を指定すると、real_timeで待機中のジョブがすべてなくなるまでbackgroundのジョブはワーカーに拾われなくなります。以下のようにワイルドカードを使って特定のキューにマッチするプレフィックスを指定することも可能です。staging: workers: - queues: staging* threads: 3 polling_interval: 5上の設定にすると、
stagingで始まるすべてのキューからジョブをフェッチするワーカーが作成されます。このワイルドカード*は、キュー名の末尾に1個しか書けません。つまり*_some_queueのようなキュー名は指定できません(指定しても無視されます)。
最後に、[ staging*, background ]のように名前のプレフィックスと正確な名前を組み合わせることも可能です。順序に関する振る舞いは、正確な名前のみを組み合わせた場合と同じになります。
キューの順序と優先度の組み合わせによる動作、およびワーカーごとにキューを指定する方法がパフォーマンスにどのように影響するかについては、後述のセクションを確認してください。 threads- 各ワーカーがジョブを実行するのに必要な最大スレッドプール数です。個別のワーカーはこの個数以下のジョブをフェッチし、スレッドプールに配置して実行します。この設定はワーカーだけにあり、デフォルトは
3です。ワーカースレッドごとに1つのコネクションを利用し、ポーリングとハートビート用に2つの追加コネクションが予約されるため、この値はキューデータベースのコネクションプールサイズから2を引いた値以下に設定することが推奨されます。 processes- 指定の設定でスーパバイザによってforkされるワーカープロセス数の個数を指定します。デフォルトは
1です(シングルプロセス)。この設定は、複数のCPUコアを1個のキュー(または同じ設定の複数のキュー)専用にしたい場合に便利です。この設定はワーカーだけにあります。 concurrency_maintenance- ディスパッチャでコンカレンシーのメンテナンスを行うかどうかを指定します(デフォルトでは
true)。この設定は、コンカレンシー制御を完全に無効にしたい場合や、ディスパッチャを複数実行している状態で一部のディスパッチャについては他の作業を行わずにディスパッチに専念させたい場合に便利です。
🔗 キューの順序と優先度
上述のように、あるワーカーでキューのリストをreal_time,backgroundのように指定すると、その順序でポーリングされます。この場合、real_timeに待機中のジョブがなくなるまでbackgroundのジョブはフェッチされなくなります。
Active Jobでは、ジョブをエンキューするときの優先度を正の整数で指定できます。Solid Queueは値が小さいほど優先度が高くなります。優先度のデフォルトは0です。
この設定は、重要度や緊急度が異なるジョブを同一のキューで実行する場合に便利です。
ただし優先度に基づいてジョブが選択されるのは同一キュー内の場合です。先ほどのreal_time,backgroundの例で説明すると、backgroundキューの優先度をより高くしても、real_timeキュー内のジョブは引き続きbackgroundキュー内のジョブよりも先にフェッチされます。
キューの順序と優先度を取り違えないためにも、指定するのはどちらか一方のみにとどめることをおすすめします。これにより、ジョブの実行順序が理解しやすくなります。
🔗 キューの仕様とパフォーマンス
ポーリングのパフォーマンスを維持し、カバーするインデックスが常に使われるようにするために、Solid Queueは以下の2種類のポーリングクエリのみを実行します。
-- キューでフィルタされない
SELECT job_id
FROM solid_queue_ready_executions
ORDER BY priority ASC, job_id ASC
LIMIT ?
FOR UPDATE SKIP LOCKED;
-- 単一キューでフィルタ
SELECT job_id
FROM solid_queue_ready_executions
WHERE queue_name = ?
ORDER BY priority ASC, job_id ASC
LIMIT ?
FOR UPDATE SKIP LOCKED;
第1のクエリは、以下を指定すると使われます。すべてのキューがターゲットになるため、一時停止中のキューがない場合に使われます。
queues: *
その他の場合、並べ替えでインデックスが使われるようにするには一度に1つのキューでしかフィルタできないため、フィルタするキューのリストを順序立てて用意する必要があります。つまり、キューを以下のように指定する場合、
queues: beta*
まず以下のようなクエリを用いて、そのプレフィックスに一致する既存のすべてのキューのリストを取得する必要があります。
SELECT DISTINCT(queue_name)
FROM solid_queue_ready_executions
WHERE queue_name LIKE 'beta%';
インデックスの一番左のカラムに対するこの種のDISTINCTクエリは、MySQLではLoose Index Scanと呼ばれる手法により非常に高速に実行できます。
ただし、PostgreSQLとSQLiteにはこの手法は実装されていないため、キューが非常に深くなり、solid_queue_ready_executionsテーブルが非常に大きくなると、このクエリは遅くなります。
通常、solid_queue_ready_executionsテーブルは小さいのですが、そのように大きくなる可能性もあります。
プレフィックスを使う場合と同様に、一時停止中のキューがある場合にも同じことが起こります。
以下のようなクエリですべてのキューのリストを取得し、次に一時停止中のキューを削除する必要があるためです。
SELECT DISTINCT(queue_name)
FROM solid_queue_ready_executions
一般的に一時停止はめったに発生せず、特別な状況で短期間だけ行う必要があります。キューのジョブを処理したくない場合は、キューのリストから削除するのが最善の方法です。
💡まとめると、ポーリングで最適なパフォーマンスを確保したい場合、常に正確な名前を指定して、キューを一時停止しないようにするのが最善の方法です。
queues: [ background, backend ]
つまり、以下のような*を含む名前指定ではなく、上のように指定します。
queues: back*
🔗 スレッド、プロセス、シグナル
Solid Queueのワーカーは、スレッドプールを用いて複数スレッドでの作業を実行します。これは上述のthreadsパラメータで設定できます。その他に、1台のマシン上の複数プロセス(ワーカーごとにprocessesパラメータで設定可能)や水平スケーリングによってパラレリズム(並列処理)を実現できます。
スーパバイザはこれらのプロセスを管理しており、以下のシグナルに応答します。
TERM、INT- 正常な終了処理を開始します。スーパバイザは監視対象プロセスに
TERMシグナルを送信し、SolidQueue.shutdown_timeoutで指定された期間までプロセス終了を待機します。期間を過ぎても監視対象プロセスが残っている場合は、終了の必要があることを示すQUITシグナルを残りのプロセスに送信します。 QUIT- 即時終了を開始します。スーパバイザは監視対象プロセスに
QUITシグナルを送信し、プロセスをただちに終了させます。
QUITシグナルを受信したときに、ワーカーに実行中のジョブがまだ残っている場合、これらのジョブはプロセスが登録解除されるときにキューに戻されます。
プロセス終了前にクリーンアップする機会がなかった場合(ケーブルが抜けてしまったなど)、 実行中のプロセスによってジョブの実行がリクエスト中のままになる可能性があります。プロセスはハートビートを送信し、スーパバイザはハートビートが期限切れのプロセスをチェックしてクリーンアップし、リクエスト中のジョブをキューに戻します。期限切れのハートビートを持つプロセスによって要求されたジョブは、SolidQueue::Processes::ProcessPrunedErrorで失敗としてマークされます。ハートビートの頻度と、プロセスがデッドであると見なすしきい値の両方を設定できます。これについては、以下のセクションを参照してください。
同様に、ワーカーが上記のシグナルによって開始されない他の方法で終了した場合(例: ワーカーにKILLシグナルが送信される)、進行中のジョブは、SolidQueue::Processes::Process::ProcessExitErrorで検査できるように失敗としてマークされます。特定のジョブがこれを担当することがあります(ジョブにメモリリークがあり、特定のメモリしきい値を超えるプロセスを強制終了するメカニズムがある場合など)。そのため、この方法はこのような状況を特定するのに役立ちます。
🔗 データベースを設定する
Solid Queueで利用するデータベースは、config/application.rbファイルまたはconfig/environments/production.rb環境設定ファイルのconfig.solid_queue.connects_toオプションで設定できます。デフォルトでは、インストール時に指定したデータベース設定に応じて、queueという名前の単一のデータベースが書き込みと読み出しの両方で使われます。
このとき、Active Recordのマルチデータベース用のオプションもすべて利用できます。
🔗 その他の設定項目
注意: 本セクションの設定は、config/application.rbまたは環境ごとの設定ファイルでconfig.solid_queue.silence_polling = trueのように設定する必要があります。
Solid Queueの振る舞いを制御する設定項目もいくつかあります。
logger- Solid Queueで使うロガーを指定します。デフォルトはアプリのロガーです。
app_executor- 非同期操作をラップするRails executorを指定します。デフォルトはアプリのexecutorです。
on_thread_error- 発生した例外を引数として受け取るカスタムlambda/Procを指定します。Solid Queueスレッド内でエラーが発生したときに呼び出されます。デフォルトは以下です。
-> (exception) { Rails.error.report(exception, handled: false) }
これは、ジョブ実行中に発生したエラーでは使われません。ジョブで発生したエラーは、Active Jobのretry_onまたはdiscard_onによって処理され、最終的には失敗したジョブになります。これは、Solid Queue自体で発生したエラー用です。
use_skip_locked- ロック読み取りを実行するときに
FOR UPDATE SKIP LOCKEDを使うかどうかを指定します。これは将来自動的に検出される予定ですが、現時点ではデータベースがこれをサポートしていない場合にのみ、これをfalseに設定する必要があります(MySQLの場合はバージョン8未満、PostgreSQLの場合はバージョン9.5未満)。SQLiteの場合、書き込みはシーケンシャルなのでこの設定は無効です。 process_heartbeat_interval- すべてのプロセスが従うハートビート間隔を指定します。デフォルトは60秒です。
process_alive_threshold- 最後のハートビートの後にプロセスが停止しているとみなされるまで待機する時間を指定します。デフォルトは5分です。
shutdown_timeout- 監視対象プロセスに
TERMシグナルを送信してから、即時終了を要求するQUITを送信するまでスーパーバイザが待機する時間を指定します。デフォルトは5秒です。 silence_polling-
ワーカーとディスパッチャの両方をポーリングするときに出力されるActive Recordログを抑制するかどうかを指定します。デフォルトは
trueです。 supervisor_pidfile- 同じホストで複数のスーパーバイザが実行されないようにするため、またはヘルスチェックで利用する場合に備えて、スーパーバイザが起動時に作成するpidファイルへのパスを指定します。デフォルトは
nilです。 preserve_finished_jobs-
完了したジョブを
solid_queue_jobsテーブルに保持するかどうかを指定します。デフォルトはtrueです。 clear_finished_jobs_afterpreserve_finished_jobsがtrueの場合に、完了したジョブを保持する期間を指定します(デフォルトは1日)。Solid Queueをインストールすると、完了したジョブを1時間ごとに12分目からバッチ処理で削除する定期的なジョブが自動的に設定されます。この設定を変更したい場合は、recurring.yml設定ファイルを編集できます。default_concurrency_control_period- コンカレンシー制御の
durationパラメータのデフォルト値を指定します。デフォルトは3分です。
🔗 ライフサイクルのフック
Solid Queueでは、スーパーバイザのライフタイムの中で以下の2箇所のポイントにフックをかけられます。
start- スーパーバイザの起動完了後で、かつワーカーとディスパッチャをforkする直前のタイミング。
stop- シグナル(
TERM、INT、QUIT)の受信後で、かつ正常なシャットダウンまたは即時シャットダウンを開始する直前のタイミング。
ワーカー、ディスパッチャ、スケジューラのライフサイクルでは、以下の2箇所のポイントにフックをかけられます。
(worker|dispatcher|scheduler)_start- ワーカー/ディスパッチャ/スケジューラの起動完了後で、かつポーリングループの開始または定期的なスケジュール読み込みの直前。
(worker|dispatcher|scheduler)_stop- シグナル(
TERM、INT、QUIT)の受信後で、かつ正常なシャットダウンまたは即時シャットダウン(exit!のみ)を開始する直前のタイミング。
これらのフックは、「スーパバイザ」「ワーカー」「ディスパッチャ」「スケジューラ」のインスタンスをブロックにyieldするように設計されており、ユーザーはこれを用いてログ出力やメトリクスレポートに用いる設定情報を読み取れます。
これらは、次のメソッドにブロックを渡すことで利用できます。
SolidQueue.on_start
SolidQueue.on_stop
SolidQueue.on_worker_start
SolidQueue.on_worker_stop
SolidQueue.on_dispatcher_start
SolidQueue.on_dispatcher_stop
SolidQueue.on_scheduler_start
SolidQueue.on_scheduler_stop
例:
SolidQueue.on_start do |supervisor|
MyMetricsReporter.process_name = supervisor.name
start_metrics_server
end
SolidQueue.on_stop do |_supervisor|
stop_metrics_server
end
SolidQueue.on_worker_start do |worker|
MyMetricsReporter.process_name = worker.name
MyMetricsReporter.queues = worker.queues.join(',')
end
これらは、フックを複数追加するために複数回呼び出すことも可能ですが、これらの呼び出しはSolid Queueが開始される前に行う必要があります。呼び出しはイニシャライザで行うのが適切でしょう。
🔗 エンキュー時のエラー
Solid Queueは、ジョブをエンキューするときに発生するActive Recordエラーに対してSolidQueue::Job::EnqueueErrorをraiseします。
ActiveJob::EnqueueErrorをraiseしない理由は、このエラーはActive Jobによって処理されるので、perform_laterに渡す必要のなるブロック内でジョブをyieldするためには、perform_laterが falseを返してjob.enqueue_errorを設定する必要があるためです。
これは独自のジョブには非常にうまく機能しますが、Rails(またはTurbo::Streams::BroadcastJobやActiveStorage::AnalyzeJobなどの他のgem)によってエンキューされたジョブの場合は、perform_laterの呼び出しを制御できないため、失敗の処理が非常に難しくなります。
定期実行されるタスクの場合は、タスクに対応するジョブをエンキューするときにこのようなエラーがraiseすると、処理とログ出力は行われますが、エラーが上位に伝達されません。
🔗 コンカレンシー制御
Solid Queueは、Active Jobをコンカレンシー制御で拡張することで、特定の型や特定の引数を持つジョブの最大コンカレント実行可能数を制限できます。
この方法で制限をかけると、ジョブの実行はデフォルトでブロックされ、別のジョブが終了してブロックが解除されるか、設定されている有効期間(コンカレンシー制限のduration)を過ぎるまで、ブロックされたままになります。
または、指定の条件にマッチするジョブは、ブロックせずに廃棄するよう設定することも可能です。つまり、特定の引数を指定したジョブが既にエンキューされている場合は、(同じコンカレンシークラス内で)同じ条件を持つ他のジョブはエンキューされなくなります。
class MyJob < ApplicationJob
limits_concurrency to: max_concurrent_executions, key: ->(arg1, arg2, **) { ... }, duration: max_interval_to_guarantee_concurrency_limit, group: concurrency_group, on_conflict: on_conflict_behaviour
# ...
key- 唯一の必須パラメータであり、ジョブ引数をパラメータとして受け取ります。渡すジョブ引数は、制限する必要のあるジョブを識別するのに用いられ、「シンボル」「文字列」「proc」のいずれかを渡すことが可能です。procがActive Recordのレコードを返す場合、キーはそのクラス名と
idからビルドされます。 to- デフォルト値は
1です。 duration- デフォルトで
SolidQueue.default_concurrency_control_periodの値(デフォルト値は3.minutes)が設定されますが、設定の変更も可能です。 group- 異なるジョブクラスをグループ化してコンカレンシーを制御するのに使います。デフォルトではジョブクラス名が使われます。
on_conflict- ジョブがキューに登録されて設定済みの最大コンカレンシー数と競合したときの振る舞いを指定します。以下のいずれか一方を設定できます。
-:block(デフォルト): あふれたジョブは一時的にブロックされ、他のジョブが完了してブロック解除されるか、durationで設定した期間を経過したときに再びディスパッチされる。
-:discard: あふれたジョブは破棄される。このオプションを選択する場合は、ジョブが実行されたもののコンカレンシーロック(または後述のセマフォ)の解除に失敗したときに、durationで設定した期間を経過するまでは、そのジョブと競合するジョブはすべて破棄されることを理解しておく必要がある。
これらの制御がジョブで指定されている場合、同じkeyを生成するジョブは最大数(toで指定される)までコンカレントに実行されるようになり、この保証はキューに入れられた各ジョブのdurationの間継続します。しかしジョブの実行順序については保証されず、保証されるのは同時に(重複)実行される ジョブについてのみである点にご注意ください。
コンカレンシーの制限では、エンキュー時にセマフォ(semaphore)の概念が利用されます。これは次のように機能します。
ジョブがエンキューされると、コンカレンシー制御が指定されているかどうかを確認します。指定されている場合は、計算された同時実行キーのセマフォを確認します。
セマフォがオープンの場合は、セマフォをリクエストしてジョブをreadyに設定します。ready とは、ワーカーがそれを取得して実行できることを意味します。ジョブの実行が終了すると (成功または失敗して実行が失敗した場合)、セマフォにシグナルを送信して、同じキーを持つ次のジョブがあれば、そのブロック解除を試みます。ここで、次のジョブのブロックを解除することは、そのジョブをただちに実行するという意味ではなく、ジョブがblockedからreadyに移行するという意味です。
on_conflictにdiscardを指定した場合は、セマフォがクローズドである期間にエンキューされたジョブは破棄されます。
最初のジョブがセマフォを解放したときに、次のジョブのブロック解除が何らかの形で妨げられる可能性があるため(例: ワーカーが実行されているマシンの電源プラグを誰かが抜いた場合)、フェイルセーフとしてdurationが用意されています。
期間を超えてブロックされているジョブは解放の候補になりますが、各ジョブが「セマフォダンスチェック」をパスしなければならないため、コンカレンシールールで許可されている数だけ解放されます。つまり、このdurationは、実際にはエンキューされたジョブや実行中のジョブに関するものではなく、ブロックされている待機中のジョブに関するものか、セマフォがクローズドの期間中に破棄されたジョブに関するものです。
ここで重要なのは、1件以上のジョブ候補のブロック状態が(ジョブが完了、もしくはduration期限を過ぎてセマフォが解放されたことで)終了すると、まだブロック状態にあるdurationタイマーもリセットされることです。これは、セマフォの有効期限が更新されることで間接的に行われます。
on_conflictにdiscardを指定する形で競合を処理する場合は、実行中のジョブが何らかの理由でセマフォを解放するのに失敗したときに、最大でdurationで設定した期間を経過するまでは、ジョブが破棄される可能性が生じる可能性があります。
以下の例を見てみましょう。
class DeliverAnnouncementToContactJob < ApplicationJob
limits_concurrency to: 2, key: ->(contact) { contact.account }, duration: 5.minutes
def perform(contact)
# ...
上のcontactとaccountはActiveRecordのレコードです。この場合、同じaccountに対するDeliverAnnouncementToContactという種類のジョブが最大2個コンカレントに実行されるようになります。
何らかの理由でジョブの1つが5分以上かかった場合や、ジョブを取得してから5分以内にコンカレンシーロックが解放されなかった場合は、同じキーを持つ別の新しいジョブがロックを取得する可能性があります。
groupの別の利用例を見てみましょう。
class Box::MovePostingsByContactToDesignatedBoxJob < ApplicationJob
limits_concurrency key: ->(contact) { contact }, duration: 15.minutes, group: "ContactActions"
def perform(contact)
# ...
class Bundle::RebundlePostingsJob < ApplicationJob
limits_concurrency key: ->(bundle) { bundle.contact }, duration: 15.minutes, group: "ContactActions"
def perform(bundle)
# ...
上の場合、contactレコード(idは123)用のBox::MovePostingsByContactToDesignatedBoxJobというジョブと、bundleレコード(contactレコード123を参照している)用のBundle::RebundlePostingsJobという別のジョブが同時にエンキューされると、処理の続行を許されるジョブはどちらか1つだけとなります。続行を許されなかった方のジョブは、最初のジョブが完了するまで(または最初に何が起ころうと15分経過するまで)ブロックされたままになります。
ここで注意が必要なのは、durationの設定は、ディスパッチャで設定したconcurrency_maintenance_intervalの値に間接的に依存している点です。この値は、ブロックされたジョブをチェックしてブロックを解除する作業の頻度を指定します(このとき、コンカレンシーキーごとに最大1件のジョブだけがブロック解除されます)。durationの値は、一般に「すべてのジョブがその期間内で適切に完了する」ように設定すべきであり、コンカレンシーのメンテナンスタスクは、あくまで「万が一異常が発生したときのフェイルセーフ」として考える必要があります。
ジョブのブロック解除は優先度順に行われますが、ジョブのブロック解除ではキューの順序は考慮されない点に注意が必要です。つまり、コンカレンシーグループを共有しているジョブのグループが別のキューにも存在する場合や、同じクラスのジョブが別のキューにもエンキューされている場合は、ブロックされたジョブのブロック解除時に、ワーカーに設定したキューの順序は考慮されません。
その理由は、実行中のジョブが次のジョブのブロックを解除するときに、そのジョブ自身が特定のワーカーのキューの順序を認識していなければ(キュー順序の異なるワーカーが複数存在する可能性すらあります)、優先度しか認識しようがないためです。ブロックされたジョブがブロック解除されてポーリング可能になると、キューの順序に沿ってワーカーによって取得されます。
最後に、自動または手動で再試行されるようになっているジョブが失敗すると、その失敗したジョブは、新たにエンキューされる別のジョブと同じように扱われます。つまり、どちらもエンキューされてロックを取得しようとし、ロックを取得すれば実行されます。ジョブが過去にロックを既に取得していたかどうかは関係ありません。
🔗 スケジューリングされたジョブ
(Active Jobのwaitオプションやwait_untilオプションで)将来実行されるよう設定されたジョブに対してコンカレント実行数制限を指定する場合、このコンカレント実行数制限が適用されるのはスケジューリングした時点ではなく、実行予定時刻である点にご注意ください。
以下のジョブで考えてみましょう。
class DeliverAnnouncementToContactJob < ApplicationJob
limits_concurrency to: 1, key: ->(contact) { contact.account }, duration: 5.minutes
def perform(contact)
# (省略)
以下のように数件のジョブがエンキューされるとします。
DeliverAnnouncementToContactJob.set(wait: 10.minutes).perform_later(contact)
DeliverAnnouncementToContactJob.set(wait: 10.minutes).perform_later(contact)
DeliverAnnouncementToContactJob.set(wait: 30.minutes).perform_later(contact)
この3件のジョブはスケジューリング用のキューに登録されて、実行開始時刻になるまでそのキューで待機します。
10分後に最初の2件のキューがエンキューされると、1件目のジョブが先に実行されるため、2件目のジョブは最初のうちはブロックされる可能性が高くなります。
そして、最初の2件のジョブが数秒で終了した場合は、3件目のジョブは通常通りエンキューされます。
スケジューリングされたジョブは、通常はバッチとしてエンキューされますが、コンカレンシー制御を指定したジョブは、バッチエンキューではなく、1件ずつエンキューされる必要があります。そのため、大量のジョブをバッチエンキューしたときと同様にパフォーマンスが低下します。
したがって、一般に、待ち状態のジョブやスケジューリングされたジョブに対してコンカレンシー制御を指定するのは一般に避けることをおすすめします(詳しくは次を参照)。
🔗 パフォーマンス上の考慮事項
コンカレンシー制御は、BlockedExecutionを作成して実行ready状態(ReadyExecution)に昇格させたり、セマフォレコードを作成・更新したりする必要が生じるため、著しいオーバーヘッドが発生します。このため、コンカレンシー制御が本当に必要かどうかについて慎重に検討する必要があります。特に、limit値に1よりもずっと大きな値を設定する形でスロットリングを行いたい場合は、以下のように処理ワーカー数をキュー単位で制限する方がよいでしょう。
class ThrottledJob < ApplicationJob
queue_as :throttled
production:
workers:
- queues: throttled
threads: 1
polling_interval: 1
- queues: default
threads: 5
polling_interval: 0.1
processes: 3
セットアップによっては、これと似たような手法が使えることもあります。ジョブをエンキューするときに、別のキューをジョブに指定することで、ジョブのエンキュー先をスロットリング用キューにするか、それとも別のキューにするかを、引数で指定することも、Active Jobガイドで説明されているようにブロックにqueue_asを指定することも可能になります。
同様に、(Active Jobのperform_all_laterで)バルクエンキューしたジョブに対してコンカレンシー制御を併用するのは好ましくありません。こうしたジョブに対してコンカレンシー制御を行うと、コンカレンシーを維持するために1件ずつ確実にエンキューされる必要があり、バルクエンキューするメリットがすべて失われてしまいます。
コンカレンシー制御とon_conflict: :discardオプションを指定したジョブをバルクエンキューする場合は、エンキューされずに破棄されるジョブについてはsuccessfully_enqueuedにfalseが設定されます。これにより、perform_all_laterメソッドが返すエンキュー済みジョブの件数に、エンキューされずに破棄されたジョブが含まれないようになります。
🔗 失敗したジョブとリトライ
Solid Queueには自動リトライのしくみはなく、自動リトライについてはActive Jobに依存しています。「失敗した実行」はシステム内で保持され、そうしたジョブ用にsolid_queue_failed_executionsテーブルのレコードが作成されます。このジョブは、手動で破棄するか再度エンキューするまで残り続けます。
ジョブの破棄やリトライは以下のようにコンソールで実行できます。
failed_execution = SolidQueue::FailedExecution.find(...) # 自分のジョブに関連する失敗した実行を検索する
failed_execution.error # エラーを調べる
failed_execution.retry # 初回と同様にジョブを新たにエンキューする
failed_execution.discard # ジョブをシステムから削除する
ただし、失敗したジョブを調べてリトライや破棄を実行できるmission_control-jobsというダッシュボードがあるので、チェックすることをおすすめします。
🔗 ジョブのエラーを報告する
Railsと統合されている一部のエラートラッキングサービス(SentryやRollbarなど)は、Active Jobにフックする形で、ジョブ実行中に発生した未処理のエラーを自動的に報告します。ただし、エラートラッキングシステムがこれを行わない場合や、またはカスタムレポートが必要な場合は、以下の方法でActive Jobに手動でフックすることが可能です。
# application_job.rb
class ApplicationJob < ActiveJob::Base
rescue_from(Exception) do |exception|
Rails.error.report(exception)
raise exception
end
end
上のロジックを複製してActionMailer::MailDeliveryJobにも配置する必要があることにご注意ください。これは、ActionMailerがApplicationJobを継承せず、代わりに ActiveJob::Baseから継承したActionMailer::MailDeliveryJobを利用するためです。
# application_mailer.rb
class ApplicationMailer < ActionMailer::Base
ActionMailer::MailDeliveryJob.rescue_from(Exception) do |exception|
Rails.error.report(exception)
raise exception
end
end
🔗 Pumaプラグイン
Solid QueueのスーパバイザをPumaと一緒に実行して、Pumaで監視や管理を行いたい場合に利用できるPumaプラグインを提供しています。puma.rb設定ファイルに以下を追加するだけで利用できます。
plugin :solid_queue
Pumaを使っているdevelopment環境でSolid Queueを使いたくない場合は、以下のような環境変数でプラグインの利用を回避してください。
plugin :solid_queue if ENV["SOLID_QUEUE_IN_PUMA"]
これにより、production環境でのみSolid Queueが設定されます。Rails 8のデフォルトのPuma設定はこのようになります。
さもないと、Pumaを使っているdevelopment環境でSolid Queueを使っていない場合に、Pumaを起動するとSolid Queueスーパーバイザーも起動され、適切に構成されないため失敗する可能性が高くなります。
注意: 現時点では、段階的再起動(phased restart)はサポートされていません(動作するためには、プラグインでアプリのプリロードを有効にする必要があります)。
🔗 ジョブとトランザクションの整合性について
警告: ジョブをアプリケーションデータと同じACID準拠のデータベースに配置すると、強力なツールが有効になります。つまり、トランザクション整合性を利用して、ジョブがコミットされない限りアプリ内の特定のアクションもコミットされないようにする(およびその逆)ことが可能になります。また、エンキューされたトランザクションがコミットされるまで、ジョブがエンキューされないようにします。これは非常に強力かつ有用ですが、将来Active Jobのバックエンドを別のものに差し替えたり、Solid Queueを単に専用のデータベースに移動して振る舞いが急に変更されたりすると、逆効果になる可能性があります。これは非常に難しい場合があり、多くの人は気にする必要がないため、デフォルトでは、Solid Queueはメインアプリとは別のデータベースで構成されます。
Rails 8以降は、Active Jobが提供する、このトランザクションの整合性に依存しないオプションは、Active Recordトランザクション内でのジョブのキューへの登録を、そのトランザクションが正常にコミットされるまで延期することです。このオプションは、ジョブレベルのenqueue_after_transaction_commitクラスメソッドを介して設定できます(デフォルトでは無効)1。
以下のようにApplicationJobを介することで、個々のジョブやすべてのジョブに対して有効にできます。
class ApplicationJob < ActiveJob::Base
self.enqueue_after_transaction_commit = true
end
このオプションを使えば、アプリと同じデータベースでSolid Queueを使うことも可能ですが、トランザクションの整合性には依存しません。
このオプションを設定していない場合でも、トランザクション整合性に悪影響が生じないようにしたいときは、以下が行われるようにしてください。
- 特定レコードに依存するジョブは、常に
after_commitコールバックでエンキューするか、ジョブが使うデータが「ジョブがエンキューされる前に」確実にコミットされる場所でエンキューする。
または、
- Solid Queueがアプリと別のデータベースを使うように設定を変更し、アプリケーションのリクエストやジョブ実行を処理するスレッド上で「別のコネクションが使われる」ようにSolid Queueのデータベースを設定する(例↓)
class ApplicationRecord < ActiveRecord::Base
self.abstract_class = true
connects_to database: { writing: :primary, reading: :replica }
...
config.solid_queue.connects_to = { database: { writing: :primary, reading: :replica } }
🔗 定期的なタスク
Solid Queueは、将来の特定の時刻になると定期的に実行される、cronジョブのような繰り返しタスクの定義をサポートします。これらのタスクはスケジューラのプロセスによって管理され、独自の設定ファイルで定義されます。
デフォルトでは、ファイルはconfig/recurring.ymlに配置されますが、SOLID_QUEUE_RECURRING_SCHEDULE環境変数 を使うか、以下のようにbin/jobsコマンドに--recurring_schedule_fileオプションを指定することで、このパスを変更可能です。
bin/jobs --recurring_schedule_file=config/schedule.yml
定期的なタスクは、環境変数でSOLID_QUEUE_SKIP_RECURRING=trueを設定するか、bin/jobsコマンドに--skip-recurringオプションを追加することで完全に無効にできます。
設定自体は以下のようになります。
production:
a_periodic_job:
class: MyJob
args: [ 42, { status: "custom_status" } ]
schedule: every second
a_cleanup_task:
command: "DeletedStuff.clear_all"
schedule: every day at 9am
タスクはハッシュ(辞書)形式で指定され、ハッシュのキーは内部でタスクのキーになります。各タスクには、class(エンキューされるジョブクラスとなる)を指定するか、command(スケジュールに沿ってsolid_queue_recurringキューにエンキューされるジョブ (SolidQueue::RecurringJob)のコンテキストで評価される`)を指定する必要があります。
各タスクにはスケジュールも必要です。スケジュールはFugitで解析されるため、Fugitにcronとして渡せる引数はすべて渡せます。
各タスクには以下のオプションも指定できます。
args- ジョブに渡す引数。「単一の引数」「ハッシュ」「引数の配列」のいずれかを指定(配列を渡す場合は、末尾の要素にキーワード引数を含めることも可能です)。
上の設定例のジョブは、次のように毎秒(1秒に1回)エンキューされます。
MyJob.perform_later(42, status: "custom_status")
queue- ジョブをエンキューするのに使う別のキューを指定します。ない場合は、そのジョブクラス用に設定されたキューが使われます。
priority- ジョブをエンキューするのに使われる優先度の数値を指定します。
タスクは、指定の時刻になると、そのタスクを所有するディスパッチャによってエンキューされ、各タスクは次のタスクをスケジューリングします。この手法は、good_job gemの機能から多くのインスピレーションを得ています。
commandとして定義される定期的なタスクを実行するジョブクラスは、以下のように変更することも可能です。
Rails.application.config.after_initialize do # or to_prepare
SolidQueue::RecurringTask.default_job_class = MyRecurringCommandJob
end
同一のrecurring_tasksを利用して複数のスケジューラを実行することも可能です。重複したタスクが同時にエンキューされるのを防ぐために、ジョブがエンキューされるのと同じトランザクション内に新しくsolid_queue_recurring_executionsテーブルが作成されます。
個別のタスクで1度に1個のエントリだけが作成されることを保証するために、このテーブルのtask_keyとrun_atにunique indexが設定されます。この機能は、preserve_finished_jobsをtrueに設定した場合にのみ有効で(デフォルトではtrue)、この保証はジョブが維持されている限り適用されます。
注: 単一の定期的スケジュールはサポートされているので、同一スケジュールを利用するスケジューラを複数使うことは可能ですが、設定の異なるスケジュールを複数利用することはできません。
最後に、Solid Queueで処理しないジョブを構成することも可能です。これは、アプリで以下のような形でジョブを実行するだけでできます。
class MyResqueJob < ApplicationJob
self.queue_adapter = :resque
def perform(arg)
# ..
end
end
Solid Queueで以下のコンフィグを利用することも可能です。
dispatchers:
- recurring_tasks:
my_periodic_resque_job:
class: MyResqueJob
args: 22
schedule: "*/5 * * * *"
この場合、ジョブはperform_laterによってエンキューされるので、Resqueで実行されます。ただし、このジョブはどのsolid_queue_recurring_executionレコードでもトラッキングされなくなるので、ジョブのエンキューが1回限りであることは保証されなくなります。
🔗 インスパイアされたプロジェクト
Solid Queueは、resqueとGoodJobにインスパイアされています。これらのプロジェクトは私たちが多くを学んだ素晴らしい事例なので、ぜひチェックしてみてください。
🔗 License
このgemは、MIT Licenseに基づいてオープンソースとして利用可能です。
関連記事
-
訳注:
enqueue_after_transaction_commitが利用可能になるのはRails 7.2からです。Automatically delay Active Job enqueues to after commit by casperisfine · Pull Request #51426 · rails/rails ↩
概要
MITライセンスに基づいて翻訳・公開いたします。
日本語タイトルは内容に即したものにしました。
Solid Queue用の公式GUIダッシュボード「Mission Control Jobs」も使えます↓。