Message Brokers¶
A message broker is the transport layer that delivers events from publishers to subscribers. EventsPype ships with two broker implementations and provides a base class for custom brokers.
Overview¶
By default, EventPublisher dispatches events directly in-process without any broker. Providing a broker changes how events are routed:
from eventspype import EventPublisher, EventPublication, LocalBroker
publication = EventPublication("order_placed", OrderPlacedEvent)
# No broker: in-process, direct dispatch
publisher = EventPublisher(publication)
# With LocalBroker: functionally identical, but routed through the broker
broker = LocalBroker()
publisher = EventPublisher(publication, broker=broker)
MessageBroker (abstract)¶
MessageBroker defines the interface all brokers must implement:
from eventspype import MessageBroker
class MessageBroker:
def publish(self, channel: str, event, event_tag: int, caller) -> None: ...
def subscribe(self, channel: str, subscriber) -> None: ...
def unsubscribe(self, channel: str, subscriber) -> None: ...
The channel string is derived from the publication's event tag (str(publication.event_tag)).
LocalBroker¶
LocalBroker is an in-process broker that uses weak references, matching the default behavior of EventPublisher without a broker:
from eventspype import LocalBroker
broker = LocalBroker()
publisher = EventPublisher(publication, broker=broker)
Use LocalBroker when you want to decouple the transport layer from your publisher while keeping everything in the same process.
RedisBroker¶
RedisBroker routes events through Redis Pub/Sub, enabling cross-process event delivery. It requires the redis package:
pip install redis
Basic usage¶
import redis
from eventspype.broker.redis import RedisBroker
from eventspype import EventPublisher, EventPublication
client = redis.Redis(host="localhost", port=6379)
broker = RedisBroker(client)
publication = EventPublication("order_placed", OrderPlacedEvent)
publisher = EventPublisher(publication, broker=broker)
# Publish an event — it is serialized and sent to Redis
publisher.publish(OrderPlacedEvent(order_id=1, amount=49.99))
# Clean up Redis resources when done
broker.close()
How it works¶
- On
publish, the event is serialized and sent to a Redis channel namedeventspype:<tag>. - A background listener thread receives messages from Redis and deserializes them back into event objects.
- The deserialized event is dispatched to all registered local subscribers.
Channel prefix¶
The default channel prefix is "eventspype:". Override it:
broker = RedisBroker(client, channel_prefix="myapp:")
Custom serializer¶
RedisBroker defaults to JsonEventSerializer. Provide a custom serializer by subclassing EventSerializer:
from eventspype import EventSerializer, JsonEventSerializer
class MySerializer(EventSerializer):
def serialize(self, event) -> bytes:
...
def deserialize(self, data: bytes, event_class: type):
...
broker = RedisBroker(client, serializer=MySerializer())
EventSerializer¶
EventSerializer is the abstract base for event serialization used by RedisBroker:
class EventSerializer:
def serialize(self, event) -> bytes: ...
def deserialize(self, data: bytes, event_class: type): ...
JsonEventSerializer¶
JsonEventSerializer supports dataclasses, NamedTuples, and objects with a to_dict()/from_dict() protocol:
from eventspype import JsonEventSerializer
serializer = JsonEventSerializer()
data = serializer.serialize(OrderPlacedEvent(order_id=1, amount=49.99))
event = serializer.deserialize(data, OrderPlacedEvent)
Swapping Brokers at Runtime¶
You can change a publisher's broker after construction. The publisher automatically migrates existing subscribers to the new broker:
publisher.broker = new_broker # old broker unsubscribes; new broker subscribes
publisher.broker = None # revert to direct in-process dispatch
Writing a Custom Broker¶
Subclass MessageBroker and implement the three abstract methods:
from eventspype import MessageBroker
from eventspype.sub.subscriber import EventSubscriber
class MyBroker(MessageBroker):
def publish(self, channel: str, event, event_tag: int, caller) -> None:
# Deliver the event
...
def subscribe(self, channel: str, subscriber: EventSubscriber) -> None:
# Register the subscriber for the channel
...
def unsubscribe(self, channel: str, subscriber: EventSubscriber) -> None:
# Remove the subscriber from the channel
...