Rails: DBメンテナンス支援ツール "maintenance_tasks" README(翻訳)
maintenance_tasksは、メンテナンスタスクをジョブキューに入れて管理するRailsエンジンです。
このプロジェクトにおける「メンテナンスタスク」は、いわゆるデータのマイグレーション(つまりデータベース内のデータを変更するコード、多くは最終的にスキーママイグレーションをサポートするのが目的)を意味します。
たとえば、NOT NULLカラムを新たに導入する場合、以下を行わなければなりません。
- そのカラムをnullable(NULL許容)として追加する
 - そのカラムに値をバックフィルする
 - 最後にそのカラムを
NOT NULLに変更する 
maintenance_tasksのエンジンは、上のステップ2である「バックフィル(backfill: 値の埋め戻し)」を支援します。
メンテナンスタスクはコレクションベースのタスクであり、データベース内のデータをActive Recordで更新するのが普通です。このメンテナンスタスクは一時停止・中断が可能です。また、メンテナンスタスクをバッチで実行することも、データベースの負荷制御のためにスロットリングすることも可能です。
メンテナンスタスクは定期的に実行するものではなく、必要が生じたときに1回だけ実行されることが前提です。メンテナンスタスクは一時的にしか必要とされないので、利用後は削除されるのが普通です。
maintenance_tasksのRailsエンジンには、メンテナンスタスクの「一覧表示」「ステータス確認」「開始」「一時停止」「再起動」を行えるWebベースのUIが搭載されています。
🔗 メンテナンスタスクはどんなときに必要か
maintenance_tasksに搭載されているジョブUIは、あくまで用途に特化した機能に限定されています。maintenance_tasksのRailsエンジンを用いて、目的以外のデータ変更タスク(サポートリクエストのデータ変更など)を提供することも一応可能ですが、そうなるとこのエンジンで提供できる以上の柔軟性が求められることになってしまうので、そうしたユースケースにはmaintenance_tasksではなく通常のアプリケーションコードを使うことをおすすめします。
Active Jobとして実行すべきでないタスクは、おそらくこのgemの用途に合わないでしょう。
- タスクをバックグラウンドで実行する必然性がなければ、代わりにランナースクリプトの利用を検討してください。
 - タスクを中断可能にする必然性がなければ、通常のActive Jobの利用を検討してください。
 
メンテナンスタスクは、反復中に中断可能です。
タスクがコレクションベースではない場合や、バッチが極めて大きい場合は、スロットリング(イテレーションごとに休止をはさむ)や中断機能で得られるメリットは限られます。これで問題ない場合もありますが、通常のActive Jobよりも複雑なメンテナンスタスクをわざわざ追加する手間に見合わない可能性もあります。
タスクで更新する対象がデータではなくデータベーススキーマの場合は、メンテナンスタスクではなく、通常のマイグレーションタスクをお使いください。
定期的に発生するタスクを扱うのであれば、maintenance_tasks gemよりも、Active Jobにスケジューラやcron、job-iterationやrails_adminのカスタムUIを組み合わせることを検討しましょう。
新規アプリケーション上でseedデータを作成したい場合は、maintenance_tasks gemではなく、db/seeds.rbをお使いください。
途中で終わったマイグレーションを適切に処理できないアプリケーションの場合は、おそらくmaintenance_tasks gemは適切なツールではありません。maintenance_tasks gemは、メンテナンスタスクが「一時停止可能」「キャンセル可能」であることを意図しています。
🔗 インストール
gemをインストールしてジェネレータを実行するには、以下を実行します
bundle add maintenance_tasks
bin/rails generate maintenance_tasks:install
このジェネレータは、必要なテーブルをデータベースに追加するためのマイグレーションを作成して実行します。また、maintenance_tasks用のルーティングをconfig/routes.rbにマウントします。デフォルトでは、/maintenance_tasksという新しいパスでメンテナンスタスクにアクセスできます。
このgemでは、Railsのエラーレポーターを使ってエラーを通知します。バグトラッキングサービスを利用している場合はレポーターにサブスクライブするとよいでしょう。詳しくはエラーを通知するを参照してください。
🔗 Active Jobへの依存について
maintenance_tasksフレームワークは、タスクをバックグラウンドで実行するためにActive Jobに依存します。Active Jobのキューイングバックエンドはデフォルトでは非同期です。コードやインフラストラクチャの変更中にタスクの進行状況が失われるのを防ぐために、キューイングを永続化バックエンドに変更することを強く推奨します。キューイングバックエンドの設定方法について詳しくは、Active Jobガイドを参照してください。
🔗 Action ControllerとAction Viewへの依存について
maintenance_tasksフレームワークは、UIのレンダリングにAction ControllerとAction Viewを使います。RailsをAPI専用モードで利用している場合は、API専用アプリケーションで利用するを参照してください。
🔗 オートロードについての注意
maintenance_tasksフレームワークは、:classicモードでのオートロードをサポートしていません。
利用するには、アプリケーションのコードでZeitwerkが使われていることを必ず確認してください。詳しくはRailsガイドの定数のオートロードとリロードを参照してください。
🔗 利用法
maintenance_tasksの典型的なワークフローは以下のような感じになります。
- タスクを記述するためのクラスを生成して実行したい作業を記述する
 - 以下のいずれかの方法でタスクを実行する
- 組み込みのWeb UIを使う
 - コマンドラインを使う
 - Rubyでタスクを実行する
 
 - 以下のいずれかの方法でタスクを監視する
- 組み込みのWeb UI
 - タスクの実行ステータスをデータベースで手動確認する
4.不要になったタスクを削除する(オプション) 
 
🔗 タスクを作成する
タスク作成用のジェネレータが提供されているので、以下を実行して新しいタスクを生成します。
bin/rails generate maintenance_tasks:task update_posts
これで、app/tasks/maintenance/update_posts_task.rbというタスクファイルが作成されます。
生成されたタスクは、MaintenanceTasks::Taskのサブクラスです。このタスクには以下を実装します。
collection: 反復処理の対象となるActive Recordリレーション、または配列を返しますprocess: メンテナンスタスクの作業を1個のレコードに対して実行します
オプションとして、タスクに#countメソッドをカスタム実装することで、イテレーションする要素の個数を定義することも可能です。
タスクのtick_total(経過時間)はコレクションのサイズに応じて自動的に算出されますが、必要であればこの値を#countメソッドでオーバーライドできます(例: コレクションのサイズを決定するためのクエリ生成を避けたい場合)。
タスクの例:
# app/tasks/maintenance/update_posts_task.rb
module Maintenance
  class UpdatePostsTask < MaintenanceTasks::Task
    def collection
      Post.all
    end
    def process(post)
      post.update!(content: "New content!")
    end
  end
end
🔗 バッチサイズをカスタマイズする
Active Recordリレーションを用いてレコードを処理する場合、レコードは内部でバッチで取得されたうえでレコードごとに#processメソッドに渡されます。
メンテナンスタスクがデフォルトで取得するデフォルトのバッチ数は100個ですが、バッチサイズはcollection_batch_sizeマクロで変更可能です。
# app/tasks/maintenance/update_posts_task.rb
module Maintenance
  class UpdatePostsTask < MaintenanceTasks::Task
    # Fetch records in batches of 1000
    collection_batch_size(1000)
    def collection
      Post.all
    end
    def process(post)
      post.update!(content: "New content!")
    end
  end
end
🔗 CSV処理用のタスクを作成する
CSVファイルをイテレーションするタスクも作成できます。
CSVタスクを作成するには、Active Storageも設定しておく必要があることにご注意ください。Active Storageで必要な依存関係がアプリケーションのGemfileで指定済みであることを確認してから、セットアップ手順に沿って設定してください。後述のActive Storageサービスのカスタマイズ方法も参照してください。
以下のコマンドを実行してCSVタスクを生成します。
bin/rails generate maintenance_tasks:task import_posts --csv
生成されたタスクはMaintenanceTasks::Taskのサブクラスです。ここに以下を実装します。
process:CSV::Rowで行いたいメンテナンスタスクを記述する
# app/tasks/maintenance/import_posts_task.rb
module Maintenance
  class ImportPostsTask < MaintenanceTasks::Task
    csv_collection
    def process(row)
      Post.create!(title: row["title"], content: row["content"])
    end
  end
end
posts.csv:
title,content
My Title,Hello World!
Active Storageのサービスプロバイダにアップロードされたファイルは、ISO 8601形式のタイムスタンプを含み、タスク名をスネークケース形式にする形でリネームされます。
暗黙の#countメソッドは、ファイルの正確な行数を決定するために、ファイル全体を読み込んで解析します。ファイルが数百万行になると処理に時間がかかるので、nilを返すcountを定義する、行数を近似する(例: 新規の行数をカウントする)などの方法で、カウントをスキップすることを検討してください。
def count(task)
  task.csv_content.count("\n") - 1
end
🔗 CSVオプション
タスクにcsv_collectionキーワード引数を追加することで、RubyのCSVパーサー用のオプションをタスクに渡せるようになります。
# app/tasks/maintenance/import_posts_task.rb
module Maintenance
  class ImportPosts
    csv_collection(skip_lines: /^#/, converters: ->(field) { field.strip })
    def process(row)
      Post.create!(title: row["title"], content: row["content"])
    end
  end
end
これらのオプションは、#で始まる行をスキップするようRuby CSVパーサーに指示し、すべてのフィールドから冒頭と末尾のスペース文字を削除します。これによって、以下のファイルは前述のサンプルとまったく同様に処理されます。
posts.csv:
# コメント行
title,content
 My Title ,Hello World!
🔗 CSVタスクのバッチ処理
タスクでは、CSVをバッチで処理できます。タスク内のcsv_collectionマクロに、以下のようにin_batchesオプションを追加します。
# app/tasks/maintenance/batch_import_posts_task.rb
module Maintenance
  class BatchImportPostsTask < MaintenanceTasks::Task
    csv_collection(in_batches: 50)
    def process(batch_of_rows)
      Post.insert_all(post_rows.map(&:to_h))
    end
  end
end
通常のCSVタスクと同様に、以下のメソッドも実装しておくこと。
process: タスクで(CSV::Rowオブジェクトの配列に)行いたい処理をバッチで記述する
このとき、#countはコレクション内のバッチの個数に応じて自動的に算出され、タスクの進行状況は(CSVの全行数ではなく)バッチ単位で表示されます。
CSVタスクをバッチ化しない場合、実質的なバッチサイズが1となり、データベース操作の効率が落ちる可能性があります。
🔗 バッチのコレクションを処理する
maintenance_tasks gemはActive Recordのバッチ処理をサポートしています。バッチ処理を使うことで、タスクがデータベースを呼び出す回数を削減できます。
処理をレコード単位ではなくバッチ単位で処理するには、コレクションが返すリレーションのActiveRecord::Batches#in_batchesを使います。
Active Recordのバッチサイズはデフォルトで1000レコードですが、カスタムのサイズも指定可能です。
# app/tasks/maintenance/update_posts_in_batches_task.rb
module Maintenance
  class UpdatePostsInBatchesTask < MaintenanceTasks::Task
    def collection
      Post.in_batches
    end
    def process(batch_of_posts)
      batch_of_posts.update_all(content: "New content added on #{Time.now.utc}")
    end
  end
end
このタスクには以下のメソッドを実装しておく必要があります。
collection:ActiveRecord::Batches::BatchEnumeratorを返しますprocess: タスクの作業をActiveRecord::Relationに対してバッチで行います
このとき、#countはコレクション内のバッチの個数に応じて自動的に算出され、タスクの進行状況は(リレーションのレコード数ではなく)バッチ単位で表示されます。
重要!
バッチ処理は、#processが#update_allや#delete_allを実行する場合に限ってお使いください。
個別のレコードをイテレーションしなければならない場合は、ActiveRecord::Relationを返すコレクションを定義する必要があります。これは内部でバッチ処理を行いますが、レコードを1件のクエリで読み込む点が異なります。
逆にバッチのコレクションは、最初にバッチのレコードから主キーを読み込んでから、#processメソッド内でeach(またはEnumerableの任意のメソッド)を呼び出すたびに追加クエリを実行してレコードを読み込みます。
🔗 コレクションの不要なタスク
場合によっては、シンプルな操作を1個だけ実行したいことがあります(例: バックグラウンドジョブを追加する、外部APIにクエリをかける)。maintenance_tasks gemは、コレクションなしのタスクもサポートしています。
以下を実行して、コレクションなしのタスクを生成します。
bin/rails generate maintenance_tasks:task no_collection_task --no-collection
生成されたタスクはMaintenanceTasks::Taskのサブクラスです。ここに以下を実装します。
process: メンテナンスタスクで行いたい処理を実行する
# app/tasks/maintenance/no_collection_task.rb
module Maintenance
  class NoCollectionTask < MaintenanceTasks::Task
    no_collection
    def process
      SomeAsyncJob.perform_later
    end
  end
end
🔗 タスクでカスタムのEnumeratorを使う
サポートされていないコレクション型(APIで取得した外部リソースなど)をイテレーションしなければならない特殊なユースケースでは、タスク内にenumerator_builder(cursor:)を実装することでイテレーションできます。
このメソッドは、[item, cursor]というペアを生成するEnumeratorを返す必要があります。
maintenance_tasksは、イテレーションの現在のカーソル位置を永続化して、タスクが中断または再開したときにcursorと引数として提供します。このcursorはStringとして保存されるので、カスタムのEnumeratorは必要に応じてこの値のシリアライズ/デシリアライズを処理する必要があります。
# app/tasks/maintenance/custom_enumerator_task.rb
module Maintenance
  class CustomEnumeratorTask < MaintenanceTasks::Task
    def enumerator_builder(cursor:)
      after_id = cursor&.to_i
      PostAPI.index(after_id: after_id).map { |post| [post, post.id] }.to_enum
    end
    def process(post)
      Post.create!(post)
    end
  end
end
🔗 スロットリング
maintenance_tasksでは大量のデータを変更することが多いため、データベースに負荷をかけることがあります。そのため、maintenance_tasksはスロットリングメカニズムを提供しており、特定の条件が満たされたときにタスクをスロットリングできます。
タスクがスロットリングされると(スロットルブロックがtrueを返すと)、タスクを中断し、バックオフ期間をすぎるとタスクをリトライします。デフォルトのバックオフ期間は30秒です。
スロットリングの条件はブロックを渡す形で指定します。
# app/tasks/maintenance/update_posts_throttled_task.rb
module Maintenance
  class UpdatePostsThrottledTask < MaintenanceTasks::Task
    throttle_on(backoff: 1.minute) do
      DatabaseStatus.unhealthy?
    end
    def collection
      Post.all
    end
    def process(post)
      post.update!(content: "New content added on #{Time.now.utc}")
    end
  end
end
アプリのスロットリング条件を適切に定義するのは、開発者にかかっています。ShopifyではDatabaseStatus.healthy?を実装する形で、MySQLのさまざまなメトリクス(レプリケーションラグ、DBスレッド、DB書き込み可能かどうかなど)をチェックしています。
タスクには複数のスロットリング条件を定義できます。スロットリング条件は子孫クラスに継承され、新しい条件を追加しても既存の条件に影響しません。
バックオフ期間は、引数なしのProcとして指定することも可能です。
# app/tasks/maintenance/update_posts_throttled_task.rb
module Maintenance
  class UpdatePostsThrottledTask < MaintenanceTasks::Task
    throttle_on(backoff: -> { RandomBackoffGenerator.generate_duration } ) do
      DatabaseStatus.unhealthy?
    end
    # ...
  end
end
🔗 カスタムのタスクパラメータ
タスクを実行するためにパラメータで追加情報を指定する必要が生じることがあります。タスク内でパラメータをActive Model属性として定義することで、#collectionや#countや#processなどのタスクメソッドでパラメータを利用可能になります。
# app/tasks/maintenance/update_posts_via_params_task.rb
module Maintenance
  class UpdatePostsViaParamsTask < MaintenanceTasks::Task
    attribute :updated_content, :string
    validates :updated_content, presence: true
    def collection
      Post.all
    end
    def process(post)
      post.update!(content: updated_content)
    end
  end
end
タスクにパラメータを定義するときに、Active Modelのバリデーションも活用できます。パラメータを受け取れるタスクに引数を渡すと、タスクの実行開始前にバリデーションされます。引数はユーザーインターフェイス内のテキスト領域入力経由で指定されるので、引数がタスクで想定している形式に準拠するようにし、必要に応じてサニタイズすることが重要です。
🔗 タスクのパラメータをバリデーションする
タスクの属性は、Active Modelのバリデーション機能でバリデーションできます。属性のバリデーションは、タスクがエンキューされる前に行われます。
たとえば、属性でin:オプションによる範囲バリデーションを使っている場合、その値のセットは画面のドロップダウン項目で表示されます。以下の型がサポートされています。
- 配列
 - Taskインスタンスをオプションで受け取って配列を返す、Procまたはlambda
 - 引数を1個(Taskインスタンス)受け取って配列を返す、呼び出し可能オブジェクト
 - 配列を返すメソッド(このメソッドはTaskインスタンスで呼び出される)
 
サポートされる型に合わないenumerableの場合は、代わりにテキストフィールドがレンダリングされます。
🔗 タスクパラメータをマスクする
タスククラスにmask_attributeクラスメソッドを追加することで、タスク属性をUIでマスクできます。
これにより、UI内の引数リストの値が[FILTERED]に置き換えられます。
# app/tasks/maintenance/sensitive_params_task.rb
module Maintenance
  class SensitiveParamsTask < MaintenanceTasks::Task
    attribute :sensitive_content, :string
    mask_attribute :sensitive_content
  end
end
パラメータをグローバルなRailsパラメータフィルタでフィルタしている場合は、パラメータをマスクするときにもそれらのパラメータが自動的に考慮されます。つまり、グローバルなRailsパラメータフィルタにパラメータを追加すれば、すべてのタスクでパラメータをマスクできます。
Rails.application.config.filter_parameters += %i[token]
🔗 カーソルカラムをカスタマイズしてパフォーマンスを向上させる
maintenance_tasks gemが依存しているjob-iteration gemは、collectionメソッドが返すリレーションにORDER BY句を追加してレコードをイテレーション可能にします(デフォルトではidカラム順でイテレーションされます)。
job-iteration gemは、カーソルの順序付けにどのカラムを使うかというbuild_active_record_enumerator_on_recordsコンフィグをサポートしています。
maintenance-tasks gemは、job-iteration gemが提供するカーソルカラムの制御機能を、MaintenanceTasks::Taskクラスのcursor_columnsメソッドで公開します。
cursor_columnsがnilを返す場合、クエリの結果は主キー順になります。イテレーション中にカーソルカラムの値が変更されると、レコードがスキップされたり、同じレコードが何度も生成されてしまう可能性があります。
module Maintenance
  class UpdatePostsTask < MaintenanceTasks::Task
    def cursor_columns
      [:created_at, :id]
    end
    def collection
      Post.where(created_at: 2.days.ago...1.hour.ago)
    end
    def process(post)
      post.update!(content: "updated content")
    end
  end
end
🔗 instrumentationイベントへのサブスクライブ
特定のタスクイベントに対して操作を行いたい場合は、後述するタスクコールバックセクションを参照してください。
ただし、すべてのイベントをサブスクライブしたい場合は、以下のActive Support通知をどのタスクでも利用できます。
enqueued.maintenance_tasks    # タスクがユーザーによってエンキューされるとこのイベントがpublishされる
succeeded.maintenance_tasks   # タスクがエラーなしで完了するとこのイベントがpublishされる
cancelled.maintenance_tasks   # ユーザーが何らかのタスクを明示的に停止するとこのイベントがpublishされる
paused.maintenance_tasks      # ユーザーがタスクを実行中に一時停止するとこのイベントがpublishされる
errored.maintenance_tasks     # タスクのコードがunhandled exceptionを発生するとこのイベントがpublishされる
これらの通知は、メンテナンスタスクのライフサイクルをアプリケーション内で監視する方法を提供します。
利用例:
ActiveSupport::Notifications.subscribe("succeeded.maintenance_tasks") do |*, payload|
  task_name = payload[:task_name]
  arguments = payload[:arguments]
  metadata = payload[:metadata]
  job_id = payload[:job_id]
  run_id = payload[:run_id]
  time_running = payload[:time_running]
  started_at = payload[:started_at]
  ended_at = payload[:ended_at]
rescue => e
  Rails.logger.error(e)
end
ActiveSupport::Notifications.subscribe("errored.maintenance_tasks") do |*, payload|
  task_name = payload[:task_name]
  error = payload[:error]
  error_message = error[:message]
  error_class = error[:class]
  error_backtrace = error[:backtrace]
rescue => e
  Rails.logger.error(e)
end
class MaintenanceTasksInstrumenter < ActiveSupport::Subscriber
  attach_to :maintenance_tasks
  def enqueued(event)
    task_name = event.payload[:task_name]
    arguments = event.payload[:arguments]
    metadata = event.payload[:metadata]
    SlackNotifier.broadcast(SLACK_CHANNEL,
      "Job #{task_name} was started by #{metadata[:user_email]}} with arguments #{arguments.to_s.truncate(255)}")
  rescue => e
    Rails.logger.error(e)
  end
end
🔗 タスクのコールバックを使う
タスクのライフサイクルにフックをかける以下のコールバックが提供されています。
after_startafter_pauseafter_interruptafter_cancelafter_completeafter_error
module Maintenance
  class UpdatePostsTask < MaintenanceTasks::Task
    after_start :notify
    def notify
      NotifyJob.perform_later(self.class.name)
    end
    # ...
  end
end
注:  after_errorは完了することが保証されているので、コールバックのコード内で発生する例外はすべて無視されます。 after_errorコールバックのコードが例外をraiseする場合は、そのコールバック内で適切にrescueして処理する必要があります。
module Maintenance
  class UpdatePostsTask < MaintenanceTasks::Task
    after_error :dangerous_notify
    def dangerous_notify
      # This error is rescued and ignored in favour of the original error causing the error flow.
      raise NotDeliveredError
    end
    # ...
  end
end
その他のコールバックが例外を発生した場合は、エラーハンドラによって処理され、タスクが実行を停止します。
🔗 タスクを記述するときに考慮すべき点
maintenance_tasksは、タスクを処理するジョブを実行するために、アプリケーション用に設定されたキューアダプタに依存します。タスクの記述方法に関するガイドラインはキューアダプタによって異なる部分もありますが、一般には以下のルールに沿って書く必要があります。
Task#processの実行時間に注意すること
コレクションの要素1個の処理が25秒を超えないようにする。
あるいは、Sidekiqやアプリケーションに設定済みのキューアダプタに設定されている実行時間を超えないようにする。
バッチを小さめにしておくことで、タスクが中断や再開を安全に行えるようになります。- 
Task#processを冪等にすること
コレクションの同じ要素に対してprocessが複数回実行されても安全になるようにすること。詳しくはSidekiqのベストプラクティスを参照。
タスクでエラーが発生した場合、エラーの原因になった要素が再度処理される可能性があるため、要素の処理を再実行可能にすることが重要です。
これは、上の状況でイテレーション期間がタイムアウトした場合に特に重要です(ジョブが再度エンキューされると1個以上の要素が再処理される可能性があるため)。 
🔗 タスクオブジェクトのライフサイクルとメモ化
タスクが実行または再開されると、ランナーはタスク処理用のジョブをエンキューします。このジョブはタスクオブジェクトをインスタンス化し、タスクオブジェクトはそのジョブが続く間存在し続けます。ジョブが初めて実行されると、タスクはcountを呼び出します。ジョブが実行されるたびに、タスクはタスクオブジェクトのcollectionを呼び出し、続いてコレクションの項目ごとにprocessを呼び出します。コレクションの処理が完了するか、ジョブの最大実行時間を超えると、ジョブは停止します。
メモ化された値は、同じジョブ内における以後のprocess呼び出しでも参照可能になるため、process内でメモ化を行うとおかしなことになる可能性があります。
それでもメモ化はスロットリングやレポート生成で引き続き利用可能です。たとえば、レポートを永続化したりログ出力するのにタスクのコールバックを利用できます。
🔗 タスクのテストを書く
タスクジェネレータを実行すると、test/tasks/maintenance/ディレクトリにタスクのテストファイルも作成されます。少なくともタスクの#processメソッドについてはテストを書いておくことをおすすめします。タスクの#collectionメソッドや#countメソッドが複雑になってきたら、これらについてもテストを書くのがよいでしょう。
テストコードの例:
# test/tasks/maintenance/update_posts_task_test.rb
require "test_helper"
module Maintenance
  class UpdatePostsTaskTest < ActiveSupport::TestCase
    test "#process performs a task iteration" do
      post = Post.new
      Maintenance::UpdatePostsTask.process(post)
      assert_equal "New content!", post.content
    end
  end
end
🔗 CSVタスクのテストを書く
CSVタスクでも#processメソッドのテストを書きましょう。テストではCSV::Rowを引数として受け取ります。CSVのある行、または文字列キーのハッシュをテストに渡すことで、#processをテストできます。
# test/tasks/maintenance/import_posts_task_test.rb
require "test_helper"
module Maintenance
  class ImportPostsTaskTest < ActiveSupport::TestCase
    test "#process performs a task iteration" do
      assert_difference -> { Post.count } do
        Maintenance::UpdatePostsTask.process({
          "title" => "My Title",
          "content" => "Hello World!",
        })
      end
      post = Post.last
      assert_equal "My Title", post.title
      assert_equal "Hello World!", post.content
    end
  end
end
🔗 パラメータ付きタスクのテストを書く
パラメータを受け取れるタスクのテストでは、属性を代入するためにタスククラスをインスタンス化しておく必要があります。タスクインスタンスがセットアップされれば、通常どおり#process をテストできるようになります。
# test/tasks/maintenance/update_posts_via_params_task_test.rb
require "test_helper"
module Maintenance
  class UpdatePostsViaParamsTaskTest < ActiveSupport::TestCase
    setup do
      @task = UpdatePostsViaParamsTask.new
      @task.updated_content = "Testing"
    end
    test "#process performs a task iteration" do
      assert_difference -> { Post.first.content } do
        @task.process(Post.first)
      end
    end
  end
end
🔗 カスタムEnumeratorを使うタスクをテストする
カスタムEnumeratorを使うタスクのテストでは、#enumerator_builderを呼び出すためにタスククラスをインスタンス化しておく必要があります。タスクインスタンスがセットアップされれば、#enumerator_builderが返すEnumeratorが[item, cursor]ペアを期待通りに生成することを検証できるようになります。
# test/tasks/maintenance/custom_enumerating_task.rb
require "test_helper"
module Maintenance
  class CustomEnumeratingTaskTest < ActiveSupport::TestCase
    setup do
      @task = CustomEnumeratingTask.new
    end
    test "#enumerator_builder returns enumerator yielding pairs of [item, cursor]" do
      enum = @task.enumerator_builder(cursor: 0)
      expected_items = [:b, :c]
      assert_equal 2, enum.size
      enum.each_with_index do |item, cursor|
        assert_equal expected_items[cursor], item
      end
    end
    test "#process performs a task iteration" do
      # ...
    end
  end
end
🔗 タスクを実行する
🔗 Web UIから実行する
Web UIを開いてタスクの"Run"をクリックすれば、新しいタスクを実行できます。
🔗 コマンドラインから実行する
以下のようにコマンドラインでタスクを実行することも可能です。
bundle exec maintenance_tasks perform Maintenance::UpdatePostsTask
CSVを処理するタスクをコマンドラインで実行するには、--csvオプションを指定します。
bundle exec maintenance_tasks perform Maintenance::ImportPostsTask --csv "path/to/my_csv.csv"
この--csvオプションは、標準入力からCSVコンテンツを受け取るときにも有効です。
curl "some/remote/csv" |
  bundle exec maintenance_tasks perform Maintenance::ImportPostsTask --csv
引数を受け取るタスクをコマンドラインで実行するには、--argumentsオプションを指定し、続けて<キー>:<値>ペアのセットを渡します。
bundle exec maintenance_tasks perform Maintenance::ParamsTask \
  --arguments post_ids:1,2,3 content:"Hello, World!"
🔗 タスクをRubyから実行する
runにタスク名を指定してランナーに送信すれば、タスクをRubyで実行することも可能です。
MaintenanceTasks::Runner.run(name: "Maintenance::UpdatePostsTask")
CSVをランナーで処理するタスクを実行するには、オープンするioオブジェクトとファイル名を含むハッシュをrunに渡します。
MaintenanceTasks::Runner.run(
  name: "Maintenance::ImportPostsTask",
  csv_file: { io: File.open("path/to/my_csv.csv"), filename: "my_csv.csv" }
)
ランナーを使い、かつ引数を受け取るタスクを実行するには、 runに{ パラメータ名: 引数の値 }という引数セットを含むハッシュを渡します。
MaintenanceTasks::Runner.run(
  name: "Maintenance::ParamsTask",
  arguments: { post_ids: "1,2,3" }
)
🔗 タスクのステータスを監視する
Web UIでは、タスクの最新ステータス情報を得られます。タスクのステータスは以下のとおりです。
- new(新規)
 - まだ実行されていないタスク。
 - enqueued(エンキュー済み)
 - ユーザーが実行を指示した後で実行待機中のタスク。
 - running(実行中)
 - 現在ジョブワーカーによって実行中の状態にあるタスク。
 - pausing(一時停止処理中)
 - ユーザーが一時停止を指示した後、停止前に作業を完了する必要のあるタスク。
 - paused(一時停止済み)
 - ユーザーによって一時停止され、現在は実行していないタスク。再開(resume)可能。
 - interrupted(中断済み)
 - ジョブインフラストラクチャによって一時的に中断されたタスク。
 - cancelling(キャンセル処理中)
 - ユーザーによってキャンセルされ、停止前に作業を完了する必要のあるタスク。
 - cancelled(キャンセル済み)
 - ユーザーによってキャンセルされ、現在は実行していないタスク。再開不可。
 - succeeded(成功)
 - 正常に完了したタスク。
 - errored(エラー)
 - 実行中にunhandled exceptionが発生したタスク。
 
🔗 API専用アプリケーションで利用する
maintenance_tasksエンジンは、flashメッセージとCSRFトークンの保存にRailsセッションを利用します。このエンジンをAPIのみのRailsアプリケーションで動作させるには、SessionミドルウェアとActionDispatch::Flashミドルウェアを追加する必要があります。また、エンジンは厳格なコンテンツセキュリティポリシー(CSP)を定義しています。CSPがユーザーのブラウザに確実に配信されるように、アプリのミドルウェアスタックにActionDispatch::ContentSecurityPolicy::Middlewareを含めるようにしてください。
Railsアプリケーションの設定については本ドキュメントの範囲外ですが、たとえばアプリケーションの設定ファイルに以下の行を追加することで可能になります。
# config/application.rb
module YourApplication
  class Application < Rails::Application
    # ...
    config.api_only = true
    config.middleware.insert_before ::Rack::Head, ::ActionDispatch::Flash
    config.middleware.insert_before ::Rack::Head, ::ActionDispatch::ContentSecurityPolicy::Middleware
    config.session_store :cookie_store, key: "_#{railtie_name.chomp("_application")}_session", secure: true
    config.middleware.insert_before ::ActionDispatch::Flash, config.session_store, config.session_options
    config.middleware.insert_before config.session_store, ActionDispatch::Cookies
  end
end
詳しくはRailsガイドのRailsによるAPI専用アプリケーションを参照してください。
🔗 maintenance_tasksがタスクを実行するしくみ
メンテナンスタスクの実行は長時間に及ぶ可能性があります。maintenance_tasks gemの目的は、「デプロイ」「Kubernetes Podのスケジューリング」「Heroku dynoの再起動」「その他のインフラストラクチャやコードの変更」を介してタスクの実行継続を支援することです。
つまり、processメソッドが返されれば、以後はイテレーション処理の終盤に手動で介入する必要なしに、タスクの中断やエンキュー、再開を安全に行えるようになります。
デフォルトでは、実行中のタスクは5分以上経過すると自動的に中断されます。このタイムアウトはjob-iteration gemで設定され、必要に応じてイニシャライザで調整可能です。
実行中のタスクも、必要に応じて自動的に中断および再度エンキューされます(例: デプロイでSidekiqワーカーがシャットダウンした場合)。
- Sidekiqは、
TSTPシグナルかTERMシグナルを受信すると、自分自身を停止中とみなします。 - 
Sidekiqが停止中は、job-iterationがEnumeratorのイテレーションを停止します。
このときjob-iterationはイテレーション位置を保存したうえで、作業再開用の新しいジョブをエンキューし、タスクのステータスを"interrupted"にします。 
Sidekiqが停止処理中の状態になると、ワーカーに終了まで25秒の猶予が与えられ、それをすぎると強制的にワーカーを終了します(これはデフォルト値: --timeoutオプションで変更可能)。
Sidekiqは、ワーカースレッドが終了する前にジョブの再エンキューを試みてタスクを再開しようとします。ただしこの場合、コレクションをどこまでイテレーションしたかという位置は保存されないため、1個以上のイテレーションが再実行されてしまう可能性があります。
Sidekiq以外のジョブキューでは、これを別の方法で処理しているものもあります。
🔗 タスクが詰まってしまった場合の対応方法
アプリケーションに設定されているキューアダプタにこのプロパティがない場合や、Sidekiqが「クラッシュ」「強制終了」または「実行中のキューを再エンキューできない状態」になると、タスクが詰まってしまったように見える(実行中のように見えるが実際は実行中ではない)ことがあります。
このような状況になると、タスクを一時停止またはキャンセルしようとしても、ステータスがpausingやcancellingのまま変わらなくなり、一時停止やキャンセルが行えなくなります。
回避方法として、タスクのcancellingステータスが5分以上続く場合に、タスクを再度キャンセルし、ステータスが完全にcancelledに変わったら、再度実行できることがあります。
ステータスがpausingのまま変わらない場合で、かつ(キャンセルや再実行を行わずに)タスクがどこまで処理を終えたかという位置を保持したい場合は、"Force pause"をクリックします。
🔗 エラーを通知する
maintenance_tasks gemにはいくつかの設定オプションがあります。カスタムの設定項目は、maintenance_tasks.rbイニシャライザに配置する必要があります。
🔗 エラーハンドラをカスタマイズする
タスクの実行中に発生した例外はrescueされ、エラー情報は永続化されたうえでUI上に表示されます。
エラーはRails.error.reportにも通知可能で、これはアプリケーションでカスタマイズ可能です。詳しくはRailsガイドのエラー通知ガイドを参照してください。
エラーレポーターに通知されるデータには以下が含まれます。
error: raiseする例外を指定するcontext: タスクとエラーの追加情報↓を含むハッシュtask_name: エラーが発生したタスクの名前started_at: タスクの開始時刻ended_at: タスクのエラー発生時刻run_id: エラーが発生したタスクの実行idtick_count: えらー発生時刻からの経過時間(tick)errored_element: タスクで例外が発生したときに処理中だった要素(存在する場合)
このオブジェクトを外部の例外監視サービスに渡す場合は、必ず機密データ漏洩防止のため「オブジェクトのサニタイズ」を行ってから、バグトラッカーと互換性のある「フォーマットへの変換」を行うこと。
source: これはmaintenance-tasksに設定されます。handled:MaintenanceTasks.report_errors_as_handledの値(デフォルトはtrue: 後述)
コンテキストが集められる前にタスクでエラーが発生すると、contextが空になることがあります(タスクをデシリアライズするジョブが失敗した場合など)。
以下は、例外監視サービス (Bugsnag) と統合するためのRailsエラーレポーターのカスタムサブスクライバの例です。
# config/initializers/maintenance_tasks.rb
MaintenanceTasks.report_errors_as_handled = false
class MaintenanceTasksErrorSubscriber
  def report(error, handled:, severity:, context:, source: nil)
    return unless source == "maintenance-tasks"
    unless handled
      Bugsnag.notify(error) do |notification|
        notification.add_metadata(:task, context)
      end
    else
      Rails.logger.info(error)
  end
end
Rails.error.subscribe(MaintenanceTasksErrorSubscriber.new)
handledの値は、この例ではMaintenanceTasks.report_errors_as_handledによって決まります。
この値は、後方互換性のため、デフォルトではtrueに設定されます。
これをfalseに設定すると、エラーサブスクライバで期待されるエラー(例: report_on経由)と期待されていないエラーを区別可能になるので、より正確なエラーレポートが提供されます。v3.0ではfalseがデフォルトになります。
🔗 イテレーション中のエラーを通知する
デフォルトでは、タスクのイテレーション(反復処理)中に発生したエラーはアプリケーションに通知され、イテレーションは停止します。しかし、一部のエラーについては処理を行ってからイテレーションを継続したい場合もあります。MaintenanceTasks::Task.report_onを使うことで、特定の例外をrescueしてRailsエラーレポーターに通知できます。キーワードパラメータはすべてreportメソッドに渡されます。
class MyTask < MaintenanceTasks::Task
  report_on(MyException, OtherException, severity: :info, context: {task_name: "my_task"})
end
MaintenanceTasks::TaskにはActiveSupport::Rescuableも含まれているので、これを用いてカスタムエラー処理を実装できます。
class MyTask < MaintenanceTasks::Task
  rescue_from(MyException) do |exception|
    handle(exception)
  end
end
🔗 MaintenanceTasksモジュールをカスタマイズする
以下のようにMaintenanceTasks.tasks_moduleを設定することで、タスクに配置するモジュールを定義できます。
# config/initializers/maintenance_tasks.rb
MaintenanceTasks.tasks_module = "TaskModule"
この値が指定されていない場合は、デフォルトでMaintenanceになります。
🔗 タスクを名前空間で整理する
タスクは、app/tasks/maintenanceディレクトリの下に任意の深さでネストできます。
たとえば、app/tasks/maintenance/team_name/service_name/update_posts_task.rbというファイルに以下のタスクを定義できます。
module Maintenance
  module TeamName
    module ServiceName
      class UpdatePostsTask < MaintenanceTasks::Task
        def process(rows)
          # ...
        end
      end
    end
  end
end
🔗 ベースとなるジョブクラスをカスタマイズする
MaintenanceTasks.jobコンフィグは、タスクで使うジョブクラスを定義するのに使えます。この設定はグローバルなので、ジョブクラスはアプリケーション内のあらゆるメンテナンスタスクで使われます。
# config/initializers/maintenance_tasks.rb
MaintenanceTasks.job = "CustomTaskJob"
# app/jobs/custom_task_job.rb
class CustomTaskJob < MaintenanceTasks::TaskJob
  queue_as :low_priority
end
このジョブクラスは、MaintenanceTasks::TaskJobを必ず継承する必要があります。
ただし、retry_onはカスタムジョブクラスではサポートされないため、失敗したジョブのリトライはできない点にご注意ください。
🔗 タスク進捗状況の永続化頻度をカスタマイズする
MaintenanceTasks.ticker_delayコンフィグは、タスクの進捗状況を永続化する頻度をカスタマイズするのに使えます。Numeric値またはActiveSupport::Duration値を指定できます。
# config/initializers/maintenance_tasks.rb
MaintenanceTasks.ticker_delay = 2.seconds
値を指定しない場合のデフォルト値は1秒です。
🔗 Active Storageで使うストレージサービスをカスタマイズする
Rails 6.1以降のActive Storageフレームワークは、複数のストレージサービスをサポートしています。使うサービスを指定するには、MaintenanceTasks.active_storage_serviceコンフィグに、アプリケーションのconfig/storage.ymlで指定されているサービスのキーを設定できます。
# config/storage.yml
user_data:
  service: GCS
  credentials: <%= Rails.root.join("path/to/user/data/keyfile.json") %>
  project: "my-project"
  bucket: "user-data-bucket"
internal:
  service: GCS
  credentials: <%= Rails.root.join("path/to/internal/keyfile.json") %>
  project: "my-project"
  bucket: "internal-bucket"
# config/initializers/maintenance_tasks.rb
MaintenanceTasks.active_storage_service = :internal
アプリケーションで利用するストレージサービスが1種類のみの場合は、このコンフィグオプションは不要です。その場合、Rails.configuration.active_storage.serviceがデフォルトで使われます。
🔗 バックトレースクリーナーをカスタマイズする
MaintenanceTasks.backtrace_cleanerコンフィグは、タスクのエラー時にバックトレースをクリーンアップして永続化するのに使うバックトレースクリーナーを指定するのに使えます。ActiveSupport::BacktraceCleanerを使う必要があります。
# config/initializers/maintenance_tasks.rb
cleaner = ActiveSupport::BacktraceCleaner.new
cleaner.add_silencer { |line| line =~ /ignore_this_dir/ }
MaintenanceTasks.backtrace_cleaner = cleaner
値を指定しない場合は、デフォルトでRails.backtrace_cleanerがバックトレースのクリーンアップに使われます。
🔗 Web UIで使われる親コントローラをカスタマイズする
MaintenanceTasks.parent_controllerコンフィグは、Web UIエンジンのすべてのコントローラに継承されるコントローラクラスを指定するのに使えます。
これにより、ApplicationController(または任意のコントローラー)に共通ロジックを持つアプリケーションは、イニシャライザでシンプルな割当を行うだけで、Web UIがそのロジックを継承する設定がオプションで可能になります。
# config/initializers/maintenance_tasks.rb
MaintenanceTasks.parent_controller = "Services::CustomController"
# app/controllers/services/custom_controller.rb
class Services::CustomController < ActionController::Base
  include CustomSecurityThings
  include CustomLoggingThings
  # ...
end
親コントローラの値は、必ず既存のコントローラクラスに対応する文字列でなければならず、その既存のコントローラはActionController::Baseを必ず継承していなければなりません。
値が指定されていない場合は、デフォルトで"ActionController::Base"が使われます。
🔗 タスクが詰まっていると判断するまでの時間を設定する
タスクが更新されていない場合に、タスクが詰まっているとみなすまでのタイムアウト時間を設定するには、MaintenanceTasks.stuck_task_durationコンフィグを使えます。この時間を設定するときは、ジョブインフラストラクチャのイベントがメンテナンスタスクのジョブ実行を妨げたりタスクをキャンセルしたりする分の時間も考慮して盛り込んでおく必要があります。
MaintenanceTasks.stuck_task_durationの値にはActiveSupport::Duration型を必ず使う必要があります。
値が指定されていない場合は、デフォルトで5分が設定されます。
🔗 ステータスの再読み込み頻度を設定する
MaintenanceTasks.status_reload_frequencyを設定すると、イテレーション中に実行ステータスを再読み込みする頻度を指定できます。デフォルトのステータスは1秒ごとに再読み込みされますが、パフォーマンスを向上させるためにこの間隔を長くできます。再読み込み間隔を長くすると、タスクが一時停止または中断した場合の停止速度に影響することに注意してください。
# config/initializers/maintenance_tasks.rb
MaintenanceTasks.status_reload_frequency = 10.seconds  # 10秒おきに再読み込み
個別のタスクでreload_status_everyメソッドを設定することで、設定を上書きできます。
# app/tasks/maintenance/update_posts_task.rb
module Maintenance
  class UpdatePostsTask < MaintenanceTasks::Task
    # 5秒おきにステータスを再読み込み(グローバルなデフォルト値を使わない)
    reload_status_every(5.seconds)
    def collection
      Post.all
    end
    def process(post)
      post.update!(content: "New content!")
    end
  end
end
この最適化によって、特に短時間のイテレーションでデータベースクエリを大幅に削減可能になります。
これは、タスクのキャンセルや一時停止を頻繁にチェックする必要がない場合に特に有用です。
🔗 メタデータ
MaintenanceTasks.metadataコンフィグは、実行に関する追加情報を取得するprocを指定するのに使えます。このprocはMaintenanceTasks.parent_controllerのコンテキストで実行されるので、メンテナンスタスクを実行したユーザーのIDやメールアドレスを取得するのに利用できます。
# config/initializers/maintenance_tasks.rb
MaintenanceTasks.metadata = ->() do
  { user_email: current_user.email }
end
🔗 アップグレード方法
maintenance_tasksの新バージョンをチェックして更新するには、bundlerを使います。新バージョンをインストールした後は、インストールコマンドを再度実行すること。
bin/rails generate maintenance_tasks:install
これにより、新しいマイグレーションもインストールされて実行可能になります。
🔗 メンテナンスタスクの古いマイグレーションを削除してしまった場合の対応方法
maintenance_tasksのインストールコマンドは、古いマイグレーションの再インストールを試みるため、古いマイグレーションが削除されていると、マイグレーション中にデータベースで問題が発生します。
対応方法: bin/rails maintenance_tasks:install:migrationsを実行して、maintenance_tasksのマイグレーションをdb/migrateディレクトリにコピーしてください。
次に、maintenance_tasksに新しいマイグレーションが追加されていないかどうかをリリースノートで確認し、新しいマイグレーションがあれば残し、既に実行済みの古いマイグレーションは削除します。
以上の確認が終わったら、bin/rails db:migrateコマンドでマイグレーションを実行します。
🔗 貢献方法
私たちは、問題報告用のissueや、プルリクエストによるコードへの貢献を受け付けています。貢献方法についてはCONTRIBUTING.mdのガイドラインを参照してください。
🔗 新バージョンのリリースについて
プルリクエストがマージされると、GitHub上の最新ドラフトリリースに追加されます。
新バージョンのリリース準備ができたら、以下の手順を実行します。
maintenance_tasks.gemspecファイルのspec.versionフィールドを更新します。bundle installを実行して、Gemfile.lockのgemバージョンを上げます。- プルリクエストをオープンし、承認を経てからマージします。
 - Shipit経由でデプロイし、rubygems.orgに新バージョンが表示されていることを確認します。
 - リリースノートにすべてのChangelogが記載されていることを確認してから、新バージョンを公開します。
 - 次のリリースに備えて、“Upcoming Release”というタイトルのドラフトリリースをGitHub上に新規作成します。タグバージョンは空欄のままで構いません。これは、次期リリースに関連する変更をドキュメント化するときの開始ポイントになります。
 
      
概要
MITライセンスに基づいて翻訳・公開いたします。
本記事では、原則としてツール(gem、フレームワーク)の名前をmaintenance_tasksと表記します。
Shopifyが開発したmaintenance_tasksは、Railsガイドでも推奨されているDBのデータマイグレーション用gemです↓。
参考: 10.2 データのマイグレーション -- Active Record マイグレーション - Railsガイド