Tech Racho エンジニアの「?」を「!」に。
  • Ruby / Rails関連

Rails: DBメンテナンス支援ツール "maintenance_tasks" README(翻訳)

概要

MITライセンスに基づいて翻訳・公開いたします。

本記事では、原則としてツール(gem、フレームワーク)の名前をmaintenance_tasksと表記します。

Shopify/maintenance_tasks - GitHub

Shopifyが開発したmaintenance_tasksは、Railsガイドでも推奨されているDBのデータマイグレーション用gemです↓。

参考: 10.2 データのマイグレーション -- Active Record マイグレーション - Railsガイド

Rails: DBメンテナンス支援ツール "maintenance_tasks" README(翻訳)

maintenance_tasksは、メンテナンスタスクをジョブキューに入れて管理するRailsエンジンです。

このプロジェクトにおける「メンテナンスタスク」は、いわゆるデータのマイグレーション(つまりデータベース内のデータを変更するコード、多くは最終的にスキーママイグレーションをサポートするのが目的)を意味します。
たとえば、NOT NULLカラムを新たに導入する場合、以下を行わなければなりません。

  1. そのカラムをnullable(NULL許容)として追加する
  2. そのカラムに値をバックフィルする
  3. 最後にそのカラムをNOT NULLに変更する

maintenance_tasksのエンジンは、上のステップ2である「バックフィル(backfill: 値の埋め戻し)」を支援します。

メンテナンスタスクはコレクションベースのタスクであり、データベース内のデータをActive Recordで更新するのが普通です。このメンテナンスタスクは一時停止・中断が可能です。また、メンテナンスタスクをバッチで実行することも、データベースの負荷制御のためにスロットリングすることも可能です。

メンテナンスタスクは定期的に実行するものではなく、必要が生じたときに1回だけ実行されることが前提です。メンテナンスタスクは一時的にしか必要とされないので、利用後は削除されるのが普通です。

maintenance_tasksのRailsエンジンには、メンテナンスタスクの「一覧表示」「ステータス確認」「開始」「一時停止」「再起動」を行えるWebベースのUIが搭載されています。

🔗 メンテナンスタスクはどんなときに必要か

maintenance_tasksに搭載されているジョブUIは、あくまで用途に特化した機能に限定されています。maintenance_tasksのRailsエンジンを用いて、目的以外のデータ変更タスク(サポートリクエストのデータ変更など)を提供することも一応可能ですが、そうなるとこのエンジンで提供できる以上の柔軟性が求められることになってしまうので、そうしたユースケースにはmaintenance_tasksではなく通常のアプリケーションコードを使うことをおすすめします。

Active Jobとして実行すべきでないタスクは、おそらくこのgemの用途に合わないでしょう。

  • タスクをバックグラウンドで実行する必然性がなければ、代わりにランナースクリプトの利用を検討してください。
  • タスクを中断可能にする必然性がなければ、通常のActive Jobの利用を検討してください。

メンテナンスタスクは、反復中に中断可能です。
タスクがコレクションベースではない場合や、バッチが極めて大きい場合は、スロットリング(イテレーションごとに休止をはさむ)や中断機能で得られるメリットは限られます。これで問題ない場合もありますが、通常のActive Jobよりも複雑なメンテナンスタスクをわざわざ追加する手間に見合わない可能性もあります。

タスクで更新する対象がデータではなくデータベーススキーマの場合は、メンテナンスタスクではなく、通常のマイグレーションタスクをお使いください。

定期的に発生するタスクを扱うのであれば、maintenance_tasks gemよりも、Active Jobにスケジューラやcron、job-iterationrails_adminのカスタムUIを組み合わせることを検討しましょう。

新規アプリケーション上でseedデータを作成したい場合は、maintenance_tasks gemではなく、db/seeds.rbをお使いください。

途中で終わったマイグレーションを適切に処理できないアプリケーションの場合は、おそらくmaintenance_tasks gemは適切なツールではありません。maintenance_tasks gemは、メンテナンスタスクが「一時停止可能」「キャンセル可能」であることを意図しています。

🔗 インストール

gemをインストールしてジェネレータを実行するには、以下を実行します

bundle add maintenance_tasks
bin/rails generate maintenance_tasks:install

このジェネレータは、必要なテーブルをデータベースに追加するためのマイグレーションを作成して実行します。また、maintenance_tasks用のルーティングをconfig/routes.rbにマウントします。デフォルトでは、/maintenance_tasksという新しいパスでメンテナンスタスクにアクセスできます。

例外レポートサービス(Bugsnagなど)を利用している場合は、エラーハンドラを定義する必要が生じるでしょう。詳しくはエラーハンドラのカスタマイズを参照してください。

🔗 Active Jobへの依存について

maintenance_tasksフレームワークは、タスクをバックグラウンドで実行するためにActive Jobに依存します。Active Jobのキューイングバックエンドはデフォルトでは非同期です。コードやインフラストラクチャの変更中にタスクの進行状況が失われるのを防ぐために、キューイングを永続化バックエンドに変更することを強く推奨します。キューイングバックエンドの設定方法について詳しくは、Active Jobガイドを参照してください。

🔗 オートロードについての注意

maintenance_tasksフレームワークは、:classicモードでのオートロードをサポートしていません。
利用するには、アプリケーションのコードでZeitwerkが使われていることを必ず確認してください。詳しくはRailsガイドの定数のオートロードとリロードを参照してください。

🔗 利用法

maintenance_tasksの典型的なワークフローは以下のような感じになります。

  1. タスクを記述するためのクラスを生成して実行したい作業を記述する
  2. 以下のいずれかの方法でタスクを実行する
  3. 以下のいずれかの方法でタスクを監視する
    • 組み込みのWeb UI
    • タスクの実行ステータスをデータベースで手動確認する
      4.不要になったタスクを削除する(オプション)

🔗 タスクを作成する

タスク作成用のジェネレータが提供されているので、以下を実行して新しいタスクを生成します。

bin/rails generate maintenance_tasks:task update_posts

これで、app/tasks/maintenance/update_posts_task.rbというタスクファイルが作成されます。

生成されたタスクは、MaintenanceTasks::Taskのサブクラスです。このタスクには以下を実装します。

  • collection: 反復処理の対象となるActive Recordリレーション、または配列を返します
  • process: メンテナンスタスクの作業を1個のレコードに対して実行します

オプションとして、タスクに#countメソッドをカスタム実装することで、イテレーションする要素の個数を定義することも可能です。

タスクのtick_total(経過時間)はコレクションのサイズに応じて自動的に算出されますが、必要であればこの値を#countメソッドでオーバーライドできます(例: コレクションのサイズを決定するためのクエリ生成を避けたい場合)。

タスクの例:

# app/tasks/maintenance/update_posts_task.rb

module Maintenance
  class UpdatePostsTask < MaintenanceTasks::Task
    def collection
      Post.all
    end

    def process(post)
      post.update!(content: "New content!")
    end
  end
end
🔗 バッチサイズをカスタマイズする

Active Recordリレーションを用いてレコードを処理する場合、レコードは内部でバッチで取得されたうえでレコードごとに#processメソッドに渡されます。
メンテナンスタスクがデフォルトで取得するデフォルトのバッチ数は100個ですが、バッチサイズはcollection_batch_sizeマクロで変更可能です。

# app/tasks/maintenance/update_posts_task.rb

module Maintenance
  class UpdatePostsTask < MaintenanceTasks::Task
    # Fetch records in batches of 1000
    collection_batch_size(1000)

    def collection
      Post.all
    end

    def process(post)
      post.update!(content: "New content!")
    end
  end
end

🔗 CSV処理用のタスクを作成する

CSVファイルをイテレーションするタスクも作成できます。
CSVタスクを作成するには、Active Storageも設定しておく必要があることにご注意ください。Active Storageで必要な依存関係がアプリケーションのGemfileで指定済みであることを確認してから、セットアップ手順に沿って設定してください。後述のActive Storageサービスのカスタマイズ方法も参照してください。

以下のコマンドを実行してCSVタスクを生成します。

bin/rails generate maintenance_tasks:task import_posts --csv

生成されたタスクはMaintenanceTasks::Taskのサブクラスです。ここに以下を実装します。

  • process: CSV::Rowで行いたいメンテナンスタスクを記述する
# app/tasks/maintenance/import_posts_task.rb

module Maintenance
  class ImportPostsTask < MaintenanceTasks::Task
    csv_collection

    def process(row)
      Post.create!(title: row["title"], content: row["content"])
    end
  end
end
  • posts.csv:
title,content
My Title,Hello World!

Active Storageのサービスプロバイダにアップロードされたファイルは、ISO 8601形式のタイムスタンプを含み、タスク名をスネークケース形式にする形でリネームされます。

暗黙の#countメソッドは、ファイルの正確な行数を決定するために、ファイル全体を読み込んで解析します。ファイルが数百万行になると処理に時間がかかるので、nilを返すcountを定義する、行数を近似する(例: 新規の行数をカウントする)などの方法で、カウントをスキップすることを検討してください。

def count(task)
  task.csv_content.count("\n") - 1
end
🔗 CSVオプション

タスクにcsv_collectionキーワード引数を追加することで、RubyのCSVパーサー用のオプションをタスクに渡せるようになります。

# app/tasks/maintenance/import_posts_task.rb

module Maintenance
  class ImportPosts
    csv_collection(skip_lines: /^#/, converters: ->(field) { field.strip })

    def process(row)
      Post.create!(title: row["title"], content: row["content"])
    end
  end
end

これらのオプションは、#で始まる行をスキップするようRuby CSVパーサーに指示し、すべてのフィールドから冒頭と末尾のスペース文字を削除します。これによって、以下のファイルは前述のサンプルとまったく同様に処理されます。

posts.csv:

# コメント行
title,content
 My Title ,Hello World!
🔗 CSVタスクのバッチ処理

タスクでは、CSVをバッチで処理できます。タスク内のcsv_collectionマクロに、以下のようにin_batchesオプションを追加します。

# app/tasks/maintenance/batch_import_posts_task.rb

module Maintenance
  class BatchImportPostsTask < MaintenanceTasks::Task
    csv_collection(in_batches: 50)

    def process(batch_of_rows)
      Post.insert_all(post_rows.map(&:to_h))
    end
  end
end

通常のCSVタスクと同様に、以下のメソッドも実装しておくこと。

  • process: タスクで(CSV::Rowオブジェクトの配列に)行いたい処理をバッチで記述する

このとき、#countはコレクション内のバッチの個数に応じて自動的に算出され、タスクの進行状況は(CSVの全行数ではなく)バッチ単位で表示されます。

CSVタスクをバッチ化しない場合、実質的なバッチサイズが1となり、データベース操作の効率が落ちる可能性があります。

🔗 バッチのコレクションを処理する

maintenance_tasks gemはActive Recordのバッチ処理をサポートしています。バッチ処理を使うことで、タスクがデータベースを呼び出す回数を削減できます。
処理をレコード単位ではなくバッチ単位で処理するには、コレクションが返すリレーションのActiveRecord::Batches#in_batchesを使います。
Active Recordのバッチサイズはデフォルトで1000レコードですが、カスタムのサイズも指定可能です。

# app/tasks/maintenance/update_posts_in_batches_task.rb

module Maintenance
  class UpdatePostsInBatchesTask < MaintenanceTasks::Task
    def collection
      Post.in_batches
    end

    def process(batch_of_posts)
      batch_of_posts.update_all(content: "New content added on #{Time.now.utc}")
    end
  end
end

このタスクには以下のメソッドを実装しておく必要があります。

  • collection: ActiveRecord::Batches::BatchEnumeratorを返します
  • process: タスクの作業をActiveRecord::Relationに対してバッチで行います

このとき、#countはコレクション内のバッチの個数に応じて自動的に算出され、タスクの進行状況は(リレーションのレコード数ではなく)バッチ単位で表示されます。

重要!

バッチ処理は、#process#update_all#delete_allを実行する場合に限ってお使いください。
個別のレコードをイテレーションしなければならない場合は、ActiveRecord::Relationを返すコレクションを定義する必要があります。これは内部でバッチ処理を行いますが、レコードを1件のクエリで読み込む点が異なります。
逆にバッチのコレクションは、最初にバッチのレコードから主キーを読み込んでから、#processメソッド内でeach(またはEnumerableの任意のメソッド)を呼び出すたびに追加クエリを実行してレコードを読み込みます。

🔗 コレクションの不要なタスク

場合によっては、シンプルな操作を1個だけ実行したいことがあります(例: バックグラウンドジョブを追加する、外部APIにクエリをかける)。maintenance_tasks gemは、コレクションなしのタスクもサポートしています。

以下を実行して、コレクションなしのタスクを生成します。

bin/rails generate maintenance_tasks:task no_collection_task --no-collection

生成されたタスクはMaintenanceTasks::Taskのサブクラスです。ここに以下を実装します。

  • process: メンテナンスタスクで行いたい処理を実行する
# app/tasks/maintenance/no_collection_task.rb

module Maintenance
  class NoCollectionTask < MaintenanceTasks::Task
    no_collection

    def process
      SomeAsyncJob.perform_later
    end
  end
end

🔗 タスクでカスタムのEnumeratorを使う

サポートされていないコレクション型(APIで取得した外部リソースなど)をイテレーションしなければならない特殊なユースケースでは、タスク内にenumerator_builder(cursor:)を実装することでイテレーションできます。

このメソッドは、[item, cursor]というペアを生成するEnumeratorを返す必要があります。
maintenance_tasksは、イテレーションの現在のカーソル位置を永続化して、タスクが中断または再開したときにcursorと引数として提供します。このcursorStringとして保存されるので、カスタムのEnumeratorは必要に応じてこの値のシリアライズ/デシリアライズを処理する必要があります。

# app/tasks/maintenance/custom_enumerator_task.rb

module Maintenance
  class CustomEnumeratorTask < MaintenanceTasks::Task
    def enumerator_builder(cursor:)
      after_id = cursor&.to_i
      PostAPI.index(after_id: after_id).map { |post| [post, post.id] }.to_enum
    end

    def process(post)
      Post.create!(post)
    end
  end
end

🔗 スロットリング

maintenance_tasksでは大量のデータを変更することが多いため、データベースに負荷をかけることがあります。そのため、maintenance_tasksはスロットリングメカニズムを提供しており、特定の条件が満たされたときにタスクをスロットリングできます。
タスクがスロットリングされると(スロットルブロックがtrueを返すと)、タスクを中断し、バックオフ期間をすぎるとタスクをリトライします。デフォルトのバックオフ期間は30秒です。

スロットリングの条件はブロックを渡す形で指定します。

# app/tasks/maintenance/update_posts_throttled_task.rb

module Maintenance
  class UpdatePostsThrottledTask < MaintenanceTasks::Task
    throttle_on(backoff: 1.minute) do
      DatabaseStatus.unhealthy?
    end

    def collection
      Post.all
    end

    def process(post)
      post.update!(content: "New content added on #{Time.now.utc}")
    end
  end
end

アプリのスロットリング条件を適切に定義するのは、開発者にかかっています。ShopifyではDatabaseStatus.healthy?を実装する形で、MySQLのさまざまなメトリクス(レプリケーションラグ、DBスレッド、DB書き込み可能かどうかなど)をチェックしています。

タスクには複数のスロットリング条件を定義できます。スロットリング条件は子孫クラスに継承され、新しい条件を追加しても既存の条件に影響しません。

バックオフ期間は、引数なしのProcとして指定することも可能です。

# app/tasks/maintenance/update_posts_throttled_task.rb

module Maintenance
  class UpdatePostsThrottledTask < MaintenanceTasks::Task
    throttle_on(backoff: -> { RandomBackoffGenerator.generate_duration } ) do
      DatabaseStatus.unhealthy?
    end
    # ...
  end
end

🔗 カスタムのタスクパラメータ

タスクを実行するためにパラメータで追加情報を指定する必要が生じることがあります。タスク内でパラメータをActive Model属性として定義することで、#collection#count#processなどのタスクメソッドでパラメータを利用可能になります。

# app/tasks/maintenance/update_posts_via_params_task.rb

module Maintenance
  class UpdatePostsViaParamsTask < MaintenanceTasks::Task
    attribute :updated_content, :string
    validates :updated_content, presence: true

    def collection
      Post.all
    end

    def process(post)
      post.update!(content: updated_content)
    end
  end
end

タスクにパラメータを定義するときに、Active Modelのバリデーションも活用できます。パラメータを受け取れるタスクに引数を渡すと、タスクの実行開始前にバリデーションされます。引数はユーザーインターフェイス内のテキスト領域入力経由で指定されるので、引数がタスクで想定している形式に準拠するようにし、必要に応じてサニタイズすることが重要です。

🔗 カーソルカラムをカスタマイズしてパフォーマンスを向上させる

maintenance_tasks gemが依存しているjob-iteration gemは、collectionメソッドが返すリレーションにORDER BY句を追加してレコードをイテレーション可能にします(デフォルトではidカラム順でイテレーションされます)。

job-iteration gemは、カーソルの順序付けにどのカラムを使うかというbuild_active_record_enumerator_on_recordsコンフィグをサポートしています。

maintenance-tasks gemは、job-iteration gemが提供するカーソルカラムの制御機能を、MaintenanceTasks::Taskクラスのcursor_columnsメソッドで公開します。
cursor_columnsnilを返す場合、クエリの結果は主キー順になります。イテレーション中にカーソルカラムの値が変更されると、レコードがスキップされたり、同じレコードが何度も生成されてしまう可能性があります。

module Maintenance
  class UpdatePostsTask < MaintenanceTasks::Task
    def cursor_columns
      [:created_at, :id]
    end

    def collection
      Post.where(created_at: 2.days.ago...1.hour.ago)
    end

    def process(post)
      post.update!(content: "updated content")
    end
  end
end

🔗 instrumentationイベントへのサブスクライブ

特定のタスクイベントに対して操作を行いたい場合は、後述するタスクコールバックセクションを参照してください。

ただし、すべてのイベントをサブスクライブしたい場合は、以下のActive Support通知をどのタスクでも利用できます。

enqueued.maintenance_tasks    # タスクがユーザーによってエンキューされるとこのイベントがpublishされる
succeeded.maintenance_tasks   # タスクがエラーなしで完了するとこのイベントがpublishされる
cancelled.maintenance_tasks   # ユーザーが何らかのタスクを明示的に停止するとこのイベントがpublishされる
paused.maintenance_tasks      # ユーザーがタスクを実行中に一時停止するとこのイベントがpublishされる
errored.maintenance_tasks     # タスクのコードがunhandled exceptionを発生するとこのイベントがpublishされる

これらの通知は、メンテナンスタスクのライフサイクルをアプリケーション内で監視する方法を提供します。

利用例:

ActiveSupport::Notifications.subscribe("succeeded.maintenance_tasks") do |*, payload|
  task_name = payload[:task_name]
  arguments = payload[:arguments]
  metadata = payload[:metadata]
  job_id = payload[:job_id]
  run_id = payload[:run_id]
  time_running = payload[:time_running]
  started_at = payload[:started_at]
  ended_at = payload[:ended_at]
rescue => e
  Rails.logger.error(e)
end
ActiveSupport::Notifications.subscribe("errored.maintenance_tasks") do |*, payload|
  task_name = payload[:task_name]
  error = payload[:error]
  error_message = error[:message]
  error_class = error[:class]
  error_backtrace = error[:backtrace]
rescue => e
  Rails.logger.error(e)
end
class MaintenanceTasksInstrumenter < ActiveSupport::Subscriber
  attach_to :maintenance_tasks

  def enqueued(event)
    task_name = event.payload[:task_name]
    arguments = event.payload[:arguments]
    metadata = event.payload[:metadata]

    SlackNotifier.broadcast(SLACK_CHANNEL,
      "Job #{task_name} was started by #{metadata[:user_email]}} with arguments #{arguments.to_s.truncate(255)}")
  rescue => e
    Rails.logger.error(e)
  end
end

🔗 タスクのコールバックを使う

タスクのライフサイクルにフックをかける以下のコールバックが提供されています。

  • after_start
  • after_pause
  • after_interrupt
  • after_cancel
  • after_complete
  • after_error
module Maintenance
  class UpdatePostsTask < MaintenanceTasks::Task
    after_start :notify

    def notify
      NotifyJob.perform_later(self.class.name)
    end

    # ...
  end
end

: after_errorは完了することが保証されているので、コールバックのコード内で発生する例外はすべて無視されます。 after_errorコールバックのコードが例外をraiseする場合は、そのコールバック内で適切にrescueして処理する必要があります。

module Maintenance
  class UpdatePostsTask < MaintenanceTasks::Task
    after_error :dangerous_notify

    def dangerous_notify
      # This error is rescued and ignored in favour of the original error causing the error flow.
      raise NotDeliveredError
    end

    # ...
  end
end

その他のコールバックが例外を発生した場合は、エラーハンドラによって処理され、タスクが実行を停止します。

🔗 タスクを記述するときに考慮すべき点

maintenance_tasksは、タスクを処理するジョブを実行するために、アプリケーション用に設定されたキューアダプタに依存します。タスクの記述方法に関するガイドラインはキューアダプタによって異なる部分もありますが、一般には以下のルールに沿って書く必要があります。

  • Task#processの実行時間に注意すること
    コレクションの要素1個の処理が25秒を超えないようにする。
    あるいは、Sidekiqやアプリケーションに設定済みのキューアダプタに設定されている実行時間を超えないようにする。
    バッチを小さめにしておくことで、タスクが中断や再開を安全に行えるようになります。

  • Task#processを冪等にすること
    コレクションの同じ要素に対してprocessが複数回実行されても安全になるようにすること。詳しくはSidekiqのベストプラクティスを参照。
    タスクでエラーが発生した場合、エラーの原因になった要素が再度処理される可能性があるため、要素の処理を再実行可能にすることが重要です。
    これは、上の状況でイテレーション期間がタイムアウトした場合に特に重要です(ジョブが再度エンキューされると1個以上の要素が再処理される可能性があるため)。

🔗 タスクオブジェクトのライフサイクルとメモ化

タスクが実行または再開されると、ランナーはタスク処理用のジョブをエンキューします。このジョブはタスクオブジェクトをインスタンス化し、タスクオブジェクトはそのジョブが続く間存在し続けます。ジョブが初めて実行されると、タスクはcountを呼び出します。ジョブが実行されるたびに、タスクはタスクオブジェクトのcollectionを呼び出し、続いてコレクションの項目ごとにprocessを呼び出します。コレクションの処理が完了するか、ジョブの最大実行時間を超えると、ジョブは停止します。

メモ化された値は、同じジョブ内における以後のprocess呼び出しでも参照可能になるため、process内でメモ化を行うとおかしなことになる可能性があります。
それでもメモ化はスロットリングやレポート生成で引き続き利用可能です。たとえば、レポートを永続化したりログ出力するのにタスクのコールバックを利用できます。

🔗 タスクのテストを書く

タスクジェネレータを実行すると、test/tasks/maintenance/ディレクトリにタスクのテストファイルも作成されます。少なくともタスクの#processメソッドについてはテストを書いておくことをおすすめします。タスクの#collectionメソッドや#countメソッドが複雑になってきたら、これらについてもテストを書くのがよいでしょう。

テストコードの例:

# test/tasks/maintenance/update_posts_task_test.rb

require "test_helper"

module Maintenance
  class UpdatePostsTaskTest < ActiveSupport::TestCase
    test "#process performs a task iteration" do
      post = Post.new

      Maintenance::UpdatePostsTask.process(post)

      assert_equal "New content!", post.content
    end
  end
end

🔗 CSVタスクのテストを書く

CSVタスクでも#processメソッドのテストを書きましょう。テストではCSV::Rowを引数として受け取ります。CSVのある行、または文字列キーのハッシュをテストに渡すことで、#processをテストできます。

# test/tasks/maintenance/import_posts_task_test.rb

require "test_helper"

module Maintenance
  class ImportPostsTaskTest < ActiveSupport::TestCase
    test "#process performs a task iteration" do
      assert_difference -> { Post.count } do
        Maintenance::UpdatePostsTask.process({
          "title" => "My Title",
          "content" => "Hello World!",
        })
      end

      post = Post.last
      assert_equal "My Title", post.title
      assert_equal "Hello World!", post.content
    end
  end
end

🔗 パラメータ付きタスクのテストを書く

パラメータを受け取れるタスクのテストでは、属性を代入するためにタスククラスをインスタンス化しておく必要があります。タスクインスタンスがセットアップされれば、通常どおり#process をテストできるようになります。

# test/tasks/maintenance/update_posts_via_params_task_test.rb

require "test_helper"

module Maintenance
  class UpdatePostsViaParamsTaskTest < ActiveSupport::TestCase
    setup do
      @task = UpdatePostsViaParamsTask.new
      @task.updated_content = "Testing"
    end

    test "#process performs a task iteration" do
      assert_difference -> { Post.first.content } do
        @task.process(Post.first)
      end
    end
  end
end

🔗 カスタムEnumeratorを使うタスクをテストする

カスタムEnumeratorを使うタスクのテストでは、#enumerator_builderを呼び出すためにタスククラスをインスタンス化しておく必要があります。タスクインスタンスがセットアップされれば、#enumerator_builderが返すEnumeratorが[item, cursor]ペアを期待通りに生成することを検証できるようになります。

# test/tasks/maintenance/custom_enumerating_task.rb

require "test_helper"

module Maintenance
  class CustomEnumeratingTaskTest < ActiveSupport::TestCase
    setup do
      @task = CustomEnumeratingTask.new
    end

    test "#enumerator_builder returns enumerator yielding pairs of [item, cursor]" do
      enum = @task.enumerator_builder(cursor: 0)
      expected_items = [:b, :c]

      assert_equal 2, enum.size

      enum.each_with_index do |item, cursor|
        assert_equal expected_items[cursor], item
      end
    end

    test "#process performs a task iteration" do
      # ...
    end
  end
end

🔗 タスクを実行する

🔗 Web UIから実行する

Web UIを開いてタスクの"Run"をクリックすれば、新しいタスクを実行できます。

🔗 コマンドラインから実行する

以下のようにコマンドラインでタスクを実行することも可能です。

bundle exec maintenance_tasks perform Maintenance::UpdatePostsTask

CSVを処理するタスクをコマンドラインで実行するには、--csvオプションを指定します。

bundle exec maintenance_tasks perform Maintenance::ImportPostsTask --csv "path/to/my_csv.csv"

この--csvオプションは、標準入力からCSVコンテンツを受け取るときにも有効です。

curl "some/remote/csv" |
  bundle exec maintenance_tasks perform Maintenance::ImportPostsTask --csv

引数を受け取るタスクをコマンドラインで実行するには、--argumentsオプションを指定し、続けて<キー>:<値>ペアのセットを渡します。

bundle exec maintenance_tasks perform Maintenance::ParamsTask \
  --arguments post_ids:1,2,3 content:"Hello, World!"
🔗 タスクをRubyから実行する

runにタスク名を指定してランナーに送信すれば、タスクをRubyで実行することも可能です。

MaintenanceTasks::Runner.run(name: "Maintenance::UpdatePostsTask")

CSVをランナーで処理するタスクを実行するには、オープンするioオブジェクトとファイル名を含むハッシュをrunに渡します。

MaintenanceTasks::Runner.run(
  name: "Maintenance::ImportPostsTask",
  csv_file: { io: File.open("path/to/my_csv.csv"), filename: "my_csv.csv" }
)

ランナーを使い、かつ引数を受け取るタスクを実行するには、 run{ パラメータ名: 引数の値 }という引数セットを含むハッシュを渡します。

MaintenanceTasks::Runner.run(
  name: "Maintenance::ParamsTask",
  arguments: { post_ids: "1,2,3" }
)

🔗 タスクのステータスを監視する

Web UIでは、タスクの最新ステータス情報を得られます。タスクのステータスは以下のとおりです。

new(新規)
まだ実行されていないタスク。
enqueued(エンキュー済み)
ユーザーが実行を指示した後で実行待機中のタスク。
running(実行中)
現在ジョブワーカーによって実行中の状態にあるタスク。
pausing(一時停止処理中)
ユーザーが一時停止を指示した後、停止前に作業を完了する必要のあるタスク。
paused(一時停止済み)
ユーザーによって一時停止され、現在は実行していないタスク。再開(resume)可能。
interrupted(中断済み)
ジョブインフラストラクチャによって一時的に中断されたタスク。
cancelling(キャンセル処理中)
ユーザーによってキャンセルされ、停止前に作業を完了する必要のあるタスク。
cancelled(キャンセル済み)
ユーザーによってキャンセルされ、現在は実行していないタスク。再開不可。
succeeded(成功)
正常に完了したタスク。
errored(エラー)
実行中にunhandled exceptionが発生したタスク。

🔗 maintenance_tasksがタスクを実行するしくみ

メンテナンスタスクの実行は長時間に及ぶ可能性があります。maintenance_tasks gemの目的は、「デプロイ」「Kubernetes Podのスケジューリング」「Heroku dynoの再起動」「その他のインフラストラクチャやコードの変更」を介してタスクの実行継続を支援することです。

つまり、processメソッドが返されれば、以後はイテレーション処理の終盤に手動で介入する必要なしに、タスクの中断やエンキュー、再開を安全に行えるようになります。

デフォルトでは、実行中のタスクは5分以上経過すると自動的に中断されます。このタイムアウトはjob-iteration gemで設定され、必要に応じてイニシャライザで調整可能です。

実行中のタスクも、必要に応じて自動的に中断および再度エンキューされます(例: デプロイでSidekiqワーカーがシャットダウンした場合)。

  • Sidekiqは、TSTPシグナルかTERMシグナルを受信すると、自分自身を停止中とみなします。

  • Sidekiqが停止中は、job-iterationがEnumeratorのイテレーションを停止します。
    このときjob-iterationはイテレーション位置を保存したうえで、作業再開用の新しいジョブをエンキューし、タスクのステータスを"interrupted"にします。

Sidekiqが停止処理中の状態になると、ワーカーに終了まで25秒の猶予が与えられ、それをすぎると強制的にワーカーを終了します(これはデフォルト値: --timeoutオプションで変更可能)。
Sidekiqは、ワーカースレッドが終了する前にジョブの再エンキューを試みてタスクを再開しようとします。ただしこの場合、コレクションをどこまでイテレーションしたかという位置は保存されないため、1個以上のイテレーションが再実行されてしまう可能性があります。

Sidekiq以外のジョブキューでは、これを別の方法で処理しているものもあります。

🔗 タスクが詰まってしまった場合の対応方法

アプリケーションに設定されているキューアダプタにこのプロパティがない場合や、Sidekiqが「クラッシュ」「強制終了」または「実行中のキューを再エンキューできない状態」になると、タスクが詰まってしまったように見える(実行中のように見えるが実際は実行中ではない)ことがあります。

このような状況になると、タスクを一時停止またはキャンセルしようとしても、ステータスがpausingcancellingのまま変わらなくなり、一時停止やキャンセルが行えなくなります。

回避方法として、タスクのcancellingステータスが5分以上続く場合に、タスクを再度キャンセルし、ステータスが完全にcancelledに変わったら、再度実行できることがあります。

ステータスがpausingのまま変わらない場合で、かつ(キャンセルや再実行を行わずに)タスクがどこまで処理を終えたかという位置を保持したい場合は、"Force pause"をクリックします。

🔗 maintenance_tasksの設定項目

maintenance_tasks gemにはいくつかの設定オプションがあります。カスタムの設定項目は、maintenance_tasks.rbイニシャライザに配置する必要があります。

🔗 エラーハンドラをカスタマイズする

タスクの実行中に発生した例外はrescueされ、エラー情報は永続化されたうえでUI上に表示されます。

これを例外監視サービス(例: Bugsnag)に統合したい場合は、以下のようにエラーハンドラを定義できます。

# config/initializers/maintenance_tasks.rb

MaintenanceTasks.error_handler = ->(error, task_context, _errored_element) do
  Bugsnag.notify(error) do |notification|
    notification.add_metadata(:task, task_context)
  end
end

エラーハンドラは、以下の3つの引数を受け取るlambdaにする必要があります。

  • error: raiseする例外を指定する
  • task_context: タスクとエラーの追加情報↓を含むハッシュ
    • task_name: エラーが発生したタスクの名前
    • started_at: タスクの開始時刻
    • ended_at: タスクのエラー発生時刻
      : task_contextは、コンテキストが収集される前のタイミングでタスクがエラーを生成した場合は空になる可能性があります(例: タスクを処理するジョブのデシリアライズが失敗した場合)。
  • errored_element: タスクで例外が発生したときに処理中だった要素(存在する場合)
    : このオブジェクトを外部の例外監視サービスに渡す場合は、必ず機密データ漏洩防止のため「オブジェクトのサニタイズ」を行ってから、バグトラッカーと互換性のある「フォーマットへの変換」を行うこと。
    たとえば、Bugsnagサービスは機密データ保護のため、Active RecordオブジェクトのIDとクラス名だけを送信します。しかしCSV行は文字列に変換されるとBugsnagにそのまま渡されてしまうため、レポートに追加する前にオブジェクトに含まれる個人データをフィルタで削除してください。

🔗 MaintenanceTasksモジュールをカスタマイズする

以下のようにMaintenanceTasks.tasks_moduleを設定することで、タスクに配置するモジュールを定義できます。

# config/initializers/maintenance_tasks.rb

MaintenanceTasks.tasks_module = "TaskModule"

この値が指定されていない場合は、デフォルトでMaintenanceになります。

🔗 タスクを名前空間で整理する

タスクは、app/tasks/maintenanceディレクトリの下に任意の深さでネストできます。
たとえば、app/tasks/maintenance/team_name/service_name/update_posts_task.rbというファイルに以下のタスクを定義できます。

module Maintenance
  module TeamName
    module ServiceName
      class UpdatePostsTask < MaintenanceTasks::Task
        def process(rows)
          # ...
        end
      end
    end
  end
end
🔗 ベースとなるジョブクラスをカスタマイズする

MaintenanceTasks.jobコンフィグは、タスクで使うジョブクラスを定義するのに使えます。この設定はグローバルなので、ジョブクラスはアプリケーション内のあらゆるメンテナンスタスクで使われます。

# config/initializers/maintenance_tasks.rb

MaintenanceTasks.job = "CustomTaskJob"

# app/jobs/custom_task_job.rb

class CustomTaskJob < MaintenanceTasks::TaskJob
  queue_as :low_priority
end

このジョブクラスは、MaintenanceTasks::TaskJob必ず継承する必要があります。

ただし、retry_onはカスタムジョブクラスではサポートされないため、失敗したジョブのリトライはできない点にご注意ください。

🔗 タスク進捗状況の永続化頻度をカスタマイズする

MaintenanceTasks.ticker_delayコンフィグは、タスクの進捗状況を永続化する頻度をカスタマイズするのに使えます。Numeric値またはActiveSupport::Duration値を指定できます。

# config/initializers/maintenance_tasks.rb

MaintenanceTasks.ticker_delay = 2.seconds

値を指定しない場合のデフォルト値は1秒です。

🔗 Active Storageで使うストレージサービスをカスタマイズする

Rails 6.1以降のActive Storageフレームワークは、複数のストレージサービスをサポートしています。使うサービスを指定するには、MaintenanceTasks.active_storage_serviceコンフィグに、アプリケーションのconfig/storage.ymlで指定されているサービスのキーを設定できます。

# config/storage.yml

user_data:
  service: GCS
  credentials: <%= Rails.root.join("path/to/user/data/keyfile.json") %>
  project: "my-project"
  bucket: "user-data-bucket"

internal:
  service: GCS
  credentials: <%= Rails.root.join("path/to/internal/keyfile.json") %>
  project: "my-project"
  bucket: "internal-bucket"
# config/initializers/maintenance_tasks.rb

MaintenanceTasks.active_storage_service = :internal

アプリケーションで利用するストレージサービスが1種類のみの場合は、このコンフィグオプションは不要です。その場合、Rails.configuration.active_storage.serviceがデフォルトで使われます。

🔗 バックトレースクリーナーをカスタマイズする

MaintenanceTasks.backtrace_cleanerコンフィグは、タスクのエラー時にバックトレースをクリーンアップして永続化するのに使うバックトレースクリーナーを指定するのに使えます。ActiveSupport::BacktraceCleanerを使う必要があります。

# config/initializers/maintenance_tasks.rb

cleaner = ActiveSupport::BacktraceCleaner.new
cleaner.add_silencer { |line| line =~ /ignore_this_dir/ }

MaintenanceTasks.backtrace_cleaner = cleaner

値を指定しない場合は、デフォルトでRails.backtrace_cleanerがバックトレースのクリーンアップに使われます。

🔗 Web UIで使われる親コントローラをカスタマイズする

MaintenanceTasks.parent_controllerコンフィグは、Web UIエンジンのすべてのコントローラに継承されるコントローラクラスを指定するのに使えます。

これにより、ApplicationController(または任意のコントローラー)に共通ロジックを持つアプリケーションは、イニシャライザでシンプルな割当を行うだけで、Web UIがそのロジックを継承する設定がオプションで可能になります。

# config/initializers/maintenance_tasks.rb

MaintenanceTasks.parent_controller = "Services::CustomController"

# app/controllers/services/custom_controller.rb

class Services::CustomController < ActionController::Base
  include CustomSecurityThings
  include CustomLoggingThings
  # ...
end

親コントローラの値は、必ず既存のコントローラクラスに対応する文字列でなければならず、その既存のコントローラはActionController::Base必ず継承していなければなりません

値が指定されていない場合は、デフォルトで"ActionController::Base"が使われます。

🔗 タスクが詰まっていると判断するまでの時間を設定する

タスクが更新されていない場合に、タスクが詰まっているとみなすまでのタイムアウト時間を設定するには、MaintenanceTasks.stuck_task_durationコンフィグを使えます。この時間を設定するときは、ジョブインフラストラクチャのイベントがメンテナンスタスクのジョブ実行を妨げたりタスクをキャンセルしたりする分の時間も考慮して盛り込んでおく必要があります。

MaintenanceTasks.stuck_task_durationの値にはActiveSupport::Duration型を必ず使う必要があります
値が指定されていない場合は、デフォルトで5分が設定されます。

🔗 メタデータ

MaintenanceTasks.metadataコンフィグは、実行に関する追加情報を取得するprocを指定するのに使えます。このprocはMaintenanceTasks.parent_controllerのコンテキストで実行されるので、メンテナンスタスクを実行したユーザーのIDやメールアドレスを取得するのに利用できます。

# config/initializers/maintenance_tasks.rb
MaintenanceTasks.metadata = ->() do
  { user_email: current_user.email }
end

🔗 アップグレード方法

maintenance_tasksの新バージョンをチェックして更新するには、bundlerを使います。新バージョンをインストールした後は、インストールコマンドを再度実行すること。

bin/rails generate maintenance_tasks:install

これにより、新しいマイグレーションもインストールされて実行可能になります。

🔗 メンテナンスタスクの古いマイグレーションを削除してしまった場合の対応方法

maintenance_tasksのインストールコマンドは、古いマイグレーションの再インストールを試みるため、古いマイグレーションが削除されていると、マイグレーション中にデータベースで問題が発生します。

対応方法: bin/rails maintenance_tasks:install:migrationsを実行して、maintenance_tasksのマイグレーションをdb/migrateディレクトリにコピーしてください。
次に、maintenance_tasksに新しいマイグレーションが追加されていないかどうかをリリースノートで確認し、新しいマイグレーションがあれば残し、既に実行済みの古いマイグレーションは削除します。

以上の確認が終わったら、bin/rails db:migrateコマンドでマイグレーションを実行します。

🔗 貢献方法

私たちは、問題報告用のissueや、プルリクエストによるコードへの貢献を受け付けています。貢献方法についてはCONTRIBUTING.mdのガイドラインを参照してください。

🔗 新バージョンのリリースについて

プルリクエストがマージされると、GitHub上の最新ドラフトリリースに追加されます。

新バージョンのリリース準備ができたら、以下の手順を実行します。

  • maintenance_tasks.gemspecファイルのspec.versionフィールドを更新します。
  • bundle installを実行して、Gemfile.lockのgemバージョンを上げます。
  • プルリクエストをオープンし、承認を経てからマージします。
  • Shipit経由でデプロイし、rubygems.orgに新バージョンが表示されていることを確認します。
  • リリースノートにすべてのChangelogが記載されていることを確認してから、新バージョンを公開します。
  • 次のリリースに備えて、“Upcoming Release”というタイトルのドラフトリリースをGitHub上に新規作成します。タグバージョンは空欄のままで構いません。これは、次期リリースに関連する変更をドキュメント化するときの開始ポイントになります。

関連記事

Solid Queue README -- DBベースのActive Jobバックエンド(翻訳)

Rails: Mission Control Jobs gem README(翻訳)

Rails: Active Jobスタイルガイド(翻訳)


CONTACT

TechRachoでは、パートナーシップをご検討いただける方からの
ご連絡をお待ちしております。ぜひお気軽にご意見・ご相談ください。