WebSocketManager¶
networkpype.websocket.manager
High-level manager for WebSocket communication. Orchestrates connection lifecycle, pre/post processing, and authentication. Obtain instances via ConnectionManagersFactory.get_ws_manager().
class WebSocketManager
Constructor¶
WebSocketManager(
connection: WebSocketConnection,
ws_pre_processors: list[WebSocketPreProcessor] | None = None,
ws_post_processors: list[WebSocketPostProcessor] | None = None,
auth: Auth | None = None,
)
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
connection |
WebSocketConnection |
— | Underlying connection |
ws_pre_processors |
list[WebSocketPreProcessor] \| None |
None |
Pre-processors for outgoing messages |
ws_post_processors |
list[WebSocketPostProcessor] \| None |
None |
Post-processors for incoming messages |
auth |
Auth \| None |
None |
Authentication handler |
Properties¶
last_recv_time¶
@property
def last_recv_time() -> float
Unix timestamp (seconds) of the last received data frame from the underlying connection.
Methods¶
connect¶
async def connect(
ws_url: str,
*,
ping_timeout: float = 10,
auto_ping: bool = False,
message_timeout: float | None = None,
ws_headers: dict[str, Any] | None = None,
verify_ssl: bool = True,
) -> None
Establish the WebSocket connection. All keyword parameters are forwarded to WebSocketConnection.connect.
disconnect¶
async def disconnect() -> None
Close the WebSocket connection.
subscribe¶
async def subscribe(request: WebSocketRequest) -> None
Send a subscription request. Currently an alias for send. Future versions will support automatic re-subscription on reconnection.
send¶
async def send(request: WebSocketRequest) -> None
Send a WebSocket message through the full pipeline: pre-process → authenticate (if is_auth_required) → send.
Parameters:
request— The message to send.
ping¶
async def ping() -> None
Send a WebSocket ping frame to keep the connection alive or check health.
iter_messages¶
async def iter_messages() -> AsyncGenerator[WebSocketResponse | None]
Async generator that yields messages as they arrive. Runs post-processors on each message. Stops when the connection closes.
Yields: WebSocketResponse | None
Example:
async for message in ws_manager.iter_messages():
if message:
handle(message.data)
receive¶
async def receive() -> WebSocketResponse | None
Receive and post-process a single message.
Returns: WebSocketResponse | None
Example¶
from networkpype.factory import ConnectionManagersFactory
from networkpype.throttler.rate_limit import RateLimit
from networkpype.throttler.throttler import AsyncThrottler
from networkpype.websocket.request import WebSocketJSONRequest
throttler = AsyncThrottler(
rate_limits=[RateLimit(limit_id="default", limit=10, time_interval=1.0)]
)
factory = ConnectionManagersFactory(throttler=throttler)
ws_manager = await factory.get_ws_manager()
await ws_manager.connect("wss://stream.example.com/ws")
await ws_manager.subscribe(
WebSocketJSONRequest(
payload={"method": "SUBSCRIBE", "params": ["btcusdt@trade"]}
)
)
async for message in ws_manager.iter_messages():
if message:
print(message.data)
await ws_manager.disconnect()
await factory.close()
WebSocket Processors¶
WebSocketPreProcessor¶
class WebSocketPreProcessor(abc.ABC)
@abstractmethod
async def pre_process(request: WebSocketRequest) -> WebSocketRequest
Modify outgoing messages before they are sent.
WebSocketPostProcessor¶
class WebSocketPostProcessor(abc.ABC)
@abstractmethod
async def post_process(response: WebSocketResponse) -> WebSocketResponse
Transform incoming messages before they reach the application.