FastStream is a powerful and easy-to-use Python framework for building asynchronous services interacting with event streams such as Apache Kafka, RabbitMQ, NATS and Redis.
APACHE-2.0 License
Just a hotfix for the following case:
@broker.subscriber(...)
async def handler():
    return NatsResponse(...)

await broker.publish(..., rpc=True)
Full Changelog: https://github.com/airtai/faststream/compare/0.5.16...0.5.17
Published by Lancetnik 2 months ago
Well, it seems this is the biggest patch release ever 😃
First of all, thanks to all the new contributors who help us improve the project! They made a huge impact on this release by adding new Kafka security mechanisms and extending the Response API. Now you can use broker.Response to publish detailed information from a handler:
@broker.subscriber("in")
@broker.publisher("out")
async def handler(msg):
    return Response(msg, headers={"response_header": "Hi!"})  # or KafkaResponse, etc
Also, we added a huge new feature: ASGI support!
No, FastStream is not an HTTP framework now. This is a small ASGI implementation that lets you host documentation, use k8s HTTP probes, and serve metrics in the same runtime as your broker, without any dependencies.
You just need to use the AsgiFastStream class:
from faststream.nats import NatsBroker
from faststream.asgi import AsgiFastStream, make_ping_asgi

from prometheus_client import make_asgi_app
from prometheus_client.registry import CollectorRegistry

broker = NatsBroker()

prometheus_registry = CollectorRegistry()

app = AsgiFastStream(
    broker,
    asyncapi_path="/docs",
    asgi_routes=[
        ("/health", make_ping_asgi(broker, timeout=5.0)),
        ("/metrics", make_asgi_app(registry=prometheus_registry)),
    ],
)
And then you can run it like a regular ASGI app
uvicorn main:app
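To make the idea of a health route more concrete, here is a minimal sketch of a plain ASGI callable that answers a probe based on an async check. It is not the implementation of make_ping_asgi: the names make_health_route, probe, fake_ping and failing_ping are hypothetical, and the 204/500 status codes are an assumption made for illustration.

```python
import asyncio


def make_health_route(ping, timeout=5.0):
    """Build an ASGI app answering 204 if `ping()` succeeds, else 500.

    `ping` is any zero-argument coroutine function returning a bool
    (a hypothetical stand-in for a broker connectivity check).
    """

    async def app(scope, receive, send):
        try:
            healthy = await asyncio.wait_for(ping(), timeout=timeout)
        except Exception:
            # A timeout or a failed check both count as unhealthy.
            healthy = False
        await send({
            "type": "http.response.start",
            "status": 204 if healthy else 500,
            "headers": [],
        })
        await send({"type": "http.response.body", "body": b""})

    return app


async def probe(app):
    """Call the ASGI app once and return the response status code."""
    sent = []

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        sent.append(message)

    await app({"type": "http", "method": "GET", "path": "/health"}, receive, send)
    return sent[0]["status"]


async def fake_ping():
    return True


async def failing_ping():
    return False
```

Calling asyncio.run(probe(make_health_route(fake_ping))) exercises the route without starting a server, which is handy for checking probe behavior in tests.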
One more thing: manual topic partition assignment for Confluent. We already had it for aiokafka but missed it here. Now that is fixed!
from faststream.confluent import TopicPartition

@broker.subscriber(partitions=[
    TopicPartition("test-topic", partition=0),
])
async def handler():
    ...
fail_fast option in #1647
NatsRouter subjects prefixes behavior
Full Changelog: https://github.com/airtai/faststream/compare/0.5.15...0.5.16
Published by Lancetnik 3 months ago
Finally, FastStream has a Kafka pattern subscription! This is another step forward in our Roadmap, moving us to 0.6.0 and further!
from faststream import Path
from faststream.kafka import KafkaBroker

broker = KafkaBroker()

@broker.subscriber(pattern="logs.{level}")
async def base_handler(
    body: str,
    level: str = Path(),
):
    ...
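As a rough illustration of what a pattern such as "logs.{level}" expresses, the sketch below turns it into a regular expression and pulls out the placeholder values. This is not FastStream's matching code: compile_pattern and extract_path are hypothetical helpers, and the rule that a placeholder matches exactly one dot-free segment is an assumption.

```python
import re


def compile_pattern(pattern):
    """Turn 'logs.{level}' into a regex with one named group per placeholder."""
    # re.split with one capturing group alternates literal text and names.
    parts = re.split(r"\{(\w+)\}", pattern)
    regex = "".join(
        f"(?P<{part}>[^.]+)" if i % 2 else re.escape(part)
        for i, part in enumerate(parts)
    )
    return re.compile(f"^{regex}$")


def extract_path(pattern, topic):
    """Return placeholder values for a matching topic, or None otherwise."""
    match = compile_pattern(pattern).match(topic)
    return match.groupdict() if match else None
```

With these assumptions, a topic named "logs.info" yields a level of "info", while a topic outside the pattern yields None.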
Also, all brokers now support a new ping method to check the real broker connection:
is_connected: bool = await broker.ping()
This is a small but important change for K8S probes support.
Beyond that, there are a lot of bugfixes and improvements from our contributors! Thanks to all of these amazing people!
Full Changelog: https://github.com/airtai/faststream/compare/0.5.14...0.5.15
Published by kumaranvpl 4 months ago
Full Changelog: https://github.com/airtai/faststream/compare/0.5.13...0.5.14
Published by Lancetnik 4 months ago
Full Changelog: https://github.com/airtai/faststream/compare/0.5.12...0.5.13
Published by kumaranvpl 4 months ago
Now, FastStream provides users with the ability to pass the config dictionary to confluent-kafka-python for greater customizability. The following example sets the value of the parameter topic.metadata.refresh.fast.interval.ms to 300 instead of the default value 100 via the config parameter.
from faststream import FastStream
from faststream.confluent import KafkaBroker
config = {"topic.metadata.refresh.fast.interval.ms": 300}
broker = KafkaBroker("localhost:9092", config=config)
app = FastStream(broker)
Full Changelog: https://github.com/airtai/faststream/compare/0.5.11...0.5.12
Published by Lancetnik 4 months ago
Full Changelog: https://github.com/airtai/faststream/compare/0.5.10...0.5.11
Published by Lancetnik 5 months ago
Now you can return a Response object to set more specific outgoing message parameters:
from faststream import Response

@broker.subscriber("in")
@broker.publisher("out")
async def handler():
    return Response(body=b"", headers={})
TestKafkaBroker behaviour where Consumer Groups weren't being respected by @sifex in https://github.com/airtai/faststream/pull/1413
Full Changelog: https://github.com/airtai/faststream/compare/0.5.9...0.5.10
Published by kumaranvpl 5 months ago
Full Changelog: https://github.com/airtai/faststream/compare/0.5.8...0.5.9
Published by Lancetnik 5 months ago
It is time for new NATS features! FastStream now natively supports NATS Key-Value and Object Storage subscription features (big thanks to @sheldygg)!
KeyValue creation and watching API added (you can read the updated documentation section for changes):
from faststream import FastStream, Logger
from faststream.nats import NatsBroker

broker = NatsBroker()
app = FastStream(broker)

@broker.subscriber("some-key", kv_watch="bucket")
async def handler(msg: int, logger: Logger):
    logger.info(msg)

@app.after_startup
async def test():
    kv = await broker.key_value("bucket")
    await kv.put("some-key", b"1")
ObjectStore API added as well (you can read updated documentation section for changes):
from faststream import FastStream, Logger
from faststream.nats import NatsBroker

broker = NatsBroker()
app = FastStream(broker)

@broker.subscriber("file-bucket", obj_watch=True)
async def handler(filename: str, logger: Logger):
    logger.info(filename)

@app.after_startup
async def test():
    object_store = await broker.object_storage("file-bucket")
    await object_store.put("some-file.txt", b"1")
Also, you can now use just pull_sub=True instead of pull_sub=PullSub() in the basic case:
from faststream import FastStream, Logger
from faststream.nats import NatsBroker

broker = NatsBroker()
app = FastStream(broker)

@broker.subscriber("test", stream="stream", pull_sub=True)
async def handler(msg, logger: Logger):
    logger.info(msg)
Finally, we have a new feature, related to all brokers: special flag to suppress automatic RPC and reply_to responses:
@broker.subscriber("test", no_reply=True)
async def handler():
    ...

# will fail with timeout, because there is no automatic response
msg = await broker.publish("msg", "test", rpc=True)
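The timeout mentioned in the comment above can be reproduced with plain asyncio. The sketch below is a toy model only, not FastStream internals: rpc_call and echo are hypothetical names, and the reply channel is modelled as a single future that only the automatic response resolves.

```python
import asyncio


async def rpc_call(handler, message, *, no_reply=False, timeout=0.1):
    """Simulate an RPC publish: wait for the handler's reply or time out."""
    loop = asyncio.get_running_loop()
    reply = loop.create_future()

    async def consume():
        result = await handler(message)
        if not no_reply:
            # Only the automatic response resolves the caller's future.
            reply.set_result(result)

    task = asyncio.create_task(consume())
    try:
        return await asyncio.wait_for(reply, timeout=timeout)
    finally:
        # Make sure the simulated consumer has finished before returning.
        await task


async def echo(message):
    return message.upper()
```

With no_reply left at False the caller gets the handler's return value; with no_reply=True the handler still runs, but the caller's wait ends in asyncio.TimeoutError.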
Full Changelog: https://github.com/airtai/faststream/compare/0.5.7...0.5.8
Published by Lancetnik 5 months ago
Finally, FastStream natively supports OpenTelemetry to collect the full trace of your services! Big thanks to @draincoder for that!
First of all, you need to install the required dependencies to support OpenTelemetry:
pip install faststream[otel]
Then you can just add a middleware for your broker and that's it!
from faststream import FastStream
from faststream.nats import NatsBroker
from faststream.nats.opentelemetry import NatsTelemetryMiddleware
broker = NatsBroker(
    middlewares=(
        NatsTelemetryMiddleware(),
    )
)
app = FastStream(broker)
To find detailed information, just visit our documentation about telemetry.
P.S. The release includes basic OpenTelemetry support: message tracing and basic metrics. Baggage support and correct span linking in the batch processing case will be added soon.
Full Changelog: https://github.com/airtai/faststream/compare/0.5.6...0.5.7
Published by Lancetnik 5 months ago
from faststream.rabbit.annotations import Connection, Channel shortcuts
broker.add_middleware public API to append a middleware to an already created broker
RabbitBroker(channel_number: int, publisher_confirms: bool, on_return_raises: bool) options to set up channel settings
StreamMessage.batch_headers attribute to provide access to the headers of all messages in a batch
Full Changelog: https://github.com/airtai/faststream/compare/0.5.5...0.5.6
Published by Lancetnik 5 months ago
Add support for explicit partition assignment in aiokafka KafkaBroker (special thanks to @spataphore1337):
from faststream import FastStream
from faststream.kafka import KafkaBroker, TopicPartition

broker = KafkaBroker()

topic_partition_first = TopicPartition("my_topic", 1)
topic_partition_second = TopicPartition("my_topic", 2)

@broker.subscriber(partitions=[topic_partition_first, topic_partition_second])
async def some_consumer(msg):
    ...
Full Changelog: https://github.com/airtai/faststream/compare/0.5.4...0.5.5
Published by Lancetnik 6 months ago
Full Changelog: https://github.com/airtai/faststream/compare/0.5.3...0.5.4
Published by davorrunje 6 months ago
Full Changelog: https://github.com/airtai/faststream/compare/0.5.2...0.5.3
Published by Lancetnik 6 months ago
Just a little bugfix patch. Fixes #1379 and #1376.
Full Changelog: https://github.com/airtai/faststream/compare/0.5.1...0.5.2
Published by Lancetnik 6 months ago
We already have some fixes related to RedisBroker (#1375, #1376) and some new features for you:
broker.include_router(...) allows passing some arguments to set up the router at include time instead of at creation:
broker.include_router(
    router,
    prefix="test_",
    dependencies=[Depends(...)],
    middlewares=[BrokerMiddleware],
    include_in_schema=False,
)
KafkaBroker().subscriber(...) now consumes an aiokafka.ConsumerRebalanceListener object (closes #1319):
broker = KafkaBroker()
broker.subscriber(..., listener=MyRebalancer())
pattern option was added too, but it is still experimental and does not support Path.
Path feature performance was increased. Also, Path is now suitable for NATS PullSub batch subscription as well:
from faststream import Path
from faststream.nats import NatsBroker, PullSub

broker = NatsBroker()

@broker.subscriber(
    "logs.{level}",
    stream="test-stream",
    pull_sub=PullSub(batch=True),
)
async def base_handler(
    ...,
    level: str = Path(),
):
    ...
Full Changelog: https://github.com/airtai/faststream/compare/0.5.0...0.5.1
Published by davorrunje 6 months ago
This is the biggest change since the creation of FastStream. We have completely refactored the entire package, changing the object registration mechanism, message processing pipeline, and application lifecycle. However, you won't even notice it—we've preserved all public APIs from breaking changes. The only feature not compatible with the previous code is the new middleware.
New features:
await FastStream.stop() method and StopApplication exception to stop a FastStream worker are added.
broker.subscriber() and router.subscriber() functions now return a Subscriber object you can use later.
subscriber = broker.subscriber("test")

@subscriber(filter=lambda msg: msg.content_type == "application/json")
async def handler(msg: dict[str, Any]):
    ...

@subscriber()
async def handler(msg: dict[str, Any]):
    ...
This is the preferred syntax for filtering now (the old one will be removed in 0.6.0).
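The filtering syntax above can be pictured as a subscriber that tries its registered handlers in order and calls the first one whose filter accepts the message. The class below is a hypothetical stand-in written for illustration, not the real Subscriber object; the first-match dispatch rule and the dict-shaped message are assumptions.

```python
import asyncio


class Subscriber:
    """Toy subscriber: routes a message to the first handler whose filter matches."""

    def __init__(self):
        self._calls = []  # (filter, handler) pairs in registration order

    def __call__(self, filter=lambda msg: True):
        def decorator(handler):
            self._calls.append((filter, handler))
            return handler
        return decorator

    async def consume(self, msg):
        for accepts, handler in self._calls:
            if accepts(msg):
                return await handler(msg)
        raise LookupError("no handler accepted the message")


subscriber = Subscriber()


@subscriber(filter=lambda msg: msg["content_type"] == "application/json")
async def json_handler(msg):
    return "json"


@subscriber()
async def default_handler(msg):
    return "default"
```

Registering the unfiltered handler last makes it act as a default for anything the filtered handlers reject.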
router.publisher() function now returns the correct Publisher object you can use later (after broker startup).
publisher = router.publisher("test")

@router.subscriber("in")
async def handler():
    await publisher.publish("msg")
(Until 0.5.0 you could use it in this way with broker.publisher only)
middlewares can be passed to a broker.publisher as well:
broker = Broker(..., middlewares=())

@broker.subscriber(..., middlewares=())
@broker.publisher(..., middlewares=())  # new feature
async def handler():
    ...
Broker-level middlewares now affect all ways to publish a message, so you can encode application outgoing messages here.
⚠️ BREAKING CHANGE ⚠️ : both subscriber and publisher middlewares should be async functions that take call_next as their first argument
async def subscriber_middleware(call_next, msg):
    return await call_next(msg)

async def publisher_middleware(call_next, msg, **kwargs):
    return await call_next(msg, **kwargs)

@broker.subscriber(
    "in",
    middlewares=(subscriber_middleware,),
)
@broker.publisher(
    "out",
    middlewares=(publisher_middleware,),
)
async def handler(msg):
    return msg
Such changes allow you two previously unavailable features:
Without those features we could not implement Observability Middleware or any similar tool, so it is the job that just had to be done.
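To show why the call_next shape gives a middleware control both before and after the wrapped call, here is a small stand-alone sketch of composing such functions around a handler. It does not reproduce FastStream's pipeline: wrap, log_middleware and fallback_middleware are hypothetical names, and running the first middleware in the tuple outermost is an assumption.

```python
import asyncio
from functools import partial


def wrap(handler, middlewares):
    """Wrap `handler` so that the first middleware in the tuple runs outermost."""
    call = handler
    for middleware in reversed(middlewares):
        call = partial(middleware, call)
    return call


async def log_middleware(call_next, msg):
    trace.append(f"before {msg}")
    result = await call_next(msg)
    trace.append(f"after {result}")
    return result


async def fallback_middleware(call_next, msg):
    try:
        return await call_next(msg)
    except ValueError:
        # The wrapper sees exceptions from the inner call and can replace the result.
        return "fallback"


async def handler(msg):
    if msg == "bad":
        raise ValueError(msg)
    return msg.upper()


trace = []
wrapped = wrap(handler, (log_middleware, fallback_middleware))
```

Because each middleware awaits call_next itself, it can inspect or change the message going in, the result coming out, and any exception raised in between.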
Better FastAPI compatibility: fastapi.BackgroundTasks and the response_class subscriber option are supported.
All .pyi files are removed, and explicit docstrings and method options are added.
New subscribers can be registered in runtime (with an already-started broker):
subscriber = broker.subscriber("dynamic")
subscriber(handler_method)
...
broker.setup_subscriber(subscriber)
await subscriber.start()
...
await subscriber.close()
faststream[docs] distribution is removed.
Full Changelog: https://github.com/airtai/faststream/compare/0.4.7...0.5.0
Published by Lancetnik 6 months ago
This is the final API change before the stable 0.5.0 release.
⚠️ HAS BREAKING CHANGE
In it, we stabilize the behavior of publisher and subscriber middlewares:
async def subscriber_middleware(call_next, msg):
    return await call_next(msg)

async def publisher_middleware(call_next, msg, **kwargs):
    return await call_next(msg, **kwargs)

@broker.subscriber(
    "in",
    middlewares=(subscriber_middleware,),
)
@broker.publisher(
    "out",
    middlewares=(publisher_middleware,),
)
async def handler(msg):
    return msg
Such changes allow two previously unavailable features.
Without these features we simply could not implement Observability Middleware or any similar tool, so this is a job that had to be done.
Now you are free to access any message processing stage, and we are one step closer to the framework we would like to create!
Full Changelog: https://github.com/airtai/faststream/compare/0.5.0rc0...0.5.0rc2
Published by Lancetnik 7 months ago
This is the biggest change since the creation of FastStream. We have completely refactored the entire package, changing the object registration mechanism, message processing pipeline, and application lifecycle. However, you won't even notice it—we've preserved all public APIs from breaking changes. The only feature not compatible with the previous code is the new middleware.
This is still an RC (Release Candidate) for you to test before the stable release. You can manually install it in your project:
pip install faststream==0.5.0rc0
We look forward to your feedback!
New features:
await FastStream.stop() method and StopApplication exception to stop a FastStream worker are added.
broker.subscriber() and router.subscriber() functions now return a Subscriber object you can use later.
subscriber = broker.subscriber("test")

@subscriber(filter=lambda msg: msg.content_type == "application/json")
async def handler(msg: dict[str, Any]):
    ...

@subscriber()
async def handler(msg: dict[str, Any]):
    ...
This is the preferred syntax for filtering now (the old one will be removed in 0.6.0).
router.publisher() function now returns the correct Publisher object you can use later (after broker startup).
publisher = router.publisher("test")

@router.subscriber("in")
async def handler():
    await publisher.publish("msg")
(Until 0.5.0 you could use it in this way with broker.publisher only)
middlewares can be passed to a broker.publisher as well:
broker = Broker(..., middlewares=())

@broker.subscriber(..., middlewares=())
@broker.publisher(..., middlewares=())  # new feature
async def handler():
    ...
Broker-level middlewares now affect all ways to publish a message, so you can encode application outgoing messages here.
⚠️ BREAKING CHANGE ⚠️ : both subscriber and publisher middlewares should be async context manager type
from contextlib import asynccontextmanager

@asynccontextmanager
async def subscriber_middleware(msg_body):
    yield msg_body

@asynccontextmanager
async def publisher_middleware(
    msg_to_publish,
    **publish_arguments,
):
    yield msg_to_publish

@broker.subscriber("in", middlewares=(subscriber_middleware,))
@broker.publisher("out", middlewares=(publisher_middleware,))
async def handler():
    ...
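For readers unfamiliar with the async context manager shape, the sketch below shows one way such a middleware could wrap a handler call. It is an illustration under assumptions, not the release candidate's actual pipeline: the consume helper and the events list are hypothetical, and only contextlib.asynccontextmanager is a real API here.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def subscriber_middleware(msg_body):
    events.append("enter")
    try:
        # The yielded value is what the wrapped handler receives.
        yield msg_body
    finally:
        events.append("exit")


async def consume(handler, middleware, msg_body):
    """Run `handler` inside the middleware's async context (illustrative only)."""
    async with middleware(msg_body) as body:
        return await handler(body)


async def handler(body):
    events.append(f"handle {body}")
    return body


events = []
```

Code before the yield runs ahead of the handler and code in the finally block runs after it, which is the hook the context manager form offers.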
Better FastAPI compatibility: fastapi.BackgroundTasks and the response_class subscriber option are supported.
All .pyi files are removed, and explicit docstrings and method options are added.
New subscribers can be registered in runtime (with an already-started broker):
subscriber = broker.subscriber("dynamic")
subscriber(handler_method)
...
broker.setup_subscriber(subscriber)
await subscriber.start()
...
await subscriber.close()
faststream[docs] distribution is removed.
Full Changelog: https://github.com/airtai/faststream/compare/0.4.7...0.5.0rc0