Skip to content

Events

Event represents a fact that has occurred in the application. It typically represents a significant change in the application's state that is of interest to other parts of the application or external systems.

There are several types of events:

  • Domain event
  • Notification event
  • Event-carried state transfer (ECST)

Domain events are handled by specialized handlers, whereas Notification and ECST events are sent to message brokers.

Publishing Event

Events are published in the CommandHandler side like below:

from diator.requests import RequestHandler
from diator.events import EventHandler


class JoinMeetingCommandHandler(RequestHandler[JoinMeetingCommand, None]):
    def __init__(self, meeting_api: MeetingAPI) -> None:
        self._meeting_api = meeting_api
        self._events: list[Event] = []

    @property
    def events(self) -> list[Event]:
        return self._events

    async def handle(self, request: JoinMeetingCommand) -> None:
        await self._meeting_api.join(request.meeting_id, request.user_id)
        self._events.append(
            UserJoinedDomainEvent(user_id=request.user_id, timestamp=datetime.utcnow(), meeting_id=request.meeting_id)
        )
        self._events.append(
            UserJoinedNotificationEvent(user_id=request.user_id)
        )

Domain Event

Domain event is a message describing a significant event that has occurred in the business domain.

Example:

from diator.events import DomainEvent


@dataclasses.dataclass(frozen=True, kw_only=True)
class UserJoinedDomainEvent(DomainEvent):
    user_id: int = dataclasses.field()
    meeting_id: int = dataclasses.field()
    timestamp: datetime = dataclasses.field()

This event type is handled by its event handler.

Event Handler

Event Handler is a component responsible for processing an Domain Event that has occurred in the application:

from diator.events import EventHandler


class UserJoinedDomainEventHandler(EventHandler[UserJoinedDomainEvent]):
    def __init__(self, meeting_api: MeetingAPI) -> None:
        self._meeting_api = meeting_api

    async def handle(self, event: UserJoinedDomainEvent) -> None:
        await self._meeting_api.notify(event.meeting_id, "New user joined!")

Mapping

In order to map each domain event to its handler, you can use EventMap as below:

from diator.requests import EventMap


event_map = EventMap()
event_map.bind(UserJoinedDomainEvent, UserJoinedDomainEventHandler)
event_map.bind(UserJoinedDomainEvent, AnotherUserJoinedDomainEventHandler)

Notification Event

Notification Event is a message regarding a change in the business domain that other components will react to.

Example:

from diator.events import NotificationEvent


@dataclasses.dataclass(frozen=True, kw_only=True)
class UserJoinedNotificationEvent(NotificationEvent):
    user_id: int = dataclasses.field()

ECST Event

Event-carried state transfer (ECST) is a message that notifies subscribers about changes in the producer’s internal state.

Example:

from diator.events import ECSTEvent


@dataclasses.dataclass(frozen=True, kw_only=True)
class UserChangedECSTEvent(ECSTEvent):
    user_id: int = dataclasses.field()
    new_username: str = dataclasses.field()

Event Emitter

EventEmitter is a component, that is responsible for events dispatching. It decides whether to send event to message broker or dispatch it using handler.

Here is a simple EventEmitter usage:

from diator.events import EventMap, EventEmitter
from diator.mediator import Mediator


event_map = EventMap()
event_map.bind(UserJoinedDomainEvent, UserJoinedDomainEventHandler)
event_map.bind(UserJoinedDomainEvent, AnotherUserJoinedDomainEventHandler)

event_emitter = EventEmitter(event_map=event_map, container=container)
mediator = Mediator(
    event_emitter=event_emitter,
    request_map=request_map,
    container=container
)

Message Broker

Diator supports several message brokers to publish Notification and ECST events. Supported message brokers:

Redis

To use Redis Pub/Sub as message broker, simply import it and put to EventEmitter:

from redis.asyncio import Redis
from diator.events import EventMap, EventEmitter
from diator.message_brokers.redis import RedisMessageBroker


redis_client = Redis()

message_broker = RedisMessageBroker(client=redis_client)

event_emitter = EventEmitter(
    event_map=event_map, 
    container=container,
    message_broker=message_broker
)

As a result, it will produce events in the channel with default prefix python_diator_channel.

Example of published event:

{
   "message_type":"notification_event",
   "message_name":"UserJoinedNotificationEvent",
   "message_id":"9f62e977-73f7-462b-92cb-8ea658d3bcb5",
   "payload":{
      "event_id":"9f62e977-73f7-462b-92cb-8ea658d3bcb5",
      "event_timestamp":"2023-03-07T09:26:02.588855",
      "user_id":123
   }
}

Channel name:

python_diator_channel:notification_event:9f62e977-73f7-462b-92cb-8ea658d3bcb5

So, you can listen to specific event types by defining pattern of channel:

PSUBSCRIBE python_diator_channel:notification_event:*

Azure Service Bus

To use Azure Service Bus as message broker, simply import it and put to EventEmitter:

from azure.servicebus.aio import ServiceBusClient
from diator.events import EventMap, EventEmitter
from diator.message_brokers.azure import AzureMessageBroker


azure_service_bus_client = ServiceBusClient.from_connection_string(
    service_bus_connection_string
)
message_broker = AzureMessageBroker(
    azure_service_bus_client,
    topic_name,
    timeout=15
)

event_emitter = EventEmitter(
    event_map=event_map, 
    container=container,
    message_broker=message_broker
)