RubyはWeb界隈でよく使われてるスクリプト言語の中では比較的簡単に割と本格的なマルチスレッドプログラムを書くことができます。
うまく使うとI/O待ちで遅くなっているが、必ずしも順番に行う必要のないプログラムの実行時間を短縮したりすることができます。(たくさんのURLにアクセスしてレスポンスを取得してくるクロウラーなど)
しかし現在最も広く使われているRuby1.8ではスレッドの実装はいわゆるグリーンスレッドという実装で、OSによるスレッドを使えないため、マルチコアの恩恵を受けることができないので、大量のデータをマルチコアで処理する目的などで使っても効果が薄い(どころかまったくない)可能性が高いです。
Ruby1.9ではRubyでスレッドの動きを制御しているものの、ネイティブスレッドベースでの動作となっているため、マルチコアの恩恵を受けることができます。
それではさっそく、配列の各要素を並列処理できるルーチンを貼付けてみます。
これは一番シンプルな実装で、配列に含まれる要素分だけスレッドを新規に立ち上げて、ガンガン処理させるというものです。
def multi_process(ary) threads = [] ary.each_with_index do |e, i| threads << Thread.start(e, i){|tle, tli| yield(tle, tli) } end threads.each{|t| t.join } end # --- sample a = [1, 2, 3] multi_process(a) do |item, index| sleep 0.1*rand(10) puts "[#{index}]" + (item * 2).to_s end
しかし、この実装では配列の個数の数だけスレッドが走ってしまうので、1つずつの要素に対する処理が重くて個数が多い場合、大量の要素を処理する場合Rubyが固まったようになってしまうでしょう。それにスレッドの切り替えや、実行チェックなどに多くのCPUタイムを取られてしまって実際の処理が思ったより進まないと思われます。この場合マルチスレッドプログラムにおけるProducer-Consumerパターンを適用して、Consumer(Worker)スレッドの数を限定することで同時にN個のスレッドしか動作させないことを保証することで並列性と処理効率のバランスを図ることができそうです。
Producer-Consumerパターンについて軽く解説すると、実際に処理を行うConsumer(Worker)スレッドをN個走らせて、これらに与えるタスクはブロッキングキューを使ってProducerスレッドから提供されます。ブロッキングキューは同期処理されたキューで、同時にアクセスされてもデータ構造が壊れることなく、正確に1つエンキュー(キューに追加)したり正確に1つデキュー(キューから取り出し)することができます。バグのないブロッキングキューを実装するのは思い出したり調べたりしないといけなくて意外と面倒だったりするのですが、rubyでは標準ライブラリthreadをrequireすることでQueueやSizedQueueといったクラスで利用することができます。SizedQueueではキューの最大の長さを指定することができ、キューが指定された大きさの状態でエンキューしようとすると他のスレッドからデキューされるまでエンキューメソッドはブロックします。
require "thread" def multi_process2(ary, concurrency = 10, qsize = nil) q = (qsize) ? SizedQueue.new(qsize) : Queue.new producer = Thread.start(q, concurrency){|p_q, p_c| ary.each_with_index do |item, index| q.enq [ item, index, true] end p_c.times{ q.enq [nil, nil, false] } } workers = [] concurrency.times do workers << Thread.start(q){ |w_q| task, index, flag = w_q.deq while flag yield task, index task, index, flag = w_q.deq end } end producer.join workers.each{|w| w.join } end # --- sample a = [1, 2, 3] multi_process2(a) do |item, index| sleep 0.1*rand(10) puts "[#{index}] #{item*2}" end
こんなかんじになりました。
やはり多少コードは増えてしまいましたね。
multi_process2メソッドの第2引数では並列度としてConsumerスレッドの数を指定できるようにし、第3引数ではキューの長さを指定できるようにしました。キューの長さは明示的に指定しない場合無限(Queue.new)にするようになっています。
こんな感じで、案外簡単にマルチスレッド動作するプログラムが書けてしまいます。
ここで最後に、マルチスレッド初心者の方向けではありますが、基本的な落とし穴をご紹介しておきます。
以下の1〜10000の数をマルチスレッドで全て合計するプログラムを作ってみました。
数学的には1..nの合計はn(n+1)/2で求められることがわかっていますので、50005000が答えになるはずです。
しかし何回かこのプログラムを走らせると答えがおかしくなることがあります。
a = (1..10000).to_a total = 0 multi_process2(a) do |item, index| total += item end puts total
これはtotalを読み書きしているdo〜endの部分で、total += itemという1文の命令が、実際には
1. totalを読み出す
2. (1)で読み出したtotalの値にitemの値を足す
3. (2)で計算した値をtotalに代入する
という分割されたフェーズで実行されることが原因です。
これが各スレッドで、まとまった固まりとして実行されず、互い違いに実行されるとおかしくなります。
これを防ぐには、一定の文の固まりを互い違いに実行させない仕組みが必要です。
RubyではMutexクラスのオブジェクトを使って、排他制御を行うことでこれを実現できます。
a = (1..10000).to_a total = 0 lock = Mutex.new multi_process2(a) do |item, index| lock.synchronize{ total += item } end puts total
これで答えがおかしくなることはなくなりました。
マルチスレッドから変数を操作する際には排他制御に気を配りましょう!
しかし、このスレッドのように並行実行しようとしている全てのコードが排他制御されているようでは本質的にマルチスレッドで実行する意味がまったくなくなってしまうので注意ですね。