Day 10: Context Windows — The Key Architectural Constraint

“Context is not just memory. It is the entire universe the model can see. Everything outside the window does not exist.”

Why This Matters

Context windows are the single most important architectural constraint in LLM application design. Almost every major failure mode in production AI — forgotten instructions, hallucinated facts, ignored documents, broken multi-turn conversations — traces back to context window mismanagement.

In 2026, context windows have grown dramatically:

  • GPT-4o: 128,000 tokens
  • Claude Opus 4: 200,000 tokens
  • Gemini 2.5 Pro: 1,048,576 tokens (1 million+)

Bigger is better, right? Not quite. Long context comes with compounding costs, latency penalties, the lost-in-the-middle attention problem, and architectural tradeoffs that make it wrong for many applications. Understanding when to use long context — and when not to — is what separates junior AI engineers from senior ones.

Part 1: What a Context Window Actually Is

The Formal Definition

A context window is the maximum number of tokens the model can process in a single forward pass. This includes:

Total context = System prompt + Conversation history + Retrieved documents + Current user message + Model response

Every token from all of these sources competes for space in the same fixed-size window. The model has no awareness of anything outside the window: not your database, not last week's conversations, not the 900 pages you left out when you fed it the first 500 pages of a document.
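The budget equation above can be sketched as a small helper. This is only an illustration under stated assumptions: `ContextBudget` and its field names are hypothetical, and the token counts are assumed to come from whatever tokenizer you already use.

```python
from dataclasses import dataclass


@dataclass
class ContextBudget:
    """Hypothetical helper: tracks how one fixed window is shared out."""
    window: int              # model's context limit, in tokens
    system_prompt: int = 0
    history: int = 0
    documents: int = 0
    user_message: int = 0
    output_reserve: int = 0  # tokens held back for the model's response

    def used(self) -> int:
        # Every component draws from the same window.
        return (self.system_prompt + self.history + self.documents
                + self.user_message + self.output_reserve)

    def remaining(self) -> int:
        # A negative value means the request would not fit.
        return self.window - self.used()


budget = ContextBudget(
    window=128_000,
    system_prompt=1_200,
    history=18_000,
    documents=60_000,
    user_message=300,
    output_reserve=4_096,
)
print(f"used={budget.used():,} remaining={budget.remaining():,}")
```

Treating the output reserve as one more line item makes it harder to forget that the response shares the window with the input.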

Tokens ≠ Words

A crucial nuance: tokens are not words. The tokenization ratio varies by content type:

import tiktoken

encoder = tiktoken.encoding_for_model("gpt-4o")

# Approximate token ratios by content type
examples = {
    "English prose": "The quick brown fox jumps over the lazy dog",
    "Python code": "def fibonacci(n):\n if n <= 1:\n return n\n return fibonacci(n-1) + fibonacci(n-2)",
    "JSON data": '{"name": "Alice", "age": 30, "city": "New York"}',
    "Technical jargon": "Kullback-Leibler divergence Transformer self-attention",
    "Chinese text": "你好,世界。这是一个测试句子。",
}

for label, text in examples.items():
    tokens = encoder.encode(text)
    # "Words" are whitespace-separated, so unsegmented Chinese text counts as one word
    ratio = len(tokens) / len(text.split())
    print(f"{label:25} | Words: {len(text.split()):4} | Tokens: {len(tokens):4} | Tokens/word: {ratio:.2f}")

Output:

English prose             | Words:    9 | Tokens:    9 | Tokens/word: 1.00
Python code               | Words:   12 | Tokens:   28 | Tokens/word: 2.33
JSON data                 | Words:    7 | Tokens:   19 | Tokens/word: 2.71
Technical jargon          | Words:    4 | Tokens:   11 | Tokens/word: 2.75
Chinese text              | Words:    1 | Tokens:   10 | Tokens/word: 10.00

Rule of thumb for capacity planning:

Content type          Tokens per page (A4, ~400 words)
────────────────────────────────────────────────────────
English prose         ~500 tokens
Code                  ~800 tokens (more tokens per line)
JSON/XML data         ~1,000 tokens (structure overhead)
Mixed technical doc   ~600 tokens

So a 128k token window holds roughly:

  • ~250 pages of English prose
  • ~160 pages of code
  • ~128 pages of structured data
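The page estimates above are simple integer division, which can be sketched as follows. The `TOKENS_PER_PAGE` table just restates the rule-of-thumb figures from this section; the function name and the optional `reserve_tokens` parameter are illustrative assumptions.

```python
# Rule-of-thumb densities from the table above (tokens per ~400-word page).
TOKENS_PER_PAGE = {
    "english_prose": 500,
    "code": 800,
    "structured_data": 1_000,
    "mixed_technical": 600,
}


def pages_that_fit(window_tokens: int, content_type: str, reserve_tokens: int = 0) -> int:
    """Estimate how many pages fit after holding back reserve_tokens for output."""
    usable = max(window_tokens - reserve_tokens, 0)
    return usable // TOKENS_PER_PAGE[content_type]


for kind in TOKENS_PER_PAGE:
    print(f"{kind:16} {pages_that_fit(128_000, kind)} pages")
```

These are planning estimates only; for a real document, count tokens with the model's tokenizer instead.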

The Hard Truncation Problem

What happens when you exceed the context limit? The SDK raises an error — or worse, silently truncates your input without telling you:

from openai import AsyncOpenAI
import tiktoken

client = AsyncOpenAI()
encoder = tiktoken.encoding_for_model("gpt-4o")

async def safe_complete(messages: list[dict], model: str = "gpt-4o") -> str:
    """
    Context-aware completion that counts tokens BEFORE sending.
    Raises an explicit error instead of silent truncation.
    """

    # Context limits as listed in this article. The non-OpenAI entries are
    # reference values only: this client calls the OpenAI API, and the
    # gpt-4o tokenizer only approximates other vendors' token counts.
    MODEL_LIMITS = {
        "gpt-4o": 128_000,
        "gpt-4o-mini": 128_000,
        "o3": 200_000,
        "claude-opus-4": 200_000,
        "claude-sonnet-4": 200_000,
        "gemini-2.5-pro": 1_048_576,
        "gemini-2.5-flash": 1_048_576,
    }

    RESERVE_FOR_OUTPUT = 4_096  # Reserve tokens for the model's response

    # Count tokens in all messages
    total_tokens = 0
    for message in messages:
        # +4 per message for role/formatting overhead (an approximation)
        total_tokens += 4 + len(encoder.encode(message.get("content") or ""))
    total_tokens += 2  # priming tokens

    limit = MODEL_LIMITS.get(model, 128_000)
    available = limit - RESERVE_FOR_OUTPUT

    if total_tokens > available:
        raise ValueError(
            f"Input too long: {total_tokens:,} tokens exceeds "
            f"{available:,} available ({limit:,} limit - {RESERVE_FOR_OUTPUT:,} output reserve). "
            f"Reduce input by {total_tokens - available:,} tokens."
        )

    response = await client.chat.completions.create(
        model=model,
        messages=messages,
        max_tokens=RESERVE_FOR_OUTPUT,
    )

    return response.choices[0].message.content

Part 2: The Physics of Long Context — KV Cache

Why Long Context Is Expensive

To understand why long context costs money and latency, you need to understand the Key-Value (KV) Cache — the memory structure that makes transformer inference work.

During a forward pass, every token in the context produces two vectors: a Key and a Value. These are used by the self-attention mechanism so that later tokens can “look back” at earlier ones.

For each transformer layer:
KV cache size = context_length × num_heads × head_dimension × 2 × dtype_bytes

Example: an illustrative dense configuration (GPT-4o's actual architecture is not public) with 128k context, 96 layers, 128 heads, 128 dim, float16:
Per layer  = 128,000 × 128 × 128 × 2 × 2 bytes ≈ 8.4 GB
All layers = 8.4 GB × 96 ≈ 805 GB of KV cache for one request at full context

Production models shrink this considerably with techniques such as grouped-query attention, but the linear growth with context length remains.

This means, for the same illustrative configuration:

Context length      KV cache (approximate)    Latency impact
────────────────────────────────────────────────────────────
4,096 tokens        ~26 GB                    Baseline
32,000 tokens       ~201 GB                   4-6× slower
128,000 tokens      ~805 GB                   20-40× slower
1,000,000 tokens    ~6.3 TB                   100×+ slower, GPU cluster needed
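The per-layer formula, multiplied across all layers, can be sketched as a short function. The function name and the `num_kv_heads` parameter are illustrative choices; for standard multi-head attention `num_kv_heads` equals the number of attention heads, and the configuration below is the hypothetical one from the example, not a published architecture.

```python
def kv_cache_bytes(
    context_length: int,
    num_layers: int,
    num_kv_heads: int,
    head_dim: int,
    dtype_bytes: int = 2,  # float16
) -> int:
    """KV cache size for a single request, in bytes."""
    # Factor of 2: one Key vector and one Value vector per token, per head.
    per_layer = context_length * num_kv_heads * head_dim * 2 * dtype_bytes
    return per_layer * num_layers


for tokens in (4_096, 32_000, 128_000, 1_000_000):
    size = kv_cache_bytes(tokens, num_layers=96, num_kv_heads=128, head_dim=128)
    print(f"{tokens:>9,} tokens -> {size / 1e9:,.1f} GB")
```

Because every factor is multiplied, halving any one of them (fewer KV heads, a smaller dtype, a shorter context) halves the cache.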

Cost implications (OpenAI GPT-4o pricing, 2026):

Context length    Input cost (per request)
──────────────────────────────────────────
1,000 tokens        $0.0025
10,000 tokens       $0.025
100,000 tokens      $0.25
1,000,000 tokens    $2.50 (above GPT-4o's 128k limit; shown for scale)

This might seem cheap per request, but at scale:

# Cost calculator for long-context at scale
def context_cost_analysis(
    avg_context_tokens: int,
    requests_per_day: int,
    input_cost_per_million: float = 2.50,
    output_cost_per_million: float = 10.00,
    avg_output_tokens: int = 500,
) -> dict:

    daily_input_tokens = avg_context_tokens * requests_per_day
    daily_output_tokens = avg_output_tokens * requests_per_day

    daily_input_cost = (daily_input_tokens / 1_000_000) * input_cost_per_million
    daily_output_cost = (daily_output_tokens / 1_000_000) * output_cost_per_million
    daily_total = daily_input_cost + daily_output_cost

    return {
        "avg_context_tokens": f"{avg_context_tokens:,}",
        "requests_per_day": f"{requests_per_day:,}",
        "daily_input_cost": f"${daily_input_cost:.2f}",
        "daily_output_cost": f"${daily_output_cost:.2f}",
        "daily_total": f"${daily_total:.2f}",
        "monthly_total": f"${daily_total * 30:.2f}",
        "annual_total": f"${daily_total * 365:.2f}",
    }

# A document Q&A app: 50k avg context, 10,000 requests/day
print(context_cost_analysis(50_000, 10_000))
# → monthly: $39,000

# Same app with RAG (3k context): massive savings
print(context_cost_analysis(3_000, 10_000))
# → monthly: $3,750


Output:

{'avg_context_tokens': '50,000', 'requests_per_day': '10,000', 'daily_input_cost': '$1250.00', 'daily_output_cost': '$50.00', 'daily_total': '$1300.00', 'monthly_total': '$39000.00', 'annual_total': '$474500.00'}
{'avg_context_tokens': '3,000', 'requests_per_day': '10,000', 'daily_input_cost': '$75.00', 'daily_output_cost': '$50.00', 'daily_total': '$125.00', 'monthly_total': '$3750.00', 'annual_total': '$45625.00'}

This is why RAG exists. Stuffing 50k tokens into every request costs about 10× more overall than retrieving 3k relevant tokens ($39,000 vs. $3,750 per month), and nearly 17× more on input tokens alone.

Part 3: The Long-Context Model Landscape (2026)

Model                    Context      Input cost/M    Best for
────────────────────────────────────────────────────────────────────────────
GPT-4o                   128k         $2.50           Standard production
GPT-4o-mini              128k         $0.15           Cost-sensitive, high volume
o3                       200k         $10.00          Complex reasoning tasks
Claude Opus 4            200k         $15.00          Long document analysis
Claude Sonnet 4          200k         $3.00           Balanced long-context
Claude Haiku 3.5         200k         $0.80           Fast, cheap long-context
Gemini 2.5 Pro           1M           $1.25 (≤200k)   Entire codebases, books
                                      $2.50 (>200k)
Gemini 2.5 Flash         1M           $0.075          Highest volume, long docs
Gemini 1.5 Pro           2M           $1.25 (≤128k)   Legacy very long context

When to Choose Each Tier

Context need                           Recommendation
──────────────────────────────────────────────────────────────────────────
< 10k tokens (chat, short Q&A)         Any model: optimize for cost/quality
10k – 50k (medium documents)           GPT-4o, Claude Sonnet, Gemini Flash
50k – 200k (books, codebases)          Claude Sonnet/Opus, Gemini Pro
200k – 1M (entire repos, research)     Gemini 2.5 Pro/Flash exclusively
Repeating same context (caching)       Claude (prompt caching), Gemini (implicit)

Prompt Caching: The Cost Multiplier

Anthropic and Google offer prompt caching — if you send the same prefix (system prompt + documents) repeatedly, the KV cache is computed once and reused:

import anthropic

client = anthropic.AsyncAnthropic()

# Run this inside an async function; very_long_legal_document is your own text.
# With prompt caching: the first call pays a cache-write premium (1.25× base input price),
# later cache hits cost ~10% of the base input price.
response = await client.messages.create(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    system=[
        {
            "type": "text",
            "text": "You are a legal analyst. Here are the relevant statutes:",
        },
        {
            "type": "text",
            "text": very_long_legal_document,  # e.g., 100k tokens
            "cache_control": {"type": "ephemeral"},  # Cache this prefix
        }
    ],
    messages=[
        {"role": "user", "content": "What does Section 4.2 say about liability?"}
    ],
)

# cache_creation_input_tokens: billed at 1.25× the base input price (first call)
# cache_read_input_tokens: billed at 10% of the base input price (subsequent calls)
print(response.usage.cache_creation_input_tokens)
print(response.usage.cache_read_input_tokens)

Cache savings at scale:

Scenario: 100k token document, 1000 queries/day, $3/M input, cache kept warm between queries
Without caching: 1000 × 100k × $3/M = $300/day
With caching: 1 × $0.375 cache write (1.25× base) + 999 × $0.03 cache read (0.1× base) ≈ $30.35/day
Savings: ~90%
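The caching arithmetic can be checked with a small calculator. This is a sketch under stated assumptions: the write and read multipliers (1.25× and 0.10× of the base input price) are parameters you should confirm against your provider's current pricing, and the model assumes the cache stays warm all day so only one write is paid.

```python
def daily_cost_without_cache(doc_tokens: int, queries: int, price_per_million: float) -> float:
    """Every query pays full input price for the whole document."""
    return queries * doc_tokens / 1_000_000 * price_per_million


def daily_cost_with_cache(
    doc_tokens: int,
    queries: int,
    price_per_million: float,
    write_multiplier: float = 1.25,  # assumption: cache-write premium
    read_multiplier: float = 0.10,   # assumption: cache-hit discount
) -> float:
    """One cache write, then (queries - 1) cache reads."""
    base = doc_tokens / 1_000_000 * price_per_million  # full price for one pass
    return base * write_multiplier + (queries - 1) * base * read_multiplier


without = daily_cost_without_cache(100_000, 1_000, 3.00)
cached = daily_cost_with_cache(100_000, 1_000, 3.00)
print(f"without: ${without:.2f}  with: ${cached:.2f}  savings: {1 - cached / without:.1%}")
```

If traffic is bursty and the cache expires between requests, each cold start pays the write premium again, so real savings depend on request spacing.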

Part 4: The Lost-in-the-Middle Problem

What the Research Found

In 2023, researchers at Stanford published a landmark paper: “Lost in the Middle: How Language Models Use Long Contexts.” The finding was striking and has been confirmed across multiple frontier models:

LLMs perform significantly worse at retrieving information placed in the middle of long contexts compared to information at the beginning or end.

Information position     Retrieval accuracy (multi-document QA)
────────────────────────────────────────────────────────────────
Beginning of context     ~75-80%
End of context           ~70-75%
Middle of context        ~50-60%   ← significant degradation

This U-shaped performance curve has major implications for system design.

Why This Happens

The attention mechanism in transformers has recency bias and primacy bias baked in through how positional encodings and attention patterns develop during training. The model “remembers” the beginning (system prompt) and the end (most recent message) most reliably. Middle content — especially in very long contexts — receives diffuse, diluted attention.

Designing Around It

from enum import Enum

class DocumentPlacementStrategy(Enum):
    RECENCY_FIRST = "recency_first"  # Most relevant docs at the end
    PRIMACY_FIRST = "primacy_first"  # Most relevant docs at the start
    SANDWICH = "sandwich"            # Most relevant at start AND end

def build_context_with_placement(
    system_prompt: str,
    documents: list[dict],  # Each: {"content": str, "relevance_score": float}
    user_query: str,
    strategy: DocumentPlacementStrategy = DocumentPlacementStrategy.RECENCY_FIRST,
) -> list[dict]:
    """
    Arrange documents in context to exploit primacy/recency bias.

    Relevance scores should be pre-computed (e.g., from a retriever).
    Higher score = more relevant.
    """

    # Sort documents by relevance (ascending)
    sorted_docs = sorted(documents, key=lambda d: d["relevance_score"])

    if strategy == DocumentPlacementStrategy.RECENCY_FIRST:
        # Most relevant at the end (recency bias favors retrieval)
        ordered = sorted_docs  # Least relevant first, most relevant last

    elif strategy == DocumentPlacementStrategy.PRIMACY_FIRST:
        # Most relevant at the start
        ordered = sorted_docs[::-1]  # Most relevant first

    elif strategy == DocumentPlacementStrategy.SANDWICH:
        # Top-1 at start, rest in middle, top-2 at end
        if len(sorted_docs) >= 2:
            most_relevant = sorted_docs[-1]
            second_most = sorted_docs[-2]
            middle = sorted_docs[:-2]
            ordered = [most_relevant] + middle + [second_most]
        else:
            ordered = sorted_docs[::-1]

    # Build context string
    doc_context = "\n\n".join([
        f"[Document {i+1} | Relevance: {doc['relevance_score']:.2f}]\n{doc['content']}"
        for i, doc in enumerate(ordered)
    ])

    return [
        {"role": "system", "content": system_prompt},
        {
            "role": "user",
            "content": f"{doc_context}\n\n---\n\nQuestion: {user_query}"
        }
    ]

# Usage
messages = build_context_with_placement(
    system_prompt="You are a helpful research assistant. Answer based on the documents provided.",
    documents=[
        {"content": "Climate change refers to...", "relevance_score": 0.45},
        {"content": "The Paris Agreement of 2015...", "relevance_score": 0.91},
        {"content": "Carbon dioxide levels have...", "relevance_score": 0.73},
        {"content": "Renewable energy adoption...", "relevance_score": 0.58},
    ],
    user_query="What international agreements address climate change?",
    strategy=DocumentPlacementStrategy.RECENCY_FIRST,
)

Practical rule: For RAG systems, place your highest-relevance retrieved chunks at the end of the context (just before the user query). This exploits the recency bias and measurably improves retrieval accuracy.

Part 5: Four Context Management Strategies

When documents exceed your context window (or cost constraints), you have four strategies. Each has different tradeoffs:

Strategy 1: Sliding Window

Process the document in overlapping chunks, maintaining a window of context:

import asyncio
from openai import AsyncOpenAI
import tiktoken

client = AsyncOpenAI()

async def sliding_window_summarize(
    long_text: str,
    window_tokens: int = 3_000,
    overlap_tokens: int = 500,
    model: str = "gpt-4o-mini",
) -> str:
    """
    Summarize a document too long for a single context window.
    Uses overlapping windows to prevent losing information at boundaries.
    """
    if overlap_tokens >= window_tokens:
        # Otherwise the step below would be zero or negative
        raise ValueError("overlap_tokens must be smaller than window_tokens")

    encoder = tiktoken.encoding_for_model("gpt-4o")

    # Tokenize the full text
    all_tokens = encoder.encode(long_text)
    total_tokens = len(all_tokens)

    print(f"Document: {total_tokens:,} tokens → processing in sliding windows")

    chunks = []
    step = window_tokens - overlap_tokens

    for start in range(0, total_tokens, step):
        end = min(start + window_tokens, total_tokens)
        chunk_tokens = all_tokens[start:end]
        chunk_text = encoder.decode(chunk_tokens)
        chunks.append(chunk_text)

        if end >= total_tokens:
            break

    print(f"Created {len(chunks)} overlapping windows")

    # Summarize each window
    window_summaries = []
    for i, chunk in enumerate(chunks):
        response = await client.chat.completions.create(
            model=model,
            messages=[
                {
                    "role": "system",
                    "content": "Summarize the key information in this text section. "
                               "Be concise but preserve all important facts, names, and decisions."
                },
                {"role": "user", "content": chunk}
            ],
            temperature=0.1,
            max_tokens=500,
        )
        summary = response.choices[0].message.content
        window_summaries.append(f"[Window {i+1}/{len(chunks)}]\n{summary}")
        print(f"  ✓ Window {i+1}/{len(chunks)} summarized")

    # Final synthesis
    combined = "\n\n".join(window_summaries)
    final_response = await client.chat.completions.create(
        model=model,
        messages=[
            {
                "role": "system",
                "content": "You have summaries of consecutive sections of a document. "
                           "Synthesize them into a single coherent summary."
            },
            {"role": "user", "content": combined}
        ],
        temperature=0.2,
        max_tokens=1_000,
    )

    return final_response.choices[0].message.content

Best for: Long document summarization, processing books, legal document review. Weakness: Multiple API calls = higher latency and cost. Information that spans chunk boundaries may be lost.
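To see how window size and overlap translate into API calls, the chunking step can be isolated as a pure function. This sketch mirrors the stepping logic described in this strategy; the function name `window_spans` is an illustrative choice, and it works on token indices only, so no tokenizer or API key is needed.

```python
def window_spans(total_tokens: int, window_tokens: int, overlap_tokens: int) -> list[tuple[int, int]]:
    """Return (start, end) token offsets for each overlapping window."""
    if not 0 <= overlap_tokens < window_tokens:
        raise ValueError("overlap_tokens must be >= 0 and smaller than window_tokens")

    step = window_tokens - overlap_tokens
    spans = []
    for start in range(0, total_tokens, step):
        end = min(start + window_tokens, total_tokens)
        spans.append((start, end))
        if end >= total_tokens:
            break  # the last window already reaches the end of the document
    return spans


spans = window_spans(total_tokens=10_000, window_tokens=3_000, overlap_tokens=500)
processed = sum(end - start for start, end in spans)
print(spans)
print(f"{len(spans)} calls, {processed:,} tokens sent for a 10,000-token document")
```

The gap between tokens sent and document length is the price of the overlap, which is worth estimating before choosing a large overlap on a long document.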

Strategy 2: Map-Reduce

Parallel processing of independent chunks, then aggregation:

import asyncio
from openai import AsyncOpenAI
import tiktoken

client = AsyncOpenAI()
encoder = tiktoken.encoding_for_model("gpt-4o")

def chunk_text(text: str, chunk_size: int = 4_000, overlap: int = 200) -> list[str]:
    """Split text into overlapping token chunks."""
    tokens = encoder.encode(text)
    chunks = []
    step = chunk_size - overlap

    for start in range(0, len(tokens), step):
        end = min(start + chunk_size, len(tokens))
        chunks.append(encoder.decode(tokens[start:end]))
        if end >= len(tokens):
            break

    return chunks

async def map_chunk(chunk: str, query: str, chunk_id: int) -> str:
    """MAP phase: extract relevant info from one chunk."""
    response = await client.chat.completions.create(
        model="gpt-4o-mini",  # Use cheaper model for map phase
        messages=[
            {
                "role": "system",
                "content": f"Extract information relevant to this query: '{query}'\n"
                           "Return only relevant excerpts and facts. "
                           "If nothing is relevant, say 'No relevant information.'"
            },
            {"role": "user", "content": chunk}
        ],
        temperature=0,
        max_tokens=400,
    )
    result = response.choices[0].message.content
    if "No relevant information" not in result:
        return f"[Chunk {chunk_id}]\n{result}"
    return ""

async def reduce_results(mapped_results: list[str], query: str) -> str:
    """REDUCE phase: synthesize all relevant extractions."""
    relevant = [r for r in mapped_results if r.strip()]

    if not relevant:
        return "No relevant information found in the document."

    combined = "\n\n".join(relevant)

    response = await client.chat.completions.create(
        model="gpt-4o",  # Use better model for final synthesis
        messages=[
            {
                "role": "system",
                "content": "Synthesize these extractions into a comprehensive answer. "
                           "Eliminate redundancy. Be direct and accurate."
            },
            {
                "role": "user",
                "content": f"Query: {query}\n\nExtractions:\n{combined}"
            }
        ],
        temperature=0.1,
        max_tokens=1_000,
    )
    return response.choices[0].message.content

async def map_reduce_qa(document: str, query: str, concurrency: int = 10) -> str:
    """
    Answer a query over a long document using MapReduce.

    Processes chunks in parallel, which is much faster than a sequential
    sliding window for Q&A tasks.
    """
    chunks = chunk_text(document, chunk_size=4_000)
    print(f"MapReduce: {len(chunks)} chunks, query='{query[:50]}...'")

    # MAP phase: parallel execution with a concurrency limit
    semaphore = asyncio.Semaphore(concurrency)

    async def bounded_map(chunk, idx):
        async with semaphore:
            return await map_chunk(chunk, query, idx)

    mapped = await asyncio.gather(
        *[bounded_map(chunk, i+1) for i, chunk in enumerate(chunks)]
    )

    relevant_count = sum(1 for r in mapped if r.strip())
    print(f"MAP complete: {relevant_count}/{len(chunks)} chunks had relevant info")

    # REDUCE phase
    result = await reduce_results(list(mapped), query)
    return result

# Usage (very_long_document is your own text, loaded elsewhere)
answer = asyncio.run(map_reduce_qa(
    document=very_long_document,  # Could be 500k+ words
    query="What were the main causes of the 2008 financial crisis?"
))

Best for: Q&A over long documents, information extraction, parallel processing where chunks are independent. Weakness: Aggregation quality depends on the REDUCE step. Doesn’t maintain narrative continuity.

Strategy 3: Hierarchical Summarization

Build a tree of summaries — summarize chunks, then summarize summaries:

import asyncio
from openai import AsyncOpenAI
import tiktoken
import math

client = AsyncOpenAI()
encoder = tiktoken.encoding_for_model("gpt-4o")

async def summarize_chunk(text: str, level: int = 0) -> str:
    """Summarize a text chunk at a given hierarchy level."""
    instructions = {
        0: "Summarize this text section, preserving key facts, decisions, and named entities.",
        1: "Summarize these section summaries into a chapter-level overview.",
        2: "Create an executive summary from these chapter overviews.",
    }

    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": instructions.get(level, instructions[2])},
            {"role": "user", "content": text}
        ],
        temperature=0.1,
        max_tokens=600,
    )
    return response.choices[0].message.content

async def hierarchical_summarize(
    document: str,
    chunk_tokens: int = 3_000,
    max_levels: int = 3,
) -> dict:
    """
    Build a summary tree from bottom up.

    Returns a dict with summaries at each level:
    {
        "level_0": [...chunk summaries...],
        "level_1": [...summaries of summaries...],
        "final": "executive summary"
    }
    """

    # Level 0: Chunk the document
    tokens = encoder.encode(document)
    chunk_token_groups = [
        tokens[i:i+chunk_tokens]
        for i in range(0, len(tokens), chunk_tokens)
    ]
    chunks = [encoder.decode(t) for t in chunk_token_groups]

    print(f"Hierarchical summarization: {len(chunks)} chunks, {len(tokens):,} tokens")

    result = {}
    current_summaries = chunks

    for level in range(max_levels):
        print(f"Level {level}: Summarizing {len(current_summaries)} items...")

        # Summarize all items at this level in parallel
        summaries = await asyncio.gather(
            *[summarize_chunk(text, level) for text in current_summaries]
        )

        result[f"level_{level}"] = list(summaries)

        # Check if we're done (few enough summaries to fit in one context)
        total_tokens = sum(len(encoder.encode(s)) for s in summaries)
        if total_tokens < 4_000 or len(summaries) <= 3:
            # Final synthesis
            combined = "\n\n".join(f"[Section {i+1}]\n{s}" for i, s in enumerate(summaries))
            final_summary = await summarize_chunk(combined, level=2)
            result["final"] = final_summary
            print(f"Final summary created at level {level}")
            break

        # Group summaries into larger chunks for next level
        group_size = math.ceil(math.sqrt(len(summaries)))
        grouped = []
        for i in range(0, len(summaries), group_size):
            group = summaries[i:i+group_size]
            grouped.append("\n\n".join(group))

        current_summaries = grouped

    if "final" not in result:
        # max_levels ran out before the summaries got small enough:
        # synthesize whatever the last level produced so "final" always exists
        combined = "\n\n".join(current_summaries)
        result["final"] = await summarize_chunk(combined, level=2)

    return result

Best for: Book summarization, research paper digestion, building document hierarchies for navigation. Weakness: Loses fine-grained details at higher levels. Computationally expensive.

Strategy 4: Retrieval-Augmented Generation (RAG)

Rather than fitting all content in context, retrieve only what’s relevant:

from openai import AsyncOpenAI
import numpy as np
import tiktoken

client = AsyncOpenAI()

# Simplified RAG context builder
async def build_rag_context(
    query: str,
    document_chunks: list[dict],  # pre-chunked and embedded documents
    max_context_tokens: int = 6_000,
    top_k: int = 10,
) -> str:
    """
    Build a context from retrieved chunks that fits within token budget.

    document_chunks: [{"text": str, "embedding": list[float], "source": str}]
    """

    encoder = tiktoken.encoding_for_model("gpt-4o")

    # 1. Embed the query
    query_embedding_response = await client.embeddings.create(
        model="text-embedding-3-small",
        input=query,
    )
    query_embedding = np.array(query_embedding_response.data[0].embedding)

    # 2. Score all chunks by cosine similarity
    def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
        return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

    scored_chunks = []
    for chunk in document_chunks:
        chunk_embedding = np.array(chunk["embedding"])
        score = cosine_similarity(query_embedding, chunk_embedding)
        scored_chunks.append({**chunk, "score": score})

    # 3. Sort by relevance, take top_k
    scored_chunks.sort(key=lambda x: x["score"], reverse=True)
    top_chunks = scored_chunks[:top_k]

    # 4. Fill context up to token budget (most relevant chunks first)
    context_parts = []
    used_tokens = 0

    for chunk in top_chunks:
        chunk_text = f"[Source: {chunk['source']} | Relevance: {chunk['score']:.2f}]\n{chunk['text']}"
        chunk_tokens = len(encoder.encode(chunk_text))

        if used_tokens + chunk_tokens > max_context_tokens:
            break

        context_parts.append(chunk_text)
        used_tokens += chunk_tokens

    print(f"RAG: Selected {len(context_parts)}/{len(top_chunks)} chunks, "
          f"{used_tokens:,}/{max_context_tokens:,} tokens used")

    # 5. Chunks stay in descending relevance order, so the most relevant one opens the context.
    #    To place it last instead (nearest the question), reverse context_parts before joining.
    return "\n\n---\n\n".join(context_parts)


# Full RAG pipeline
async def rag_answer(query: str, document_chunks: list[dict]) -> str:
    context = await build_rag_context(query, document_chunks)

    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": "Answer the question using only the provided context. "
                           "If the answer isn't in the context, say so explicitly."
            },
            {
                "role": "user",
                "content": f"Context:\n{context}\n\n---\n\nQuestion: {query}"
            }
        ],
        temperature=0.1,
        max_tokens=1_000,
    )
    return response.choices[0].message.content

Best for: Document Q&A, knowledge base search, enterprise information retrieval. Weakness: Only works when the question can be answered by a few relevant passages. Fails for cross-document reasoning.
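One way to combine budgeted selection with the recency placement rule from Part 4 is sketched below. It is a variation, not the builder's actual behavior: the function name is illustrative, each chunk is assumed to carry a precomputed `tokens` count (a hypothetical field), and oversized chunks are skipped rather than ending the selection.

```python
def select_and_order(chunks: list[dict], max_tokens: int) -> list[dict]:
    """Pick chunks greedily by score within a token budget, then order them
    so the most relevant chunk comes last (closest to the question)."""
    selected, used = [], 0
    for chunk in sorted(chunks, key=lambda c: c["score"], reverse=True):
        if used + chunk["tokens"] > max_tokens:
            continue  # too big for the remaining budget; try smaller chunks
        selected.append(chunk)
        used += chunk["tokens"]
    # Ascending score: least relevant first, most relevant last.
    return sorted(selected, key=lambda c: c["score"])


candidates = [
    {"source": "a", "score": 0.91, "tokens": 400},
    {"source": "b", "score": 0.73, "tokens": 900},
    {"source": "c", "score": 0.58, "tokens": 300},
    {"source": "d", "score": 0.45, "tokens": 200},
]
ordered = select_and_order(candidates, max_tokens=1_000)
print([c["source"] for c in ordered])
```

Skipping instead of stopping fills the budget more fully, at the cost of admitting lower-scoring chunks, so pair it with a minimum score threshold if filler is a concern.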

Part 6: Context Management in Multi-Turn Conversations

Conversations accumulate context automatically. Without management, a long chat session eventually hits the context limit:

from openai import AsyncOpenAI
import tiktoken

client = AsyncOpenAI()
encoder = tiktoken.encoding_for_model("gpt-4o")

class ManagedConversation:
    """
    A multi-turn conversation that automatically manages context length.

    Strategies:
    - SLIDING: Drop oldest messages when over budget
    - SUMMARIZE: Compress old history into a rolling summary
    """

    def __init__(
        self,
        system_prompt: str,
        max_context_tokens: int = 100_000,
        output_reserve: int = 4_000,
        strategy: str = "summarize",  # "sliding" or "summarize"
        model: str = "gpt-4o",
    ):
        self.system_prompt = system_prompt
        self.max_context_tokens = max_context_tokens
        self.output_reserve = output_reserve
        self.strategy = strategy
        self.model = model

        self.messages: list[dict] = []
        self.summary: str = ""  # Rolling summary of compressed history

    def _count_tokens(self, messages: list[dict]) -> int:
        total = 4  # Base overhead
        for msg in messages:
            total += 4 + len(encoder.encode(msg.get("content") or ""))
        return total

    def _system_messages(self) -> list[dict]:
        """Build system context including rolling summary if it exists."""
        content = self.system_prompt
        if self.summary:
            content += f"\n\n[Conversation summary so far]\n{self.summary}"
        return [{"role": "system", "content": content}]

    async def _compress_history(self) -> None:
        """Summarize the oldest half of the conversation history."""

        half = len(self.messages) // 2
        if half == 0:
            return  # Nothing old enough to compress

        to_compress = self.messages[:half]
        self.messages = self.messages[half:]

        # Build a conversation transcript to summarize
        transcript = "\n".join([
            f"{m['role'].upper()}: {m['content']}"
            for m in to_compress
        ])

        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": "Summarize this conversation excerpt concisely. "
                               "Preserve key facts, decisions, and user preferences."
                },
                {"role": "user", "content": transcript}
            ],
            temperature=0,
            max_tokens=500,
        )

        new_summary = response.choices[0].message.content

        # Append to existing summary
        if self.summary:
            self.summary = f"{self.summary}\n\n[Later:]\n{new_summary}"
        else:
            self.summary = new_summary

        print(f"  [Context manager] Compressed {half} messages into summary")

    async def chat(self, user_message: str) -> str:
        """Send a message and get a response, with automatic context management."""

        self.messages.append({"role": "user", "content": user_message})

        # Check if we're over budget
        all_messages = self._system_messages() + self.messages
        current_tokens = self._count_tokens(all_messages)
        budget = self.max_context_tokens - self.output_reserve

        if current_tokens > budget:
            print(f"  [Context manager] {current_tokens:,} tokens > {budget:,} budget")

            if self.strategy == "sliding":
                # Drop oldest messages (keep system + last N messages)
                while self._count_tokens(self._system_messages() + self.messages) > budget:
                    if len(self.messages) > 2:
                        dropped = self.messages.pop(0)
                        print(f"  [Context manager] Dropped: {dropped['content'][:50]}...")
                    else:
                        break

            elif self.strategy == "summarize":
                await self._compress_history()

        # Make the API call
        response = await client.chat.completions.create(
            model=self.model,
            messages=self._system_messages() + self.messages,
            temperature=0.7,
            max_tokens=self.output_reserve,
        )

        assistant_message = response.choices[0].message.content
        self.messages.append({"role": "assistant", "content": assistant_message})

        return assistant_message

    def context_report(self) -> dict:
        all_messages = self._system_messages() + self.messages
        current_tokens = self._count_tokens(all_messages)
        return {
            "current_tokens": current_tokens,
            "max_tokens": self.max_context_tokens,
            "utilization": f"{current_tokens / self.max_context_tokens * 100:.1f}%",
            "message_count": len(self.messages),
            "has_summary": bool(self.summary),
            "summary_preview": self.summary[:100] + "..." if self.summary else None,
        }


# Usage
async def demo_conversation():
    convo = ManagedConversation(
        system_prompt="You are a helpful AI assistant.",
        max_context_tokens=50_000,
        strategy="summarize",
    )

    # Simulate a long conversation
    exchanges = [
        "My name is Alex and I'm building a fintech startup.",
        "We're focused on B2B payment infrastructure for Southeast Asia.",
        "Our main challenge is regulatory compliance across 6 different markets.",
        "Can you explain the key regulatory frameworks I need to know?",
        # ... many more messages
    ]

    for message in exchanges:
        response = await convo.chat(message)
        print(f"User: {message}")
        print(f"Assistant: {response[:100]}...\n")

    print("Context report:", convo.context_report())

Part 7: Choosing the Right Strategy

Document size        Query type              Recommended strategy
──────────────────────────────────────────────────────────────────────────
< 50k tokens            Any                  Direct context stuffing
                                             (fits in Claude/Gemini window)

50k – 200k tokens       Q&A, fact lookup     RAG (retrieve top-k chunks)
                        Summarization        Hierarchical or Map-Reduce

200k – 1M tokens        Q&A, extraction      Gemini 2.5 Pro (1M context)
                                             or RAG

> 1M tokens             Any                  RAG required: no model
(e.g., full codebases)                       fits this in context window

Multi-turn chat         Conversation         Sliding window or summarization
                        management           (depending on coherence needs)

Repeated same doc       Q&A                  Prompt caching (Claude/Gemini)

🔍 Common Mistakes

1. Assuming the model read the whole document

When you send a 100k-token document, the model processes all tokens — but attention is not uniform. Critical information buried in the middle may be de-emphasized. Always test retrieval quality across document positions.
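A position test can start from a set of probe documents that differ only in where a known fact sits. The sketch below builds those probes; it makes no model calls, and the function name, the `depths` parameter, and the sample needle are illustrative assumptions. You would send each probe to your model with the same question and compare accuracy by depth.

```python
def build_position_probes(
    filler_paragraphs: list[str],
    needle: str,
    depths: tuple[float, ...] = (0.0, 0.25, 0.5, 0.75, 1.0),
) -> dict[float, str]:
    """Insert `needle` at several relative depths of the same filler text."""
    probes = {}
    for depth in depths:
        # depth 0.0 = very start, 1.0 = very end
        index = round(depth * len(filler_paragraphs))
        paragraphs = filler_paragraphs[:index] + [needle] + filler_paragraphs[index:]
        probes[depth] = "\n\n".join(paragraphs)
    return probes


filler = [f"Background paragraph {i} with unrelated detail." for i in range(8)]
needle = "The access code for the archive room is 7421."
probes = build_position_probes(filler, needle)

for depth, text in probes.items():
    position = text.split("\n\n").index(needle)
    print(f"depth={depth:.2f} -> needle is paragraph {position} of {len(filler) + 1}")
```

Keeping the filler identical across probes means any accuracy difference can be attributed to position rather than content.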

2. Silent truncation by SDKs

Some older integrations silently truncate input rather than raising an error. Always count tokens before sending large contexts. Never assume “it fits.”

# ✅ Always check before sending
assert count_tokens(messages) < MODEL_LIMIT - OUTPUT_RESERVE, "Context too large!"

3. Forgetting output tokens count against the limit

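The context window includes both input and output. If you send 127,000 tokens to a 128k model and ask for a 2,000-token response, only about 1,000 tokens of room remain: depending on the API, the request is rejected up front or generation is cut off when the window fills.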
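A small guard makes the shared limit explicit before the request is sent. This is a minimal sketch; the function name is hypothetical, and `input_tokens` is assumed to come from your own token count.

```python
def max_output_tokens(window: int, input_tokens: int, requested: int) -> int:
    """Clamp the requested output length to the room left in the window."""
    available = window - input_tokens
    if available <= 0:
        raise ValueError(
            f"Input ({input_tokens:,} tokens) already fills the {window:,}-token window"
        )
    return min(requested, available)


# The example from the text: 127,000 input tokens, 2,000 requested
print(max_output_tokens(window=128_000, input_tokens=127_000, requested=2_000))
```

Whether to clamp silently, as here, or to fail loudly when the clamp applies is a design choice; for answers that must not be cut short, raising an error is usually safer.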

4. Using a 1M-context model for everything

Gemini 2.5 Pro’s 1M context window is impressive — but at 100k+ tokens, costs compound and latency grows. Use it when you genuinely need it. For most Q&A tasks, RAG at 3k-10k tokens is 10–100× cheaper and faster.

5. Not handling the summarize strategy’s information loss

Rolling summarization loses details. If a user asks “What did I say about X in our third message?” after 50 compressed messages, the answer may be gone. For applications where conversation history fidelity matters, use a database + retrieval, not summarization.

💼 Quick Questions

Q: A user complains that your chatbot “forgot” something they mentioned 20 messages ago. What’s the likely cause and how do you fix it?

The conversation exceeded the context window and oldest messages were dropped (sliding window) or compressed (summarization). The fix depends on the use case: if exact recall is required, store conversation history in a database and retrieve relevant past messages using semantic search before each turn. If approximate recall is acceptable, tune the summarization strategy to preserve key user facts (name, preferences, stated goals) in a structured memory store separate from the rolling summary.

Q: When would you use long-context stuffing vs RAG?

Long-context stuffing is appropriate when: (1) the entire document must be read holistically, not by passage retrieval (e.g., understanding the overall argument of a book); (2) cross-document reasoning is required (RAG can only retrieve isolated passages); (3) the document is small enough that cost and latency are acceptable. RAG is superior when: (1) you need cost efficiency at scale; (2) you need low latency; (3) the question can be answered by a few relevant passages; (4) documents are larger than any available context window.

Q: Explain the lost-in-the-middle problem and how you would design around it in a production RAG system.

The model’s attention is stronger at the beginning and end of the context. Middle content is retrieved less reliably. Design mitigations: (1) Place highest-relevance chunks at the end of the context (recency bias); (2) Use a “sandwich” layout with the most critical chunk first and second-most-critical last; (3) Limit retrieved chunks to the absolute minimum needed — padding the context with low-relevance filler amplifies the problem; (4) Use reranking to aggressively filter retrieved chunks before building context.

🏭 Production Considerations

Monitor context utilization in your observability pipeline. Log prompt_tokens, completion_tokens, and total_tokens from every API response. Alert when average utilization exceeds 80% of the window, because quality degrades non-linearly as you approach the limit.
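The utilization alert can be sketched as a pure function over logged usage records. The record shape (dicts with `prompt_tokens` and `completion_tokens`) and both function names are assumptions for illustration; adapt them to whatever your logging pipeline stores.

```python
def average_utilization(usages: list[dict], window: int) -> float:
    """Mean fraction of the context window consumed across requests."""
    if not usages:
        return 0.0
    total = sum(u["prompt_tokens"] + u["completion_tokens"] for u in usages)
    return total / (len(usages) * window)


def should_alert(usages: list[dict], window: int, threshold: float = 0.80) -> bool:
    """True when average utilization crosses the alert threshold."""
    return average_utilization(usages, window) > threshold


recent = [
    {"prompt_tokens": 100_000, "completion_tokens": 2_000},
    {"prompt_tokens": 110_000, "completion_tokens": 4_000},
]
print(f"{average_utilization(recent, 128_000):.1%}", should_alert(recent, 128_000))
```

An average can hide a long tail, so it may be worth tracking a high percentile alongside it.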

Token counting is cheap; overruns are expensive. Adding a tiktoken token count before every API call costs microseconds but prevents context overflow errors in production. Always count before sending.

Prompt caching is a free optimization for high-volume applications. If your system prompt or reference documents are shared across many requests (which they almost always are), enabling Anthropic’s prompt caching or Gemini’s implicit caching can reduce costs by 70–90% on those tokens.

Test your context management strategy against real edge cases. The message “What did we discuss at the beginning of this conversation?” is a standard test for sliding window failure. Build an eval suite that exercises your context management code before deploying.
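A sliding-window regression test does not need a live model: simulate the trimming and check whether an early fact is still in the window. The sketch below uses a whitespace word count as a crude stand-in for a tokenizer, and all function names are hypothetical.

```python
from typing import Callable


def sliding_trim(messages: list[dict], budget: int, count_tokens: Callable[[str], int]) -> list[dict]:
    """Drop oldest messages until the total fits, always keeping the last two."""
    trimmed = list(messages)
    while len(trimmed) > 2 and sum(count_tokens(m["content"]) for m in trimmed) > budget:
        trimmed.pop(0)
    return trimmed


def fact_survives(messages: list[dict], fact: str, budget: int,
                  count_tokens: Callable[[str], int]) -> bool:
    """Check whether `fact` is still visible after trimming."""
    kept = sliding_trim(messages, budget, count_tokens)
    return any(fact in m["content"] for m in kept)


def word_count(text: str) -> int:
    # Stand-in tokenizer for the test only; real counts will be higher.
    return len(text.split())


history = [{"role": "user", "content": "My name is Alex."}]
history += [{"role": "user", "content": "filler " * 50} for _ in range(10)]

print(fact_survives(history, "Alex", budget=1_000, count_tokens=word_count))
print(fact_survives(history, "Alex", budget=200, count_tokens=word_count))
```

A test like this pins down exactly how much conversation fits before early facts vanish, which is the number to compare against your real session lengths.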

🔑 Key Takeaways

  1. The context window is the entire universe the model can see — everything outside it does not exist
  2. Tokens ≠ words — code, JSON, and technical content use significantly more tokens per “word”
  3. Long context is expensive — KV cache size scales linearly; at 1M tokens, you need a GPU cluster just for KV
  4. Lost in the middle is real — place your highest-relevance content at the end or beginning of long contexts
  5. Prompt caching is a free win — if the same documents appear across many requests, cache them
  6. Four strategies for long documents: sliding window, map-reduce, hierarchical summarization, RAG
  7. Multi-turn conversations need management — implement sliding window or summarization before you hit the limit, not after
  8. Count tokens before every large context call — never rely on the API to tell you when you’ve overflowed
  9. Match strategy to use case — 1M context models exist, but cost and latency make them wrong for most tasks
  10. RAG is the production default for document Q&A — long context stuffing is for edge cases that require holistic reading
