FastStream is a powerful and easy-to-use Python framework for building asynchronous services interacting with event streams such as Apache Kafka, RabbitMQ, NATS and Redis.
APACHE-2.0 License
Bot releases are hidden (Show)
Published by Lancetnik 7 months ago
apply_types
warning notice to subscription/index.md by @Lancetnik in https://github.com/airtai/faststream/pull/1291
Full Changelog: https://github.com/airtai/faststream/compare/0.4.6...0.4.7
Published by davorrunje 8 months ago
Full Changelog: https://github.com/airtai/faststream/compare/0.4.5...0.4.6
Published by Lancetnik 8 months ago
Full Changelog: https://github.com/airtai/faststream/compare/0.4.4...0.4.5
Published by Lancetnik 8 months ago
Add RedisStream batch size option
@broker.subscriber(stream=StreamSub("input", batch=True, max_records=3))
async def on_input_data(msgs: list[str]):
assert len(msgs) <= 3
Full Changelog: https://github.com/airtai/faststream/compare/0.4.3...0.4.4
Published by Lancetnik 8 months ago
Allow to specify Redis Stream maxlen option in publisher:
@broker.publisher(stream=StreamSub("Output", maxlen=10))
async def on_input_data():
....
Full Changelog: https://github.com/airtai/faststream/compare/0.4.2...0.4.3
Published by davorrunje 9 months ago
Full Changelog: https://github.com/airtai/faststream/compare/0.4.1...0.4.2
Published by davorrunje 9 months ago
Full Changelog: https://github.com/airtai/faststream/compare/0.4.0...0.4.1
Published by davorrunje 9 months ago
This release adds support for the Confluent's Python Client for Apache Kafka (TM). Confluent's Python Client for Apache Kafka does not support natively async
functions and its integration with modern async-based services is a bit trickier. That was the reason why our initial supported by Kafka broker used aiokafka. However, that choice was a less fortunate one as it is as well maintained as the Confluent version. After receiving numerous requests, we finally decided to bite the bullet and create an async
wrapper around Confluent's Python Client and add full support for it in FastStream.
If you want to try it out, install it first with:
pip install "faststream[confluent]>=0.4.0"
To connect to Kafka using the FastStream KafkaBroker module, follow these steps:
Initialize the KafkaBroker instance: Start by initializing a KafkaBroker instance with the necessary configuration, including Kafka broker address.
Create your processing logic: Write a function that will consume the incoming messages in the defined format and produce a response to the defined topic
Decorate your processing function: To connect your processing function to the desired Kafka topics you need to decorate it with @broker.subscriber(...)
and @broker.publisher(...)
decorators. Now, after you start your application, your processing function will be called whenever a new message in the subscribed topic is available and produce the function return value to the topic defined in the publisher decorator.
Here's a simplified code example demonstrating how to establish a connection to Kafka using FastStream's KafkaBroker module:
from faststream import FastStream
from faststream.confluent import KafkaBroker
broker = KafkaBroker("localhost:9092")
app = FastStream(broker)
@broker.subscriber("in-topic")
@broker.publisher("out-topic")
async def handle_msg(user: str, user_id: int) -> str:
return f"User: {user_id} - {user} registered"
For more information, please visit the documentation at:
https://faststream.airt.ai/latest/confluent/
Full Changelog: https://github.com/airtai/faststream/compare/0.3.13...0.4.0
Published by davorrunje 9 months ago
This is a preview version of 0.4.0 release introducing support for Confluent-based Kafka broker.
Here's a simplified code example demonstrating how to establish a connection to Kafka using FastStream's KafkaBroker module:
from faststream import FastStream
from faststream.confluent import KafkaBroker
broker = KafkaBroker("localhost:9092")
app = FastStream(broker)
@broker.subscriber("in-topic")
@broker.publisher("out-topic")
async def handle_msg(user: str, user_id: int) -> str:
return f"User: {user_id} - {user} registered"
Full Changelog: https://github.com/airtai/faststream/compare/0.3.13...0.4.0rc0
Published by davorrunje 10 months ago
Full Changelog: https://github.com/airtai/faststream/compare/0.3.12...0.3.13
Published by davorrunje 10 months ago
Full Changelog: https://github.com/airtai/faststream/compare/0.3.11...0.3.12
Published by Lancetnik 10 months ago
NATS concurent subscriber:
By default, NATS subscriber consumes messages with a block per subject. So, you can't process multiple messages from the same subject at the same time. But, with the broker.subscriber(..., max_workers=...)
option, you can! It creates an async tasks pool to consume multiple messages from the same subject and allows you to process them concurrently!
from faststream import FastStream
from faststream.nats import NatsBroker
broker = NatsBroker()
app = FastStream()
@broker.subscriber("test-subject", max_workers=10)
async def handler(...):
"""Can process up to 10 messages concurrently."""
Full Changelog: https://github.com/airtai/faststream/compare/0.3.10...0.3.11
Published by davorrunje 10 months ago
Full Changelog: https://github.com/airtai/faststream/compare/0.3.9...0.3.10
Published by davorrunje 10 months ago
Full Changelog: https://github.com/airtai/faststream/compare/0.3.8...0.3.9
Published by Lancetnik 10 months ago
faststream.redis.fastapi.RedisRouter
stream and list subscriptionTestNatsClient
with batch=True
Full Changelog: https://github.com/airtai/faststream/compare/0.3.7...0.3.8
Published by davorrunje 10 months ago
Support regular FastStream Context with FastAPI plugin
from fastapi import FastAPI
from faststream.redis.fastapi import RedisRouter, Logger
router = RedisRouter()
@router.subscriber("test")
async def handler(msg, logger: Logger):
logger.info(msg)
app = FastAPI(lifespan=router.lifespan_context)
app.include_router(router)
Full Changelog: https://github.com/airtai/faststream/compare/0.3.6...0.3.7
Published by Lancetnik 10 months ago
packaging
dependencyFull Changelog: https://github.com/airtai/faststream/compare/0.3.5...0.3.6
Published by davorrunje 10 months ago
A large update by @Lancetnik in https://github.com/airtai/faststream/pull/1048
Provides with the ability to setup graceful_timeout
to wait for consumed messages processed correctly before apllication shutdown - Broker(graceful_timeout=30.0)
(waits up to 30 seconds)
context.get_local("message"
from FastAPI pluginpublish_batch
noticeNackMessage
exceptionFull Changelog: https://github.com/airtai/faststream/compare/0.3.4...0.3.5
Published by davorrunje 10 months ago
Full Changelog: https://github.com/airtai/faststream/compare/0.3.3...0.3.4
Published by davorrunje 10 months ago
Features:
Chores:
Full Changelog: https://github.com/airtai/faststream/compare/0.3.2...0.3.3