Rails: 競合状態のテストを再現可能にするConcurrent::CyclicBarrierと"Seam"の概念(翻訳)
ペアプログラミング中に注文を分割する機能を実装していて、アドバイザリーロック(advisory lock: 強制力を持たないロック、勧告的ロックなどとも)を使って競合状態を防ぎたいと思いました。この種のセッションを扱うときに当然思い浮かぶ疑問のひとつといえば、「このロックが本当に必要かどうかをどうやって判定するか?」です。
当初、ロックなしの場合は失敗し、ロックありの場合はパスするテストを書けばよさそうだと高をくくっていました。
しかし実際は、言うは易く行うは難しというヤツでした。
🔗 テスト対象にしたいコード
このコードで重要なのは、ほぼ同時に呼び出されるプロジェクション(projection: ドメイン駆動開発やイベントソーシングの用語)の部分です。これによって、アイテムリストが2つの異なる注文(order)に移動されます。
def perform(event)
case event
when OrderSplitIntoTwo
source_order_id = event.data[:source_order_id]
target_order_id = event.data[:target_order_id]
ApplicationRecord.with_advisory_lock('transfer_items', source_order_id) do
items = projection.items_in_order(source_order_id) # 移動すべきアイテムを算出する
transfer(items, source_order_id, target_order_id)
end
end
end
このロックは、以下のシナリオを保護します。
- 2つのスレッドが同一のプロジェクションステートを読み取り、両方が同一のアイテムリストを異なる注文に移動する
- ロックがない場合は、同じアイテムリストから複数の注文に移動される可能性がある
しかし、このロックがテストで必要かどうかをどうやって証明すればよいのでしょうか?
試行その1: スレッド同士をConcurrent::CyclicBarrierで同期するためにコンカレントに実行する
it 'prevents duplicate transfers' do
barrier = Concurrent::CyclicBarrier.new(2)
threads = [
Thread.new do
barrier.wait(1);
subject.perform(split_event_1)
end,
Thread.new do
barrier.wait(1);
subject.perform(split_event_2)
end,
]
threads.each(&:join)
items_in_target_1 = order_items(target_order_1_id)
items_in_target_2 = order_items(target_order_2_id)
expect(items_in_target_1 & items_in_target_2).to be_empty
end
訳注
wait(1)は、タイムアウト1秒で待機するという意味です。
concurrent-rubyのConcurrent::CyclicBarrierは、競合状態(race condition)のテストに適したクラスです。いつもなら、このクラスを使って最初にシンプルな方法を試して、必要な結果を得るところです。
しかし、このテストは期待通りに失敗してくれませんでした。要するに、競合状態が発生する正確なタイミングでマッチしなかったのです。
「そんなのはsleepでタイミングをちょっぴり遅らせればいいのでは?」と考える人もいるかもしれません。
しかしここでsleepを使うと、むしろ事態は悪化します。テストが遅くなり、しかも壊れやすくなるのです。
CIで落ちたり落ちなかったりするテストほどむかつくものはありません。そういうわけで、sleepを使う方向には進みたくありませんでした。
🔗 洞察「"Seam"があるならそれを使おう」
説明しましょう。Seamとは、Michael Feathersの有名な著書『Working Effectively with Legacy Code』で解説されている概念です。
seam(=継ぎ目、縫い目)とは、その部分を変更せずにプログラム内での振る舞いを改変できる場所のことである。
私たちのコードを見てみると、seamになっているのはprojectionメソッドの部分です。
def projection
@projection ||= OrderItemsProjection.new
end
このprojectionメソッドこそが決め手です。
テストでは、このprojectionを「スレッドを同期する形で制御するバージョン」にスタブ化します。このときproduction用のコードは変更しません。
🔗 ソリューション: "seam"を活用して同期を注入する
ここに大事な洞察があります。
競合が発生するのは、2つのスレッドが同一のプロジェクションステートを読み出し、次に2つのスレッドが移動のための書き込みを行おうとしたときなのです。つまり、2つのスレッドは以下のように振る舞う必要があります。
- プロジェクションを読み出す
- お互いの読み出し完了を待つ(ここが同期ポイント)
- お互いの読み出し同期が完了したら、どちらが先に移動するかを争う
Concurrent::CyclicBarrierは、まさにそのための同期用プリミティブを提供してくれます。
it 'fails without advisory lock, proving it is needed' do
barrier = Concurrent::CyclicBarrier.new(2)
shared_projection = OrderItemsProjection.new
original_method = shared_projection.method(:items_in_order)
# ここでプロジェクションの読み出しをインターセプトすることで2つのスレッドを同期する
allow(shared_projection).to receive(:items_in_order) do |order_id|
items = original_method.call(order_id)
# 2つのスレッドは読み出し完了後にここで合流する
barrier.wait(1)
items
end
subject_1 = TransferItems.new
subject_2 = TransferItems.new
# "seam"を使って制御プロジェクションを注入
allow(subject_1).to receive(:projection).and_return(shared_projection)
allow(subject_2).to receive(:projection).and_return(shared_projection)
results = Concurrent::Array.new
threads = [
Thread.new { subject_1.perform(split_event_1); results << :success_1 },
Thread.new { subject_2.perform(split_event_2); results << :success_2 }
]
threads.each(&:join)
items_in_target_1 = order_items(target_order_1_id)
items_in_target_2 = order_items(target_order_2_id)
expect(results.size).to eq(2) # 2つのスレッドが処理を確実に完了したことを確認する
expect(items_in_target_1 & items_in_target_2).to be_empty # これがビジネス上で欲しいステート
end
🔗 このシナリオを応用する場合の注意
このケースは、注文(order)間のアイテムリストを移動するときの競合状態に関連していました。ここで重要なのは、適切なtarget_idを算出してアイテムを「1回だけ」移動することです。
しかし、このパターンは他のケースでも使える可能性があります(データベースへのコンカレントな書き込みをテストするときなど)。その場合は、必ずテストをuses_transactionで囲んでおくこと。
uses_transactionメソッドを使うと、そのテストはトランザクションで囲まれなくなるので、テスト後のクリーンアップは自分で行う必要があることもお忘れなく。
uses_transaction('to prove advisory lock is needed')
it 'prevents duplicate transfers' do
barrier = Concurrent::CyclicBarrier.new(2)
threads = [
Thread.new do
barrier.wait(1);
subject.perform(split_event_1)
end,
Thread.new do
barrier.wait(1);
subject.perform(split_event_2)
end,
]
threads.each(&:join)
items_in_target_1 = order_items(target_order_1_id)
items_in_target_2 = order_items(target_order_2_id)
expect(items_in_target_1 & items_in_target_2).to be_empty
end
🔗 CyclicBarrierでスレッドを同期するしくみ
CyclicBarrierのカウントの初期値は2です(同期したいスレッドの個数)。この場合、以下のように処理が進められます。
- スレッド1は、移動すべきアイテムリストを読み込む
- スレッド1は、
wait(1)を呼び出して処理をブロックし、他方のスレッドが読み出しを完了するまで待つ - スレッド2は、移動すべきアイテムリストを読み込む
- スレッド2は、
wait(1)を呼び出す。
ここでバリアのカウントが満たされる - 2つのスレッドは同時に解放され、同一のプロジェクションステートを用いて処理を進める
- 2つのスレッドは、どちらが先に移動するかを争う
これにより、競合状態が決定論的に推移するようになるので、再現可能になります。
🔗 このパターンがうまくいく理由
productionコードを1行たりとも変更せずに、以下の方法で実現しました。
projectionメソッド(プロジェクションのインスタンスを返す)が"seam"であることを突き止めたprojectionメソッドの実装をスタブ化によって変更した- 同期のロジックを注入した(
CyclicBarrierで調整) - 競合が決定論的になり、2つのスレッドが同一のステートを参照したことが保証された
productionコードは何も足さず何も引かれず、クリーンなままです。このテストには、フックもデバッグフラグも条件分岐もありません。
🔗 証明
アドバイザリーロックの部分をコメントアウトしてみます。
# ApplicationRecord.with_advisory_lock('transfer_items', source_order_id) do
items = projection.items_in_order(source_order_id)
transfer(items, source_order_id, target_order_id)
# end
これでテストを実行してみましょう。
Failure/Error: expect(items_in_target_1 & items_in_target_2).to be_empty
expected: []
got: ["item_A", "item_B"]
(items appeared in both target orders - race condition detected!)
# 2つのターゲットorderに同じアイテムがある: 競合状態を検出!
完璧です。このロックが不可欠であることがテストで証明されました。
🔗 まとめ
競合状態をテストするために、テスト対象のコードを強制的に同期する必要が生じることがあります。しかし私はそのステートを実現するためにproductionコードを改変するのは好きではありません。それよりも、競合状態を露出させるためにコードをごくわずか変更するだけにとどめるのが好きです。
CyclicBarrierは、スレッドを調整することで競合状態を再現可能にする方法の1つです。
このパターンは、あらゆるプロジェクションベースの競合状態をテストするのに利用できます。"seam"を発見してDI(dependency injection: 依存性の注入)やメソッド切り出しなどを行い、そこに同期を注入することで、コンカレンシーの保護が適切に機能していることを証明できます。
概要
元サイトの許諾を得て翻訳・公開いたします。
日本語タイトルは内容に即したものにしました。
参考: ロック (計算機科学) - Wikipedia