Network Processor¶
NetworkProcessor extends TickProcessor with built-in network connectivity management, automatic reconnection, and exponential backoff. Use it for processors that depend on external services.
Creating a Network Processor¶
Subclass NetworkProcessor and implement the required abstract methods:
import logging
from chronopype.processors import NetworkProcessor, NetworkStatus
class ApiProcessor(NetworkProcessor):
LOGGER_NAME = "ApiProcessor"
@classmethod
def logger(cls) -> logging.Logger:
return logging.getLogger(cls.LOGGER_NAME)
async def check_network(self) -> NetworkStatus:
"""Check if the API is reachable."""
try:
response = await self.client.health_check()
if response.ok:
return NetworkStatus.CONNECTED
return NetworkStatus.NOT_CONNECTED
except Exception:
return NetworkStatus.ERROR
async def start_network(self) -> None:
"""Called when connection is established."""
self.client = await create_api_client()
async def stop_network(self) -> None:
"""Called when disconnecting."""
await self.client.close()
def tick(self, timestamp: float) -> None:
"""Only runs when network is connected."""
self.client.process(timestamp)
Required Methods¶
| Method | Purpose |
|---|---|
logger() |
Class method returning a logging.Logger |
check_network() |
Return current NetworkStatus |
Optional Override Methods¶
| Method | Purpose | Default |
|---|---|---|
start_network() |
Setup when connection is established | No-op |
stop_network() |
Cleanup when disconnecting | No-op |
on_connected() |
Callback on CONNECTED transition |
Logs info, records connection timestamp |
on_disconnected() |
Callback on disconnect from CONNECTED |
Logs info |
tick(timestamp) |
Synchronous tick processing | No-op |
async_tick(timestamp) |
Async tick processing | Calls tick() |
Network Status¶
The processor tracks its connectivity state:
class NetworkStatus(Enum):
STOPPED = 0 # Processor not running
NOT_CONNECTED = 1 # Not connected to network
CONNECTING = 2 # Connection in progress
CONNECTED = 3 # Connected and operational
DISCONNECTING = 4 # Disconnection in progress
ERROR = 5 # Error state
Access the current status:
processor = ApiProcessor()
print(processor.network_status) # NetworkStatus.STOPPED initially
Automatic Reconnection¶
The network processor runs a background loop that periodically checks connectivity and manages reconnection with exponential backoff:
- Check interval: 10 seconds (configurable)
- Check timeout: 5 seconds (configurable)
- Backoff: Exponential with jitter, min 1s, max 5 minutes
- Error wait: 60 seconds for unexpected errors (configurable)
The jitter is +/-10% to avoid thundering herd problems when multiple processors reconnect simultaneously.
Configuration¶
Adjust network behavior via properties:
processor = ApiProcessor()
# How often to check connectivity (minimum 0.1s)
processor.check_network_interval = 5.0
# Timeout for each check_network() call (minimum 0.1s)
processor.check_network_timeout = 3.0
# Wait time after unexpected errors (minimum 0.1s)
processor.network_error_wait_time = 30.0
Status Transitions¶
The processor handles status transitions and fires callbacks:
STOPPED ──start()──> NOT_CONNECTED
│
check_network()
│
┌────▼────┐
│CONNECTING│
└────┬────┘
│
┌──────────┼──────────┐
▼ ▼ ▼
CONNECTED NOT_CONNECTED ERROR
│ │
on_connected() backoff + retry
│
──stop()──> DISCONNECTING ──> STOPPED
on_disconnected()
Full Example¶
import asyncio
import logging
from chronopype import ClockConfig, ClockMode
from chronopype.clocks import RealtimeClock
from chronopype.processors import NetworkProcessor, NetworkStatus
class WebSocketProcessor(NetworkProcessor):
LOGGER_NAME = "WebSocketProcessor"
@classmethod
def logger(cls) -> logging.Logger:
return logging.getLogger(cls.LOGGER_NAME)
async def check_network(self) -> NetworkStatus:
if self.ws and self.ws.open:
return NetworkStatus.CONNECTED
return NetworkStatus.NOT_CONNECTED
async def start_network(self) -> None:
self.ws = await websockets.connect("wss://example.com/feed")
async def stop_network(self) -> None:
if self.ws:
await self.ws.close()
def on_connected(self) -> None:
super().on_connected()
self.logger().info("WebSocket connected, subscribing to feed")
async def async_tick(self, timestamp: float) -> None:
if self.network_status == NetworkStatus.CONNECTED:
data = await self.ws.recv()
self.process_message(data, timestamp)
async def main():
config = ClockConfig(
clock_mode=ClockMode.REALTIME,
start_time=asyncio.get_event_loop().time(),
tick_size=0.1,
)
processor = WebSocketProcessor()
processor.check_network_interval = 5.0
async with RealtimeClock(config) as clock:
clock.add_processor(processor)
await clock.run_til(config.start_time + 300)
asyncio.run(main())