Rails: DBメンテナンス支援ツール "maintenance_tasks" README(翻訳)
maintenance_tasksは、メンテナンスタスクをジョブキューに入れて管理するRailsエンジンです。
このプロジェクトにおける「メンテナンスタスク」は、いわゆるデータのマイグレーション(つまりデータベース内のデータを変更するコード、多くは最終的にスキーママイグレーションをサポートするのが目的)を意味します。
たとえば、NOT NULL
カラムを新たに導入する場合、以下を行わなければなりません。
- そのカラムをnullable(NULL許容)として追加する
- そのカラムに値をバックフィルする
- 最後にそのカラムを
NOT NULL
に変更する
maintenance_tasksのエンジンは、上のステップ2である「バックフィル(backfill: 値の埋め戻し)」を支援します。
メンテナンスタスクはコレクションベースのタスクであり、データベース内のデータをActive Recordで更新するのが普通です。このメンテナンスタスクは一時停止・中断が可能です。また、メンテナンスタスクをバッチで実行することも、データベースの負荷制御のためにスロットリングすることも可能です。
メンテナンスタスクは定期的に実行するものではなく、必要が生じたときに1回だけ実行されることが前提です。メンテナンスタスクは一時的にしか必要とされないので、利用後は削除されるのが普通です。
maintenance_tasksのRailsエンジンには、メンテナンスタスクの「一覧表示」「ステータス確認」「開始」「一時停止」「再起動」を行えるWebベースのUIが搭載されています。
🔗 メンテナンスタスクはどんなときに必要か
maintenance_tasksに搭載されているジョブUIは、あくまで用途に特化した機能に限定されています。maintenance_tasksのRailsエンジンを用いて、目的以外のデータ変更タスク(サポートリクエストのデータ変更など)を提供することも一応可能ですが、そうなるとこのエンジンで提供できる以上の柔軟性が求められることになってしまうので、そうしたユースケースにはmaintenance_tasksではなく通常のアプリケーションコードを使うことをおすすめします。
Active Jobとして実行すべきでないタスクは、おそらくこのgemの用途に合わないでしょう。
- タスクをバックグラウンドで実行する必然性がなければ、代わりにランナースクリプトの利用を検討してください。
- タスクを中断可能にする必然性がなければ、通常のActive Jobの利用を検討してください。
メンテナンスタスクは、反復中に中断可能です。
タスクがコレクションベースではない場合や、バッチが極めて大きい場合は、スロットリング(イテレーションごとに休止をはさむ)や中断機能で得られるメリットは限られます。これで問題ない場合もありますが、通常のActive Jobよりも複雑なメンテナンスタスクをわざわざ追加する手間に見合わない可能性もあります。
タスクで更新する対象がデータではなくデータベーススキーマの場合は、メンテナンスタスクではなく、通常のマイグレーションタスクをお使いください。
定期的に発生するタスクを扱うのであれば、maintenance_tasks gemよりも、Active Jobにスケジューラやcron、job-iterationやrails_adminのカスタムUIを組み合わせることを検討しましょう。
新規アプリケーション上でseedデータを作成したい場合は、maintenance_tasks gemではなく、db/seeds.rb
をお使いください。
途中で終わったマイグレーションを適切に処理できないアプリケーションの場合は、おそらくmaintenance_tasks gemは適切なツールではありません。maintenance_tasks gemは、メンテナンスタスクが「一時停止可能」「キャンセル可能」であることを意図しています。
🔗 インストール
gemをインストールしてジェネレータを実行するには、以下を実行します
bundle add maintenance_tasks
bin/rails generate maintenance_tasks:install
このジェネレータは、必要なテーブルをデータベースに追加するためのマイグレーションを作成して実行します。また、maintenance_tasks用のルーティングをconfig/routes.rb
にマウントします。デフォルトでは、/maintenance_tasks
という新しいパスでメンテナンスタスクにアクセスできます。
例外レポートサービス(Bugsnagなど)を利用している場合は、エラーハンドラを定義する必要が生じるでしょう。詳しくはエラーハンドラのカスタマイズを参照してください。
🔗 Active Jobへの依存について
maintenance_tasksフレームワークは、タスクをバックグラウンドで実行するためにActive Jobに依存します。Active Jobのキューイングバックエンドはデフォルトでは非同期です。コードやインフラストラクチャの変更中にタスクの進行状況が失われるのを防ぐために、キューイングを永続化バックエンドに変更することを強く推奨します。キューイングバックエンドの設定方法について詳しくは、Active Jobガイドを参照してください。
🔗 オートロードについての注意
maintenance_tasksフレームワークは、:classic
モードでのオートロードをサポートしていません。
利用するには、アプリケーションのコードでZeitwerkが使われていることを必ず確認してください。詳しくはRailsガイドの定数のオートロードとリロードを参照してください。
🔗 利用法
maintenance_tasksの典型的なワークフローは以下のような感じになります。
- タスクを記述するためのクラスを生成して実行したい作業を記述する
- 以下のいずれかの方法でタスクを実行する
- 組み込みのWeb UIを使う
- コマンドラインを使う
- Rubyでタスクを実行する
- 以下のいずれかの方法でタスクを監視する
- 組み込みのWeb UI
- タスクの実行ステータスをデータベースで手動確認する
4.不要になったタスクを削除する(オプション)
🔗 タスクを作成する
タスク作成用のジェネレータが提供されているので、以下を実行して新しいタスクを生成します。
bin/rails generate maintenance_tasks:task update_posts
これで、app/tasks/maintenance/update_posts_task.rb
というタスクファイルが作成されます。
生成されたタスクは、MaintenanceTasks::Task
のサブクラスです。このタスクには以下を実装します。
collection
: 反復処理の対象となるActive Recordリレーション、または配列を返しますprocess
: メンテナンスタスクの作業を1個のレコードに対して実行します
オプションとして、タスクに#count
メソッドをカスタム実装することで、イテレーションする要素の個数を定義することも可能です。
タスクのtick_total
(経過時間)はコレクションのサイズに応じて自動的に算出されますが、必要であればこの値を#count
メソッドでオーバーライドできます(例: コレクションのサイズを決定するためのクエリ生成を避けたい場合)。
タスクの例:
# app/tasks/maintenance/update_posts_task.rb
module Maintenance
class UpdatePostsTask < MaintenanceTasks::Task
def collection
Post.all
end
def process(post)
post.update!(content: "New content!")
end
end
end
🔗 バッチサイズをカスタマイズする
Active Recordリレーションを用いてレコードを処理する場合、レコードは内部でバッチで取得されたうえでレコードごとに#process
メソッドに渡されます。
メンテナンスタスクがデフォルトで取得するデフォルトのバッチ数は100個ですが、バッチサイズはcollection_batch_size
マクロで変更可能です。
# app/tasks/maintenance/update_posts_task.rb
module Maintenance
class UpdatePostsTask < MaintenanceTasks::Task
# Fetch records in batches of 1000
collection_batch_size(1000)
def collection
Post.all
end
def process(post)
post.update!(content: "New content!")
end
end
end
🔗 CSV処理用のタスクを作成する
CSVファイルをイテレーションするタスクも作成できます。
CSVタスクを作成するには、Active Storageも設定しておく必要があることにご注意ください。Active Storageで必要な依存関係がアプリケーションのGemfileで指定済みであることを確認してから、セットアップ手順に沿って設定してください。後述のActive Storageサービスのカスタマイズ方法も参照してください。
以下のコマンドを実行してCSVタスクを生成します。
bin/rails generate maintenance_tasks:task import_posts --csv
生成されたタスクはMaintenanceTasks::Task
のサブクラスです。ここに以下を実装します。
process
:CSV::Row
で行いたいメンテナンスタスクを記述する
# app/tasks/maintenance/import_posts_task.rb
module Maintenance
class ImportPostsTask < MaintenanceTasks::Task
csv_collection
def process(row)
Post.create!(title: row["title"], content: row["content"])
end
end
end
posts.csv
:
title,content
My Title,Hello World!
Active Storageのサービスプロバイダにアップロードされたファイルは、ISO 8601形式のタイムスタンプを含み、タスク名をスネークケース形式にする形でリネームされます。
暗黙の#count
メソッドは、ファイルの正確な行数を決定するために、ファイル全体を読み込んで解析します。ファイルが数百万行になると処理に時間がかかるので、nil
を返すcount
を定義する、行数を近似する(例: 新規の行数をカウントする)などの方法で、カウントをスキップすることを検討してください。
def count(task)
task.csv_content.count("\n") - 1
end
🔗 CSVオプション
タスクにcsv_collection
キーワード引数を追加することで、RubyのCSVパーサー用のオプションをタスクに渡せるようになります。
# app/tasks/maintenance/import_posts_task.rb
module Maintenance
class ImportPosts
csv_collection(skip_lines: /^#/, converters: ->(field) { field.strip })
def process(row)
Post.create!(title: row["title"], content: row["content"])
end
end
end
これらのオプションは、#
で始まる行をスキップするようRuby CSVパーサーに指示し、すべてのフィールドから冒頭と末尾のスペース文字を削除します。これによって、以下のファイルは前述のサンプルとまったく同様に処理されます。
posts.csv
:
# コメント行
title,content
My Title ,Hello World!
🔗 CSVタスクのバッチ処理
タスクでは、CSVをバッチで処理できます。タスク内のcsv_collection
マクロに、以下のようにin_batches
オプションを追加します。
# app/tasks/maintenance/batch_import_posts_task.rb
module Maintenance
class BatchImportPostsTask < MaintenanceTasks::Task
csv_collection(in_batches: 50)
def process(batch_of_rows)
Post.insert_all(post_rows.map(&:to_h))
end
end
end
通常のCSVタスクと同様に、以下のメソッドも実装しておくこと。
process
: タスクで(CSV::Row
オブジェクトの配列に)行いたい処理をバッチで記述する
このとき、#count
はコレクション内のバッチの個数に応じて自動的に算出され、タスクの進行状況は(CSVの全行数ではなく)バッチ単位で表示されます。
CSVタスクをバッチ化しない場合、実質的なバッチサイズが1となり、データベース操作の効率が落ちる可能性があります。
🔗 バッチのコレクションを処理する
maintenance_tasks gemはActive Recordのバッチ処理をサポートしています。バッチ処理を使うことで、タスクがデータベースを呼び出す回数を削減できます。
処理をレコード単位ではなくバッチ単位で処理するには、コレクションが返すリレーションのActiveRecord::Batches#in_batches
を使います。
Active Recordのバッチサイズはデフォルトで1000レコードですが、カスタムのサイズも指定可能です。
# app/tasks/maintenance/update_posts_in_batches_task.rb
module Maintenance
class UpdatePostsInBatchesTask < MaintenanceTasks::Task
def collection
Post.in_batches
end
def process(batch_of_posts)
batch_of_posts.update_all(content: "New content added on #{Time.now.utc}")
end
end
end
このタスクには以下のメソッドを実装しておく必要があります。
collection
:ActiveRecord::Batches::BatchEnumerator
を返しますprocess
: タスクの作業をActiveRecord::Relation
に対してバッチで行います
このとき、#count
はコレクション内のバッチの個数に応じて自動的に算出され、タスクの進行状況は(リレーションのレコード数ではなく)バッチ単位で表示されます。
重要!
バッチ処理は、#process
が#update_all
や#delete_all
を実行する場合に限ってお使いください。
個別のレコードをイテレーションしなければならない場合は、ActiveRecord::Relation
を返すコレクションを定義する必要があります。これは内部でバッチ処理を行いますが、レコードを1件のクエリで読み込む点が異なります。
逆にバッチのコレクションは、最初にバッチのレコードから主キーを読み込んでから、#process
メソッド内でeach
(またはEnumerable
の任意のメソッド)を呼び出すたびに追加クエリを実行してレコードを読み込みます。
🔗 コレクションの不要なタスク
場合によっては、シンプルな操作を1個だけ実行したいことがあります(例: バックグラウンドジョブを追加する、外部APIにクエリをかける)。maintenance_tasks gemは、コレクションなしのタスクもサポートしています。
以下を実行して、コレクションなしのタスクを生成します。
bin/rails generate maintenance_tasks:task no_collection_task --no-collection
生成されたタスクはMaintenanceTasks::Task
のサブクラスです。ここに以下を実装します。
process
: メンテナンスタスクで行いたい処理を実行する
# app/tasks/maintenance/no_collection_task.rb
module Maintenance
class NoCollectionTask < MaintenanceTasks::Task
no_collection
def process
SomeAsyncJob.perform_later
end
end
end
🔗 タスクでカスタムのEnumeratorを使う
サポートされていないコレクション型(APIで取得した外部リソースなど)をイテレーションしなければならない特殊なユースケースでは、タスク内にenumerator_builder(cursor:)
を実装することでイテレーションできます。
このメソッドは、[item, cursor]
というペアを生成するEnumerator
を返す必要があります。
maintenance_tasksは、イテレーションの現在のカーソル位置を永続化して、タスクが中断または再開したときにcursor
と引数として提供します。このcursor
はString
として保存されるので、カスタムのEnumerator
は必要に応じてこの値のシリアライズ/デシリアライズを処理する必要があります。
# app/tasks/maintenance/custom_enumerator_task.rb
module Maintenance
class CustomEnumeratorTask < MaintenanceTasks::Task
def enumerator_builder(cursor:)
after_id = cursor&.to_i
PostAPI.index(after_id: after_id).map { |post| [post, post.id] }.to_enum
end
def process(post)
Post.create!(post)
end
end
end
🔗 スロットリング
maintenance_tasksでは大量のデータを変更することが多いため、データベースに負荷をかけることがあります。そのため、maintenance_tasksはスロットリングメカニズムを提供しており、特定の条件が満たされたときにタスクをスロットリングできます。
タスクがスロットリングされると(スロットルブロックがtrue
を返すと)、タスクを中断し、バックオフ期間をすぎるとタスクをリトライします。デフォルトのバックオフ期間は30秒です。
スロットリングの条件はブロックを渡す形で指定します。
# app/tasks/maintenance/update_posts_throttled_task.rb
module Maintenance
class UpdatePostsThrottledTask < MaintenanceTasks::Task
throttle_on(backoff: 1.minute) do
DatabaseStatus.unhealthy?
end
def collection
Post.all
end
def process(post)
post.update!(content: "New content added on #{Time.now.utc}")
end
end
end
アプリのスロットリング条件を適切に定義するのは、開発者にかかっています。ShopifyではDatabaseStatus.healthy?
を実装する形で、MySQLのさまざまなメトリクス(レプリケーションラグ、DBスレッド、DB書き込み可能かどうかなど)をチェックしています。
タスクには複数のスロットリング条件を定義できます。スロットリング条件は子孫クラスに継承され、新しい条件を追加しても既存の条件に影響しません。
バックオフ期間は、引数なしのProc
として指定することも可能です。
# app/tasks/maintenance/update_posts_throttled_task.rb
module Maintenance
class UpdatePostsThrottledTask < MaintenanceTasks::Task
throttle_on(backoff: -> { RandomBackoffGenerator.generate_duration } ) do
DatabaseStatus.unhealthy?
end
# ...
end
end
🔗 カスタムのタスクパラメータ
タスクを実行するためにパラメータで追加情報を指定する必要が生じることがあります。タスク内でパラメータをActive Model属性として定義することで、#collection
や#count
や#process
などのタスクメソッドでパラメータを利用可能になります。
# app/tasks/maintenance/update_posts_via_params_task.rb
module Maintenance
class UpdatePostsViaParamsTask < MaintenanceTasks::Task
attribute :updated_content, :string
validates :updated_content, presence: true
def collection
Post.all
end
def process(post)
post.update!(content: updated_content)
end
end
end
タスクにパラメータを定義するときに、Active Modelのバリデーションも活用できます。パラメータを受け取れるタスクに引数を渡すと、タスクの実行開始前にバリデーションされます。引数はユーザーインターフェイス内のテキスト領域入力経由で指定されるので、引数がタスクで想定している形式に準拠するようにし、必要に応じてサニタイズすることが重要です。
🔗 カーソルカラムをカスタマイズしてパフォーマンスを向上させる
maintenance_tasks gemが依存しているjob-iteration gemは、collection
メソッドが返すリレーションにORDER BY
句を追加してレコードをイテレーション可能にします(デフォルトではid
カラム順でイテレーションされます)。
job-iteration gemは、カーソルの順序付けにどのカラムを使うかというbuild_active_record_enumerator_on_records
コンフィグをサポートしています。
maintenance-tasks gemは、job-iteration
gemが提供するカーソルカラムの制御機能を、MaintenanceTasks::Task
クラスのcursor_columns
メソッドで公開します。
cursor_columns
がnil
を返す場合、クエリの結果は主キー順になります。イテレーション中にカーソルカラムの値が変更されると、レコードがスキップされたり、同じレコードが何度も生成されてしまう可能性があります。
module Maintenance
class UpdatePostsTask < MaintenanceTasks::Task
def cursor_columns
[:created_at, :id]
end
def collection
Post.where(created_at: 2.days.ago...1.hour.ago)
end
def process(post)
post.update!(content: "updated content")
end
end
end
🔗 instrumentationイベントへのサブスクライブ
特定のタスクイベントに対して操作を行いたい場合は、後述するタスクコールバックセクションを参照してください。
ただし、すべてのイベントをサブスクライブしたい場合は、以下のActive Support通知をどのタスクでも利用できます。
enqueued.maintenance_tasks # タスクがユーザーによってエンキューされるとこのイベントがpublishされる
succeeded.maintenance_tasks # タスクがエラーなしで完了するとこのイベントがpublishされる
cancelled.maintenance_tasks # ユーザーが何らかのタスクを明示的に停止するとこのイベントがpublishされる
paused.maintenance_tasks # ユーザーがタスクを実行中に一時停止するとこのイベントがpublishされる
errored.maintenance_tasks # タスクのコードがunhandled exceptionを発生するとこのイベントがpublishされる
これらの通知は、メンテナンスタスクのライフサイクルをアプリケーション内で監視する方法を提供します。
利用例:
ActiveSupport::Notifications.subscribe("succeeded.maintenance_tasks") do |*, payload|
task_name = payload[:task_name]
arguments = payload[:arguments]
metadata = payload[:metadata]
job_id = payload[:job_id]
run_id = payload[:run_id]
time_running = payload[:time_running]
started_at = payload[:started_at]
ended_at = payload[:ended_at]
rescue => e
Rails.logger.error(e)
end
ActiveSupport::Notifications.subscribe("errored.maintenance_tasks") do |*, payload|
task_name = payload[:task_name]
error = payload[:error]
error_message = error[:message]
error_class = error[:class]
error_backtrace = error[:backtrace]
rescue => e
Rails.logger.error(e)
end
class MaintenanceTasksInstrumenter < ActiveSupport::Subscriber
attach_to :maintenance_tasks
def enqueued(event)
task_name = event.payload[:task_name]
arguments = event.payload[:arguments]
metadata = event.payload[:metadata]
SlackNotifier.broadcast(SLACK_CHANNEL,
"Job #{task_name} was started by #{metadata[:user_email]}} with arguments #{arguments.to_s.truncate(255)}")
rescue => e
Rails.logger.error(e)
end
end
🔗 タスクのコールバックを使う
タスクのライフサイクルにフックをかける以下のコールバックが提供されています。
after_start
after_pause
after_interrupt
after_cancel
after_complete
after_error
module Maintenance
class UpdatePostsTask < MaintenanceTasks::Task
after_start :notify
def notify
NotifyJob.perform_later(self.class.name)
end
# ...
end
end
注: after_error
は完了することが保証されているので、コールバックのコード内で発生する例外はすべて無視されます。 after_error
コールバックのコードが例外をraise
する場合は、そのコールバック内で適切にrescue
して処理する必要があります。
module Maintenance
class UpdatePostsTask < MaintenanceTasks::Task
after_error :dangerous_notify
def dangerous_notify
# This error is rescued and ignored in favour of the original error causing the error flow.
raise NotDeliveredError
end
# ...
end
end
その他のコールバックが例外を発生した場合は、エラーハンドラによって処理され、タスクが実行を停止します。
🔗 タスクを記述するときに考慮すべき点
maintenance_tasksは、タスクを処理するジョブを実行するために、アプリケーション用に設定されたキューアダプタに依存します。タスクの記述方法に関するガイドラインはキューアダプタによって異なる部分もありますが、一般には以下のルールに沿って書く必要があります。
Task#process
の実行時間に注意すること
コレクションの要素1個の処理が25秒を超えないようにする。
あるいは、Sidekiqやアプリケーションに設定済みのキューアダプタに設定されている実行時間を超えないようにする。
バッチを小さめにしておくことで、タスクが中断や再開を安全に行えるようになります。-
Task#process
を冪等にすること
コレクションの同じ要素に対してprocess
が複数回実行されても安全になるようにすること。詳しくはSidekiqのベストプラクティスを参照。
タスクでエラーが発生した場合、エラーの原因になった要素が再度処理される可能性があるため、要素の処理を再実行可能にすることが重要です。
これは、上の状況でイテレーション期間がタイムアウトした場合に特に重要です(ジョブが再度エンキューされると1個以上の要素が再処理される可能性があるため)。
🔗 タスクオブジェクトのライフサイクルとメモ化
タスクが実行または再開されると、ランナーはタスク処理用のジョブをエンキューします。このジョブはタスクオブジェクトをインスタンス化し、タスクオブジェクトはそのジョブが続く間存在し続けます。ジョブが初めて実行されると、タスクはcount
を呼び出します。ジョブが実行されるたびに、タスクはタスクオブジェクトのcollection
を呼び出し、続いてコレクションの項目ごとにprocess
を呼び出します。コレクションの処理が完了するか、ジョブの最大実行時間を超えると、ジョブは停止します。
メモ化された値は、同じジョブ内における以後のprocess
呼び出しでも参照可能になるため、process
内でメモ化を行うとおかしなことになる可能性があります。
それでもメモ化はスロットリングやレポート生成で引き続き利用可能です。たとえば、レポートを永続化したりログ出力するのにタスクのコールバックを利用できます。
🔗 タスクのテストを書く
タスクジェネレータを実行すると、test/tasks/maintenance/
ディレクトリにタスクのテストファイルも作成されます。少なくともタスクの#process
メソッドについてはテストを書いておくことをおすすめします。タスクの#collection
メソッドや#count
メソッドが複雑になってきたら、これらについてもテストを書くのがよいでしょう。
テストコードの例:
# test/tasks/maintenance/update_posts_task_test.rb
require "test_helper"
module Maintenance
class UpdatePostsTaskTest < ActiveSupport::TestCase
test "#process performs a task iteration" do
post = Post.new
Maintenance::UpdatePostsTask.process(post)
assert_equal "New content!", post.content
end
end
end
🔗 CSVタスクのテストを書く
CSVタスクでも#process
メソッドのテストを書きましょう。テストではCSV::Row
を引数として受け取ります。CSVのある行、または文字列キーのハッシュをテストに渡すことで、#process
をテストできます。
# test/tasks/maintenance/import_posts_task_test.rb
require "test_helper"
module Maintenance
class ImportPostsTaskTest < ActiveSupport::TestCase
test "#process performs a task iteration" do
assert_difference -> { Post.count } do
Maintenance::UpdatePostsTask.process({
"title" => "My Title",
"content" => "Hello World!",
})
end
post = Post.last
assert_equal "My Title", post.title
assert_equal "Hello World!", post.content
end
end
end
🔗 パラメータ付きタスクのテストを書く
パラメータを受け取れるタスクのテストでは、属性を代入するためにタスククラスをインスタンス化しておく必要があります。タスクインスタンスがセットアップされれば、通常どおり#process
をテストできるようになります。
# test/tasks/maintenance/update_posts_via_params_task_test.rb
require "test_helper"
module Maintenance
class UpdatePostsViaParamsTaskTest < ActiveSupport::TestCase
setup do
@task = UpdatePostsViaParamsTask.new
@task.updated_content = "Testing"
end
test "#process performs a task iteration" do
assert_difference -> { Post.first.content } do
@task.process(Post.first)
end
end
end
end
🔗 カスタムEnumeratorを使うタスクをテストする
カスタムEnumeratorを使うタスクのテストでは、#enumerator_builder
を呼び出すためにタスククラスをインスタンス化しておく必要があります。タスクインスタンスがセットアップされれば、#enumerator_builder
が返すEnumeratorが[item, cursor]
ペアを期待通りに生成することを検証できるようになります。
# test/tasks/maintenance/custom_enumerating_task.rb
require "test_helper"
module Maintenance
class CustomEnumeratingTaskTest < ActiveSupport::TestCase
setup do
@task = CustomEnumeratingTask.new
end
test "#enumerator_builder returns enumerator yielding pairs of [item, cursor]" do
enum = @task.enumerator_builder(cursor: 0)
expected_items = [:b, :c]
assert_equal 2, enum.size
enum.each_with_index do |item, cursor|
assert_equal expected_items[cursor], item
end
end
test "#process performs a task iteration" do
# ...
end
end
end
🔗 タスクを実行する
🔗 Web UIから実行する
Web UIを開いてタスクの"Run"をクリックすれば、新しいタスクを実行できます。
🔗 コマンドラインから実行する
以下のようにコマンドラインでタスクを実行することも可能です。
bundle exec maintenance_tasks perform Maintenance::UpdatePostsTask
CSVを処理するタスクをコマンドラインで実行するには、--csv
オプションを指定します。
bundle exec maintenance_tasks perform Maintenance::ImportPostsTask --csv "path/to/my_csv.csv"
この--csv
オプションは、標準入力からCSVコンテンツを受け取るときにも有効です。
curl "some/remote/csv" |
bundle exec maintenance_tasks perform Maintenance::ImportPostsTask --csv
引数を受け取るタスクをコマンドラインで実行するには、--arguments
オプションを指定し、続けて<キー>:<値>
ペアのセットを渡します。
bundle exec maintenance_tasks perform Maintenance::ParamsTask \
--arguments post_ids:1,2,3 content:"Hello, World!"
🔗 タスクをRubyから実行する
run
にタスク名を指定してランナーに送信すれば、タスクをRubyで実行することも可能です。
MaintenanceTasks::Runner.run(name: "Maintenance::UpdatePostsTask")
CSVをランナーで処理するタスクを実行するには、オープンするioオブジェクトとファイル名を含むハッシュをrun
に渡します。
MaintenanceTasks::Runner.run(
name: "Maintenance::ImportPostsTask",
csv_file: { io: File.open("path/to/my_csv.csv"), filename: "my_csv.csv" }
)
ランナーを使い、かつ引数を受け取るタスクを実行するには、 run
に{ パラメータ名: 引数の値 }
という引数セットを含むハッシュを渡します。
MaintenanceTasks::Runner.run(
name: "Maintenance::ParamsTask",
arguments: { post_ids: "1,2,3" }
)
🔗 タスクのステータスを監視する
Web UIでは、タスクの最新ステータス情報を得られます。タスクのステータスは以下のとおりです。
- new(新規)
- まだ実行されていないタスク。
- enqueued(エンキュー済み)
- ユーザーが実行を指示した後で実行待機中のタスク。
- running(実行中)
- 現在ジョブワーカーによって実行中の状態にあるタスク。
- pausing(一時停止処理中)
- ユーザーが一時停止を指示した後、停止前に作業を完了する必要のあるタスク。
- paused(一時停止済み)
- ユーザーによって一時停止され、現在は実行していないタスク。再開(resume)可能。
- interrupted(中断済み)
- ジョブインフラストラクチャによって一時的に中断されたタスク。
- cancelling(キャンセル処理中)
- ユーザーによってキャンセルされ、停止前に作業を完了する必要のあるタスク。
- cancelled(キャンセル済み)
- ユーザーによってキャンセルされ、現在は実行していないタスク。再開不可。
- succeeded(成功)
- 正常に完了したタスク。
- errored(エラー)
- 実行中にunhandled exceptionが発生したタスク。
🔗 maintenance_tasksがタスクを実行するしくみ
メンテナンスタスクの実行は長時間に及ぶ可能性があります。maintenance_tasks gemの目的は、「デプロイ」「Kubernetes Podのスケジューリング」「Heroku dynoの再起動」「その他のインフラストラクチャやコードの変更」を介してタスクの実行継続を支援することです。
つまり、process
メソッドが返されれば、以後はイテレーション処理の終盤に手動で介入する必要なしに、タスクの中断やエンキュー、再開を安全に行えるようになります。
デフォルトでは、実行中のタスクは5分以上経過すると自動的に中断されます。このタイムアウトはjob-iteration
gemで設定され、必要に応じてイニシャライザで調整可能です。
実行中のタスクも、必要に応じて自動的に中断および再度エンキューされます(例: デプロイでSidekiqワーカーがシャットダウンした場合)。
- Sidekiqは、
TSTP
シグナルかTERM
シグナルを受信すると、自分自身を停止中とみなします。 -
Sidekiqが停止中は、job-iterationがEnumeratorのイテレーションを停止します。
このときjob-iterationはイテレーション位置を保存したうえで、作業再開用の新しいジョブをエンキューし、タスクのステータスを"interrupted"にします。
Sidekiqが停止処理中の状態になると、ワーカーに終了まで25秒の猶予が与えられ、それをすぎると強制的にワーカーを終了します(これはデフォルト値: --timeout
オプションで変更可能)。
Sidekiqは、ワーカースレッドが終了する前にジョブの再エンキューを試みてタスクを再開しようとします。ただしこの場合、コレクションをどこまでイテレーションしたかという位置は保存されないため、1個以上のイテレーションが再実行されてしまう可能性があります。
Sidekiq以外のジョブキューでは、これを別の方法で処理しているものもあります。
🔗 タスクが詰まってしまった場合の対応方法
アプリケーションに設定されているキューアダプタにこのプロパティがない場合や、Sidekiqが「クラッシュ」「強制終了」または「実行中のキューを再エンキューできない状態」になると、タスクが詰まってしまったように見える(実行中のように見えるが実際は実行中ではない)ことがあります。
このような状況になると、タスクを一時停止またはキャンセルしようとしても、ステータスがpausing
やcancelling
のまま変わらなくなり、一時停止やキャンセルが行えなくなります。
回避方法として、タスクのcancelling
ステータスが5分以上続く場合に、タスクを再度キャンセルし、ステータスが完全にcancelled
に変わったら、再度実行できることがあります。
ステータスがpausing
のまま変わらない場合で、かつ(キャンセルや再実行を行わずに)タスクがどこまで処理を終えたかという位置を保持したい場合は、"Force pause"をクリックします。
🔗 maintenance_tasksの設定項目
maintenance_tasks gemにはいくつかの設定オプションがあります。カスタムの設定項目は、maintenance_tasks.rb
イニシャライザに配置する必要があります。
🔗 エラーハンドラをカスタマイズする
タスクの実行中に発生した例外はrescue
され、エラー情報は永続化されたうえでUI上に表示されます。
これを例外監視サービス(例: Bugsnag)に統合したい場合は、以下のようにエラーハンドラを定義できます。
# config/initializers/maintenance_tasks.rb
MaintenanceTasks.error_handler = ->(error, task_context, _errored_element) do
Bugsnag.notify(error) do |notification|
notification.add_metadata(:task, task_context)
end
end
エラーハンドラは、以下の3つの引数を受け取るlambdaにする必要があります。
error
: raiseする例外を指定するtask_context
: タスクとエラーの追加情報↓を含むハッシュtask_name
: エラーが発生したタスクの名前started_at
: タスクの開始時刻ended_at
: タスクのエラー発生時刻
注:task_context
は、コンテキストが収集される前のタイミングでタスクがエラーを生成した場合は空になる可能性があります(例: タスクを処理するジョブのデシリアライズが失敗した場合)。
errored_element
: タスクで例外が発生したときに処理中だった要素(存在する場合)
注: このオブジェクトを外部の例外監視サービスに渡す場合は、必ず機密データ漏洩防止のため「オブジェクトのサニタイズ」を行ってから、バグトラッカーと互換性のある「フォーマットへの変換」を行うこと。
たとえば、Bugsnagサービスは機密データ保護のため、Active RecordオブジェクトのIDとクラス名だけを送信します。しかしCSV行は文字列に変換されるとBugsnagにそのまま渡されてしまうため、レポートに追加する前にオブジェクトに含まれる個人データをフィルタで削除してください。
🔗 MaintenanceTasks
モジュールをカスタマイズする
以下のようにMaintenanceTasks.tasks_module
を設定することで、タスクに配置するモジュールを定義できます。
# config/initializers/maintenance_tasks.rb
MaintenanceTasks.tasks_module = "TaskModule"
この値が指定されていない場合は、デフォルトでMaintenance
になります。
🔗 タスクを名前空間で整理する
タスクは、app/tasks/maintenance
ディレクトリの下に任意の深さでネストできます。
たとえば、app/tasks/maintenance/team_name/service_name/update_posts_task.rb
というファイルに以下のタスクを定義できます。
module Maintenance
module TeamName
module ServiceName
class UpdatePostsTask < MaintenanceTasks::Task
def process(rows)
# ...
end
end
end
end
end
🔗 ベースとなるジョブクラスをカスタマイズする
MaintenanceTasks.job
コンフィグは、タスクで使うジョブクラスを定義するのに使えます。この設定はグローバルなので、ジョブクラスはアプリケーション内のあらゆるメンテナンスタスクで使われます。
# config/initializers/maintenance_tasks.rb
MaintenanceTasks.job = "CustomTaskJob"
# app/jobs/custom_task_job.rb
class CustomTaskJob < MaintenanceTasks::TaskJob
queue_as :low_priority
end
このジョブクラスは、MaintenanceTasks::TaskJob
を必ず継承する必要があります。
ただし、retry_on
はカスタムジョブクラスではサポートされないため、失敗したジョブのリトライはできない点にご注意ください。
🔗 タスク進捗状況の永続化頻度をカスタマイズする
MaintenanceTasks.ticker_delay
コンフィグは、タスクの進捗状況を永続化する頻度をカスタマイズするのに使えます。Numeric
値またはActiveSupport::Duration
値を指定できます。
# config/initializers/maintenance_tasks.rb
MaintenanceTasks.ticker_delay = 2.seconds
値を指定しない場合のデフォルト値は1秒です。
🔗 Active Storageで使うストレージサービスをカスタマイズする
Rails 6.1以降のActive Storageフレームワークは、複数のストレージサービスをサポートしています。使うサービスを指定するには、MaintenanceTasks.active_storage_service
コンフィグに、アプリケーションのconfig/storage.yml
で指定されているサービスのキーを設定できます。
# config/storage.yml
user_data:
service: GCS
credentials: <%= Rails.root.join("path/to/user/data/keyfile.json") %>
project: "my-project"
bucket: "user-data-bucket"
internal:
service: GCS
credentials: <%= Rails.root.join("path/to/internal/keyfile.json") %>
project: "my-project"
bucket: "internal-bucket"
# config/initializers/maintenance_tasks.rb
MaintenanceTasks.active_storage_service = :internal
アプリケーションで利用するストレージサービスが1種類のみの場合は、このコンフィグオプションは不要です。その場合、Rails.configuration.active_storage.service
がデフォルトで使われます。
🔗 バックトレースクリーナーをカスタマイズする
MaintenanceTasks.backtrace_cleaner
コンフィグは、タスクのエラー時にバックトレースをクリーンアップして永続化するのに使うバックトレースクリーナーを指定するのに使えます。ActiveSupport::BacktraceCleaner
を使う必要があります。
# config/initializers/maintenance_tasks.rb
cleaner = ActiveSupport::BacktraceCleaner.new
cleaner.add_silencer { |line| line =~ /ignore_this_dir/ }
MaintenanceTasks.backtrace_cleaner = cleaner
値を指定しない場合は、デフォルトでRails.backtrace_cleaner
がバックトレースのクリーンアップに使われます。
🔗 Web UIで使われる親コントローラをカスタマイズする
MaintenanceTasks.parent_controller
コンフィグは、Web UIエンジンのすべてのコントローラに継承されるコントローラクラスを指定するのに使えます。
これにより、ApplicationController
(または任意のコントローラー)に共通ロジックを持つアプリケーションは、イニシャライザでシンプルな割当を行うだけで、Web UIがそのロジックを継承する設定がオプションで可能になります。
# config/initializers/maintenance_tasks.rb
MaintenanceTasks.parent_controller = "Services::CustomController"
# app/controllers/services/custom_controller.rb
class Services::CustomController < ActionController::Base
include CustomSecurityThings
include CustomLoggingThings
# ...
end
親コントローラの値は、必ず既存のコントローラクラスに対応する文字列でなければならず、その既存のコントローラはActionController::Base
を必ず継承していなければなりません。
値が指定されていない場合は、デフォルトで"ActionController::Base"
が使われます。
🔗 タスクが詰まっていると判断するまでの時間を設定する
タスクが更新されていない場合に、タスクが詰まっているとみなすまでのタイムアウト時間を設定するには、MaintenanceTasks.stuck_task_duration
コンフィグを使えます。この時間を設定するときは、ジョブインフラストラクチャのイベントがメンテナンスタスクのジョブ実行を妨げたりタスクをキャンセルしたりする分の時間も考慮して盛り込んでおく必要があります。
MaintenanceTasks.stuck_task_duration
の値にはActiveSupport::Duration
型を必ず使う必要があります。
値が指定されていない場合は、デフォルトで5分が設定されます。
🔗 メタデータ
MaintenanceTasks.metadata
コンフィグは、実行に関する追加情報を取得するprocを指定するのに使えます。このprocはMaintenanceTasks.parent_controller
のコンテキストで実行されるので、メンテナンスタスクを実行したユーザーのIDやメールアドレスを取得するのに利用できます。
# config/initializers/maintenance_tasks.rb
MaintenanceTasks.metadata = ->() do
{ user_email: current_user.email }
end
🔗 アップグレード方法
maintenance_tasksの新バージョンをチェックして更新するには、bundlerを使います。新バージョンをインストールした後は、インストールコマンドを再度実行すること。
bin/rails generate maintenance_tasks:install
これにより、新しいマイグレーションもインストールされて実行可能になります。
🔗 メンテナンスタスクの古いマイグレーションを削除してしまった場合の対応方法
maintenance_tasksのインストールコマンドは、古いマイグレーションの再インストールを試みるため、古いマイグレーションが削除されていると、マイグレーション中にデータベースで問題が発生します。
対応方法: bin/rails maintenance_tasks:install:migrations
を実行して、maintenance_tasksのマイグレーションをdb/migrate
ディレクトリにコピーしてください。
次に、maintenance_tasksに新しいマイグレーションが追加されていないかどうかをリリースノートで確認し、新しいマイグレーションがあれば残し、既に実行済みの古いマイグレーションは削除します。
以上の確認が終わったら、bin/rails db:migrate
コマンドでマイグレーションを実行します。
🔗 貢献方法
私たちは、問題報告用のissueや、プルリクエストによるコードへの貢献を受け付けています。貢献方法についてはCONTRIBUTING.mdのガイドラインを参照してください。
🔗 新バージョンのリリースについて
プルリクエストがマージされると、GitHub上の最新ドラフトリリースに追加されます。
新バージョンのリリース準備ができたら、以下の手順を実行します。
maintenance_tasks.gemspec
ファイルのspec.version
フィールドを更新します。bundle install
を実行して、Gemfile.lock
のgemバージョンを上げます。- プルリクエストをオープンし、承認を経てからマージします。
- Shipit経由でデプロイし、rubygems.orgに新バージョンが表示されていることを確認します。
- リリースノートにすべてのChangelogが記載されていることを確認してから、新バージョンを公開します。
- 次のリリースに備えて、“Upcoming Release”というタイトルのドラフトリリースをGitHub上に新規作成します。タグバージョンは空欄のままで構いません。これは、次期リリースに関連する変更をドキュメント化するときの開始ポイントになります。
概要
MITライセンスに基づいて翻訳・公開いたします。
本記事では、原則としてツール(gem、フレームワーク)の名前をmaintenance_tasksと表記します。
Shopifyが開発したmaintenance_tasksは、Railsガイドでも推奨されているDBのデータマイグレーション用gemです↓。
参考: 10.2 データのマイグレーション -- Active Record マイグレーション - Railsガイド