NetworkProcessor

Module: chronopype.processors.network

Inherits: TickProcessor

Abstract base class for processors that depend on network connectivity. Provides automatic connection monitoring, reconnection with exponential backoff, and status transition callbacks.

Constructor

NetworkProcessor(stats_window_size: int = 100)

Properties

Property Type Description
network_status NetworkStatus Current connectivity status
last_connected_timestamp float Timestamp of last successful connection (default: float("nan"))
check_network_interval float Seconds between connectivity checks (default: 10.0, min: 0.1)
check_network_timeout float Timeout for each check_network() call (default: 5.0, min: 0.1)
network_error_wait_time float Wait time after unexpected errors (default: 60.0, min: 0.1)

All configurable properties have setters with minimum value enforcement of 0.1 seconds.

Abstract Methods (must implement)

logger() (classmethod)

Return a logging.Logger instance for this processor. Raises NotImplementedError if not overridden.

@classmethod
def logger(cls) -> logging.Logger:
    ...

check_network()

Check current network connectivity. Must return a NetworkStatus value.

@abstractmethod
async def check_network(self) -> NetworkStatus:
    ...

Optional Override Methods

Method Signature Default Description
start_network async () -> None No-op Called when connection is established
stop_network async () -> None No-op Called when disconnecting
on_connected () -> None Logs info, records _last_connected_timestamp Transition to CONNECTED
on_disconnected () -> None Logs info Transition from CONNECTED
tick (timestamp: float) -> None No-op Sync tick handler
async_tick async (timestamp: float) -> None Calls tick() Async tick handler

NetworkStatus

class NetworkStatus(Enum):
    STOPPED = 0
    NOT_CONNECTED = 1
    CONNECTING = 2
    CONNECTED = 3
    DISCONNECTING = 4
    ERROR = 5

Lifecycle

pause()

Cancels the background network check loop. The processor remains registered but stops monitoring connectivity.

resume()

Restarts the background network check loop if the processor is active.

stop()

Cancels the background check loop, initiates an async fire-and-forget stop_network() call, and sets status to STOPPED. Call await_cleanup() after stop() to ensure network resources are fully released.

await_cleanup(timeout=5.0)

Await pending cleanup tasks (e.g., the stop_network() call initiated by stop()).

async def await_cleanup(self, timeout: float = 5.0) -> None:
    ...
Parameter Type Default Description
timeout float 5.0 Maximum seconds to wait for cleanup to complete

Backoff Behavior

When reconnecting, the processor uses exponential backoff with jitter:

  • Base delay: min(1 * 2^retry_count, 300) seconds
  • Jitter: +/- 10% randomization
  • Min wait: 1 second
  • Max wait: 5 minutes (300 seconds)
  • Error wait: 60 seconds for unexpected errors (configurable)

Class Variable

Variable Default Description
LOGGER_NAME "NetworkProcessor" Override in subclasses for distinct logging

Example

import logging
from chronopype.processors import NetworkProcessor, NetworkStatus


class DatabaseProcessor(NetworkProcessor):
    LOGGER_NAME = "DatabaseProcessor"

    @classmethod
    def logger(cls) -> logging.Logger:
        return logging.getLogger(cls.LOGGER_NAME)

    async def check_network(self) -> NetworkStatus:
        try:
            await self.pool.execute("SELECT 1")
            return NetworkStatus.CONNECTED
        except Exception:
            return NetworkStatus.NOT_CONNECTED

    async def start_network(self) -> None:
        self.pool = await create_pool(dsn="postgres://...")

    async def stop_network(self) -> None:
        await self.pool.close()

    async def async_tick(self, timestamp: float) -> None:
        if self.network_status == NetworkStatus.CONNECTED:
            await self.pool.execute("INSERT INTO ticks ...")