Communications¶
The communications module provides a backend-agnostic messaging layer for sending and receiving messages. Outgoing messages are routed by label to one or more backends (Telegram, Email, or custom). Incoming messages from backends that support receiving are published as eventspype events, making them available to any subscriber in your application.
Architecture¶
┌──────────────────────────┐
│ CommunicationDispatcher │
└────────┬─────────────────┘
│
┌──────────────────┼──────────────────┐
│ │ │
┌────────▼───────┐ ┌──────▼────────┐ ┌───────▼───────┐
│ Telegram │ │ Email │ │ Custom │
│ Communicator │ │ Communicator │ │ Communicator │
└────────┬────────┘ └───────────────┘ └───────────────┘
│
┌──────────┴──────────┐
│ Outgoing │ Incoming
│ label routing │ eventspype events
│ → send to chats │ → publish IncomingMessage
└─────────────────────┘
Outgoing flow: Your code creates an OutgoingMessage with a label. The dispatcher finds all backends registered for that label and calls send() on each.
Incoming flow: Backends that support receiving (e.g. Telegram) forward messages to the dispatcher, which publishes them as eventspype events. Any part of your application can subscribe.
Configuration¶
Enable communications and declare backends under the communications key in your YAML config:
communications:
enabled: true
backends:
my_telegram:
type: telegram
api_id: ${TELEGRAM_API_ID}
api_hash: ${TELEGRAM_API_HASH}
token: ${TELEGRAM_BOT_TOKEN}
labels:
- default
- alerts
listen_to_commands: true
chats:
default:
chat_id: "-1001234567890"
active: true
alerts:
chat_id: "-1009876543210"
topic_id: 42
command_authorized: true
active: true
my_email:
type: email
host: smtp.example.com
port: 587
username: ${SMTP_USER}
password: ${SMTP_PASS}
from_address: noreply@example.com
start_tls: true
default_recipients:
- ops@example.com
labels:
- alerts
Top-level fields¶
| Field | Default | Description |
|---|---|---|
enabled |
false |
Enable the communication system |
backends |
{} |
Named backend instances |
Backend base fields¶
Every backend inherits these fields from CommunicatorBackendConfig:
| Field | Default | Description |
|---|---|---|
type |
--- | Backend type: telegram, email, or a custom registered name |
enabled |
true |
Whether this backend is active |
labels |
["default"] |
Labels this backend handles |
Labels connect outgoing messages to backends. A message with label="alerts" is sent to every backend whose labels list includes "alerts".
Sending Messages¶
Create an OutgoingMessage and emit it through the dispatcher:
from processpype.communications import OutgoingMessage, emit_message
msg = OutgoingMessage(
message="Deployment completed successfully",
label="default",
source="deploy-service",
severity="info",
)
await emit_message(msg)
OutgoingMessage fields¶
| Field | Default | Description |
|---|---|---|
message |
--- | The message text |
label |
"default" |
Routing label (must match a backend's labels) |
source |
None |
Identifier of the sender (for logging/display) |
severity |
"info" |
One of debug, info, warning, error, critical |
metadata |
{} |
Arbitrary data passed to backends |
subject |
None |
Subject line (used by email backend) |
Label routing¶
A single message can reach multiple backends if they share the same label:
# Both my_telegram and my_email are registered for "alerts"
await emit_message(OutgoingMessage(
message="Database connection lost",
label="alerts",
severity="critical",
subject="DB Alert", # used by email backend
))
If no backend is registered for a label, the message is silently dropped and a debug log is emitted.
Using the dispatcher directly¶
For more control, access the dispatcher instance:
from processpype.communications import get_dispatcher
dispatcher = get_dispatcher()
await dispatcher.emit(message)
Receiving Messages¶
Backends that support receiving (currently Telegram) publish incoming messages as eventspype events. Subscribe to them through the dispatcher's incoming_publisher.
IncomingMessage fields¶
| Field | Default | Description |
|---|---|---|
text |
--- | The message text |
sender |
None |
Sender username |
sender_id |
None |
Sender identifier |
chat_label |
"default" |
Label of the chat that received the message |
backend_name |
"" |
Name of the backend that received it |
timestamp |
None |
Message timestamp |
metadata |
{} |
Backend-specific metadata |
raw_event |
None |
Original backend event (excluded from serialization) |
Subscribing with eventspype¶
The dispatcher exposes an incoming_publisher that you can subscribe to using any eventspype subscriber:
from eventspype.subscribers import FunctionalEventSubscriber
from processpype.communications import get_dispatcher, IncomingMessage
dispatcher = get_dispatcher()
# Simple functional subscriber
subscriber = FunctionalEventSubscriber(
name="my_handler",
publication=dispatcher.incoming_publication,
)
@subscriber.on_event
async def handle_message(message: IncomingMessage) -> None:
print(f"[{message.backend_name}] {message.sender}: {message.text}")
subscriber.subscribe(dispatcher.incoming_publisher)
TrackingEventSubscriber¶
Use TrackingEventSubscriber to keep a log of received messages:
from eventspype.subscribers import TrackingEventSubscriber
tracker = TrackingEventSubscriber(
name="message_tracker",
publication=dispatcher.incoming_publication,
)
tracker.subscribe(dispatcher.incoming_publisher)
# Later, inspect received messages
for event in tracker.events:
print(event.data.text)
QueueEventSubscriber¶
Use QueueEventSubscriber to bridge synchronous event delivery to async processing:
from eventspype.subscribers import QueueEventSubscriber
queue_sub = QueueEventSubscriber(
name="message_queue",
publication=dispatcher.incoming_publication,
)
queue_sub.subscribe(dispatcher.incoming_publisher)
# Consume from the queue in an async task
async def process_messages():
while True:
event = await queue_sub.get()
await handle(event.data)
This is the recommended pattern when your handler performs async I/O, since the dispatcher's event delivery is synchronous.
Built-in Backends¶
Telegram¶
The Telegram backend uses Telethon and supports both sending and receiving. Install it with:
pip install processpype[telegram]
Configuration¶
backends:
my_telegram:
type: telegram
api_id: ${TELEGRAM_API_ID}
api_hash: ${TELEGRAM_API_HASH}
token: ${TELEGRAM_BOT_TOKEN}
session_string: ""
listen_to_commands: false
labels:
- default
chats:
default:
chat_id: "-1001234567890"
active: true
alerts:
chat_id: "-1001234567890"
topic_id: 42
command_authorized: true
active: true
| Field | Default | Description |
|---|---|---|
api_id |
--- | Telegram API ID |
api_hash |
--- | Telegram API hash |
token |
--- | Bot token |
session_string |
"" |
Persistent session string for authentication |
listen_to_commands |
false |
Enable incoming message handling |
chats |
{} |
Chat configurations keyed by label |
Each chat entry (TelegramChatConfig):
| Field | Default | Description |
|---|---|---|
chat_id |
--- | Chat or channel identifier |
topic_id |
None |
Forum topic ID (for supergroup topics) |
command_authorized |
false |
Accept incoming commands from this chat |
active |
true |
Whether this chat destination is active |
Sending behavior¶
Outgoing messages are queued internally and drained by a background task. Long messages are automatically split into chunks of 30 lines. Failed sends are retried up to 3 times with exponential backoff.
Receiving behavior¶
When listen_to_commands is true, the backend listens for all new messages on Telegram. Only messages from chats with command_authorized: true are forwarded to the dispatcher. The topic_id field narrows authorization to a specific forum topic within a supergroup.
Email¶
The Email backend uses aiosmtplib and is send-only. Install it with:
pip install processpype[email]
Configuration¶
backends:
my_email:
type: email
host: smtp.example.com
port: 587
username: ${SMTP_USER}
password: ${SMTP_PASS}
from_address: noreply@example.com
use_tls: true
start_tls: false
default_recipients:
- ops@example.com
labels:
- alerts
| Field | Default | Description |
|---|---|---|
host |
"localhost" |
SMTP server hostname |
port |
587 |
SMTP port |
username |
"" |
SMTP username |
password |
"" |
SMTP password |
from_address |
--- | Sender email address |
use_tls |
true |
Connect with TLS |
start_tls |
false |
Upgrade to TLS via STARTTLS after connecting (for port 587) |
default_recipients |
[] |
Default recipient addresses |
Sending behavior¶
Recipients can be set per-message via metadata["recipients"], or fall back to default_recipients from the config. The subject field of OutgoingMessage sets the email subject; if omitted, a default subject is generated from the severity level.
await emit_message(OutgoingMessage(
message="Disk usage at 95%",
label="alerts",
severity="warning",
subject="Disk Alert",
metadata={"recipients": ["admin@example.com"]},
))
The backend automatically reconnects if the SMTP connection is lost.
Custom Backends¶
Implement the Communicator abstract base class to create a custom backend:
from processpype.communications.base import Communicator
from processpype.communications.models import OutgoingMessage
from processpype.config.models import CommunicatorBackendConfig
class SlackCommunicator(Communicator):
def __init__(self, name: str, config: CommunicatorBackendConfig) -> None:
super().__init__(name, config)
self._webhook_url = config.extra_fields.get("webhook_url", "")
async def start(self) -> None:
# Initialize client, open connections
self._started = True
async def stop(self) -> None:
# Clean up resources
self._started = False
async def send(self, message: OutgoingMessage) -> None:
# Send message via Slack webhook
...
To support receiving, override supports_receiving and call self._on_incoming() when a message arrives:
@property
def supports_receiving(self) -> bool:
return True
async def start(self) -> None:
self._started = True
# Start listening for messages, then for each:
# self._on_incoming(IncomingMessage(text=..., backend_name=self._name))
Registering a custom backend¶
Register your backend so it can be referenced by type name in configuration:
from processpype.communications.backends import register_backend
register_backend("slack", lambda name, config: SlackCommunicator(name, config))
Then use it in YAML:
communications:
enabled: true
backends:
my_slack:
type: slack
labels:
- default
webhook_url: ${SLACK_WEBHOOK_URL}
Initialization¶
The communications system is initialized automatically by Application.create() when communications.enabled is true. You can also initialize it manually:
from processpype.communications import init_communications
await init_communications(config.communications)
This creates backends from configuration, registers them with the dispatcher, and starts all backends.
Next Steps¶
- Configuration --- Environment variable substitution and YAML config details
- Services --- Integrate communications into your service lifecycle