flowtastic

🎏 Python Stream Processing (Faust like!) backed by pydantic.

MIT License

Downloads
12
Stars
1

Bot releases are hidden (Show)

flowtastic - 0.0.1a0 Latest Release

Published by gabrielmbmb over 2 years ago

First release of flowtastic!

✨ Features

  • Subscribe to topics via FlowTastic.subscriber decorator. Using this decorator the message received will be automatically deserialized to a Python object or a pydantic.BaseModel.
  • Add JSONMessage to serialize/deserialize JSON messages
  • Publish message to topic using Publish as return type of a subscriber function.
import asyncio
from typing import Any
from flowtastic import FlowTastic, JSONMessage, Publish
from pydantic import BaseModel, validator

app = FlowTastic(name="flowtastic", broker="localhost:9092")


class Order(BaseModel):
    id: str
    cost: float

    @validator("cost")
    def validate_cost(cls, v):
        if v < 0:
            raise ValueError("cost must be positive")
        return v


@app.subscriber(topic="orders")
async def heavy_processing_operation(
    order: Order = JSONMessage(),
) -> Publish(to_topics=["processed_orders"]):
    await asyncio.sleep(5)
    print("process_orders:", order)
    return order


@app.subscriber(topic="orders")
async def very_fast_operation(order: Order = JSONMessage()) -> None:
    print("process_order_but_other_way", order)


@app.subscriber(topic="orders")
async def no_base_model_operation(order: dict[str, Any] = JSONMessage()) -> None:
    print("order", order)

@app.subscriber(topic="processed_orders")
async def print_order(order: Order = JSONMessage()) -> None:
    print("processed_order", order)


app.run()