Rubyで並列処理を行うparallel gemの使い方と勘所
タグ: rubyparallel / 初版公開: 2018-07-22

parallelを使うとKenrel#forkThreadを駆使するのと比べて簡単に並列処理を書くことができます。parallelは拙作のBestGems.orgによると、合計ダウンロード数で151位、デイリーダウンロード数は100位前後で、現時点で非常にメジャーなGemとなっています。

この記事ではparallelの基本的な使い方と、実際に使ってみて感じた注意点をTipsとして整理したいと思います。

parallelはREADME.mdが親切に書かれています。 加えて主要な部分は500行程度の小さなGemです。 利用する場合は公式のドキュメントとソースコードを確認されることをおすすめします。

前提ソフトウェア

ソフトウェアバージョン備考
ruby2.5.1-
parallel1.12.1-
rails5.0-

使い方

インストール

gem install parallelするかGemfileに以下の1行を追加してbundle installして下さい。

gem 'parallel'

明示的にロードする場合はparallelを利用するRubyのプログラムでrequire 'parallel'して下さい。 以降のサンプルコードではこの記述は省略しています。

できること

parallelにはRubyのeachやmapに相当する操作を並列処理するための以下のメソッドがあります。

Rubyのメソッド対応するparallelのメソッド
Enumerable#eachParallel.each
Enumerable#mapParallel.map
Enumerable#any?Parallel.any?
Enumerable#all?Parallel.all?
Enumerable#each_with_indexParallel.each_with_index
Enumerable#map, Enumerator#with_indexParallel.map_with_index

each

Parallel.eachはブロックが並列に実行されるeachです。 並列に処理しているためブロックの実行が完了する順序はバラバラです。 戻り値はParallel.eachの引数が返ります。

result = Parallel.each(1..10) do |item|
  # 普通のeachのようだがブロックは並列に実行される
  p item ** 2
end

p result

以下はこのコードの出力の例です。

1
9
16
4
25
64
36
100
49
81
1..10

map

Parallel.mapはブロックが並列に実行されるmapです。 並列に処理しているためブロックの実行が完了する順序はバラバラです。 戻り値はmapと同様に入力した各要素に対応した値の配列が返ります。

result = Parallel.map(1..10) do |item|
  # 普通のmapのようだがブロックは並列に実行される
  p item ** 2
end

p result

以下はこのコードの出力の例です。

1
4
9
16
25
36
49
64
100
81
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]

ワーカー番号の取得

上記のメソッド以外の機能もあります。 ブロック内でParallel.worker_numberを呼ぶとワーカースレッド/プロセスの番号を取得できます。 デバッグやロギングで利用できます。

require 'parallel'

result = Parallel.map(1..10) do |item|
  p [item ** 2, Parallel.worker_number]
end

p result

以下はこのプログラムの出力の例です。

[1, 0]
[4, 1]
[9, 0]
[16, 3]
[25, 4]
[81, 1]
[36, 5]
[49, 6]
[100, 1]
[64, 7]
[[1, 0], [4, 1], [9, 0], [16, 3], [25, 4], [36, 5], [49, 6], [64, 7], [81, 1], [100, 1]]

スレッドとプロセス

parallelのメソッドはオプションでプロセスで処理(in_processes)するかスレッドで処理(in_threads)するかを切り替えることができます。

プロセスで処理する場合

CRubyで何もオプションを指定しなければプロセスによる並列処理になります。 並列数はparallel内部のParallel::ProcessorCount.countが返す論理コア数になります。

in_processes: 並列数を指定すると指定したプロセス数で並列化して実行できます。

Paralell.each(1..10, in_processes: 10); end

parallelの内部でforkして作られたワーカープロセスはメソッド呼び出しが完了するまで使い回されます。 1回のブロックの実行毎にforkさせたい場合はisolation: trueを指定することもできます。

Paralell.each(1..10, in_processes: 10, isolation: true); end

スレッドで処理する場合

JRubyで何もオプションを指定しなければスレッドによる並列処理になります。 並列数はparallel内部のParallel::ProcessorCount.countが返す論理コア数になります。

in_threads: 並列数を指定すると指定したスレッド数で並列化して実行できます。

Paralell.each(1..10, in_threads: 10) do; end

Tips

parallelを使ってわかった勘所をまとめます。

第1引数について

eachmapの第1引数はどんなオブジェクトを渡すかによってparallelの挙動は異なります。

基本的に第1引数はto_aされてparallelの内部で配列になります。 この挙動を知らないとparallelの呼び出し元で意図せずしてメモリ使用量が増大することがあります。 例外的にProcThread::Queueを第1引数とした場合に、配列にはされずに処理します。

第一引数parallelによる判定条件挙動
Procオブジェクト.callできることブロックの実行ごとに.callします。.callParallel::Stopを返すと処理を完了します。
Thread::Queueオブジェクトnum_waitingおよびpopできることブロックの実行ごとに.popします。
その他上記以外まずto_aして配列にします。ブロックの実行ごとに先頭から要素を処理します。

詳細はparallelのJobFactoryクラスのソースを見て下さい。

https://github.com/grosser/parallel/blob/v1.12.1/lib/parallel.rb#L89-L145

Rangeや他のEnumerableを渡しても動作しますが、呼び出し元でArray, Proc, Thread::Queueのいずれかを渡した方が、誤解を招かないコードになると思います。

ワーカープロセスとの通信について

parallelはワーカープロセスとの通信をIO.pipeで生成したパイプの入出力で行います。 ワーカープロセスとのオブジェクトの受け渡しはMarshal.dump, Marshal.loadを使います。 このためMarshalでシリアライズできないオブジェクトをワーカープロセスと受け渡すことはできません。

オプションについての注意点

実装上parallelのメソッドのオプションは、パラメータ引数にはなっておらず、キー名のチェックもされません。 このためtypoしたオプションは無視されます。

例えば以下のコードは10プロセスで並列化することを意図しています。 しかしin_processesを誤ってin_processとtypoしているため、デフォルトどおり論理コア数のプロセスで並列化されてしまいます。

Parallel.each(1..10, in_process: 10) do; end

parallelの呼び出し時にはオプションをtypoしないよう細心の注意を払いましょう。

ブロック内での例外の発生やreturnについて

ブロック内で例外(Parallel::Break, Parallel::Killを除く)を発生させたりreturnしたりするとparallelの呼び出しは例外を発生させます。 この時にparallelの呼び出し元でrescueできる例外は、並列処理がスレッドとプロセスどちらか、発生した例外がStandarErrorのサブクラスかそれ以外か、により様々です。

以下はparallelのブロック内で何かまずいことが起こった時にparallelの呼び出し元でどのような例外が発生するかの例です。 parallelのソースを読めばなぜこうなるのかわかりますが、仕組みを理解していないと挙動を推し量ることは難しいかも知れません。

begin
  Parallel.each([1, 2, 3], in_threads: 2){ raise StandardError }
rescue Exception => e
  p e.class #=> StandardError
end

begin
  Parallel.each([1, 2, 3], in_processes: 2){ raise StandardError }
rescue Exception => e
  p e.class #=> StandardError
end

begin
  Parallel.each([1, 2, 3], in_threads: 2){ raise Exception }
rescue Exception => e
  p e.class #=> Exception
end

begin
  Parallel.each([1, 2, 3], in_processes: 2){ raise Exception }
rescue Exception => e
  p e.class #=> Parallel::DeadWorker
end

begin
  Parallel.each([1, 2, 3], in_threads: 2){ return }
rescue => e
  p e.class #=> LocalJupError
end

begin
  Parallel.each([1, 2, 3], in_processes: 2){ return }
rescue => e
  p e.class #=> Parallel::DeadWorker
end

例外以外の出力について

アプリケーションが出力するログではparallelで落ちる原因がわからない場合があるかも知れません。 その場合はRubyプロセスが何か出力していないかも確認して下さい。 例外やバックトレースからはわからない情報が出力されていることがあります。

Parallel::DeadWorkerについて

一番厄介なのはプロセスによる並列処理で発生するParallel::DeadWorkerです。

もし例外が発生するコードや意図せずブロックを抜ける箇所も存在しないのにParallel::DeadWorkerが発生する場合は、ワーカープロセスのメモリ使用量が増加したことでメモリ不足に陥ったことも疑って下さい。 NoMemoryError(Exceptionのサブクラス)がブロック内で発生してParallel::DeadWorkerとなっている可能性があります。

Railsでparallelを利用する

Railsでparallelを利用する場合のTipsです。

ActiveRecordのコネクションについて

parallelに限らずアプリケーションのコードでプロセスやスレッドを作ってActiveRecordを使う際にはコネクションをケアしなければならない時があります。 ActiveRecordのコネクションプールには明るくないため詳しくは説明しません。 対処法はparallelのREADME.mdに記載がありますので参考にして下さい。

# reproducibly fixes things (spec/cases/map_with_ar.rb)
Parallel.each(User.all, in_processes: 8) do |user|
  user.update_attribute(:some_attribute, some_value)
end
User.connection.reconnect!

# maybe helps: explicitly use connection pool
Parallel.each(User.all, in_threads: 8) do |user|
  ActiveRecord::Base.connection_pool.with_connection do
    user.update_attribute(:some_attribute, some_value)
  end
end

# maybe helps: reconnect once inside every fork
Parallel.each(User.all, in_processes: 8) do |user|
  @reconnected ||= User.connection.reconnect! || true
  user.update_attribute(:some_attribute, some_value)
end

どうやってparallelを使うかによっても様々だと思いますが、基本的にはブロック内およびparallelの呼び出し直後でreconnect!でコネクションを取得し直すようにすれば、ActiveRecord絡みのエラーは起こらなくなるはずです。

大きなテーブルの中身を並列処理したい場合

parallelは使っていますが普通のRailsアプリケーションで大きなテーブルを扱う時の書き方と変わりません。 find_in_batchesin_batchesを使ってちょっとずつテーブルから読んで処理すると良いです。

SomeModel.find_in_batches do |some_models|
  Parallel.each(some_models) do |some_model|
    # 処理
  end
end

parallelのREADME.mdにあるようにparallelのメソッドにSomeModel.allを渡す際は注意して下さい。 テーブルの中身をすべて読み込んでRubyのオブジェクトとしてメモリに乗ってしまいます。

SomeModel.allの戻り値はSomeModel::ActiveRecord_Relationです。 このオブジェクトは.call.num_waiting, .popもできません。 第1引数の注意点として説明したとおり、このような引数を渡すとparallelの内部でto_aされるため、テーブルの全ての内容を一気に読み込むことになります。