Building production AI systems requires comprehensive observability to understand performance, costs, errors, and user experience. Unlike traditional applications, AI systems have unique monitoring requirements: token usage, model latency distribution, cost attribution per feature, and quality metrics like hallucination rates. This guide provides a complete framework for observability in AI/LLM applications.
Why AI Systems Need Special Observability
Unique Challenges
- Non-deterministic outputs: Same input may produce different results
- Cost variability: Token counts vary, affecting per-request costs
- Latency unpredictability: Model response times vary significantly
- Quality degradation: Model updates or prompt changes affect output quality
- External dependencies: API rate limits, model availability, provider changes
- Complex error modes: Hallucinations, refusals, partial responses
Key Metrics to Track
- Request latency (P50, P95, P99 percentiles)
- Token usage (input/output tokens per request)
- Cost per request and per feature
- Error rates (API failures, timeouts, content policy violations)
- Model performance (quality scores, user feedback)
- Cache hit rates (if using semantic caching)
- Throughput (requests per second)
- Concurrent requests and queue depth
The Three Pillars: Logs, Metrics, and Traces
1. Structured Logging
Capture detailed information about each LLM interaction (a minimal logging sketch follows this list):
- Request ID and user ID (for debugging and cost attribution)
- Model name and version (e.g., gpt-5, claude-opus-4.1)
- Prompt (sanitized if contains PII)
- Response (truncated or hashed if sensitive)
- Token counts (input and output)
- Latency breakdown (prompt encoding, model inference, streaming)
- Error details (if failed)
- Metadata (feature, team, environment)
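As a minimal sketch, a small JSON formatter can render these fields as one structured log line per request for Loki or Elasticsearch to ingest; the field names mirror the list above and are illustrative, not a fixed schema:
import json
import logging

class JSONLogFormatter(logging.Formatter):
    """Render each log record as a single JSON line for Loki/Elasticsearch."""

    # Extra fields we expect callers to pass via logger.info(..., extra={...});
    # illustrative names, not a fixed schema.
    LLM_FIELDS = ("request_id", "user_id", "model", "feature",
                  "input_tokens", "output_tokens", "total_tokens",
                  "cost_usd", "latency_seconds", "finish_reason", "error_type")

    def format(self, record):
        payload = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        for field in self.LLM_FIELDS:
            if hasattr(record, field):
                payload[field] = getattr(record, field)
        return json.dumps(payload, default=str)

handler = logging.StreamHandler()
handler.setFormatter(JSONLogFormatter())
logging.getLogger("llm").addHandler(handler)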
2. Time-Series Metrics
Aggregate data for dashboards and alerting (a gauge sketch follows this list):
- Counters: Total requests, total errors, total tokens used
- Gauges: Active connections, queue depth, cache size
- Histograms: Latency distribution, token count distribution
- Summary: Percentiles (P50, P95, P99) for latency and cost
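Counters and histograms are instrumented in Code Example 1 below; gauges such as queue depth are usually reported through observable callbacks. A minimal sketch, assuming a meter provider is already configured as in Code Example 1 (the metric name, attribute keys, and the pending_requests list are illustrative):
from typing import Iterable
from opentelemetry import metrics
from opentelemetry.metrics import CallbackOptions, Observation

meter = metrics.get_meter("ai-app.queue")

# Illustrative in-process state; a real service would read its request queue
# or worker pool here instead of a module-level list.
pending_requests: list = []

def observe_queue_depth(options: CallbackOptions) -> Iterable[Observation]:
    # Invoked by the SDK on every metric collection cycle.
    yield Observation(len(pending_requests), {"component": "llm_worker"})

queue_depth_gauge = meter.create_observable_gauge(
    name="llm_queue_depth",
    callbacks=[observe_queue_depth],
    description="Requests waiting for an LLM worker",
    unit="1",
)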
3. Distributed Tracing
Track requests across multiple services (a tracing sketch follows this list):
- API Gateway → RAG retrieval → Embedding generation → Vector search → LLM call → Response streaming
- Identify bottlenecks in multi-step AI workflows
- Debug failures in complex agent systems
- Understand cascading latency effects
- Correlate logs and metrics with specific traces
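A minimal sketch of nested spans for such a workflow, assuming the tracer provider from Code Example 1; embed(), vector_search(), and generate_answer() are placeholder stubs standing in for real retrieval and LLM calls:
from opentelemetry import trace

tracer = trace.get_tracer("ai-app.rag")

def embed(text: str) -> list:
    return [0.0, 0.0, 0.0]      # placeholder: call your embedding model here

def vector_search(vector: list) -> list:
    return ["doc-1", "doc-2"]   # placeholder: query your vector store here

def generate_answer(question: str, documents: list) -> str:
    return "answer"             # placeholder: call the instrumented LLM client here

def answer_question(question: str) -> str:
    # The parent span covers the whole workflow; child spans mark each stage so
    # the trace shows where latency accumulates across retrieval and generation.
    with tracer.start_as_current_span("rag.answer_question") as root_span:
        root_span.set_attribute("rag.question_length", len(question))

        with tracer.start_as_current_span("rag.embed_query"):
            query_vector = embed(question)

        with tracer.start_as_current_span("rag.vector_search") as search_span:
            documents = vector_search(query_vector)
            search_span.set_attribute("rag.documents_retrieved", len(documents))

        with tracer.start_as_current_span("rag.llm_call"):
            return generate_answer(question, documents)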
Implementation Architecture
Technology Stack
- Instrumentation: OpenTelemetry (OTEL) for unified logs/metrics/traces
- Metrics storage: Prometheus for time-series data
- Log storage: Loki or Elasticsearch for structured logs
- Tracing backend: Jaeger or Tempo for distributed traces
- Visualization: Grafana for dashboards and alerts
- Cost attribution: Custom exporters to data warehouse
Data Flow
1. Application instruments calls with OpenTelemetry SDK
2. OTEL Collector aggregates and routes telemetry data
3. Prometheus scrapes metrics from OTEL Collector
4. Loki ingests logs from OTEL Collector
5. Jaeger receives traces from OTEL Collector
6. Grafana queries all three for a unified view
7. Alertmanager triggers alerts based on Prometheus rules
Code Example 1: Instrumenting LLM Calls with OpenTelemetry
Complete instrumentation for OpenAI API calls with logs, metrics, and traces:
from opentelemetry import trace, metrics
from opentelemetry.exporter.prometheus import PrometheusMetricReader
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
import openai
from prometheus_client import start_http_server
import time
import logging
import json
# Configure OpenTelemetry
resource = Resource.create({
"service.name": "ai-app",
"service.version": "1.0.0",
"deployment.environment": "production"
})
# Setup tracing
trace.set_tracer_provider(TracerProvider(resource=resource))
tracer = trace.get_tracer(__name__)
# Setup OTLP exporter for traces (sends to Jaeger/Tempo)
otlp_exporter = OTLPSpanExporter(endpoint="http://otel-collector:4317")
span_processor = BatchSpanProcessor(otlp_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)
# Setup metrics (expose a /metrics endpoint on :8000 for Prometheus to scrape)
start_http_server(port=8000)
metric_reader = PrometheusMetricReader()
meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader])
metrics.set_meter_provider(meter_provider)
meter = metrics.get_meter(__name__)
# Define metrics
llm_request_counter = meter.create_counter(
name="llm_requests_total",
description="Total LLM requests",
unit="1"
)
llm_token_counter = meter.create_counter(
name="llm_tokens_total",
description="Total tokens used",
unit="tokens"
)
llm_cost_counter = meter.create_counter(
name="llm_cost_total",
description="Total LLM costs in USD",
unit="USD"
)
llm_latency_histogram = meter.create_histogram(
name="llm_request_duration_seconds",
description="LLM request latency",
unit="s"
)
llm_error_counter = meter.create_counter(
name="llm_errors_total",
description="Total LLM errors",
unit="1"
)
# Configure structured logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class ObservableOpenAIClient:
"""OpenAI client with full observability instrumentation."""
# Model pricing (November 2025, per 1M tokens); verify against the provider's current price list
PRICING = {
"gpt-5": {"input": 2.50, "output": 10.00},
"gpt-4o": {"input": 2.50, "output": 10.00},
"gpt-4o-mini": {"input": 0.15, "output": 0.60}
}
def __init__(self, api_key: str):
self.client = openai.OpenAI(api_key=api_key)
def chat_completion(self,
model: str,
messages: list,
user_id: str = None,
feature: str = "default",
**kwargs):
"""Chat completion with comprehensive instrumentation."""
# Start span for distributed tracing
with tracer.start_as_current_span(
"llm.chat_completion",
attributes={
"llm.model": model,
"llm.provider": "openai",
"user.id": user_id,
"feature": feature
}
) as span:
start_time = time.time()
try:
# Make API call
response = self.client.chat.completions.create(
model=model,
messages=messages,
**kwargs
)
# Calculate metrics
latency = time.time() - start_time
input_tokens = response.usage.prompt_tokens
output_tokens = response.usage.completion_tokens
total_tokens = response.usage.total_tokens
# Calculate cost
cost = self._calculate_cost(model, input_tokens, output_tokens)
# Record metrics with labels
metric_attrs = {
"model": model,
"feature": feature,
"status": "success"
}
llm_request_counter.add(1, metric_attrs)
llm_token_counter.add(total_tokens, {**metric_attrs, "type": "total"})
llm_token_counter.add(input_tokens, {**metric_attrs, "type": "input"})
llm_token_counter.add(output_tokens, {**metric_attrs, "type": "output"})
llm_cost_counter.add(cost, metric_attrs)
llm_latency_histogram.record(latency, metric_attrs)
# Add span attributes
span.set_attributes({
"llm.input_tokens": input_tokens,
"llm.output_tokens": output_tokens,
"llm.total_tokens": total_tokens,
"llm.cost_usd": cost,
"llm.latency_seconds": latency,
"llm.finish_reason": response.choices[0].finish_reason
})
# Structured logging
logger.info(
"LLM request completed",
extra={
"request_id": span.get_span_context().span_id,
"user_id": user_id,
"model": model,
"feature": feature,
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"total_tokens": total_tokens,
"cost_usd": cost,
"latency_seconds": latency,
"finish_reason": response.choices[0].finish_reason,
"prompt_preview": messages[0]["content"][:100] if messages else ""
}
)
return response
except openai.APIError as e:
# Record error metrics
error_attrs = {
"model": model,
"feature": feature,
"error_type": type(e).__name__,
"status": "error"
}
llm_request_counter.add(1, error_attrs)
llm_error_counter.add(1, error_attrs)
llm_latency_histogram.record(time.time() - start_time, error_attrs)
# Add error to span
span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
span.record_exception(e)
# Log error
logger.error(
f"LLM request failed: {e}",
extra={
"user_id": user_id,
"model": model,
"feature": feature,
"error_type": type(e).__name__,
"error_message": str(e)
},
exc_info=True
)
raise
def _calculate_cost(self, model: str, input_tokens: int, output_tokens: int) -> float:
"""Calculate request cost based on token usage and model pricing."""
if model not in self.PRICING:
return 0.0
pricing = self.PRICING[model]
input_cost = (input_tokens / 1_000_000) * pricing["input"]
output_cost = (output_tokens / 1_000_000) * pricing["output"]
return round(input_cost + output_cost, 6)
# Example usage
if __name__ == "__main__":
import os
client = ObservableOpenAIClient(api_key=os.environ["OPENAI_API_KEY"])
response = client.chat_completion(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Explain observability in 2 sentences."}
],
user_id="user_123",
feature="chat_assistant",
temperature=0.7,
max_tokens=100
)
print(response.choices[0].message.content)
Code Example 2: Cost Attribution System
Track costs per feature, team, and user for chargeback and budgeting:
from datetime import datetime, timedelta
from collections import defaultdict
import psycopg2
from typing import Dict, List
class CostAttributionSystem:
"""Track and attribute LLM costs across dimensions."""
def __init__(self, db_connection_string: str):
self.conn = psycopg2.connect(db_connection_string)
self._init_schema()
def _init_schema(self):
"""Create cost tracking tables."""
with self.conn.cursor() as cur:
cur.execute("""
CREATE TABLE IF NOT EXISTS llm_usage (
id SERIAL PRIMARY KEY,
timestamp TIMESTAMPTZ NOT NULL,
request_id VARCHAR(255),
user_id VARCHAR(255),
team_id VARCHAR(255),
feature VARCHAR(255),
model VARCHAR(255),
input_tokens INTEGER,
output_tokens INTEGER,
total_tokens INTEGER,
cost_usd DECIMAL(10, 6),
latency_ms INTEGER,
status VARCHAR(50),
error_type VARCHAR(255)
);
CREATE INDEX IF NOT EXISTS idx_timestamp ON llm_usage(timestamp);
CREATE INDEX IF NOT EXISTS idx_user_id ON llm_usage(user_id);
CREATE INDEX IF NOT EXISTS idx_team_id ON llm_usage(team_id);
CREATE INDEX IF NOT EXISTS idx_feature ON llm_usage(feature);
CREATE INDEX IF NOT EXISTS idx_model ON llm_usage(model);
""")
self.conn.commit()
def record_usage(self,
request_id: str,
user_id: str,
team_id: str,
feature: str,
model: str,
input_tokens: int,
output_tokens: int,
cost_usd: float,
latency_ms: int,
status: str = "success",
error_type: str = None):
"""Record a single LLM request for cost attribution."""
with self.conn.cursor() as cur:
cur.execute("""
INSERT INTO llm_usage (
timestamp, request_id, user_id, team_id, feature,
model, input_tokens, output_tokens, total_tokens,
cost_usd, latency_ms, status, error_type
) VALUES (
NOW(), %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
)
""", (
request_id, user_id, team_id, feature, model,
input_tokens, output_tokens, input_tokens + output_tokens,
cost_usd, latency_ms, status, error_type
))
self.conn.commit()
def get_costs_by_feature(self,
start_date: datetime,
end_date: datetime) -> List[Dict]:
"""Get total costs grouped by feature."""
with self.conn.cursor() as cur:
cur.execute("""
SELECT
feature,
COUNT(*) as request_count,
SUM(total_tokens) as total_tokens,
SUM(cost_usd) as total_cost_usd,
AVG(latency_ms) as avg_latency_ms,
SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) as error_count
FROM llm_usage
WHERE timestamp BETWEEN %s AND %s
GROUP BY feature
ORDER BY total_cost_usd DESC
""", (start_date, end_date))
columns = [desc[0] for desc in cur.description]
return [dict(zip(columns, row)) for row in cur.fetchall()]
def get_costs_by_team(self,
start_date: datetime,
end_date: datetime) -> List[Dict]:
"""Get total costs grouped by team for chargeback."""
with self.conn.cursor() as cur:
cur.execute("""
SELECT
team_id,
COUNT(*) as request_count,
SUM(total_tokens) as total_tokens,
SUM(cost_usd) as total_cost_usd,
COUNT(DISTINCT user_id) as unique_users,
COUNT(DISTINCT feature) as features_used
FROM llm_usage
WHERE timestamp BETWEEN %s AND %s
GROUP BY team_id
ORDER BY total_cost_usd DESC
""", (start_date, end_date))
columns = [desc[0] for desc in cur.description]
return [dict(zip(columns, row)) for row in cur.fetchall()]
def get_costs_by_user(self,
start_date: datetime,
end_date: datetime,
limit: int = 100) -> List[Dict]:
"""Get top users by cost."""
with self.conn.cursor() as cur:
cur.execute("""
SELECT
user_id,
team_id,
COUNT(*) as request_count,
SUM(total_tokens) as total_tokens,
SUM(cost_usd) as total_cost_usd,
AVG(latency_ms) as avg_latency_ms
FROM llm_usage
WHERE timestamp BETWEEN %s AND %s
GROUP BY user_id, team_id
ORDER BY total_cost_usd DESC
LIMIT %s
""", (start_date, end_date, limit))
columns = [desc[0] for desc in cur.description]
return [dict(zip(columns, row)) for row in cur.fetchall()]
def get_daily_costs(self,
start_date: datetime,
end_date: datetime) -> List[Dict]:
"""Get daily cost trends."""
with self.conn.cursor() as cur:
cur.execute("""
SELECT
DATE(timestamp) as date,
COUNT(*) as request_count,
SUM(total_tokens) as total_tokens,
SUM(cost_usd) as total_cost_usd,
AVG(latency_ms) as avg_latency_ms,
SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) as error_count
FROM llm_usage
WHERE timestamp BETWEEN %s AND %s
GROUP BY DATE(timestamp)
ORDER BY date
""", (start_date, end_date))
columns = [desc[0] for desc in cur.description]
return [dict(zip(columns, row)) for row in cur.fetchall()]
def detect_anomalies(self,
threshold_multiplier: float = 3.0) -> List[Dict]:
"""Detect cost anomalies (unusually high usage)."""
with self.conn.cursor() as cur:
# Get average and stddev for last 7 days
cur.execute("""
WITH daily_stats AS (
SELECT
DATE(timestamp) as date,
feature,
SUM(cost_usd) as daily_cost
FROM llm_usage
WHERE timestamp >= NOW() - INTERVAL '7 days'
GROUP BY DATE(timestamp), feature
),
feature_baseline AS (
SELECT
feature,
AVG(daily_cost) as avg_daily_cost,
STDDEV(daily_cost) as stddev_daily_cost
FROM daily_stats
GROUP BY feature
),
today_costs AS (
SELECT
feature,
SUM(cost_usd) as today_cost
FROM llm_usage
WHERE DATE(timestamp) = CURRENT_DATE
GROUP BY feature
)
SELECT
t.feature,
t.today_cost,
b.avg_daily_cost,
b.stddev_daily_cost,
(t.today_cost - b.avg_daily_cost) / NULLIF(b.stddev_daily_cost, 0) as z_score
FROM today_costs t
JOIN feature_baseline b ON t.feature = b.feature
WHERE t.today_cost > b.avg_daily_cost + (%s * b.stddev_daily_cost)
ORDER BY z_score DESC
""", (threshold_multiplier,))
columns = [desc[0] for desc in cur.description]
return [dict(zip(columns, row)) for row in cur.fetchall()]
# Example usage
if __name__ == "__main__":
cost_system = CostAttributionSystem(
db_connection_string="postgresql://user:pass@localhost/aiapp"
)
# Record usage (typically done by Observable client)
cost_system.record_usage(
request_id="req_abc123",
user_id="user_456",
team_id="team_engineering",
feature="code_review",
model="gpt-4o",
input_tokens=1500,
output_tokens=800,
cost_usd=0.0235,
latency_ms=2300,
status="success"
)
# Generate cost reports
end_date = datetime.now()
start_date = end_date - timedelta(days=30)
# Costs by feature
feature_costs = cost_system.get_costs_by_feature(start_date, end_date)
print("\nCosts by Feature:")
for item in feature_costs[:5]:
print(f" {item['feature']}: ${item['total_cost_usd']:.2f} "
f"({item['request_count']} requests)")
# Costs by team (for chargeback)
team_costs = cost_system.get_costs_by_team(start_date, end_date)
print("\nCosts by Team:")
for item in team_costs:
print(f" {item['team_id']}: ${item['total_cost_usd']:.2f} "
f"({item['unique_users']} users)")
# Detect anomalies
anomalies = cost_system.detect_anomalies(threshold_multiplier=2.5)
if anomalies:
print("\nCost Anomalies Detected:")
for anomaly in anomalies:
print(f" {anomaly['feature']}: ${anomaly['today_cost']:.2f} "
f"(avg: ${anomaly['avg_daily_cost']:.2f}, "
f"z-score: {anomaly['z_score']:.1f})")
Grafana Dashboard Configuration
Key Panels to Include
- Request rate (requests/second) - Gauge with sparkline
- P50/P95/P99 latency - Line graph over time
- Error rate percentage - Stat panel with threshold coloring
- Cost per hour/day - Bar chart grouped by feature
- Token usage - Stacked area chart (input vs output)
- Model distribution - Pie chart showing usage by model
- Top users by cost - Table with sortable columns
- Cache hit rate - Gauge (if using semantic caching)
Sample Prometheus Queries
# Request rate per second (summed across all label sets)
sum(rate(llm_requests_total[5m]))
# P95 latency by model
histogram_quantile(0.95,
  sum by (le, model) (rate(llm_request_duration_seconds_bucket[5m]))
)
# Error rate percentage
(
  sum(rate(llm_errors_total[5m])) /
  sum(rate(llm_requests_total[5m]))
) * 100
# Cost per hour by feature
sum by (feature) (rate(llm_cost_total[1h])) * 3600
# Token usage by type
sum by (type) (
rate(llm_tokens_total[5m])
)
# Requests by model
sum by (model) (
rate(llm_requests_total[5m])
)
Alerting Rules
Critical Alerts
- High error rate (>5% for 5 minutes)
- P95 latency degradation (>5 seconds for 10 minutes)
- Cost spike (>200% of daily average)
- API rate limit approaching (>80% of quota)
- Service unavailable (0 successful requests for 2 minutes)
Warning Alerts
- Elevated latency (P95 >3 seconds for 15 minutes)
- Increased error rate (>2% for 15 minutes)
- Cost trending above budget (on track to exceed monthly limit)
- Token usage spike (>150% of hourly average)
- Model performance degradation (based on quality scores)
Example Prometheus Alerting Rules
groups:
- name: llm_alerts
interval: 30s
rules:
# High error rate
- alert: HighLLMErrorRate
expr: |
          (
            sum(rate(llm_errors_total[5m])) /
            sum(rate(llm_requests_total[5m]))
          ) > 0.05
for: 5m
labels:
severity: critical
component: llm
annotations:
summary: "High LLM error rate detected"
description: "Error rate is {{ $value | humanizePercentage }} (threshold: 5%)"
# High latency
- alert: HighLLMLatency
expr: |
          histogram_quantile(0.95,
            sum by (le) (rate(llm_request_duration_seconds_bucket[5m]))
          ) > 5
for: 10m
labels:
          severity: critical
          component: llm
annotations:
summary: "High LLM P95 latency"
description: "P95 latency is {{ $value }}s (threshold: 5s)"
# Cost spike
- alert: LLMCostSpike
expr: |
          (
            sum(rate(llm_cost_total[1h])) * 86400
          ) > (
            sum(rate(llm_cost_total[7d])) * 86400 * 2
          )
for: 30m
labels:
severity: warning
component: llm
annotations:
summary: "LLM cost spike detected"
description: "Daily cost projection is 2x the 7-day average"
# Service down
- alert: LLMServiceDown
expr: |
sum(rate(llm_requests_total{status="success"}[5m])) == 0
for: 2m
labels:
severity: critical
component: llm
annotations:
summary: "LLM service appears down"
description: "No successful requests in the last 2 minutes"
Best Practices
Data Retention
- Metrics: 15-30 days in Prometheus (high resolution)
- Metrics: 1+ year in long-term storage (downsampled)
- Logs: 7-14 days in hot storage (Loki/Elasticsearch)
- Logs: 90+ days in cold storage (S3/GCS) for compliance
- Traces: 7 days in Jaeger (sampled)
- Cost data: Indefinite in data warehouse
Privacy Considerations
- Sanitize prompts before logging to remove PII (see the redaction sketch after this list)
- Hash or truncate sensitive responses
- Implement log redaction for regulated industries
- Separate user IDs from request content
- Comply with data retention policies (GDPR Article 5)
- Implement right to deletion for user data
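A minimal redaction sketch; the regex patterns are illustrative only, and production systems typically rely on a dedicated PII-detection library tuned to their data:
import re

# Illustrative patterns only; extend or replace with a proper PII detector.
REDACTION_PATTERNS = [
    (re.compile(r"[\w.+-]+@[\w-]+\.[\w.]+"), "<EMAIL>"),
    (re.compile(r"\b(?:\+?\d[\d\s().-]{7,}\d)\b"), "<PHONE>"),
    (re.compile(r"\b\d{3}-\d{2}-\d{4}\b"), "<SSN>"),
]

def redact_prompt(text: str) -> str:
    """Replace obvious PII before the prompt is written to logs."""
    for pattern, placeholder in REDACTION_PATTERNS:
        text = pattern.sub(placeholder, text)
    return text

# Usage: log redact_prompt(prompt)[:100] instead of the raw prompt preview.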
Performance Impact
- Instrumentation overhead: <5ms per request
- Use async exporters to avoid blocking
- Batch telemetry data before sending
- Sample traces (e.g., 10%) for high-volume systems (see the sampler sketch after this list)
- Use separate OTEL Collector to reduce app load
- Monitor observability system itself
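A minimal sketch of 10% head-based sampling with the OpenTelemetry SDK; the service name is illustrative, and ParentBased keeps child spans consistent with their parent's sampling decision:
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.sampling import ParentBased, TraceIdRatioBased

# Keep roughly 10% of traces; ParentBased makes child spans follow their
# parent's decision, so a sampled request stays fully traced across services.
sampler = ParentBased(root=TraceIdRatioBased(0.10))

provider = TracerProvider(
    resource=Resource.create({"service.name": "ai-app"}),
    sampler=sampler,
)
trace.set_tracer_provider(provider)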
Advanced Topics
Quality Monitoring
Beyond technical metrics, track output quality (a feedback-metrics sketch follows this list):
- User feedback (thumbs up/down)
- Hallucination detection scores
- Content policy violations
- Refusal rates (model refusing to answer)
- Semantic similarity to expected outputs
- A/B test results (model A vs model B)
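A minimal sketch of recording user feedback and refusals as counters alongside the technical metrics; the metric names, attribute keys, and helper functions are illustrative, and it assumes the meter provider from Code Example 1:
from opentelemetry import metrics

meter = metrics.get_meter("ai-app.quality")

feedback_counter = meter.create_counter(
    name="llm_user_feedback_total",
    description="User feedback events on LLM responses",
    unit="1",
)
refusal_counter = meter.create_counter(
    name="llm_refusals_total",
    description="Responses where the model declined to answer",
    unit="1",
)

def record_feedback(model: str, feature: str, rating: str) -> None:
    # rating is "up" or "down" from a thumbs widget in the product UI.
    feedback_counter.add(1, {"model": model, "feature": feature, "rating": rating})

def record_refusal(model: str, feature: str) -> None:
    refusal_counter.add(1, {"model": model, "feature": feature})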
Semantic Caching Metrics
- Cache hit rate by similarity threshold (see the sketch after this list)
- Cost savings from cache hits
- Latency reduction from cache
- Cache size and eviction rate
- Cache consistency (false positives)
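A minimal sketch of recording these cache metrics with the same OpenTelemetry meter; the metric names, similarity banding, and avoided-cost estimate are illustrative assumptions:
from opentelemetry import metrics

meter = metrics.get_meter("ai-app.cache")

cache_lookup_counter = meter.create_counter(
    name="llm_cache_lookups_total",
    description="Semantic cache lookups by outcome",
    unit="1",
)
cache_savings_counter = meter.create_counter(
    name="llm_cache_cost_savings_total",
    description="Estimated USD saved by serving cached responses",
    unit="USD",
)

def record_cache_lookup(hit: bool, similarity: float, avoided_cost_usd: float = 0.0) -> None:
    # Bucket the similarity score so hit rate can be charted per threshold band.
    band = f"{int(similarity * 10) / 10:.1f}"   # e.g. 0.93 -> "0.9"
    attrs = {"outcome": "hit" if hit else "miss", "similarity_band": band}
    cache_lookup_counter.add(1, attrs)
    if hit:
        cache_savings_counter.add(avoided_cost_usd, attrs)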
Multi-Model Tracking
For systems using multiple providers such as OpenAI, Anthropic, and Google (a normalization sketch follows this list):
- Normalize metrics across providers
- Track provider-specific errors separately
- Monitor fallback frequency
- Compare cost-effectiveness by provider
- Detect provider-specific performance patterns
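A minimal normalization sketch; it assumes the usage field names exposed by the official OpenAI and Anthropic Python SDKs (prompt_tokens/completion_tokens vs input_tokens/output_tokens) and is not tied to any particular dashboard schema:
from dataclasses import dataclass

@dataclass
class NormalizedUsage:
    provider: str
    model: str
    input_tokens: int
    output_tokens: int

def normalize_usage(provider: str, model: str, response) -> NormalizedUsage:
    """Map provider-specific usage fields onto one schema for shared dashboards."""
    if provider == "openai":
        # OpenAI SDK reports usage.prompt_tokens / usage.completion_tokens
        return NormalizedUsage(provider, model,
                               response.usage.prompt_tokens,
                               response.usage.completion_tokens)
    if provider == "anthropic":
        # Anthropic SDK reports usage.input_tokens / usage.output_tokens
        return NormalizedUsage(provider, model,
                               response.usage.input_tokens,
                               response.usage.output_tokens)
    raise ValueError(f"Unsupported provider: {provider}")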
Troubleshooting Common Issues
High Latency
- Check P95/P99 vs P50 - large gap indicates tail latency issues
- Use traces to identify bottlenecks (retrieval vs inference)
- Monitor concurrent requests - queuing causes latency
- Check model selection - larger models are slower
- Verify network latency to API providers
- Review prompt length - longer prompts increase latency
Cost Overruns
- Identify top cost features/users from attribution data
- Check for retry loops causing duplicate requests
- Review token usage - unnecessarily long prompts/responses
- Audit model selection - using expensive models unnecessarily
- Implement rate limiting per user/feature
- Add caching to reduce redundant calls
Quality Degradation
- Compare current performance to baseline metrics
- Check for model version changes (provider updates)
- Review recent prompt template changes
- Analyze user feedback trends
- Check for increased refusal rates
- Validate embedding model consistency (for RAG)
Production Checklist
- ✓ All LLM calls instrumented with OpenTelemetry
- ✓ Metrics exported to Prometheus and scraped regularly
- ✓ Structured logs sent to centralized system (Loki/ES)
- ✓ Distributed tracing enabled for complex workflows
- ✓ Grafana dashboards created for key metrics
- ✓ Alert rules configured with appropriate thresholds
- ✓ On-call runbooks document common issues
- ✓ Cost attribution tracks usage by feature/team/user
- ✓ Anomaly detection identifies unusual patterns
- ✓ PII redaction implemented for prompt logging
- ✓ Data retention policies meet compliance requirements
- ✓ Observability system itself is monitored
Conclusion
Comprehensive observability is non-negotiable for production AI systems. The unique characteristics of LLM applications - non-determinism, variable costs, external dependencies, and quality concerns - require specialized instrumentation beyond traditional application monitoring. By implementing the patterns in this guide, you'll gain the visibility needed to operate AI systems reliably at scale, control costs, and maintain quality for your users.