Async/await reactive tools for Python 3.11+
MIT License
NEWS: Project rebooted Nov. 2020. Rebuilt using Expression.
Aioreactive is RxPY for asyncio. It's an asynchronous and reactive Python library for asyncio using async and await. Aioreactive is built on the Expression functional library and, integrates naturally with the Python language.
aioreactive is the unification of RxPY and reactive programming with asyncio using async and await.
async
. Sending values is async, subscribing toconcurrent.futures
flat_map()
or similar. Seeparallel.py
With aioreactive you subscribe observers to observables, and the key abstractions of aioreactive can be seen in this single line of code:
subscription = await observable.subscribe_async(observer)
The difference from RxPY can be seen with the await
expression.
Aioreactive is built around the asynchronous duals, or opposites of the
AsyncIterable and AsyncIterator abstract base classes. These async
classes are called AsyncObservable and AsyncObserver.
AsyncObservable is a producer of events. It may be seen as the dual or
opposite of AsyncIterable and provides a single setter method called
subscribe_async()
that is the dual of the __aiter__()
getter method:
from abc import ABC, abstractmethod
class AsyncObservable(ABC):
@abstractmethod
async def subscribe_async(self, observer):
return NotImplemented
AsyncObserver is a consumer of events and is modeled after the
so-called consumer interface, the
enhanced generator interface in
PEP-342 and async
generators in PEP-525. It
is the dual of the AsyncIterator __anext__()
method, and expands to
three async methods asend()
, that is the opposite of __anext__()
,
athrow()
that is the opposite of an raise Exception()
and aclose()
that is the opposite of raise StopAsyncIteration
:
from abc import ABC, abstractmethod
class AsyncObserver(ABC):
@abstractmethod
async def asend(self, value):
return NotImplemented
@abstractmethod
async def athrow(self, error):
return NotImplemented
@abstractmethod
async def aclose(self):
return NotImplemented
An observable becomes hot and starts streaming items by using the
subscribe_async()
method. The subscribe_async()
method takes an
observable and returns a disposable subscription. So the
subscribe_async()
method is used to attach a observer to the
observable.
async def asend(value):
print(value)
disposable = await subscribe_async(source, AsyncAnonymousObserver(asend))
AsyncAnonymousObserver
is an anonymous observer that constructs an
AsyncObserver
out of plain async functions, so you don't have to
implement a new named observer every time you need one.
The subscription returned by subscribe_async()
is disposable, so to
unsubscribe you need to await the dispose_async()
method on the
subscription.
await subscription.dispose_async()
Even more interesting, with to_async_iterable
you can flip around from
AsyncObservable
to an AsyncIterable
and use async-for
to consume
the stream of events.
import aioreactive as rx
xs = rx.from_iterable([1, 2, 3])
async for x in xs:
print(x)
They effectively transform us from an async push model to an async pull
model, and lets us use the awesome new language features such as async for
and async-with
. We do this without any queueing, as a push by the
AsyncObservable
will await the pull by the `AsyncIterator. This
effectively applies so-called "back-pressure" up the subscription as the
producer will await the iterator to pick up the item send.
The for-loop may be wrapped with async-with to control the lifetime of the subscription:
import aioreactive as rx
xs = rx.from_iterable([1, 2, 3])
result = []
obv = rx.AsyncIteratorObserver(xs)
async with await xs.subscribe_async(obv) as subscription:
async for x in obv:
result.append(x)
assert result == [1, 2, 3]
An async stream is both an async observer and an async observable. Aioreactive lets you create streams explicitly.
import aioreactive as rx
stream = AsyncSubject() # Alias for AsyncMultiStream
sink = rx.AsyncAnonymousObserver()
await stream.subscribe_async(sink)
await stream.asend(42)
You can create streams directly from AsyncMultiStream
or
AsyncSingleStream
. AsyncMultiStream
supports multiple observers, and
is hot in the sense that it will drop any event that is sent if there
are currently no observers attached. AsyncSingleStream
on the other
hand supports a single observer, and is cold in the sense that it will
await any producer until there is an observer attached.
The Rx operators in aioreactive are all plain old functions. You can apply them to an observable and compose it into a transformed, filtered, aggregated or combined observable. This transformed observable can be streamed into an observer.
Observable -> Operator -> Operator -> Operator -> Observer
Aioreactive contains many of the same operators as you know from RxPY. Our goal is not to implement them all, but to provide the most essential ones.
With aioreactive you can choose to program functionally with plain old functions, or object-oriented with classes and methods. Aioreactive supports both method chaining or forward pipe programming styles.
AsyncObservable
may compose operators using forward pipelining with
the pipe
operator provided by the amazing
Expression library. This works
by having the operators partially applied with their arguments before
being given the source stream as the last curried argument.
ys = pipe(xs, filter(predicate), map(mapper), flat_map(request))
Longer pipelines may break lines as for binary operators:
import aioreactve as rx
async def main():
stream = rx.AsyncSubject()
obv = rx.AsyncIteratorObserver()
xs = pipe(
stream,
rx.map(lambda x: x["term"]),
rx.filter(lambda text: len(text) > 2),
rx.debounce(0.75),
rx.distinct_until_changed(),
rx.map(search_wikipedia),
rx.switch_latest(),
)
async with xs.subscribe_async(obv) as ys
async for value in obv:
print(value)
AsyncObservable also supports slicing using the Python slice notation.
@pytest.mark.asyncio
async def test_slice_special():
xs = rx.from_iterable([1, 2, 3, 4, 5])
values = []
async def asend(value):
values.append(value)
ys = xs[1:-1]
result = await run(ys, AsyncAnonymousObserver(asend))
assert result == 4
assert values == [2, 3, 4]
An alternative to pipelining is to use the classic and fluent method chaining as we know from ReactiveX.
An AsyncObservable
created from class methods such as
AsyncRx.from_iterable()
returns a AsyncChainedObservable
.
where we may use methods such as .filter()
and .map()
.
from aioreactive import AsyncRx
@pytest.mark.asyncio
async def test_observable_simple_pipe():
xs = AsyncRx.from_iterable([1, 2, 3])
result = []
async def mapper(value):
await asyncio.sleep(0.1)
return value * 10
async def predicate(value):
await asyncio.sleep(0.1)
return value > 1
ys = xs.filter(predicate).map(mapper)
async def on_next(value):
result.append(value)
subscription = await ys.subscribe_async(AsyncAnonymousObserver(on_next))
await subsubscription
assert result == [20, 30]
Aioreactive also provides a virtual time event loop
(VirtualTimeEventLoop
) that enables you to write asyncio unit-tests
that run in virtual time. Virtual time means that time is emulated, so
tests run as quickly as possible even if they sleep or awaits long-lived
operations. A test using virtual time still gives the same result as it
would have done if it had been run in real-time.
For example the following test still gives the correct result even if it takes 0 seconds to run:
@pytest.fixture()
def event_loop():
loop = VirtualTimeEventLoop()
yield loop
loop.close()
@pytest.mark.asyncio
async def test_call_later():
result = []
def action(value):
result.append(value)
loop = asyncio.get_event_loop()
loop.call_later(10, partial(action, 1))
loop.call_later(1, partial(action, 2))
loop.call_later(5, partial(action, 3))
await asyncio.sleep(10)
assert result == [2, 3, 1]
The aioreactive testing module provides a test AsyncSubject
that may
delay sending values, and a test AsyncTestObserver
that records all
events. These two classes helps you with testing in virtual time.
@pytest.fixture()
def event_loop():
loop = VirtualTimeEventLoop()
yield loop
loop.close()
@pytest.mark.asyncio
async def test_delay_done():
xs = AsyncTestSubject() # Test stream
ys = pipe(xs, rx.delay(1.0))
obv = AsyncTestObserver() # Test AsyncAnonymousObserver
async with await ys.subscribe_async(obv):
await xs.asend_later(0, 10)
await xs.asend_later(1.0, 20)
await xs.aclose_later(1.0)
await obv
assert obv.values == [
(ca(1), OnNext(10)),
(ca(2), OnNext(20)),
(ca(3), OnCompleted()),
]
AsyncIterable
and AsyncObservable
are closely related (in fact they
are duals). AsyncIterable
is an async iterable (pull) world, while
AsyncObservable
is an async reactive (push) based world. There are
many operations such as map()
and filter()
that may be simpler to
implement using AsyncIterable
, but once we start to include time, then
AsyncObservable
really starts to shine. Operators such as delay()
makes much more sense for AsyncObservable
than for AsyncIterable
.
However, aioreactive makes it easy for you to flip-around to async iterable just before you need to consume the stream, thus giving you the best of both worlds.
Aioreactive will not replace RxPY.
RxPY is an implementation of Observable
. Aioreactive is an
implementation of AsyncObservable
.
Rx and RxPY has hundreds of different query operators, and we currently have no plans to implementing all of them for aioreactive.
Many ideas from aioreactive have already been ported back into "classic" RxPY.
Aioreactive was inspired by:
The MIT License (MIT) Copyright (c) 2016 Børge Lanes, Dag Brattli.