概要
原著者の許諾を得て翻訳・公開いたします。
- 英語記事: Using Concurrent Ruby in a Ruby on Rails Application
- 原文公開日: 2018/06/08
- 著者: Midhun Krishna
- サイト: BigBinary
RailsアプリでConcurrent Rubyを使う(翻訳)
Concurrent Rubyは、関数型言語の興味深いアイデアや古典的なコンカレンシーパターンを多数取り入れて、それを基礎として構築されたコンカレンシーツールキットです。Concurrent RubyはActive Support経由で既にRailsに導入されているので、Railsアプリでスレッド化コードを書くなら他のものを探す必要はありません。
Concurrent::Future
を使う
私たちのアプリのひとつで、パフォーマンス改善の目的でConcurrent::Future
を用いてスレッド化コードを追加しました。結果は実に上々でしたが、ある日突如として動かなくなったのです。
「なぜスレッドが必要になったのか?」と思われるかも知れません。このときに問題になったコードは、教科書にもあるスレッディングのユースケースでした。そのコードはAPI呼び出しやDBリクエストをいくつか含み、最終的にかき集められた全データに対して操作を行うというものでした。
とりあえずコードを見てみることにしましょう。
スレッド化されていないコード
selected_shipping_companies.each do | carrier |
# api呼び出し
distance_in_miles = find_distance_from_origin_to_destination
historical_average_rate = historical_average_for_this_particular_carrier
# 操作の実行
build_price_details_for_this_carrier(distance_in_miles,
historical_average_rate)
end
上のコードをConcurrent::Future
でカバーするのは簡単です
futures = selected_shipping_companies.map do |carrier|
Concurrent::Future.execute do
# api呼び出し
distance_in_miles = find_distance_from_origin_to_destination
historical_average_rate = historical_average_for_this_particular_carrier
# 操作の実行
build_price_details_for_this_carrier(distance_in_miles,
historical_average_rate)
end
end
futures.map(&:value)
Concurrent::Future
についての補足
スレッドの利用については二の足を踏むことがよくあります。スレッドによってコードが複雑になりますし、スレッド安全性を確保できていないと思わぬ振る舞いを示す可能性もあるからです。Rubyは参照がミュータブルな言語なので、100%のスレッド安全性の確保は何かと困難を伴います。
Concurrent::Future
はスレッド安全性を保証するシンプルなしくみで、Clojure言語のFuture関数にヒントを得ています。Concurrent::Future
は実行したいブロックを1つ取り、Concurrent Rubyのグローバルスレッドプールを用いて非同期実行します。Concurrent Rubyはこのfutureを扱えるようにし、#value
(または#deref
)が呼び出されるとブロックの値を返します。
問題のバグ
通常であれば、メインスレッドで例外が発生するとRubyインタプリタが停止して例外データを収集します。RubyのThreadの場合、Thread#join
が呼び出された場合にのみunhandled exceptionが発生します。この場合Thread#abort_on_exception
をtrue
に設定する方が、実行中のどのスレッドで例外が発生してもすべてのスレッドが終了するので良好な結果になります。このあたりについては最近公開したBigBinaryブログで詳しく解説されています。
Concurrent Rubyで例外を扱う方法
future = Concurrent::Future.execute {
raise StandardError.new("Boom!")
}
sleep(0.1) # futureを実行する時間を任意に設定
future.value #=> nil
例外はどこに行ってしまうのでしょうか。実はこのコードは例外を発生せずに落ちてしまいます。コードの実行に成功したかどうかをどうやって検出すればよいでしょうか?
future = Concurrent::Future.execute {
raise StandardError.new("Boom!")
}
sleep(0.1) # futureを実行する時間を任意に設定
future.value #=> nil
future.rejected? #=> true
future.reason #=> "#<StandardError: Boom!>"
問題の修正方法
訳注: このラッパークラスについては末尾の「追伸」もご覧ください。
私たちは、アプリでConcurrent::Future
が使われている場所を見つけました。例外はそこで飲み込まれているようです。さらに、例外を明示的に手動で出力する必要性を多くの人が見落としてしまう可能性も残されています。この懸念点を解消するため、以下のラッパークラスを用いました。
module ConcurrentExecutor
class Error < StandardError
def initialize(exceptions)
@exceptions = exceptions
super
end
def message
@exceptions.map { | e | e.message }.join "\n"
end
def backtrace
traces = @exceptions.map { |e| e.backtrace }
["ConcurrentExecutor::Error START", traces, "END"].flatten
end
end
class Future
def initialize(pool: nil)
@pool = pool || Concurrent::FixedThreadPool.new(20)
@exceptions = Concurrent::Array.new
end
# Sampleの使い方
# executor = ConcurrentExecutor::Future.new(pool: pool)
# executor.execute(carriers) do | carrier |
# ...
# end
#
# values = executor.resolve
def execute array, &block
@futures = array.map do | element |
Concurrent::Future.execute({ executor: @pool }) do
yield(element)
end.rescue do | exception |
@exceptions << exception
end
end
self
end
def resolve
values = @futures.map(&:value)
if @exceptions.length > 0
raise ConcurrentExecutor::Error.new(@exceptions)
end
values
end
end
end
ここで注意が必要なのは、Concurrent RubyのFutureを使うとCircle CIでspecを実行中にsegmentation faultが発生することです。私たちはしばらくの間Circle CIでFutureをやめて通常のループに切り替え、その間にsegfaultの原因を切り分けて修正しました。
追伸
Concurrent::Future
のAPIには、ブロックの値を返すだけでなく、メインスレッドで発生した例外をraiseする機能もあります。
thread_pool = Concurrent::FixedThreadPool.new(20)
executors = [1, 2, 3, 4].map do |random_number|
Concurrent::Future.execute({ executor: thread_pool }) do
random_number / (random_number.even? ? 0 : 1)
end
end
executors.map(&:value)
=> [1, nil, 3, nil]
executors.map(&:value!)
> ZeroDivisionError: divided by 0
> from (pry):4:in `/'
ドキュメントに記載されていないAPIについてreddit書き込みで指摘を下さったJonathan Rochkindに感謝いたします。
訳注: jrochkind氏はredditで、
#value!
を用いることで上述のラッパークラスが不要になると指摘しています。