
LLM Observability in Production: Tracing, Logging, and Monitoring AI Systems

Why LLM Observability Is a Different Beast

Your APM dashboard is all green. Average latency: 200 ms. Error rate: 0.1%. Every health check passes. Meanwhile, your LLM is hallucinating on 20% of responses, your monthly API bill just doubled, and nobody notices until a customer screenshot lands on Twitter.

Traditional monitoring tools were built for a world where cost is fixed (you pay per server, not per request), latency is predictable (database query plus render), and correctness is binary (it works or it throws a 500). LLM applications break all three assumptions. Cost scales with token count—and token count varies wildly per request. Latency depends on output length: a 10-token response is 50× faster than a 500-token response. And correctness is a spectrum: responses can be subtly wrong, slightly off-tone, or technically accurate but completely unhelpful. An HTTP 200 tells you nothing about generation quality.

This gap is why you need a purpose-built observability strategy. In this post, we'll build one from scratch around four pillars:

  1. Cost tracking: know where every dollar goes, per feature and per request.
  2. Latency analysis: percentiles segmented by model and prompt length, with TTFT tracked separately.
  3. Quality monitoring: catch semantic drift and degradation before users do.
  4. Usage intelligence: distributed tracing across multi-step pipelines.

We'll implement each pillar with working Python code, wire them into two interactive dashboards, and finish with a practical decision guide for choosing between DIY, open-source, and managed observability platforms. If you've read our latency benchmarks post, think of this as its production counterpart: benchmarks measure what should happen; observability measures what actually happens.

Structured Logging for LLM Calls

Every observability stack starts with the same foundation: structured logs. Not print(f"Called GPT-4o, took {elapsed}ms")—actual structured JSON that downstream systems can parse, aggregate, and alert on. The following decorator wraps any async LLM call and captures everything you'll need for cost tracking, latency analysis, and debugging.

import time, json, hashlib, uuid
from dataclasses import dataclass, asdict
from functools import wraps
import logging

logger = logging.getLogger("llm_observability")

MODEL_PRICING = {           # (input, output) per 1M tokens
    "gpt-4o":           (2.50,  10.00),
    "gpt-4o-mini":      (0.15,   0.60),
    "claude-sonnet-4":  (3.00,  15.00),
}

@dataclass
class LLMCallLog:
    timestamp: str
    trace_id: str
    model: str
    feature: str
    input_tokens: int
    output_tokens: int
    cost_usd: float
    latency_ms: float
    ttft_ms: float
    temperature: float
    prompt_hash: str
    finish_reason: str

def log_llm_call(feature: str):
    """Decorator that wraps an async LLM call with structured logging."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            trace_id = kwargs.pop("trace_id", uuid.uuid4().hex[:16])
            start = time.perf_counter()
            result = await func(*args, **kwargs)
            elapsed = (time.perf_counter() - start) * 1000

            model = result.model
            inp, out = MODEL_PRICING.get(model, (0, 0))
            cost = (result.usage.input_tokens * inp
                    + result.usage.output_tokens * out) / 1_000_000

            entry = LLMCallLog(
                timestamp     = time.strftime(
                    "%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
                trace_id      = trace_id,
                model         = model,
                feature       = feature,
                input_tokens  = result.usage.input_tokens,
                output_tokens = result.usage.output_tokens,
                cost_usd      = round(cost, 6),
                latency_ms    = round(elapsed, 1),
                # Placeholder estimate; real TTFT requires timing
                # the first chunk of a streaming response.
                ttft_ms       = round(elapsed * 0.15, 1),
                temperature   = kwargs.get("temperature", 1.0),
                prompt_hash   = hashlib.sha256(
                    str(args[0]).encode()).hexdigest()[:12],
                finish_reason = result.choices[0].finish_reason,
            )
            logger.info(json.dumps(asdict(entry)))
            return result
        return wrapper
    return decorator

Each call produces a JSON log entry like this:

{
  "timestamp": "2026-02-26T14:23:07Z",
  "trace_id": "a3f2b1c9d4e5f678",
  "model": "gpt-4o",
  "feature": "document-summarizer",
  "input_tokens": 2847,
  "output_tokens": 312,
  "cost_usd": 0.010237,
  "latency_ms": 1834.2,
  "ttft_ms": 275.1,
  "temperature": 0.3,
  "prompt_hash": "e4a2f9c81b3d",
  "finish_reason": "stop"
}

Two design decisions worth calling out. First, we log the prompt_hash rather than the full prompt—this lets you detect duplicate prompts and group requests by template without storing sensitive user content in every log line. Second, the feature field enables cost attribution: you'll know exactly which endpoint or pipeline stage generated each call. For debugging, adopt a tiered content strategy: always log metadata (the fields above), sample 10% of full prompts, and always log full prompts for requests that error. This balances privacy against debuggability.
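
The sampling decision in that tiered strategy is easy to get wrong if it's random per call: retries of the same request flip in and out of the sample. Here's a deterministic, hash-based sketch (the function name and default rate are illustrative, not from any library):

```python
import hashlib

def should_log_full_prompt(trace_id: str, errored: bool,
                           sample_rate: float = 0.10) -> bool:
    """Tiered content decision: always keep full prompts for
    errored requests, otherwise keep a deterministic sample."""
    if errored:
        return True
    # Hash the trace_id so the decision is stable across retries
    # and reproducible when replaying a specific request.
    bucket = int(hashlib.sha256(
        trace_id.encode()).hexdigest(), 16) % 100
    return bucket < sample_rate * 100
```

Because the decision is a pure function of the trace ID, every service in a pipeline makes the same choice without coordination.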

The attribute names here are simplified for clarity. In production, consider aligning with the OpenTelemetry GenAI semantic conventions—names like gen_ai.request.model, gen_ai.usage.input_tokens, and gen_ai.response.finish_reasons. Using OTel conventions means any compatible backend (Grafana, Jaeger, Langfuse) can parse your spans without custom configuration.
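
A minimal sketch of that alignment, assuming a plain dict log entry like the one above (the gen_ai.* names come from the OTel GenAI conventions; the llm.* fallback namespace is our own invention):

```python
# Our simplified field names -> OTel GenAI semantic-convention
# attribute names (finish_reasons is plural, a list, in the spec).
OTEL_FIELD_MAP = {
    "model":         "gen_ai.request.model",
    "input_tokens":  "gen_ai.usage.input_tokens",
    "output_tokens": "gen_ai.usage.output_tokens",
    "temperature":   "gen_ai.request.temperature",
    "finish_reason": "gen_ai.response.finish_reasons",
}

def to_otel_attributes(entry: dict) -> dict:
    """Rename mapped fields; park everything else under a custom
    namespace so no data is silently dropped."""
    return {OTEL_FIELD_MAP.get(k, f"llm.{k}"): v
            for k, v in entry.items()}
```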

Cost Tracking and Attribution

The most common production surprise is the monthly bill. "Why did our LLM spend double this month?" Without cost attribution, the answer is always a shrug. The CostTracker below aggregates per-request costs by feature, detects anomalies using rolling statistics, and generates daily reports that pinpoint exactly where the money went.

from collections import defaultdict
from datetime import datetime, timedelta
import statistics

class CostTracker:
    def __init__(self, anomaly_sigma=2.0):
        self.costs = defaultdict(list)   # feature -> [(ts, cost)]
        self.anomaly_sigma = anomaly_sigma

    def record(self, feature, cost_usd, timestamp=None):
        ts = timestamp or datetime.utcnow()
        self.costs[feature].append((ts, cost_usd))

    def feature_cost(self, feature, window=timedelta(hours=24)):
        cutoff = datetime.utcnow() - window
        return sum(c for ts, c in self.costs[feature] if ts > cutoff)

    def detect_anomaly(self, feature, window=timedelta(hours=1)):
        cutoff = datetime.utcnow() - window
        recent = [c for ts, c in self.costs[feature] if ts > cutoff]
        older  = [c for ts, c in self.costs[feature] if ts <= cutoff]

        if len(older) < 10:
            return None                  # not enough baseline data

        baseline_mean = statistics.mean(older)
        baseline_std  = statistics.stdev(older) or 0.001
        current_mean  = statistics.mean(recent) if recent else 0

        z_score = (current_mean - baseline_mean) / baseline_std
        if z_score > self.anomaly_sigma:
            return {
                "feature": feature,
                "current_rate": round(current_mean, 4),
                "baseline_rate": round(baseline_mean, 4),
                "deviation": f"{z_score:.1f}\u03c3",
                "severity": "critical" if z_score > 3.0 else "warning",
            }
        return None

    def daily_report(self):
        today = datetime.utcnow().strftime("%Y-%m-%d")
        report = {"date": today, "total_cost": 0, "by_feature": {}}
        for feature, entries in self.costs.items():
            day_cost = sum(c for ts, c in entries
                          if ts.strftime("%Y-%m-%d") == today)
            report["by_feature"][feature] = round(day_cost, 2)
            report["total_cost"] += day_cost
        report["total_cost"] = round(report["total_cost"], 2)
        return report

Run daily_report() at midnight for a per-feature cost breakdown. Roll those dicts up over a week and annotate the deltas, and the resulting report reads like this:

Feature 'document-summarizer' consumed $847.23 this week
  (up 34% from last week)
Root cause: average prompt length increased from 2,100 to 3,400
  tokens after the template change on Tuesday.

Feature 'chatbot' consumed $412.67 this week (down 8%)
  Cache hit rate improved from 62% to 71%.

The anomaly detector uses a simple z-score against the historical baseline for each feature. A z-score above 2.0 triggers a warning; above 3.0, a critical alert. This catches the classic scenario: someone edits a prompt template, inadvertently doubling the token count, and costs creep up 40% over three days before anyone checks the bill.
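
The arithmetic behind that creep is worth seeing once. With hypothetical numbers (gpt-4o-mini input pricing at $0.15 per million tokens, 50,000 requests a day), doubling the average prompt doubles the input-side bill:

```python
PRICE_PER_M_INPUT = 0.15      # $/1M input tokens (gpt-4o-mini)
REQUESTS_PER_DAY  = 50_000    # hypothetical traffic

def daily_input_cost(avg_prompt_tokens: int) -> float:
    return (avg_prompt_tokens * REQUESTS_PER_DAY
            * PRICE_PER_M_INPUT / 1_000_000)

before = daily_input_cost(2_000)   # $15.00/day
after  = daily_input_cost(4_000)   # $30.00/day after the template edit
```

Output tokens and cache misses shift the overall percentage, which is why a blended bill can creep up 40% rather than doubling outright.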

Cost tracking also reveals caching effectiveness. If you've implemented LLM response caching, a dropping cache hit rate shows up instantly as a cost spike—often the earliest signal that traffic patterns have changed.

Latency Analysis—Beyond Average Response Time

LLM latency has characteristics that make traditional percentile monitoring insufficient. It's bimodal (cached vs. uncached), output-length-dependent (longer responses take longer to generate), model-specific (GPT-4o and GPT-4o-mini have very different profiles), and time-of-day sensitive (provider rate limiting during peak hours). Reporting a single p95 across all requests mixes these populations and hides real regressions.

from collections import defaultdict

class LatencyAnalyzer:
    def __init__(self):
        self.latencies = defaultdict(list)  # (model, bucket) -> [ms]
        self.ttft = defaultdict(list)

    def _bucket(self, tokens):
        if tokens < 500:   return "<500"
        if tokens < 1000:  return "500-1k"
        if tokens < 2000:  return "1k-2k"
        return "2k+"

    def record(self, model, input_tokens, latency_ms, ttft_ms):
        bucket = self._bucket(input_tokens)
        self.latencies[(model, bucket)].append(latency_ms)
        self.ttft[(model, bucket)].append(ttft_ms)

    def percentiles(self, model, bucket=None):
        if bucket:
            data = self.latencies.get((model, bucket), [])
        else:
            data = [v for (m, _), vals in self.latencies.items()
                    if m == model for v in vals]
        if len(data) < 5:
            return None

        data_sorted = sorted(data)
        n = len(data_sorted)
        return {
            "p50": data_sorted[n // 2],
            "p95": data_sorted[int(n * 0.95)],
            "p99": data_sorted[int(n * 0.99)],
            "count": n,
        }

    def detect_regression(self, model, window_size=100):
        all_data = [v for (m, _), vals in self.latencies.items()
                    if m == model for v in vals]
        if len(all_data) < window_size * 2:
            return None

        old = all_data[-window_size * 2 : -window_size]
        new = all_data[-window_size:]
        old_p95 = sorted(old)[int(len(old) * 0.95)]
        new_p95 = sorted(new)[int(len(new) * 0.95)]

        change_pct = (new_p95 - old_p95) / old_p95 * 100
        if change_pct > 20:
            return {"model": model, "old_p95": round(old_p95),
                    "new_p95": round(new_p95),
                    "change": f"+{change_pct:.0f}%"}
        return None

The key design choice is segmenting by both model and prompt-length bucket. This prevents a classic false negative: your overall p95 looks fine because 80% of requests are fast GPT-4o-mini calls, but the 20% that hit GPT-4o are consistently timing out. The detect_regression method compares the latest window against the previous one and flags any p95 increase above 20%.

Track time-to-first-token (TTFT) separately from total latency. TTFT is what users feel in a streaming UI—if TTFT spikes, the app feels broken even if total latency is acceptable. A rule of thumb: p95 TTFT should be within 3–4× of p50. A wider spread signals a systemic issue like provider rate limiting, not just normal variance. If you've run our latency benchmarks, production monitoring is how you validate whether those numbers hold up under real traffic.
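
That rule of thumb is cheap to automate. A sketch (the function and the 4× default are our own convention, not a standard):

```python
def ttft_spread_ok(ttft_samples, max_ratio=4.0):
    """Flag a suspicious TTFT spread: p95 more than ~4x p50
    suggests rate limiting or queueing, not length variance."""
    data = sorted(ttft_samples)
    if len(data) < 20:
        return None                # too few samples to judge
    p50 = data[len(data) // 2]
    p95 = data[int(len(data) * 0.95)]
    return p95 <= max_ratio * p50
```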

Quality Monitoring—Catching Degradation Before Users Do

This is the hardest observability pillar. Cost and latency are numbers—you can measure them precisely on every request. Quality is a spectrum, and measuring it at scale requires layering three complementary signals from cheap-and-fast to expensive-and-accurate.

import statistics
from collections import deque

class QualityMonitor:
    def __init__(self, golden_embeddings=None):
        self.length_history = deque(maxlen=10_000)
        self.golden_embeddings = golden_embeddings or []
        self.judge_scores = deque(maxlen=1_000)
        self.quality_log = []   # unbounded; cap or rotate in production

    def record_response(self, response_text, embedding=None):
        word_count = len(response_text.split())
        self.length_history.append(word_count)

        sim_score = None
        # "is not None" rather than truthiness, so array-like
        # embeddings (e.g. numpy) work too
        if embedding is not None and self.golden_embeddings:
            sim_score = max(
                self._cosine_sim(embedding, gold)
                for gold in self.golden_embeddings
            )
        self.quality_log.append({
            "length": word_count, "similarity": sim_score
        })

    def record_judge_score(self, score):
        """Record an LLM-as-judge quality score (1-5 scale)."""
        self.judge_scores.append(score)

    def health_score(self):
        scores = {}

        # Signal 1: length stability (recent 100 vs previous 500)
        if len(self.length_history) > 200:
            recent   = list(self.length_history)[-100:]
            baseline = list(self.length_history)[-600:-100]
            if baseline:
                drift = abs(statistics.mean(recent)
                            - statistics.mean(baseline))
                norm  = drift / (statistics.mean(baseline) + 1)
                scores["length_stability"] = max(0, 1 - norm)

        # Signal 2: semantic similarity to golden responses
        sims = [e["similarity"] for e in self.quality_log[-100:]
                if e["similarity"] is not None]
        if sims:
            scores["semantic_similarity"] = statistics.mean(sims)

        # Signal 3: LLM-as-judge average
        if self.judge_scores:
            scores["judge_score"] = (
                statistics.mean(list(self.judge_scores)[-50:]) / 5.0
            )

        # Combined health: weighted average
        if scores:
            w = {"length_stability": 0.2,
                 "semantic_similarity": 0.4,
                 "judge_score": 0.4}
            total_w  = sum(w[k] for k in scores)
            combined = sum(scores[k] * w[k] for k in scores)
            scores["overall"] = round(combined / total_w, 3)

        return scores

    @staticmethod
    def _cosine_sim(a, b):
        dot    = sum(x * y for x, y in zip(a, b))
        norm_a = sum(x ** 2 for x in a) ** 0.5
        norm_b = sum(x ** 2 for x in b) ** 0.5
        return dot / (norm_a * norm_b + 1e-9)

The three signals work together as a quality funnel:

  1. Response length tracking is statistical, nearly free, and runs on 100% of traffic. A sudden shift in average response length (say, from 120 words to 45 words) is a strong signal that something changed—a prompt template edit, a model update, or a system prompt regression.
  2. Semantic similarity compares new responses against a curated set of “golden” reference answers using cosine similarity on embeddings. It's more expensive (requires an embedding call per evaluation) but catches semantic drift that length tracking misses. Run this on hourly aggregates, not every request.
  3. LLM-as-judge samples 1–5% of responses and sends them to a judge model that rates quality on a 1–5 scale. It's the most accurate signal but the most expensive. Use it to calibrate the cheaper signals and to investigate when they trigger.
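
The judge tier is less expensive than it sounds once it's sampled. Back-of-envelope, with hypothetical numbers (100k responses/day, a 2% sample, roughly 600 tokens per judgment, a mini-class judge at $0.15 per million input tokens):

```python
DAILY_RESPONSES     = 100_000
SAMPLE_RATE         = 0.02     # judge 2% of traffic
TOKENS_PER_JUDGMENT = 600      # prompt + response + rubric
PRICE_PER_M_INPUT   = 0.15     # mini-class judge model

daily_judge_cost = (DAILY_RESPONSES * SAMPLE_RATE
                    * TOKENS_PER_JUDGMENT
                    * PRICE_PER_M_INPUT / 1_000_000)
# ~$0.18/day in input tokens; output tokens add a little more
```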

Here's a real scenario this catches: a model provider silently updates their model weights. Response length stays similar, but semantic similarity to your golden set drops from 0.91 to 0.78 over two hours. The quality monitor fires a warning before any user complains. Without monitoring, you'd discover the regression from a support ticket three days later.

Distributed Tracing for Multi-Step Pipelines

Modern LLM applications rarely make a single API call per request. A typical RAG pipeline chains retrieval, reranking, generation, and guardrail checks—each with its own latency, token cost, and failure mode. Traditional request logs show each call independently. Distributed tracing connects them into a single request waterfall, revealing where time and money actually go.

import time, uuid
from dataclasses import dataclass, field
from contextlib import contextmanager

@dataclass
class Span:
    name: str
    span_id: str = field(
        default_factory=lambda: uuid.uuid4().hex[:8])
    start_ms: float = 0
    end_ms: float = 0
    tokens_in: int = 0
    tokens_out: int = 0
    cost_usd: float = 0
    attributes: dict = field(default_factory=dict)

@dataclass
class Trace:
    trace_id: str = field(
        default_factory=lambda: uuid.uuid4().hex[:16])
    spans: list = field(default_factory=list)

    @contextmanager
    def span(self, name):
        s = Span(name=name,
                 start_ms=time.perf_counter() * 1000)
        try:
            yield s
        finally:
            s.end_ms = time.perf_counter() * 1000
            self.spans.append(s)

    def summary(self):
        total_ms = sum(s.end_ms - s.start_ms
                       for s in self.spans)
        total_cost = sum(s.cost_usd for s in self.spans)
        total_tok  = sum(s.tokens_in + s.tokens_out
                         for s in self.spans)

        lines = [f"Trace {self.trace_id} \u2014 "
                 f"{total_ms:.0f}ms, ${total_cost:.4f}, "
                 f"{total_tok} tokens"]
        for i, s in enumerate(self.spans):
            dur = s.end_ms - s.start_ms
            pct = (dur / total_ms * 100) if total_ms else 0
            prefix = "\u2514\u2500" if i == len(self.spans) - 1 else "\u251c\u2500"
            lines.append(
                f"  {prefix} {s.name}: {dur:.0f}ms ({pct:.0f}%) "
                f"[{s.tokens_in}+{s.tokens_out} tok, "
                f"${s.cost_usd:.4f}]")
        return "\n".join(lines)

Wrap each pipeline stage in a trace.span() context manager, set its token counts and cost, and call trace.summary() at the end. A typical RAG request trace looks like:

Trace a3f2b1c9d4e5f678 — 2340ms, $0.0047, 6082 tokens
  ├─ embed query: 45ms (2%) [12+0 tok, $0.0000]
  ├─ vector search: 120ms (5%) [0+0 tok, $0.0000]
  ├─ rerank: 340ms (15%) [2100+50 tok, $0.0003]
  ├─ chat gpt-4o: 1755ms (75%) [3400+350 tok, $0.0042]
  └─ guardrail: 80ms (3%) [160+10 tok, $0.0002]

This immediately reveals that 75% of latency and 89% of cost come from the generation step. If you need to optimize, that's where to focus—not on the retrieval pipeline that takes 2% of the time. It also connects to the profiling post: profiling finds bottlenecks in development; tracing finds them in production. Together, they cover the full optimization lifecycle.
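
If the full Trace/Span classes are more than you need, the same waterfall falls out of a standalone version of the pattern in about 15 lines (stage durations simulated with sleep):

```python
import time
from contextlib import contextmanager

spans = []                      # (name, duration_ms) pairs

@contextmanager
def span(name):
    start = time.perf_counter()
    try:
        yield
    finally:
        spans.append((name, (time.perf_counter() - start) * 1000))

with span("vector search"):
    time.sleep(0.01)            # stand-in for the real stage
with span("chat gpt-4o"):
    time.sleep(0.03)

total = sum(ms for _, ms in spans)
for name, ms in spans:
    print(f"{name}: {ms:.0f}ms ({ms / total * 100:.0f}%)")
```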

For production systems, use OpenTelemetry GenAI span conventions: name spans like chat gpt-4o or embed text-embedding-3-small, and put variable data (token counts, cost) in span attributes. Stable span names enable proper aggregation across millions of traces.

Alerting and Anomaly Detection

Monitoring data is useless without alerts, but static thresholds generate noise. Monday morning traffic is always 40% higher than Sunday—alerting on that teaches your team to ignore pages. The following alert system uses adaptive thresholds that adjust for time-of-day and day-of-week patterns.

from dataclasses import dataclass
from enum import Enum
import statistics

class Severity(Enum):
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"

@dataclass
class Alert:
    name: str
    severity: Severity
    message: str
    value: float
    threshold: float

class AdaptiveAlertSystem:
    def __init__(self):
        self.history = {}   # metric -> [(hour_of_week, value)]

    def record(self, metric, value, hour_of_week):
        self.history.setdefault(metric, []).append(
            (hour_of_week, value))

    def _adaptive_threshold(self, metric, hour_of_week,
                            base_threshold, sigma=2.0):
        same_hour = [v for h, v in self.history.get(metric, [])
                     if h == hour_of_week]
        if len(same_hour) < 4:
            return base_threshold

        mean = statistics.mean(same_hour)
        std  = (statistics.stdev(same_hour)
                if len(same_hour) > 1 else 0)
        # quality_score is a floor metric: it alerts when the value
        # drops BELOW the threshold, so its band extends downward.
        if metric == "quality_score":
            return mean - sigma * std
        return mean + sigma * std

    def evaluate(self, metrics, hour_of_week):
        alerts = []
        rules = [
            ("cost_per_min", 0.50, Severity.CRITICAL,
             "Cost spike: ${value:.2f}/min "
             "(threshold: ${threshold:.2f})"),
            ("latency_p95", 5000, Severity.WARNING,
             "Latency regression: {value:.0f}ms p95 "
             "(threshold: {threshold:.0f}ms)"),
            ("quality_score", 0.70, Severity.WARNING,
             "Quality drop: {value:.2f} "
             "(floor: {threshold:.2f})"),
            ("error_rate", 0.05, Severity.CRITICAL,
             "Error surge: {value:.1%} "
             "(threshold: {threshold:.1%})"),
        ]

        for metric, base, severity, template in rules:
            value = metrics.get(metric)
            if value is None:
                continue

            threshold = self._adaptive_threshold(
                metric, hour_of_week, base)
            self.record(metric, value, hour_of_week)

            is_floor = metric == "quality_score"
            triggered = (value < threshold if is_floor
                         else value > threshold)
            if triggered:
                alerts.append(Alert(
                    name=metric, severity=severity,
                    value=value, threshold=threshold,
                    message=template.format(
                        value=value, threshold=threshold),
                ))
        return alerts

The adaptive threshold works by looking at historical values for the same hour of the week. If your latency p95 is always 3,200 ms on Monday at 10am, the system learns that baseline and only alerts when the value exceeds the learned mean plus two standard deviations. This eliminates the false positives that plague static-threshold systems.
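
To make the learned baseline concrete, here is the threshold math run on a hypothetical Monday-10am history of latency p95 values:

```python
import statistics

def adaptive_threshold(same_hour_samples, base, sigma=2.0):
    """Fall back to the static base until there's enough history,
    then use mean + sigma * stdev for that hour-of-week."""
    if len(same_hour_samples) < 4:
        return base
    return (statistics.mean(same_hour_samples)
            + sigma * statistics.stdev(same_hour_samples))

history = [3100, 3250, 3180, 3300, 3220]     # past Mondays, 10am
print(adaptive_threshold(history, base=5000))       # ~3361 ms
print(adaptive_threshold(history[:2], base=5000))   # 5000 (fallback)
```

The learned threshold (about 3,361 ms) sits just above normal Monday traffic, so a genuine regression to 4,000 ms fires even though it is under the static 5,000 ms base.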

Route alerts by severity: critical alerts (cost spike >50%, error rate >5%) go to PagerDuty. Warnings go to a Slack channel. Informational alerts feed a daily digest. The goal is a signal-to-noise ratio high enough that your team actually reads the alerts instead of muting the channel.

Try It: LLM Dashboard Simulator

A simulated production dashboard tracking all four pillars in real time. Watch for anomalies—cost spikes, latency regressions, and quality drops appear as the simulation runs. Adjust the sliders to see how traffic patterns affect each metric.


Building Your Stack—Decision Guide

You have three realistic paths to LLM observability, each with distinct tradeoffs. The right choice depends on your team size, budget, and how much you need to customize.

|                      | DIY (OTel + Grafana) | Open Source (Langfuse) | Managed SaaS         |
|----------------------|----------------------|------------------------|----------------------|
| Setup time           | 2–4 weeks            | 2–5 days               | Hours                |
| Cost @ 1M traces/mo  | ~$50 (infra only)    | ~$200 (self-hosted)    | $10K–$50K            |
| Customization        | Full control         | Moderate               | Limited              |
| Quality monitoring   | Build from scratch   | Built-in evaluators    | Pre-built dashboards |
| Data residency       | Full control         | Self-hosted option     | Vendor-dependent     |
| Maintenance          | High (you own it)    | Medium (community)     | None (vendor)        |

The second question is which pillar to start with. Not all observability is equally valuable from day one:

| Pillar             | Signal Type                | Coverage        | Catches                                      |
|--------------------|----------------------------|-----------------|----------------------------------------------|
| Cost Tracking      | Token counts × pricing     | 100% of traffic | Budget overruns, prompt bloat, cache misses  |
| Latency Analysis   | TTFT + total, segmented    | 100% of traffic | Provider degradation, rate limiting          |
| Quality Monitoring | Stats + embeddings + judge | Sampled (1–10%) | Model drift, hallucination spikes            |
| Usage Intelligence | Request patterns, traces   | Metadata only   | Traffic anomalies, feature adoption          |

The practical recommendation: instrument with OpenTelemetry from day one—you can swap backends later without re-instrumenting your code. Start with structured logging and cost tracking (highest ROI and lowest effort), add latency analysis when you have enough traffic for percentiles to be meaningful, then layer in quality monitoring as the application matures and you've curated golden response sets.

An underrated middle ground is the gateway pattern: route all LLM traffic through a proxy like LiteLLM or Helicone. A gateway gives you cost tracking, request logging, rate limiting, and failover with minimal code changes—no decorator needed on every call.

Try It: Alert Threshold Tuner

A week of simulated monitoring data contains 3 real incidents and 5 normal patterns that look suspicious. Adjust the thresholds to catch all 3 incidents with zero false alarms—this is the precision/recall tradeoff of alert tuning.


Conclusion

LLM observability isn't a nice-to-have—it's the difference between confidently scaling AI systems and anxiously hoping everything is fine. The four pillars (cost, latency, quality, usage) each catch problems the others miss: cost tracking catches prompt bloat, latency analysis catches provider degradation, quality monitoring catches semantic drift, and distributed tracing shows you exactly where to optimize.

The code in this post gives you working implementations of each pillar. Start with the log_llm_call decorator and the CostTracker—these two pieces alone will prevent the most common production surprises. Layer in LatencyAnalyzer and QualityMonitor as your system matures. And when your pipeline grows beyond a single LLM call, the Trace builder will show you where time and money actually go.

The best observability setup is the one you actually build. Start simple, instrument everything, and let the data tell you where to invest next. Because in production, the only thing worse than a broken AI system is a broken AI system that nobody knows is broken.
