TrackingEventSubscriber¶
Module: eventspype.sub.tracker
TrackingEventSubscriber¶
class TrackingEventSubscriber(EventSubscriber):
def __init__(
self,
event_source: str | None = None,
max_len: int = 50,
) -> None
A subscriber that collects received events and supports async waiting for specific event types. Designed for testing, debugging, and integration scenarios.
Parameters¶
| Parameter | Type | Default | Description |
|---|---|---|---|
event_source |
str \| None |
None |
Optional label identifying the event source |
max_len |
int |
50 |
Maximum events retained in the log; older events are dropped when the deque is full |
Properties¶
| Property | Type | Description |
|---|---|---|
event_log |
list[Any] |
All collected events as a list |
event_source |
str \| None |
The label passed at construction |
Methods¶
call¶
def call(
self,
event_object: Any,
current_event_tag: int,
current_event_caller: Any,
) -> None
Invoked by the publisher on each event. Appends the event to the log and notifies any coroutines waiting in wait_for.
clear¶
def clear(self) -> None
Empty the event log and any per-type event deques.
wait_for¶
async def wait_for(
self, event_type: type[Any], timeout_seconds: float = 180
) -> Any
Suspend the current coroutine until an event whose type matches event_type is received, then return it.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
event_type |
type |
— | The exact Python class to wait for |
timeout_seconds |
float |
180 |
How long to wait before raising TimeoutError |
Returns: The matched event object.
Raises: TimeoutError if no matching event arrives within timeout_seconds. Internal state is always cleaned up even if the timeout fires.
Example¶
import asyncio
from dataclasses import dataclass
from eventspype import EventPublisher, EventPublication, TrackingEventSubscriber
@dataclass
class AlertEvent:
level: str
message: str
pub = EventPublication("alert", AlertEvent)
publisher = EventPublisher(pub)
async def main():
tracker = TrackingEventSubscriber(event_source="alerts", max_len=100)
publisher.add_subscriber(tracker)
# Simulate an event arriving after 0.5 s
async def trigger():
await asyncio.sleep(0.5)
publisher.publish(AlertEvent("ERROR", "Disk full"))
asyncio.create_task(trigger())
event = await tracker.wait_for(AlertEvent, timeout_seconds=5)
print(f"Got alert: [{event.level}] {event.message}")
asyncio.run(main())