LLM Observability: Tracing, Logging, and Token Usage
Observability is the cornerstone of reliable LLM operations. Unlike traditional software, LLM systems exhibit non-deterministic behavior that requires specialized monitoring approaches spanning token economics, latency profiling, and quality assessment.
Observability Pipeline
Three Pillars of LLM Observability
1. Distributed Tracing
Tracing captures the complete lifecycle of an LLM request across services. Each span represents a discrete operation such as prompt construction, model inference, or post-processing.
import uuid
import time
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Any
from contextlib import contextmanager
@dataclass
class Span:
    """One timed operation inside a trace.

    A span carries its own id, the id of the trace it belongs to, an optional
    parent span id, free-form attributes, and a list of timestamped events.
    """

    span_id: str
    trace_id: str
    name: str
    start_time: float
    # Stays None until the span is finished.
    end_time: Optional[float] = None
    # None when the span has no parent.
    parent_id: Optional[str] = None
    attributes: Dict[str, Any] = field(default_factory=dict)
    events: List[Dict[str, Any]] = field(default_factory=list)

    @property
    def duration_ms(self) -> float:
        """Return the span duration in milliseconds.

        Returns 0.0 while ``end_time`` is still unset. The ``* 1000`` factor
        means ``start_time`` / ``end_time`` are treated as seconds.
        """
        finished_at = self.end_time
        if finished_at is not None:
            return (finished_at - self.start_time) * 1000
        return 0.0

    def add_event(self, name: str, attributes: Optional[Dict] = None):
        """Append a named event stamped with the current ``time.time()``.

        Args:
            name: Event name.
            attributes: Optional event attributes; a falsy value (``None`` or
                an empty dict) is stored as a fresh empty dict.
        """
        payload = attributes if attributes else {}
        event = {"name": name, "timestamp": time.time(), "attributes": payload}
        self.events.append(event)
@dataclass
class Trace:
    """A group of spans that share one trace id, plus trace-level metadata."""

    trace_id: str
    # Spans are kept in the order they were appended.
    spans: List[Span] = field(default_factory=list)
    # Free-form key/value data attached to the whole trace.
    metadata: Dict[str, Any] = field(default_factory=dict)
class LLMTracer:
    """In-memory tracer that records nested spans grouped by trace.

    The tracer keeps a single "current span" pointer on the instance, so one
    tracer follows one active call chain at a time.

    NOTE(review): no locking or context-local state is used here; confirm the
    intended usage before sharing an instance across threads or async tasks.
    """

    def __init__(self):
        # trace_id -> Trace
        self.traces: Dict[str, Trace] = {}
        self._current_span_id: Optional[str] = None
        # span_id -> trace_id, so the active trace can be found without
        # scanning every span of every trace.
        self._span_trace_index: Dict[str, str] = {}

    def start_trace(self, name: str, metadata: Optional[Dict] = None) -> str:
        """Create a new trace with a root span and make that span current.

        Args:
            name: Name given to the root span.
            metadata: Optional trace-level metadata; a falsy value is stored
                as an empty dict.

        Returns:
            The generated trace id.
        """
        trace_id = str(uuid.uuid4())
        root_span = Span(
            span_id=str(uuid.uuid4()),
            trace_id=trace_id,
            name=name,
            start_time=time.time(),
        )
        trace = Trace(trace_id=trace_id, metadata=metadata or {})
        trace.spans.append(root_span)
        self.traces[trace_id] = trace
        self._span_trace_index[root_span.span_id] = trace_id
        self._current_span_id = root_span.span_id
        return trace_id

    def end_trace(self, trace_id: str) -> Optional[Trace]:
        """Close the root span of a trace so its duration is recorded.

        Without this call the root span's ``end_time`` stays ``None`` and its
        exported ``duration_ms`` is always 0.0.

        Args:
            trace_id: Id returned by ``start_trace``.

        Returns:
            The trace, or ``None`` when the id is unknown.
        """
        trace = self.traces.get(trace_id)
        if trace is None:
            return None
        # The root span is the one created without a parent.
        root = next((s for s in trace.spans if s.parent_id is None), None)
        if root is not None and root.end_time is None:
            root.end_time = time.time()
        # Drop the current-span pointer if it still points into this trace.
        if self._span_trace_index.get(self._current_span_id) == trace_id:
            self._current_span_id = None
        return trace

    @contextmanager
    def span(self, name: str, attributes: Optional[Dict] = None):
        """Record a child span of the current span for the ``with`` body.

        Args:
            name: Span name.
            attributes: Optional span attributes; a falsy value is stored as
                an empty dict.

        Yields:
            The newly created ``Span``.

        Raises:
            ValueError: If there is no active trace.
        """
        trace_id = self._current_trace_id()
        span = Span(
            span_id=str(uuid.uuid4()),
            trace_id=trace_id,
            name=name,
            start_time=time.time(),
            parent_id=self._current_span_id,
            attributes=attributes or {},
        )
        self.traces[trace_id].spans.append(span)
        self._span_trace_index[span.span_id] = trace_id
        prev_span_id = self._current_span_id
        self._current_span_id = span.span_id
        try:
            yield span
        except Exception as e:
            # Note the failure on the span, then let the caller see it.
            span.add_event("error", {"message": str(e)})
            raise
        finally:
            span.end_time = time.time()
            # Restore the parent so sibling spans nest correctly.
            self._current_span_id = prev_span_id

    def _current_trace_id(self) -> str:
        """Return the id of the trace that owns the current span.

        Raises:
            ValueError: If the current span does not belong to any trace.
        """
        trace_id = self._span_trace_index.get(self._current_span_id)
        if trace_id in self.traces:
            return trace_id
        # Fallback scan covers spans added to a trace without going through
        # this class, and index entries whose trace was removed.
        for candidate_id, trace in self.traces.items():
            if any(s.span_id == self._current_span_id for s in trace.spans):
                return candidate_id
        raise ValueError("No active trace")

    def get_trace(self, trace_id: str) -> Optional[Trace]:
        """Return the trace with the given id, or ``None`` if unknown."""
        return self.traces.get(trace_id)

    def export_trace(self, trace_id: str) -> Dict[str, Any]:
        """Return a plain-dict view of a trace.

        Args:
            trace_id: Id of the trace to export.

        Returns:
            A dict with ``trace_id``, ``metadata`` and a ``spans`` list, or an
            empty dict when the id is unknown.
        """
        trace = self.traces.get(trace_id)
        if trace is None:
            return {}
        return {
            "trace_id": trace.trace_id,
            "metadata": trace.metadata,
            "spans": [
                {
                    "span_id": s.span_id,
                    "name": s.name,
                    "duration_ms": s.duration_ms,
                    "parent_id": s.parent_id,
                    "attributes": s.attributes,
                    "events": s.events,
                }
                for s in trace.spans
            ],
        }
2. Structured Logging
Structured logs provide machine-readable records of LLM operations. Each log entry captures input/output pairs, token counts, latency, and quality indicators.
import json
import time
import logging
from typing import Dict, Any, Optional
from dataclasses import dataclass, asdict
from enum import Enum
class LogLevel(Enum):
    """Severity labels for log entries; each value is the lowercase name."""

    DEBUG = "debug"
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
@dataclass
class LLMLogEntry:
    """Structured record describing a single LLM call."""

    # --- identification ---
    timestamp: float
    level: str
    trace_id: str
    model: str

    # --- token accounting ---
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int

    # --- timing, content previews and cost ---
    latency_ms: float
    prompt_preview: str
    completion_preview: str
    cost_usd: float

    # --- optional extras, all unset by default ---
    quality_score: Optional[float] = None
    error: Optional[str] = None
    metadata: Optional[Dict[str, Any]] = None
class LLMLogger:
    """Emit one JSON document per LLM call through the ``logging`` module."""

    def __init__(self, service_name: str, log_level: str = "INFO"):
        """Configure the named logger with a message-only stream handler.

        Args:
            service_name: Name passed to ``logging.getLogger``.
            log_level: Name of a ``logging`` level constant, e.g. ``"INFO"``;
                matched case-insensitively.

        Raises:
            AttributeError: If ``log_level`` does not name a ``logging`` level.
        """
        self.service_name = service_name
        self.logger = logging.getLogger(service_name)
        self.logger.setLevel(getattr(logging, log_level.upper()))
        # logging.getLogger returns the same logger for the same name, so a
        # second LLMLogger for one service must not attach a second handler
        # (every line would be emitted twice).
        already_attached = any(
            getattr(h, "_llm_logger_handler", False)
            for h in self.logger.handlers
        )
        if not already_attached:
            handler = logging.StreamHandler()
            handler.setFormatter(logging.Formatter("%(message)s"))
            handler._llm_logger_handler = True
            self.logger.addHandler(handler)

    def log_llm_call(self, trace_id: str, model: str, prompt_tokens: int,
                     completion_tokens: int, latency_ms: float,
                     prompt: str, completion: str, cost_usd: float,
                     quality_score: Optional[float] = None,
                     error: Optional[str] = None,
                     metadata: Optional[Dict[str, Any]] = None) -> str:
        """Log one LLM call as JSON and return the serialized entry.

        Args:
            trace_id: Trace the call belongs to.
            model: Model identifier.
            prompt_tokens: Number of prompt tokens.
            completion_tokens: Number of completion tokens.
            latency_ms: Call latency in milliseconds.
            prompt: Prompt text; only the first 100 characters are logged.
            completion: Completion text; only the first 100 characters are
                logged. ``None`` (e.g. on a failed call) is logged as "".
            cost_usd: Cost of the call in USD.
            quality_score: Optional quality score.
            error: Optional error description; when truthy the entry is
                logged at ERROR level instead of INFO.
            metadata: Optional extra data stored on the entry; must be
                JSON-serializable.

        Returns:
            The JSON string that was logged.
        """
        entry = LLMLogEntry(
            timestamp=time.time(),
            level="error" if error else "info",
            trace_id=trace_id,
            model=model,
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            total_tokens=prompt_tokens + completion_tokens,
            latency_ms=latency_ms,
            prompt_preview=(prompt or "")[:100],
            completion_preview=(completion or "")[:100],
            cost_usd=cost_usd,
            quality_score=quality_score,
            error=error,
            metadata=metadata,
        )
        # Serialize once and reuse for both the log line and the return value.
        payload = json.dumps(asdict(entry))
        # Keep the logging level in step with the entry's own "level" field.
        emit = self.logger.error if error else self.logger.info
        emit(payload)
        return payload

    def log_batch_call(self, trace_id: str, model: str, batch_size: int,
                       total_tokens: int, total_cost: float, avg_latency: float):
        """Log a summary record for a batch of completions at INFO level.

        Args:
            trace_id: Trace the batch belongs to.
            model: Model identifier.
            batch_size: Number of calls in the batch.
            total_tokens: Tokens used across the batch.
            total_cost: Batch cost, logged as ``total_cost_usd``.
            avg_latency: Average latency, logged as ``avg_latency_ms``.
        """
        entry = {
            "timestamp": time.time(),
            "level": "info",
            "trace_id": trace_id,
            "event": "batch_completion",
            "model": model,
            "batch_size": batch_size,
            "total_tokens": total_tokens,
            "total_cost_usd": total_cost,
            "avg_latency_ms": avg_latency,
        }
        self.logger.info(json.dumps(entry))
3. Token Usage Tracking
Token economics directly impact operational costs. Tracking enables optimization and budget forecasting.
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime, timedelta
import json
@dataclass
class TokenUsageRecord:
    """Token counts and per-1k-token prices for one model call."""

    timestamp: datetime
    model: str
    prompt_tokens: int
    completion_tokens: int
    # Prices are expressed per 1,000 tokens (see cost_usd).
    cost_per_1k_input: float
    cost_per_1k_output: float

    @property
    def total_tokens(self) -> int:
        """Prompt and completion tokens added together."""
        return sum((self.prompt_tokens, self.completion_tokens))

    @property
    def cost_usd(self) -> float:
        """Cost of the call: each token count times its per-1k price / 1000."""
        input_cost = self.prompt_tokens * self.cost_per_1k_input / 1000
        output_cost = self.completion_tokens * self.cost_per_1k_output / 1000
        return input_cost + output_cost
class TokenTracker:
    """Accumulate token usage records and derive cost summaries from them."""

    # USD per 1,000 tokens, keyed by model name.
    # NOTE(review): these figures are hard-coded; confirm against current
    # provider pricing before relying on them.
    MODEL_PRICING = {
        "gpt-4": {"input": 0.03, "output": 0.06},
        "gpt-4-turbo": {"input": 0.01, "output": 0.03},
        "gpt-3.5-turbo": {"input": 0.0005, "output": 0.0015},
        "claude-3-opus": {"input": 0.015, "output": 0.075},
        "claude-3-sonnet": {"input": 0.003, "output": 0.015},
        "llama-3-70b": {"input": 0.0009, "output": 0.0009},
    }
    # Applied to any model missing from MODEL_PRICING.
    DEFAULT_PRICING = {"input": 0.001, "output": 0.001}

    def __init__(self):
        self.records: List["TokenUsageRecord"] = []

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive ``datetime``.

        Same value ``datetime.utcnow()`` produced, without calling the
        deprecated function.
        """
        from datetime import timezone

        return datetime.now(timezone.utc).replace(tzinfo=None)

    def record_usage(self, model: str, prompt_tokens: int,
                     completion_tokens: int) -> "TokenUsageRecord":
        """Store and return a usage record stamped with the current UTC time.

        Args:
            model: Model name; unknown models are priced with
                ``DEFAULT_PRICING``.
            prompt_tokens: Number of prompt tokens.
            completion_tokens: Number of completion tokens.

        Returns:
            The record that was appended to ``self.records``.
        """
        pricing = self.MODEL_PRICING.get(model, self.DEFAULT_PRICING)
        record = TokenUsageRecord(
            timestamp=self._utcnow(),
            model=model,
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            cost_per_1k_input=pricing["input"],
            cost_per_1k_output=pricing["output"],
        )
        self.records.append(record)
        return record

    def get_daily_cost(self, date: Optional[datetime] = None) -> float:
        """Return the summed cost of all records on one calendar day.

        Args:
            date: Day to total. A ``datetime`` is reduced to its date; a plain
                ``datetime.date`` is used as-is; ``None`` means today (UTC).

        Returns:
            Total cost in USD for that day (0 when nothing matches).
        """
        if date is None:
            target_date = self._utcnow().date()
        elif isinstance(date, datetime):
            # A datetime never compares equal to a date, so reduce it first;
            # otherwise no record could ever match.
            target_date = date.date()
        else:
            target_date = date
        return sum(
            r.cost_usd for r in self.records
            if r.timestamp.date() == target_date
        )

    def get_hourly_breakdown(self, hours: int = 24) -> Dict[str, float]:
        """Return cost per hour for records within the last ``hours`` hours.

        Args:
            hours: Size of the look-back window, counted back from now (UTC).

        Returns:
            Mapping of ``"YYYY-MM-DD HH:00"`` keys to summed cost in USD.
        """
        cutoff = self._utcnow() - timedelta(hours=hours)
        breakdown: Dict[str, float] = {}
        for record in self.records:
            if record.timestamp >= cutoff:
                hour_key = record.timestamp.strftime("%Y-%m-%d %H:00")
                breakdown[hour_key] = breakdown.get(hour_key, 0) + record.cost_usd
        return breakdown

    def get_model_breakdown(self) -> Dict[str, Dict]:
        """Return total tokens, cost and call count per model."""
        breakdown: Dict[str, Dict] = {}
        for record in self.records:
            stats = breakdown.setdefault(
                record.model, {"tokens": 0, "cost": 0, "calls": 0}
            )
            stats["tokens"] += record.total_tokens
            stats["cost"] += record.cost_usd
            stats["calls"] += 1
        return breakdown

    def forecast_monthly_cost(self) -> float:
        """Project a 30-day cost from the average daily cost seen so far.

        The average is taken over days that have at least one record; days
        without usage are not counted.

        Returns:
            Average daily cost times 30, or 0.0 when there are no records.
        """
        if not self.records:
            return 0.0
        daily_costs: Dict = {}
        for record in self.records:
            day = record.timestamp.date()
            daily_costs[day] = daily_costs.get(day, 0) + record.cost_usd
        avg_daily = sum(daily_costs.values()) / len(daily_costs)
        return avg_daily * 30
Key Formulas
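Token Cost Calculation
$$C = \frac{T_{in}}{1000} \cdot P_{in} + \frac{T_{out}}{1000} \cdot P_{out}$$
Here,
- $C$ = Total cost in USD
- $T_{in}$ = Number of input tokens
- $P_{in}$ = Price per 1,000 input tokens
- $T_{out}$ = Number of output tokens
- $P_{out}$ = Price per 1,000 output tokens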
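Latency Percentile
$$P_k = L_{\text{sorted}}\left[\left\lceil \frac{k}{100} \cdot N \right\rceil\right]$$
Here,
- $P_k$ = k-th percentile latency
- $L$ = List of observed latencies ($L_{\text{sorted}}$ is $L$ in ascending order, indexed from 1)
- $N$ = Total number of observations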
Throughput (tokens/sec)
$$\text{Throughput} = \frac{T_{total}}{\Delta t}$$
Here,
- $T_{total}$ = Total tokens processed
- $\Delta t$ = Time window in seconds
Observability Comparison
| Aspect | Traditional Apps | LLM Applications | Key Difference |
|---|---|---|---|
| Determinism | Same input → same output | Same input → different outputs | Non-determinism |
| Cost Model | Fixed compute cost | Per-token variable cost | Token economics |
| Latency | Milliseconds | Seconds to minutes | Long inference |
| Quality Metric | Correctness | Relevance, coherence | Subjective |
| Failure Mode | Crashes, errors | Hallucination, drift | Subtle failures |
OpenTelemetry Integration
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
class OpenTelemetryLLMObservability:
    """Wrap LLM calls in OpenTelemetry spans exported over OTLP/gRPC."""

    def __init__(self, service_name: str, endpoint: str = "http://localhost:4317"):
        """Install a tracer provider that batches spans to an OTLP endpoint.

        Note that this sets the process-wide tracer provider via
        ``trace.set_tracer_provider``.

        Args:
            service_name: Value of the ``service.name`` resource attribute and
                the name used to obtain the tracer.
            endpoint: OTLP collector endpoint.
        """
        # Resource.create merges the given attributes with the SDK defaults.
        resource = Resource.create({"service.name": service_name})
        provider = TracerProvider(resource=resource)
        exporter = OTLPSpanExporter(endpoint=endpoint)
        provider.add_span_processor(BatchSpanProcessor(exporter))
        trace.set_tracer_provider(provider)
        self.tracer = trace.get_tracer(service_name)

    def trace_llm_call(self, model: str, prompt: str, max_tokens: int):
        """Run one LLM call inside an ``llm.inference`` span and return it.

        Records the model, prompt length, token limit, token usage and
        latency as span attributes, plus an ``llm.completed`` event.

        NOTE(review): ``call_llm`` and ``calculate_cost`` are not defined in
        this snippet; they are assumed to be provided by the application, and
        the response is assumed to expose ``usage.prompt_tokens``,
        ``usage.completion_tokens`` and ``usage.total_tokens`` — confirm.

        Args:
            model: Model identifier passed to ``call_llm``.
            prompt: Prompt text; only its length is recorded on the span.
            max_tokens: Completion token limit passed to ``call_llm``.

        Returns:
            Whatever ``call_llm`` returns.
        """
        with self.tracer.start_as_current_span("llm.inference") as span:
            span.set_attribute("llm.model", model)
            span.set_attribute("llm.prompt.length", len(prompt))
            span.set_attribute("llm.max_tokens", max_tokens)
            # perf_counter is monotonic, so the measured latency is not
            # affected by wall-clock adjustments during the call.
            start = time.perf_counter()
            response = call_llm(model, prompt, max_tokens)
            latency = (time.perf_counter() - start) * 1000
            usage = response.usage
            span.set_attribute("llm.completion.tokens", usage.completion_tokens)
            span.set_attribute("llm.prompt.tokens", usage.prompt_tokens)
            span.set_attribute("llm.latency_ms", latency)
            span.add_event("llm.completed", {
                "tokens": usage.total_tokens,
                "cost_usd": calculate_cost(model, usage),
            })
            return response
Best Practices
- Always trace: Attach trace IDs to every LLM call for end-to-end visibility
- Log inputs/outputs: Store prompt-completion pairs for quality analysis
- Track costs: Monitor token usage per model, per user, per feature
- Set alerts: Trigger alerts on latency spikes and cost anomalies
- Use sampling: In high-volume systems, sample traces to reduce storage overhead