MultiPublisher¶
Module: eventspype.pub.multipublisher
MultiPublisher¶
class MultiPublisher:
def __init__(self, broker: MessageBroker | None = None) -> None
A publisher that manages multiple EventPublication channels. Define publications as class-level EventPublication attributes; MultiPublisher creates one EventPublisher per publication on demand.
Parameters¶
| Parameter | Type | Description |
|---|---|---|
broker |
MessageBroker \| None |
Optional broker applied to all internal publishers |
Properties¶
| Property | Type | Description |
|---|---|---|
broker |
MessageBroker \| None |
Current broker. Settable; propagates to all internal publishers and migrates subscribers. |
Class methods¶
get_event_definitions¶
@classmethod
def get_event_definitions(cls) -> dict[str, EventPublication]
Return all EventPublication attributes defined in the class and its parent classes (MRO order, child class attributes take precedence).
is_publication_valid¶
@classmethod
def is_publication_valid(
cls, publication: EventPublication, raise_error: bool = True
) -> bool
Check whether a publication belongs to this class.
Raises: ValueError if invalid and raise_error=True (default).
get_event_definition_by_tag¶
@classmethod
def get_event_definition_by_tag(cls, event_tag: EventTag) -> EventPublication
Return the EventPublication whose tag matches event_tag.
Raises: ValueError if no matching publication is found.
Instance methods¶
add_subscriber¶
def add_subscriber(
self, publication: EventPublication, subscriber: EventSubscriber
) -> None
Register a subscriber for a specific publication. Creates the internal EventPublisher for that publication if it does not exist yet.
Raises: ValueError if publication is not defined on this class.
remove_subscriber¶
def remove_subscriber(
self, publication: EventPublication, subscriber: EventSubscriber
) -> None
Unregister a subscriber. Removes the internal publisher if it has no remaining subscribers.
add_subscriber_with_callback¶
def add_subscriber_with_callback(
self, publication: EventPublication, callback: Any, with_event_info: bool = True
) -> None
Wrap callback in a FunctionalEventSubscriber and register it. The publisher keeps a strong reference to the subscriber to prevent garbage collection.
Parameters:
| Parameter | Type | Description |
|---|---|---|
publication |
EventPublication |
Target publication |
callback |
Callable |
Function to call on each event |
with_event_info |
bool |
If True (default), callback receives (event, event_tag, caller). If False, callback receives only (event,). |
remove_subscriber_with_callback¶
def remove_subscriber_with_callback(
self, publication: EventPublication, callback: Any
) -> None
Remove a previously registered callback subscriber.
publish¶
def publish(
self, publication: EventPublication, event: Any, caller: Any | None = None
) -> None
Dispatch an event on a specific publication channel. Does nothing if no subscribers are registered for that publication.
Raises: ValueError if publication is not defined on this class, or if event is the wrong type.
Example¶
from dataclasses import dataclass
from eventspype import MultiPublisher, EventPublication, EventSubscriber
@dataclass
class UserCreatedEvent:
user_id: int
username: str
@dataclass
class UserDeletedEvent:
user_id: int
class UserService(MultiPublisher):
USER_CREATED = EventPublication("user_created", UserCreatedEvent)
USER_DELETED = EventPublication("user_deleted", UserDeletedEvent)
def create_user(self, user_id: int, username: str) -> None:
self.publish(self.USER_CREATED, UserCreatedEvent(user_id, username))
def delete_user(self, user_id: int) -> None:
self.publish(self.USER_DELETED, UserDeletedEvent(user_id))
service = UserService()
class AuditLog(EventSubscriber):
def call(self, event, event_tag, caller):
print(f"Audit: {type(event).__name__} — {event}")
audit = AuditLog()
service.add_subscriber(UserService.USER_CREATED, audit)
service.add_subscriber(UserService.USER_DELETED, audit)
service.create_user(1, "alice")
service.delete_user(1)