Tech Racho エンジニアの「?」を「!」に。
  • Ruby / Rails関連

Rails: Active Recordコールバックを使わずにカウンタキャッシュを更新する(翻訳)

概要

原著者の許諾を得て翻訳・公開いたします。

日本語タイトルは内容に即したものにしました。

Rails: Active Recordコールバックを使わずにカウンタキャッシュを更新する(翻訳)

本記事では、Active Recordと好みのSQLデータベースで、データベーストリガーを「集計されたデータの一貫性を維持するツール」として使う実験を行いました。フィルタや検索でElasticSearchのような洗練されたツールを使うのではなく、データベースですぐ使える機能をいくつか使って同じ結果を得られるシンプルなアプローチをデモします。ボーナスとして、いまいましい競合状態を回避する方法についても解説します!

集計された値をいくつか用いて、データベース内のレコードをソートまたはフィルタする必要が生じることがあります。たとえば、管理画面でユーザーリストのページネーション機能を作り込んでいる場合や、注文数とユーザーがそれまで使った総額でフィルタする機能を実装したい場合が考えられます。そうしたツールはElasticSearchをはじめいろいろありますが、わずか数カラムを処理するためだけに大げさな検索エンジンとそれに必要なインフラをセットアップするのは少々やりすぎ感があります。もっと素直な方法を見つけましょう!

本記事のコード例はすべてgistでご覧いただけます。

トリガーを引く指

本記事ではすべてのコード例でPostgreSQLを利用します。

以下のデータモデルで考えてみましょう。

ActiveRecord::Schema.define do
  create_table "orders", force: :cascade do |t|
    t.bigint "user_id", null: false
    t.decimal "amount"
    t.datetime "created_at", precision: 6, null: false
    t.datetime "updated_at", precision: 6, null: false
    t.index ["user_id"], name: "index_orders_on_user_id"
  end

  create_table "users", force: :cascade do |t|
    t.datetime "created_at", precision: 6, null: false
    t.datetime "updated_at", precision: 6, null: false
  end

  add_foreign_key "orders", "users"
end

class User < ActiveRecord::Base
  has_many :orders
end

class Order < ActiveRecord::Base
  belongs_to :user
end

ユーザーを注文の総額でフィルタしてページネーションする方法を考えてみましょう。素のSQL文なら簡単にやれますが、たちまちパフォーマンスの問題が発生するでしょう。このことを示すために、ユーザー数10,000人と注文数100,000件をデータベースに入れて、explainを実行します。

User.insert_all(10_000.times.map { { created_at: Time.now, updated_at: Time.now } })

Order.insert_all(
  10_000.times.map do
    {
      user_id: rand(1...1000),
      amount: rand(1000) / 10.0,
      created_at: Time.now,
      updated_at: Time.now
    }
  end
)

ActiveRecord::Base.connection.execute <<~SQL
  EXPLAIN ANALYZE SELECT users.id, SUM(orders.amount), COUNT(orders.id)
  FROM users JOIN orders ON orders.user_id = users.id
  GROUP BY users.id
  HAVING SUM(orders.amount) > 100 AND COUNT(orders.id) > 1
  ORDER BY SUM(orders.amount)
  LIMIT 50
SQL

上の結果は以下のような感じになるでしょう。

Limit  (cost=3206.16..3206.29 rows=50 width=48) (actual time=59.737..59.746 rows=50 loops=1)
  ->  Sort  (cost=3206.16..3208.95 rows=1116 width=48) (actual time=59.736..59.739 rows=50 loops=1)
        Sort Key: (sum(orders.amount))
        Sort Method: top-N heapsort  Memory: 31kB
        ->  HashAggregate  (cost=2968.13..3169.09 rows=1116 width=48) (actual time=59.103..59.452 rows=1000 loops=1)
              Group Key: users.id
              Filter: ((sum(orders.amount) > '100'::numeric) AND (count(orders.id) > 1))
              ->  Hash Join  (cost=290.08..2050.73 rows=73392 width=48) (actual time=2.793..37.022 rows=100000 loops=1)
                    Hash Cond: (orders.user_id = users.id)
                    ->  Seq Scan on orders  (cost=0.00..1567.92 rows=73392 width=48) (actual time=0.011..11.650 rows=100000 loops=1)
                    ->  Hash  (cost=164.48..164.48 rows=10048 width=8) (actual time=2.760..2.760 rows=10000 loops=1)
                          Buckets: 16384  Batches: 1  Memory Usage: 519kB
                          ->  Seq Scan on users  (cost=0.00..164.48 rows=10048 width=8) (actual time=0.006..1.220 rows=10000 loops=1)
Planning Time: 0.237 ms
Execution Time: 64.151 ms

データベースが育つとさらに多くの時間がかかるようになりますので、うまくスケールする、よりよいソリューションを見つける必要があります。データベースの正規化レベルを下げて、orders_amountを別のuser_statsテーブルに保存してみましょう。

上の実装例はgistで1ファイルにまとまったものをご覧いただけます。

class CreateUserStats < ActiveRecord::Migration[6.0]
  def change
    create_table :user_stats do |t|
      t.integer :user_id, null: false, foreign_key: true
      t.decimal :orders_amount
      t.integer :orders_count

      t.index :user_id, unique: true
    end
  end
end

このとき、orders_countorders_amountの同期を維持する方法を決定しておくべきです。ここでは(マイグレーションなどで)素のSQL文を用いてデータを変更した場合でもuser_statsが正しく更新されるようにしたいので、Active Recordコールバックはそうした操作を行う場所としてはふさわしくなさそうです。Railsには一応counter_cacheオプションも組み込まれていますが、orders_amountには通用しません。トリガーを引いて助けましょう!

トリガーとは、テーブルでINSERTやUPDATEやDELETEが実行されたときに自動的に呼び出される関数です。

Railsアプリでトリガーを利用するには、hair_trigger gemやfx gemを使う方法もありますし、手作りする方法もあります。本記事のコード例ではhair_trigger gemを用いています。このgemは、SQLプロシージャの最新版のみを用いてトリガー更新するマイグレーションを生成できます。

ここでご注意ください!hair_trigger gemはRails 6とZeitwerkで既知の問題が発生します(#84)。もしこの問題を踏んだら、とりあえず私のfork版をご自由にお使いください。修正が済んだら元に戻すのをお忘れなく(訳注: 現時点では#84はまだオープンされています)。

訳注: hair triggerは「引き金が軽い」、転じて「気が短い」ことを表す慣用表現です。

それではOrderモデルにトリガーを追加しましょう。ここで実行したいトリガーはUPSERTです。つまりマッチするuser_iduser_statsにない場合は新しい行を1行追加し、それ以外の場合は既存の行を更新します(user_idには必ずunique制約をかけておくこと)。

class Order < ActiveRecord::Base
  belongs_to :user

  trigger.after(:insert) do
    <<~SQL
      INSERT INTO user_stats (user_id, orders_amount, orders_count)
      SELECT
        NEW.user_id as user_id,
        SUM(orders.amount) as orders_amount,
        COUNT(orders.id) as orders_count
      FROM orders WHERE orders.user_id = NEW.user_id
      ON CONFLICT (user_id) DO UPDATE
      SET
        orders_amount = EXCLUDED.orders_amount,
        orders_count = EXCLUDED.orders_count;
    SQL
  end
end

そしてrake db:generate_trigger_migrationを実行してマイグレーションを生成し、rails db:migrateでマイグレーションを実行してからアプリを実行します。

競合状態にまっしぐら

訳注: 原文見出しのoff to the racesは「素早く行動する」という慣用表現を競合状態にかけています。

どうやらうまく動いているようですが、複数の注文をパラレルにINSERTしようとしたらどうなるでしょう?(以下のコードはrakeタスクとしても実行できますし、私の実装をgistでご覧いただくこともできます)

user = User.create

threads = []

4.times do
  threads << Thread.new(user.id) do |user_id|
    user = User.find(user_id)
    user.orders.create(amount: rand(1000) / 10.0)
  end
end

threads.each(&:join)

inconsistent_stats = UserStat.joins(user: :orders)
                             .where(user_id: user.id)
                             .having("user_stats.orders_amount <> SUM(orders.amount)")
                             .group("user_stats.id")

if inconsistent_stats.any?
  calculated_amount = UserStat.find_by(user: user).orders_amount
  real_amount = Order.where(user: user).sum(:amount).to_f

  puts
  puts "Race condition detected:"
  puts "calculated amount: #{calculated_amount}"
  puts "real amount: #{real_amount}."
else
  puts
  puts "Data is consistent."
end

上のコードは競合状態が発生する可能性が非常に高くなっています。その理由がおわかりでしょうか?問題は、このトリガーが現在のトランザクションの「内部」で実行されることと、デフォルトのトランザクション分離レベルREAD COMMITTEDになっていることです。この分離レベルでは競合状態を扱えません。

PostgreSQLでは「READ UNCOMMITTED」「READ COMMITTED」「REPEATABLE READ」「SERIALIZABLE」という4つのトランザクション分離レベルをサポートしています。

ここで明らかなソリューションは、分離レベルをより厳密なSERIALIZABLEにすることですが、残念ながら実行中のトランザクションの内部では分離レベルを臨時に切り替えられません。注文を扱うたびに明示的に新しいトランザクションを発行するのは正しい方法とは思えないので、トリガーを常にシーケンシャルに実行するための別のアプローチを試してみましょう。つまりadvisory lock(勧告的ロック)を使います。

ここで必要な変更は、プロシージャコードの冒頭にPERFORM pg_advisory_xact_lock(NEW.user_id);を追加するだけです。

class Order < ActiveRecord::Base
  belongs_to :user

  trigger.after(:insert) do
    <<~SQL
      PERFORM pg_advisory_xact_lock(NEW.user_id);

      INSERT INTO user_stats (user_id, orders_amount, orders_count)
      SELECT
        NEW.user_id as user_id,
        SUM(orders.amount) as orders_amount,
        COUNT(orders.id) as orders_count
      FROM orders WHERE orders.user_id = NEW.user_id
      ON CONFLICT (user_id) DO UPDATE
      SET
        orders_amount = EXCLUDED.orders_amount,
        orders_count = EXCLUDED.orders_count;
    SQL
  end
end

速くなりましたね!gistにある更新版のコードを実行していただければ、競合状態が解消されてアプリがパラレルなリクエストを扱えるようになったことを確認できます。今度はuser_statsテーブルのorders_amountカラムにインデックスを追加してから、クエリを変更してパフォーマンスを比較してみましょう。

EXPLAIN ANALYZE SELECT user_id, orders_amount, orders_count
FROM user_stats
WHERE orders_amount > 100 AND orders_count > 1
ORDER BY orders_amount
LIMIT 50

Limit  (cost=0.29..22.99 rows=50 width=40) (actual time=0.059..11.241 rows=50 loops=1)
  ->  Index Scan using index_user_stats_on_orders_amount on user_stats  (cost=0.29..3438.69 rows=7573 width=40) (actual time=0.058..11.2 rows=50 loops=1)
        Index Cond: (orders_amount > '100'::numeric)
        Filter: (orders_count > 1)
Planning Time: 0.105 ms
Execution Time: 11.272 ms

ロックを使わない方法

実は、ロックを使わずに同じ結果を得られ、しかも高速に動く方法があるのです(提案してくれたのはSergey Ponomarevです)。それは以下のようにdeltaを使う方法です。

class Order < ActiveRecord::Base
  belongs_to :user

  trigger.after(:insert) do
    <<~SQL
      INSERT INTO user_stats (user_id, orders_amount, orders_count)
      SELECT
        NEW.user_id as user_id,
        NEW.amount as orders_amount,
        1 as orders_count
      ON CONFLICT (user_id) DO UPDATE
      SET
        orders_amount = user_stats.orders_amount + EXCLUDED.orders_amount,
        orders_count = user_stats.orders_count + EXCLUDED.orders_count;
    SQL
  end
end

ポイントはサブクエリを一切用いていないことで、これによって競合状態は発生しなくなります。さらにボーナスとして、新しいレコードをINSERTするときのパフォーマンスも向上します。このアプローチは、本記事で説明しているようなシンプルなケースでは有用なこともありますが、もっと込み入ったロジックを扱う場合は最終的にロックに頼らざるを得なくなるかもしれません(例: 注文にステータスがあると、注文のカウントをステータスごとにキャッシュして注文を更新可能にする必要がある)。

完全な実装についてはgistをご覧ください。

UPSERTの代わりにループを使う

先の例ではPostgreSQL 9.5で導入されたUPSERTを使いましたが、それより前のバージョンではどうなるのでしょう?ここでトリガーの動作をおさらいしてみましょう。トリガーはuser_statsテーブルへの新しい行のINSERTを試み、コンフリクトが発生したら既存の行を更新します。現実のアプリケーションでは「ほとんどの場合で」コンフリクトが発生します(正確に言うと、INSERTはユーザーごとに「1回しか発生しません」)。この事実を利用すれば、トリガーを以下のように書き換えられます。

トリガー内でループするコード例についてはgistをご覧ください。

class Order < ActiveRecord::Base
  belongs_to :user

  trigger.after(:insert) do
    <<~SQL
      <<insert_update>>
      LOOP
        UPDATE user_stats
        SET orders_count = orders_count + 1,
            orders_amount = orders_amount + NEW.amount
        WHERE user_id = NEW.user_id;

        EXIT insert_update WHEN FOUND;

        BEGIN
          INSERT INTO user_stats (
            user_id, orders_amount, orders_count
          ) VALUES (
            NEW.user_id, 1, NEW.amount
          );

          EXIT insert_update;
        EXCEPTION
          WHEN UNIQUE_VIOLATION THEN
            -- do nothing
        END;
      END LOOP insert_update;
    SQL
  end
end

ここではロジックを反転させて、トリガーはまず既存の行のUPDATEを試み、失敗した場合は新しい行をINSERTしています。


集計データの扱いは面倒です。カウンタ(あるいはその他の)キャッシュが多数ある場合は、そのための特別なツールを使う方が理にかなっています。しかしシンプルなケースであれば、昔ながらのデータベーストリガでもやれるのです。設定が正しければ、相当なパフォーマンスを発揮してくれます。

本記事の翻訳や転載についてのご相談は、まずメールにてお願いします。

Evil Martiansでは、火星人流の製品開発およびご相談を承ります。

関連記事

Rails向け高機能カウンタキャッシュ gem「counter_culture」README(翻訳)


CONTACT

TechRachoでは、パートナーシップをご検討いただける方からの
ご連絡をお待ちしております。ぜひお気軽にご意見・ご相談ください。