目次
- 目次
- はじめに
- subprocessによる複数子プロセス処理
- concurrent.futuresによるマルチプロセス処理
- Queueによる並行プログラミング
- 最後に
- 参考資料
- MyEnigma Supporters
はじめに
元々Pythonは、
そこまで処理速度が早い言語ではないので、
システムの処理速度を最適化したい場合は、
C++などで書き直した方が良いのかもしれませんが、
Pythonシステムをあと少し早くしたい時には、
並行処理プログラミングをしてみると、
効果があるかもしれません。
最近のコンピューターはマルチコアで、
かつハイパースレッディングなどに対応しているため、
複数の処理を同時並行で処理させることで、
処理時間を早くすることはできます。
他の言語では、スレッドなどを使って
比較的カンタンに並列処理を実現することができますが、
実はPythonはグローバルインタープリタロック(GIL)という仕組みがあるので、
普通にマルチスレッドのプログラミングをしても、
一つのスレッドづつしか実行されず、
処理が早くならないことが多いようです。
そこで、そんなPythonにおいても
上手く並行処理を実施する方法がいくつかあるので、
これらの手法について
簡単にまとめようと思います。
詳しいPythonにおける
並行処理のプログラミングについては、
下記の書籍が詳しいため参照下さい。
subprocessによる複数子プロセス処理
並行処理を実施したい処理同士が、
データのやり取りなどが不要な場合、
デフォルトモジュールのsubprocessを使って
複数の子プロセスを起動する方法が
シンプルで利用しやすいと思います。
17.1. subprocess — サブプロセス管理 — Python 2.7.x ドキュメント
Python の subprocess で出力を受け取るときは communicate() を使おう - Qiita
下記のサンプルコードのように、
subprocessのPopen関数を使えば、
任意のコマンドラインツールを起動することができます。
コマンド自体はPopenのコンストラクタで実行され、
その下のcommunicateメソッドは、
それぞれのコマンドの実行を待つようになっています。
1秒のスリープを10個実行しているので、
通常の実行だと10秒かかるはずですが、
実行してみると分かる通り、1秒で実行が終了します。
つまり、並列的にコマンドが実行されているのがわかります。
#!/usr/bin/python # -*- coding: utf-8 -*- from time import time import subprocess start=time() procs=[] for i in range(10): proc=subprocess.Popen(['sleep',str(1)]) procs.append(proc) for proc in procs: proc.communicate() end=time() print("%f sec" %(end-start))
もちろん別のPythonスクリプトを起動することも可能です。
#!/usr/bin/python # -*- coding: utf-8 -*- from time import time import subprocess start=time() procs=[] for i in range(10): proc=subprocess.Popen(['python','hoge.py'],shell=True) procs.append(proc) for proc in procs: proc.communicate() end=time() print("%f sec" %(end-start))
上記のように、
コマンドラインツールを
複数並列実行させたい場合は、この方法が良いようです。
また標準出力を取得することもできます。
concurrent.futuresによるマルチプロセス処理
続いて、
上記のようなコマンドラインツールではなく、
処理の重いPython関数などを、
並列処理したい場合には、
concurrent.futuresというモジュールを使うと便利です。
ちなみに、似たモジュールとして、
multiprocessingモジュールもありますが、
concurrent.futuresはmultiprocessingモジュールの上位モジュールで、
より複雑な処理ができるようになっているようです。
concurrent.futuresは
Python 3.2からデフォルトのモジュールになっているので、
Python3.x系を使っている人はインストール不要です。
使用方法は簡単で、
下記のように並列処理したい関数を準備し、
あとは、concurrent.futures.ProcessPoolExecutorのオブジェクトの
mapを使って、関数オブジェクトと引数を渡すだけです。
ProcessPoolExecutorの引数の
max_workersは同時並行に処理するプロセスの数で、
使用するCPUのコア数や性能に応じて設定するのが良いようです。
#!/usr/bin/python # -*- coding: utf-8 -*- import concurrent.futures import time def Calc(number): #何か重い処理 time.sleep(1) numbers=[14,60,900] start = time.time() #multi process pool=concurrent.futures.ProcessPoolExecutor(max_workers=2) result=list(pool.map(Calc,numbers)) #single process # result=list(map(Calc,numbers)) end = time.time() print('%.3f' %(end-start))
上記のsingle process版のコードをコメントアウトして
比較するとわかりますが、
処理速度が早くなっていることがわかります。
このモジュールはマルチプロセスで処理したあと
それぞれのプロセスのデータをプロセス間通信で
データのやり取りをしているので、
処理のオーバヘッドが多く、
あまりワーカーを増やしすぎても、
処理性能はあまり上がらないことも多いようです。
ちなみに、
プログレスバーを表示するtqdmというモジュールと一緒に使うと、
簡単に、並列実行しながらプログレスバーを表示できます。
(複数入力にも対応しています)
import concurrent.futures from tqdm import tqdm def concurrent_progress(func, args, max_workers=None, use_thread=False): if use_thread: pool = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) else: pool = concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) try: return list(tqdm(pool.map(func, *zip(*args)), total=len(args))) except TypeError: return list(tqdm(pool.map(func, args), total=len(args))) def calc(number): time.sleep(1.0) def calc2(number, x): time.sleep(1.0) def main(): import time numbers = range(30) start = time.time() concurrent_progress(calc, numbers, use_thread=True) end = time.time() print('%.3f' % (end - start)) start = time.time() args2 = [(n, 1) for n in numbers] concurrent_progress(calc2, args2) end = time.time() print('%.3f' % (end - start)) if __name__ == '__main__': main()
Queueによる並行プログラミング
ある一つの計算時間が大きい処理がある時、
その処理が幾つかの処理に分離でき、
それらの処理が、ひとつ前のデータを利用して、
次の処理にデータを渡すような処理の場合、
(処理1 -> 処理2 => 処理3)
マルチプロセスを使って、
効率的に処理することが可能になります。
multiprocessモジュールのQueueを使うことにより、
それぞれ別のプロセスで処理されている
処理とその次の処理にプロセス間でデータを送ることができます。
下記はQueueを使ったマルチプロセス処理のサンプルコードです。
2つのQueueを使うことにより、
それぞれ別のプロセスで動いている
Calc1,Calc2,Calc3の処理の間のプロセス間通信を実現しています。
Queueはputでデータを追加し、
getでデータを受信しますが、
getする際にデータが無い場合は自動的にプロセスを停止してくれます。
#!/usr/bin/python # -*- coding: utf-8 -*- import time import multiprocessing q12 = multiprocessing.Queue() q23 = multiprocessing.Queue() def Calc1(number): time.sleep(1) q12.put(number) def Calc2(number): data=q12.get() time.sleep(2) q23.put(data) def Calc3(number): data=q23.get() time.sleep(1) print("result"+str(data)) if __name__ == '__main__': start = time.time() for i in range(10): p1 = multiprocessing.Process(target=Calc1, args=(i,)) p1.start() p2 = multiprocessing.Process(target=Calc2, args=(1,)) p2.start() p3 = multiprocessing.Process(target=Calc3, args=(1,)) p3.start() p1.join() p2.join() p3.join() end = time.time() print('%.3f' %(end-start))
上記のプログラムにより、
10個の重い処理を実施しても
1個の処理とほぼ同じほどの処理時間になります。
しかし、これだけですと
前述の並列処理と同じように思えますが、
処理1の入力が逐次取得できるような場合、
入力が得られる度に処理して、次の処理にデータを送ることができるので、
より効率的に処理を実施することができます。
最後に
より複雑な並行処理を実施したい場合は、
C++やJavaを使った方が良さそうです。
参考資料
MyEnigma Supporters
もしこの記事が参考になり、
ブログをサポートしたいと思われた方は、
こちらからよろしくお願いします。