FastStream example¶
This example shows how to use Dependency Injector with FastStream.
The source code is available on the Github.
Despite FastStream uses FastDepends library for dependency injection, the integration between
Dependency injector and FastStream has a small difference from already existing FastDepends example.
Since FastStream also leverages function signatures to determine input data types you have to use Depends() function
with cast=False argument to make FastStream ignore your injected dependency argument in the function signature.
Example below shows how to inject Counter class into FastStream redis handler so that it will distinguish between
message schema (User) and injected dependency (Counter) and use them both correctly.
Listing of consumer.py:
import asyncio
from typing import Annotated
from dependency_injector import containers, providers
from dependency_injector.wiring import Provide, inject
from faststream import Depends, FastStream
from faststream.redis import RedisBroker, RedisRouter
from pydantic import BaseModel
class Counter:
def __init__(self):
self.count = 0
def next(self) -> int:
self.count += 1
return self.count
class Container(containers.DeclarativeContainer):
counter = providers.Singleton(Counter)
config = providers.Configuration()
broker = providers.Singleton(RedisBroker, config.redis_url, logger=None)
app = providers.Factory(FastStream, broker, logger=None)
class Message(BaseModel):
user: str
text: str
router = RedisRouter()
@router.subscriber("messages")
@inject
async def handle_user_message(
message: Message,
counter: Annotated[
Counter,
Depends(
Provide[Container.counter],
cast=False, # <-- this is the key part
),
],
) -> None:
count = counter.next()
print(f"Message #{count} from {message.user}: '{message.text}'")
async def main() -> None:
container = Container()
container.wire(modules=[__name__])
container.config.redis_url.from_env("REDIS_URL")
broker = container.broker()
broker.include_router(router)
app = container.app()
await app.run()
if __name__ == "__main__":
asyncio.run(main())
Listing of producer.py:
import json
import time
from dependency_injector import containers, providers
from redis import Redis
class Container(containers.DeclarativeContainer):
config = providers.Configuration()
redis = providers.Singleton(Redis, config.redis_host, config.redis_port.as_int())
def main():
container = Container()
container.wire(modules=[__name__])
container.config.redis_host.from_env("REDIS_HOST")
container.config.redis_port.from_env("REDIS_PORT")
redis = container.redis()
for text in (
"As you can see",
"messages are counted correctly",
"by the counter that is injected",
"into faststream handler",
"via awesome dependency_injector library.",
):
time.sleep(2)
message = {"user": "John", "text": text}
redis.publish("messages", json.dumps(message))
if __name__ == "__main__":
main()
Sources¶
Explore the sources on the Github.
Sponsor the project on GitHub: |