RailsアプリでConcurrent Rubyを使う(翻訳)

概要

原著者の許諾を得て翻訳・公開いたします。

RailsアプリでConcurrent Rubyを使う(翻訳)

Concurrent Rubyは、関数型言語の興味深いアイデアや古典的なコンカレンシーパターンを多数取り入れて、それを基礎として構築されたコンカレンシーツールキットです。Concurrent RubyはActive Support経由で既にRailsに導入されているので、Railsアプリでスレッド化コードを書くなら他のものを探す必要はありません。

Concurrent::Futureを使う

私たちのアプリのひとつで、パフォーマンス改善の目的でConcurrent::Futureを用いてスレッド化コードを追加しました。結果は実に上々でしたが、ある日突如として動かなくなったのです。

「なぜスレッドが必要になったのか?」と思われるかも知れません。このときに問題になったコードは、教科書にもあるスレッディングのユースケースでした。そのコードはAPI呼び出しやDBリクエストをいくつか含み、最終的にかき集められた全データに対して操作を行うというものでした。

とりあえずコードを見てみることにしましょう。

スレッド化されていないコード

selected_shipping_companies.each do | carrier |
  # api呼び出し
  distance_in_miles = find_distance_from_origin_to_destination
  historical_average_rate = historical_average_for_this_particular_carrier

  # 操作の実行
  build_price_details_for_this_carrier(distance_in_miles,
                                       historical_average_rate)
end

上のコードをConcurrent::Futureでカバーするのは簡単です

futures = selected_shipping_companies.map do |carrier|
  Concurrent::Future.execute do
    # api呼び出し
    distance_in_miles = find_distance_from_origin_to_destination
    historical_average_rate = historical_average_for_this_particular_carrier

    # 操作の実行
    build_price_details_for_this_carrier(distance_in_miles,
                                         historical_average_rate)
  end
end

futures.map(&:value)

Concurrent::Futureについての補足

スレッドの利用については二の足を踏むことがよくあります。スレッドによってコードが複雑になりますし、スレッド安全性を確保できていないと思わぬ振る舞いを示す可能性もあるからです。Rubyは参照がミュータブルな言語なので、100%のスレッド安全性の確保は何かと困難を伴います。

Concurrent::Futureはスレッド安全性を保証するシンプルなしくみで、Clojure言語のFuture関数にヒントを得ています。Concurrent::Futureは実行したいブロックを1つ取り、Concurrent Rubyのグローバルスレッドプールを用いて非同期実行します。Concurrent Rubyはこのfutureを扱えるようにし、#value(または#deref)が呼び出されるとブロックの値を返します。

問題のバグ

通常であれば、メインスレッドで例外が発生するとRubyインタプリタが停止して例外データを収集します。RubyのThreadの場合、Thread#joinが呼び出された場合にのみunhandled exceptionが発生します。この場合Thread#abort_on_exceptiontrueに設定する方が、実行中のどのスレッドで例外が発生してもすべてのスレッドが終了するので良好な結果になります。このあたりについては最近公開したBigBinaryブログで詳しく解説されています。

Concurrent Rubyで例外を扱う方法

future = Concurrent::Future.execute {
            raise StandardError.new("Boom!")
          }

sleep(0.1) # futureを実行する時間を任意に設定

future.value     #=> nil

例外はどこに行ってしまうのでしょうか。実はこのコードは例外を発生せずに落ちてしまいます。コードの実行に成功したかどうかをどうやって検出すればよいでしょうか?

future = Concurrent::Future.execute {
              raise StandardError.new("Boom!")
          }

sleep(0.1) # futureを実行する時間を任意に設定

future.value     #=> nil

future.rejected? #=> true
future.reason    #=> "#<StandardError: Boom!>"

問題の修正方法

訳注: このラッパークラスについては末尾の「追伸」もご覧ください。

私たちは、アプリでConcurrent::Futureが使われている場所を見つけました。例外はそこで飲み込まれているようです。さらに、例外を明示的に手動で出力する必要性を多くの人が見落としてしまう可能性も残されています。この懸念点を解消するため、以下のラッパークラスを用いました。

module ConcurrentExecutor
  class Error < StandardError
    def initialize(exceptions)
      @exceptions = exceptions
      super
    end

    def message
      @exceptions.map { | e | e.message }.join "\n"
    end

    def backtrace
      traces = @exceptions.map { |e| e.backtrace }
      ["ConcurrentExecutor::Error START", traces, "END"].flatten
    end
  end

  class Future
    def initialize(pool: nil)
      @pool = pool || Concurrent::FixedThreadPool.new(20)
      @exceptions = Concurrent::Array.new
    end

    # Sampleの使い方
    # executor = ConcurrentExecutor::Future.new(pool: pool)
    # executor.execute(carriers) do | carrier |
    #   ...
    # end
    #
    # values = executor.resolve

    def execute array, &block
      @futures = array.map do | element |
        Concurrent::Future.execute({ executor: @pool }) do
          yield(element)
        end.rescue do | exception |
          @exceptions << exception
        end
      end

      self
    end

    def resolve
      values = @futures.map(&:value)

      if @exceptions.length > 0
        raise ConcurrentExecutor::Error.new(@exceptions)
      end

      values
    end
  end
end

ここで注意が必要なのは、Concurrent RubyのFutureを使うとCircle CIでspecを実行中にsegmentation faultが発生することです。私たちはしばらくの間Circle CIでFutureをやめて通常のループに切り替え、その間にsegfaultの原因を切り分けて修正しました。

追伸

Concurrent::FutureのAPIには、ブロックの値を返すだけでなく、メインスレッドで発生した例外をraiseする機能もあります。

thread_pool = Concurrent::FixedThreadPool.new(20)
executors = [1, 2, 3, 4].map do |random_number|
  Concurrent::Future.execute({ executor: thread_pool }) do
    random_number / (random_number.even? ? 0 : 1)
  end
end

executors.map(&:value)
=> [1, nil, 3, nil]

executors.map(&:value!)

> ZeroDivisionError: divided by 0
> from (pry):4:in `/'

ドキュメントに記載されていないAPIについてreddit書き込みで指摘を下さったJonathan Rochkindに感謝いたします。

訳注: jrochkind氏はredditで、#value!を用いることで上述のラッパークラスが不要になると指摘しています。

関連記事

Ruby/Railsのプロ開発者としての5年間を振り返る(翻訳)

Railsのフラグメントキャッシュを分解調査する(翻訳)

デザインも頼めるシステム開発会社をお探しならBPS株式会社までどうぞ 開発エンジニア積極採用中です! Ruby on Rails の開発なら実績豊富なBPS

この記事の著者

hachi8833

Twitter: @hachi8833、GitHub: @hachi8833 コボラー、ITコンサル、ローカライズ業界、Rails開発を経てTechRachoの編集・記事作成を担当。 これまでにRuby on Rails チュートリアル第2版の半分ほど、Railsガイドの初期翻訳ではほぼすべてを翻訳。その後も折に触れてそれぞれ一部を翻訳。 かと思うと、正規表現の粋を尽くした日本語エラーチェックサービス enno.jpを運営。 実は最近Go言語が好き。 仕事に関係ないすっとこブログ「あけてくれ」は2000年頃から多少の中断をはさんで継続、現在はnote.muに移転。

hachi8833の書いた記事

週刊Railsウォッチ

インフラ

ActiveSupport探訪シリーズ