🎏 Python Stream Processing (Faust like!) backed by pydantic.
MIT License
Bot releases are visible (Hide)
First release of flowtastic!
- `FlowTastic.subscriber` decorator. Using this decorator, the received message will be automatically deserialized to a Python object or a `pydantic.BaseModel`.
- `JSONMessage` to serialize/deserialize JSON messages.
- `Publish` as the return type of a subscriber function.

import asyncio
from typing import Any
from flowtastic import FlowTastic, JSONMessage, Publish
from pydantic import BaseModel, validator
# Application instance; the ``@app.subscriber`` decorators below attach their
# handler functions to it.
# NOTE(review): ``broker`` is presumably the address of a Kafka bootstrap
# server (9092 is Kafka's conventional port) — confirm against flowtastic docs.
app = FlowTastic(name="flowtastic", broker="localhost:9092")
class Order(BaseModel):
    """Order payload made of an identifier and a cost.

    The ``cost`` field is checked by ``validate_cost``.
    """

    id: str
    cost: float

    @validator("cost")
    def validate_cost(cls, value):
        """Return ``value`` unchanged unless it is below zero.

        Raises:
            ValueError: If ``value`` is less than 0.
        """
        # NOTE(review): zero passes this check even though the message says
        # "positive" — confirm whether a cost of 0 is meant to be accepted.
        if not value < 0:
            return value
        raise ValueError("cost must be positive")
@app.subscriber(topic="orders")
async def heavy_processing_operation(
    order: Order = JSONMessage(),
) -> Publish(to_topics=["processed_orders"]):
    """Handle an order from the ``orders`` topic after a five-second wait.

    The order is printed and then returned unchanged.

    NOTE(review): the ``Publish`` return annotation presumably makes the
    framework forward the returned order to ``processed_orders`` — confirm
    against the flowtastic documentation.
    """
    # The sleep appears to stand in for slow work; it yields to the event loop.
    await asyncio.sleep(5)
    print("process_orders:", order)
    return order
@app.subscriber(topic="orders")
async def very_fast_operation(
    order: Order = JSONMessage(),
) -> None:
    """Print an order from the ``orders`` topic straight away.

    Nothing is returned, and the function declares no ``Publish`` target.
    """
    print("process_order_but_other_way", order)
@app.subscriber(topic="orders")
async def no_base_model_operation(
    order: dict[str, Any] = JSONMessage(),
) -> None:
    """Print an order from the ``orders`` topic, annotated as a plain dict.

    Unlike the ``Order``-typed handlers, the parameter is annotated as
    ``dict[str, Any]`` rather than a pydantic model.
    """
    print("order", order)
@app.subscriber(topic="processed_orders")
async def print_order(
    order: Order = JSONMessage(),
) -> None:
    """Print an order received on the ``processed_orders`` topic."""
    print("processed_order", order)
# Start the application.
# NOTE(review): presumably this blocks while the subscribers registered above
# consume messages — confirm against the flowtastic documentation.
app.run()