πŸŽ‰ 75% of content is free forever β€” Unlock Premium from $10/mo β†’
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

LLM Observability: Tracing, Logging, and Token Usage

LLMOps in ProductionLLM Observability🟒 Free Lesson

Advertisement

LLM Observability: Tracing, Logging, and Token Usage

Observability is the cornerstone of reliable LLM operations. Unlike traditional software, LLM systems exhibit non-deterministic behavior that requires specialized monitoring approaches spanning token economics, latency profiling, and quality assessment.

Observability Pipeline

Three Pillars of LLM Observability

1. Distributed Tracing

Tracing captures the complete lifecycle of an LLM request across services. Each span represents a discrete operation such as prompt construction, model inference, or post-processing.

import uuid
import time
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Any
from contextlib import contextmanager

@dataclass
class Span:
    span_id: str
    trace_id: str
    name: str
    start_time: float
    end_time: Optional[float] = None
    parent_id: Optional[str] = None
    attributes: Dict[str, Any] = field(default_factory=dict)
    events: List[Dict[str, Any]] = field(default_factory=list)

    @property
    def duration_ms(self) -> float:
        if self.end_time is None:
            return 0.0
        return (self.end_time - self.start_time) * 1000

    def add_event(self, name: str, attributes: Optional[Dict] = None):
        self.events.append({
            "name": name,
            "timestamp": time.time(),
            "attributes": attributes or {}
        })

@dataclass
class Trace:
    trace_id: str
    spans: List[Span] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)

class LLMTracer:
    def __init__(self):
        self.traces: Dict[str, Trace] = {}
        self._current_span_id: Optional[str] = None

    def start_trace(self, name: str, metadata: Optional[Dict] = None) -> str:
        trace_id = str(uuid.uuid4())
        root_span = Span(
            span_id=str(uuid.uuid4()),
            trace_id=trace_id,
            name=name,
            start_time=time.time()
        )
        trace = Trace(trace_id=trace_id, metadata=metadata or {})
        trace.spans.append(root_span)
        self.traces[trace_id] = trace
        self._current_span_id = root_span.span_id
        return trace_id

    @contextmanager
    def span(self, name: str, attributes: Optional[Dict] = None):
        trace_id = self._current_trace_id()
        span = Span(
            span_id=str(uuid.uuid4()),
            trace_id=trace_id,
            name=name,
            start_time=time.time(),
            parent_id=self._current_span_id,
            attributes=attributes or {}
        )
        self.traces[trace_id].spans.append(span)
        prev_span_id = self._current_span_id
        self._current_span_id = span.span_id
        try:
            yield span
        except Exception as e:
            span.add_event("error", {"message": str(e)})
            raise
        finally:
            span.end_time = time.time()
            self._current_span_id = prev_span_id

    def _current_trace_id(self) -> str:
        for trace_id, trace in self.traces.items():
            for span in trace.spans:
                if span.span_id == self._current_span_id:
                    return trace_id
        raise ValueError("No active trace")

    def get_trace(self, trace_id: str) -> Optional[Trace]:
        return self.traces.get(trace_id)

    def export_trace(self, trace_id: str) -> Dict[str, Any]:
        trace = self.traces.get(trace_id)
        if not trace:
            return {}
        return {
            "trace_id": trace.trace_id,
            "metadata": trace.metadata,
            "spans": [
                {
                    "span_id": s.span_id,
                    "name": s.name,
                    "duration_ms": s.duration_ms,
                    "parent_id": s.parent_id,
                    "attributes": s.attributes,
                    "events": s.events
                }
                for s in trace.spans
            ]
        }

2. Structured Logging

Structured logs provide machine-readable records of LLM operations. Each log entry captures input/output pairs, token counts, latency, and quality indicators.

import json
import time
import logging
from typing import Dict, Any, Optional
from dataclasses import dataclass, asdict
from enum import Enum

class LogLevel(Enum):
    DEBUG = "debug"
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"

@dataclass
class LLMLogEntry:
    timestamp: float
    level: str
    trace_id: str
    model: str
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int
    latency_ms: float
    prompt_preview: str
    completion_preview: str
    cost_usd: float
    quality_score: Optional[float] = None
    error: Optional[str] = None
    metadata: Optional[Dict[str, Any]] = None

class LLMLogger:
    def __init__(self, service_name: str, log_level: str = "INFO"):
        self.service_name = service_name
        self.logger = logging.getLogger(service_name)
        self.logger.setLevel(getattr(logging, log_level))
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter("%(message)s"))
        self.logger.addHandler(handler)

    def log_llm_call(self, trace_id: str, model: str, prompt_tokens: int,
                     completion_tokens: int, latency_ms: float,
                     prompt: str, completion: str, cost_usd: float,
                     quality_score: Optional[float] = None,
                     error: Optional[str] = None) -> str:
        entry = LLMLogEntry(
            timestamp=time.time(),
            level="error" if error else "info",
            trace_id=trace_id,
            model=model,
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            total_tokens=prompt_tokens + completion_tokens,
            latency_ms=latency_ms,
            prompt_preview=prompt[:100],
            completion_preview=completion[:100],
            cost_usd=cost_usd,
            quality_score=quality_score,
            error=error
        )
        self.logger.info(json.dumps(asdict(entry)))
        return json.dumps(asdict(entry))

    def log_batch_call(self, trace_id: str, model: str, batch_size: int,
                       total_tokens: int, total_cost: float, avg_latency: float):
        entry = {
            "timestamp": time.time(),
            "level": "info",
            "trace_id": trace_id,
            "event": "batch_completion",
            "model": model,
            "batch_size": batch_size,
            "total_tokens": total_tokens,
            "total_cost_usd": total_cost,
            "avg_latency_ms": avg_latency
        }
        self.logger.info(json.dumps(entry))

3. Token Usage Tracking

Token economics directly impact operational costs. Tracking enables optimization and budget forecasting.

from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime, timedelta
import json

@dataclass
class TokenUsageRecord:
    timestamp: datetime
    model: str
    prompt_tokens: int
    completion_tokens: int
    cost_per_1k_input: float
    cost_per_1k_output: float

    @property
    def total_tokens(self) -> int:
        return self.prompt_tokens + self.completion_tokens

    @property
    def cost_usd(self) -> float:
        return (self.prompt_tokens * self.cost_per_1k_input / 1000 +
                self.completion_tokens * self.cost_per_1k_output / 1000)

class TokenTracker:
    MODEL_PRICING = {
        "gpt-4": {"input": 0.03, "output": 0.06},
        "gpt-4-turbo": {"input": 0.01, "output": 0.03},
        "gpt-3.5-turbo": {"input": 0.0005, "output": 0.0015},
        "claude-3-opus": {"input": 0.015, "output": 0.075},
        "claude-3-sonnet": {"input": 0.003, "output": 0.015},
        "llama-3-70b": {"input": 0.0009, "output": 0.0009},
    }

    def __init__(self):
        self.records: List[TokenUsageRecord] = []

    def record_usage(self, model: str, prompt_tokens: int, completion_tokens: int) -> TokenUsageRecord:
        pricing = self.MODEL_PRICING.get(model, {"input": 0.001, "output": 0.001})
        record = TokenUsageRecord(
            timestamp=datetime.utcnow(),
            model=model,
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            cost_per_1k_input=pricing["input"],
            cost_per_1k_output=pricing["output"]
        )
        self.records.append(record)
        return record

    def get_daily_cost(self, date: Optional[datetime] = None) -> float:
        target_date = date or datetime.utcnow().date()
        return sum(
            r.cost_usd for r in self.records
            if r.timestamp.date() == target_date
        )

    def get_hourly_breakdown(self, hours: int = 24) -> Dict[str, float]:
        cutoff = datetime.utcnow() - timedelta(hours=hours)
        breakdown: Dict[str, float] = {}
        for record in self.records:
            if record.timestamp >= cutoff:
                hour_key = record.timestamp.strftime("%Y-%m-%d %H:00")
                breakdown[hour_key] = breakdown.get(hour_key, 0) + record.cost_usd
        return breakdown

    def get_model_breakdown(self) -> Dict[str, Dict]:
        breakdown: Dict[str, Dict] = {}
        for record in self.records:
            model = record.model
            if model not in breakdown:
                breakdown[model] = {"tokens": 0, "cost": 0, "calls": 0}
            breakdown[model]["tokens"] += record.total_tokens
            breakdown[model]["cost"] += record.cost_usd
            breakdown[model]["calls"] += 1
        return breakdown

    def forecast_monthly_cost(self) -> float:
        if not self.records:
            return 0.0
        daily_costs = {}
        for record in self.records:
            day = record.timestamp.date()
            daily_costs[day] = daily_costs.get(day, 0) + record.cost_usd
        if not daily_costs:
            return 0.0
        avg_daily = sum(daily_costs.values()) / len(daily_costs)
        return avg_daily * 30

Key Formulas

Token Cost Calculation

C=Tin1000β‹…Pin+Tout1000β‹…PoutC = \frac{T_{in}}{1000} \cdot P_{in} + \frac{T_{out}}{1000} \cdot P_{out}

Here,

  • CC=Total cost in USD
  • TinT_{in}=Number of input tokens
  • PinP_{in}=Price per 1,000 input tokens
  • ToutT_{out}=Number of output tokens
  • PoutP_{out}=Price per 1,000 output tokens

Latency Percentile

pk=sorted(L)[⌊kβ‹…nβŒ‹]p_{k} = \text{sorted}(L)[\lfloor k \cdot n \rfloor]

Here,

  • pkp_{k}=k-th percentile latency
  • LL=List of observed latencies
  • nn=Total number of observations

Throughput (tokens/sec)

Throughput=TtotalΞ”t\text{Throughput} = \frac{T_{total}}{\Delta t}

Here,

  • TtotalT_{total}=Total tokens processed
  • Ξ”t\Delta t=Time window in seconds

Observability Comparison

AspectTraditional AppsLLM ApplicationsKey Difference
DeterminismSame input β†’ same outputSame input β†’ different outputsNon-determinism
Cost ModelFixed compute costPer-token variable costToken economics
LatencyMillisecondsSeconds to minutesLong inference
Quality MetricCorrectnessRelevance, coherenceSubjective
Failure ModeCrashes, errorsHallucination, driftSubtle failures

OpenTelemetry Integration

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

class OpenTelemetryLLMObservability:
    def __init__(self, service_name: str, endpoint: str = "http://localhost:4317"):
        provider = TracerProvider(resource=Resource({"service.name": service_name}))
        exporter = OTLPSpanExporter(endpoint=endpoint)
        provider.add_span_processor(BatchSpanProcessor(exporter))
        trace.set_tracer_provider(provider)
        self.tracer = trace.get_tracer(service_name)

    def trace_llm_call(self, model: str, prompt: str, max_tokens: int):
        with self.tracer.start_as_current_span("llm.inference") as span:
            span.set_attribute("llm.model", model)
            span.set_attribute("llm.prompt.length", len(prompt))
            span.set_attribute("llm.max_tokens", max_tokens)
            start = time.time()
            response = call_llm(model, prompt, max_tokens)
            latency = (time.time() - start) * 1000
            span.set_attribute("llm.completion.tokens", response.usage.completion_tokens)
            span.set_attribute("llm.prompt.tokens", response.usage.prompt_tokens)
            span.set_attribute("llm.latency_ms", latency)
            span.add_event("llm.completed", {
                "tokens": response.usage.total_tokens,
                "cost_usd": calculate_cost(model, response.usage)
            })
            return response

Best Practices

  1. Always trace: Attach trace IDs to every LLM call for end-to-end visibility
  2. Log inputs/outputs: Store prompt-completion pairs for quality analysis
  3. Track costs: Monitor token usage per model, per user, per feature
  4. Set alerts: Trigger alerts on latency spikes and cost anomalies
  5. Use sampling: In high-volume systems, sample traces to reduce storage overhead
⭐

Premium Content

LLM Observability: Tracing, Logging, and Token Usage

Unlock this lesson and 900+ advanced tutorials with a Premium plan.

🎯End-to-end Projects
πŸ’ΌInterview Prep
πŸ“œCertificates
🀝Community Access

Already a member? Log in

Need Expert AI Ops & LLM Ops Help?

Get personalized tutoring, project support, or professional consulting.

Advertisement