Skip to content

Examples

Minimal example

import asyncio
from dataclasses import dataclass, field

from di import Container, bind_by_type
from di.dependent import Dependent

from diator.container.di import DIContainer
from diator.events import Event, EventEmitter, EventMap
from diator.mediator import Mediator
from diator.requests import Request, RequestHandler, RequestMap


@dataclass(frozen=True, kw_only=True)
class JoinMeetingCommand(Request):
    meeting_id: int
    user_id: int
    is_late: bool = field(default=False)


class JoinMeetingCommandHandler(RequestHandler[JoinMeetingCommand, None]):
    def __init__(self, meeting_api) -> None:
        self._meeting_api = meeting_api
        self._events: list[Event] = []

    @property
    def events(self) -> list[Event]:
        return self._events

    async def handle(self, request: JoinMeetingCommand) -> None:
        self._meeting_api.join(request.meeting_id, request.user_id)
        if request.is_late:
            self._meeting_api.warn(request.user_id)


def setup_di() -> DIContainer:
    external_container = Container()

    external_container.bind(
        bind_by_type(
            Dependent(JoinMeetingCommandHandler, scope="request"),
            JoinMeetingCommandHandler,
        )
    )

    container = DIContainer()
    container.attach_external_container(external_container)

    return container


async def main() -> None:
    container = setup_di()

    request_map = RequestMap()
    request_map.bind(JoinMeetingCommand, JoinMeetingCommandHandler)

    event_emitter = EventEmitter(event_map=EventMap(), container=container, message_broker=None)

    mediator = Mediator(
        request_map=request_map,
        event_emitter=event_emitter,
        container=container,
    )

    await mediator.send(JoinMeetingCommand(user_id=1, meeting_id=1, is_late=True))


if __name__ == "__main__":
    asyncio.run(main())

Rodi as di framework

import asyncio
import logging
from dataclasses import dataclass

from redis import asyncio as redis
from rodi import Container

from diator.container.rodi import RodiContainer
from diator.events import (
    DomainEvent,
    EventEmitter,
    EventHandler,
    EventMap,
    NotificationEvent,
)
from diator.mediator import Mediator
from diator.message_brokers.redis import RedisMessageBroker
from diator.middlewares import MiddlewareChain
from diator.middlewares.logging import LoggingMiddleware
from diator.requests import Request, RequestHandler, RequestMap


@dataclass(frozen=True, kw_only=True)
class JoinMeetingRoomCommand(Request):
    user_id: int


@dataclass(frozen=True, kw_only=True)
class UserJoinedDomainEvent(DomainEvent):
    user_id: int


@dataclass(frozen=True, kw_only=True)
class UserJoinedNotificationEvent(NotificationEvent):
    user_id: int


class JoinMeetingRoomCommandHandler(RequestHandler[JoinMeetingRoomCommand, None]):
    def __init__(self) -> None:
        self._events = []

    @property
    def events(self) -> list:
        return self._events

    async def handle(self, request: JoinMeetingRoomCommand) -> None:
        self._events.append(UserJoinedDomainEvent(user_id=request.user_id))
        self._events.append(UserJoinedNotificationEvent(user_id=123))


class UserJoinedEventHandler(EventHandler[UserJoinedDomainEvent]):
    async def handle(self, event: UserJoinedDomainEvent) -> None:
        print("READY", event)


def configure_di() -> RodiContainer:
    container = Container()

    container.register(UserJoinedEventHandler)
    container.register(JoinMeetingRoomCommandHandler)

    rodi_container = RodiContainer()
    rodi_container.attach_external_container(container)

    return rodi_container


class FirstMiddleware:
    async def __call__(self, request: Request, handle):
        print("Before 1 handling...")
        response = await handle(request)
        print("After 1 handling...")
        return response


class SecondMiddleware:
    async def __call__(self, request: Request, handle):
        print("Before 2  handling...")
        response = await handle(request)
        print("After 2 handling...")
        return response


async def main() -> None:
    logging.basicConfig(level=logging.DEBUG)

    middleware_chain = MiddlewareChain()
    middleware_chain.add(LoggingMiddleware())
    middleware_chain.add(FirstMiddleware())
    middleware_chain.add(SecondMiddleware())
    event_map = EventMap()
    event_map.bind(UserJoinedDomainEvent, UserJoinedEventHandler)
    request_map = RequestMap()
    request_map.bind(JoinMeetingRoomCommand, JoinMeetingRoomCommandHandler)
    container = configure_di()

    redis_client = redis.Redis.from_url("redis://localhost:6379/0")

    event_emitter = EventEmitter(
        message_broker=RedisMessageBroker(redis_client),
        event_map=event_map,
        container=container,
    )

    mediator = Mediator(
        request_map=request_map,
        event_emitter=event_emitter,
        container=container,
        middleware_chain=middleware_chain,
    )

    await mediator.send(JoinMeetingRoomCommand(user_id=1))


if __name__ == "__main__":
    asyncio.run(main())

DI-lib as di framework

import asyncio
from dataclasses import dataclass

from di import Container, bind_by_type
from di.dependent import Dependent
from redis import asyncio as redis

from diator.container.di import DIContainer
from diator.events import (
    DomainEvent,
    EventEmitter,
    EventHandler,
    EventMap,
    NotificationEvent,
)
from diator.mediator import Mediator
from diator.message_brokers.redis import RedisMessageBroker
from diator.middlewares import MiddlewareChain
from diator.requests import Request, RequestHandler, RequestMap


@dataclass(frozen=True, kw_only=True)
class JoinMeetingRoomCommand(Request):
    user_id: int


@dataclass(frozen=True, kw_only=True)
class UserJoinedDomainEvent(DomainEvent):
    user_id: int


@dataclass(frozen=True, kw_only=True)
class UserJoinedNotificationEvent(NotificationEvent):
    user_id: int


class JoinMeetingRoomCommandHandler(RequestHandler[JoinMeetingRoomCommand, None]):
    def __init__(self) -> None:
        self._events = []

    @property
    def events(self) -> list:
        return self._events

    async def handle(self, request: JoinMeetingRoomCommand) -> None:
        self._events.append(UserJoinedDomainEvent(user_id=request.user_id))
        self._events.append(UserJoinedNotificationEvent(user_id=123))


class UserJoinedEventHandler(EventHandler[UserJoinedDomainEvent]):
    async def handle(self, event: UserJoinedDomainEvent) -> None:
        print("READY", event)


class FirstMiddleware:
    async def __call__(self, request: Request, handle):
        print("Before 1 handling...")
        response = await handle(request)
        print("After 1 handling...")
        return response


class SecondMiddleware:
    async def __call__(self, request: Request, handle):
        print("Before 2  handling...")
        response = await handle(request)
        print("After 2 handling...")
        return response


def configure_di() -> DIContainer:
    container = Container()

    container.bind(bind_by_type(Dependent(UserJoinedEventHandler, scope="request"), UserJoinedEventHandler))
    container.bind(
        bind_by_type(
            Dependent(JoinMeetingRoomCommandHandler, scope="request"),
            JoinMeetingRoomCommandHandler,
        )
    )

    di_container = DIContainer()
    di_container.attach_external_container(container)

    return di_container


async def main() -> None:
    middleware_chain = MiddlewareChain()
    middleware_chain.add(FirstMiddleware())
    middleware_chain.add(SecondMiddleware())
    event_map = EventMap()
    event_map.bind(UserJoinedDomainEvent, UserJoinedEventHandler)
    request_map = RequestMap()
    request_map.bind(JoinMeetingRoomCommand, JoinMeetingRoomCommandHandler)
    container = configure_di()

    redis_client: redis.Redis = redis.Redis.from_url("redis://localhost:6379/0")

    event_emitter = EventEmitter(
        message_broker=RedisMessageBroker(redis_client),
        event_map=event_map,
        container=container,
    )

    mediator = Mediator(
        request_map=request_map,
        event_emitter=event_emitter,
        container=container,
        middleware_chain=middleware_chain,
    )

    await mediator.send(JoinMeetingRoomCommand(user_id=1))


if __name__ == "__main__":
    asyncio.run(main())

Azure Service Bus

import asyncio
import os
from dataclasses import dataclass
from datetime import timedelta

import rodi
from azure.servicebus.aio import ServiceBusClient

from diator.container.rodi import RodiContainer
from diator.events import EventEmitter, EventMap, NotificationEvent
from diator.mediator import Mediator
from diator.message_brokers.azure import AzureMessageBroker
from diator.requests import Request, RequestHandler, RequestMap


@dataclass(frozen=True, kw_only=True)
class CleanUnactiveUsersCommand(Request):
    eta: timedelta


@dataclass(frozen=True, kw_only=True)
class UnactiveUsersCleaned(NotificationEvent):
    ids: list


class CleanUnactiveUsersCommandHandler(RequestHandler[CleanUnactiveUsersCommand, None]):
    def __init__(self) -> None:
        self._events = []

    @property
    def events(self) -> list:
        return self._events

    async def handle(self, request: CleanUnactiveUsersCommand) -> None:
        self._events.append(UnactiveUsersCleaned(ids=[1, 2, 3, 4, 5]))


def configure_di() -> RodiContainer:
    external_container = rodi.Container()
    external_container.register(CleanUnactiveUsersCommandHandler)

    container = RodiContainer()
    container.attach_external_container(external_container)
    return container


async def main() -> None:
    service_bus_connection_string = os.getenv("CONNECTION_STRING")
    topic_name = os.getenv("TOPIC_NAME")

    container = configure_di()
    request_map = RequestMap()
    request_map.bind(CleanUnactiveUsersCommand, CleanUnactiveUsersCommandHandler)

    azure_service_bus_client = ServiceBusClient.from_connection_string(service_bus_connection_string)
    message_broker = AzureMessageBroker(azure_service_bus_client, topic_name, timeout=15)
    event_emitter = EventEmitter(message_broker=message_broker, event_map=EventMap(), container=container)

    mediator = Mediator(event_emitter=event_emitter, request_map=request_map, container=container)

    await mediator.send(CleanUnactiveUsersCommand(eta=timedelta(days=1)))


if __name__ == "__main__":
    asyncio.run(main())