LLM Observability in Production: Tracing, Logging, and Monitoring AI Systems
Why LLM Observability Is a Different Beast
Your APM dashboard is all green. Average latency: 200 ms. Error rate: 0.1%. Every health check passes. Meanwhile, your LLM is hallucinating on 20% of responses, your monthly API bill just doubled, and nobody notices until a customer screenshot lands on Twitter.
Traditional monitoring tools were built for a world where cost is fixed (you pay per server, not per request), latency is predictable (database query plus render), and correctness is binary (it works or it throws a 500). LLM applications break all three assumptions. Cost scales with token count—and token count varies wildly per request. Latency depends on output length: a 10-token response is 50× faster than a 500-token response. And correctness is a spectrum: responses can be subtly wrong, slightly off-tone, or technically accurate but completely unhelpful. An HTTP 200 tells you nothing about generation quality.
This gap is why you need a purpose-built observability strategy. In this post, we'll build one from scratch around four pillars:
- Cost tracking — know exactly where every dollar goes, per feature, per model, per day
- Latency analysis — segment by model, prompt length, and time of day to catch what averages hide
- Quality monitoring — detect degradation before users do, using statistical signals, embeddings, and LLM-as-judge
- Usage intelligence — trace multi-step pipelines end to end and attribute resources to specific features
We'll implement each pillar with working Python code, wire them into two interactive dashboards, and finish with a practical decision guide for choosing between DIY, open-source, and managed observability platforms. If you've read our latency benchmarks post, think of this as its production counterpart: benchmarks measure what should happen; observability measures what actually happens.
Structured Logging for LLM Calls
Every observability stack starts with the same foundation: structured logs. Not print(f"Called GPT-4o, took {elapsed}ms")—actual structured JSON that downstream systems can parse, aggregate, and alert on. The following decorator wraps any async LLM call and captures everything you'll need for cost tracking, latency analysis, and debugging.
import time, json, hashlib, uuid
from dataclasses import dataclass, asdict
from functools import wraps
import logging
logger = logging.getLogger("llm_observability")
MODEL_PRICING = { # (input, output) per 1M tokens
"gpt-4o": (2.50, 10.00),
"gpt-4o-mini": (0.15, 0.60),
"claude-sonnet-4": (3.00, 15.00),
}
@dataclass
class LLMCallLog:
    timestamp: str
    trace_id: str
    model: str
    feature: str
    input_tokens: int
    output_tokens: int
    cost_usd: float
    latency_ms: float
    ttft_ms: float
    temperature: float
    prompt_hash: str
    finish_reason: str
def log_llm_call(feature: str):
"""Decorator that wraps an async LLM call with structured logging."""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
trace_id = kwargs.pop("trace_id", uuid.uuid4().hex[:16])
start = time.perf_counter()
result = await func(*args, **kwargs)
elapsed = (time.perf_counter() - start) * 1000
model = result.model
inp, out = MODEL_PRICING.get(model, (0, 0))
cost = (result.usage.input_tokens * inp
+ result.usage.output_tokens * out) / 1_000_000
entry = LLMCallLog(
                timestamp = time.strftime(
                    "%Y-%m-%dT%H:%M:%SZ", time.gmtime()),  # "Z" implies UTC
trace_id = trace_id,
model = model,
feature = feature,
input_tokens = result.usage.input_tokens,
output_tokens = result.usage.output_tokens,
cost_usd = round(cost, 6),
latency_ms = round(elapsed, 1),
                # rough estimate for non-streaming calls; streaming
                # clients should record the real time-to-first-token
                ttft_ms = round(elapsed * 0.15, 1),
temperature = kwargs.get("temperature", 1.0),
prompt_hash = hashlib.sha256(
str(args[0]).encode()).hexdigest()[:12],
finish_reason = result.choices[0].finish_reason,
)
logger.info(json.dumps(asdict(entry)))
return result
return wrapper
return decorator
Each call produces a JSON log entry like this:
{
"timestamp": "2026-02-26T14:23:07Z",
"trace_id": "a3f2b1c9d4e5f678",
"model": "gpt-4o",
"feature": "document-summarizer",
"input_tokens": 2847,
"output_tokens": 312,
"cost_usd": 0.010237,
"latency_ms": 1834.2,
"ttft_ms": 275.1,
"temperature": 0.3,
"prompt_hash": "e4a2f9c81b3d",
"finish_reason": "stop"
}
Two design decisions worth calling out. First, we log the prompt_hash rather than the full prompt—this lets you detect duplicate prompts and group requests by template without storing sensitive user content in every log line. Second, the feature field enables cost attribution: you'll know exactly which endpoint or pipeline stage generated each call. For debugging, adopt a tiered content strategy: always log metadata (the fields above), sample 10% of full prompts, and always log full prompts for requests that error. This balances privacy against debuggability.
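As a sketch of that tiered policy (the function name and parameters below are illustrative, not part of the decorator above), hashing the trace ID makes the sampling decision deterministic, so every service in a pipeline keeps or drops the same request:

```python
import hashlib

def should_log_full_prompt(trace_id: str, errored: bool,
                           sample_rate: float = 0.10) -> bool:
    """Tiered content policy: always keep the full prompt when the
    request errored, otherwise keep a deterministic ~10% sample."""
    if errored:
        return True
    # Hash the trace_id so every service makes the same decision
    # for the same request -- no coordination needed.
    bucket = int(hashlib.sha256(trace_id.encode()).hexdigest()[:8], 16)
    return (bucket % 10_000) / 10_000 < sample_rate
```

Deterministic sampling also means a flagged trace can be re-checked later: the same ID always yields the same decision.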
The attribute names here are simplified for clarity. In production, consider aligning with the OpenTelemetry GenAI semantic conventions—names like gen_ai.request.model, gen_ai.usage.input_tokens, and gen_ai.response.finish_reasons. Using OTel conventions means any compatible backend (Grafana, Jaeger, Langfuse) can parse your spans without custom configuration.
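A minimal translation shim might look like the following. The mapping covers only the fields this post logs; check the current GenAI conventions for the full attribute registry before relying on it:

```python
# Map this post's simplified log fields onto OpenTelemetry GenAI
# semantic-convention attribute keys; unmapped fields pass through.
OTEL_ATTR_MAP = {
    "model": "gen_ai.request.model",
    "temperature": "gen_ai.request.temperature",
    "input_tokens": "gen_ai.usage.input_tokens",
    "output_tokens": "gen_ai.usage.output_tokens",
    "finish_reason": "gen_ai.response.finish_reasons",
}

def to_otel_attributes(entry: dict) -> dict:
    return {OTEL_ATTR_MAP.get(k, k): v for k, v in entry.items()}
```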
Cost Tracking and Attribution
The most common production surprise is the monthly bill. "Why did our LLM spend double this month?" Without cost attribution, the answer is always a shrug. The CostTracker below aggregates per-request costs by feature, detects anomalies using rolling statistics, and generates daily reports that pinpoint exactly where the money went.
from collections import defaultdict
from datetime import datetime, timedelta
import statistics
class CostTracker:
def __init__(self, anomaly_sigma=2.0):
self.costs = defaultdict(list) # feature -> [(ts, cost)]
self.anomaly_sigma = anomaly_sigma
def record(self, feature, cost_usd, timestamp=None):
ts = timestamp or datetime.utcnow()
self.costs[feature].append((ts, cost_usd))
def feature_cost(self, feature, window=timedelta(hours=24)):
cutoff = datetime.utcnow() - window
return sum(c for ts, c in self.costs[feature] if ts > cutoff)
def detect_anomaly(self, feature, window=timedelta(hours=1)):
cutoff = datetime.utcnow() - window
recent = [c for ts, c in self.costs[feature] if ts > cutoff]
older = [c for ts, c in self.costs[feature] if ts <= cutoff]
if len(older) < 10:
return None # not enough baseline data
baseline_mean = statistics.mean(older)
baseline_std = statistics.stdev(older) or 0.001
current_mean = statistics.mean(recent) if recent else 0
z_score = (current_mean - baseline_mean) / baseline_std
if z_score > self.anomaly_sigma:
return {
"feature": feature,
"current_rate": round(current_mean, 4),
"baseline_rate": round(baseline_mean, 4),
"deviation": f"{z_score:.1f}\u03c3",
"severity": "critical" if z_score > 3.0 else "warning",
}
return None
def daily_report(self):
today = datetime.utcnow().strftime("%Y-%m-%d")
report = {"date": today, "total_cost": 0, "by_feature": {}}
for feature, entries in self.costs.items():
day_cost = sum(c for ts, c in entries
if ts.strftime("%Y-%m-%d") == today)
report["by_feature"][feature] = round(day_cost, 2)
report["total_cost"] += day_cost
report["total_cost"] = round(report["total_cost"], 2)
return report
Run daily_report() at midnight and you get a machine-readable per-feature breakdown. Aggregate a week of those reports, diff against the prior week, and the resulting digest reads like this:
Feature 'document-summarizer' consumed $847.23 this week (up 34% from last week)
  Root cause: average prompt length increased from 2,100 to 3,400 tokens after the template change on Tuesday.
Feature 'chatbot' consumed $412.67 this week (down 8%)
  Cache hit rate improved from 62% to 71%.
The anomaly detector uses a simple z-score against the historical baseline for each feature. A z-score above 2.0 triggers a warning; above 3.0, a critical alert. This catches the classic scenario: someone edits a prompt template, inadvertently doubling the token count, and costs creep up 40% over three days before anyone checks the bill.
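The arithmetic behind that alert is easy to verify by hand. With illustrative numbers, a template edit that doubles per-request cost produces a z-score far past the 2σ bar:

```python
import statistics

# Per-request cost over the past day (baseline) vs. the last hour,
# after a template edit roughly doubled prompt length.
baseline = [0.009, 0.010, 0.011] * 7   # hovers around $0.010/request
recent = [0.020, 0.021, 0.022]         # roughly 2x the baseline

mean = statistics.mean(baseline)
std = statistics.stdev(baseline) or 0.001  # guard against zero variance
z = (statistics.mean(recent) - mean) / std  # well above 2-sigma
```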
Cost tracking also reveals caching effectiveness. If you've implemented LLM response caching, a dropping cache hit rate shows up instantly as a cost spike—often the earliest signal that traffic patterns have changed.
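The sensitivity is easy to underestimate. With made-up but plausible numbers, a nine-point drop in hit rate raises spend by roughly a third:

```python
def llm_spend(requests: int, hit_rate: float, cost_per_call: float) -> float:
    """Only cache misses reach the API; hits are (nearly) free."""
    return requests * (1 - hit_rate) * cost_per_call

monthly = 1_000_000   # requests/month (illustrative)
per_call = 0.0102     # average $/uncached call (illustrative)

before = llm_spend(monthly, 0.71, per_call)  # 71% hit rate
after = llm_spend(monthly, 0.62, per_call)   # slips to 62%
increase_pct = (after - before) / before * 100  # ~31% more spend
```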
Latency Analysis—Beyond Average Response Time
LLM latency has characteristics that make traditional percentile monitoring insufficient. It's bimodal (cached vs. uncached), output-length-dependent (longer responses take longer to generate), model-specific (GPT-4o and GPT-4o-mini have very different profiles), and time-of-day sensitive (provider rate limiting during peak hours). Reporting a single p95 across all requests mixes these populations and hides real regressions.
from collections import defaultdict
class LatencyAnalyzer:
def __init__(self):
self.latencies = defaultdict(list) # (model, bucket) -> [ms]
self.ttft = defaultdict(list)
def _bucket(self, tokens):
if tokens < 500: return "<500"
if tokens < 1000: return "500-1k"
if tokens < 2000: return "1k-2k"
return "2k+"
def record(self, model, input_tokens, latency_ms, ttft_ms):
bucket = self._bucket(input_tokens)
self.latencies[(model, bucket)].append(latency_ms)
self.ttft[(model, bucket)].append(ttft_ms)
def percentiles(self, model, bucket=None):
if bucket:
data = self.latencies.get((model, bucket), [])
else:
data = [v for (m, _), vals in self.latencies.items()
if m == model for v in vals]
if len(data) < 5:
return None
data_sorted = sorted(data)
n = len(data_sorted)
return {
"p50": data_sorted[n // 2],
"p95": data_sorted[int(n * 0.95)],
"p99": data_sorted[int(n * 0.99)],
"count": n,
}
def detect_regression(self, model, window_size=100):
all_data = [v for (m, _), vals in self.latencies.items()
if m == model for v in vals]
if len(all_data) < window_size * 2:
return None
old = all_data[-window_size * 2 : -window_size]
new = all_data[-window_size:]
old_p95 = sorted(old)[int(len(old) * 0.95)]
new_p95 = sorted(new)[int(len(new) * 0.95)]
change_pct = (new_p95 - old_p95) / old_p95 * 100
if change_pct > 20:
return {"model": model, "old_p95": round(old_p95),
"new_p95": round(new_p95),
"change": f"+{change_pct:.0f}%"}
return None
The key design choice is segmenting by both model and prompt-length bucket. This prevents a classic false negative: your overall p95 looks fine because 80% of requests are fast GPT-4o-mini calls, but the 20% that hit GPT-4o are consistently timing out. The detect_regression method compares the latest window against the previous one and flags any p95 increase above 20%.
Track time-to-first-token (TTFT) separately from total latency. TTFT is what users feel in a streaming UI—if TTFT spikes, the app feels broken even if total latency is acceptable. A rule of thumb: p95 TTFT should be within 3–4× of p50. A wider spread signals a systemic issue like provider rate limiting, not just normal variance. If you've run our latency benchmarks, production monitoring is how you validate whether those numbers hold up under real traffic.
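That rule of thumb is cheap to automate. A sketch, with the 4× cutoff as our heuristic rather than any standard:

```python
def ttft_spread(ttft_samples_ms: list) -> dict:
    """Flag a suspiciously wide TTFT distribution: p95 more than
    ~4x p50 suggests rate limiting rather than normal variance."""
    s = sorted(ttft_samples_ms)
    n = len(s)
    p50, p95 = s[n // 2], s[int(n * 0.95)]
    return {"p50": p50, "p95": p95,
            "ratio": round(p95 / p50, 1),
            "suspect": p95 / p50 > 4.0}
```

A healthy distribution clusters tightly; a throttled one has a long tail of requests that waited in the provider's queue before the first token arrived.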
Quality Monitoring—Catching Degradation Before Users Do
This is the hardest observability pillar. Cost and latency are numbers—you can measure them precisely on every request. Quality is a spectrum, and measuring it at scale requires layering three complementary signals from cheap-and-fast to expensive-and-accurate.
import statistics
from collections import deque
class QualityMonitor:
def __init__(self, golden_embeddings=None):
self.length_history = deque(maxlen=10_000)
self.golden_embeddings = golden_embeddings or []
self.judge_scores = deque(maxlen=1_000)
self.quality_log = []
def record_response(self, response_text, embedding=None):
word_count = len(response_text.split())
self.length_history.append(word_count)
sim_score = None
if embedding and self.golden_embeddings:
sim_score = max(
self._cosine_sim(embedding, gold)
for gold in self.golden_embeddings
)
self.quality_log.append({
"length": word_count, "similarity": sim_score
})
def record_judge_score(self, score):
"""Record an LLM-as-judge quality score (1-5 scale)."""
self.judge_scores.append(score)
def health_score(self):
scores = {}
# Signal 1: length stability (recent 100 vs previous 500)
if len(self.length_history) > 200:
recent = list(self.length_history)[-100:]
baseline = list(self.length_history)[-600:-100]
if baseline:
drift = abs(statistics.mean(recent)
- statistics.mean(baseline))
norm = drift / (statistics.mean(baseline) + 1)
scores["length_stability"] = max(0, 1 - norm)
# Signal 2: semantic similarity to golden responses
sims = [e["similarity"] for e in self.quality_log[-100:]
if e["similarity"] is not None]
if sims:
scores["semantic_similarity"] = statistics.mean(sims)
# Signal 3: LLM-as-judge average
if self.judge_scores:
scores["judge_score"] = (
statistics.mean(list(self.judge_scores)[-50:]) / 5.0
)
# Combined health: weighted average
if scores:
w = {"length_stability": 0.2,
"semantic_similarity": 0.4,
"judge_score": 0.4}
total_w = sum(w[k] for k in scores)
combined = sum(scores[k] * w[k] for k in scores)
scores["overall"] = round(combined / total_w, 3)
return scores
@staticmethod
def _cosine_sim(a, b):
dot = sum(x * y for x, y in zip(a, b))
norm_a = sum(x ** 2 for x in a) ** 0.5
norm_b = sum(x ** 2 for x in b) ** 0.5
return dot / (norm_a * norm_b + 1e-9)
The three signals work together as a quality funnel:
- Response length tracking is statistical, nearly free, and runs on 100% of traffic. A sudden shift in average response length (say, from 120 words to 45 words) is a strong signal that something changed—a prompt template edit, a model update, or a system prompt regression.
- Semantic similarity compares new responses against a curated set of “golden” reference answers using cosine similarity on embeddings. It's more expensive (requires an embedding call per evaluation) but catches semantic drift that length tracking misses. Run this on hourly aggregates, not every request.
- LLM-as-judge samples 1–5% of responses and sends them to a judge model that rates quality on a 1–5 scale. It's the most accurate signal but the most expensive. Use it to calibrate the cheaper signals and to investigate when they trigger.
Here's a real scenario this catches: a model provider silently updates their model weights. Response length stays similar, but semantic similarity to your golden set drops from 0.91 to 0.78 over two hours. The quality monitor fires a warning before any user complains. Without monitoring, you'd discover the regression from a support ticket three days later.
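One cheap way to operationalize that scenario is an exponentially weighted moving average over the similarity scores. A sketch, with alpha and the 0.85 floor chosen purely for illustration:

```python
class SimilarityDrift:
    """EWMA of golden-set similarity with a simple drop alarm."""
    def __init__(self, alpha: float = 0.05, floor: float = 0.85):
        self.alpha = alpha    # smoothing factor: higher reacts faster
        self.floor = floor    # alarm when the smoothed value sinks below
        self.ewma = None
    def update(self, similarity: float) -> bool:
        if self.ewma is None:
            self.ewma = similarity
        else:
            self.ewma = (self.alpha * similarity
                         + (1 - self.alpha) * self.ewma)
        return self.ewma < self.floor
```

Seeded at 0.91, a stream of 0.78 readings trips the alarm after roughly a dozen updates, which at hourly aggregation means hours rather than days.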
Distributed Tracing for Multi-Step Pipelines
Modern LLM applications rarely make a single API call per request. A typical RAG pipeline chains retrieval, reranking, generation, and guardrail checks—each with its own latency, token cost, and failure mode. Traditional request logs show each call independently. Distributed tracing connects them into a single request waterfall, revealing where time and money actually go.
import time, uuid
from dataclasses import dataclass, field
from contextlib import contextmanager
@dataclass
class Span:
name: str
span_id: str = field(
default_factory=lambda: uuid.uuid4().hex[:8])
start_ms: float = 0
end_ms: float = 0
tokens_in: int = 0
tokens_out: int = 0
cost_usd: float = 0
attributes: dict = field(default_factory=dict)
@dataclass
class Trace:
trace_id: str = field(
default_factory=lambda: uuid.uuid4().hex[:16])
spans: list = field(default_factory=list)
@contextmanager
def span(self, name):
s = Span(name=name,
start_ms=time.perf_counter() * 1000)
try:
yield s
finally:
s.end_ms = time.perf_counter() * 1000
self.spans.append(s)
def summary(self):
total_ms = sum(s.end_ms - s.start_ms
for s in self.spans)
total_cost = sum(s.cost_usd for s in self.spans)
total_tok = sum(s.tokens_in + s.tokens_out
for s in self.spans)
lines = [f"Trace {self.trace_id} \u2014 "
f"{total_ms:.0f}ms, ${total_cost:.4f}, "
f"{total_tok} tokens"]
for i, s in enumerate(self.spans):
dur = s.end_ms - s.start_ms
pct = (dur / total_ms * 100) if total_ms else 0
prefix = "\u2514\u2500" if i == len(self.spans) - 1 else "\u251c\u2500"
lines.append(
f" {prefix} {s.name}: {dur:.0f}ms ({pct:.0f}%) "
f"[{s.tokens_in}+{s.tokens_out} tok, "
f"${s.cost_usd:.4f}]")
return "\n".join(lines)
Wrap each pipeline stage in a trace.span() context manager, set its token counts and cost, and call trace.summary() at the end. A typical RAG request trace looks like:
Trace a3f2b1c9d4e5f678 — 2340ms, $0.0047, 6082 tokens
 ├─ embed query: 45ms (2%) [12+0 tok, $0.0000]
 ├─ vector search: 120ms (5%) [0+0 tok, $0.0000]
 ├─ rerank: 340ms (15%) [2100+50 tok, $0.0003]
 ├─ chat gpt-4o: 1755ms (75%) [3400+350 tok, $0.0042]
 └─ guardrail: 80ms (3%) [160+10 tok, $0.0002]
This immediately reveals that 75% of latency and 89% of cost come from the generation step. If you need to optimize, that's where to focus—not on the retrieval pipeline that takes 2% of the time. It also connects to the profiling post: profiling finds bottlenecks in development; tracing finds them in production. Together, they cover the full optimization lifecycle.
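Those percentages fall straight out of the span data. A quick check with the numbers from the trace above:

```python
# (stage, duration_ms, cost_usd) pulled from the trace above
stages = [("embed query", 45, 0.0), ("vector search", 120, 0.0),
          ("rerank", 340, 0.0003), ("chat gpt-4o", 1755, 0.0042),
          ("guardrail", 80, 0.0002)]

total_ms = sum(ms for _, ms, _ in stages)
total_cost = sum(c for _, _, c in stages)
time_share = {name: round(ms / total_ms * 100)
              for name, ms, _ in stages}
cost_share = {name: round(c / total_cost * 100)
              for name, _, c in stages}
# generation: 75% of time, 89% of cost
```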
For production systems, use OpenTelemetry GenAI span conventions: name spans like chat gpt-4o or embed text-embedding-3-small, and put variable data (token counts, cost) in span attributes. Stable span names enable proper aggregation across millions of traces.
Alerting and Anomaly Detection
Monitoring data is useless without alerts, but static thresholds generate noise. Monday morning traffic is always 40% higher than Sunday—alerting on that teaches your team to ignore pages. The following alert system uses adaptive thresholds that adjust for time-of-day and day-of-week patterns.
from dataclasses import dataclass
from enum import Enum
import statistics
class Severity(Enum):
INFO = "info"
WARNING = "warning"
CRITICAL = "critical"
@dataclass
class Alert:
name: str
severity: Severity
message: str
value: float
threshold: float
class AdaptiveAlertSystem:
def __init__(self):
self.history = {} # metric -> [(hour_of_week, value)]
def record(self, metric, value, hour_of_week):
self.history.setdefault(metric, []).append(
(hour_of_week, value))
    def _adaptive_threshold(self, metric, hour_of_week,
                            base_threshold, sigma=2.0,
                            is_floor=False):
        same_hour = [v for h, v in self.history.get(metric, [])
                     if h == hour_of_week]
        if len(same_hour) < 4:
            return base_threshold
        mean = statistics.mean(same_hour)
        std = (statistics.stdev(same_hour)
               if len(same_hour) > 1 else 0)
        # Floor metrics (like quality_score) alert on drops, so
        # their learned threshold sits *below* the mean; ceiling
        # metrics alert on spikes above it.
        if is_floor:
            return mean - sigma * std
        return mean + sigma * std
    def evaluate(self, metrics, hour_of_week):
        alerts = []
        rules = [
            ("cost_per_min", 0.50, Severity.CRITICAL,
             "Cost spike: ${value:.2f}/min "
             "(threshold: ${threshold:.2f})"),
            ("latency_p95", 5000, Severity.WARNING,
             "Latency regression: {value:.0f}ms p95 "
             "(threshold: {threshold:.0f}ms)"),
            ("quality_score", 0.70, Severity.WARNING,
             "Quality drop: {value:.2f} "
             "(floor: {threshold:.2f})"),
            ("error_rate", 0.05, Severity.CRITICAL,
             "Error surge: {value:.1%} "
             "(threshold: {threshold:.1%})"),
        ]
        for metric, base, severity, template in rules:
            value = metrics.get(metric)
            if value is None:
                continue
            is_floor = metric == "quality_score"
            threshold = self._adaptive_threshold(
                metric, hour_of_week, base,
                is_floor=is_floor)
            self.record(metric, value, hour_of_week)
            triggered = (value < threshold if is_floor
                         else value > threshold)
if triggered:
alerts.append(Alert(
name=metric, severity=severity,
value=value, threshold=threshold,
message=template.format(
value=value, threshold=threshold),
))
return alerts
The adaptive threshold works by looking at historical values for the same hour of the week. If your latency p95 is always 3,200 ms on Monday at 10am, the system learns that baseline and only alerts when the value exceeds the learned mean plus two standard deviations. This eliminates the false positives that plague static-threshold systems.
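Concretely, with five past Mondays-at-10am samples (illustrative numbers):

```python
import statistics

# p95 latency (ms) observed at this hour-of-week, past 5 weeks
same_hour = [3150, 3230, 3180, 3260, 3200]

mean = statistics.mean(same_hour)   # ~3204 ms
std = statistics.stdev(same_hour)   # ~43 ms
threshold = mean + 2 * std          # learned ceiling: ~3290 ms
```

A normal Monday reading of 3,250 ms stays quiet under the learned ceiling; a genuine regression to 3,600 ms pages someone, even though both would trip a naive static threshold set from quiet-Sunday data.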
Route alerts by severity: critical alerts (cost spike >50%, error rate >5%) go to PagerDuty. Warnings go to a Slack channel. Informational alerts feed a daily digest. The goal is a signal-to-noise ratio high enough that your team actually reads the alerts instead of muting the channel.
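One more noise-reduction layer worth adding in front of any router: a per-alert cooldown, so a sustained incident pages once instead of every evaluation cycle. A minimal sketch, with the 30-minute window as a starting point rather than a recommendation:

```python
import time

class AlertDeduper:
    """Suppress repeats of the same alert within a cooldown window."""
    def __init__(self, cooldown_s: float = 1800):
        self.cooldown_s = cooldown_s
        self.last_fired = {}   # alert name -> last emit timestamp
    def should_emit(self, name: str, now: float = None) -> bool:
        now = time.time() if now is None else now
        last = self.last_fired.get(name)
        if last is not None and now - last < self.cooldown_s:
            return False    # same alert, still in cooldown
        self.last_fired[name] = now
        return True
```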
Try It: LLM Dashboard Simulator
A simulated production dashboard tracking all four pillars in real time. Watch for anomalies—cost spikes, latency regressions, and quality drops appear as the simulation runs. Adjust the sliders to see how traffic patterns affect each metric.
Building Your Stack—Decision Guide
You have three realistic paths to LLM observability, each with distinct tradeoffs. The right choice depends on your team size, budget, and how much you need to customize.
| | DIY (OTel + Grafana) | Open Source (Langfuse) | Managed SaaS |
|---|---|---|---|
| Setup time | 2–4 weeks | 2–5 days | Hours |
| Cost @ 1M traces/mo | ~$50 (infra only) | ~$200 (self-hosted) | $10K–$50K |
| Customization | Full control | Moderate | Limited |
| Quality monitoring | Build from scratch | Built-in evaluators | Pre-built dashboards |
| Data residency | Full control | Self-hosted option | Vendor-dependent |
| Maintenance | High (you own it) | Medium (community) | None (vendor) |
The second question is which pillar to start with. Not all observability is equally valuable from day one:
| Pillar | Signal Type | Coverage | Catches |
|---|---|---|---|
| Cost Tracking | Token counts × pricing | 100% of traffic | Budget overruns, prompt bloat, cache misses |
| Latency Analysis | TTFT + total, segmented | 100% of traffic | Provider degradation, rate limiting |
| Quality Monitoring | Stats + embeddings + judge | Sampled (1–10%) | Model drift, hallucination spikes |
| Usage Intelligence | Request patterns, traces | Metadata only | Traffic anomalies, feature adoption |
The practical recommendation: instrument with OpenTelemetry from day one—you can swap backends later without re-instrumenting your code. Start with structured logging and cost tracking (highest ROI and lowest effort), add latency analysis when you have enough traffic for percentiles to be meaningful, then layer in quality monitoring as the application matures and you've curated golden response sets.
An underrated middle ground is the gateway pattern: route all LLM traffic through a proxy like LiteLLM or Helicone. A gateway gives you cost tracking, request logging, rate limiting, and failover with minimal code changes—no decorator needed on every call.
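For a flavor of how little code the gateway path requires, here is roughly what a LiteLLM proxy config looks like. The field names follow LiteLLM's documented config.yaml shape at the time of writing, and the model identifiers are illustrative, so verify both against the current docs before copying:

```yaml
# config.yaml for the LiteLLM proxy (shape per LiteLLM docs;
# verify field names and model IDs against the current release)
model_list:
  - model_name: gpt-4o
    litellm_params:
      model: openai/gpt-4o
      api_key: os.environ/OPENAI_API_KEY
  - model_name: claude-sonnet-4
    litellm_params:
      model: anthropic/claude-sonnet-4-20250514
      api_key: os.environ/ANTHROPIC_API_KEY
```

Every request that flows through the proxy gets logged, costed, and rate-limited centrally, with no decorator needed in application code.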
Try It: Alert Threshold Tuner
A week of simulated monitoring data contains 3 real incidents and 5 normal patterns that look suspicious. Adjust the thresholds to catch all 3 incidents with zero false alarms—this is the precision/recall tradeoff of alert tuning.
Conclusion
LLM observability isn't a nice-to-have—it's the difference between confidently scaling AI systems and anxiously hoping everything is fine. The four pillars (cost, latency, quality, usage) each catch problems the others miss: cost tracking catches prompt bloat, latency analysis catches provider degradation, quality monitoring catches semantic drift, and distributed tracing shows you exactly where to optimize.
The code in this post gives you working implementations of each pillar. Start with the log_llm_call decorator and the CostTracker—these two pieces alone will prevent the most common production surprises. Layer in LatencyAnalyzer and QualityMonitor as your system matures. And when your pipeline grows beyond a single LLM call, the Trace builder will show you where time and money actually go.
The best observability setup is the one you actually build. Start simple, instrument everything, and let the data tell you where to invest next. Because in production, the only thing worse than a broken AI system is a broken AI system that nobody knows is broken.
References & Further Reading
- OpenTelemetry — GenAI Semantic Conventions — the emerging standard for instrumenting LLM calls with OTel spans and attributes
- Langfuse — open-source LLM observability platform with tracing, evaluations, and prompt management
- Arize Phoenix — open-source LLM tracing and evaluation library built on OpenTelemetry
- OpenLLMetry — auto-instrumentation for OpenAI, Anthropic, Cohere, and vector databases using pure OTel
- LiteLLM — unified LLM API proxy with cost tracking, rate limiting, and failover
- Helicone — open-source LLM gateway providing observability, caching, and usage analytics
- Sentry — Core KPIs for LLM Performance — practical guide to choosing which metrics to track first