Combining Coroutines with Threads and Processes
Threads
The run_in_executor() method of the event loop takes an executor instance, a regular callable to invoke, and any arguments to be passed to the callable. It returns a Future that can be used to wait for the function to finish its work and return something. If no executor is passed in, a ThreadPoolExecutor is created. This example explicitly creates an executor to limit the number of worker threads it will have available.
A ThreadPoolExecutor starts its worker threads and then calls each of the provided functions once in a thread. This example shows how to combine run_in_executor() and wait() to have a coroutine yield control to the event loop while blocking functions run in separate threads, and then wake back up when those functions are finished.
#asyncio_executor_thread.py
import asyncio
import concurrent.futures
import logging
import sys
import time
def blocks(n):
log = logging.getLogger('blocks({})'.format(n))
log.info('running')
time.sleep(0.1)
log.info('done')
return n ** 2
async def run_blocking_tasks(executor):
log = logging.getLogger('run_blocking_tasks')
log.info('starting')
log.info('creating executor tasks')
loop = asyncio.get_event_loop()
blocking_tasks = [
loop.run_in_executor(executor, blocks, i)
for i in range(6)
]
log.info('waiting for executor tasks')
completed, pending = await asyncio.wait(blocking_tasks)
results = [t.result() for t in completed]
log.info('results: {!r}'.format(results))
log.info('exiting')
if __name__ == '__main__':
# Configure logging to show the name of the thread
# where the log message originates.
logging.basicConfig(
level=logging.INFO,
format='%(threadName)10s %(name)18s: %(message)s',
stream=sys.stderr,
)
# Create a limited thread pool.
executor = concurrent.futures.ThreadPoolExecutor(
max_workers=3,
)
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(
run_blocking_tasks(executor)
)
finally:
event_loop.close()
#output
MainThread run_blocking_tasks: starting
MainThread run_blocking_tasks: creating executor tasks
ThreadPoolExecutor-0_0 blocks(0): running
ThreadPoolExecutor-0_1 blocks(1): running
ThreadPoolExecutor-0_2 blocks(2): running
MainThread run_blocking_tasks: waiting for executor tasks
ThreadPoolExecutor-0_0 blocks(0): done
ThreadPoolExecutor-0_1 blocks(1): done
ThreadPoolExecutor-0_2 blocks(2): done
ThreadPoolExecutor-0_0 blocks(3): running
ThreadPoolExecutor-0_1 blocks(4): running
ThreadPoolExecutor-0_2 blocks(5): running
ThreadPoolExecutor-0_0 blocks(3): done
ThreadPoolExecutor-0_2 blocks(5): done
ThreadPoolExecutor-0_1 blocks(4): done
MainThread run_blocking_tasks: results: [0, 9, 16, 25, 1, 4]
MainThread run_blocking_tasks: exiting
Processes
A ProcessPoolExecutor works in much the same way, creating a set of worker processes instead of threads. Using separate processes requires more system resources, but for computationally-intensive operations it can make sense to run a separate task on each CPU core.
#asyncio_executor_process.py
# changes from asyncio_executor_thread.py
if __name__ == '__main__':
# Configure logging to show the id of the process
# where the log message originates.
logging.basicConfig(
level=logging.INFO,
format='PID %(process)5s %(name)18s: %(message)s',
stream=sys.stderr,
)
# Create a limited process pool.
executor = concurrent.futures.ProcessPoolExecutor(
max_workers=3,
)
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(
run_blocking_tasks(executor)
)
finally:
event_loop.close()
#output
PID 40498 run_blocking_tasks: starting
PID 40498 run_blocking_tasks: creating executor tasks
PID 40498 run_blocking_tasks: waiting for executor tasks
PID 40499 blocks(0): running
PID 40500 blocks(1): running
PID 40501 blocks(2): running
PID 40499 blocks(0): done
PID 40500 blocks(1): done
PID 40501 blocks(2): done
PID 40500 blocks(3): running
PID 40499 blocks(4): running
PID 40501 blocks(5): running
PID 40499 blocks(4): done
PID 40500 blocks(3): done
PID 40501 blocks(5): done
PID 40498 run_blocking_tasks: results: [1, 4, 9, 0, 16, 25]
PID 40498 run_blocking_tasks: exiting
source: https://pymotw.com/3/asyncio/executors.html#asyncio-executors