Quick Start

This guide walks through building a functional agent step by step. By the end you will have an agent with a custom state machine that publishes transition events.

Step 1 — Define a Configuration

Configuration is a Pydantic BaseModel. Add fields for anything the agent needs at startup.

from agentspype.agent.configuration import AgentConfiguration


class WorkerConfiguration(AgentConfiguration):
    """Startup settings for a worker agent, validated by Pydantic."""

    # Identifier of this worker. Required: it has no default value.
    worker_id: str
    # Defaults to 3. NOTE(review): presumably a retry limit; nothing in this
    # guide reads it, so confirm how the agent is expected to use it.
    max_retries: int = 3

Step 2 — Define a Status

Status tracks runtime information. It is mutable and updated by the agent during execution.

from agentspype.agent.status import AgentStatus


class WorkerStatus(AgentStatus):
    """Mutable runtime counters for a worker agent."""

    # Both counters start at zero. Presumably the agent increments them while
    # it runs; the update code is not part of this guide.
    jobs_processed: int = 0
    errors_encountered: int = 0

Step 3 — Define a State Machine

Subclass AgentStateMachine and declare states and transitions as class attributes.

from agentspype.fsm import State
from agentspype.agent.state_machine import AgentStateMachine
from agentspype.agent.publishing import StateAgentPublishing


class WorkerStateMachine(AgentStateMachine):
    """Worker lifecycle: Starting -> Idle <-> Processing, with End reachable from each."""

    # States
    starting = State("Starting", initial=True)
    idle = State("Idle")
    processing = State("Processing")
    end = State("End", final=True)

    # Transitions
    start = starting.to(idle)
    pick_job = idle.to(processing)
    finish_job = processing.to(idle)
    # Stopping is declared from every non-final state.
    stop = starting.to(end) | idle.to(end) | processing.to(end)

    def after_transition(self, event, state):
        """Forward a completed transition to subscribers so other agents can react."""
        publishing = self.agent.publishing
        if not isinstance(publishing, StateAgentPublishing):
            # A different publishing component has no publish_transition to call.
            return
        publishing.publish_transition(event, state)

The after_transition hook is called after every successful transition. Here it forwards the transition as an event to any subscribers.

Note

after_transition is abstract in AgentStateMachine. Every concrete subclass must implement it.

Step 4 — Define Event Listening

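Subclass AgentListening and implement subscribe and unsubscribe. You do not call them yourself: hooks on the base class invoke them automatically — subscribe from on_start, when the start transition moves the machine from starting to idle, and unsubscribe from teardown, when stop moves the machine to its final state (see Step 7).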

from agentspype.agent.listening import AgentListening


class WorkerListening(AgentListening):
    """Event subscriptions for the worker agent."""

    def subscribe(self):
        """Attach event handlers; the framework calls this automatically."""
        # Subscriptions are registered through eventspype's subscription API.
        # Example (requires an actual publisher to exist):
        # self.add_subscription(some_publisher, SomeEvent.Tag, self.on_job_received)
        log = self.agent.logger()
        log.info("WorkerListening: subscribed")

    def unsubscribe(self):
        """Detach event handlers; the framework calls this automatically."""
        # self.remove_all_subscriptions()
        log = self.agent.logger()
        log.info("WorkerListening: unsubscribed")

    def on_job_received(self, event_data):
        """Handle an incoming job event by firing the pick_job transition."""
        machine = self.agent.machine
        machine.pick_job()

Step 5 — Define Event Publishing

For simple agents, StateAgentPublishing is often sufficient. It provides a single sm_transition_event publication that broadcasts every state transition.

For custom events, extend AgentPublishing (or StateAgentPublishing) and declare EventPublication attributes:

from enum import Enum
from dataclasses import dataclass
from eventspype.pub.publication import EventPublication
from agentspype.agent.publishing import StateAgentPublishing


class WorkerPublishing(StateAgentPublishing):
    """Publishing component that adds a job-completed event to the transition event.

    Declared the same way as in the complete listing at the end of this guide,
    so that copying either version yields the same class.
    """

    class Events(Enum):
        """Tags for every event this component publishes."""

        # Re-declared because this nested enum replaces any ``Events`` attribute
        # inherited from the base class, which would otherwise make the
        # transition tag unreachable through ``WorkerPublishing.Events``.
        # NOTE(review): assumes StateAgentPublishing declares its own Events
        # enum with this value — confirm against the library.
        StateMachineTransition = "sm_transition_event"
        JobCompleted = "job_completed"

    @dataclass
    class JobCompletedEvent:
        """Payload published when a job finishes."""

        job_id: str
        result: str

    # Re-export the inherited publication so both publications are visible
    # side by side on this class.
    sm_transition_event = StateAgentPublishing.sm_transition_event

    job_completed = EventPublication(
        event_tag=Events.JobCompleted,
        event_class=JobCompletedEvent,
    )

    def publish_job_completed(self, job_id: str, result: str) -> None:
        """Publish a JobCompletedEvent for ``job_id`` carrying ``result``."""
        self.publish(
            self.job_completed,
            self.JobCompletedEvent(job_id=job_id, result=result),
        )

Step 6 — Assemble the Agent

Bind all components together in an AgentDefinition and declare it as a class variable on the Agent subclass.

from agentspype.agent.agent import Agent
from agentspype.agent.definition import AgentDefinition


class WorkerAgent(Agent):
    """Worker agent assembled from the configuration, status, FSM and event components."""

    # The definition binds each component class to this agent type.
    definition = AgentDefinition(
        configuration_class=WorkerConfiguration,
        events_publishing_class=WorkerPublishing,
        events_listening_class=WorkerListening,
        state_machine_class=WorkerStateMachine,
        status_class=WorkerStatus,
    )

    def initialize(self):
        """Run at the end of __init__; the place for any initial set-up."""
        super().initialize()
        log = self.logger()
        log.info(f"Worker {self.configuration.worker_id} initialized")

Step 7 — Run the Agent

# Create from a dict — fields are validated by Pydantic
agent = WorkerAgent({"worker_id": "w-001", "max_retries": 5})

# Or from a configuration object (max_retries is omitted, so its default of 3 applies)
config = WorkerConfiguration(worker_id="w-002")
agent2 = WorkerAgent(config)

# Drive the state machine. Each call fires one transition, and a transition is
# only declared from the source states listed in Step 3, so the order matters.
agent.machine.start()        # starting -> idle, triggers subscribe()
agent.machine.pick_job()     # idle -> processing
agent.machine.finish_job()   # processing -> idle
agent.machine.stop()         # idle -> end, triggers teardown()

Step 8 — Visualize (Optional)

# Create an agent and save a comprehensive diagram of it under .diagrams
agent = WorkerAgent({"worker_id": "w-001"})
agent.visualize(save_file=True, output_dir=".diagrams")

# Or save just the state machine diagram. No output_dir is passed here.
# NOTE(review): confirm where the file lands when output_dir is omitted.
agent.visualize_state_machine(save_file=True)

Diagrams are saved as PNG files in the output_dir directory.

Putting It All Together

from agentspype.fsm import State
from dataclasses import dataclass
from enum import Enum

from agentspype.agent.agent import Agent
from agentspype.agent.configuration import AgentConfiguration
from agentspype.agent.definition import AgentDefinition
from agentspype.agent.listening import AgentListening
from agentspype.agent.publishing import StateAgentPublishing
from agentspype.agent.state_machine import AgentStateMachine
from agentspype.agent.status import AgentStatus
from eventspype.pub.publication import EventPublication


class WorkerConfiguration(AgentConfiguration):
    """Startup settings for a worker agent."""

    # Required identifier of this worker (no default).
    worker_id: str
    # Defaults to 3. NOTE(review): presumably a retry limit; this listing
    # never reads it — confirm the intended use.
    max_retries: int = 3


class WorkerStatus(AgentStatus):
    """Mutable runtime counters for a worker agent."""

    # Both counters start at zero. Presumably the agent increments them while
    # it runs; the update code is not part of this listing.
    jobs_processed: int = 0
    # Declared to match the WorkerStatus built in Step 2, so the complete
    # listing defines the same model as the step-by-step version.
    errors_encountered: int = 0


class WorkerStateMachine(AgentStateMachine):
    """Worker lifecycle: Starting -> Idle <-> Processing, with End reachable from each."""

    # States
    starting = State("Starting", initial=True)
    idle = State("Idle")
    processing = State("Processing")
    end = State("End", final=True)

    # Transitions (stop is declared from every non-final state)
    start = starting.to(idle)
    pick_job = idle.to(processing)
    finish_job = processing.to(idle)
    stop = starting.to(end) | idle.to(end) | processing.to(end)

    def after_transition(self, event, state):
        """Publish each completed transition when the agent's publishing supports it."""
        publisher = self.agent.publishing
        if not isinstance(publisher, StateAgentPublishing):
            return
        publisher.publish_transition(event, state)


class WorkerListening(AgentListening):
    """Listening component with no subscriptions; both hooks are intentional no-ops."""

    def subscribe(self):
        """Register nothing: this minimal agent listens to no events."""

    def unsubscribe(self):
        """Remove nothing: no subscriptions were registered."""


class WorkerPublishing(StateAgentPublishing):
    """Publishing component exposing the transition event plus a job-completed event."""

    class Events(Enum):
        """Tags for every event this component publishes."""

        StateMachineTransition = "sm_transition_event"
        JobCompleted = "job_completed"

    @dataclass
    class JobCompletedEvent:
        """Payload published when a job finishes."""

        job_id: str
        result: str

    # Reuse the publication object defined on the base class.
    sm_transition_event = StateAgentPublishing.sm_transition_event

    job_completed = EventPublication(
        event_tag=Events.JobCompleted,
        event_class=JobCompletedEvent,
    )

    def publish_job_completed(self, job_id: str, result: str) -> None:
        """Publish a JobCompletedEvent for ``job_id`` carrying ``result``."""
        payload = self.JobCompletedEvent(job_id=job_id, result=result)
        self.publish(self.job_completed, payload)


class WorkerAgent(Agent):
    """Worker agent assembled from the component classes defined in this listing."""

    # The definition binds each component class to this agent type.
    definition = AgentDefinition(
        configuration_class=WorkerConfiguration,
        events_publishing_class=WorkerPublishing,
        events_listening_class=WorkerListening,
        state_machine_class=WorkerStateMachine,
        status_class=WorkerStatus,
    )


# Build the agent from a dict; max_retries is omitted, so its default applies.
agent = WorkerAgent({"worker_id": "w-001"})
# Walk the full lifecycle; each call fires exactly one declared transition.
agent.machine.start()  # starting -> idle
agent.machine.pick_job()  # idle -> processing
agent.machine.finish_job()  # processing -> idle
agent.machine.stop()  # idle -> end