Better control of your asyncio tasks
APACHE-2.0 License
quattro is an Apache 2 licensed library, written in Python, for task control in asyncio applications. quattro is influenced by structured concurrency concepts from the Trio framework.
quattro supports Python versions 3.9 - 3.11, including PyPy.
To install quattro, simply:
$ pip install quattro
quattro.gather
quattro comes with an independent, simple implementation of asyncio.gather
based on Task Groups.
The quattro version is safer, and uses a task group under the hood to not leak tasks in cases of errors in child tasks.
from quattro import gather
async def my_handler():
res_1, res_2 = await gather(long_query_1(), long_query_2())
The return_exceptions
argument can be used to make gather()
catch and return exceptions as responses instead of letting them bubble out.
from quattro import gather
async def my_handler():
res_1, res_2 = await gather(
long_query_1(),
long_query_2(),
return_exceptions=True,
)
# res_1 and res_2 may be instances of exceptions.
The differences to asyncio.gather()
are:
quattro.gather()
only accepts coroutines and not futures and generators, just like a TaskGroup.return_exceptions
is false (the default), an exception in a child task will cause an ExceptionGroup to bubble out of the top-level gather()
call, just like in a TaskGroup.quattro contains an independent, asyncio implementation of Trio CancelScopes. Due to fundamental differences between asyncio and Trio the actual runtime behavior isn't exactly the same, but close.
from quattro import move_on_after
async def my_handler():
with move_on_after(1.0) as cancel_scope:
await long_query()
# 1 second later, the function continues running
quattro contains the following helpers:
move_on_after
move_on_at
fail_after
fail_at
All helpers produce instances of quattro.CancelScope
, which is largely similar to the Trio variant.
CancelScopes
have the following attributes:
cancel()
- a method through which the scope can be cancelled manually.cancel()
can be called before the scope is entered; entering the scope will cancel it at the first opportunitydeadline
- read/write, an optional deadline for the scope, at which the scope will be cancelledcancelled_caught
- a readonly bool property, whether the scope finished via cancellationquattro also supports retrieving the current effective deadline in a task using quattro.current_effective_deadline
.
The current effective deadline is a float value, with float('inf')
standing in for no deadline.
Python versions 3.11 and higher contain similar helpers, asyncio.timeout
and asyncio.timeout_at
.
The quattro fail_after
and fail_at
helpers are effectively equivalent to the asyncio timeouts, and pass the test suite for them.
The differences are:
with
), asyncio versions are async context managers (using async with
).move_on_at
and move_on_after
helpers.scope.cancel()
, and precancelled before they are enteredfail_after
and fail_at
raise asyncio.Timeout
instead of trio.Cancelled
exceptions when they fail.
asyncio has edge-triggered cancellation semantics, while Trio has level-triggered cancellation semantics. The following example will behave differently in quattro and Trio:
with trio.move_on_after(TIMEOUT):
conn = make_connection()
try:
await conn.send_hello_msg()
finally:
await conn.send_goodbye_msg()
In Trio, if the TIMEOUT
expires while awaiting send_hello_msg()
, send_goodbye_msg()
will also be cancelled.
In quattro, send_goodbye_msg()
will run (and potentially block) anyway.
This is a limitation of the underlying framework.
In quattro, cancellation scopes cannot be shielded.
On Python 3.11 and later, the standard library TaskGroup implementation is used instead. The TaskGroup implementation here can be considered a backport for older Python versions.
quattro contains a TaskGroup implementation. TaskGroups are inspired by Trio nurseries.
from quattro import TaskGroup
async def my_handler():
# We want to spawn some tasks, and ensure they are all handled before we return.
async def task_1():
...
async def task_2():
...
async with TaskGroup() as tg:
t1 = tg.create_task(task_1)
t2 = tg.create_task(task_2)
# The end of the `async with` block awaits the tasks, ensuring they are handled.
TaskGroups are essential building blocks for achieving the concept of structured concurrency. In simple terms, structured concurrency means your code does not leak tasks - when a coroutine finishes, all tasks spawned by that coroutine and all its children are also finished. (In fancy terms, the execution flow becomes a directed acyclic graph.)
Structured concurrency can be achieved by using TaskGroups instead of asyncio.create_task
to start background tasks.
TaskGroups essentially do two things:
async with
block, the TaskGroup awaits all of its children, ensuring they are finished when it exitsThe implementation has been borrowed from the EdgeDB project.
name
keyword-only parameter to TaskGroup.create_task
.fail_after
and fail_at
.name
and the repr
is slightly different, to harmonize with the Python 3.11 standard library implementation.py.typed
to enable typing information.quattro.current_effective_deadline
.The initial TaskGroup implementation has been taken from the EdgeDB project. The CancelScope implementation was heavily influenced by Trio, and inspired by the async_timeout package.