UW WebSocket client patterns

Architectural lessons distilled from reading the official UW WebSocket examples end-to-end. This page is the Codex handoff material for the MK3 WS adapter: when MK3 builds the Nautilus WebSocket adapter, the input set is this page, uw-api-websockets, and the official Python example.

The examples give us 80% of the receive-loop scaffolding for free. The remaining 20% is documented here as the gap between the example (research skeleton) and production (24/7 IMPULSE recorder).

1. Protocol confirmations (from running code)

| Property | Value |
| --- | --- |
| URL | wss://api.unusualwhales.com/socket?token={UW_TOKEN} |
| Auth | Token in URL query param (not header, not first-message) |
| Subscribe frame | {"channel":"NAME","msg_type":"join"}, one frame per channel |
| Unsubscribe frame | {"channel":"NAME","msg_type":"leave"} |
| Message envelope | Two-element JSON array [channel, payload] (NOT an object) |
| Server heartbeat | None - client must ping if quiet |
| Server backpressure | Drops messages silently on its side |

The two-element array envelope is non-obvious and lower overhead than a JSON object. Python unpacks it as channel, payload = json.loads(message).
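The frame shapes in the table can be wrapped in a few small helpers. This is a minimal sketch: the function names (join_frame, leave_frame, parse_envelope) are hypothetical and not taken from the official example, and the shape check is an added safeguard rather than documented UW behaviour.

```python
import json


def join_frame(channel: str) -> str:
    """Build the subscribe frame for one channel."""
    return json.dumps({"channel": channel, "msg_type": "join"})


def leave_frame(channel: str) -> str:
    """Build the unsubscribe frame for one channel."""
    return json.dumps({"channel": channel, "msg_type": "leave"})


def parse_envelope(raw):
    """Split a [channel, payload] frame; raise ValueError on any other shape."""
    data = json.loads(raw)
    if not (isinstance(data, list) and len(data) == 2):
        raise ValueError(f"unexpected envelope: {raw[:80]!r}")
    channel, payload = data
    return channel, payload
```

Validating the shape before unpacking keeps an unexpected frame (for example an error object) from surfacing as an opaque unpacking error inside the receive loop.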

2. Patterns to copy

2.1 Reconnect with exponential backoff + jitter

The official example uses:

TIMEOUT_LENGTH = 30           # Receive timeout
MAX_RECONNECT_ATTEMPTS = 5    # Conservative; production wants infinite
RECONNECT_DELAY = 5           # Base delay (seconds)
RECONNECT_DELAY_MAX = 60      # Cap on exponential growth
 
# In the reconnect loop:
delay = min(
    RECONNECT_DELAY * (2 ** (reconnect_attempt - 1)),
    RECONNECT_DELAY_MAX
)
jitter = delay * 0.1 * random.random()   # 10% random additive
reconnect_delay = delay + jitter

The jitter is critical. Without it, every disconnected client reconnects at the same time after the same backoff, creating a thundering-herd reconnect storm on UW’s side. Always include 10% jitter (or higher).
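The same formula can be isolated as a pure function so it is easy to test. A minimal sketch, assuming the example's constants; the function name reconnect_delay, the injectable rng parameter, and the exponent clamp are additions for illustration.

```python
import random

RECONNECT_DELAY = 5        # base delay (seconds), as in the official example
RECONNECT_DELAY_MAX = 60   # cap on exponential growth


def reconnect_delay(attempt: int, rng=random.random) -> float:
    """Seconds to wait before reconnect attempt `attempt` (1-based)."""
    # Clamp the exponent so 2 ** exponent stays small on long outages.
    exponent = min(max(attempt, 1) - 1, 16)
    base = min(RECONNECT_DELAY * (2 ** exponent), RECONNECT_DELAY_MAX)
    # Additive jitter of up to 10% of the capped delay.
    return base + base * 0.1 * rng()
```

Passing rng explicitly makes the jitter deterministic in tests; in the adapter the default random.random is enough.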

2.2 Receive loop does ZERO processing

while True:
    message = await asyncio.wait_for(ws.recv(), timeout=TIMEOUT_LENGTH)
    data = json.loads(message)
    channel, payload = data

    # Append the parsed payload as-is; any formatting belongs in the flush.
    if 'option_trades' in channel:
        message_buffer_optiontrades.append(payload)
    elif 'flow-alerts' in channel:
        message_buffer_flowalerts.append(payload)
    # ... etc.

    # Periodic flush only - no per-message I/O

The receive loop is json.loads + buffer append. No file I/O, no DB writes, no transforms. All sinks happen on the periodic flush.

This is the canonical high-throughput pattern: any work in the receive loop reduces the rate at which messages can be drained.

2.3 Channel routing by SUBSTRING, not equality

if 'option_trades' in channel:    # matches "option_trades:TSLA", "option_trades:SPY"
    ...                           # append to the option-trades buffer
elif 'flow-alerts' in channel:    # matches "flow-alerts" (global)
    ...
elif 'gex' in channel:            # matches "gex:SPY", "gex:QQQ"
    ...

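The incoming channel field includes the ticker suffix (option_trades:TSLA). Exact-match routing breaks for per-ticker channels. Always substring-match (or split on : and match the prefix). A bare substring test can also match an unrelated channel whose name merely contains the same text, so the split-on-: form is the stricter of the two.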
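The split-on-: variant can be sketched as a dictionary lookup keyed by the channel prefix. The names channel_family, buffers, and route are hypothetical, and the three families listed are only the ones mentioned on this page.

```python
def channel_family(channel: str) -> str:
    """'option_trades:TSLA' -> 'option_trades'; 'flow-alerts' -> 'flow-alerts'."""
    return channel.split(":", 1)[0]


# One buffer per channel family (hypothetical set, taken from this page).
buffers = {"option_trades": [], "flow-alerts": [], "gex": []}


def route(channel: str, payload) -> bool:
    """Append payload to its family buffer; return False for unknown channels."""
    buf = buffers.get(channel_family(channel))
    if buf is None:
        return False
    buf.append(payload)
    return True
```

Returning False for an unknown family lets the caller count unrouted messages instead of dropping them silently.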

2.4 Dual flush trigger: time AND size

The DuckDB streaming example (ws-stream-spot-greeks-by-strike-by-expiry) shows the explicit constants:

BATCH_SIZE = 500         # Flush at N records
BATCH_TIMEOUT = 10       # Flush after T seconds
 
if len(buffer) >= BATCH_SIZE or time_since_flush >= BATCH_TIMEOUT:
    flush_buffer_to_db(buffer)
    buffer.clear()
    last_flush_time = current_time

Both triggers are needed. Size-only: low-volume channels never flush their last few records. Time-only: high-volume channels hold seconds of data in memory, risking OOM and losing data on crash.

For MK3 IMPULSE recording: BATCH_SIZE around 500-1000, BATCH_TIMEOUT around 1-5 seconds. Tune by channel volume.
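A minimal sketch of the dual trigger as a reusable buffer. The class name BatchBuffer, the flush_fn callback, and the injectable clock are assumptions for illustration; the DuckDB example inlines this logic rather than wrapping it in a class.

```python
import time


class BatchBuffer:
    """Buffer that flushes on size OR elapsed time, whichever fires first."""

    def __init__(self, flush_fn, batch_size=500, batch_timeout=10.0,
                 clock=time.monotonic):
        self._flush_fn = flush_fn
        self._batch_size = batch_size
        self._batch_timeout = batch_timeout
        self._clock = clock
        self._items = []
        self._last_flush = clock()

    def add(self, item):
        self._items.append(item)
        self.maybe_flush()

    def maybe_flush(self) -> bool:
        """Flush if either trigger fired; also safe to call from a timer tick."""
        if not self._items:
            return False
        overdue = self._clock() - self._last_flush >= self._batch_timeout
        if len(self._items) >= self._batch_size or overdue:
            self.flush()
            return True
        return False

    def flush(self):
        # Swap the list out first so flush_fn never sees later appends.
        batch, self._items = self._items, []
        self._last_flush = self._clock()
        if batch:
            self._flush_fn(batch)
```

Note that add() only evaluates the time trigger when a message arrives, so a periodic tick (or the receive-timeout branch) still has to call maybe_flush() for quiet channels.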

2.5 Flush on EVERY exception path

except asyncio.CancelledError:
    await flush_buffers(files_and_buffers)
    await ws.close()
    raise

except websockets.exceptions.ConnectionClosed:
    await flush_buffers(files_and_buffers)
    raise

except Exception as e:
    logger.error(f'Error: {e}')
    await flush_buffers(files_and_buffers)
    raise

Three different exception classes, the same flush-then-raise body (the cancellation path also closes the socket first). Never lose buffered data at shutdown or on disconnect.

2.6 Quiet-period flush

try:
    message = await asyncio.wait_for(ws.recv(), timeout=TIMEOUT_LENGTH)
except asyncio.TimeoutError:
    # Probe liveness with ping/pong
    pong = await ws.ping()
    await asyncio.wait_for(pong, timeout=10)
    # ALSO flush during the quiet period
    await flush_buffers(files_and_buffers)

If you only flush on the regular tick, low-volume channels’ messages sit in memory through quiet periods. The 30s timeout branch doubles as a “flush whatever’s queued” trigger.

2.7 Client-initiated liveness check

UW does not emit periodic server pings. The client must probe with ws.ping() + wait_for(pong, timeout=10) after each TIMEOUT_LENGTH quiet period. If pong fails, treat connection as dead and reconnect.

This is in the except asyncio.TimeoutError: branch above. Without this, a half-open TCP connection (router NAT timeout, etc.) would sit silently forever.
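The probe can be factored into a helper that returns a boolean. A minimal sketch, assuming ws behaves like a websockets connection, where awaiting ping() yields a waiter that resolves when the pong arrives; the name connection_alive is hypothetical.

```python
import asyncio


async def connection_alive(ws, pong_timeout: float = 10.0) -> bool:
    """Return True if a pong arrives within pong_timeout seconds."""
    try:
        pong_waiter = await ws.ping()
        await asyncio.wait_for(pong_waiter, timeout=pong_timeout)
        return True
    except asyncio.TimeoutError:
        # No pong: treat the connection as half-open and reconnect.
        return False
```

Only the timeout is caught here. Any other exception from ping() (for example a closed connection) propagates to the caller, where the existing reconnect path handles it.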

2.8 Per-channel buffers, NOT one shared buffer

message_buffer_optiontrades = []
message_buffer_greeks = []
message_buffer_flowalerts = []

One buffer per output sink. Why:

  • Per-channel batching (each sink has its own flush threshold).
  • Routes to different files / tables / Parquet partitions cleanly.
  • Decoupled writer failure: if one writer stalls, others keep flushing.

3. Anti-patterns from the example (must fix for production)

3.1 5-and-done reconnect

MAX_RECONNECT_ATTEMPTS = 5    # Research skeleton, not production

For a 24/7 IMPULSE recorder: infinite reconnect with capped backoff. A 30-minute UW brownout shouldn’t permanently kill the data pipeline.

# Production replacement
reconnect_attempt = 0
while True:
    if reconnect_attempt > 0:
        delay = min(RECONNECT_DELAY * (2 ** (reconnect_attempt - 1)),
                    RECONNECT_DELAY_MAX)
        delay += delay * 0.1 * random.random()
        await asyncio.sleep(delay)
        reconnect_attempt = min(reconnect_attempt + 1, 8)  # cap exp growth
    # ... attempt connection ...
    # on success: reconnect_attempt = 0

3.2 No queue depth tracking

The skills page recommends “queue depth and drop counter logging.” The example skips both. For production, emit a metric on every flush:

logger.info(
    f"flush channel={c} records={len(buffer)} "
    f"avg_lag={...} drops_since_start={drop_count}"
)

Drops can be detected indirectly: track expected vs actual message rate per channel, flag deviations.
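One way to sketch the expected-vs-actual check is a per-window counter compared against a baseline. Everything here is an assumption for illustration: the class name RateMonitor, the fixed expected counts, and the 50% tolerance. A low count can also mean a quiet market, so this flags candidates for investigation rather than proving drops.

```python
from collections import Counter


class RateMonitor:
    """Flag channel families whose message count falls below a baseline."""

    def __init__(self, expected_per_window, tolerance=0.5):
        self.expected = dict(expected_per_window)  # family -> expected count
        self.tolerance = tolerance
        self.counts = Counter()

    def record(self, family: str) -> None:
        self.counts[family] += 1

    def close_window(self):
        """Return families below expected * (1 - tolerance), then reset counts."""
        low = [
            family
            for family, expected in self.expected.items()
            if self.counts[family] < expected * (1 - self.tolerance)
        ]
        self.counts.clear()
        return sorted(low)
```

Calling close_window() on the same tick as the flush keeps the rate check aligned with the flush log line.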

3.3 No bounded queue

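If file I/O stalls, the buffer grows unbounded. Bound it at roughly 50,000 entries (the skills page’s recommendation). Note that collections.deque(maxlen=50000) silently discards the oldest entry on append once it is full, so it only implements drop-oldest; to stop reading instead, use an asyncio.Queue(maxsize=50000) and await its put(). Either way, decide explicitly and count what is dropped.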
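A minimal sketch of the drop-oldest policy with an explicit drop counter, since a bare deque gives no signal when it evicts. The class name DropOldestQueue and its methods are hypothetical.

```python
from collections import deque


class DropOldestQueue:
    """Bounded buffer that evicts the oldest item when full and counts evictions."""

    def __init__(self, maxlen: int = 50_000):
        self._items = deque(maxlen=maxlen)
        self.dropped = 0

    def put(self, item) -> None:
        if len(self._items) == self._items.maxlen:
            self.dropped += 1  # the append below evicts the oldest entry
        self._items.append(item)

    def drain(self) -> list:
        """Return everything queued, oldest first, and empty the queue."""
        batch = list(self._items)
        self._items.clear()
        return batch

    def __len__(self) -> int:
        return len(self._items)
```

The dropped attribute is the number that would feed drops_since_start in the flush log line.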

3.4 Per-channel writer files end in .txt

The example writes to demo_optiontrades_<ts>.txt. For production: Parquet with per-day partitions (e.g. option_trades/dt=YYYY-MM-DD/part-00001.parquet). The DuckDB-streaming example shows the pattern with PyArrow:

arrow_table = pa.Table.from_pylist(buffer)
# write to Parquet here
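The partition layout and the write step might look like the following sketch. The helper names (partition_path, write_batch) are hypothetical, partitioning by UTC date is an assumption (a trading-day calendar may be preferable), and pyarrow is imported lazily so the path helper works without it installed.

```python
from datetime import datetime, timezone
from pathlib import Path


def partition_path(root, family: str, ts: datetime, part: int) -> Path:
    """Build <root>/<family>/dt=YYYY-MM-DD/part-NNNNN.parquet (UTC date)."""
    day = ts.astimezone(timezone.utc).strftime("%Y-%m-%d")
    return Path(root) / family / f"dt={day}" / f"part-{part:05d}.parquet"


def write_batch(buffer: list, path: Path) -> None:
    """Write a list of dict records as one Parquet file."""
    import pyarrow as pa
    import pyarrow.parquet as pq

    path.parent.mkdir(parents=True, exist_ok=True)
    pq.write_table(pa.Table.from_pylist(buffer), str(path))
```

Writing one file per flush keeps each write atomic at the file level; compacting small parts into larger ones can happen offline.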

4. The Kafka alternative (production-grade ingest)

The kafka-stream-flow-alerts example reveals that UW also publishes Kafka topics, not just a WebSocket feed. This is a major find.

| Property | Value |
| --- | --- |
| Brokers | stream.unusualwhales.com:9095 |
| Auth | SASL_SSL with username + password (Dan@UW provisioned) |
| Encoding | Protobuf (NOT JSON), one schema per topic |
| Confluent client | confluent_kafka.Consumer |

Topics available:

| Topic | Protobuf schema | Equivalent WS channel |
| --- | --- | --- |
| option-states | option_state_client_pb2.OptionState | (no direct WS equivalent) |
| all-trade-report | trade_report_client_pb2.TradeReport | (broader than option trades) |
| all-option-trades | options_client_pb2.OptionTrade | option_trades:* (firehose) |
| flow-alerts | flow_alert_client_pb2.FlowAlert | flow-alerts |

Why Kafka may be better than WebSocket for MK3:

| Concern | WebSocket | Kafka |
| --- | --- | --- |
| Backpressure | Server drops on consumer slowdown | Native broker-side buffering |
| Consumer groups | Single consumer per socket | Multi-worker scaling via group_id |
| Replay | None (live only) | Topic retention allows backfill from offset |
| Encoding | JSON (verbose) | Protobuf (compact + schema-evolution) |
| Auth | Token in URL | SASL_SSL credentials |
| Operations | DIY (reconnect, queue, etc.) | Confluent client handles much of it |

Tradeoff: Kafka requires confluent_kafka (C library bindings) and protobuf schema management. WebSocket is pip install websockets + stdlib json. WS is simpler for prototyping; Kafka is the right production choice if UW grants access.

For the MK3 Codex adapter session: explore whether UW will grant Kafka credentials. If yes, prefer Kafka over WebSocket for the IMPULSE recording pipeline (better backpressure handling, replay capability, less DIY scaffolding).
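If credentials are granted, the consumer configuration would look roughly like this sketch. The broker address and SASL_SSL come from the table above; the SASL mechanism (PLAIN), the environment variable names, the offset-reset policy, and the function name are all assumptions to confirm with UW.

```python
import os


def kafka_consumer_config(group_id: str, env=os.environ) -> dict:
    """Build a config dict for confluent_kafka.Consumer (librdkafka keys)."""
    return {
        "bootstrap.servers": "stream.unusualwhales.com:9095",
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "PLAIN",                 # ASSUMPTION: confirm with UW
        "sasl.username": env["UW_KAFKA_USERNAME"],  # hypothetical env var name
        "sasl.password": env["UW_KAFKA_PASSWORD"],  # hypothetical env var name
        "group.id": group_id,
        "auto.offset.reset": "latest",              # ASSUMPTION: live-only start
        "enable.auto.commit": False,                # commit after a durable write
    }


# Usage (requires confluent_kafka and real credentials):
#   from confluent_kafka import Consumer
#   consumer = Consumer(kafka_consumer_config("mk3-impulse-recorder"))
#   consumer.subscribe(["flow-alerts"])
```

Disabling auto-commit lets the recorder commit offsets only after a batch has reached Parquet, which is what makes replay from the last committed offset useful.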

5. Recommended architecture

Synthesized from the skills page + examples:

+-----------------------------+
| UW WebSocket  OR  Kafka     |
+-----------------------------+
              | (one connection, many channels OR many topics)
              v
+-----------------------------+
| Receive loop                |
| - json.loads OR protobuf    |
| - tag with receive_ts       |
| - put to bounded queue      |
| - update queue_depth metric |
+-----------------------------+
              | (queue, maxlen=50000)
              v
+-----------------------------+
| Per-channel consumer tasks  |
| - dual-trigger flush        |
| - per-channel Parquet writer|
| - write to Postgres optional|
| - emit to Nautilus bus      |
+-----------------------------+
              | (per-channel)
              v
+-----------------------------+
| Parquet shards              |
| dt=YYYY-MM-DD/part-N.parquet|
+-----------------------------+

Two sinks per message:

  1. Local Parquet archive (the IMPULSE-substrate corpus for future OOS validation).
  2. Nautilus MessageBus (the live signal substrate for the SPY hunter).

6. Codex handoff checklist

When MK3 implements the WS (or Kafka) adapter:

  • URL: wss://api.unusualwhales.com/socket?token={UW_TOKEN} (env var only).
  • Subscribe one frame per channel, after connection open.
  • Resubscribe after every reconnect (UW does not persist).
  • Receive loop: json.loads + queue put, nothing else.
  • Bounded queue (maxlen=50000).
  • Channel routing by prefix or substring, never exact match (e.g. channel.startswith('gex:')).
  • Dual-trigger flush: BATCH_SIZE=500 OR BATCH_TIMEOUT=5s.
  • Flush-on-every-exception-path (CancelledError, ConnectionClosed, generic).
  • Quiet-period flush + ping/pong liveness on receive timeout.
  • Reconnect: exponential backoff with 10% jitter, INFINITE (not 5-and-done), capped at RECONNECT_DELAY_MAX=60s.
  • Per-channel buffers, per-channel writers.
  • Queue depth metric + drop counter logged on every flush.
  • Two sinks: local Parquet (archive) + Nautilus bus (live).
  • Consider Kafka alternative; if granted credentials, prefer Kafka.

This list IS the WS adapter design spec. The Codex prompt for the session should reference this page + the official Python example + uw-api-websockets + nautilus-dev-adapters Phase 1-7.
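The resubscribe item is the easiest one to forget, so here is a minimal sketch of it. It assumes ws exposes an async send() as websockets connections do; the name subscribe_all is hypothetical. Call it right after every successful connect, including reconnects.

```python
import asyncio
import json


async def subscribe_all(ws, channels) -> None:
    """Send one join frame per channel on a freshly opened connection."""
    for channel in channels:
        await ws.send(json.dumps({"channel": channel, "msg_type": "join"}))
```

Keeping the channel list in one place (config, not code) means the initial subscribe and every resubscribe use the same set.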

7. Source

  • Example repo: https://github.com/unusual-whales/api-examples
  • Multi-channel Python: https://github.com/unusual-whales/api-examples/tree/main/examples/ws-multi-channel-multi-output
  • Multi-channel Node.js: https://github.com/unusual-whales/api-examples/tree/main/examples/ws-multi-channel-multi-output-nodejs
  • DuckDB streaming (dual-trigger flush): https://github.com/unusual-whales/api-examples/tree/main/examples/ws-stream-spot-greeks-by-strike-by-expiry
  • Kafka consumer: https://github.com/unusual-whales/api-examples/tree/main/examples/kafka-stream-flow-alerts
  • Skills page: https://unusualwhales.com/skills/websocket.md

Related pages: cortana-north-star, uw-api-websockets, uw-api-examples-repo, 2026-05-15-mk3-data-foundation-constraint, 2026-05-15-mk3-setup-hunter-architecture, nautilus-dev-adapters, nautilus-custom-data, nautilus-dev-rust