Classic Algorithm Rounds — Interview Questions¶
The "DSA round, but you're an AI engineer" file. Even at AI engineer loops, you'll hit one classic algorithms round — LRU cache, trie, union-find, heap problems, sliding window. Distinct from ml-coding-rounds.md (attention, sampling, LoRA) and practical-takehomes.md (multi-hour realistic problems).
What's different about AI-loop DSA: the problems are often framed in AI-adjacent ways. LRU → token cache. Trie → tokenizer prefix tree. Union-find → embedding cluster merging. Heap → top-K retrieval. Sliding window → context window over a token stream. The interviewer wants you to (a) solve the DSA, (b) name the AI context where it appears, (c) discuss what changes when this code runs at production scale (millions of QPS, GBs of embeddings).
The senior tell: state complexity and memory before coding. Discuss what changes at scale. Mention the production library that does this (Redis, FAISS, RocksDB) rather than reimplementing — show you know when to write it and when to import it.
Caches¶
Q: "Implement an LRU cache. Framed as: cache the last N LLM responses keyed by prompt hash."¶
Tags: senior · very-common · coding · source: 2026 AI engineer loops; classic DSA + AI framing
Answer outline: - Reference (HashMap + doubly linked list):
class Node:
def __init__(self, k, v):
self.k, self.v = k, v
self.prev = self.next = None
class LRUCache:
def __init__(self, capacity):
self.cap = capacity
self.cache = {}
self.head = Node(None, None) # MRU sentinel
self.tail = Node(None, None) # LRU sentinel
self.head.next = self.tail
self.tail.prev = self.head
def _remove(self, node):
node.prev.next = node.next
node.next.prev = node.prev
def _add_front(self, node):
node.next = self.head.next
node.prev = self.head
self.head.next.prev = node
self.head.next = node
def get(self, k):
if k not in self.cache:
return None
node = self.cache[k]
self._remove(node)
self._add_front(node)
return node.v
def put(self, k, v):
if k in self.cache:
self._remove(self.cache[k])
node = Node(k, v)
self.cache[k] = node
self._add_front(node)
if len(self.cache) > self.cap:
evict = self.tail.prev
self._remove(evict)
del self.cache[evict.k]
maxmemory-policy allkeys-lru, not roll your own.
- Key design: hash of the canonicalized prompt (normalize whitespace, strip volatile fields like timestamps). For semantic caching, key by embedding bucket — see follow-ups.
- TTL matters: an LLM response can go stale (model update, RAG corpus change). Most prompt-cache systems combine LRU eviction and TTL invalidation.
- Numbers to drop: "Redis LRU eviction: configurable per-key; typical TTL 1h-24h for LLM caches", "cache hit rate target: 30-60% on chat workloads, 60-80% on stable system prompts"
Common follow-ups: - "Make it thread-safe." - "Extend to LFU (least frequently used)." - "How would you do semantic caching (similar prompts hit the same entry)?"
Traps:
- Using OrderedDict and claiming it's from-scratch. Acknowledge the shortcut: "in real code I'd use collections.OrderedDict with move_to_end() and popitem(last=False); from-scratch is HashMap + DLL for the interview."
- Forgetting to remove the evicted key from the hash map.
Related cross-cutting: Cost & latency
Related module: learning/02_ai_infrastructure/05_agent_performance_economics/
Q: "Implement a TTL cache."¶
Tags: mid · common · coding · source: 2026 AI engineer loops; foundational cache probe
Answer outline:
- Two designs:
- Lazy expiration: store (value, expiry_ts). On get, check expiry; if past, evict and return miss. Simple, but expired entries linger until accessed.
- Active expiration: heap of (expiry_ts, key); background sweep evicts expired entries. Bounded memory but more complex.
- Lazy version:
import time
class TTLCache:
def __init__(self):
self.store = {} # k -> (v, expiry)
def get(self, k):
if k not in self.store:
return None
v, exp = self.store[k]
if time.monotonic() > exp:
del self.store[k]
return None
return v
def put(self, k, v, ttl_seconds):
self.store[k] = (v, time.monotonic() + ttl_seconds)
Common follow-ups: - "Add LRU eviction on top." - "How does Redis combine TTL and LRU?"
Traps:
- Using time.time() instead of time.monotonic(). NTP jumps can break expiry.
Related cross-cutting: Cost & latency
Related module: learning/02_ai_infrastructure/05_agent_performance_economics/
Tries¶
Q: "Implement a trie. Frame: tokenizer prefix lookup."¶
Tags: senior · common · coding · source: 2026 AI engineer loops; classic DSA + tokenization framing
Answer outline: - Reference:
class TrieNode:
__slots__ = ('children', 'is_end', 'token_id')
def __init__(self):
self.children = {}
self.is_end = False
self.token_id = None
class Trie:
def __init__(self):
self.root = TrieNode()
def insert(self, word, token_id=None):
node = self.root
for ch in word:
if ch not in node.children:
node.children[ch] = TrieNode()
node = node.children[ch]
node.is_end = True
node.token_id = token_id
def longest_prefix_match(self, text, start=0):
# returns (matched_end, token_id) of the longest token that's a prefix of text[start:]
node = self.root
best_end, best_id = start, None
i = start
while i < len(text) and text[i] in node.children:
node = node.children[text[i]]
i += 1
if node.is_end:
best_end, best_id = i, node.token_id
return best_end, best_id
tokenizers, OpenAI tiktoken) are Rust + DAWG/finite-state automaton, not Python tries. Bring it up to show awareness.
- Use __slots__ to cut memory for million-node tries. Bring it up if asked about memory.
- Numbers to drop: "memory per node: ~200 bytes in pure Python without slots, ~50 with", "GPT-4 tokenizer: 100k tokens, trie depth ~10-20 bytes typical"
Common follow-ups: - "Make it memory-efficient for a million words." - "Implement autocomplete on top." - "What's a DAWG and why is it better?"
Traps:
- Using a list of 26 children. Assumes ASCII lowercase; breaks on tokenization.
- Forgetting is_end. Can't distinguish "an" from "ant" prefix.
Related cross-cutting: Architecture choices
Related module: learning/00_ai_foundation/02_tokens_embeddings_context/
Q: "Autocomplete on a trie: given a prefix, return the top-K completions by frequency."¶
Tags: senior · common · coding · source: 2026 AI engineer loops; search/autocomplete-flavored probe
Answer outline:
- Augment each trie node with the top-K completions through it (precomputed) or use a heap traversal at query time.
- Precomputed version (faster query, more memory):
- Each node stores top_k: list of (frequency, word) for the top-K most frequent words in its subtree.
- Build bottom-up after all insertions.
- Query: walk to the prefix node, return its top_k.
- On-the-fly version (less memory, slower query):
- Walk to the prefix node.
- DFS the subtree, collect (freq, word) for all complete words.
- Heap-select top K.
- Reference (on-the-fly):
import heapq
def autocomplete(trie, prefix, k):
node = trie.root
for ch in prefix:
if ch not in node.children:
return []
node = node.children[ch]
results = []
def dfs(n, path):
if n.is_end:
heapq.heappush(results, (n.freq, ''.join(path)))
if len(results) > k:
heapq.heappop(results)
for c, child in n.children.items():
dfs(child, path + [c])
dfs(node, list(prefix))
return sorted(results, reverse=True)
Common follow-ups: - "Handle fuzzy autocomplete (one typo)." - "Update frequencies online."
Traps: - Returning all completions then sorting. O(N log N); for large subtrees, use a heap.
Related cross-cutting: Cost & latency
Related module: learning/01_ai_engineering/14_retrieval_ranking/
Heap / top-K¶
Q: "Top-K most similar embeddings to a query embedding, from N candidates."¶
Tags: senior · very-common · coding · source: 2026 AI engineer loops; bedrock RAG-coding probe
Answer outline: - Brute-force baseline (fine for the round):
import heapq
import numpy as np
def top_k_similar(query, candidates, k):
# query: (D,), candidates: (N, D), both L2-normalized
scores = candidates @ query # cosine since normalized
# use a min-heap of size k
heap = []
for i, s in enumerate(scores):
if len(heap) < k:
heapq.heappush(heap, (s, i))
elif s > heap[0][0]:
heapq.heapreplace(heap, (s, i))
return sorted(heap, reverse=True)
np.argpartition(scores, -k)[-k:]: O(N) selection, faster than heap for small K.
- torch.topk(scores, k): GPU-friendly.
- Heap variant above: best when you stream candidates rather than have them in memory at once.
- The senior callout: for N > 10^6, this brute force is too slow. Use ANN: FAISS / HNSW / IVF / Pinecone. Mention the recall/latency tradeoff: ANN typically 95-99% recall at 10-100× speedup. See vector-databases.md.
- Numbers to drop: "brute force breakpoint: ~10^5-10^6 vectors before ANN wins", "ANN recall typical: 95-99% at 10-100× speedup over exact", "FAISS IVF-PQ for billion-scale"
Common follow-ups: - "How would FAISS do this?" - "What if N is too large to fit in memory?" - "Streaming top-K (candidates arrive one at a time)."
Traps: - Sorting all N scores. O(N log N) when O(N + K log N) is enough. - Not normalizing the embeddings and calling it "cosine similarity".
Related cross-cutting: Cost & latency, Architecture choices
Related module: learning/01_ai_engineering/14_retrieval_ranking/, learning/01_ai_engineering/15_vector_databases/
Q: "K-way merge: combine top-K results from multiple shards."¶
Tags: senior · common · coding · source: 2026 AI engineer loops; sharded-retrieval probe
Answer outline: - Setup: you have M sorted lists of top-K results from M shards (each list sorted by score, descending). Want the global top-K. - Min-heap of (-score, shard_id, list_position):
import heapq
def merge_top_k(shards_results, k):
# shards_results: list of lists, each pre-sorted descending by score
heap = []
for shard_id, lst in enumerate(shards_results):
if lst:
s, item = lst[0]
heapq.heappush(heap, (-s, shard_id, 0, item))
result = []
while heap and len(result) < k:
neg_s, sid, pos, item = heapq.heappop(heap)
result.append((-neg_s, item))
if pos + 1 < len(shards_results[sid]):
ns, nitem = shards_results[sid][pos + 1]
heapq.heappush(heap, (-ns, sid, pos + 1, nitem))
return result
Common follow-ups: - "What if scores are on different scales across shards?" - "Handle a slow shard (partial results)."
Traps: - Concatenating all results and sorting. Wastes the shards' work.
Related cross-cutting: Architecture choices
Related module: learning/01_ai_engineering/14_retrieval_ranking/, learning/01_ai_engineering/15_vector_databases/
Union-find¶
Q: "Cluster embeddings: group items that are within similarity threshold τ of each other."¶
Tags: senior · occasional · coding · source: 2026 AI engineer loops; dedup / clustering probe
Answer outline:
- Build edges: for each pair (i, j) with sim(i, j) > τ, union them.
- Naive: O(N²) similarity comps. For large N, use ANN to get candidate pairs.
- Union-find with path compression + union by rank:
class UnionFind:
def __init__(self, n):
self.parent = list(range(n))
self.rank = [0] * n
def find(self, x):
while self.parent[x] != x:
self.parent[x] = self.parent[self.parent[x]] # path compression
x = self.parent[x]
return x
def union(self, x, y):
rx, ry = self.find(x), self.find(y)
if rx == ry:
return False
if self.rank[rx] < self.rank[ry]:
rx, ry = ry, rx
self.parent[ry] = rx
if self.rank[rx] == self.rank[ry]:
self.rank[rx] += 1
return True
def cluster(embeddings, threshold):
n = len(embeddings)
uf = UnionFind(n)
# use ANN to find candidate pairs at scale; brute force shown here
for i in range(n):
for j in range(i+1, n):
if cosine(embeddings[i], embeddings[j]) > threshold:
uf.union(i, j)
clusters = {}
for i in range(n):
r = uf.find(i)
clusters.setdefault(r, []).append(i)
return list(clusters.values())
Common follow-ups: - "What about transitive closure issues (A~B, B~C, A≠C)?" - "Pick DBSCAN vs union-find here."
Traps: - No path compression. Degrades to O(N) find on long chains. - Brute-force pair check at million-scale. Use ANN.
Related cross-cutting: Architecture choices
Related module: learning/01_ai_engineering/08_rag_system_design/
Sliding window¶
Q: "Streaming context window: process a token stream, maintain a window of the last N tokens, compute a stat per step."¶
Tags: mid · common · coding · source: 2026 AI engineer loops; streaming-flavored probe
Answer outline: - Generic deque-based pattern:
from collections import deque
def sliding_window_stat(stream, window_size, stat_fn):
buf = deque()
for tok in stream:
buf.append(tok)
if len(buf) > window_size:
buf.popleft()
yield stat_fn(buf)
Common follow-ups: - "Maintain a rolling max — what data structure?" - "Stream comes faster than you can process — how do you handle backpressure?"
Traps:
- Using a list with pop(0). O(N) per step.
Related cross-cutting: Cost & latency
Related module: learning/01_ai_engineering/24_safety_guardrails/, learning/01_ai_engineering/22_evals_production/
Q: "Detect repeated n-grams in a generation stream (for repetition penalty / detection)."¶
Tags: senior · occasional · coding · source: 2026 AI engineer loops; decoding-quality probe
Answer outline: - Maintain a counter (or set) of all n-grams seen so far; on each new token, form the latest n-gram and check. - Reference:
from collections import deque
def has_repeat_ngram(stream, n):
seen = set()
window = deque(maxlen=n)
for tok in stream:
window.append(tok)
if len(window) == n:
gram = tuple(window)
if gram in seen:
yield True
else:
seen.add(gram)
yield False
else:
yield False
RepetitionPenaltyLogitsProcessor.
- AI framing: degenerate generation (the same phrase looping) is a common production failure mode, especially with small models or high beam widths. n-gram detection drives both penalty samplers and degenerate-output classifiers.
- Numbers to drop: "n typically 3-5", "HF default repetition penalty: 1.0 (off); 1.1-1.3 to mitigate looping", "stronger penalty hurts coherence"
Common follow-ups: - "Implement repetition penalty in a sampler." - "Detect copy-from-prompt regurgitation."
Traps: - Using a list for the n-gram (unhashable). Use tuples for set membership.
Related cross-cutting: Evaluation & quality
Related module: learning/01_ai_engineering/03_decoding_strategies/
Rate limiting¶
Q: "Implement a token bucket rate limiter, framed as per-tenant LLM-call rate limiting."¶
Tags: senior · common · coding · source: 2026 AI engineer loops; production-flavored probe
Answer outline: - Token bucket: capacity C, refill rate R tokens/sec. Each request consumes K tokens. Allow if bucket has K; else reject. - Reference:
import time
class TokenBucket:
def __init__(self, capacity, refill_per_sec):
self.cap = capacity
self.rate = refill_per_sec
self.tokens = capacity
self.last = time.monotonic()
def try_consume(self, n):
now = time.monotonic()
elapsed = now - self.last
self.tokens = min(self.cap, self.tokens + elapsed * self.rate)
self.last = now
if self.tokens >= n:
self.tokens -= n
return True
return False
tenant_id -> TokenBucket. Lazy-create on first call.
- AI framing callouts:
- LLM cost is non-uniform: cost ≈ input_tokens + output_tokens × multiplier. Sometimes consume token count rather than 1-per-request.
- Two-tier limiter: per-tenant + per-global. Prevents one tenant from saturating shared capacity.
- Output is unknown at consume time. Either reserve worst-case (max_tokens) on the way in and refund on the way out, or charge on the way out (riskier — overage possible).
- At scale: distributed token bucket via Redis with INCRBY + Lua, or rely on the LLM provider's rate-limit headers (x-ratelimit-remaining-tokens).
- Numbers to drop: "per-tenant typical: 100-1000 req/min for paid tiers", "OpenAI / Anthropic expose remaining quota in response headers — backoff on those", "reserve worst-case to prevent overrun"
Common follow-ups: - "Make it distributed across instances." - "How do you handle a burst that fits the capacity but starves regular traffic?"
Traps: - Refilling per request only. Idle buckets never catch up to capacity. - Using wall clock instead of monotonic. NTP jumps break refill math.
Related cross-cutting: Cost & latency, Production patterns
Related module: learning/01_ai_engineering/04_resilient_agent_systems/
Graph traversal¶
Q: "Detect a cycle in a tool-call dependency graph (agent planning)."¶
Tags: senior · occasional · coding · source: 2026 AI engineer loops; agent-flavored DSA probe
Answer outline: - Standard DFS with three colors (white = unvisited, gray = on current path, black = done):
WHITE, GRAY, BLACK = 0, 1, 2
def has_cycle(graph):
color = {node: WHITE for node in graph}
def dfs(u):
color[u] = GRAY
for v in graph.get(u, []):
if color[v] == GRAY:
return True
if color[v] == WHITE and dfs(v):
return True
color[u] = BLACK
return False
return any(dfs(u) for u in graph if color[u] == WHITE)
Common follow-ups: - "Topo-sort the graph after confirming no cycle." - "What if cycles are allowed but bounded?"
Traps: - Using a single visited set. Can't distinguish cross-edges from back-edges; false positives.
Related cross-cutting: Production patterns
Related module: learning/01_ai_engineering/17_agents_design/, learning/01_ai_engineering/19_multi_agent_orchestration/
Q: "Shortest path: route a multi-step query through cheapest model chain that satisfies quality threshold."¶
Tags: staff · occasional · coding · source: 2026 senior AI engineer loops; router-design probe
Answer outline: - Model as a weighted graph: each node is a (step, model) state; edges represent step → step transitions; weights are cost. Edge exists only if (model A → model B) handoff preserves quality above a threshold. - Run Dijkstra to find the minimum-cost path that satisfies the threshold. - Reference skeleton:
import heapq
def cheapest_chain(steps, models, cost, quality, threshold):
# state = (step_idx, model_idx)
# transitions: step i, model m -> step i+1, model m' if quality_pair ok
start = (0, 'START')
end = (len(steps), 'END')
dist = {start: 0}
pq = [(0, start)]
while pq:
d, u = heapq.heappop(pq)
if u == end:
return d
if d > dist.get(u, float('inf')):
continue
step_i, m = u
if step_i == len(steps):
continue
for m_next in models[step_i]:
# quality check
if quality(m, m_next, step_i) < threshold:
continue
v = (step_i + 1, m_next)
nd = d + cost(steps[step_i], m_next)
if nd < dist.get(v, float('inf')):
dist[v] = nd
heapq.heappush(pq, (nd, v))
return None
Common follow-ups: - "When does greedy per-step suffice?" - "Bandit instead of fixed routing?"
Traps: - Ignoring the threshold — picking cheapest path without quality constraint.
Related cross-cutting: Cost & latency
Related module: learning/02_ai_infrastructure/05_agent_performance_economics/
Bit manipulation / hashing¶
Q: "Implement MinHash for near-duplicate document detection."¶
Tags: staff · occasional · coding · source: 2026 AI engineer loops; dedup-pipeline probe
Answer outline:
- MinHash: estimate Jaccard similarity between two sets cheaply.
- For each of K hash functions h_k, compute min(h_k(x) for x in set_A). The signature is the K-tuple of mins. P(signatures agree on position k) ≈ Jaccard(A, B).
- Reference:
import hashlib
import random
class MinHash:
def __init__(self, num_perms=128, seed=0):
random.seed(seed)
self.num_perms = num_perms
# K independent hash functions via (a*x + b) mod p tricks
self.a = [random.randint(1, 2**32) for _ in range(num_perms)]
self.b = [random.randint(0, 2**32) for _ in range(num_perms)]
self.p = (1 << 61) - 1
def signature(self, tokens):
mins = [float('inf')] * self.num_perms
for t in tokens:
h = int.from_bytes(hashlib.sha1(t.encode()).digest()[:8], 'big')
for k in range(self.num_perms):
hk = (self.a[k] * h + self.b[k]) % self.p
if hk < mins[k]:
mins[k] = hk
return mins
@staticmethod
def estimate_jaccard(sig_a, sig_b):
return sum(x == y for x, y in zip(sig_a, sig_b)) / len(sig_a)
Common follow-ups: - "Walk through LSH bands and the S-curve." - "When do you prefer SimHash?"
Traps:
- Implementing K independent hashes via different seeds and SHA. Slow. Use the (a·h + b) mod p family over a single base hash.
Related cross-cutting: Architecture choices
Related module: learning/01_ai_engineering/08_rag_system_design/
Discussion-flavored¶
Q: "Walk me through how you'd design a deduper for ingested RAG documents."¶
Tags: staff · common · design · source: 2026 AI engineer loops; senior-tier pipeline probe
Answer outline: - This is a "DSA framing → systems design" round. Expect to discuss data structures and operations, not pure code. - Stages: - Exact dedup: hash full document. Trivial. Catches re-uploads. - Near-duplicate dedup: MinHash + LSH bands for ~Jaccard. Threshold ~0.7-0.9 on shingles depending on tolerance for paraphrases. - Semantic dedup: embed each doc; cluster via union-find with cosine threshold (~0.92-0.95). Heavier; use ANN for candidate generation. Catches paraphrases that share little surface form. - Chunk-level dedup post-chunking: same ideas applied after splitting; prevents the same paragraph appearing 100× via different docs. - Storage: persistent dedup tables in Redis / RocksDB keyed by hash; periodic rebuild from canonical source. - Order of operations: cheap → expensive. Exact first (Bloom filter for membership), then MinHash, then semantic if budget allows. - Numbers to drop: "exact dedup: trivial cost, catches re-uploads", "MinHash signature: ~1KB per doc", "semantic dedup: ~$0.0001 per doc on standard embedding APIs in 2026", "duplicate rate in real corpora: often 20-40%"
Common follow-ups: - "What threshold do you pick and why?" - "How do you handle incremental ingestion (don't redo all pairs)?"
Traps: - Skipping the cheap stages and going straight to semantic. Wastes compute.
Related cross-cutting: Architecture choices, Cost & latency
Related module: learning/01_ai_engineering/08_rag_system_design/, learning/01_ai_engineering/14_retrieval_ranking/