Skip to content

Streaming Chatbot — Analysis

What this implementation provides

  • Multi-turn history. Each turn appends user and assistant messages; subsequent turns include the full conversation in the prompt.
  • History cap. max_history_messages (default 20) prevents unbounded growth; oldest messages trimmed when the cap is exceeded.
  • System prompt carried at the start of every request.
  • Retry on transient failures with exponential backoff capped at 5s per attempt.
  • Streaming output. Tokens print as they arrive; the full response is appended to history when streaming completes.
  • Graceful CLI shutdown on EOF or Ctrl-C.

The history-cap design

A long conversation accumulates history. After 100 turns, the prompt to the model is enormous — cost climbs linearly, latency climbs (more tokens to process), and the model's attention is diluted.

Cap strategies:

  • Hard cap on message count (this implementation). Trim oldest after threshold. Simple; loses older context entirely.
  • Sliding window with summarisation. When approaching the cap, summarise the oldest N messages into a single context message; keep that plus recent turns. Preserves narrative; more complex.
  • Token-based cap. Trim based on token count, not message count. More accurate budget management; requires a tokenizer.

For a chatbot supporting common chat flows, message-count cap is sufficient. For long-form support agents handling 50+ turn conversations, summarisation is worth the engineering cost.

The retry pattern

for attempt in 1..max_retry_attempts:
    try:
        return stream(messages)
    except TransientAPIError:
        if attempt == max: raise
        sleep(min(attempt, 5))
  • Exponential-ish backoff. sleep(attempt) capped at 5s. Could be smarter (jitter, true exponential 2^attempt), but bounded backoff prevents amplifying provider failures.
  • Distinguished error type. TransientAPIError is retried; other exceptions surface immediately. The provider's SDK should map rate limits and 5xx to this type; auth errors should not be retried.
  • Bounded attempts. Always; otherwise a downstream that's permanently broken produces infinite retries.

What the mock layer is for

The chatbot uses _mock_stream for tests. For production, replace with:

def _mock_stream(self, messages):
    from openai import OpenAI
    client = OpenAI()
    stream = client.chat.completions.create(
        model="gpt-4",
        messages=messages,
        stream=True,
    )
    for chunk in stream:
        delta = chunk.choices[0].delta.content
        if delta:
            yield delta

Or for Anthropic, the same pattern with client.messages.stream. The interface (yield tokens, raise TransientAPIError on retryable failures) is the contract; the provider is the swap point.

What this exercise tests

The skeleton works, but the production-grade adds are:

  1. History management without unbounded growth.
  2. Retry on transient failures with bounded attempts.
  3. Pluggable streaming layer (mocked for tests, real provider for production).
  4. Graceful failure surfaced to the user.
  5. Test coverage for retry, streaming, and history.

A naive implementation skips 1, 2, 4 — and runs fine in development. Production breaks them all on the first rate-limit, the 100th turn, the first transient downtime.

Interview probes

  • "How do you manage history in a long-running chat?"
  • "What's your retry policy for transient API failures, and why?"
  • "How do you stream tokens to the user?"
  • "What happens when the provider rate-limits in the middle of a turn?"
  • "How would you A/B test two system prompts?"
  • "How do you preserve context when history exceeds the model's window?"

Each has a one-paragraph answer drawn from the design choices in this implementation.