Non-streaming AI is a vending machine. You put in a request, you wait, you get the whole thing at once. SSE-streamed AI is a conversation — words arrive the moment they're generated, at the speed of thought. For a system running eleven workers in parallel, the difference isn't UX polish. It's the difference between a product and a demo.
This is post three of three. We covered how to run it cheap (prompt caching) and how to make it think differently (epistemic fingerprints). Now we cover how to ship it in real time. SSE. Cloudflare Workers. Eleven parallel streams. Production implementation, no filler.
What SSE Actually Is
Server-Sent Events is one-way streaming over plain HTTP — a persistent connection where the server pushes data to the client continuously, rather than the client polling repeatedly. The client opens one EventSource connection. The server sends chunks formatted as data: ...\n\n. The client receives them as events, one at a time, as they arrive.
For AI responses this is the correct primitive. The model doesn't generate the complete response and then send it — it generates token by token. SSE maps perfectly onto that. Each token (or small chunk of tokens) becomes a data event. The client renders it immediately. First token arrives in under 500ms regardless of how long the full response takes to complete.
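For a plain GET endpoint, the native browser API is a few lines. A minimal sketch, assuming a hypothetical /events endpoint that speaks SSE (AI calls need POST, which is why the client section below uses fetch instead):

```ts
// Native EventSource usage: a sketch against a hypothetical GET /events endpoint.
const source = new EventSource('/events');

// Each data: ...\n\n frame arrives as one message event
source.onmessage = (e: MessageEvent) => {
  console.log('chunk:', e.data);
};

source.onerror = () => {
  // EventSource auto-reconnects on its own; close() stops it for good
  source.close();
};
```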
The Implementation
Cloudflare Workers support streaming responses natively via TransformStream. The Worker acts as a transparent SSE proxy — it calls Anthropic's API with stream: true, reads the response body as a stream, and pipes it back to the client with the correct headers. Here's the full production pattern:
```ts
export default {
  async fetch(req: Request, env: Env): Promise<Response> {
    // 1. Get the upstream stream from Anthropic
    const upstream = await fetch('https://api.anthropic.com/v1/messages', {
      method: 'POST',
      headers: {
        'x-api-key': env.ANTHROPIC_KEY,
        'anthropic-version': '2023-06-01',
        'anthropic-beta': 'prompt-caching-2024-07-31',
        'content-type': 'application/json',
      },
      body: JSON.stringify({
        model: 'claude-sonnet-4-20250514',
        max_tokens: 2048,
        stream: true, // ← enables SSE from Anthropic
        system: [{ type: 'text', text: SYSTEM_PROMPT, cache_control: { type: 'ephemeral' } }],
        messages: [{ role: 'user', content: userMessage }]
      })
    });

    // 2. Pipe through a TransformStream — no buffering, pure passthrough
    const { readable, writable } = new TransformStream();
    upstream.body!.pipeTo(writable).catch((err) => {
      // If Anthropic drops the connection, close gracefully
      console.error('Stream error:', err);
    });

    // 3. Return SSE response — headers are the whole trick
    return new Response(readable, {
      headers: {
        'Content-Type': 'text/event-stream',  // tells browser: this is SSE
        'Cache-Control': 'no-cache',          // never buffer
        'Connection': 'keep-alive',           // maintain persistent connection
        'Access-Control-Allow-Origin': '*',   // CORS — adjust for production
        'X-Accel-Buffering': 'no',            // disable nginx proxy buffering
      }
    });
  }
};
```
If you're behind a reverse proxy (nginx, or Cloudflare itself in certain configurations), it may buffer your SSE stream before forwarding it to the client — defeating the whole point of streaming. X-Accel-Buffering: no disables this. Without it you'll see streaming work in local dev but not in production. This is the most common gotcha.
The Client Side
The native EventSource API only supports GET requests. AI calls need POST with a body. So you use fetch() with res.body.getReader() — same streaming behavior, full control. But here’s the production gotcha nobody mentions in the tutorials: network chunks don’t align with SSE event boundaries.
Anthropic sends SSE as data: {...JSON...}\n\n. Your ReadableStream reader receives raw network chunks — and a single TCP packet boundary can land in the middle of a JSON object. If you blindly split('\n') and JSON.parse each line, you’ll hit malformed JSON on every chunk boundary. In local dev this almost never happens because loopback is fast and packets rarely split. In production over real networks it happens constantly. The fix is a carry-over buffer.
```ts
function streamAgent(agentId: string, message: string, onChunk: Function) {
  fetch(`/api/agent/${agentId}`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ message })
  }).then(async res => {
    const reader = res.body!.getReader();
    const decoder = new TextDecoder();

    // THE FIX: carry-over buffer for partial chunk boundaries
    let buffer = '';

    while (true) {
      const { done, value } = await reader.read();
      if (done) { onChunk(null); break; }

      // Append new bytes to whatever was left from the last chunk
      buffer += decoder.decode(value, { stream: true }); // stream: true handles split UTF-8

      // Split on \n\n — the SSE event boundary.
      // Only process COMPLETE events; hold the tail in the buffer.
      const events = buffer.split('\n\n');
      buffer = events.pop() ?? ''; // last item is incomplete — hold for next chunk

      for (const event of events) {
        for (const line of event.split('\n')) {
          if (line.startsWith('event: error')) { onChunk({ error: true }); break; }
          if (!line.startsWith('data: ')) continue;
          const raw = line.slice(6);
          if (raw === '[DONE]') continue;
          try {
            const evt = JSON.parse(raw); // always a complete JSON object now
            if (evt.type === 'content_block_delta' && evt.delta?.type === 'text_delta') {
              onChunk(evt.delta.text);
            }
          } catch (e) {
            console.warn('SSE parse error (should be rare with buffer):', raw.slice(0, 60));
          }
        }
      }
    }
  });
}
```
Splitting on '\n\n' gives you complete SSE events — except the last item, which is whatever bytes arrived after the final boundary. That’s either empty string or the start of the next event. .pop() removes it from the array and holds it in buffer. Next chunk prepends it and completes it before parsing. This is the entire pattern. Every robust SSE parser in production does exactly this one thing.
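Wiring that into a UI is one callback: append text deltas as they arrive, and treat null as end-of-stream. A minimal usage sketch, where the element id and the question are placeholders rather than the real app's:

```ts
// Usage sketch: render tokens into the page as they arrive.
// Assumes a hypothetical element with id="output" exists in the page.
const output = document.getElementById('output')!;

streamAgent('vasquez', 'What breaks this deal?', (chunk: string | { error: true } | null) => {
  if (chunk === null) {
    output.classList.add('done');         // stream finished cleanly
  } else if (typeof chunk === 'object') {
    output.classList.add('stream-error'); // server emitted event: error
  } else {
    output.textContent += chunk;          // text delta, render immediately
  }
});
```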
The 11-Worker Fanout
Running one SSE stream is straightforward. Running eleven simultaneously — all firing in parallel, all streaming independently, with an orchestrator coordinating the results — is where the architecture gets interesting.
Each of the ten domain expert Workers is a completely independent Cloudflare Worker — separate deployment, separate cache, separate stream. The client (or the orchestrator Worker) fires all ten simultaneously using Promise.allSettled so a single agent failure never blocks the others.
```ts
async function fanoutToAllAgents(message: string, env: Env) {
  const agents = [
    'vasquez', 'webb', 'chen', 'okafor', 'mitchell',
    'nakamura', 'diallo', 'harlow', 'deleon', 'cross'
  ];

  // Fire all 10 agent Workers simultaneously — not sequentially
  const results = await Promise.allSettled(
    agents.map(id => callAgentWorker(id, message, env))
  );

  // Separate fulfilled from failed — one bad agent never blocks the rest
  const responses = results
    .filter(r => r.status === 'fulfilled')
    .map(r => (r as PromiseFulfilledResult<any>).value);

  // Pair each result with its agent BEFORE filtering, so indexes stay aligned
  const failed = results
    .map((r, i) => ({ agent: agents[i], r }))
    .filter(x => x.r.status === 'rejected')
    .map(x => ({ agent: x.agent, reason: (x.r as PromiseRejectedResult).reason }));

  if (failed.length > 0) console.warn('Agents failed:', failed);

  // Pass all responses to orchestrator for tension map generation
  return generateTensionMap(responses);
}

async function callAgentWorker(agentId: string, message: string, env: Env) {
  // Each worker is a separate CF Worker URL — no shared state
  const res = await fetch(`https://${agentId}.consilium.workers.dev`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${env.INTERNAL_KEY}`
    },
    body: JSON.stringify({ message }),
    signal: AbortSignal.timeout(30000) // 30s timeout per agent
  });
  return res.json();
}
```
Never use Promise.all for multi-agent fanout. If one agent 429s or 5xxs, Promise.all rejects the entire batch immediately — you lose all ten responses because of one failure. Promise.allSettled collects every result regardless of individual failures. Nine working agents is a product. Zero working agents because one failed is an outage.
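The difference in behavior, as a toy sketch with hypothetical promises standing in for the real agent calls:

```ts
// Toy illustration: hypothetical promises, not real agent calls
const calls = [
  Promise.resolve('vasquez: response'),
  Promise.reject(new Error('429 from webb')),
  Promise.resolve('chen: response'),
];

// Promise.all rejects the moment one call fails; both good responses are lost
await Promise.all(calls).catch(e => console.error('lost everything:', e.message));

// Promise.allSettled always resolves; every result arrives with a status
const settled = await Promise.allSettled(calls);
console.log(settled.filter(r => r.status === 'fulfilled').length, 'of 3 usable');
```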
Securing the Orchestrator
The 11th worker is the most exposed piece of the system. It accepts a message, fans out to ten agent workers, synthesizes the tension map, and returns a structured response. If it’s publicly callable without auth, you’ve built an API that proxies eleven Claude calls for anyone with a browser. That’s not a vulnerability — it’s a billing event.
```ts
export default {
  async fetch(req: Request, env: Env): Promise<Response> {
    // LAYER 1: CORS preflight — lock origin to your actual domain
    const origin = req.headers.get('Origin') ?? '';
    const allowed = ['https://consilium.proptechusa.ai', 'https://proptechusa.ai'];
    if (!allowed.includes(origin)) {
      return new Response('Forbidden', { status: 403 });
    }
    if (req.method === 'OPTIONS') {
      return new Response(null, {
        headers: {
          'Access-Control-Allow-Origin': origin,
          'Access-Control-Allow-Methods': 'POST',
          'Access-Control-Allow-Headers': 'Content-Type, Authorization',
        }
      });
    }

    // LAYER 2: Shared secret — client sends this, set in Cloudflare env vars
    const token = req.headers.get('Authorization')?.replace('Bearer ', '');
    if (!token || token !== env.CONSILIUM_SECRET) {
      return new Response('Unauthorized', { status: 401 });
    }

    // LAYER 3: Rate limiting by IP via Cloudflare KV
    const ip = req.headers.get('CF-Connecting-IP') ?? 'unknown';
    const key = `rl:${ip}`;
    const count = parseInt(await env.KV.get(key) ?? '0');
    if (count > 20) { // 20 requests/minute per IP
      return new Response('Rate limit exceeded', { status: 429 });
    }
    await env.KV.put(key, String(count + 1), { expirationTtl: 60 }); // auto-expire in 60s

    // All three layers passed — parse the message and proceed to fanout
    const { message } = await req.json<{ message: string }>();
    return fanoutToAllAgents(message, env);
  }
};
```
The orchestrator calls the ten agent Workers internally — Worker to Worker over Cloudflare’s private network. These internal calls use a separate INTERNAL_KEY env var, not the client-facing CONSILIUM_SECRET. Keep them separate. The agent workers should reject any request that doesn’t carry the internal key — meaning they’re never directly callable from the public internet, only via the orchestrator. The CORS origin check on the agent workers can be set to the orchestrator’s Worker subdomain specifically.
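The guard on each agent worker is a few lines at the top of fetch(). A minimal sketch, assuming the same INTERNAL_KEY secret is set on both the orchestrator and every agent, and a hypothetical handleAgentRequest holding the agent's normal streaming logic:

```ts
// Agent-worker guard: rejects anything that doesn't carry the internal key.
// Only the orchestrator holds INTERNAL_KEY, so the public internet is locked out.
export default {
  async fetch(req: Request, env: Env): Promise<Response> {
    const token = req.headers.get('Authorization')?.replace('Bearer ', '');
    if (!token || token !== env.INTERNAL_KEY) {
      return new Response('Forbidden', { status: 403 });
    }
    return handleAgentRequest(req, env); // the agent's normal SSE logic
  }
};
```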
The implementation code in the earlier section uses Access-Control-Allow-Origin: * for simplicity. In production that should be your specific domain. The reason it’s shown as * is that Workers deployed on workers.dev subdomains during development don’t have a fixed origin yet. Once you’re on a custom domain, lock it down. Use CF-Connecting-IP for rate limiting at the Worker level, and Cloudflare’s built-in DDoS protection handles the rest at the edge before requests reach your Worker at all.
Error Handling and Reconnection
SSE connections drop. Cloudflare Workers have CPU time limits. Anthropic rate-limits. A production streaming system needs explicit handling for all three, without breaking the user's experience when any of them happen.
```ts
async function streamWithFallback(payload: any, env: Env) {
  const models = ['claude-sonnet-4-20250514', 'claude-haiku-4-5-20251001'];

  for (const model of models) {
    try {
      const res = await fetch(ANTHROPIC_URL, {
        ...headers, // shared request init (method: 'POST' plus Anthropic auth headers), defined once elsewhere
        body: JSON.stringify({ ...payload, model, stream: true })
      });

      if (res.status === 429 || res.status >= 500) {
        // Emit retry event to client BEFORE trying fallback
        await emitSSEEvent('retry', { model, status: res.status });
        continue; // try next model
      }
      if (!res.ok) throw new Error(`Non-retryable: ${res.status}`);

      return res; // success — return the stream
    } catch (e) {
      if (model === models.at(-1)) {
        // All models exhausted — emit terminal error event
        await emitSSEEvent('error', { message: 'All models exhausted' });
        throw e;
      }
    }
  }
}

// Emit a named SSE event so the client can handle it explicitly.
// `writer` is the TransformStream writer feeding the client's response,
// the same stream set up in the proxy pattern above.
function emitSSEEvent(eventType: string, data: object) {
  return writer.write(new TextEncoder().encode(
    `event: ${eventType}\ndata: ${JSON.stringify(data)}\n\n`
  ));
}
```
Swallowing errors silently in a stream is fatal. The stream closes, the client sees nothing, and there's no way to distinguish "finished generating" from "something went wrong." Always emit a named event — event: error or event: done — and never let the stream end without a signal. The client should listen for these explicitly, not just for message events.
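On the client, that means reading the event: line before the data: line inside the same buffered parse loop. A sketch of the dispatch, assuming standard event:/data: framing and a single data line per event:

```ts
// Dispatch on named SSE events: slots into the buffered parse loop above.
// Assumes one data: line per event, which matches Anthropic's framing.
function handleSSEEvent(eventBlock: string, onChunk: Function) {
  let eventType = 'message'; // the SSE default when no event: line is present
  let data = '';
  for (const line of eventBlock.split('\n')) {
    if (line.startsWith('event: ')) eventType = line.slice(7);
    if (line.startsWith('data: '))  data = line.slice(6);
  }
  switch (eventType) {
    case 'error': onChunk({ error: true, detail: data }); break;
    case 'retry': console.info('server falling back to another model:', data); break;
    case 'done':  onChunk(null); break;
    default:      if (data) onChunk(JSON.parse(data)); // normal content event
  }
}
```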
How It Stacks With Prompt Caching
This is the architecture that closes the loop on the full trilogy. Prompt caching handles the input side — your 2,200-token system prompt gets cached, saving 90% on every read. SSE handles the output side — the model's response streams token by token the moment it starts generating. Both features are orthogonal and compose cleanly:
```ts
body: JSON.stringify({
  model: 'claude-sonnet-4-20250514',
  max_tokens: 2048,
  stream: true,                            // ← SSE (post 3)
  system: [{
    type: 'text',
    text: EPISTEMIC_FINGERPRINT,           // ← engineered agent (post 2)
    cache_control: { type: 'ephemeral' }   // ← prompt caching (post 1)
  }],
  messages: [{ role: 'user', content: message }]
})

// Result:
// Input tokens: cached at 90% discount
// Agent reasoning: distinct epistemic architecture
// Response delivery: token-by-token, <500ms to first word
// Total cost: a fraction of what most single-agent systems pay
```
Frequently Asked
Why use fetch with ReadableStream instead of native EventSource?
EventSource API only supports GET requests. AI API calls require POST with a body (the messages, model config, etc.). So the correct pattern is fetch() with res.body.getReader() — you get the same streaming behavior with full control over the request method and body.
How does the orchestrator coordinate ten parallel streams? It uses Promise.allSettled to fire all ten simultaneously and collects results as they arrive.
How do you signal failures mid-stream? With named SSE events — event: error, event: done, event: retry — so the client can distinguish between "finished," "failed," and "retrying with fallback model." Never let the stream close silently. On the server side, implement model fallback before the stream is established: catch 429 and 5xx errors, emit a retry event, then try the next model. The stream the client opened stays open throughout.
How do you handle dropped connections? With AbortSignal.timeout on the upstream fetch and reconnection logic on the client side using the Last-Event-ID header to resume from the last received event, as sketched below.
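A sketch of the client half of that, assuming the Worker injects id: lines into its SSE output (Anthropic's raw stream has no id: fields, so the proxy has to add them) and a hypothetical readSSE helper that surfaces each event's id:

```ts
// Reconnection sketch: assumes the Worker adds "id: <n>" lines so the client
// has a cursor to resume from, and a hypothetical readSSE(res, cb) helper.
async function streamWithResume(url: string, body: object, onChunk: Function) {
  let lastEventId = '';
  for (let attempt = 0; attempt < 3; attempt++) {
    try {
      const res = await fetch(url, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          ...(lastEventId && { 'Last-Event-ID': lastEventId }), // resume cursor
        },
        body: JSON.stringify(body),
      });
      await readSSE(res, (id: string, chunk: unknown) => {
        if (id) lastEventId = id; // track the newest id: we've seen
        onChunk(chunk);
      });
      return; // stream ended cleanly, no retry needed
    } catch {
      // connection dropped mid-stream; loop retries with Last-Event-ID set
    }
  }
  onChunk({ error: true });
}
```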
What's the most common streaming bug? A \n\n-terminated SSE event often spans multiple TCP packets — your ReadableStream reader receives partial JSON. A naive split('\n') + JSON.parse will throw on those partial objects. The fix: carry a buffer across reads, split on '\n\n' (the actual SSE event boundary), and always .pop() the last (potentially incomplete) item back into the buffer for the next chunk.
How is the orchestrator secured? Three layers: (1) a CORS origin check locked to the production domains; (2) a shared secret in the Authorization: Bearer header, stored in Cloudflare env vars, rotatable without redeployment; (3) IP-based rate limiting via Cloudflare KV — 20 requests/minute per CF-Connecting-IP, auto-expiring keys. The ten agent workers use a separate internal key and only accept requests from the orchestrator's Worker subdomain — unreachable from the public internet directly.
The Consilium runs all ten domain expert workers in parallel — cached, streamed, edge-deployed. Bring a question that deserves ten different minds pushing back on it.
Open The Consilium