Tech Racho エンジニアの「?」を「!」に。
  • 開発

RailsアプリでConcurrent Rubyを使う(翻訳)

概要

原著者の許諾を得て翻訳・公開いたします。

RailsアプリでConcurrent Rubyを使う(翻訳)

Concurrent Rubyは、関数型言語の興味深いアイデアや古典的なコンカレンシーパターンを多数取り入れて、それを基礎として構築されたコンカレンシーツールキットです。Concurrent RubyはActive Support経由で既にRailsに導入されているので、Railsアプリでスレッド化コードを書くなら他のものを探す必要はありません。

Concurrent::Futureを使う

私たちのアプリのひとつで、パフォーマンス改善の目的でConcurrent::Futureを用いてスレッド化コードを追加しました。結果は実に上々でしたが、ある日突如として動かなくなったのです。

「なぜスレッドが必要になったのか?」と思われるかも知れません。このときに問題になったコードは、教科書にもあるスレッディングのユースケースでした。そのコードはAPI呼び出しやDBリクエストをいくつか含み、最終的にかき集められた全データに対して操作を行うというものでした。

とりあえずコードを見てみることにしましょう。

スレッド化されていないコード

selected_shipping_companies.each do | carrier |
  # api呼び出し
  distance_in_miles = find_distance_from_origin_to_destination
  historical_average_rate = historical_average_for_this_particular_carrier

  # 操作の実行
  build_price_details_for_this_carrier(distance_in_miles,
                                       historical_average_rate)
end

上のコードをConcurrent::Futureでカバーするのは簡単です

futures = selected_shipping_companies.map do |carrier|
  Concurrent::Future.execute do
    # api呼び出し
    distance_in_miles = find_distance_from_origin_to_destination
    historical_average_rate = historical_average_for_this_particular_carrier

    # 操作の実行
    build_price_details_for_this_carrier(distance_in_miles,
                                         historical_average_rate)
  end
end

futures.map(&:value)

Concurrent::Futureについての補足

スレッドの利用については二の足を踏むことがよくあります。スレッドによってコードが複雑になりますし、スレッド安全性を確保できていないと思わぬ振る舞いを示す可能性もあるからです。Rubyは参照がミュータブルな言語なので、100%のスレッド安全性の確保は何かと困難を伴います。

Concurrent::Futureはスレッド安全性を保証するシンプルなしくみで、Clojure言語のFuture関数にヒントを得ています。Concurrent::Futureは実行したいブロックを1つ取り、Concurrent Rubyのグローバルスレッドプールを用いて非同期実行します。Concurrent Rubyはこのfutureを扱えるようにし、#value(または#deref)が呼び出されるとブロックの値を返します。

問題のバグ

通常であれば、メインスレッドで例外が発生するとRubyインタプリタが停止して例外データを収集します。RubyのThreadの場合、Thread#joinが呼び出された場合にのみunhandled exceptionが発生します。この場合Thread#abort_on_exceptiontrueに設定する方が、実行中のどのスレッドで例外が発生してもすべてのスレッドが終了するので良好な結果になります。このあたりについては最近公開したBigBinaryブログで詳しく解説されています。

Concurrent Rubyで例外を扱う方法

future = Concurrent::Future.execute {
            raise StandardError.new("Boom!")
          }

sleep(0.1) # futureを実行する時間を任意に設定

future.value     #=> nil

例外はどこに行ってしまうのでしょうか。実はこのコードは例外を発生せずに落ちてしまいます。コードの実行に成功したかどうかをどうやって検出すればよいでしょうか?

future = Concurrent::Future.execute {
              raise StandardError.new("Boom!")
          }

sleep(0.1) # futureを実行する時間を任意に設定

future.value     #=> nil

future.rejected? #=> true
future.reason    #=> "#<StandardError: Boom!>"

問題の修正方法

訳注: このラッパークラスについては末尾の「追伸」もご覧ください。

私たちは、アプリでConcurrent::Futureが使われている場所を見つけました。例外はそこで飲み込まれているようです。さらに、例外を明示的に手動で出力する必要性を多くの人が見落としてしまう可能性も残されています。この懸念点を解消するため、以下のラッパークラスを用いました。

module ConcurrentExecutor
  class Error < StandardError
    def initialize(exceptions)
      @exceptions = exceptions
      super
    end

    def message
      @exceptions.map { | e | e.message }.join "\n"
    end

    def backtrace
      traces = @exceptions.map { |e| e.backtrace }
      ["ConcurrentExecutor::Error START", traces, "END"].flatten
    end
  end

  class Future
    def initialize(pool: nil)
      @pool = pool || Concurrent::FixedThreadPool.new(20)
      @exceptions = Concurrent::Array.new
    end

    # Sampleの使い方
    # executor = ConcurrentExecutor::Future.new(pool: pool)
    # executor.execute(carriers) do | carrier |
    #   ...
    # end
    #
    # values = executor.resolve

    def execute array, &block
      @futures = array.map do | element |
        Concurrent::Future.execute({ executor: @pool }) do
          yield(element)
        end.rescue do | exception |
          @exceptions << exception
        end
      end

      self
    end

    def resolve
      values = @futures.map(&:value)

      if @exceptions.length > 0
        raise ConcurrentExecutor::Error.new(@exceptions)
      end

      values
    end
  end
end

ここで注意が必要なのは、Concurrent RubyのFutureを使うとCircle CIでspecを実行中にsegmentation faultが発生することです。私たちはしばらくの間Circle CIでFutureをやめて通常のループに切り替え、その間にsegfaultの原因を切り分けて修正しました。

追伸

Concurrent::FutureのAPIには、ブロックの値を返すだけでなく、メインスレッドで発生した例外をraiseする機能もあります。

thread_pool = Concurrent::FixedThreadPool.new(20)
executors = [1, 2, 3, 4].map do |random_number|
  Concurrent::Future.execute({ executor: thread_pool }) do
    random_number / (random_number.even? ? 0 : 1)
  end
end

executors.map(&:value)
=> [1, nil, 3, nil]

executors.map(&:value!)

> ZeroDivisionError: divided by 0
> from (pry):4:in `/'

ドキュメントに記載されていないAPIについてreddit書き込みで指摘を下さったJonathan Rochkindに感謝いたします。

訳注: jrochkind氏はredditで、#value!を用いることで上述のラッパークラスが不要になると指摘しています。

関連記事

Ruby/Railsのプロ開発者としての5年間を振り返る(翻訳)

Railsのフラグメントキャッシュを分解調査する(翻訳)


CONTACT

TechRachoでは、パートナーシップをご検討いただける方からの
ご連絡をお待ちしております。ぜひお気軽にご意見・ご相談ください。