Publishing

Module: agentspype.agent.publishing

AgentPublishing

Base class for agent event publishers. Extends eventspype.pub.multipublisher.MultiPublisher with agent integration.

Constructor

AgentPublishing(agent: Agent) -> None

Stores a weakref to agent and calls super().__init__().

Properties

agent -> Agent

Resolves the weakref to the owning agent. Raises RuntimeError("Agent has been deactivated") if the agent has been garbage-collected.
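
The weak-reference pattern described above can be sketched in isolation. This is a minimal illustration, not the library's source: Agent and PublishingSketch are hypothetical stand-ins, and the private attribute name _agent_ref is an assumption.

```python
import gc
import weakref


class Agent:
    """Hypothetical minimal agent, used only for this demo."""


class PublishingSketch:
    """Stand-in showing the weakref pattern; not the real AgentPublishing."""

    def __init__(self, agent: Agent) -> None:
        # Hold the agent weakly so the publisher does not keep it alive.
        self._agent_ref = weakref.ref(agent)

    @property
    def agent(self) -> Agent:
        agent = self._agent_ref()  # returns None once the agent is collected
        if agent is None:
            raise RuntimeError("Agent has been deactivated")
        return agent


agent = Agent()
publishing = PublishingSketch(agent)
assert publishing.agent is agent

del agent      # drop the last strong reference
gc.collect()   # make collection explicit on non-refcounting interpreters
try:
    publishing.agent
except RuntimeError as exc:
    print(exc)  # Agent has been deactivated
```

Holding the agent weakly means a publisher never extends the agent's lifetime, so code that keeps a publisher around should be prepared for the RuntimeError.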

Methods

get_event_definitions() -> dict[str, EventPublication] (optional classmethod)

Not defined on the base class. Subclasses may implement it to support the visualization system: PublishingVisualization checks for it with hasattr() and, if present, calls it to enumerate the class's EventPublication attributes.

A typical implementation scans class attributes and returns those that are EventPublication instances. Called on the WorkerPublishing class from Declaring Events below, it would return:

WorkerPublishing.get_event_definitions()
# {"job_completed": <EventPublication>, "job_failed": <EventPublication>}
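
As a minimal sketch of such an implementation, the classmethod below walks the class hierarchy and collects EventPublication attributes by name. To keep the example runnable without eventspype installed, EventPublication here is a small stand-in dataclass. In real code it would be imported from eventspype.pub.publication. The names EventDefinitionsMixin and WorkerPublishingSketch are hypothetical.

```python
from dataclasses import dataclass
from enum import Enum


@dataclass(frozen=True)
class EventPublication:
    """Stand-in for eventspype's EventPublication (assumption, demo only)."""
    event_tag: Enum
    event_class: type


class EventDefinitionsMixin:
    """Hypothetical mixin providing get_event_definitions()."""

    @classmethod
    def get_event_definitions(cls) -> dict[str, EventPublication]:
        definitions: dict[str, EventPublication] = {}
        # Walk from the most basic class to cls so subclass attributes win.
        for klass in reversed(cls.__mro__):
            for name, value in vars(klass).items():
                if isinstance(value, EventPublication):
                    definitions[name] = value
                else:
                    # A non-publication attribute shadows an inherited one.
                    definitions.pop(name, None)
        return definitions


class WorkerPublishingSketch(EventDefinitionsMixin):
    class Events(Enum):
        JobCompleted = "job_completed"
        JobFailed = "job_failed"

    @dataclass
    class JobCompletedEvent:
        job_id: str
        result: str

    @dataclass
    class JobFailedEvent:
        job_id: str
        error: str

    job_completed = EventPublication(Events.JobCompleted, JobCompletedEvent)
    job_failed = EventPublication(Events.JobFailed, JobFailedEvent)


print(list(WorkerPublishingSketch.get_event_definitions()))
# ['job_completed', 'job_failed']
```

Reading vars(klass) instead of calling getattr avoids triggering descriptors such as the agent property while scanning.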

Declaring Events

Declare EventPublication instances as class attributes. Each publication needs an event_tag (an enum value) and an event_class (a dataclass or any other type).

from enum import Enum
from dataclasses import dataclass
from eventspype.pub.publication import EventPublication
from agentspype.agent.publishing import AgentPublishing


class WorkerPublishing(AgentPublishing):

    class Events(Enum):
        JobCompleted = "job_completed"
        JobFailed = "job_failed"

    @dataclass
    class JobCompletedEvent:
        job_id: str
        result: str

    @dataclass
    class JobFailedEvent:
        job_id: str
        error: str

    job_completed = EventPublication(
        event_tag=Events.JobCompleted,
        event_class=JobCompletedEvent,
    )
    job_failed = EventPublication(
        event_tag=Events.JobFailed,
        event_class=JobFailedEvent,
    )

    def publish_job_completed(self, job_id: str, result: str) -> None:
        self.publish(
            self.job_completed,
            self.JobCompletedEvent(job_id=job_id, result=result),
        )

    def publish_job_failed(self, job_id: str, error: str) -> None:
        self.publish(
            self.job_failed,
            self.JobFailedEvent(job_id=job_id, error=error),
        )

StateAgentPublishing

Module: agentspype.agent.publishing

Extends AgentPublishing with a pre-built state machine transition event. Use this (or a subclass of it) when you want the agent to broadcast its state transitions.

Nested Types

Events (Enum)

class Events(Enum):
    StateMachineTransition = "sm_transition_event"

StateMachineEvent (dataclass)

@dataclass
class StateMachineEvent:
    event: Any       # the transition name (str)
    new_state: State # the state entered

Class Attributes

sm_transition_event

sm_transition_event = EventPublication(
    event_tag=Events.StateMachineTransition,
    event_class=StateMachineEvent,
)

An EventPublication that subscribers can use to listen for state transitions from this agent.

Methods

publish_transition(event, state) -> None

StateAgentPublishing.publish_transition(
    event: Any,
    state: State,
) -> None

Publishes a StateMachineEvent via sm_transition_event, with event as its event field and state as its new_state field. Call this from the state machine's after_transition hook:

class MyStateMachine(AgentStateMachine):
    def after_transition(self, event, state):
        # Publish only when the agent's publisher supports transition events.
        if isinstance(self.agent.publishing, StateAgentPublishing):
            self.agent.publishing.publish_transition(event, state)

Extending StateAgentPublishing

You can add custom events on top of the built-in transition event:

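from dataclasses import dataclass
from enum import Enum

from eventspype.pub.publication import EventPublication
from agentspype.agent.publishing import StateAgentPublishing


class WorkerPublishing(StateAgentPublishing):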
    class Events(Enum):
        # This nested enum shadows StateAgentPublishing.Events, so repeat the
        # transition tag to keep it reachable as WorkerPublishing.Events.
        StateMachineTransition = "sm_transition_event"
        # Add custom events
        JobCompleted = "job_completed"

    @dataclass
    class JobCompletedEvent:
        job_id: str

    job_completed = EventPublication(
        event_tag=Events.JobCompleted,
        event_class=JobCompletedEvent,
    )

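    # sm_transition_event and publish_transition are already inherited from
    # StateAgentPublishing; the re-assignment below only places the
    # publication in this class's own namespace.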
    sm_transition_event = StateAgentPublishing.sm_transition_event

    def publish_job_completed(self, job_id: str) -> None:
        self.publish(self.job_completed, self.JobCompletedEvent(job_id=job_id))
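
One detail in the example above is worth checking in isolation: redeclaring Events in the subclass creates a new enum class and does not extend the parent's. The sketch below uses two hypothetical stand-in enums (BaseEvents and WorkerEvents, not part of agentspype) to show that members with the same name and value in different Enum classes do not compare equal. How eventspype matches tags is not stated here, so treat this as plain Python behavior, not a claim about the library.

```python
from enum import Enum


class BaseEvents(Enum):  # stand-in for StateAgentPublishing.Events
    StateMachineTransition = "sm_transition_event"


class WorkerEvents(Enum):  # stand-in for WorkerPublishing.Events
    StateMachineTransition = "sm_transition_event"
    JobCompleted = "job_completed"


base_tag = BaseEvents.StateMachineTransition
worker_tag = WorkerEvents.StateMachineTransition

# Same name and value, but different enum classes: the members are distinct.
print(base_tag == worker_tag)              # False
print(base_tag.value == worker_tag.value)  # True
```

The inherited sm_transition_event is still built with StateAgentPublishing.Events.StateMachineTransition, so referencing that member directly avoids any ambiguity when a tag is needed.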