MultiSubscriber

Module: eventspype.sub.multisubscriber


MultiSubscriber

class MultiSubscriber:
    def __init__(self) -> None

Base class for declarative subscription wiring. Define EventSubscription class attributes; at runtime call add_subscription to connect them to publisher instances.

Properties

Property Type Description
subscribers dict[EventPublisher, dict[EventSubscription, Any]] Internal map of active subscriber objects keyed by publisher and subscription

Abstract methods

logger (abstract)

@abstractmethod
def logger(self) -> logging.Logger

Return the logger for this subscriber. Required by all MultiSubscriber subclasses.

Class methods

get_event_definitions

@classmethod
def get_event_definitions(cls) -> dict[str, EventSubscription]

Return all EventSubscription attributes defined in the class and its parent classes (MRO order, child class attributes take precedence).

Instance methods

add_subscription

def add_subscription(
    self, subscription: EventSubscription, publisher: EventPublisher
) -> None

Activate a subscription by connecting it to a specific publisher instance.

Raises: ValueError if subscription is not defined on this class.

Does nothing if the subscription is already active for that publisher.


remove_subscription

def remove_subscription(
    self, subscription: EventSubscription, publisher: EventPublisher
) -> None

Deactivate a subscription for a specific publisher.

Raises: ValueError if subscription is not defined on this class.

Static methods

log_event

@staticmethod
def log_event(
    log_level: int = logging.INFO, log_prefix: str = "Event"
) -> Callable

Decorator that logs the event before calling the decorated handler method. Uses self.logger().

class MyHandler(MultiSubscriber):
    @MultiSubscriber.log_event(log_level=logging.DEBUG, log_prefix="Order")
    def handle_placed(self, event) -> None:
        process(event)

Example

import logging
from eventspype import MultiSubscriber, EventSubscription, MultiPublisher, EventPublication
from dataclasses import dataclass

@dataclass
class ShipmentEvent:
    shipment_id: int

class ShipmentService(MultiPublisher):
    SHIPPED = EventPublication("shipped", ShipmentEvent)

class ShipmentHandler(MultiSubscriber):
    on_shipped = EventSubscription(
        publisher_class=ShipmentService,
        event_tag="shipped",
        callback=lambda self, event, tag, caller: self.handle(event),
    )

    def logger(self) -> logging.Logger:
        return logging.getLogger(__name__)

    def handle(self, event: ShipmentEvent) -> None:
        print(f"Shipment {event.shipment_id} sent")

service = ShipmentService()
handler = ShipmentHandler()
handler.add_subscription(handler.on_shipped, service)

service.publish(ShipmentService.SHIPPED, ShipmentEvent(shipment_id=99))
# Shipment 99 sent