Examples
Minimal example
import asyncio
from dataclasses import dataclass, field
from di import Container, bind_by_type
from di.dependent import Dependent
from diator.container.di import DIContainer
from diator.events import Event, EventEmitter, EventMap
from diator.mediator import Mediator
from diator.requests import Request, RequestHandler, RequestMap
@dataclass(frozen=True, kw_only=True)
class JoinMeetingCommand(Request):
meeting_id: int
user_id: int
is_late: bool = field(default=False)
class JoinMeetingCommandHandler(RequestHandler[JoinMeetingCommand, None]):
def __init__(self, meeting_api) -> None:
self._meeting_api = meeting_api
self._events: list[Event] = []
@property
def events(self) -> list[Event]:
return self._events
async def handle(self, request: JoinMeetingCommand) -> None:
self._meeting_api.join(request.meeting_id, request.user_id)
if request.is_late:
self._meeting_api.warn(request.user_id)
def setup_di() -> DIContainer:
external_container = Container()
external_container.bind(
bind_by_type(
Dependent(JoinMeetingCommandHandler, scope="request"),
JoinMeetingCommandHandler,
)
)
container = DIContainer()
container.attach_external_container(external_container)
return container
async def main() -> None:
container = setup_di()
request_map = RequestMap()
request_map.bind(JoinMeetingCommand, JoinMeetingCommandHandler)
event_emitter = EventEmitter(event_map=EventMap(), container=container, message_broker=None)
mediator = Mediator(
request_map=request_map,
event_emitter=event_emitter,
container=container,
)
await mediator.send(JoinMeetingCommand(user_id=1, meeting_id=1, is_late=True))
if __name__ == "__main__":
asyncio.run(main())
Rodi as di framework
import asyncio
import logging
from dataclasses import dataclass
from redis import asyncio as redis
from rodi import Container
from diator.container.rodi import RodiContainer
from diator.events import (
DomainEvent,
EventEmitter,
EventHandler,
EventMap,
NotificationEvent,
)
from diator.mediator import Mediator
from diator.message_brokers.redis import RedisMessageBroker
from diator.middlewares import MiddlewareChain
from diator.middlewares.logging import LoggingMiddleware
from diator.requests import Request, RequestHandler, RequestMap
@dataclass(frozen=True, kw_only=True)
class JoinMeetingRoomCommand(Request):
user_id: int
@dataclass(frozen=True, kw_only=True)
class UserJoinedDomainEvent(DomainEvent):
user_id: int
@dataclass(frozen=True, kw_only=True)
class UserJoinedNotificationEvent(NotificationEvent):
user_id: int
class JoinMeetingRoomCommandHandler(RequestHandler[JoinMeetingRoomCommand, None]):
def __init__(self) -> None:
self._events = []
@property
def events(self) -> list:
return self._events
async def handle(self, request: JoinMeetingRoomCommand) -> None:
self._events.append(UserJoinedDomainEvent(user_id=request.user_id))
self._events.append(UserJoinedNotificationEvent(user_id=123))
class UserJoinedEventHandler(EventHandler[UserJoinedDomainEvent]):
async def handle(self, event: UserJoinedDomainEvent) -> None:
print("READY", event)
def configure_di() -> RodiContainer:
container = Container()
container.register(UserJoinedEventHandler)
container.register(JoinMeetingRoomCommandHandler)
rodi_container = RodiContainer()
rodi_container.attach_external_container(container)
return rodi_container
class FirstMiddleware:
async def __call__(self, request: Request, handle):
print("Before 1 handling...")
response = await handle(request)
print("After 1 handling...")
return response
class SecondMiddleware:
async def __call__(self, request: Request, handle):
print("Before 2 handling...")
response = await handle(request)
print("After 2 handling...")
return response
async def main() -> None:
logging.basicConfig(level=logging.DEBUG)
middleware_chain = MiddlewareChain()
middleware_chain.add(LoggingMiddleware())
middleware_chain.add(FirstMiddleware())
middleware_chain.add(SecondMiddleware())
event_map = EventMap()
event_map.bind(UserJoinedDomainEvent, UserJoinedEventHandler)
request_map = RequestMap()
request_map.bind(JoinMeetingRoomCommand, JoinMeetingRoomCommandHandler)
container = configure_di()
redis_client = redis.Redis.from_url("redis://localhost:6379/0")
event_emitter = EventEmitter(
message_broker=RedisMessageBroker(redis_client),
event_map=event_map,
container=container,
)
mediator = Mediator(
request_map=request_map,
event_emitter=event_emitter,
container=container,
middleware_chain=middleware_chain,
)
await mediator.send(JoinMeetingRoomCommand(user_id=1))
if __name__ == "__main__":
asyncio.run(main())
DI-lib as di framework
import asyncio
from dataclasses import dataclass
from di import Container, bind_by_type
from di.dependent import Dependent
from redis import asyncio as redis
from diator.container.di import DIContainer
from diator.events import (
DomainEvent,
EventEmitter,
EventHandler,
EventMap,
NotificationEvent,
)
from diator.mediator import Mediator
from diator.message_brokers.redis import RedisMessageBroker
from diator.middlewares import MiddlewareChain
from diator.requests import Request, RequestHandler, RequestMap
@dataclass(frozen=True, kw_only=True)
class JoinMeetingRoomCommand(Request):
user_id: int
@dataclass(frozen=True, kw_only=True)
class UserJoinedDomainEvent(DomainEvent):
user_id: int
@dataclass(frozen=True, kw_only=True)
class UserJoinedNotificationEvent(NotificationEvent):
user_id: int
class JoinMeetingRoomCommandHandler(RequestHandler[JoinMeetingRoomCommand, None]):
def __init__(self) -> None:
self._events = []
@property
def events(self) -> list:
return self._events
async def handle(self, request: JoinMeetingRoomCommand) -> None:
self._events.append(UserJoinedDomainEvent(user_id=request.user_id))
self._events.append(UserJoinedNotificationEvent(user_id=123))
class UserJoinedEventHandler(EventHandler[UserJoinedDomainEvent]):
async def handle(self, event: UserJoinedDomainEvent) -> None:
print("READY", event)
class FirstMiddleware:
async def __call__(self, request: Request, handle):
print("Before 1 handling...")
response = await handle(request)
print("After 1 handling...")
return response
class SecondMiddleware:
async def __call__(self, request: Request, handle):
print("Before 2 handling...")
response = await handle(request)
print("After 2 handling...")
return response
def configure_di() -> DIContainer:
container = Container()
container.bind(bind_by_type(Dependent(UserJoinedEventHandler, scope="request"), UserJoinedEventHandler))
container.bind(
bind_by_type(
Dependent(JoinMeetingRoomCommandHandler, scope="request"),
JoinMeetingRoomCommandHandler,
)
)
di_container = DIContainer()
di_container.attach_external_container(container)
return di_container
async def main() -> None:
middleware_chain = MiddlewareChain()
middleware_chain.add(FirstMiddleware())
middleware_chain.add(SecondMiddleware())
event_map = EventMap()
event_map.bind(UserJoinedDomainEvent, UserJoinedEventHandler)
request_map = RequestMap()
request_map.bind(JoinMeetingRoomCommand, JoinMeetingRoomCommandHandler)
container = configure_di()
redis_client: redis.Redis = redis.Redis.from_url("redis://localhost:6379/0")
event_emitter = EventEmitter(
message_broker=RedisMessageBroker(redis_client),
event_map=event_map,
container=container,
)
mediator = Mediator(
request_map=request_map,
event_emitter=event_emitter,
container=container,
middleware_chain=middleware_chain,
)
await mediator.send(JoinMeetingRoomCommand(user_id=1))
if __name__ == "__main__":
asyncio.run(main())
Azure Service Bus
import asyncio
import os
from dataclasses import dataclass
from datetime import timedelta
import rodi
from azure.servicebus.aio import ServiceBusClient
from diator.container.rodi import RodiContainer
from diator.events import EventEmitter, EventMap, NotificationEvent
from diator.mediator import Mediator
from diator.message_brokers.azure import AzureMessageBroker
from diator.requests import Request, RequestHandler, RequestMap
@dataclass(frozen=True, kw_only=True)
class CleanUnactiveUsersCommand(Request):
eta: timedelta
@dataclass(frozen=True, kw_only=True)
class UnactiveUsersCleaned(NotificationEvent):
ids: list
class CleanUnactiveUsersCommandHandler(RequestHandler[CleanUnactiveUsersCommand, None]):
def __init__(self) -> None:
self._events = []
@property
def events(self) -> list:
return self._events
async def handle(self, request: CleanUnactiveUsersCommand) -> None:
self._events.append(UnactiveUsersCleaned(ids=[1, 2, 3, 4, 5]))
def configure_di() -> RodiContainer:
external_container = rodi.Container()
external_container.register(CleanUnactiveUsersCommandHandler)
container = RodiContainer()
container.attach_external_container(external_container)
return container
async def main() -> None:
service_bus_connection_string = os.getenv("CONNECTION_STRING")
topic_name = os.getenv("TOPIC_NAME")
container = configure_di()
request_map = RequestMap()
request_map.bind(CleanUnactiveUsersCommand, CleanUnactiveUsersCommandHandler)
azure_service_bus_client = ServiceBusClient.from_connection_string(service_bus_connection_string)
message_broker = AzureMessageBroker(azure_service_bus_client, topic_name, timeout=15)
event_emitter = EventEmitter(message_broker=message_broker, event_map=EventMap(), container=container)
mediator = Mediator(event_emitter=event_emitter, request_map=request_map, container=container)
await mediator.send(CleanUnactiveUsersCommand(eta=timedelta(days=1)))
if __name__ == "__main__":
asyncio.run(main())