faststream

FastStream is a powerful and easy-to-use Python framework for building asynchronous services interacting with event streams such as Apache Kafka, RabbitMQ, NATS and Redis.

APACHE-2.0 License

Downloads
160.6K
Stars
1.8K
Committers
15

Bot releases are visible (Hide)

faststream - v0.4.7

Published by Lancetnik 8 months ago

What's Changed

Full Changelog: https://github.com/airtai/faststream/compare/0.4.6...0.4.7

faststream - v0.4.6

Published by davorrunje 8 months ago

What's Changed

Full Changelog: https://github.com/airtai/faststream/compare/0.4.5...0.4.6

faststream - v0.4.5

Published by Lancetnik 8 months ago

What's Changed

New Contributors

Full Changelog: https://github.com/airtai/faststream/compare/0.4.4...0.4.5

faststream - v0.4.4

Published by Lancetnik 8 months ago

What's Changed

Add RedisStream batch size option

@broker.subscriber(stream=StreamSub("input", batch=True, max_records=3))
async def on_input_data(msgs: list[str]):
    assert len(msgs) <= 3

Full Changelog: https://github.com/airtai/faststream/compare/0.4.3...0.4.4

faststream - v0.4.3

Published by Lancetnik 8 months ago

What's Changed

Allow to specify Redis Stream maxlen option in publisher:

@broker.publisher(stream=StreamSub("Output", maxlen=10))
async def on_input_data():
    ....

Full Changelog: https://github.com/airtai/faststream/compare/0.4.2...0.4.3

faststream - v0.4.2

Published by davorrunje 9 months ago

What's Changed

Bug fixes

Full Changelog: https://github.com/airtai/faststream/compare/0.4.1...0.4.2

faststream - v0.4.1

Published by davorrunje 9 months ago

What's Changed

Bug fixes

Documentation

Full Changelog: https://github.com/airtai/faststream/compare/0.4.0...0.4.1

faststream - v0.4.0

Published by davorrunje 9 months ago

What's Changed

This release adds support for the Confluent's Python Client for Apache Kafka (TM). Confluent's Python Client for Apache Kafka does not support natively async functions and its integration with modern async-based services is a bit trickier. That was the reason why our initial supported by Kafka broker used aiokafka. However, that choice was a less fortunate one as it is as well maintained as the Confluent version. After receiving numerous requests, we finally decided to bite the bullet and create an async wrapper around Confluent's Python Client and add full support for it in FastStream.

If you want to try it out, install it first with:

pip install "faststream[confluent]>=0.4.0"

To connect to Kafka using the FastStream KafkaBroker module, follow these steps:

  1. Initialize the KafkaBroker instance: Start by initializing a KafkaBroker instance with the necessary configuration, including Kafka broker address.

  2. Create your processing logic: Write a function that will consume the incoming messages in the defined format and produce a response to the defined topic

  3. Decorate your processing function: To connect your processing function to the desired Kafka topics you need to decorate it with @broker.subscriber(...) and @broker.publisher(...) decorators. Now, after you start your application, your processing function will be called whenever a new message in the subscribed topic is available and produce the function return value to the topic defined in the publisher decorator.

Here's a simplified code example demonstrating how to establish a connection to Kafka using FastStream's KafkaBroker module:

from faststream import FastStream
from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

@broker.subscriber("in-topic")
@broker.publisher("out-topic")
async def handle_msg(user: str, user_id: int) -> str:
    return f"User: {user_id} - {user} registered"

For more information, please visit the documentation at:

https://faststream.airt.ai/latest/confluent/

List of Changes

New Contributors

Full Changelog: https://github.com/airtai/faststream/compare/0.3.13...0.4.0

faststream - v0.4.0rc0

Published by davorrunje 9 months ago

What's Changed

This is a preview version of 0.4.0 release introducing support for Confluent-based Kafka broker.

Here's a simplified code example demonstrating how to establish a connection to Kafka using FastStream's KafkaBroker module:

from faststream import FastStream
from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

@broker.subscriber("in-topic")
@broker.publisher("out-topic")
async def handle_msg(user: str, user_id: int) -> str:
    return f"User: {user_id} - {user} registered"

Changes

Full Changelog: https://github.com/airtai/faststream/compare/0.3.13...0.4.0rc0

faststream - v0.3.13

Published by davorrunje 10 months ago

What's Changed

New features

Bug fixes

New Contributors

Full Changelog: https://github.com/airtai/faststream/compare/0.3.12...0.3.13

faststream - v0.3.12

Published by davorrunje 10 months ago

What's Changed

Bug fixes

Misc

Full Changelog: https://github.com/airtai/faststream/compare/0.3.11...0.3.12

faststream - v0.3.11

Published by Lancetnik 10 months ago

What's Changed

NATS concurent subscriber:

By default, NATS subscriber consumes messages with a block per subject. So, you can't process multiple messages from the same subject at the same time. But, with the broker.subscriber(..., max_workers=...) option, you can! It creates an async tasks pool to consume multiple messages from the same subject and allows you to process them concurrently!

from faststream import FastStream
from faststream.nats import NatsBroker

broker = NatsBroker()
app = FastStream()

@broker.subscriber("test-subject", max_workers=10)
async def handler(...):
   """Can process up to 10 messages concurrently."""

Full Changelog: https://github.com/airtai/faststream/compare/0.3.10...0.3.11

faststream - v0.3.10

Published by davorrunje 10 months ago

What's Changed

New features

Bug fixes

Documentation

Other

Full Changelog: https://github.com/airtai/faststream/compare/0.3.9...0.3.10

faststream - v0.3.9

Published by davorrunje 10 months ago

What's Changed

Bug fixes:

Chore:

Full Changelog: https://github.com/airtai/faststream/compare/0.3.8...0.3.9

faststream - v0.3.8

Published by Lancetnik 10 months ago

What's Changed

Full Changelog: https://github.com/airtai/faststream/compare/0.3.7...0.3.8

faststream - v0.3.7

Published by davorrunje 10 months ago

What's Changed

Support regular FastStream Context with FastAPI plugin

from fastapi import FastAPI
from faststream.redis.fastapi import RedisRouter, Logger

router = RedisRouter()

@router.subscriber("test")
async def handler(msg, logger: Logger):
    logger.info(msg)

app = FastAPI(lifespan=router.lifespan_context)
app.include_router(router)

Full Changelog: https://github.com/airtai/faststream/compare/0.3.6...0.3.7

faststream - v0.3.6

Published by Lancetnik 10 months ago

What's Changed

Full Changelog: https://github.com/airtai/faststream/compare/0.3.5...0.3.6

faststream - v0.3.5

Published by davorrunje 10 months ago

What's Changed

A large update by @Lancetnik in https://github.com/airtai/faststream/pull/1048

Provides with the ability to setup graceful_timeout to wait for consumed messages processed correctly before apllication shutdown - Broker(graceful_timeout=30.0) (waits up to 30 seconds)

  • allows to get acces to context.get_local("message" from FastAPI plugin
  • docs: fix Avro custom serialization example
  • docs: add KafkaBroker publish_batch notice
  • docs: add RabbitMQ security page
  • fix: respect retry attempts with NackMessage exception
  • test Kafka nack and reject behavior
  • bug: fix import error with anyio version 4.x by @davorrunje in https://github.com/airtai/faststream/pull/1049

Full Changelog: https://github.com/airtai/faststream/compare/0.3.4...0.3.5

faststream - v0.3.4

Published by davorrunje 10 months ago

What's Changed

Features:

Documentation

Chore

Full Changelog: https://github.com/airtai/faststream/compare/0.3.3...0.3.4

faststream - v0.3.3

Published by davorrunje 10 months ago

What's Changed

Features:

Chores:

Full Changelog: https://github.com/airtai/faststream/compare/0.3.2...0.3.3