Diator - CQRS Library for Python
Diator is a Python library for implementing the CQRS pattern in your applications. It provides a set of abstractions and utilities that help you separate read and write concerns, which makes the application easier to scale, tune, and maintain.
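To make the read/write split concrete, here is a minimal sketch in plain Python. It does not use Diator's API: `RenameUserCommand`, `GetUserNameQuery`, and `UserStore` are hypothetical names invented for illustration, and a single in-memory store serves both sides only to keep the example short.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RenameUserCommand:
    """Write side: expresses an intent to change state and returns nothing."""

    user_id: int
    new_name: str


@dataclass(frozen=True)
class GetUserNameQuery:
    """Read side: asks for data and must not change state."""

    user_id: int


class UserStore:
    """Hypothetical in-memory store, shared by both sides for brevity."""

    def __init__(self) -> None:
        self._names: dict[int, str] = {1: "alice"}

    def handle_command(self, command: RenameUserCommand) -> None:
        # Commands mutate state and return no data.
        self._names[command.user_id] = command.new_name

    def handle_query(self, query: GetUserNameQuery) -> str:
        # Queries return data and leave state untouched.
        return self._names[query.user_id]


store = UserStore()
store.handle_command(RenameUserCommand(user_id=1, new_name="bob"))
current_name = store.handle_query(GetUserNameQuery(user_id=1))
```

In a real system the two sides would typically be backed by separate handlers, and possibly separate data models, so that each can be scaled and optimized independently.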
Features
- Implements the CQRS pattern.
- Simple, yet flexible API.
- Supports multiple message brokers, such as Redis Pub/Sub and Azure Service Bus.
- Supports several dependency injection frameworks, such as di and rodi.
- Easy to integrate with existing codebases.
Installation
Install the Diator library with pip:
pip install diator
Optional extras are available for the supported message brokers:
- To use Redis as the message broker:
pip install diator[redis]
- To use Azure Service Bus:
pip install diator[azure]
Simple Example
A minimal example of Diator usage:
import asyncio
from dataclasses import dataclass, field

from di import Container, bind_by_type
from di.dependent import Dependent

from diator.container.di import DIContainer
from diator.events import Event, EventEmitter, EventMap
from diator.mediator import Mediator
from diator.requests import Request, RequestHandler, RequestMap


@dataclass(frozen=True, kw_only=True)
class JoinMeetingCommand(Request):
    meeting_id: int
    user_id: int
    is_late: bool = field(default=False)


class JoinMeetingCommandHandler(RequestHandler[JoinMeetingCommand, None]):
    def __init__(self, meeting_api) -> None:
        self._meeting_api = meeting_api
        self._events: list[Event] = []

    @property
    def events(self) -> list[Event]:
        return self._events

    async def handle(self, request: JoinMeetingCommand) -> None:
        self._meeting_api.join(request.meeting_id, request.user_id)
        if request.is_late:
            self._meeting_api.warn(request.user_id)


def setup_di() -> DIContainer:
    external_container = Container()
    external_container.bind(
        bind_by_type(
            Dependent(JoinMeetingCommandHandler, scope="request"),
            JoinMeetingCommandHandler,
        )
    )
    container = DIContainer()
    container.attach_external_container(external_container)
    return container


async def main() -> None:
    container = setup_di()

    request_map = RequestMap()
    request_map.bind(JoinMeetingCommand, JoinMeetingCommandHandler)

    event_emitter = EventEmitter(
        event_map=EventMap(), container=container, message_broker=None
    )
    mediator = Mediator(
        request_map=request_map,
        event_emitter=event_emitter,
        container=container,
    )

    await mediator.send(JoinMeetingCommand(user_id=1, meeting_id=1, is_late=True))


if __name__ == "__main__":
    asyncio.run(main())
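The example above binds a request type to a handler and then sends a request through the mediator. The sketch below shows that dispatch idea in isolation, using only the standard library. It is not Diator's implementation: `TinyMediator`, `Ping`, and `handle_ping` are hypothetical names, and the sketch leaves out dependency injection and event emission entirely.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


@dataclass(frozen=True)
class Ping:
    """Hypothetical request type used only in this sketch."""

    text: str


class TinyMediator:
    """Illustrative stand-in: maps request types to async handler callables."""

    def __init__(self) -> None:
        self._handlers: dict[type, Callable[[Any], Awaitable[Any]]] = {}

    def bind(self, request_type: type, handler: Callable[[Any], Awaitable[Any]]) -> None:
        # Remember which handler is responsible for which request type.
        self._handlers[request_type] = handler

    async def send(self, request: Any) -> Any:
        # Look up the handler by the request's concrete type and await it.
        handler = self._handlers[type(request)]
        return await handler(request)


async def handle_ping(request: Ping) -> str:
    return request.text.upper()


tiny = TinyMediator()
tiny.bind(Ping, handle_ping)
result = asyncio.run(tiny.send(Ping(text="pong")))
```

The caller only knows the request type and the mediator, never the handler itself, which is what keeps the sending code decoupled from the handling code.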
Further reading
- Udi Dahan - Clarified CQRS
- Martin Fowler - CQRS
- Martin Fowler - What do you mean by “Event-Driven”?
- Vlad Khononov - Learning Domain-Driven Design
- Vaughn Vernon - Really Simple CQRS
License
This project is licensed under the terms of the MIT license.