MyEnigma

とある自律移動システムエンジニアのブログです。#Robotics #Programing #C++ #Python #MATLAB #Vim #Mathematics #Book #Movie #Traveling #Mac #iPhone

Pythonにおける並列処理プログラミング入門

目次

はじめに

元々Pythonは、

そこまで処理速度が早い言語ではないので、

システムの処理速度を最適化したい場合は、

C++などで書き直した方が良いのかもしれませんが、

Pythonシステムをあと少し早くしたい時には、

並行処理プログラミングをしてみると、

効果があるかもしれません。

 

最近のコンピューターはマルチコアで、

かつハイパースレッディングなどに対応しているため、

複数の処理を同時並行で処理させることで、

処理時間を早くすることはできます。

 

他の言語では、スレッドなどを使って

比較的カンタンに並列処理を実現することができますが、

実はPythonはグローバルインタープリタロック(GIL)という仕組みがあるので、

普通にマルチスレッドのプログラミングをしても、

一つのスレッドづつしか実行されず、

処理が早くならないことが多いようです。

 

そこで、そんなPythonにおいても

上手く並行処理を実施する方法がいくつかあるので、

これらの手法について

簡単にまとめようと思います。

 

詳しいPythonにおける

並行処理のプログラミングについては、

下記の書籍が詳しいため参照下さい。

 

subprocessによる複数子プロセス処理

並行処理を実施したい処理同士が、

データのやり取りなどが不要な場合、

デフォルトモジュールのsubprocessを使って

複数の子プロセスを起動する方法が

シンプルで利用しやすいと思います。

17.1. subprocess — サブプロセス管理 — Python 2.7.x ドキュメント

Python の subprocess で出力を受け取るときは communicate() を使おう - Qiita

myenigma.hatenablog.com

 

下記のサンプルコードのように、

subprocessのPopen関数を使えば、

任意のコマンドラインツールを起動することができます。

コマンド自体はPopenのコンストラクタで実行され、

その下のcommunicateメソッドは、

それぞれのコマンドの実行を待つようになっています。

 

1秒のスリープを10個実行しているので、

通常の実行だと10秒かかるはずですが、

実行してみると分かる通り、1秒で実行が終了します。

つまり、並列的にコマンドが実行されているのがわかります。

#!/usr/bin/python
# -*- coding: utf-8 -*-
from time import time
import subprocess

start=time()
procs=[]
for i in range(10):
    proc=subprocess.Popen(['sleep',str(1)])
    procs.append(proc)

for proc in procs:
    proc.communicate()

end=time()
print("%f sec" %(end-start))

  もちろん別のPythonスクリプトを起動することも可能です。

#!/usr/bin/python
# -*- coding: utf-8 -*-
from time import time
import subprocess

start=time()
procs=[]
for i in range(10):
    proc=subprocess.Popen(['python','hoge.py'],shell=True)
    procs.append(proc)

for proc in procs:
    proc.communicate()

end=time()
print("%f sec" %(end-start))

 

上記のように、

コマンドラインツールを

複数並列実行させたい場合は、この方法が良いようです。

また標準出力を取得することもできます。

   

concurrent.futuresによるマルチプロセス処理

続いて、

上記のようなコマンドラインツールではなく、

処理の重いPython関数などを、

並列処理したい場合には、

concurrent.futuresというモジュールを使うと便利です。

ちなみに、似たモジュールとして、

multiprocessingモジュールもありますが、

concurrent.futuresはmultiprocessingモジュールの上位モジュールで、

より複雑な処理ができるようになっているようです。

 

concurrent.futuresは

Python 3.2からデフォルトのモジュールになっているので、

Python3.x系を使っている人はインストール不要です。

 

使用方法は簡単で、

下記のように並列処理したい関数を準備し、

あとは、concurrent.futures.ProcessPoolExecutorのオブジェクトの

mapを使って、関数オブジェクトと引数を渡すだけです。

ProcessPoolExecutorの引数の

max_workersは同時並行に処理するプロセスの数で、

使用するCPUのコア数や性能に応じて設定するのが良いようです。

#!/usr/bin/python
# -*- coding: utf-8 -*-
import concurrent.futures
import time

def Calc(number):
    #何か重い処理
    time.sleep(1)

numbers=[14,60,900]
start = time.time()

#multi process
pool=concurrent.futures.ProcessPoolExecutor(max_workers=2)
result=list(pool.map(Calc,numbers))

#single process
#  result=list(map(Calc,numbers))

end = time.time()
print('%.3f' %(end-start))

上記のsingle process版のコードをコメントアウトして

比較するとわかりますが、

処理速度が早くなっていることがわかります。

 

このモジュールはマルチプロセスで処理したあと

それぞれのプロセスのデータをプロセス間通信で

データのやり取りをしているので、

処理のオーバヘッドが多く、

あまりワーカーを増やしすぎても、

処理性能はあまり上がらないことも多いようです。

 

ちなみに、

プログレスバーを表示するtqdmというモジュールと一緒に使うと、

github.com

簡単に、並列実行しながらプログレスバーを表示できます。

(複数入力にも対応しています)

import concurrent.futures
from tqdm import tqdm


def concurrent_progress(func, args, max_workers=None, use_thread=False):
    if use_thread:
        pool = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
    else:
        pool = concurrent.futures.ProcessPoolExecutor(max_workers=max_workers)
    try:
        return list(tqdm(pool.map(func, *zip(*args)), total=len(args)))
    except TypeError:
        return list(tqdm(pool.map(func, args), total=len(args)))


def calc(number):
    time.sleep(1.0)


def calc2(number, x):
    time.sleep(1.0)


def main():
    import time
    numbers = range(30)
    start = time.time()
    concurrent_progress(calc, numbers, use_thread=True)
    end = time.time()
    print('%.3f' % (end - start))

    start = time.time()
    args2 = [(n, 1) for n in numbers]
    concurrent_progress(calc2, args2)
    end = time.time()
    print('%.3f' % (end - start))


if __name__ == '__main__':
    main()

 

Queueによる並行プログラミング

ある一つの計算時間が大きい処理がある時、

その処理が幾つかの処理に分離でき、

それらの処理が、ひとつ前のデータを利用して、

次の処理にデータを渡すような処理の場合、

(処理1 -> 処理2 => 処理3)

マルチプロセスを使って、

効率的に処理することが可能になります。

 

multiprocessモジュールのQueueを使うことにより、

それぞれ別のプロセスで処理されている

処理とその次の処理にプロセス間でデータを送ることができます。

 

下記はQueueを使ったマルチプロセス処理のサンプルコードです。

2つのQueueを使うことにより、

それぞれ別のプロセスで動いている

Calc1,Calc2,Calc3の処理の間のプロセス間通信を実現しています。

Queueはputでデータを追加し、

getでデータを受信しますが、

getする際にデータが無い場合は自動的にプロセスを停止してくれます。

#!/usr/bin/python
# -*- coding: utf-8 -*-
import time
import multiprocessing

q12 = multiprocessing.Queue()
q23 = multiprocessing.Queue()

def Calc1(number):
    time.sleep(1)
    q12.put(number)

def Calc2(number):
    data=q12.get()
    time.sleep(2)
    q23.put(data)

def Calc3(number):
    data=q23.get()
    time.sleep(1)
    print("result"+str(data))


if __name__ == '__main__':
    start = time.time()
    for i in range(10):
        p1 = multiprocessing.Process(target=Calc1, args=(i,))
        p1.start()
        p2 = multiprocessing.Process(target=Calc2, args=(1,))
        p2.start()
        p3 = multiprocessing.Process(target=Calc3, args=(1,))
        p3.start()

    p1.join()
    p2.join()
    p3.join()
    end = time.time()
    print('%.3f' %(end-start))

 

上記のプログラムにより、

10個の重い処理を実施しても

1個の処理とほぼ同じほどの処理時間になります。

 

しかし、これだけですと

前述の並列処理と同じように思えますが、

処理1の入力が逐次取得できるような場合、

入力が得られる度に処理して、次の処理にデータを送ることができるので、

より効率的に処理を実施することができます。

 

最後に

より複雑な並行処理を実施したい場合は、

C++やJavaを使った方が良さそうです。

 

参考資料

myenigma.hatenablog.com

myenigma.hatenablog.com

myenigma.hatenablog.com

myenigma.hatenablog.com

myenigma.hatenablog.com

myenigma.hatenablog.com

MyEnigma Supporters

もしこの記事が参考になり、

ブログをサポートしたいと思われた方は、

こちらからよろしくお願いします。

myenigma.hatenablog.com