04. Running models in the stream — ASR, vision, and embeddings when the unit of work is expensive and fallible

~24 min read. A consumer crashes mid-batch and restarts. It re-reads the last offsets and re-transcribes three calls it already transcribed. You just paid for six ASR calls to do three calls' work — and the vector index now has duplicate transcripts. Whether that is a disaster or a shrug is the whole chapter.

Built on the replay log, derived artifact, modality cost asymmetry, and exactly-once vs at-least-once named in 00-first-principles.md. Chapter 03 built the empty derived zone; chapter 02 warned that consumer crashes cause redelivery. This file fills the derived zone by running models in the stream and confronts the duplicate that redelivery creates when the work is an expensive model call.


What chapters 02 and 03 settled, and what they left unpriced

Chapter 02 gave us a log that redelivers on consumer crash — at-least-once delivery is the default, and we noted "downstream must dedupe" and moved on. Chapter 03 gave us a derived zone waiting to be filled with transcripts, captions, and embeddings, each a model output pointing back to raw. Now those two threads collide. The thing that fills the derived zone is a model call — Whisper transcribing six minutes of audio, a multimodal embedder vectorizing a screenshot, a text embedder turning a chat into 1024 floats. And the log's at-least-once redelivery means that call can run twice for the same input.

For a cheap, idempotent SQL transform, running twice is free and harmless — the second run overwrites with the identical result. For a model call it is neither. A re-transcribe costs real GPU-seconds or real API dollars, and if the write is a naive append, the index gets two copies of the same transcript, so the copilot retrieves the customer's complaint twice and double-counts it. The expensive-and-fallible nature of the model call is what makes "exactly-once vs at-least-once" stop being a textbook phrase and start being a line on your cloud bill.


What this file solves

A streaming consumer that runs an ASR, vision, or embedding model and crashes mid-batch will, on restart, reprocess the same records — paying twice for the model call and writing duplicate artifacts the copilot then double-counts. This file shows how a stateful stream processor (Flink, Spark Structured Streaming) places these model calls in the stream, how windowing groups a multi-event interaction before embedding, and how to choose between true exactly-once and cheaper at-least-once-plus-idempotent-writes when the costly step is a model, not a database write.


1) Why a model call breaks the simple transform mental model — the need to treat it specially

A normal stream transform is a pure function over a record: parse JSON, map a field, filter. Cheap (microseconds), deterministic, side-effect-free. Re-running it is free. The processor can afford strong guarantees almost for free because the work it is guarding is trivial.

A model call breaks every one of those assumptions. It is slow: Whisper-large transcribes near real time, so a 6-minute call is minutes of GPU time; a streaming-ASR variant cuts latency to ~3 seconds per utterance, but a transcript of the whole call still cannot be finished before the call itself ends. It is expensive: self-hosted GPU time or a per-minute API fee (~$0.0077/min for some streaming ASR APIs). It is non-deterministic in subtle ways: a model version bump changes the transcript, so "the same input" does not always give the same output. And it has a side effect that costs money, not just a write you can replay for free.

So the real need is not "process the stream." It is to process the stream while never paying for the same expensive model call twice and never letting a retry put a duplicate artifact in the index. That reframes the entire exactly-once discussion: the thing you are protecting is no longer a database write; it is a billed, slow, occasionally-changing computation.

CHEAP TRANSFORM                          MODEL-CALL TRANSFORM
 record ─▶ parse/map/filter ─▶ out        record ─▶ [ ASR / embed / caption ] ─▶ out
 µs, deterministic, free to redo          seconds–minutes, $$, version-sensitive
 re-run on retry: harmless                re-run on retry: pay again + duplicate artifact
 guarantee is ~free                       guarantee is a cost/correctness tradeoff

Why this rule exists. Delivery guarantees are cheap to provide when the guarded work is cheap and idempotent, and expensive when the work is costly and side-effecting. Exactly-once over a free transform is a nicety; exactly-once (or its idempotent substitute) over a billed model call is the difference between paying for the work once and paying for it on every redelivery. The model call moves the guarantee from "correctness detail" to "the cost lever."


2) The core picture: the stream processor between log and derived zone

  THE LOG (ch.02)             STREAM PROCESSOR (Flink / Spark SS)                 DERIVED ZONE (ch.03)
  partitioned, keyed          stateful operators, checkpointed                    lakehouse + vector index

  ┌─ p0 e e e ─┐    source    ┌────────────────────────────────────────┐   sink   ┌───────────────────┐
  ├─ p1 e e e ─┤───offsets───▶│  route by type                         │──upsert─▶│ transcripts table │
  ├─ p2 e e   ─┤   committed  │   audio ─▶ window(call) ─▶ ASR model   │  by id   │ captions table    │
  └─ p3 e e e ─┘   atomically │   image ─▶ vision/embed model          │          │ vector index      │
        ▲                     │   text  ─▶ embed model                 │          └───────────────────┘
        │  replay from        │                                        │              ▲ idempotent key
        │  committed offset   │  CHECKPOINT: {offsets + operator state}│              │ = dedupe on retry
        └─────────────────────┤  snapshot every N seconds              │              │
                              └────────────────────────────────────────┘              │
           on crash: restore state + rewind offsets to last checkpoint ───────────────┘
                                ──▶ the model call may re-run; the WRITE must not duplicate

The processor sits between the log (source) and the derived zone (sink). It does three jobs the chapter cares about: route each event to the right model by modality, window multi-event interactions (group a call's audio chunks, or a session's messages) before embedding, and checkpoint so a crash restores state and rewinds offsets to a consistent point. The crash-recovery arrow is the heart of it: on restore, offsets rewind, so the model call between the last checkpoint and the crash re-runs — and the sink write is where you stop that re-run from becoming a duplicate.
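The routing step can be sketched as a lookup from modality to operator. This is a minimal illustration, assuming a hypothetical `Event` record and hypothetical operator names (`embed_text`, `embed_image`, `window_then_asr`); only the `chat:`/`img:`/`txn:` id shapes come from the running example. What it shows is that the artifact id is derived from the event's identity, not from the model's output, so a replayed event lands on the same id.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Event:
    """Hypothetical log record; field names are illustrative only."""
    customer: str
    offset: int
    type: str     # "text" | "image" | "audio"
    payload: str  # message text or an object-store URI


# Modality -> (hypothetical operator name, artifact id prefix).
ROUTES = {
    "text": ("embed_text", "chat"),
    "image": ("embed_image", "img"),
    "audio": ("window_then_asr", "txn"),
}


def route(event: Event) -> tuple[str, str]:
    """Return (operator, artifact_id). Unknown types raise instead of
    being silently dropped, so they can be sent to a dead-letter path."""
    if event.type not in ROUTES:
        raise ValueError(f"no route for event type {event.type!r}")
    operator, prefix = ROUTES[event.type]
    # The id depends only on (customer, offset): a redelivered event
    # produces the same id, which is what lets the sink upsert.
    return operator, f"{prefix}:{event.customer}:{event.offset}"


print(route(Event("88213", 90414, "audio", "call.wav")))
# prints ('window_then_asr', 'txn:88213:90414')
```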


3) The running example: 88213's call, chat, and screenshot through the processor

Recall the three events on partition 1 from chapter 02. Trace them through the processor.

offset 90412  text  "payment failed again"   ─▶ embed(text) ─▶ upsert vector id=chat:88213:90412
offset 90413  image s3://.../shot.png         ─▶ multimodal-embed(image) ─▶ upsert id=img:88213:90413
offset 90414  audio s3://.../call.wav          ─▶ window until call complete
                                                  ─▶ ASR(call) ─▶ transcript
                                                  ─▶ embed(transcript) ─▶ upsert id=txn:88213:90414
checkpoint at 90414 commits: {offset=90414, operator window state, sink confirmed}

The text and image are single-event transforms: one model call each, immediate. The audio is different — it needs windowing. A single log event marks the call's start, but ASR wants the whole utterance (or chunked utterances). The processor holds windowed state until the call completes, then runs ASR. That window state is exactly what the checkpoint must capture: if the processor crashes mid-call, on restore it must remember it was 4 minutes into accumulating call 90414, not start over.
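A per-call window can be sketched as keyed state that buffers chunks until the call is complete. This sketch assumes a hypothetical chunk format (a `call_id`, a per-call sequence number `seq`, and an `is_last` flag on the final chunk); real ASR pipelines may signal completion differently. Assembling by `seq` rather than by arrival order also covers out-of-order chunks, and `snapshot`/`restore` stand in for what the framework's checkpoint does with keyed state.

```python
from collections import defaultdict
from typing import Optional


class CallWindow:
    """Buffers audio chunks per call until every chunk up to the last one
    has arrived. In Flink or Spark this dict would be checkpointed keyed
    state, so a restart resumes mid-call instead of re-windowing."""

    def __init__(self) -> None:
        self.chunks: dict = defaultdict(dict)   # call_id -> {seq: bytes}
        self.last_seq: dict = {}                # call_id -> seq of final chunk

    def add(self, call_id: str, seq: int, data: bytes,
            is_last: bool = False) -> Optional[bytes]:
        """Add one chunk; return the assembled call once it is complete."""
        buf = self.chunks[call_id]
        buf[seq] = data                  # a redelivered chunk just overwrites
        if is_last:
            self.last_seq[call_id] = seq
        last = self.last_seq.get(call_id)
        if last is None or any(i not in buf for i in range(last + 1)):
            return None                  # end not seen yet, or a gap remains
        audio = b"".join(buf[i] for i in range(last + 1))  # event-time order
        del self.chunks[call_id], self.last_seq[call_id]   # window closes
        return audio

    def snapshot(self) -> dict:
        """Copy of the state a checkpoint would persist."""
        return {"chunks": {k: dict(v) for k, v in self.chunks.items()},
                "last_seq": dict(self.last_seq)}

    @classmethod
    def restore(cls, state: dict) -> "CallWindow":
        window = cls()
        for call_id, buf in state["chunks"].items():
            window.chunks[call_id] = dict(buf)
        window.last_seq = dict(state["last_seq"])
        return window


w = CallWindow()
w.add("90414", 1, b"B")                    # chunk 1 arrives before chunk 0
w = CallWindow.restore(w.snapshot())       # crash + restore from checkpoint
w.add("90414", 2, b"C", is_last=True)
print(w.add("90414", 0, b"A"))             # b'ABC'
```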

Now the failure that defines the chapter. Suppose the processor crashes at the instant after it called ASR on 90414 and got the transcript back, but before the checkpoint committed. On restart, offsets rewind to the last checkpoint (say 90413), so 90414 is re-read and ASR runs again on the same 6-minute call. You have paid for two transcriptions. Whether the index now holds two transcripts depends entirely on the sink: if the write key is txn:88213:90414 (an idempotent upsert keyed on offset/event), the second write overwrites the first — one transcript, two ASR bills. If the write is a blind append, the copilot retrieves the complaint twice.
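The crash window can be reproduced with a toy consumer. This is a hypothetical simulation, not Flink or Spark code: `FakeASR`, `AppendSink`, `UpsertSink`, and `consume` are illustrative names, and the key format copies the running example's `txn:88213:90414`. Both sinks pay for two ASR calls; only the write key decides whether the index ends up with one transcript or two.

```python
class FakeASR:
    """Stand-in for a billed ASR endpoint: counts calls instead of charging."""
    def __init__(self) -> None:
        self.calls = 0

    def transcribe(self, uri: str) -> str:
        self.calls += 1
        return f"transcript of {uri}"


class AppendSink:
    """Blind append: every write adds a row."""
    def __init__(self) -> None:
        self.rows: list = []

    def write(self, key: str, value: str) -> None:
        self.rows.append((key, value))

    def count(self) -> int:
        return len(self.rows)


class UpsertSink:
    """Idempotent upsert: a write with an existing key overwrites it."""
    def __init__(self) -> None:
        self.rows: dict = {}

    def write(self, key: str, value: str) -> None:
        self.rows[key] = value

    def count(self) -> int:
        return len(self.rows)


def consume(events, start, asr, sink, crash_at=None) -> int:
    """Process (offset, uri) events from `start`; return the last
    checkpointed offset. With crash_at set, the job dies after the sink
    write for that offset but before its checkpoint commits."""
    committed = start - 1
    for offset, uri in events:
        if offset < start:
            continue
        transcript = asr.transcribe(uri)          # billed model call
        sink.write(f"txn:88213:{offset}", transcript)
        if offset == crash_at:
            return committed                      # checkpoint never happens
        committed = offset                        # checkpoint commits
    return committed


def crash_and_restart(sink):
    asr = FakeASR()
    events = [(90414, "call-90414.wav")]
    last = consume(events, 90414, asr, sink, crash_at=90414)  # crashes
    consume(events, last + 1, asr, sink)   # restart rewinds and replays 90414
    return asr.calls, sink.count()


print(crash_and_restart(AppendSink()))  # (2, 2): two bills, duplicate transcript
print(crash_and_restart(UpsertSink()))  # (2, 1): two bills, one transcript
```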


4) Rule: choose the guarantee at the sink, by what a duplicate costs you

The chapter's invariant: at-least-once delivery is the cheap default, so design the sink write to be idempotent on a stable key; reserve true end-to-end exactly-once for the rare sink where you cannot dedupe. The model call between checkpoints may re-run on recovery — you mostly cannot prevent that cheaply — so the duplicate is controlled at the write, by making the same input produce the same addressable artifact. The expensive duplicate (paying for ASR twice) is bounded by how often you crash; the correctness duplicate (two artifacts in the index) is eliminated by idempotent writes regardless of how often you crash.

WHAT GETS DUPLICATED ON RETRY        HOW YOU CONTROL IT
 the model call (cost)                checkpoint frequency + recovery rate
                                       (bounds how much work re-runs, not zero)
 the artifact (correctness)           idempotent sink: key the write on a stable id
                                       (event offset / content hash) → upsert, not append

Teacher voice. Stop trying to make the model call exactly-once. You almost never can — the call has a side effect (a bill, a GPU-second) the processor cannot roll back. What you can make exactly-once is the visible result: the artifact in the index. Key the write so a re-run overwrites instead of appends, and at-least-once delivery becomes effectively-once at the index while you pay only a bounded, occasional extra model bill. This is the single most useful move in streaming-AI plumbing: idempotent writes turn cheap delivery into correct results.
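The two stable keys named in the table above can be sketched as follows. The helper names are hypothetical; the offset form mirrors the running example's `txn:88213:90414` and assumes a customer's events always land on the same partition (the keyed log from chapter 02), since offsets are only unique within a partition. The content form uses SHA-256 from Python's standard `hashlib`, truncated to 16 hex characters purely for readability, which is an arbitrary choice.

```python
import hashlib


def offset_key(kind: str, customer: str, offset: int) -> str:
    """Key from the event's log position, e.g. txn:88213:90414.
    Redelivery re-reads the same offset, so it yields the same key."""
    return f"{kind}:{customer}:{offset}"


def content_key(kind: str, customer: str, raw: bytes) -> str:
    """Key from the raw input bytes. Unlike offset_key, this also collapses
    the same file uploaded twice under different offsets."""
    digest = hashlib.sha256(raw).hexdigest()[:16]
    return f"{kind}:{customer}:{digest}"


index: dict = {}
for _ in range(2):  # the same event delivered twice (at-least-once)
    index[offset_key("txn", "88213", 90414)] = "transcript"
print(len(index))   # 1
```

Which key to pick is a semantic choice: an offset key treats two identical uploads as two artifacts, a content key treats them as one. The model version is deliberately left out of both keys in this sketch, so a later re-derive with a new model overwrites the old artifact instead of sitting beside it.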


5) Flink vs Spark Structured Streaming: continuous dataflow vs micro-batch

Both run models in the stream and both offer exactly-once. They differ in how work flows, which decides latency and how the model call sits in the pipeline.

Spark Structured Streaming (micro-batch)

Spark collects records into a micro-batch (trigger interval, practically seconds; floor ~100 ms) and runs the batch through the model. Exactly-once comes from write-ahead logs plus offset tracking; recovery replays the failed micro-batch.

Helps: batches model calls naturally — 200 chats per micro-batch is one batched embedding API call, far cheaper than 200 separate calls; mature in Databricks/Spark shops; Delta sink integrates cleanly.

Hurts: latency floored by the trigger interval (1–5 s typical), so per-event freshness is micro-batch-bound; large operator state makes checkpoints heavier and recovery slower (minutes).

Use when: throughput and cost-per-call matter more than sub-second latency, batching the model call is a big saving, and you already live in Spark.

Flink (continuous dataflow)

Flink builds a dataflow graph; records flow through operators continuously with no trigger interval. Exactly-once comes from asynchronous distributed snapshots (a variant of the Chandy–Lamport algorithm) that run in the background while processing continues; recovery restores the last checkpoint in seconds.

Helps: sub-second latency (the ASR/embedding starts the instant the event arrives); rich windowing and event-time handling for late/out-of-order data; lightweight async checkpoints; fast recovery.

Hurts: you batch model calls yourself (mini-batch within the operator) if you want call-batching economics; more concepts to operate; less turnkey in a Spark-only shop.

The choice for the running platform

Mixed. Chat and image want low per-event latency and benefit from Flink's event-at-a-time flow — a chat embedded the instant it lands hits the seconds-fresh target. Audio is windowed and ASR-bound anyway, so micro-batch latency is noise against minutes of transcription; either engine is fine. The deciding factor is usually the org: a Databricks shop runs Spark SS with a Delta sink; a latency-sensitive, multi-engine shop runs Flink with an Iceberg/vector sink. The 2026 reality is Flink leads for sub-second continuous work; Spark SS leads for throughput-and-cost batched work inside Spark.

Mini-FAQ. "Why not call the model from a plain Kafka consumer and skip the framework?" You can, and for a single stateless embed it is fine. The framework earns its place the moment you need (a) windowing (group a call's chunks), (b) checkpointed state recovery (don't re-window 4 minutes on crash), and (c) coordinated offset+sink commits for the guarantee. A hand-rolled consumer makes you rebuild checkpointing and exactly-once yourself — the classic "we'll just write a loop" that becomes a worse Flink two years later.


6) The property that changes the design: batch the model call, but bound the window

The dominant cost lever for model-call transforms is batching: most embedding and ASR endpoints are far cheaper per item when you send many at once, because the per-request overhead and GPU warm-up amortize. 200 chats embedded in one call can be ~5–10× cheaper than 200 separate calls. So the processor should accumulate a small batch, then call.
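Why batching amortizes can be made concrete with a toy cost model in which each request pays a fixed overhead plus a per-item charge. The `per_request` and `per_item` figures below are made-up units, not any provider's pricing; with them, batching 200 items comes out roughly 10× cheaper, and the real ratio depends on how large the fixed overhead is relative to the per-item cost.

```python
import math


def embed_cost(n_items: int, batch_size: int,
               per_request: float, per_item: float) -> float:
    """Total cost when items are sent in requests of up to batch_size.
    per_request models fixed overhead (round trip, scheduling, warm-up)."""
    requests = math.ceil(n_items / batch_size)
    return requests * per_request + n_items * per_item


# Hypothetical units: overhead dominates a single small item.
one_by_one = embed_cost(200, 1, per_request=1.0, per_item=0.1)   # 220.0
batched = embed_cost(200, 200, per_request=1.0, per_item=0.1)    # 21.0
print(round(one_by_one / batched, 1))                            # 10.5
```

As the batch grows, the saving approaches (per_request + per_item) / per_item, so an endpoint with little fixed overhead gains little from batching.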

But batching fights freshness. Wait for 200 chats and a quiet customer's message sits in the buffer until the batch fills — the freshness gap opens. The fix is a bounded window: flush on size or time, whichever first. "Embed when 200 messages accumulate or 1 second passes." That caps both cost (rarely tiny batches) and latency (never wait more than the window).

NO BATCH                BATCH BY SIZE ONLY        BOUNDED WINDOW (size OR time)
 1 call per event        wait for 200 then call    flush at 200 msgs OR 1 s, whichever first
 + lowest latency        + cheapest per item       + cheap when busy, fresh when quiet
 − highest cost          − a quiet customer waits   − slightly more complex flush logic
   (no amortization)        forever for batch #200    (the right answer)
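The bounded window can be sketched as a small buffer with two flush triggers. This is an illustrative sketch, not a Flink or Spark API: `BoundedBatcher`, `flush`, and `tick` are hypothetical names, and `tick` assumes something (for example a processing-time timer) calls it periodically. The clock is injectable so the time trigger can be exercised without sleeping.

```python
import time
from typing import Callable, List, Optional


class BoundedBatcher:
    """Flush when max_size items are buffered OR when the oldest buffered
    item has waited max_wait seconds, whichever happens first."""

    def __init__(self, max_size: int, max_wait: float,
                 flush: Callable[[List], None],
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_size = max_size
        self.max_wait = max_wait
        self.flush_fn = flush
        self.clock = clock
        self.buf: List = []
        self.first_at: Optional[float] = None

    def add(self, item) -> None:
        if not self.buf:
            self.first_at = self.clock()    # start the timer on the first item
        self.buf.append(item)
        if len(self.buf) >= self.max_size:  # size trigger: the busy path
            self._flush()

    def tick(self) -> None:
        """Time trigger: the quiet path. Call periodically."""
        if self.buf and self.clock() - self.first_at >= self.max_wait:
            self._flush()

    def _flush(self) -> None:
        batch, self.buf, self.first_at = self.buf, [], None
        self.flush_fn(batch)                # e.g. one batched embedding call


# Simulated clock so the example runs instantly.
now = [0.0]
calls = []
b = BoundedBatcher(max_size=3, max_wait=1.0, flush=calls.append,
                   clock=lambda: now[0])
for msg in ("m1", "m2", "m3"):   # busy: size trigger fires
    b.add(msg)
b.add("lonely")                  # quiet: would starve with size-only
now[0] = 1.0
b.tick()
print(calls)                     # [['m1', 'm2', 'm3'], ['lonely']]
```

In a real operator the buffer also has to be flushed or included in checkpointed state before a checkpoint completes; otherwise items whose offsets were already checkpointed but which still sit in the buffer would be lost on a crash.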

This is modality cost asymmetry again, now in the transform: text batches beautifully (many tiny cheap calls → one cheap batched call), audio barely batches (each call is large and windowed independently), images batch moderately. The window size and timeout differ per modality — there is no one window for the platform.

Teacher voice. Every batched stream transform has two flush triggers, never one. Size alone starves the quiet path; time alone wastes the busy path. The bounded window — flush on whichever fires first — is the same shape as Nagle's algorithm in TCP and group-commit in a database: amortize when you can, but never make a lone item wait. If you see a freshness gap that only appears at low traffic, suspect a size-only batch with no timeout.


7) Cost and behavior table: guarantee choices under this workload

Order-of-magnitude for the running platform; verify against your provider.

CHOICE                         MODEL CALL ON RETRY     DUPLICATE IN INDEX?   LATENCY     WHEN TO USE
 at-least-once,                pays again              yes: copilot          lowest      almost never; the trap
 blind-append sink             (bounded by crash rate) double-counts

 at-least-once +               pays again (bounded)    no: overwrite by key  low         default for embeddings
 idempotent upsert sink                                                                  and transcripts

 end-to-end exactly-once       pays again (bounded);   no                    higher (2PC sink can't dedupe, e.g.
 (transactional sink)          only the sink write                           overhead)   emit to another Kafka
                               is rolled back                                            topic, external charge

 "exactly-once" on a           pays again (bounded)    yes: silently broken              never: the misconfiguration
 non-transactional sink                                                                  that looks safe

The honest default is row two: cheap at-least-once delivery, idempotent writes keyed on event offset or content hash. You eat a bounded, occasional extra model bill (only on the records between the last checkpoint and a crash) and get correct, duplicate-free results. True end-to-end exactly-once (row three) is worth its two-phase-commit latency only when the sink genuinely cannot dedupe — for example, you forward results to another Kafka topic that triggers a billable downstream action. For a vector-index upsert, idempotency is simpler and cheaper than transactional exactly-once.

Concrete: transcription of 12k calls/day × 6 min. Self-hosted Whisper-large on a GPU pool costs cents per call in compute; a streaming ASR API at ~$0.0077/min costs ~$0.046/call, ~$550/day, ~$200k/year. If a crash re-runs ~0.1% of calls, the duplicate-cost overhead is ~$200/year: negligible as long as the sink is idempotent, so the re-run does not also corrupt the index. The duplicate's correctness cost (double-counted complaints) is what idempotent writes eliminate for free.
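The paragraph's arithmetic can be written as a small function so the inputs can be swapped for your own. The $0.0077/min rate, 6-minute calls, 12k calls/day, and 0.1% re-run fraction are the paragraph's figures; `asr_cost` is a hypothetical helper, and the 365-day year is an assumption.

```python
def asr_cost(calls_per_day: int, minutes_per_call: float,
             usd_per_min: float, rerun_fraction: float) -> dict:
    """API transcription spend, plus the extra spend from crash re-runs."""
    per_call = minutes_per_call * usd_per_min
    per_day = calls_per_day * per_call
    per_year = per_day * 365
    return {
        "per_call": per_call,
        "per_day": per_day,
        "per_year": per_year,
        "rerun_overhead_per_year": per_year * rerun_fraction,
    }


cost = asr_cost(12_000, 6, 0.0077, rerun_fraction=0.001)
print({k: round(v, 2) for k, v in cost.items()})
```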


8) Operational signals: watching the transform layer

  • Healthy: per-modality processing latency flat (chat embed p50 ~1–2 s, image ~1–3 s, ASR ~call-length + seconds); model-call error rate near zero; checkpoint duration small and stable; batch sizes near target (not chronically tiny or maxed).
  • First metric to degrade: checkpoint duration / barrier alignment time. As operator state (windows, in-flight calls) grows or a model slows, checkpoints take longer; back-pressure builds upstream and consumer lag (chapter 02) climbs because the transform stalled, not because ingest grew. Checkpoint time is the leading indicator of a transform-layer stall.
  • Misleading metric people watch: model-call throughput (calls/sec). High throughput looks healthy while a fraction of calls error and get silently retried or dropped, or while duplicates inflate the count. Throughput counts attempts, not correct distinct artifacts.
  • First graph an expert opens: end-to-end latency per modality overlaid with checkpoint duration and model-call error rate. They look for latency rising in lockstep with checkpoint time (state/back-pressure problem) versus latency rising with error rate (the model endpoint is the bottleneck).

9) Boundary: where in-stream models fit, and where they don't

  • Strong fit: per-event or small-window transforms where freshness matters and the model call is the point — embedding chats and images as they arrive, transcribing calls for a live copilot. The processor's windowing and checkpointing earn their keep.
  • Pathological: running a heavy, long-latency batch model (a multi-minute document-understanding pipeline, a large re-ranking model) inline in the stream. It stalls the operator, blows checkpoint duration, and back-pressures the whole pipeline. Such work belongs in an async side-path: emit a job, process off-stream, write back.
  • Scale/workload limit that breaks intuition: at high volume, the model endpoint — not the stream framework — is the bottleneck. People tune Flink parallelism expecting speedup and get none because the GPU pool is saturated. The intuition "scale the processor to go faster" fails: the model capacity is the floor, and you scale that (more GPU, batching, a smaller model) or shed load (chapter 02), not the operator count.
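A back-of-envelope sketch of why operator parallelism stops helping: completed throughput is capped by the smaller of operator capacity and model capacity. The function name and rates are hypothetical, and the model ignores queueing effects that a real pipeline adds on top.

```python
def effective_rps(operators: int, per_operator_rps: float,
                  model_capacity_rps: float) -> float:
    """Records/sec actually completed: operators cannot outrun the model pool."""
    return min(operators * per_operator_rps, model_capacity_rps)


# Hypothetical: each operator could drive 50 rps; the GPU pool serves 200 rps.
for parallelism in (2, 4, 8, 16):
    print(parallelism, effective_rps(parallelism, 50, 200))
# 2 100, 4 200, 8 200, 16 200: past 4 operators, only more model capacity helps
```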

10) Wrong model to drop: "turn on exactly-once and duplicates are impossible"

The seductive idea is that flipping the processor's exactly-once setting guarantees no duplicate artifacts and no double billing. It feels safe because the framework advertises exactly-once. The correct model: exactly-once is a property of the end-to-end pipeline, and it holds only if the sink participates — a transactional or idempotent sink. Point a Flink "exactly-once" job at a sink that blindly appends and you get duplicates anyway; the guarantee was never end-to-end. And exactly-once never makes the model call exactly-once — the GPU-second is spent and cannot be un-spent on rollback. Get duplicate-free results by making the sink idempotent (key the upsert); use transactional exactly-once only where the sink supports it and you truly can't dedupe.
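A toy model of the difference between a sink that participates in checkpoints and one that does not. `TwoPhaseSink` only mimics the shape of a two-phase-commit sink (stage writes, make them visible when the checkpoint completes, discard them on failure); it is not Flink's or Kafka's actual API, and all names here are hypothetical. In both cases the model call runs twice, which is the point: rollback removes the duplicate write, never the bill.

```python
class TwoPhaseSink:
    """Writes are staged and become visible only when the checkpoint completes."""
    def __init__(self) -> None:
        self.visible: list = []   # what the copilot can read
        self.staged: list = []    # pending for the in-flight checkpoint

    def write(self, key: str, value: str) -> None:
        self.staged.append((key, value))

    def commit(self) -> None:     # checkpoint completed
        self.visible.extend(self.staged)
        self.staged = []

    def abort(self) -> None:      # crash before the checkpoint completed
        self.staged = []


class BlindAppendSink:
    """Non-transactional: writes are visible immediately; nothing to undo."""
    def __init__(self) -> None:
        self.visible: list = []

    def write(self, key: str, value: str) -> None:
        self.visible.append((key, value))

    def commit(self) -> None:
        pass

    def abort(self) -> None:
        pass


def crash_then_replay(sink) -> tuple:
    model_calls = 0
    for attempt in ("crashes", "succeeds"):
        model_calls += 1                          # billed both times
        sink.write("txn:88213:90414", "transcript")
        if attempt == "crashes":
            sink.abort()
        else:
            sink.commit()
    return model_calls, len(sink.visible)


print(crash_then_replay(TwoPhaseSink()))     # (2, 1)
print(crash_then_replay(BlindAppendSink()))  # (2, 2)
```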


11) Other transform-layer failure shapes

  • Duplicate artifact — at-least-once redelivery + non-idempotent append sink; copilot double-counts the same complaint.
  • Re-bill on recovery — checkpoint too infrequent, so a crash re-runs many expensive model calls; cost spikes, not correctness.
  • Window state loss — windowing without checkpointed state; crash loses 4 minutes of accumulated call audio and re-windows from scratch.
  • Poison record stall — one undecodable event or a record the model errors on, retried forever, head-of-line-blocks the partition; needs a dead-letter path.
  • Model-version skew — a model upgrade mid-stream produces transcripts/embeddings inconsistent with earlier ones; cross-modal retrieval (chapter 05) degrades silently until re-derive.
  • Back-pressure from a slow model — the endpoint slows, the operator stalls, checkpoints lengthen, upstream lag grows; looks like an ingest problem but the cause is the transform.
  • Tiny-batch cost blowup — size-only batching with no timeout flushes tiny batches at low traffic, paying per-item rates; or no batching at all.
  • Late/out-of-order audio chunks — a call's chunks arrive out of order; without event-time windowing the transcript is assembled wrong.
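The dead-letter path for poison records can be sketched as bounded retries followed by parking the record. `process_with_dlq`, `fake_embed`, and the retry limit of 3 are hypothetical; a production job would also separate transient errors (worth retrying, with backoff) from permanent ones (dead-letter immediately), which this sketch does not.

```python
def process_with_dlq(records, model, max_attempts: int = 3):
    """Run model over records, retrying each at most max_attempts times.
    Records that still fail go to dead_letters instead of blocking the rest."""
    outputs, dead_letters = [], []
    for record in records:
        for attempt in range(1, max_attempts + 1):
            try:
                outputs.append(model(record))
                break
            except Exception as exc:
                if attempt == max_attempts:
                    dead_letters.append((record, repr(exc)))
    return outputs, dead_letters


def fake_embed(text: str) -> str:
    """Hypothetical model call that always fails on an undecodable record."""
    if text == "<corrupt>":
        raise ValueError("undecodable input")
    return f"vec({text})"


out, dlq = process_with_dlq(["hi", "<corrupt>", "payment failed"], fake_embed)
print(out)   # ['vec(hi)', 'vec(payment failed)']
print(dlq)   # [('<corrupt>', "ValueError('undecodable input')")]
```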

12) Pattern transfer

  • Idempotency = the universal retry antidote — keying the sink write on a stable id is the same move as an idempotency key on a payment API or a dedupe key on an at-least-once queue consumer. Anywhere delivery is at-least-once, correctness comes from idempotent writes, not from chasing exactly-once delivery.
  • Bounded window = group commit / Nagle — flush-on-size-or-time is the same amortization shape as database group commit and TCP's Nagle algorithm; the freshness-at-low-traffic failure recurs identically.
  • Back-pressure from a slow operator (chapter 02) — a slow model stalling the operator is the same producer-outruns-consumer geometry, now inside the pipeline; consumer lag rises but the cause is downstream of ingest.
  • Derived-is-rebuildable (chapter 03) — model-version skew is survivable precisely because chapter 03 kept raw immutable; the fix is replay-and-re-derive, the storage chapter's invariant paying off.

13) Design test

  1. Is the sink write idempotent on a stable key (event offset or content hash), so a retried model call overwrites instead of appending?
  2. Have you chosen at-least-once + idempotent writes as the default, reserving transactional exactly-once for sinks that genuinely can't dedupe?
  3. Does every batched model call flush on size or time, so a quiet path never starves?
  4. Is checkpoint frequency tuned so a crash re-runs a bounded, affordable amount of expensive model work?
  5. Is any heavy, long-latency model kept out of the inline operator (async side-path), so it can't stall checkpoints and back-pressure the pipeline?

Where this appears in production

Stream processors running models / transforms:

  • Uber — large Flink deployment for real-time feature computation and event processing feeding ML systems.
  • Netflix — Flink for real-time event processing and feature generation in personalization pipelines.
  • Databricks customers — Spark Structured Streaming with Delta sinks, batching embedding/feature computation per micro-batch.
  • Alibaba — one of the largest Flink users (originated Blink), running stateful stream processing at extreme scale.
  • Confluent / Kafka Streams shops — in-stream enrichment and transformation keyed for idempotent writes.
  • Pinterest — Flink for real-time feature and embedding pipelines feeding recommendation.

Models in the stream (ASR / vision / embedding):

  • OpenAI / Deepgram / AssemblyAI streaming ASR — per-utterance transcription APIs with ~seconds latency, the audio Δt floor.
  • Self-hosted Whisper-large pools — GPU transcription where cost is GPU-seconds and batching/scaling is the cost lever.
  • Voyage voyage-multimodal-3.5 / Cohere Embed 4 — single-call multimodal embedding of interleaved text+image (and video frames), invoked per event or batched.
  • Zoom / Gong / call-intelligence platforms — transcribe and embed calls in near-real-time for search and copilots, windowing per call.
  • Twelve Labs / video-understanding APIs — embed video segments in-stream for retrieval.
  • Fraud platforms (Stripe Radar-style) — feature transforms in-stream with idempotent writes so a retry never double-scores a swipe.
  • Intercom Fin / support copilots — embed each incoming message as it lands so the live conversation is retrievable within seconds.
  • AWS Kinesis Data Analytics (Flink) / Managed Flink — managed in-stream transform compute for AWS-native pipelines.
  • Spotify — real-time audio feature and embedding computation feeding recommendation freshness.


Pause and recall

  1. Why does a model call break the "re-running a transform is free" assumption that holds for SQL transforms?
  2. A consumer crashes after the ASR call but before the checkpoint commits. On restart, what re-runs, and what decides whether the index gets a duplicate?
  3. State the chapter's invariant. Where do you control the cost duplicate, and where do you control the correctness duplicate?
  4. Why is idempotent-write-plus-at-least-once usually better than transactional exactly-once for a vector-index sink?
  5. Why must a batched model call flush on size or time, and what fails if you batch on size alone?
  6. Flink vs Spark Structured Streaming: which gives sub-second per-event latency, and which naturally batches the model call?
  7. At high volume the bottleneck is usually not the stream framework. What is it, and why doesn't more operator parallelism help?
  8. Why is model-version skew survivable on this platform — which earlier invariant saves you?

Interview Q&A

Q1. Your ASR consumer crashed and re-transcribed three calls. How do you stop this from corrupting the index, and can you stop the double billing? A. Two different problems. The correctness duplicate is eliminated by an idempotent sink: key the write on the event offset or content hash (txn:88213:90414) so the re-run upserts/overwrites instead of appending — no duplicate transcript regardless of how often you crash. The double billing you can only bound, not eliminate, by checkpointing often enough that a crash re-runs little work; the model call's side effect (GPU-seconds/API fee) can't be rolled back. Common wrong answer to avoid: "Turn on exactly-once." It doesn't make the model call exactly-once, and it's only end-to-end if the sink participates; an idempotent write is the real fix.

Q2. Why choose at-least-once plus idempotent writes over end-to-end exactly-once? A. For a vector-index/lakehouse sink that supports keyed upserts, idempotency makes redelivery harmless at far lower cost and latency than transactional two-phase commit. You pay only a bounded extra model bill on the few records between last checkpoint and crash, and get duplicate-free results. Reserve transactional exactly-once for sinks that can't dedupe — e.g., emitting to another Kafka topic that triggers a billable action. Common wrong answer to avoid: "Exactly-once is strictly better, always use it." It adds commit overhead and latency, and is pointless when an idempotent upsert already gives correct results.

Q3. Embeddings cost spiked but only at night when traffic is low. What's the bug? A. Size-only batching with no timeout. At low traffic, batches never fill, so either you wait forever (freshness gap) or — if there's a fallback — you flush tiny batches and pay per-item rates without amortization. Fix: bounded window, flush on size or time. The symptom "problem only at low traffic" is the signature of a missing time-based flush. Common wrong answer to avoid: "Scale down the cluster at night." That doesn't fix per-item pricing from tiny batches; the window-flush logic is the bug.

Q4. Flink or Spark Structured Streaming for this platform — defend a choice. A. Chat and image want low per-event latency, where Flink's continuous event-at-a-time flow and lightweight async checkpoints win (sub-second, fast recovery). Audio is windowed and ASR-bound, so micro-batch latency is noise — either engine. Spark SS is the pragmatic pick inside a Databricks/Spark shop with a Delta sink and benefits from natural per-micro-batch call batching. The real deciding factor is usually the org's existing stack. Common wrong answer to avoid: "Flink is always faster so always use it." Spark SS's micro-batch can be cheaper for batched model calls and is simpler in a Spark-native shop; latency need drives it.

Q5. End-to-end latency is climbing and so is checkpoint duration, but ingest rate is flat. Where's the problem? A. The transform layer, not ingest. Rising checkpoint duration alongside latency means growing operator state or a slowing model is back-pressuring the pipeline; consumer lag climbs because the processor stalled, not because more events arrived. Check model-call latency/error rate and operator state size, not the producers. Common wrong answer to avoid: "Scale ingest / add partitions." Ingest is flat; the bottleneck is downstream in the transform, often the model endpoint saturating.

Q6. (Cumulative) The copilot retrieves a customer's complaint twice. Is this a chapter-02 backpressure issue, a chapter-04 transform issue, or a chapter-05 index issue? A. Two identical artifacts in the index points at chapter 04: at-least-once redelivery (chapter 02) is expected, but the duplicate appearing means the sink wasn't idempotent — a transform-layer write-keying bug. Confirm by checking whether the two artifacts share a source offset/content hash; if so, they should have collapsed to one upsert. The fix is at the write key, not at ingest or in the index structure. Common wrong answer to avoid: "The log delivered it twice, so it's an ingestion bug." At-least-once delivery is by design; the bug is the non-idempotent write that let redelivery become a duplicate.


Design/debug exercise (10 min)

Step 1 — Modeled example. Transform plan for the audio modality:

Modality: audio
Routing:    event{type:audio} → audio operator
Windowing:  event-time window per call_id; hold until call complete (checkpointed state)
Model:      ASR(call) → transcript; then embed(transcript)
Delivery:   at-least-once from the log
Sink:       UPSERT key = txn:<customer>:<source_offset>   (idempotent → no duplicate)
Batching:   ASR per-call (little batching); embed transcripts batched (size 64 OR 1 s)
Checkpoint: every 30 s → crash re-runs ≤30 s of ASR work (bounded re-bill)
Guarantee:  effectively-once at the index via idempotent key; model call may re-run (paid, bounded)
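The modeled plan can also be written down as data and checked against the design-test questions. `TransformPlan`, `design_issues`, and the 60-second re-run budget are hypothetical; the audio values are the ones from the plan above, and the check covers only the design-test questions that can be read off the plan itself.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class TransformPlan:
    modality: str
    sink_key: Optional[str]           # None means blind append
    batch_size: Optional[int]         # None means no batching
    batch_timeout_s: Optional[float]  # None means no time trigger
    checkpoint_s: float
    heavy_model_inline: bool = False


def design_issues(plan: TransformPlan,
                  max_rerun_s: float = 60.0) -> List[str]:
    """Flag plans that fail the design test; an empty list means none found."""
    issues = []
    if plan.sink_key is None:
        issues.append("sink is not idempotent: replay will duplicate artifacts")
    if plan.batch_size is not None and plan.batch_timeout_s is None:
        issues.append("size-only batching: the quiet path can starve")
    if plan.checkpoint_s > max_rerun_s:
        issues.append("checkpoint interval exceeds the re-run budget")
    if plan.heavy_model_inline:
        issues.append("heavy model inline: move it to an async side-path")
    return issues


audio = TransformPlan("audio", "txn:<customer>:<source_offset>",
                      batch_size=64, batch_timeout_s=1.0, checkpoint_s=30)
print(design_issues(audio))  # []
```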

Step 2 — Your turn. Write the transform plan for the chat modality (80k/day, bursty, cheap to embed). Decide: routing, is there windowing, batch size and timeout for the embedding call, sink idempotency key, checkpoint frequency, and which engine (Flink vs Spark SS) you'd lean toward and why. (Hint: chat is the strongest case for batching and for sub-second freshness.)

Step 3 — Reproduce from memory. Redraw the section-2 diagram (log → stream processor with route/window/checkpoint → idempotent sink), label where the model call re-runs on crash recovery and where the duplicate is stopped, and write one sentence connecting idempotent writes to chapter 03's "derived is rebuildable" and chapter 02's at-least-once delivery.


Operational memory

This chapter explained why running ASR, vision, and embedding models in the stream turns the textbook "exactly-once vs at-least-once" question into a real cost-and-correctness lever: a crash re-reads offsets and re-runs the expensive model call, and a naive sink turns that into double billing plus duplicate artifacts the copilot double-counts. The important idea is that you control the correctness duplicate at the sink with idempotent, keyed writes, and only bound the cost duplicate with checkpoint frequency — you cannot un-spend a GPU-second.

You learned to place model calls in a stateful processor (Flink for sub-second continuous, Spark SS for batched throughput), window multi-event interactions like a call before embedding, batch model calls on a bounded size-or-time window so the quiet path never starves, and key the sink write on event offset or content hash so at-least-once delivery becomes effectively-once at the index. That solves the opening crash-and-duplicate failure cheaply, leaving only a small, bounded extra model bill.

Carry this diagnostic forward: when you see a duplicate artifact, check the sink's write key before blaming the log; when cost spikes only at low traffic, suspect size-only batching with no timeout; when latency and checkpoint duration rise together while ingest is flat, the model endpoint is back-pressuring the transform. Idempotent writes are the universal retry antidote — chase them, not exactly-once delivery.

Remember:

  • A model call is slow, billed, version-sensitive, and side-effecting — re-running it on retry costs money and risks duplicates.
  • Make the sink idempotent on a stable key (offset/content hash); that turns cheap at-least-once delivery into correct, duplicate-free results.
  • Exactly-once is end-to-end only if the sink participates, and it never makes the model call itself exactly-once.
  • Batch model calls on a bounded window — flush on size OR time — or the quiet path starves and freshness opens at low traffic.
  • At high volume the model endpoint is the bottleneck, not the stream framework — scale the model or shed load, not operator count.

Bridge. We can now fill the derived zone correctly — transcripts, captions, and per-modality embeddings landing without duplicates and without paying twice. But an embedding sitting in a sink is not yet retrievable: the copilot asks "what's relevant to this payment failure?" and that query has to find the right chunk across text, audio-derived transcript, and image — fast, fresh, and across modalities. How do you index per-modality embeddings so a text query can surface a screenshot, keep the index updating as new vectors stream in without rebuilding it nightly, and combine vector similarity with metadata filters? The next file builds multimodal indexing and retrieval. → 05-multimodal-indexing-and-retrieval.md