Processors¶
Processors define the work that executes on each clock tick. They are the primary extension point in chronopype.
Creating a Processor¶
Subclass TickProcessor and override async_tick (or tick for synchronous work):
from chronopype.processors import TickProcessor
class MyProcessor(TickProcessor):
async def async_tick(self, timestamp: float) -> None:
# Your logic here --- runs on every clock tick
await self.fetch_data(timestamp)
await self.process(timestamp)
For synchronous processors, override tick instead:
class SyncProcessor(TickProcessor):
def tick(self, timestamp: float) -> None:
print(f"Processing at {timestamp}")
Note
The default async_tick calls tick(), so you only need to override one.
Processor Lifecycle¶
A processor goes through the following states:
- Created --- instantiated but not yet attached to a clock
- Started ---
start(timestamp)is called when the clock context opens - Active --- receiving ticks from the clock
- Paused --- registered but skipping ticks
- Stopped ---
stop()is called when the clock context closes
processor = MyProcessor()
# The clock manages the lifecycle automatically:
async with RealtimeClock(config) as clock:
clock.add_processor(processor) # start() is called
# ... processor receives ticks ...
# stop() is called on context exit
State Tracking¶
Every processor maintains a ProcessorState (a frozen Pydantic model) that tracks execution statistics:
processor = MyProcessor()
# After running with a clock:
state = processor.state
# Execution stats
state.total_ticks # total tick attempts
state.successful_ticks # successful executions
state.failed_ticks # failed executions
state.avg_execution_time # average execution time
state.max_execution_time # maximum execution time
state.error_rate # error percentage
# Error tracking
state.error_count # total errors
state.consecutive_errors # current error streak
state.last_error # last error message
state.last_error_time # when last error occurred
# Retry tracking
state.retry_count # current retry count
state.max_consecutive_retries # highest retry streak
Error Handling and Retries¶
The clock automatically retries failed processors with exponential backoff:
config = ClockConfig(
clock_mode=ClockMode.REALTIME,
start_time=time.time(),
tick_size=1.0,
max_retries=3, # retry up to 3 times
processor_timeout=2.0, # timeout per execution
)
The backoff formula is 0.1 * 2^(retry - 1) seconds. If all retries are exhausted, the error callback is invoked (if set) and execution continues to the next tick.
Concurrent Execution¶
By default, processors execute sequentially. Enable concurrent execution for independent processors:
config = ClockConfig(
clock_mode=ClockMode.REALTIME,
start_time=time.time(),
tick_size=1.0,
concurrent_processors=True,
)
With concurrent execution, all active processors run in parallel using asyncio.gather.
Warning
Ensure your processors are safe for concurrent execution. Avoid shared mutable state between processors without proper synchronization.
Custom Initialization¶
Pass stats_window_size to control the rolling window for execution time statistics:
class MyProcessor(TickProcessor):
def __init__(self) -> None:
super().__init__(stats_window_size=200) # track last 200 ticks
async def async_tick(self, timestamp: float) -> None:
...
Accessing Timestamp¶
The current_timestamp property returns the last timestamp the processor was called with:
processor = MyProcessor()
# After ticks have been processed:
print(processor.current_timestamp) # last tick timestamp