Listening¶
Module: agentspype.agent.listening
AgentListening¶
Base class for agent event listeners. Extends eventspype.sub.multisubscriber.MultiSubscriber with agent integration.
Constructor¶
AgentListening(agent: Agent) -> None
Stores a weakref to agent and calls super().__init__().
Properties¶
agent -> Agent¶
Resolves the weakref to the owning agent. Raises RuntimeError("Agent has been deactivated") if the agent has been garbage-collected.
Methods¶
logger() -> logging.Logger¶
Returns self.agent.logger() — the class-level logger of the owning agent.
get_event_definitions() -> dict[str, Any] (optional classmethod)¶
Not defined on the base class. Subclasses can implement this to support the visualization system. The ListeningVisualization checks for its presence with hasattr() and, if found, calls it to enumerate callback methods.
A typical implementation returns a dict mapping method names to subscription metadata with keys like "event_tag" and "publisher_class".
subscribe() -> None (abstract)¶
Called by AgentStateMachine.on_start() when the start transition fires. Implement to register event subscriptions using the eventspype API.
def subscribe(self) -> None:
# Example using eventspype subscription API
# self.add_subscription(publisher, EventTag, self.on_event)
pass
unsubscribe() -> None (abstract)¶
Called by Agent.teardown() when the agent shuts down. Implement to remove all subscriptions.
def unsubscribe(self) -> None:
# self.remove_all_subscriptions()
pass
Subclassing¶
Every AgentListening subclass must implement both subscribe and unsubscribe. A minimal no-op implementation:
from agentspype.agent.listening import AgentListening
class NoOpListening(AgentListening):
def subscribe(self) -> None:
pass
def unsubscribe(self) -> None:
pass
A real implementation using eventspype:
class WorkerListening(AgentListening):
def subscribe(self) -> None:
# subscribe to events from a publisher
# self.add_subscription(
# publisher=some_publisher_instance,
# event_tag=SomePublisher.Events.SomeTag,
# callback=self.on_some_event,
# )
pass
def unsubscribe(self) -> None:
# self.remove_all_subscriptions()
pass
def on_some_event(self, event_data) -> None:
"""Callback invoked when SomeTag event is received."""
self.agent.machine.pick_job()