読者です 読者をやめる 読者になる 読者になる

ここにタイトルが入ります

デザイン&プログラミングのことも書くし、それ以外のことも書く。

【Ruby on Rails】非同期処理についてその②。 〜マルチスレッドとかマルチプロセスを実装してみた〜

ruby Rails

業務で非同期処理の実装が必要となったので、
前回ぼんやりマルチスレッド、マルチプロセスについて調べてみたのですが…

両方軽く実装してみたのでメモ。

実際に実装してみたところ、マルチプロセスでもマルチスレッドでも
処理時間自体は短縮されたのですが、
マルチプロセスだとメモリは食うしメモリ空間?を共有してくれないので
スレッドのほうで実装してみました。
(マルチスレッドで十分なときはそれでいいってことかな?)

〜検証環境〜
Rails:3.2.11
ruby:1.9.3p392


Railsで普通に実装

マルチスレッドで実装

〜リファレンス〜
class Thread

〜とても参考になりました!〜
ActiveRecordを複数スレッド環境で利用する - tech.recompile.net


処理内容はDBからUserのid取ってきているだけです。

threads = []
10.times.with_index(1) do |index, number|
  threads << Thread.new do
    ActiveRecord::Base.connection_pool.with_connection do
      puts "#{number}番目 #{User.find(number).id}"
    end
  end
end

# すべての処理が完了したらメインスレッドの処理が再開される
threads.each { |t| t.join }
puts "finish!!"

ポイントは
ActiveRecord::Base.connection_pool.with_connection

これを実行しないと、スレッド毎にDBコネクションが用意されるため無駄。
(というか設定された最大接続数を超えてエラーになる)


マルチプロセスで実行

同じ処理をマルチプロセスで。

〜リファレンス〜
module Process


最初こう書いたのですが…

pids = []
10.times.with_index(1) do |index, number|
  config = ActiveRecord::Base.remove_connection
  pids << fork do
    ActiveRecord::Base.connection_pool.with_connection do
      puts "#{number}番目 #{User.find(number).id}"
    end
  end
end

エラー発生(TдT)
Mysql2::Error: Lost connection to MySQL server during

…プロセスが分かれてる、つまりメモリも別なので
コネクションプールを共有できない、ということでしょうか。

というわけでプロセス別にコネクションを用意してあげる事に。

〜参考にさせて頂きました!〜
Forking Processes In Ruby On Rails

pids = []
# 一度コネクション切ります
config = ActiveRecord::Base.remove_connection

10.times.with_index(1) do |index, number|
  pids << fork do
    begin
      ActiveRecord::Base.establish_connection(config)
      puts "#{number}番目 #{User.find(number).id}"
    rescue => ex
      puts ex
    ensure
      ActiveRecord::Base.remove_connection
    end
  end
end

# すべてのプロセスが終わるまでメインスレッドをストップ
Process.waitall

# もう一回繋ぎ直します
ActiveRecord::Base.establish_connection(config)

puts "finish!!"

Rails起動時に用意されるメインプロセス?のコネクションを切って(切らなくてもいいけど)
それぞれのプロセスで接続、処理が終われば切断。
マルチプロセス部分が終了したらメインプロセスで再接続。

これでいいのか??


スレッドorプロセス数を固定する

例えば大量のデータを分割して複数スレッドやプロセスで処理する場合、
同時に起動するプロセスやスレッドの数の上限を決めておかないと逆に効率が悪くなってしまうようで。。

1000件のデータを1スレッドで処理するとして、上記のようなプログラムで10000件のデータを処理するとなると
10スレッドがほぼ同時に動くことになります。

…おそらくコア数が2のマシンでたくさんのスレッドを動かしても効率そんなに上がらない気がしますし、
予想以上のデータ数があった場合はメモリが不足してむしろ遅くなるかも…

というわけで、同時に動かすスレッド(プロセス)数を固定させるよう実装してみました。


スレッド数の固定

〜大変参考になりました!!〜
[Ruby] スレッド数固定で処理を行う | てるブログ

Queueを使えば良いようですな。

# スレッド数の上限を設定
thread_max = 3

# キューに処理対象(引数とかprocオブジェクトとか?)をセットする
jobqueue = Queue.new
(1..10).each do |n|
  jobqueue.push(n)
end

threads = []
thread_max.times do
  threads << Thread.start do
    while !jobqueue.empty?
      var = jobqueue.pop
      ActiveRecord::Base.connection_pool.with_connection do
        puts "#{var}番目 #{User.find(var).id}"
        sleep 10 - var
      end
    end
  end
end

# スレッド完了待ち
threads.each {|t| t.join}

puts "finish!!" 

プロセス数の固定

マルチプロセスでも同じように書くとこんなかんじになるのですが、

## 失敗例…

# スレッド数の上限を設定
process_max = 3

# キューに処理対象(引数とかprocオブジェクトとか?)をセットする
jobqueue = Queue.new
(1..10).each do |n|
  jobqueue.push(n)
end

pids = []
# 一度コネクション切ります
config = ActiveRecord::Base.remove_connection
process_max.times do
  pids << fork do
    while !jobqueue.empty?
      var = jobqueue.pop
      begin
        ActiveRecord::Base.establish_connection(config)
        puts "#{var}番 #{User.find(var).id}"
      rescue => ex
        puts ex
      ensure
        ActiveRecord::Base.remove_connection
      end
    end
  end
end

# すべてのプロセスが終わるまでメインスレッドをストップ
Process.waitall

# もう一回繋ぎ直します
ActiveRecord::Base.establish_connection(config)

puts "finish!!"

各プロセスが変数jobqueueを共有しないので
同じ処理がプロセス分だけ走ってしまいます(´・ω・`)
どうしたものか。


…で無理やり実装してみたのだが苦し紛れすぎてひどい…
良い子は真似しないでね…(゜_゜)

## これはひどい…なにかやり方があるはず…
## Queue使う必要ないしね…

# スレッド数の上限を設定
process_max = 3

# キューに処理対象(引数とかprocオブジェクトとか?)をセットする
jobqueue = Queue.new
(1..10).each do |n|
  jobqueue.push(n)
end

# 一度コネクション切ります
@config = ActiveRecord::Base.remove_connection

# 各processで行う処理
def exec_process(var)
  fork do
    begin
      ActiveRecord::Base.establish_connection(@config)
      puts "#{var}番 #{User.find(var).id}"
    rescue => ex
      puts ex
    ensure
      ActiveRecord::Base.remove_connection
    end
  end
end

# 最初に設定した数のプロセスを起動
process_max.times do
  exec_process(jobqueue.pop)
end

# プロセスが終わるたび次のプロセスを起動
begin
  while Process.wait
    next if jobqueue.size == 0
    exec_process(jobqueue.pop)
  end
rescue Errno::ECHILD
  # Processの個数を知る方法がわからなかった…
end

# もう一回繋ぎ直します
ActiveRecord::Base.establish_connection(@config)

puts "finish!!"


parallelを使う

てかね、こういうのは誰かが便利なgem作ってくれてるんですよね。
お手軽に非同期処理を実装してくれるparallel

〜参考にさせて頂きました!〜
Rubyで並列処理が簡単にできるgem parallel - 酒と泪とRubyとRailsと

マルチスレッドで実装

今までと同じ処理を実行…

require 'parallel'

Parallel.each((1..10).to_a, in_threads: 2) do |var|
  ActiveRecord::Base.connection_pool.with_connection do
    puts "#{var}番 #{User.find(var).id}"
  end
end

puts "finish!!"

簡単\(^o^)/

各スレッドへ渡す引数の配列をPrallel.eachの第一引数に、
同時に実行させたいスレッド数を第二引数に。
それだけ。


マルチプロセスで実行

require 'parallel'

config = ActiveRecord::Base.remove_connection

Parallel.each((1..10).to_a, in_processes: 2) do |var|
  ActiveRecord::Base.establish_connection(config)
  begin
    puts "#{var}番 #{User.find(var).id}"
  rescue => ex
    puts ex
  ensure
    ActiveRecord::Base.remove_connection
  end
end

ActiveRecord::Base.establish_connection(config)

puts "finish!!"

コネクションの処理のどうこうは置いといて
こんな感じ。

in_threadsをin_processesに変えただけ。
簡単\(^o^)/


というわけでparallel使いましたとさ!!


おまけ:複数のスレッドから一つの変数を触ったりする場合

Mutexを使いましょう。
さっきのparallelの例に加えると

# なんか各スレッドが一つの変数に数字足していく
total = 0

locker = Mutex::new

Parallel.each((1..10).to_a, in_threads: 2) do |var|
  ActiveRecord::Base.connection_pool.with_connection do
    puts "#{var}番 #{User.find(var).id}"
    # このブロック内は必ず同時に一つのスレッドしか処理しない
    locker.synchronize do
      total += var
    end
  end
end

puts total
puts "finish!!"

一つの変数を複数スレッドで更新する場合、
あるスレッドが更新している最中に他のスレッドが同じ変数に対して処理を行うと
意図した処理が行われない場合があります。
synchronizeさせることで指定した処理(メモリへのアクセス?)を
複数スレッドから同時に行わせないようにします。