Streaming Chatbot — Analysis¶
What this implementation provides¶
- Multi-turn history. Each turn appends user and assistant messages; subsequent turns include the full conversation in the prompt.
- History cap.
max_history_messages(default 20) prevents unbounded growth; oldest messages trimmed when the cap is exceeded. - System prompt carried at the start of every request.
- Retry on transient failures with exponential backoff capped at 5s per attempt.
- Streaming output. Tokens print as they arrive; the full response is appended to history when streaming completes.
- Graceful CLI shutdown on EOF or Ctrl-C.
The history-cap design¶
A long conversation accumulates history. After 100 turns, the prompt to the model is enormous — cost climbs linearly, latency climbs (more tokens to process), and the model's attention is diluted.
Cap strategies:
- Hard cap on message count (this implementation). Trim oldest after threshold. Simple; loses older context entirely.
- Sliding window with summarisation. When approaching the cap, summarise the oldest N messages into a single context message; keep that plus recent turns. Preserves narrative; more complex.
- Token-based cap. Trim based on token count, not message count. More accurate budget management; requires a tokenizer.
For a chatbot supporting common chat flows, message-count cap is sufficient. For long-form support agents handling 50+ turn conversations, summarisation is worth the engineering cost.
The retry pattern¶
for attempt in 1..max_retry_attempts:
try:
return stream(messages)
except TransientAPIError:
if attempt == max: raise
sleep(min(attempt, 5))
- Exponential-ish backoff.
sleep(attempt)capped at 5s. Could be smarter (jitter, true exponential2^attempt), but bounded backoff prevents amplifying provider failures. - Distinguished error type.
TransientAPIErroris retried; other exceptions surface immediately. The provider's SDK should map rate limits and 5xx to this type; auth errors should not be retried. - Bounded attempts. Always; otherwise a downstream that's permanently broken produces infinite retries.
What the mock layer is for¶
The chatbot uses _mock_stream for tests. For production, replace with:
def _mock_stream(self, messages):
from openai import OpenAI
client = OpenAI()
stream = client.chat.completions.create(
model="gpt-4",
messages=messages,
stream=True,
)
for chunk in stream:
delta = chunk.choices[0].delta.content
if delta:
yield delta
Or for Anthropic, the same pattern with client.messages.stream. The interface (yield tokens, raise TransientAPIError on retryable failures) is the contract; the provider is the swap point.
What this exercise tests¶
The skeleton works, but the production-grade adds are:
- History management without unbounded growth.
- Retry on transient failures with bounded attempts.
- Pluggable streaming layer (mocked for tests, real provider for production).
- Graceful failure surfaced to the user.
- Test coverage for retry, streaming, and history.
A naive implementation skips 1, 2, 4 — and runs fine in development. Production breaks them all on the first rate-limit, the 100th turn, the first transient downtime.
Interview probes¶
- "How do you manage history in a long-running chat?"
- "What's your retry policy for transient API failures, and why?"
- "How do you stream tokens to the user?"
- "What happens when the provider rate-limits in the middle of a turn?"
- "How would you A/B test two system prompts?"
- "How do you preserve context when history exceeds the model's window?"
Each has a one-paragraph answer drawn from the design choices in this implementation.