AsyncThrottler¶
networkpype.throttler.throttler
Asynchronous rate limiter for API requests. Manages a shared task log and blocks execution until capacity is available across all relevant rate limits.
class AsyncThrottler
Constructor¶
AsyncThrottler(
rate_limits: list[RateLimit],
retry_interval: float = 0.1,
safety_margin_pct: float = 0.05,
limits_share_percentage: Decimal | None = None,
)
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
rate_limits |
list[RateLimit] |
— | Rate limits to enforce |
retry_interval |
float |
0.1 |
Seconds between capacity checks when limits are full |
safety_margin_pct |
float |
0.05 |
Reserve this fraction of each limit as a buffer (e.g. 0.05 = 5%) |
limits_share_percentage |
Decimal \| None |
None |
Use this percentage of each limit (useful for multi-process deployments); None means 100% |
Properties¶
rate_limits¶
@property
def rate_limits() -> list[RateLimit]
Get the current rate limit list. The setter deep-copies the list, applies limits_share_percentage, and rebuilds the internal ID map.
limits_share_percentage¶
@property
def limits_share_percentage() -> Decimal
The percentage of limits allocated to this throttler instance. Returns Decimal("100") if not set.
Methods¶
execute_task¶
def execute_task(limit_id: str) -> AsyncRequestContext
Return an async context manager that blocks until capacity is available for the specified rate limit, then logs the task.
Parameters:
limit_id— Must match aRateLimit.limit_idregistered with this throttler.
Returns: AsyncRequestContext
Raises: ValueError if limit_id is not found.
Example:
async with throttler.execute_task("ticker"):
response = await make_api_call()
get_related_limits¶
def get_related_limits(
limit_id: str,
) -> tuple[RateLimit | None, list[tuple[RateLimit, int]]]
Return the primary RateLimit for limit_id and a list of (limit, weight) tuples for that limit plus all linked limits.
Returns: (rate_limit, related_limits) — rate_limit is None if not found.
copy¶
def copy() -> AsyncThrottler
Create a new AsyncThrottler with the same configuration. Useful when you need an independent throttler for a different context.
AsyncRequestContext¶
networkpype.throttler.context
The context manager returned by execute_task. Manages capacity checking and task logging.
class AsyncRequestContext
Key Methods¶
flush¶
def flush() -> None
Remove expired entries from the task log (entries older than time_interval * (1 + safety_margin_pct)).
within_capacity¶
def within_capacity() -> bool
Check whether executing a new task would stay within all related rate limits. Returns False if any limit would be exceeded.
acquire¶
async def acquire() -> None
Wait until capacity is available, then log the task. Called automatically by __aenter__.
Constants¶
| Constant | Value | Description |
|---|---|---|
MAX_CAPACITY_REACHED_WARNING_INTERVAL |
30.0 |
Minimum seconds between "capacity reached" log warnings |
Example¶
from decimal import Decimal
from networkpype.throttler.throttler import AsyncThrottler
from networkpype.throttler.rate_limit import RateLimit, LinkedLimitWeightPair
throttler = AsyncThrottler(
rate_limits=[
RateLimit(limit_id="global", limit=1200, time_interval=60.0),
RateLimit(
limit_id="/v1/ticker",
limit=10,
time_interval=1.0,
linked_limits=[LinkedLimitWeightPair(limit_id="global", weight=1)],
),
],
safety_margin_pct=0.05,
limits_share_percentage=Decimal("100"),
)
async with throttler.execute_task("/v1/ticker"):
# guaranteed to run within limits
data = await fetch_ticker()