Communications API Reference¶
processpype.communications
OutgoingMessage¶
class OutgoingMessage(BaseModel):
"""A message to send via a communicator backend."""
| Field | Type | Default | Description |
|---|---|---|---|
message |
str |
(required) | The message body |
label |
str |
"default" |
Routing label (min length 1) |
source |
str \| None |
None |
Identifier of the component that produced the message |
severity |
MessageSeverity |
MessageSeverity.INFO |
Severity level |
metadata |
dict[str, Any] |
{} |
Arbitrary key-value metadata |
subject |
str \| None |
None |
Optional subject line (used by email backend) |
Example¶
from processpype.communications.models import OutgoingMessage, MessageSeverity
msg = OutgoingMessage(
message="Deployment complete",
label="alerts",
source="deploy-service",
severity=MessageSeverity.WARNING,
)
IncomingMessage¶
class IncomingMessage(BaseModel):
"""A message received from a communicator backend (published as event)."""
| Field | Type | Default | Description |
|---|---|---|---|
text |
str |
(required) | Message text |
sender |
str \| None |
None |
Human-readable sender name |
sender_id |
str \| None |
None |
Backend-specific sender identifier |
chat_label |
str |
"default" |
Label of the chat that received the message |
backend_name |
str |
"" |
Name of the communicator that received it |
timestamp |
datetime \| None |
None |
Message timestamp |
metadata |
dict[str, Any] |
{} |
Arbitrary key-value metadata |
raw_event |
Any |
None |
Backend-specific raw event object (excluded from serialization) |
MessageSeverity¶
class MessageSeverity(StrEnum):
A StrEnum representing message severity levels.
| Member | Value |
|---|---|
DEBUG |
"debug" |
INFO |
"info" |
WARNING |
"warning" |
ERROR |
"error" |
CRITICAL |
"critical" |
MessageHandler¶
MessageHandler = Callable[[IncomingMessage], Awaitable[None]]
Type alias for async functions that handle incoming messages.
Communicator¶
processpype.communications.base.Communicator
class Communicator(ABC):
"""Abstract base for communication backends."""
Constructor¶
def __init__(self, name: str, config: CommunicatorBackendConfig) -> None
Parameters:
name--- Unique name for this communicator instanceconfig--- Backend configuration
Properties¶
name¶
@property
def name(self) -> str
The communicator instance name.
is_started¶
@property
def is_started(self) -> bool
True after start() has completed successfully.
supports_receiving¶
@property
def supports_receiving(self) -> bool
False by default. Backends that can receive messages should override this to return True.
Methods¶
set_incoming_handler¶
def set_incoming_handler(self, handler: Callable[[IncomingMessage], None]) -> None
Set by the dispatcher to route incoming messages to the event publisher. The handler is synchronous --- eventspype's EventPublisher.publish() dispatches synchronously. For async processing, use eventspype's QueueEventSubscriber to bridge to an async consumer.
Abstract Methods¶
start¶
async def start(self) -> None
Initialize and connect the backend. Must set self._started = True on success.
stop¶
async def stop(self) -> None
Disconnect and clean up resources. Must set self._started = False.
send¶
async def send(self, message: OutgoingMessage) -> None
Send an outgoing message through the backend.
NoOpCommunicator¶
processpype.communications.base.NoOpCommunicator
class NoOpCommunicator(Communicator):
"""Silent fallback when a backend is unavailable."""
A no-operation communicator returned when a backend dependency is missing or the backend type is unknown. All methods are no-ops. Always reports is_started = True and supports_receiving = False.
CommunicationDispatcher¶
processpype.communications.dispatcher.CommunicationDispatcher
class CommunicationDispatcher:
"""Routes OutgoingMessage to registered Communicator backends by label."""
Outgoing messages are routed by their label field to all communicators registered for that label. Incoming messages from backends are published as eventspype events.
Constructor¶
def __init__(self) -> None
Creates an empty dispatcher with no registered communicators.
Properties¶
incoming_publisher¶
@property
def incoming_publisher(self) -> EventPublisher
The eventspype EventPublisher for incoming messages. Subscribe to this publisher to receive IncomingMessage events from all backends.
Methods¶
register¶
def register(
self, communicator: Communicator, labels: list[str] | None = None
) -> None
Register a communicator backend for the given labels. If labels is None, defaults to ["default"]. If the communicator supports receiving, its incoming handler is automatically wired to the event publisher.
Parameters:
communicator--- The communicator instance to registerlabels--- Labels this communicator handles
unregister¶
def unregister(self, name: str) -> None
Remove a communicator backend by name. Cleans up all label mappings.
emit¶
async def emit(self, message: OutgoingMessage) -> None
Send a message to all communicators registered for message.label. Backends that are not started are skipped. Exceptions from individual backends are logged but do not prevent delivery to remaining backends.
start_all¶
async def start_all(self) -> None
Start all registered communicators. Exceptions from individual backends are logged but do not prevent starting remaining backends.
stop_all¶
async def stop_all(self) -> None
Stop all registered communicators. Exceptions from individual backends are logged but do not prevent stopping remaining backends.
Module Functions¶
processpype.communications.dispatcher
get_dispatcher¶
def get_dispatcher() -> CommunicationDispatcher
Get or create the global CommunicationDispatcher singleton. Thread-safe.
emit_message¶
async def emit_message(message: OutgoingMessage) -> None
Convenience function that emits a message via the global dispatcher.
from processpype.communications.dispatcher import emit_message
from processpype.communications.models import OutgoingMessage
await emit_message(OutgoingMessage(message="Hello", label="alerts"))
init_communications¶
processpype.communications.setup.init_communications
async def init_communications(config: CommunicationsConfig) -> None
Initialize the communication subsystem from application configuration. For each enabled backend in config.backends, creates a communicator via create_communicator(), registers it with the global dispatcher, and starts all communicators.
Does nothing if config.enabled is False.
Backend Factory¶
processpype.communications.backends
create_communicator¶
def create_communicator(name: str, config: CommunicatorBackendConfig) -> Communicator
Create a Communicator instance from a backend configuration. Checks the plugin registry first, then falls back to built-in types ("telegram", "email"). Returns a NoOpCommunicator if the required dependency is not installed or the type is unknown.
Parameters:
name--- Instance name for the communicatorconfig--- Backend configuration (must include atypefield)
Returns: A Communicator instance
register_backend¶
def register_backend(type_name: str, factory: Callable[..., Communicator]) -> None
Register a custom communicator backend factory. The factory must accept (name: str, config: CommunicatorBackendConfig) and return a Communicator.
from processpype.communications.backends import register_backend
def my_factory(name, config):
return MyCustomCommunicator(name, config)
register_backend("my_backend", my_factory)
TelegramCommunicator¶
processpype.communications.backends.telegram.TelegramCommunicator
class TelegramCommunicator(Communicator):
"""Telegram bot communicator with send and optional receive support."""
Requires the telethon package (pip install processpype[telegram]).
Constructor¶
def __init__(self, name: str, config: TelegramCommunicatorConfig) -> None
Parameters:
name--- Communicator instance nameconfig--- Telegram-specific configuration
Configuration¶
TelegramCommunicatorConfig extends CommunicatorBackendConfig:
| Field | Type | Default | Description |
|---|---|---|---|
api_id |
int |
(required) | Telegram API ID |
api_hash |
str |
(required) | Telegram API hash |
token |
str |
(required) | Bot token |
session_string |
str |
"" |
Session string for auth persistence |
listen_to_commands |
bool |
False |
Enable incoming message handling |
chats |
dict[str, TelegramChatConfig] |
{} |
Chat configurations keyed by label |
TelegramChatConfig:
| Field | Type | Default | Description |
|---|---|---|---|
chat_id |
str |
(required) | Chat/channel identifier |
topic_id |
int \| None |
None |
Forum topic ID |
command_authorized |
bool |
False |
Accept commands from this chat |
active |
bool |
True |
Whether this chat is active |
Behavior¶
- Sending: Messages are queued internally and drained by a background task to respect Telegram rate limits. Long messages are split into chunks of 30 lines. Failed sends are retried up to 3 times with exponential backoff. Messages are routed to the chat configured for the message's label, falling back to the
"default"chat. - Receiving: When
listen_to_commandsisTrue,supports_receivingreturnsTrueand a persistent file session is used. Incoming messages are filtered by authorization (command_authorizedon the matching chat config) and published asIncomingMessageevents via the dispatcher. - Sessions: Uses
StringSession(ephemeral) for send-only mode. Ifsession_stringis provided, uses it for auth persistence. Iflisten_to_commandsis enabled without a session string, uses a persistent file session.
EmailCommunicator¶
processpype.communications.backends.email.EmailCommunicator
class EmailCommunicator(Communicator):
"""Async email communicator (send-only)."""
Requires the aiosmtplib package (pip install processpype[email]).
Constructor¶
def __init__(self, name: str, config: EmailCommunicatorConfig) -> None
Parameters:
name--- Communicator instance nameconfig--- Email-specific configuration
Configuration¶
EmailCommunicatorConfig extends CommunicatorBackendConfig:
| Field | Type | Default | Description |
|---|---|---|---|
host |
str |
"localhost" |
SMTP host |
port |
int |
587 |
SMTP port |
username |
str |
"" |
SMTP username |
password |
str |
"" |
SMTP password |
from_address |
str |
(required) | Sender email address |
use_tls |
bool |
True |
Use TLS |
start_tls |
bool |
False |
Use STARTTLS after connecting (for port 587) |
default_recipients |
list[str] |
[] |
Default recipient addresses |
Behavior¶
- Send-only:
supports_receivingalways returnsFalse. - Recipients: Determined by
message.metadata["recipients"]if present, otherwise falls back toconfig.default_recipients. If neither is set, the message is skipped with a warning. - Subject: Uses
message.subjectif set, otherwise generates[SEVERITY] Notification. - Reconnection: Automatically reconnects on
SMTPServerDisconnectedorConnectionErrorand retries the send.
CommunicationsConfig¶
processpype.config.models.CommunicationsConfig
class CommunicationsConfig(ConfigurationModel):
"""Top-level communications configuration."""
| Field | Type | Default | Description |
|---|---|---|---|
enabled |
bool |
False |
Enable the communication system |
backends |
dict[str, CommunicatorBackendConfig] |
{} |
Named communicator backends (auto-discriminated by type field) |
The backends dict values are automatically discriminated into TelegramCommunicatorConfig, EmailCommunicatorConfig, or the base CommunicatorBackendConfig based on the type field.
CommunicatorBackendConfig¶
processpype.config.models.CommunicatorBackendConfig
class CommunicatorBackendConfig(ConfigurationModel):
"""Base configuration for a communicator backend instance."""
| Field | Type | Default | Description |
|---|---|---|---|
type |
str |
(required) | Backend type: "telegram", "email", or a custom type |
enabled |
bool |
True |
Whether this backend is active |
labels |
list[str] |
["default"] |
Labels this backend handles (cannot be empty) |