EventSubscription / PublicationSubscription

Module: eventspype.sub.subscription


EventSubscription

class EventSubscription:
    def __init__(
        self,
        publisher_class: type[EventPublisher] | type[MultiPublisher],
        event_tag: EventTag | list[EventTag],
        callback: EventSubscriptionCallback,
        callback_with_subscriber: bool = True,
        callback_with_event_info: bool = True,
    ) -> None

Declarative subscription descriptor. Used as a class attribute on MultiSubscriber subclasses to define the wiring between a subscriber method and a publisher's event channel.

Parameters

Parameter Type Description
publisher_class type[EventPublisher] \| type[MultiPublisher] The publisher class this subscription targets
event_tag EventTag \| list[EventTag] One or more event tags to subscribe to
callback EventSubscriptionCallback The handler callable
callback_with_subscriber bool If True (default), the callback is looked up on the subscriber instance by name (getattr(subscriber, callback.__name__)) or bound via partial.
callback_with_event_info bool If True (default), callback receives (event, event_tag, caller). If False, only (event,).

Callback types

# With subscriber and event info (default)
callback = lambda self, event, tag, caller: self.handle(event)

# With subscriber, without event info
EventSubscription(..., callback_with_event_info=False,
                  callback=lambda self, event: self.handle(event))

# Without subscriber (callback_with_subscriber=False)
EventSubscription(..., callback_with_subscriber=False,
                  callback=lambda event, tag, caller: handle(event))

Properties

Property Type Description
publisher_class type The declared publisher class
event_tag EventTag \| list[EventTag] The declared event tag(s)
callback Callable The handler callable
callback_with_subscriber bool Whether callback is bound to the subscriber
event_tag_str str String representation of the tag(s) for display

Methods

subscribe

def subscribe(
    self,
    publisher: EventPublisher | MultiPublisher,
    subscriber: Any,
) -> list[FunctionalEventSubscriber]

Create and register FunctionalEventSubscriber instances on the publisher. Returns the list of created subscriber objects (one per tag).


unsubscribe

def unsubscribe(
    self,
    publisher: EventPublisher | MultiPublisher,
    subscriber: FunctionalEventSubscriber,
) -> None

Remove a subscriber from the publisher.

Example

import logging
from eventspype import MultiSubscriber, EventSubscription, MultiPublisher, EventPublication
from dataclasses import dataclass

@dataclass
class PaymentEvent:
    payment_id: int
    amount: float

class PaymentService(MultiPublisher):
    PAYMENT_RECEIVED = EventPublication("payment_received", PaymentEvent)

class PaymentHandler(MultiSubscriber):
    # Single tag
    on_payment = EventSubscription(
        publisher_class=PaymentService,
        event_tag="payment_received",
        callback=lambda self, event, tag, caller: self.handle(event),
    )

    def logger(self) -> logging.Logger:
        return logging.getLogger(__name__)

    def handle(self, event: PaymentEvent) -> None:
        print(f"Payment {event.payment_id}: ${event.amount}")

PublicationSubscription

class PublicationSubscription(EventSubscription):
    def __init__(
        self,
        publisher_class: type[MultiPublisher],
        event_publication: EventPublication,
        callback: Callable[..., Any],
        callback_with_subscriber: bool = True,
        callback_with_event_info: bool = True,
    ) -> None

A typed variant of EventSubscription that wires directly to a specific EventPublication instance rather than performing a tag lookup. Use this when you have a direct reference to the publication object.

Additional parameters

Parameter Type Description
publisher_class type[MultiPublisher] Must be a MultiPublisher subclass
event_publication EventPublication The exact publication to subscribe to (bypasses tag lookup)

Example

from eventspype import PublicationSubscription, MultiSubscriber
import logging

class PaymentHandler(MultiSubscriber):
    on_payment = PublicationSubscription(
        publisher_class=PaymentService,
        event_publication=PaymentService.PAYMENT_RECEIVED,
        callback=lambda self, event, tag, caller: self.handle(event),
    )

    def logger(self) -> logging.Logger:
        return logging.getLogger(__name__)

    def handle(self, event: PaymentEvent) -> None:
        print(f"Payment: {event}")