目次
はじめに
昨今、ネットワークを通して協調的に動くシステムを
作ることが多いため、
並行処理は重要な技術だと思います。
しかし、この並行処理は、様々な専門用語があり、
昔から分かりづらいなと思っていたので、
色々調べながらまとめてみました。
加えて、いくつかのプログラミング言語で、
実際にHTTPリクエストを並行処理で実施する
サンプルコードを紹介したいと思います。
並行処理概要(async/await)
似たような処理として、
並行(Concurrent)処理と並列(Parallel)処理があります。
これらの違いについては、下記の資料を参考にしていただきたいですが、
並行は、1つのCPUが実行する複数の処理を入れ替えて処理するもので、
並列は、複数のCPUで同時に複数の処理を処理するものです。
この説明だと、並行処理は結局処理を同時並行で実施できないため、
あまり用途がなさそうに見えますが、
ネットワーク越しにリクエストをして、相手の処理を待っている時間や、
大きなファイルを書いたり、読んだりする時など、
CPUを使う処理ではない場合は、
並列処理よりも並行処理のほうが適している場合があります。
なぜなら並列処理では、プロセスの立ち上げなどのコストや、
プロセス間通信のコストがかからないため、
並列処理のほうが無駄なリソースを使わないからです。
以上のような特徴から、
並列処理は、HTTPの複数のリクエストを送る時に、
それぞれの処理が帰ってくる間に、
他のリクエストを送る時などに利用されています。
並行並列処理関連用語まとめ
並行並列処理には種類によって、様々な名前がありますが、
下記のリンク先の説明を元に、まとめたのが下記の表になります。
(間違っていたら、コメント欄にて指摘をお願いします。)
softwareengineering.stackexchange.com
名前 | 説明 |
---|---|
Process | OSが管理する並列可能な処理のこと。それぞれのプロセスは独自のアドレス空間内を割り当てられている。複数のCPUが存在している場合、同時並行で処理される。 |
Thread | OSが管理する(おそらく)並列可能な処理のこと。それぞれのスレッドは親のスレッドや他のスレッドと一緒に同じアドレス空間に割り当てられている。スレッドの実行制御はOSが実施する(プリエンプティブ)。処理系によっては複数のスレッドの同時実行が許されておらず(PythonのGILなど)、その場合は並行処理となる。 |
Green Thread | ほぼスレッドと同じ特徴を持つ処理だが、OSが管理するのではなく、ユーザ空間(VMなど)で管理される。但し、複数のワーカースレッドやプロセスが同時に処理するわけではないので、並行処理になる(但しErlangのGreen Threadは並列処理になる)。ネイティブスレッドをサポートしていない環境でも動作することができる。 |
Fibers | OSが管理する並行処理用の軽量スレッド。次に実行される処理(fiber)を、前のfiberが指定する形でスケジューリングされる。 |
Coroutines | 複数のポイントで実行を停止したり、再開できるルーチンのこと。並列処理を実施するときに利用されることが多い。 |
Goroutines | Go言語で実装されているGreen Threadのような機能。オーバヘッドの少ない並列処理を簡単に実現するように実装されたもの。 |
Pythonにおける並行処理
Pythonでは、async/await構文を使いコルーチンやTaskを作成して、
並行処理を実施することができます。
基本的にはasync def でコルーチン関数を定義し、
シーケンシャルにコルーチン関数を実行する場合は、そのままその関数をawaitで呼び、
並行で処理したい場合は、Taskにしてawaitで呼びます。
awaitで指定された部分で、処理が一時停止されたり、再開されたりして、並行処理を実現できます。
asyncio.gatherで複数のコルーチン関数をタスクに変換して並列実行したり、
In [17]: async def main(): ...: await asyncio.gather( ...: say_after(1, "1"), ...: say_after(2, "2"), ...: say_after(3, "3")) ...: In [18]: asyncio.run(main()) 1 2 3
asyncio.wait_forでタイムアウトを設定したり、
In [19]: async def say_after(delay, what): ...: await asyncio.sleep(delay) ...: print(what) ...: In [22]: async def main(): ...: try: ...: await asyncio.wait_for(say_after(1, "1"), timeout=2.0) ...: except asyncio.TimeoutError: ...: print("timeout") ...: ...: ...: In [23]: asyncio.run(main()) 1 In [24]: async def main(): ...: try: ...: await asyncio.wait_for(say_after(3, "3"), timeout=2.0) ...: except asyncio.TimeoutError: ...: print("timeout") ...: In [25]: asyncio.run(main()) timeout
asyncio.to_threadでスレッドとして実行したりできます。
またタスクはcancelで外部からキャンセルすることも可能です。
ちなみにPythonはGIL(Global Interpreter lock)があるため、
スレッド化しても並列処理はされずに、並行処理になってしまいます。
なので、Pythonで並行処理をしたい場合はThreadを使うこともできます。
並行処理によるHTTPリクエストサンプル
下記のコードは複数のURLにHTTPでアクセスするときに、
シーケンシャルに処理するのと、
並行に実行したときの結果を比較したものです。
並行の方が、高速に実行できていることがわかります。
並行処理では、asyncを使ったものと、ThreadPoolのmapを使ったものの
2つの関数を実装しました。
""" Concurrent http request sample Output example: start!! 4.054506063461304 [<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>] 1.670924186706543 [<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>] 1.6389338970184326 [<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>] done!! """ import requests import time import asyncio from concurrent.futures import ThreadPoolExecutor async def request(url): loop = asyncio.get_event_loop() r = await loop.run_in_executor(None, requests.get, url) return r async def gather_requests(urls): loop = asyncio.get_event_loop() tasks = [] for url in urls: tasks.append(request(url)) requests = await asyncio.gather(*tasks) return requests def request_async(urls): loop = asyncio.get_event_loop() return loop.run_until_complete(gather_requests(urls)) def request_async2(urls): with ThreadPoolExecutor() as pool: return list(pool.map(requests.get, urls)) def request_sequentialy(urls): return [requests.get(url) for url in urls] def main(): print("start!!") urls = ["http://yahoo.com/", "http://www.google.co.jp/", "https://www.w3.org/Protocols/HTTP/Performance/microscape/", "https://www.w3.org/Protocols/HTTP/Performance/microscape/", "http://www.google.co.jp/", "https://www.w3.org/Protocols/HTTP/Performance/microscape/", ] start = time.time() results = request_sequentialy(urls) print(time.time()-start) print(results) start = time.time() results = request_async(urls) print(time.time()-start) print(results) start = time.time() results = request_async2(urls) print(time.time()-start) print(results) print("done!!") if __name__ == '__main__': main()
Juliaにおける並行処理
Juliaの並行処理に関しては、下記の資料を参照ください。
現状、JuliaのTaskは一つのOSのスレッド上で動くため、
並行処理になります。
並行処理によるHTTPリクエストサンプル
下記のコードは複数のURLにHTTPでアクセスするときに、
シーケンシャルに処理するのと、
並列に実行したときの結果を比較したものです。
並列の方が、高速に実行できていることがわかります。
" Async HTTP request with Julia Output example: Request sequentially 11.335611 seconds (8.97 M allocations: 512.870 MiB, 3.82% gc time, 0.57% compil ation time) requests = Int16[200, 200, 200, 200, 200, 200] Request asynchronouslly 1.693590 seconds (266.30 k allocations: 18.420 MiB, 8.35% compilation time) requests = Int16[200, 200, 200, 200, 200, 200] " using HTTP function http_request(url) r = HTTP.request("GET", url) return r.status end function request_sequentially(urls) return [http_request(url) for url in urls] end function request_async(urls) return asyncmap(http_request, urls) end function main() println(PROGRAM_FILE," start!!") urls = ["http://yahoo.com/", "http://www.google.co.jp/", "https://www.w3.org/Protocols/HTTP/Performance/microscape/", "https://www.w3.org/Protocols/HTTP/Performance/microscape/", "http://www.google.co.jp/", "https://www.w3.org/Protocols/HTTP/Performance/microscape/", ] println("Request sequentially") @time requests = request_sequentially(urls) @show requests println("Request asynchronouslly") @time requests = request_async(urls) @show requests println(PROGRAM_FILE," Done!!") end if abspath(PROGRAM_FILE) == @__FILE__ main() end
参考資料
asyncio, aiohttpを利用した並列処理のサンプルコード · GitHub
MyEnigma Supporters
もしこの記事が参考になり、
ブログをサポートしたいと思われた方は、
こちらからよろしくお願いします。