Tutorial of concurrency in Python3 (Multi-threading, Multi-processing, Asynchronous programming)
HTTP
HTTP API
import time
def app(environ, start_response):
time.sleep(1)
start_response('200 OK', [('Content-type', 'text/plain; charset=utf-8')])
return [b'This is a slow web api']
3
$ gunicorn -w 3 server:app
HTTP Pythonrequests
import requests
def main():
urls = ['http://localhost:8000' for _ in range(3)]
for u in urls:
r = requests.get(u)
print(r.text)
if __name__ == '__main__':
main()
$ time python client_sync.py
This is a slow web api
This is a slow web api
This is a slow web api
real 0m3.240s
user 0m0.164s
sys 0m0.037s
3 1
PythonCPU
import requests
from threading import Thread
from queue import Queue
def fetch(url, results_queue):
resp = requests.get(url)
results_queue.put(resp.text)
def main():
results_queue = Queue()
threads = []
urls = ['http://localhost:8000' for _ in range(3)]
for u in urls:
thread = Thread(target=fetch, args=[u, results_queue])
thread.start()
threads.append(thread)
while threads:
threads.pop().join()
while not results_queue.empty():
print(results_queue.get())
if __name__ == '__main__':
main()
GILPython1 I/OGIL
$ time python client_threading.py
This is a slow web api
This is a slow web api
This is a slow web api
real 0m1.199s
user 0m0.167s
sys 0m0.028s
1/3 PyCharm
threading
import aiohttp
import asyncio
async def fetch(l, url):
async with aiohttp.ClientSession(loop=l) as session:
async with session.get(url) as response:
return await response.text()
async def main(l, url, num):
tasks = [asyncio.ensure_future(fetch(l, url)) for _ in range(num)]
return await asyncio.gather(*tasks)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
results = loop.run_until_complete(main(loop, 'http://localhost:8000', 3))
for r in results:
print(r)
$ time python client_async.py
This is a slow web api
This is a slow web api
This is a slow web api
real 0m1.415s
user 0m0.333s
sys 0m0.051s
1.415s
multithreading
3
async/await asyncio OS Python OS
PyCharm asyncioPOSIX
9
multithreading
9
worker396 1, 2
3
asyncio
Semaphore
import aiohttp
import asyncio
async def fetch(l, url):
async with aiohttp.ClientSession(loop=l) as session:
async with session.get(url) as response:
return await response.text()
async def bound_fetch(semaphore, l, url):
async with semaphore:
return await fetch(l, url)
async def main(l, url, num):
s = asyncio.Semaphore(3)
tasks = [asyncio.ensure_future(bound_fetch(s, l, url))
for _ in range(num)]
return await asyncio.gather(*tasks)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
results = loop.run_until_complete(main(loop, 'http://localhost:8000', 9))
for r in results:
print(r)
$ time python client_async_with_semaphore.py
This is a slow web api
This is a slow web api
: ()
real 0m3.375s
user 0m0.318s
sys 0m0.050s
I/OCPUGILPythonCPU
import requests
from multiprocessing import Pool
def fetch(url):
resp = requests.get(url)
return resp.text
def main():
urls = ['http://localhost:8000' for _ in range(3)]
with Pool(processes=3) as pool:
results = pool.map(fetch, urls)
for r in results:
print(r)
if __name__ == '__main__':
main()
multiprocessing
Pool
$ time python client_multiprocessing.py
This is a slow web api
This is a slow web api
This is a slow web api
real 0m1.347s
user 0m0.234s
sys 0m0.079s
I/OGIL
multithreadingmultiprocessing