Events
AgentsPype builds on eventspype for event pub/sub. Two component classes — AgentListening and AgentPublishing — wrap eventspype's MultiSubscriber and MultiPublisher respectively and integrate them into the agent lifecycle.
Overview
- Publishing: An agent declares typed event publications as class attributes. It calls publish() to broadcast an event to all current subscribers.
- Listening: An agent subscribes to events from other publishers. Subscriptions are set up in subscribe() and torn down in unsubscribe().
- State transition events: StateAgentPublishing is a specialised publisher that broadcasts a StateMachineEvent after every state machine transition.
Agents never hold direct references to each other. Communication goes through the eventspype broker, keeping components loosely coupled.
Event Publishing
AgentPublishing
AgentPublishing extends eventspype.MultiPublisher. To publish events, declare EventPublication instances as class attributes and call self.publish().
from enum import Enum
from dataclasses import dataclass

from eventspype.pub.publication import EventPublication
from agentspype.agent.publishing import AgentPublishing


class WorkerPublishing(AgentPublishing):
    class Events(Enum):
        JobCompleted = "job_completed"
        JobFailed = "job_failed"

    @dataclass
    class JobCompletedEvent:
        job_id: str
        result: str

    @dataclass
    class JobFailedEvent:
        job_id: str
        error: str

    job_completed = EventPublication(
        event_tag=Events.JobCompleted,
        event_class=JobCompletedEvent,
    )
    job_failed = EventPublication(
        event_tag=Events.JobFailed,
        event_class=JobFailedEvent,
    )

    def publish_job_completed(self, job_id: str, result: str) -> None:
        self.publish(
            self.job_completed,
            self.JobCompletedEvent(job_id=job_id, result=result),
        )

    def publish_job_failed(self, job_id: str, error: str) -> None:
        self.publish(
            self.job_failed,
            self.JobFailedEvent(job_id=job_id, error=error),
        )
The event_tag is an enum value that subscribers use to filter events. The event_class is the dataclass type that carries the event payload.
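To make the roles of event_tag and event_class concrete, here is a minimal stdlib-only sketch. It does not use eventspype: `Publication`, `TinyPublisher`, and `add_callback` are hypothetical stand-ins invented for illustration, and the payload type check is an assumption about how a publisher might use event_class, not a statement of what eventspype does.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable


@dataclass(frozen=True)
class Publication:
    """Hypothetical stand-in for EventPublication: a tag plus a payload type."""

    event_tag: Enum
    event_class: type


class TinyPublisher:
    """Hypothetical publisher that routes payloads to callbacks by tag."""

    def __init__(self) -> None:
        self._callbacks: dict[Enum, list[Callable[[Any], None]]] = {}

    def add_callback(self, tag: Enum, callback: Callable[[Any], None]) -> None:
        self._callbacks.setdefault(tag, []).append(callback)

    def publish(self, publication: Publication, event: Any) -> None:
        # Assumption: reject payloads that do not match the declared event_class.
        if not isinstance(event, publication.event_class):
            raise TypeError(
                f"expected {publication.event_class.__name__}, "
                f"got {type(event).__name__}"
            )
        # Only callbacks registered for this publication's tag are invoked.
        for callback in self._callbacks.get(publication.event_tag, []):
            callback(event)


class Events(Enum):
    JobCompleted = "job_completed"
    JobFailed = "job_failed"


@dataclass
class JobCompletedEvent:
    job_id: str
    result: str


job_completed = Publication(Events.JobCompleted, JobCompletedEvent)

received: list = []  # filled by the JobCompleted subscriber
ignored: list = []   # stays empty: nothing is published under JobFailed

publisher = TinyPublisher()
publisher.add_callback(Events.JobCompleted, received.append)
publisher.add_callback(Events.JobFailed, ignored.append)
publisher.publish(job_completed, JobCompletedEvent(job_id="42", result="ok"))
```

In this sketch the tag decides which callbacks run, while the event class only describes the shape of the payload they receive.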
Accessing the Agent from Publishing
AgentPublishing holds a weakref to the agent and exposes it via the agent property:
def publish_job_completed(self, job_id, result):
    self.agent.logger().info(f"Publishing job_completed for {job_id}")
    self.publish(self.job_completed, self.JobCompletedEvent(job_id, result))
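The weakref-backed property pattern can be sketched with the standard library alone. The `Agent` and `Publishing` classes below are hypothetical stand-ins, not the AgentsPype classes, and raising `RuntimeError` once the agent is gone is an assumption; the source does not say how AgentsPype handles a dead reference.

```python
import weakref


class Agent:
    """Hypothetical agent that owns a publishing component."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.publishing = Publishing(self)


class Publishing:
    """Hypothetical component that keeps only a weak reference to its agent."""

    def __init__(self, agent: "Agent") -> None:
        # A weak reference avoids an agent <-> component reference cycle.
        self._agent_ref = weakref.ref(agent)

    @property
    def agent(self) -> "Agent":
        agent = self._agent_ref()
        if agent is None:
            # Assumption: fail loudly if the agent no longer exists.
            raise RuntimeError("agent has been garbage collected")
        return agent


worker = Agent("worker-1")
publishing = worker.publishing
owner_name = publishing.agent.name
```

Because the component does not keep the agent alive, the agent's lifetime is controlled by whoever created it rather than by its own components.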
Discovering Published Events
AgentPublishing.get_event_definitions() returns all EventPublication instances declared on the class (used by the visualization system):
definitions = WorkerPublishing.get_event_definitions()
# {'job_completed': <EventPublication>, 'job_failed': <EventPublication>}
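One plausible way to implement this kind of discovery is to scan class attributes for publication instances. The sketch below is an assumption about the technique, written against hypothetical `Publication` and `PublishingBase` stand-ins; it is not the AgentsPype implementation.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Publication:
    """Hypothetical stand-in for EventPublication."""

    event_tag: str
    event_class: type


class PublishingBase:
    """Hypothetical base class offering attribute-based discovery."""

    @classmethod
    def get_event_definitions(cls) -> dict:
        definitions = {}
        # Walk the MRO from base to subclass so subclass attributes win.
        for klass in reversed(cls.__mro__):
            for name, value in vars(klass).items():
                if isinstance(value, Publication):
                    definitions[name] = value
        return definitions


class WorkerPublishing(PublishingBase):
    job_completed = Publication("job_completed", dict)
    job_failed = Publication("job_failed", dict)


definitions = WorkerPublishing.get_event_definitions()
names = sorted(definitions)
```

Walking the MRO means publications declared on a parent class are reported alongside those declared on the subclass.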
State Transition Events with StateAgentPublishing
StateAgentPublishing is a subclass of AgentPublishing that adds one pre-built publication: sm_transition_event. It broadcasts a StateMachineEvent dataclass containing the event name and new state after every transition.
from agentspype.agent.publishing import StateAgentPublishing
# StateAgentPublishing.Events.StateMachineTransition — the event tag
# StateAgentPublishing.StateMachineEvent(event, new_state) — the data class
# StateAgentPublishing.sm_transition_event — the EventPublication
To broadcast transitions, call publish_transition from the state machine's after_transition hook:
class MyStateMachine(AgentStateMachine):
    ...

    def after_transition(self, event, state):
        if isinstance(self.agent.publishing, StateAgentPublishing):
            self.agent.publishing.publish_transition(event, state)
Other agents can subscribe to StateAgentPublishing.Events.StateMachineTransition to react to state changes in this agent.
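On the receiving side, a callback for transition events might look like the following sketch. The `StateMachineEvent` dataclass here is a local stand-in that only mirrors the (event, new_state) fields described above, `Supervisor` is hypothetical, and treating both fields as plain strings is an assumption made to keep the example self-contained.

```python
from dataclasses import dataclass


@dataclass
class StateMachineEvent:
    """Stand-in mirroring the (event, new_state) fields; types are assumed."""

    event: str
    new_state: str


class Supervisor:
    """Hypothetical listener that records another agent's transitions."""

    def __init__(self) -> None:
        self.history: list[tuple[str, str]] = []
        self.finished = False

    def on_worker_transition(self, data: StateMachineEvent) -> None:
        # Keep a log of every transition and flag the one we care about.
        self.history.append((data.event, data.new_state))
        if data.new_state == "done":
            self.finished = True


supervisor = Supervisor()
supervisor.on_worker_transition(StateMachineEvent("start", "running"))
supervisor.on_worker_transition(StateMachineEvent("finish", "done"))
```

In a real agent, this callback would be registered in the listener's subscribe() method against the StateMachineTransition tag rather than called directly.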
Event Listening
AgentListening
AgentListening extends eventspype.MultiSubscriber. You must implement two abstract methods:
- subscribe(): called by AgentStateMachine.on_start() when the start transition fires.
- unsubscribe(): called by Agent.teardown() when the agent shuts down.
from agentspype.agent.listening import AgentListening


class WorkerListening(AgentListening):
    def subscribe(self):
        # Register subscriptions here using eventspype's API.
        # For example (API depends on eventspype version):
        # self.add_subscription(
        #     publisher=some_publisher,
        #     event_tag=SomePublisher.Events.SomeEvent,
        #     callback=self.on_some_event,
        # )
        self.agent.logger().debug("Subscribed to events")

    def unsubscribe(self):
        # Remove all subscriptions
        # self.remove_all_subscriptions()
        self.agent.logger().debug("Unsubscribed from events")

    # --- Callbacks ---

    def on_job_available(self, event_data) -> None:
        """Called when a job becomes available."""
        self.agent.machine.pick_job()

    def on_shutdown_requested(self, event_data) -> None:
        """Called when a shutdown is requested."""
        self.agent.machine.safe_stop()
Accessing the Agent from Listening
Like publishing, AgentListening exposes the agent via a weakref-backed property:
def on_job_available(self, event_data):
    self.agent.status.jobs_pending += 1
    self.agent.machine.pick_job()
Discovering Subscriptions
get_event_definitions() is an optional classmethod that subclasses can implement to support the visualization system. It should return a dict of subscription metadata. The visualization code checks for its presence with hasattr(), so it is not required on the base class. If implemented, the returned dict values should include event_tag and publisher_class attributes to describe what events the listener subscribes to.
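The optional-classmethod convention can be sketched as follows. Only the attribute names event_tag and publisher_class come from the description above; `SubscriptionInfo`, `DocumentedListening`, `PlainListening`, and `describe` are hypothetical names, and the real visualization code may consume the dict differently.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SubscriptionInfo:
    """Hypothetical metadata record with the two documented attributes."""

    event_tag: str
    publisher_class: type


class ProducerPublishing:
    """Placeholder publisher class used only as metadata."""


class DocumentedListening:
    """Listener that opts in to discovery."""

    @classmethod
    def get_event_definitions(cls) -> dict:
        return {
            "on_work_available": SubscriptionInfo(
                event_tag="work_available",
                publisher_class=ProducerPublishing,
            )
        }


class PlainListening:
    """Listener that does not implement the optional classmethod."""


def describe(listening_class: type) -> list:
    # Mirror the hasattr() check: listeners without the method are skipped.
    if not hasattr(listening_class, "get_event_definitions"):
        return []
    return [
        f"{name}: {info.event_tag} from {info.publisher_class.__name__}"
        for name, info in listening_class.get_event_definitions().items()
    ]


documented = describe(DocumentedListening)
plain = describe(PlainListening)
```

Checking with hasattr() keeps the method optional, so existing listeners keep working and simply contribute nothing to the diagram.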
Lifecycle Integration
The listening/publishing lifecycle is tied to the state machine:
machine.start()
    └── on_start() → listening.subscribe()
machine.stop() or machine entering a final state
    └── teardown() → listening.unsubscribe()
This means an agent only receives events while it is running (between start and stop). Before start() is called and after stop() fires, no callbacks will be invoked even if events are published.
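The effect of this lifecycle on event delivery can be illustrated with a small stdlib-only model. `Broker` and `Consumer` are hypothetical stand-ins for the eventspype machinery and an agent; the sketch only shows that callbacks fire between the subscribe and unsubscribe calls.

```python
from typing import Callable


class Broker:
    """Hypothetical in-memory broker holding the current callbacks."""

    def __init__(self) -> None:
        self._callbacks: list[Callable[[str], None]] = []

    def add(self, callback: Callable[[str], None]) -> None:
        self._callbacks.append(callback)

    def remove(self, callback: Callable[[str], None]) -> None:
        self._callbacks.remove(callback)

    def publish(self, item: str) -> None:
        # Iterate over a copy so callbacks may unsubscribe while being called.
        for callback in list(self._callbacks):
            callback(item)


class Consumer:
    """Hypothetical agent that listens only while it is running."""

    def __init__(self, broker: Broker) -> None:
        self.broker = broker
        self.received: list[str] = []

    def start(self) -> None:
        # Plays the role of on_start() -> listening.subscribe().
        self.broker.add(self.on_item)

    def stop(self) -> None:
        # Plays the role of teardown() -> listening.unsubscribe().
        self.broker.remove(self.on_item)

    def on_item(self, item: str) -> None:
        self.received.append(item)


broker = Broker()
consumer = Consumer(broker)

broker.publish("early")   # published before start(): not delivered
consumer.start()
broker.publish("during")  # published while running: delivered
consumer.stop()
broker.publish("late")    # published after stop(): not delivered
```

Events published outside the running window are simply dropped in this model; nothing is queued for later delivery.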
Example: Two Communicating Agents
from enum import Enum
from dataclasses import dataclass

from eventspype.pub.publication import EventPublication
from agentspype.agent.listening import AgentListening
from agentspype.agent.publishing import StateAgentPublishing


# Producer publishes work items
class ProducerPublishing(StateAgentPublishing):
    class Events(Enum):
        WorkAvailable = "work_available"

    @dataclass
    class WorkEvent:
        item: str

    work_available = EventPublication(
        event_tag=Events.WorkAvailable,
        event_class=WorkEvent,
    )

    def publish_work(self, item: str) -> None:
        self.publish(self.work_available, self.WorkEvent(item=item))


# Consumer listens to the producer
class ConsumerListening(AgentListening):
    def subscribe(self):
        # Subscribe to the producer's work_available event
        # self.add_subscription(producer_instance.publishing, ...)
        pass

    def unsubscribe(self):
        pass

    def on_work_available(self, event: ProducerPublishing.WorkEvent) -> None:
        self.agent.logger().info(f"Received work: {event.item}")
        self.agent.machine.pick_job()