Subscribers¶
Subscribers receive events from publishers. EventsPype provides several subscriber types for different use cases.
EventSubscriber¶
EventSubscriber is the abstract base class. Subclass it and implement call:
from eventspype import EventSubscriber
class PrintSubscriber(EventSubscriber):
def call(self, event, event_tag, caller):
print(f"Event received: {event}")
The call method signature:
| Parameter | Type | Description |
|---|---|---|
event |
Any |
The event object dispatched by the publisher |
event_tag |
int |
The normalized integer event tag |
caller |
Any |
The publisher that dispatched the event |
EventSubscriber is also callable — invoking an instance directly calls call:
subscriber = PrintSubscriber()
subscriber(event_object, event_tag, caller) # same as subscriber.call(...)
OwnedEventSubscriber¶
OwnedEventSubscriber is a subscriber that holds a reference to an owner object. This is useful when a subscriber is created inline but needs to access an enclosing object:
from eventspype import OwnedEventSubscriber
class MyService:
def __init__(self):
self._handler = OwnedEventSubscriber(owner=self)
# Extend by subclassing OwnedEventSubscriber
FunctionalEventSubscriber¶
FunctionalEventSubscriber wraps a plain callable as a subscriber. It supports two modes:
With event info (with_event_info=True, default): the callback receives (event, event_tag, caller):
from eventspype import FunctionalEventSubscriber
def my_handler(event, event_tag, caller):
print(f"Tag {event_tag}: {event}")
subscriber = FunctionalEventSubscriber(my_handler, with_event_info=True)
publisher.add_subscriber(subscriber)
Without event info (with_event_info=False): the callback receives only (event,):
def simple_handler(event):
print(event)
subscriber = FunctionalEventSubscriber(simple_handler, with_event_info=False)
publisher.add_subscriber(subscriber)
The preferred way to use functional subscribers is through MultiPublisher.add_subscriber_with_callback(), which manages the subscriber lifecycle automatically.
MultiSubscriber¶
MultiSubscriber allows declarative subscription wiring. Define EventSubscription class attributes, then call add_subscription with a publisher instance at runtime.
import logging
from eventspype import MultiSubscriber, EventSubscription
class OrderHandler(MultiSubscriber):
on_order_placed = EventSubscription(
publisher_class=OrderService,
event_tag="order_placed",
callback=lambda self, event, tag, caller: self.handle_placed(event),
)
def logger(self) -> logging.Logger:
return logging.getLogger(__name__)
def handle_placed(self, event) -> None:
print(f"Handling order: {event.order_id}")
service = OrderService()
handler = OrderHandler()
handler.add_subscription(handler.on_order_placed, service)
Note
MultiSubscriber requires you to implement the abstract logger() method.
Managing subscriptions¶
# Add a subscription
handler.add_subscription(handler.on_order_placed, service)
# Remove a subscription
handler.remove_subscription(handler.on_order_placed, service)
Introspecting subscriptions¶
# Get all defined subscriptions in the class hierarchy
definitions = OrderHandler.get_event_definitions()
# {'on_order_placed': <EventSubscription>}
# Access active subscriber objects
handler.subscribers # dict[EventPublisher, dict[EventSubscription, list]]
Subscribing to multiple tags¶
EventSubscription supports a list of event tags:
on_order_events = EventSubscription(
publisher_class=OrderService,
event_tag=["order_placed", "order_cancelled"],
callback=lambda self, event, tag, caller: self.handle_order(event, tag),
)
Log decorator¶
MultiSubscriber provides a log_event decorator for adding automatic logging to handler methods:
class OrderHandler(MultiSubscriber):
@MultiSubscriber.log_event(log_level=logging.INFO, log_prefix="Order")
def handle_placed(self, event) -> None:
# Logged automatically before this method runs
process(event)
EventSubscription¶
EventSubscription connects a subscriber method to a specific publisher class and event tag:
from eventspype import EventSubscription
subscription = EventSubscription(
publisher_class=OrderService, # must be EventPublisher or MultiPublisher subclass
event_tag="order_placed", # EventTag or list[EventTag]
callback=my_callback, # callable
callback_with_subscriber=True, # if True, callback receives self as first arg
callback_with_event_info=True, # if True, callback receives (event, tag, caller)
)
PublicationSubscription¶
PublicationSubscription is a typed variant that wires directly to a specific EventPublication instance rather than a tag string. This avoids the tag lookup step and is useful when you have direct access to the publication object:
from eventspype import PublicationSubscription
subscription = PublicationSubscription(
publisher_class=OrderService,
event_publication=OrderService.ORDER_PLACED,
callback=my_callback,
)
TrackingEventSubscriber¶
TrackingEventSubscriber collects received events for inspection or testing, and supports async waiting for specific event types.
from eventspype import TrackingEventSubscriber
tracker = TrackingEventSubscriber(event_source="test", max_len=100)
publisher.add_subscriber(tracker)
publisher.publish(OrderPlacedEvent(1, 49.99))
# Inspect collected events
print(tracker.event_log) # [OrderPlacedEvent(order_id=1, amount=49.99)]
tracker.clear() # clear the log
Async waiting¶
import asyncio
from eventspype import TrackingEventSubscriber
async def main():
tracker = TrackingEventSubscriber()
publisher.add_subscriber(tracker)
# Waits up to 10 seconds for an OrderPlacedEvent
event = await tracker.wait_for(OrderPlacedEvent, timeout_seconds=10)
print(f"Got: {event}")
asyncio.run(main())
wait_for raises TimeoutError if the event does not arrive within the timeout.
ReportingEventSubscriber¶
ReportingEventSubscriber logs events using Python's logging module. It supports dataclasses, NamedTuples, and any object with _asdict():
from eventspype import ReportingEventSubscriber
reporter = ReportingEventSubscriber(event_source="order-service")
publisher.add_subscriber(reporter)
# Each event is logged at INFO level with metadata:
# Event received: {'order_id': 1, 'amount': 49.99,
# 'event_name': 'OrderPlacedEvent',
# 'event_source': 'order-service', 'event_tag': 12345}