MyEnigma

とある自律移動システムエンジニアのブログです。#Robotics #Programing #C++ #Python #MATLAB #Vim #Mathematics #Book #Movie #Traveling #Mac #iPhone

各プログラミング言語における並行処理入門


Go言語による並行処理

目次

はじめに

昨今、ネットワークを通して協調的に動くシステムを

作ることが多いため、

並行処理は重要な技術だと思います。

 

しかし、この並行処理は、様々な専門用語があり、

昔から分かりづらいなと思っていたので、

色々調べながらまとめてみました。

 

加えて、いくつかのプログラミング言語で、

実際にHTTPリクエストを並行処理で実施する

サンプルコードを紹介したいと思います。

 

並行処理概要(async/await)

似たような処理として、

並行(Concurrent)処理と並列(Parallel)処理があります。

これらの違いについては、下記の資料を参考にしていただきたいですが、

qiita.com

qiita.com

www.rhoboro.com

並行は、1つのCPUが実行する複数の処理を入れ替えて処理するもので、

並列は、複数のCPUで同時に複数の処理を処理するものです。

 

この説明だと、並行処理は結局処理を同時並行で実施できないため、

あまり用途がなさそうに見えますが、

ネットワーク越しにリクエストをして、相手の処理を待っている時間や、

大きなファイルを書いたり、読んだりする時など、

CPUを使う処理ではない場合は、

並列処理よりも並行処理のほうが適している場合があります。

 

なぜなら並列処理では、プロセスの立ち上げなどのコストや、

プロセス間通信のコストがかからないため、

並列処理のほうが無駄なリソースを使わないからです。

 

以上のような特徴から、

並列処理は、HTTPの複数のリクエストを送る時に、

それぞれの処理が帰ってくる間に、

他のリクエストを送る時などに利用されています。

 

並行並列処理関連用語まとめ

並行並列処理には種類によって、様々な名前がありますが、

下記のリンク先の説明を元に、まとめたのが下記の表になります。

(間違っていたら、コメント欄にて指摘をお願いします。)

stackoverflow.com

softwareengineering.stackexchange.com

stackoverflow.com

stackoverflow.com

zenn.dev

 

名前 説明
Process OSが管理する並列可能な処理のこと。それぞれのプロセスは独自のアドレス空間内を割り当てられている。複数のCPUが存在している場合、同時並行で処理される。
Thread OSが管理する(おそらく)並列可能な処理のこと。それぞれのスレッドは親のスレッドや他のスレッドと一緒に同じアドレス空間に割り当てられている。スレッドの実行制御はOSが実施する(プリエンプティブ)。処理系によっては複数のスレッドの同時実行が許されておらず(PythonのGILなど)、その場合は並行処理となる。
Green Thread ほぼスレッドと同じ特徴を持つ処理だが、OSが管理するのではなく、ユーザ空間(VMなど)で管理される。但し、複数のワーカースレッドやプロセスが同時に処理するわけではないので、並行処理になる(但しErlangのGreen Threadは並列処理になる)。ネイティブスレッドをサポートしていない環境でも動作することができる。
Fibers OSが管理する並行処理用の軽量スレッド。次に実行される処理(fiber)を、前のfiberが指定する形でスケジューリングされる。
Coroutines 複数のポイントで実行を停止したり、再開できるルーチンのこと。並列処理を実施するときに利用されることが多い。
Goroutines Go言語で実装されているGreen Threadのような機能。オーバヘッドの少ない並列処理を簡単に実現するように実装されたもの。

 

Pythonにおける並行処理

Pythonでは、async/await構文を使いコルーチンやTaskを作成して、

並行処理を実施することができます。

docs.python.org

www.rhoboro.com

 

基本的にはasync def でコルーチン関数を定義し、

シーケンシャルにコルーチン関数を実行する場合は、そのままその関数をawaitで呼び、

並行で処理したい場合は、Taskにしてawaitで呼びます。

awaitで指定された部分で、処理が一時停止されたり、再開されたりして、並行処理を実現できます。

 

asyncio.gatherで複数のコルーチン関数をタスクに変換して並列実行したり、

In [17]: async def main():
    ...:     await asyncio.gather(
    ...:     say_after(1, "1"),
    ...:     say_after(2, "2"),
    ...:     say_after(3, "3"))
    ...:

In [18]: asyncio.run(main())
1
2
3

asyncio.wait_forでタイムアウトを設定したり、

In [19]: async def say_after(delay, what):
    ...:     await asyncio.sleep(delay)
    ...:     print(what)
    ...:

In [22]: async def main():
    ...:     try:
    ...:         await asyncio.wait_for(say_after(1, "1"), timeout=2.0)
    ...:     except asyncio.TimeoutError:
    ...:         print("timeout")
    ...:
    ...:
    ...:

In [23]: asyncio.run(main())
1

In [24]: async def main():
    ...:     try:
    ...:         await asyncio.wait_for(say_after(3, "3"), timeout=2.0)
    ...:     except asyncio.TimeoutError:
    ...:         print("timeout")
    ...:

In [25]: asyncio.run(main())
timeout

asyncio.to_threadでスレッドとして実行したりできます。

またタスクはcancelで外部からキャンセルすることも可能です。

 

ちなみにPythonはGIL(Global Interpreter lock)があるため、

スレッド化しても並列処理はされずに、並行処理になってしまいます。

なので、Pythonで並行処理をしたい場合はThreadを使うこともできます。

docs.python.org

 

並行処理によるHTTPリクエストサンプル

下記のコードは複数のURLにHTTPでアクセスするときに、

シーケンシャルに処理するのと、

並行に実行したときの結果を比較したものです。

並行の方が、高速に実行できていることがわかります。

 

並行処理では、asyncを使ったものと、ThreadPoolのmapを使ったものの

2つの関数を実装しました。

"""

Concurrent http request sample

Output example:
start!!
4.054506063461304
[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]
1.670924186706543
[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]
1.6389338970184326
[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]
done!!

"""

import requests
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor

async def request(url):
    loop = asyncio.get_event_loop()
    r = await loop.run_in_executor(None, requests.get, url)
    return r

async def gather_requests(urls):
    loop = asyncio.get_event_loop()
    tasks = []
    for url in urls:
        tasks.append(request(url))
    requests = await asyncio.gather(*tasks)
    return requests


def request_async(urls):
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(gather_requests(urls))

def request_async2(urls):
    with ThreadPoolExecutor() as pool:
        return list(pool.map(requests.get, urls))

def request_sequentialy(urls):
    return [requests.get(url) for url in urls]

def main():
    print("start!!")
    urls = ["http://yahoo.com/",
            "http://www.google.co.jp/",
            "https://www.w3.org/Protocols/HTTP/Performance/microscape/",
            "https://www.w3.org/Protocols/HTTP/Performance/microscape/",
            "http://www.google.co.jp/",
            "https://www.w3.org/Protocols/HTTP/Performance/microscape/",
            ]

    start = time.time()
    results = request_sequentialy(urls)
    print(time.time()-start)
    print(results)

    start = time.time()
    results = request_async(urls)
    print(time.time()-start)
    print(results)

    start = time.time()
    results = request_async2(urls)
    print(time.time()-start)
    print(results)

    print("done!!")


if __name__ == '__main__':
    main()

Juliaにおける並行処理

Juliaの並行処理に関しては、下記の資料を参照ください。

現状、JuliaのTaskは一つのOSのスレッド上で動くため、

並行処理になります。

docs.julialang.org

docs.julialang.org

  

並行処理によるHTTPリクエストサンプル

下記のコードは複数のURLにHTTPでアクセスするときに、

シーケンシャルに処理するのと、

並列に実行したときの結果を比較したものです。

並列の方が、高速に実行できていることがわかります。

"
    Async HTTP request with Julia

Output example:
Request sequentially
 11.335611 seconds (8.97 M allocations: 512.870 MiB, 3.82% gc time, 0.57% compil
ation time)
requests = Int16[200, 200, 200, 200, 200, 200]
Request asynchronouslly
  1.693590 seconds (266.30 k allocations: 18.420 MiB, 8.35% compilation time)
requests = Int16[200, 200, 200, 200, 200, 200]

"

using HTTP

function http_request(url)
    r = HTTP.request("GET", url)
    return r.status
end

function request_sequentially(urls)
    return [http_request(url) for url in urls]
end

function request_async(urls)
    return asyncmap(http_request, urls)
end

function main()
    println(PROGRAM_FILE," start!!")

    urls = ["http://yahoo.com/",
            "http://www.google.co.jp/",
            "https://www.w3.org/Protocols/HTTP/Performance/microscape/",
            "https://www.w3.org/Protocols/HTTP/Performance/microscape/",
            "http://www.google.co.jp/",
            "https://www.w3.org/Protocols/HTTP/Performance/microscape/",
            ]

    println("Request sequentially")
    @time requests = request_sequentially(urls)
    @show requests

    println("Request asynchronouslly")
    @time requests = request_async(urls)
    @show requests


    println(PROGRAM_FILE," Done!!")
end

if abspath(PROGRAM_FILE) == @__FILE__
    main()
end

参考資料

asyncio, aiohttpを利用した並列処理のサンプルコード · GitHub

myenigma.hatenablog.com

myenigma.hatenablog.com

myenigma.hatenablog.com

MyEnigma Supporters

もしこの記事が参考になり、

ブログをサポートしたいと思われた方は、

こちらからよろしくお願いします。

myenigma.hatenablog.com