faststream

FastStream is a powerful and easy-to-use Python framework for building asynchronous services interacting with event streams such as Apache Kafka, RabbitMQ, NATS and Redis.

APACHE-2.0 License

Downloads
160.6K
Stars
1.8K
Committers
15

Bot releases are visible (Hide)

faststream - v0.5.17 Latest Release

Published by Lancetnik 2 months ago

What's Changed

Just a hotfix for the following case:

@broker.subscriber(...)
async def handler():
    return NatsResponse(...)
    
await broker.publish(..., rpc=True)

Full Changelog: https://github.com/airtai/faststream/compare/0.5.16...0.5.17

faststream - v0.5.16

Published by Lancetnik 2 months ago

What's Changed

Well, seems like it is the biggest patch release ever 😃

Detail Responses

First of all, thanks to all new contributors, who helps us to improve the project! They made a huge impact to this release by adding new Kafka security mechanisms and extend Response API - now you can use broker.Response to publish detail information from handler

@broker.subscriber("in")
@broker.publisher("out")
async def handler(msg):
    return Response(msg, headers={"response_header": "Hi!"})   # or KafkaResponse, etc

ASGI

Also, we added a new huge feature - ASGI support!

Nope, we are not HTTP-framework now, but it is a little ASGI implementation to provide you with an ability to host documentation, use k8s http-probes and serve metrics in the same with you broker runtime without any dependencies.

You just need to use AsgiFastStream class

from faststream.nats import NatsBroker
from faststream.asgi import AsgiFastStream, make_ping_asgi

from prometheus_client import make_asgi_app
from prometheus_client.registry import CollectorRegistry

broker = NatsBroker()

prometheus_registry = CollectorRegistry()

app = AsgiFastStream(
    broker,
    asyncapi_path="/docs",
    asgi_routes=[
        ("/health", make_ping_asgi(broker, timeout=5.0)),
        ("/metrics", make_asgi_app(registry=prometheus_registry))
    ]
)

And then you can run it like a regular ASGI app

uvicorn main:app

Confluent partitions

One more thing - manual topic partition assignment for Confluent. We have it already for aiokafka, but missed it here... Now it was fixed!

from faststream.confluent import TopicPartition

@broker.subscriber(partitions=[
    TopicPartition("test-topic", partition=0),
])
async def handler():
    ...

Detail changes

New Contributors

Full Changelog: https://github.com/airtai/faststream/compare/0.5.15...0.5.16

faststream - v0.5.15

Published by Lancetnik 3 months ago

What's Changed

Finally, FastStream has a Kafka pattern subscription! This is another step forward in our Roadmap moving us to 0.6.0 and futher!

from faststream import Path
from faststream.kafka import KafkaBroker

broker = KafkaBroker()

@broker.subscriber(pattern="logs.{level}")
async def base_handler(
    body: str,
    level: str = Path(),
):
    ...

Also, all brokers now supports a new ping method to check real broker connection

is_connected: bool = await broker.ping()

This is a little, but important change for K8S probes support

More other there are a lot of bugfixes and improvements from our contributors! Thanks to all of these amazing people!

New Contributors

Full Changelog: https://github.com/airtai/faststream/compare/0.5.14...0.5.15

faststream - v0.5.14

Published by kumaranvpl 4 months ago

What's Changed

Full Changelog: https://github.com/airtai/faststream/compare/0.5.13...0.5.14

faststream - v0.5.13

Published by Lancetnik 4 months ago

What's Changed

New Contributors

Full Changelog: https://github.com/airtai/faststream/compare/0.5.12...0.5.13

faststream - v0.5.12

Published by kumaranvpl 4 months ago

What's Changed

Now, FastStream provides users with the ability to pass the config dictionary to confluent-kafka-python for greater customizability. The following example sets the parameter topic.metadata.refresh.fast.interval.ms's value to 300 instead of the default value 100 via the config parameter.

from faststream import FastStream
from faststream.confluent import KafkaBroker

config = {"topic.metadata.refresh.fast.interval.ms": 300}
broker = KafkaBroker("localhost:9092", config=config)
app = FastStream(broker)

Full Changelog: https://github.com/airtai/faststream/compare/0.5.11...0.5.12

faststream - v0.5.11

Published by Lancetnik 5 months ago

What's Changed

New Contributors

Full Changelog: https://github.com/airtai/faststream/compare/0.5.10...0.5.11

faststream - v0.5.10

Published by Lancetnik 5 months ago

What's Changed

Now you can return Response class to set more specific outgoing message parameters:

from faststream import Response

@broker.subscriber("in")
@broker.subscriber("out")
async def handler():
    return Response(body=b"", headers={})

New Contributors

Full Changelog: https://github.com/airtai/faststream/compare/0.5.9...0.5.10

faststream - v0.5.9

Published by kumaranvpl 5 months ago

What's Changed

New Contributors

Full Changelog: https://github.com/airtai/faststream/compare/0.5.8...0.5.9

faststream - v0.5.8

Published by Lancetnik 5 months ago

What's Changed

This is the time for a new NATS features! FastStream supports NATS Key-Value and Object Storage subscribption features in a native way now (big thx for @sheldygg)!

  1. KeyValue creation and watching API added (you can read updated documentation section for changes):

     from faststream import FastStream, Logger
     from faststream.nats import NatsBroker
     
     broker = NatsBroker()
     app = FastStream(broker)
     
     @broker.subscriber("some-key", kv_watch="bucket")
     async def handler(msg: int, logger: Logger):
         logger.info(msg)
     
     @app.after_startup
     async def test():
         kv = await broker.key_value("bucket")
         await kv.put("some-key", b"1")
    
  2. ObjectStore API added as well (you can read updated documentation section for changes):

    from faststream import FastStream, Logger
    from faststream.nats import NatsBroker
    
    broker = NatsBroker()
    app = FastStream(broker)
    
    @broker.subscriber("file-bucket", obj_watch=True)
    async def handler(filename: str, logger: Logger):
        logger.info(filename)
    
    @app.after_startup
    async def test():
        object_store = await broker.object_storage("file-bucket")
        await object_store.put("some-file.txt", b"1")
    
  3. Also now you can use just pull_sub=True instead of pull_sub=PullSub() in basic case:

     from faststream import FastStream, Logger
     from faststream.nats import NatsBroker
     
     broker = NatsBroker()
     app = FastStream(broker)
     
     @broker.subscriber("test", stream="stream", pull_sub=True)
     async def handler(msg, logger: Logger):
         logger.info(msg)
    

Finally, we have a new feature, related to all brokers: special flag to suppress automatic RPC and reply_to responses:

@broker.subscriber("tests", no_reply=True)
async def handler():
    ....
 
# will fail with timeout, because there is no automatic response
msg = await broker.publish("msg", "test", rpc=True)

New Contributors

Full Changelog: https://github.com/airtai/faststream/compare/0.5.7...0.5.8

faststream - v0.5.7

Published by Lancetnik 5 months ago

What's Changed

Finally, FastStream supports OpenTelemetry in a native way to collect the full trace of your services! Big thanks for @draincoder for that!

First of all you need to install required dependencies to support OpenTelemetry:

pip install faststream[otel]

Then you can just add a middleware for your broker and that's it!

from faststream import FastStream
from faststream.nats import NatsBroker
from faststream.nats.opentelemetry import NatsTelemetryMiddleware

broker = NatsBroker(
    middlewares=(
        NatsTelemetryMiddleware(),
    )
)
app = FastStream(broker)

To find detailt information just visit our documentation aboout telemetry

P.S. The release includes basic OpenTelemetry support - messages tracing & basic metrics. Baggage support and correct spans linking in batch processing case will be added soon.

New Contributors

Full Changelog: https://github.com/airtai/faststream/compare/0.5.6...0.5.7

faststream - v0.5.6

Published by Lancetnik 5 months ago

What's Changed

  • feature: add --factory param by @Sehat1137 in https://github.com/airtai/faststream/pull/1440
  • feat: add RMQ channels options, support for prefix for routing_key, a… by @Lancetnik in https://github.com/airtai/faststream/pull/1448
  • feature: Add from faststream.rabbit.annotations import Connection, Channel shortcuts
  • Bugfix: RabbitMQ RabbitRouter prefix now affects to queue routing key as well
  • Feature (close #1402): add broker.add_middleware public API to append a middleware to already created broker
  • Feature: add RabbitBroker(channel_number: int, publisher_confirms: bool, on_return_raises: bool) options to setup channel settings
  • Feature (close #1447): add StreamMessage.batch_headers attribute to provide with access to whole batch messages headers

New Contributors

Full Changelog: https://github.com/airtai/faststream/compare/0.5.5...0.5.6

faststream - v0.5.5

Published by Lancetnik 5 months ago

What's Changed

Add support for explicit partition assignment in aiokafka KafkaBroker (special thanks to @spataphore1337):

from faststream import FastStream
from faststream.kafka import KafkaBroker, TopicPartition

broker = KafkaBroker()

topic_partition_fisrt = TopicPartition("my_topic", 1)
topic_partition_second = TopicPartition("my_topic", 2)

@broker.subscribe(partitions=[topic_partition_fisrt, topic_partition_second])
async def some_consumer(msg):
   ...

Full Changelog: https://github.com/airtai/faststream/compare/0.5.4...0.5.5

faststream - v0.5.4

Published by Lancetnik 6 months ago

What's Changed

Full Changelog: https://github.com/airtai/faststream/compare/0.5.3...0.5.4

faststream - 0.5.3

Published by davorrunje 6 months ago

What's Changed

Full Changelog: https://github.com/airtai/faststream/compare/0.5.2...0.5.3

faststream - v0.5.2

Published by Lancetnik 6 months ago

What's Changed

Just a little bugfix patch. Fixes #1379 and #1376.

Full Changelog: https://github.com/airtai/faststream/compare/0.5.1...0.5.2

faststream - v0.5.1

Published by Lancetnik 6 months ago

What's Changed

We already have some fixes related to RedisBroker (#1375, #1376) and some new features for you:

  1. Now broke.include_router(...) allows to pass some arguments to setup router at including moment instead of creation
broker.include_router(
   router,
   prefix="test_",
   dependencies=[Depends(...)],
   middlewares=[BrokerMiddleware],
   include_in_schema=False,
)
  1. KafkaBroker().subscriber(...) now consumes aiokafka.ConsumerRebalanceListener object.
    You can find more information about it in the official aiokafka doc

(close #1319)

broker = KafkaBroker()

broker.subscriber(..., listener=MyRebalancer())

pattern option was added too, but it is still experimental and does not support Path

  1. Path feature perfomance was increased. Also, Path is suitable for NATS PullSub batch subscribtion as well now.
from faststream import NatsBroker, PullSub

broker = NastBroker()

@broker.subscriber(
    "logs.{level}",
    steam="test-stream",
    pull_sub=PullSub(batch=True),
)
async def base_handler(
    ...,
    level: str = Path(),
):
  ...

Full Changelog: https://github.com/airtai/faststream/compare/0.5.0...0.5.1

faststream - 0.5.0

Published by davorrunje 6 months ago

What's Changed

This is the biggest change since the creation of FastStream. We have completely refactored the entire package, changing the object registration mechanism, message processing pipeline, and application lifecycle. However, you won't even notice it—we've preserved all public APIs from breaking changes. The only feature not compatible with the previous code is the new middleware.

New features:

  1. await FastStream.stop() method and StopApplication exception to stop a FastStream worker are added.

  2. broker.subscriber() and router.subscriber() functions now return a Subscriber object you can use later.

subscriber = broker.subscriber("test")

@subscriber(filter = lambda msg: msg.content_type == "application/json")
async def handler(msg: dict[str, Any]):
    ...
 
@subscriber()
async def handler(msg: dict[str, Any]):
    ...

This is the preferred syntax for filtering now (the old one will be removed in 0.6.0)

  1. The router.publisher() function now returns the correct Publisher object you can use later (after broker startup).
publisher = router.publisher("test")

@router.subscriber("in")
async def handler():
    await publisher.publish("msg")

(Until 0.5.0 you could use it in this way with broker.publisher only)

  1. A list of middlewares can be passed to a broker.publisher as well:
broker = Broker(..., middlewares=())

@broker.subscriber(..., middlewares=())
@broker.publisher(..., middlewares=())  # new feature
async def handler():
    ...
  1. Broker-level middlewares now affect all ways to publish a message, so you can encode application outgoing messages here.

  2. ⚠️ BREAKING CHANGE ⚠️ : both subscriber and publisher middlewares should be async context manager type

async def subscriber_middleware(call_next, msg):
    return await call_next(msg)

async def publisher_middleware(call_next, msg, **kwargs):
    return await call_next(msg, **kwargs)

@broker.subscriber(
    "in",
    middlewares=(subscriber_middleware,),
)
@broker.publisher(
    "out",
    middlewares=(publisher_middleware,),
)
async def handler(msg):
    return msg

Such changes allow you two previously unavailable features:

  • suppress any exceptions and pass fall-back message body to publishers, and
  • patch any outgoing message headers and other parameters.

Without those features we could not implement Observability Middleware or any similar tool, so it is the job that just had to be done.
7. A better FastAPI compatibility: fastapi.BackgroundTasks and response_class subscriber option are supported.

  1. All .pyi files are removed, and explicit docstrings and methods options are added.

  2. New subscribers can be registered in runtime (with an already-started broker):

subscriber = broker.subscriber("dynamic")
subscriber(handler_method)
...
broker.setup_subscriber(subscriber)
await subscriber.start()
...
await subscriber.close()
  1. faststream[docs] distribution is removed.

New Contributors

Full Changelog: https://github.com/airtai/faststream/compare/0.4.7...0.5.0

faststream - v0.5.0rc2

Published by Lancetnik 6 months ago

What's Changed

This is the final API change before stable 0.5.0 release

⚠️ HAS BREAKING CHANGE

In it, we stabilize the behavior of publishers & subscribers middlewares

async def subscriber_middleware(call_next, msg):
    return await call_next(msg)

async def publisher_middleware(call_next, msg, **kwargs):
    return await call_next(msg, **kwargs)

@broker.subscriber(
    "in",
    middlewares=(subscriber_middleware,),
)
@broker.publisher(
    "out",
    middlewares=(publisher_middleware,),
)
async def handler(msg):
    return msg

Such changes allows you two features previously unavailable

  • suppress any exceptions and pas fall-back message body to publishers
  • patch any outgoing message headers and other parameters

Without these features we just can't impelement Observability Middleware or any similar tool, so it is the job to be done.

Now you are free to get access at any message processing stage and we are one step closer to the framework we would like to create!

Full Changelog: https://github.com/airtai/faststream/compare/0.5.0rc0...0.5.0rc2

faststream - v0.5.0rc0

Published by Lancetnik 7 months ago

What's Changed

This is the biggest change since the creation of FastStream. We have completely refactored the entire package, changing the object registration mechanism, message processing pipeline, and application lifecycle. However, you won't even notice it—we've preserved all public APIs from breaking changes. The only feature not compatible with the previous code is the new middleware.

This is still an RC (Release Candidate) for you to test before the stable release. You can manually install it in your project:

pip install faststream==0.5.0rc0

We look forward to your feedback!

New features:

  1. await FastStream.stop() method and StopApplication exception to stop a FastStream worker are added.

  2. broker.subscriber() and router.subscriber() functions now return a Subscriber object you can use later.

subscriber = broker.subscriber("test")

@subscriber(filter = lambda msg: msg.content_type == "application/json")
async def handler(msg: dict[str, Any]):
    ...
 
@subscriber()
async def handler(msg: dict[str, Any]):
    ...

This is the preferred syntax for filtering now (the old one will be removed in 0.6.0)

  1. The router.publisher() function now returns the correct Publisher object you can use later (after broker startup).
publisher = router.publisher("test")

@router.subscriber("in")
async def handler():
    await publisher.publish("msg")

(Until 0.5.0 you could use it in this way with broker.publisher only)

  1. A list of middlewares can be passed to a broker.publisher as well:
broker = Broker(..., middlewares=())

@broker.subscriber(..., middlewares=())
@broker.publisher(..., middlewares=())  # new feature
async def handler():
    ...
  1. Broker-level middlewares now affect all ways to publish a message, so you can encode application outgoing messages here.

  2. ⚠️ BREAKING CHANGE ⚠️ : both subscriber and publisher middlewares should be async context manager type

from contextlib import asynccontextmanager

@asynccontextmanager
async def subscriber_middleware(msg_body):
    yield msg_body

@asynccontextmanager
async def publisher_middleware(
    msg_to_publish,
    **publish_arguments,
):
    yield msg_to_publish

@broker.subscriber("in", middlewares=(subscriber_middleware,))
@broker.publisher("out", middlewares=(publisher_middleware,))
async def handler():
    ...
  1. A better FastAPI compatibility: fastapi.BackgroundTasks and response_class subscriber option are supported.

  2. All .pyi files are removed, and explicit docstrings and methods options are added.

  3. New subscribers can be registered in runtime (with an already-started broker):

subscriber = broker.subscriber("dynamic")
subscriber(handler_method)
...
broker.setup_subscriber(subscriber)
await subscriber.start()
...
await subscriber.close()
  1. faststream[docs] distribution is removed.

New Contributors

Full Changelog: https://github.com/airtai/faststream/compare/0.4.7...0.5.0rc0