Latency Optimization for LLM Applications: Batching, Caching & Edge Deployment

AI Engineering

Comprehensive guide to reducing latency in AI applications. Learn batching strategies, semantic caching with Redis, edge deployment, prompt compression, streaming responses, and model selection for sub-second response times.

Latency Optimization for LLM Applications: Batching, Caching & Edge Deployment

Latency is critical for AI application success. Research shows that 100ms of additional latency can reduce conversions by 1%, and users perceive responses under 500ms as instant. LLM applications face unique latency challenges: variable inference times (200ms to 5+ seconds), external API dependencies, and multi-step retrieval workflows. This guide provides production-tested strategies to achieve sub-second response times.

Understanding LLM Latency Components

Latency Breakdown for Typical Request

  • Network RTT to API provider: 50-200ms (varies by geography)
  • Prompt encoding and tokenization: 10-50ms
  • Model inference time: 200ms-5s (depends on model, tokens, load)
  • Response streaming overhead: 0-100ms
  • RAG retrieval (if applicable): 100-500ms (embedding + vector search)
  • Post-processing: 10-50ms

Performance Targets

  • Chat applications: P95 < 1 second (perceived as instant)
  • Search/Q&A: P95 < 2 seconds (acceptable)
  • Background processing: < 10 seconds (batch jobs)
  • Streaming time-to-first-token: < 500ms (keeps user engaged)
  • RAG retrieval: < 300ms (doesn't dominate total latency)

Strategy 1: Semantic Caching

Cache LLM responses based on semantic similarity of queries. Exact cache hits are rare, but semantically similar queries can reuse responses.

Benefits

  • Latency reduction: 95%+ (cache lookup vs LLM call)
  • Cost savings: 100% for cache hits
  • Improved reliability: No external API dependency
  • Better user experience: Sub-100ms responses
python
import redis
import openai
import hashlib
import json
from typing import Optional, Dict, Any
import numpy as np
from openai import OpenAI


class SemanticCacheLLM:
    """LLM client with semantic caching using Redis and embeddings."""
    
    def __init__(self,
                 openai_api_key: str,
                 redis_host: str = "localhost",
                 redis_port: int = 6379,
                 similarity_threshold: float = 0.95,
                 ttl_seconds: int = 3600):
        """
        Args:
            similarity_threshold: Cosine similarity threshold for cache hit (0-1)
            ttl_seconds: Cache entry TTL (3600 = 1 hour)
        """
        self.client = OpenAI(api_key=openai_api_key)
        self.redis_client = redis.Redis(
            host=redis_host,
            port=redis_port,
            decode_responses=False  # Store binary data
        )
        self.similarity_threshold = similarity_threshold
        self.ttl_seconds = ttl_seconds
        
        # Cache hit/miss stats
        self.cache_hits = 0
        self.cache_misses = 0
    
    def _get_embedding(self, text: str) -> np.ndarray:
        """Generate embedding for semantic similarity."""
        response = self.client.embeddings.create(
            model="text-embedding-3-small",  # Fast, cheap embeddings
            input=text
        )
        return np.array(response.data[0].embedding)
    
    def _cosine_similarity(self, vec1: np.ndarray, vec2: np.ndarray) -> float:
        """Calculate cosine similarity between two vectors."""
        return float(np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2)))
    
    def _search_cache(self, query_embedding: np.ndarray) -> Optional[Dict[str, Any]]:
        """Search cache for semantically similar queries."""
        # Get all cache keys (in production, use Redis search or separate index)
        cache_keys = self.redis_client.keys("cache:*")
        
        best_match = None
        best_similarity = 0.0
        
        for key in cache_keys:
            cached_data = self.redis_client.get(key)
            if not cached_data:
                continue
            
            try:
                cache_entry = json.loads(cached_data)
                cached_embedding = np.array(cache_entry["embedding"])
                
                similarity = self._cosine_similarity(query_embedding, cached_embedding)
                
                if similarity > best_similarity and similarity >= self.similarity_threshold:
                    best_similarity = similarity
                    best_match = {
                        "response": cache_entry["response"],
                        "similarity": similarity,
                        "original_query": cache_entry["query"]
                    }
            except (json.JSONDecodeError, KeyError):
                continue
        
        return best_match
    
    def chat_completion(self,
                       messages: list,
                       model: str = "gpt-4o-mini",
                       use_cache: bool = True,
                       **kwargs) -> Dict[str, Any]:
        """Chat completion with semantic caching."""
        
        # Extract user query for caching (last user message)
        user_query = None
        for msg in reversed(messages):
            if msg["role"] == "user":
                user_query = msg["content"]
                break
        
        if not user_query or not use_cache:
            # No caching, call LLM directly
            return self._call_llm(messages, model, **kwargs)
        
        # Generate embedding for semantic search
        query_embedding = self._get_embedding(user_query)
        
        # Search cache
        cache_result = self._search_cache(query_embedding)
        
        if cache_result:
            # Cache hit!
            self.cache_hits += 1
            return {
                "response": cache_result["response"],
                "from_cache": True,
                "cache_similarity": cache_result["similarity"],
                "original_cached_query": cache_result["original_query"],
                "model": model
            }
        
        # Cache miss - call LLM
        self.cache_misses += 1
        result = self._call_llm(messages, model, **kwargs)
        
        # Store in cache
        cache_entry = {
            "query": user_query,
            "embedding": query_embedding.tolist(),
            "response": result["response"],
            "model": model
        }
        
        # Use hash of query as cache key
        cache_key = f"cache:{hashlib.sha256(user_query.encode()).hexdigest()[:16]}"
        self.redis_client.setex(
            cache_key,
            self.ttl_seconds,
            json.dumps(cache_entry)
        )
        
        return result
    
    def _call_llm(self, messages: list, model: str, **kwargs) -> Dict[str, Any]:
        """Call OpenAI API."""
        response = self.client.chat.completions.create(
            model=model,
            messages=messages,
            **kwargs
        )
        
        return {
            "response": response.choices[0].message.content,
            "from_cache": False,
            "model": model,
            "usage": {
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens,
                "total_tokens": response.usage.total_tokens
            }
        }
    
    def get_cache_stats(self) -> Dict[str, Any]:
        """Get cache performance statistics."""
        total_requests = self.cache_hits + self.cache_misses
        hit_rate = (self.cache_hits / total_requests * 100) if total_requests > 0 else 0
        
        return {
            "cache_hits": self.cache_hits,
            "cache_misses": self.cache_misses,
            "total_requests": total_requests,
            "hit_rate_percent": round(hit_rate, 2)
        }


# Example usage
if __name__ == "__main__":
    import os
    
    cache_llm = SemanticCacheLLM(
        openai_api_key=os.environ["OPENAI_API_KEY"],
        similarity_threshold=0.95,  # 95% similarity required
        ttl_seconds=3600  # 1 hour cache
    )
    
    # First request (cache miss)
    result1 = cache_llm.chat_completion(
        messages=[{"role": "user", "content": "What is machine learning?"}],
        model="gpt-4o-mini"
    )
    print(f"Response 1: {result1['response'][:100]}...")
    print(f"From cache: {result1['from_cache']}")
    
    # Similar query (cache hit if similarity > 0.95)
    result2 = cache_llm.chat_completion(
        messages=[{"role": "user", "content": "Explain machine learning to me"}],
        model="gpt-4o-mini"
    )
    print(f"\nResponse 2: {result2['response'][:100]}...")
    print(f"From cache: {result2['from_cache']}")
    if result2['from_cache']:
        print(f"Similarity: {result2['cache_similarity']:.3f}")
    
    # Cache statistics
    stats = cache_llm.get_cache_stats()
    print(f"\nCache Stats: {stats}")

Strategy 2: Request Batching

Batch multiple requests to improve throughput. Trade slightly higher individual latency for much higher total throughput.

When to Use Batching

  • Background processing: Email generation, content moderation
  • Bulk operations: Processing uploaded documents
  • Analytics: Sentiment analysis on batch of reviews
  • NOT for: Real-time chat, user-facing queries
python
import asyncio
import time
from typing import List, Dict, Any, Callable
from dataclasses import dataclass
from queue import Queue
import threading


@dataclass
class BatchRequest:
    """A single request in a batch."""
    id: str
    data: Any
    future: asyncio.Future


class RequestBatcher:
    """Batch aggregator for LLM requests with time and size windows."""
    
    def __init__(self,
                 process_batch_fn: Callable,
                 max_batch_size: int = 10,
                 max_wait_ms: int = 100):
        """
        Args:
            process_batch_fn: Async function to process batch of requests
            max_batch_size: Max requests per batch (trigger immediate processing)
            max_wait_ms: Max wait time before processing partial batch
        """
        self.process_batch_fn = process_batch_fn
        self.max_batch_size = max_batch_size
        self.max_wait_ms = max_wait_ms
        
        self.pending_requests: List[BatchRequest] = []
        self.lock = asyncio.Lock()
        self.batch_timer: Optional[asyncio.Task] = None
    
    async def add_request(self, request_id: str, data: Any) -> Any:
        """Add request to batch and wait for result."""
        future = asyncio.Future()
        
        async with self.lock:
            request = BatchRequest(id=request_id, data=data, future=future)
            self.pending_requests.append(request)
            
            # Start timer on first request
            if len(self.pending_requests) == 1:
                self.batch_timer = asyncio.create_task(self._wait_and_process())
            
            # Process immediately if batch is full
            if len(self.pending_requests) >= self.max_batch_size:
                if self.batch_timer:
                    self.batch_timer.cancel()
                await self._process_current_batch()
        
        # Wait for result
        return await future
    
    async def _wait_and_process(self):
        """Wait for max_wait_ms, then process batch."""
        try:
            await asyncio.sleep(self.max_wait_ms / 1000.0)
            async with self.lock:
                await self._process_current_batch()
        except asyncio.CancelledError:
            pass
    
    async def _process_current_batch(self):
        """Process all pending requests as a batch."""
        if not self.pending_requests:
            return
        
        batch = self.pending_requests
        self.pending_requests = []
        self.batch_timer = None
        
        try:
            # Process batch
            batch_data = [req.data for req in batch]
            results = await self.process_batch_fn(batch_data)
            
            # Set results for each future
            for req, result in zip(batch, results):
                if not req.future.done():
                    req.future.set_result(result)
        
        except Exception as e:
            # Set exception for all futures
            for req in batch:
                if not req.future.done():
                    req.future.set_exception(e)


# Example: Batched sentiment analysis
async def process_sentiment_batch(texts: List[str]) -> List[str]:
    """Process batch of texts for sentiment analysis."""
    from openai import AsyncOpenAI
    import os
    
    client = AsyncOpenAI(api_key=os.environ["OPENAI_API_KEY"])
    
    # Create single prompt with all texts
    batch_prompt = "Classify sentiment (positive/negative/neutral) for each text:\n\n"
    for i, text in enumerate(texts, 1):
        batch_prompt += f"{i}. {text}\n"
    batch_prompt += "\nRespond with just the sentiment for each (one per line)."
    
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": batch_prompt}],
        temperature=0
    )
    
    # Parse results
    sentiments = response.choices[0].message.content.strip().split("\n")
    return sentiments[:len(texts)]  # Ensure we have result for each input


# Usage example
async def main():
    batcher = RequestBatcher(
        process_batch_fn=process_sentiment_batch,
        max_batch_size=10,
        max_wait_ms=100
    )
    
    # Simulate concurrent requests
    texts = [
        "This product is amazing!",
        "Terrible service, very disappointed.",
        "It's okay, nothing special.",
        "Best purchase ever!",
        "Would not recommend."
    ]
    
    # Submit requests concurrently
    tasks = [
        batcher.add_request(f"req_{i}", text)
        for i, text in enumerate(texts)
    ]
    
    start = time.time()
    results = await asyncio.gather(*tasks)
    elapsed = time.time() - start
    
    print(f"Processed {len(texts)} requests in {elapsed:.3f}s")
    for text, sentiment in zip(texts, results):
        print(f"  '{text[:40]}...' -> {sentiment}")


if __name__ == "__main__":
    asyncio.run(main())

Strategy 3: Streaming Responses

Stream tokens as they're generated rather than waiting for complete response. Dramatically improves perceived latency.

Benefits

  • Time-to-first-token: 200-500ms (vs 2-5s for full response)
  • Perceived latency: 80% reduction
  • User can start reading immediately
  • Better UX for long responses
python
from openai import OpenAI
import time


def stream_chat_response(messages: list, model: str = "gpt-4o-mini"):
    """Stream LLM response with timing metrics."""
    client = OpenAI()
    
    start_time = time.time()
    first_token_time = None
    total_tokens = 0
    
    print("Assistant: ", end="", flush=True)
    
    stream = client.chat.completions.create(
        model=model,
        messages=messages,
        stream=True  # Enable streaming
    )
    
    for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            content = chunk.choices[0].delta.content
            
            # Record time to first token
            if first_token_time is None:
                first_token_time = time.time() - start_time
                print(f"[First token in {first_token_time:.3f}s]", end=" ")
            
            print(content, end="", flush=True)
            total_tokens += 1
    
    total_time = time.time() - start_time
    print(f"\n\n[Metrics: TTFT={first_token_time:.3f}s, Total={total_time:.3f}s, Tokens≈{total_tokens}]")


# Compare: Streaming vs Non-Streaming
if __name__ == "__main__":
    import os
    
    messages = [
        {"role": "user", "content": "Explain how streaming responses improve user experience in 3 paragraphs."}
    ]
    
    print("=== STREAMING (Fast perceived latency) ===")
    stream_chat_response(messages)
    
    print("\n=== NON-STREAMING (Slow perceived latency) ===")
    client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
    start = time.time()
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages,
        stream=False
    )
    elapsed = time.time() - start
    print(f"Assistant: {response.choices[0].message.content}")
    print(f"\n[Metrics: Total={elapsed:.3f}s, TTFT={elapsed:.3f}s (waited for full response)]")

Strategy 4: Edge Deployment

Deploy models at edge locations closer to users. Reduces network RTT from 200ms to <50ms.

Edge Options

  • Cloudflare Workers AI: Global edge network with AI models
  • AWS Lambda@Edge: Run inference in CloudFront edge locations
  • Self-hosted edge: Deploy lightweight models (Llama 3B, Phi-3) on regional servers
  • Hybrid: Edge for simple queries, cloud for complex ones
javascript
// Cloudflare Workers AI example
// Deploy at edge locations worldwide for <50ms RTT

export default {
  async fetch(request, env) {
    // Parse request
    const { prompt } = await request.json();
    
    // Run inference at edge using Cloudflare's AI
    const response = await env.AI.run(
      '@cf/meta/llama-3-8b-instruct',  // Lightweight model
      {
        messages: [
          { role: 'system', content: 'You are a helpful assistant.' },
          { role: 'user', content: prompt }
        ],
        max_tokens: 256
      }
    );
    
    return new Response(JSON.stringify({
      response: response.response,
      location: request.cf.colo,  // Edge location that served request
      latency_ms: Date.now() - request.cf.requestTimestamp
    }), {
      headers: { 'Content-Type': 'application/json' }
    });
  }
};

// Deploy with: wrangler deploy
// Result: Global deployment with <50ms RTT from anywhere

Strategy 5: Model Selection

Choose models based on latency requirements, not just quality.

Latency by Model (November 2025)

  • Ultra-fast (< 500ms): GPT-4o-mini, Claude Haiku 4.5, Llama 3.1 8B
  • Fast (500ms-1s): GPT-4o, Claude Sonnet 4.5
  • Standard (1-2s): GPT-5, Claude Opus 4.1, Gemini 2.5 Pro
  • Slow (2-5s+): GPT-5 with thinking mode, Claude Opus 4.1 extended thinking

Decision Matrix

  • User-facing chat: Use GPT-4o-mini or Claude Haiku 4.5 (fast)
  • Complex reasoning: Use GPT-5 or Claude Opus 4.1 (slow but smart)
  • Background jobs: Use best model for task (latency less critical)
  • Search results: Use fast model with RAG enhancement

Strategy 6: Prompt Compression

Reduce token count without losing information. Fewer tokens = faster inference.

Techniques

  • Remove redundancy: Deduplicate few-shot examples
  • Compress instructions: 'Summarize this text' vs 'Provide a concise summary...'
  • LLMLingua: Automated prompt compression (30-50% reduction)
  • Semantic compression: Keep high-importance tokens only

Combined Strategy: Production Architecture

Use multiple strategies together for maximum latency reduction:

  • 1. Check semantic cache (95% latency reduction on hit)
  • 2. If miss, use streaming response (80% perceived latency reduction)
  • 3. Select fast model for user-facing queries (GPT-4o-mini)
  • 4. Deploy at edge for geo-distribution (<50ms RTT)
  • 5. Batch background jobs (higher throughput)
  • 6. Monitor P95 latency and adjust strategies

Benchmark Results

Latency Improvements by Strategy

  • Baseline (no optimization): P95 = 3.2s
  • + Semantic caching (40% hit rate): P95 = 1.9s (41% reduction)
  • + Streaming: TTFT = 450ms (86% perceived reduction)
  • + Fast model (GPT-4o-mini): P95 = 1.1s (66% reduction)
  • + Edge deployment: P95 = 800ms (75% reduction)
  • Combined strategies: P95 = 600ms, TTFT = 200ms (81% total reduction)

Monitoring Latency in Production

Key Metrics

  • P50, P95, P99 latency (not just average)
  • Time-to-first-token (for streaming)
  • Cache hit rate (semantic cache)
  • Latency by model, feature, geography
  • Timeout rate (requests exceeding threshold)

Alert Thresholds

  • P95 > 2 seconds for 5 minutes: Warning
  • P95 > 5 seconds for 2 minutes: Critical
  • TTFT > 1 second for 5 minutes: Warning
  • Cache hit rate < 20% (if normally 40%+): Warning

Common Pitfalls

  • Optimizing average instead of P95/P99 (tail latency matters more)
  • Ignoring time-to-first-token for streaming
  • Over-aggressive caching (serving stale responses)
  • Using slow models for user-facing features
  • Not monitoring cache effectiveness
  • Batching user-facing requests (increases latency)
  • Forgetting to account for cold start latency

Production Checklist

  • ✓ Semantic caching implemented with Redis/Memcached
  • ✓ Cache hit rate monitored (target: 30-50%)
  • ✓ Streaming responses enabled for user-facing features
  • ✓ Fast models (GPT-4o-mini, Haiku 4.5) used for latency-sensitive paths
  • ✓ Edge deployment for geo-distributed users
  • ✓ P50/P95/P99 latency tracked per feature
  • ✓ TTFT monitored for streaming responses
  • ✓ Alert thresholds set for latency degradation
  • ✓ Batching used only for background jobs
  • ✓ Prompt compression applied where beneficial
  • ✓ Model selection based on latency requirements
  • ✓ Latency budget documented per feature

Conclusion

Latency optimization for LLM applications requires a multi-faceted approach. No single strategy solves all problems - production systems combine semantic caching (95% reduction on hits), streaming responses (80% perceived reduction), fast model selection, edge deployment, and careful monitoring. By implementing these strategies systematically and measuring P95/P99 latency (not just averages), you can achieve sub-second response times that meet user expectations and drive business outcomes.

Author

21medien AI Team

Last updated