Introduction
Deploying LLM agents to production is harder than deploying standard APIs because agents are autonomous, stateful, and non-deterministic. They make decisions, call external tools, and run for minutes instead of milliseconds. This guide covers the infrastructure, safety patterns, and operational practices needed to run agents reliably at scale.
The Production Readiness Checklist
Before deploying an agent to production, verify:
Safety
- ✅ Tool allowlist (only permitted tools can be called)
- ✅ Tool call validation (schema check before execution)
- ✅ Output filtering (PII redaction, profanity filter)
- ✅ Human-in-the-loop for high-risk actions (delete, payment)
- ✅ Timeout limits (agent can't run indefinitely)
Reliability
- ✅ Retry logic with exponential backoff
- ✅ Circuit breaker for failing tools
- ✅ Graceful degradation (fallback to simpler agent)
- ✅ State persistence (resume from checkpoint on crash)
Observability
- ✅ Trace every LLM call (prompt, response, tokens, cost)
- ✅ Trace every tool call (name, args, result, latency)
- ✅ Session-level logging (full conversation history)
- ✅ Alerting on cost spikes, error spikes, latency spikes
Cost Control
- ✅ Per-user token budgets (kill session if exceeded)
- ✅ Prompt caching for repeated inputs
- ✅ Model selection (use GPT-4o-mini for simple tasks)
- ✅ Max turns per session (prevent infinite loops)
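The observability items above (trace every LLM and tool call with latency, keep a per-session log) can be sketched as a small wrapper around each call. `Trace` and `traced` below are illustrative names, not a specific tracing library; in production you would emit these events to OpenTelemetry or a similar backend.

```python
# Minimal per-session tracing sketch: record kind, name, latency, and
# result for every LLM/tool call. Illustrative, not a real tracing SDK.
import time
from dataclasses import dataclass, field
from typing import Any, Callable

@dataclass
class Trace:
    events: list = field(default_factory=list)

    def record(self, kind: str, name: str, latency_ms: float, detail: Any):
        self.events.append({
            "kind": kind,            # "llm" or "tool"
            "name": name,            # model name or tool name
            "latency_ms": round(latency_ms, 2),
            "detail": detail,        # prompt/response or args/result
        })

def traced(trace: Trace, kind: str, name: str, func: Callable, *args, **kwargs):
    """Run func, timing it and recording the call on the session trace."""
    start = time.perf_counter()
    result = func(*args, **kwargs)
    trace.record(kind, name, (time.perf_counter() - start) * 1000, result)
    return result
```

One `Trace` per session gives you the full conversation history for debugging and a place to hang cost and alerting metrics.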
Infrastructure Patterns
Pattern 1: Synchronous (Request-Response)
Agent runs inline with HTTP request, returns result synchronously.
```python
@app.post("/agent")
async def run_agent(request: AgentRequest):
    agent = CodeReviewAgent()
    result = await agent.run(request.pr_url)
    return {"status": "complete", "result": result}
```
Pros: Simple, stateless
Cons: HTTP timeout if agent takes >30s, no progress updates
Use when: Agent completes in <10s (simple RAG, single-turn Q&A)
Pattern 2: Async with Polling (Job Queue)
Submit job, get job_id, poll for completion.
```python
@app.post("/agent/submit")
async def submit_job(request: AgentRequest):
    job_id = queue.enqueue(agent_task, request)
    return {"job_id": job_id, "status": "pending"}

@app.get("/agent/status/{job_id}")
async def get_status(job_id: str):
    job = queue.get_job(job_id)
    return {"status": job.status, "result": job.result}
```
Pros: Handles long-running agents, decouples HTTP from execution
Cons: Polling overhead, no real-time updates
Use when: Agent takes 10s-5min (code generation, research)
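The submit/status endpoints above assume a job store behind `queue.enqueue` and `queue.get_job`. A minimal in-memory sketch of that store is below; `JobStore` and `Job` are illustrative names, and a production system would use Redis, Celery, or a hosted queue instead of threads in the API process.

```python
# Minimal in-memory job store sketch backing a submit/poll API.
# Illustrative only: real deployments need a durable queue (Redis, SQS, ...).
import threading
import uuid
from dataclasses import dataclass

@dataclass
class Job:
    status: str = "pending"
    result: object = None

class JobStore:
    def __init__(self):
        self._jobs: dict[str, Job] = {}
        self._lock = threading.Lock()

    def enqueue(self, func, *args) -> str:
        """Register a job and run it on a background thread."""
        job_id = uuid.uuid4().hex
        with self._lock:
            self._jobs[job_id] = Job()

        def run():
            job = self._jobs[job_id]
            try:
                job.result = func(*args)
                job.status = "complete"  # set result first, then flip status
            except Exception as e:
                job.result = str(e)
                job.status = "failed"

        threading.Thread(target=run, daemon=True).start()
        return job_id

    def get_job(self, job_id: str) -> Job:
        return self._jobs[job_id]
```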
Pattern 3: Async with Webhooks (Event-Driven)
Submit job, agent sends result to callback URL when done.
```python
@app.post("/agent/submit")
async def submit_job(request: AgentRequest):
    job_id = queue.enqueue(agent_task, request, callback_url=request.webhook_url)
    return {"job_id": job_id, "status": "pending"}

# Worker
async def agent_task(job_id, request, callback_url):
    result = await agent.run(request)
    # Top-level httpx.post is synchronous; use AsyncClient in async code
    async with httpx.AsyncClient() as client:
        await client.post(callback_url, json={"job_id": job_id, "result": result})
```
Pros: No polling, efficient
Cons: Requires webhook endpoint from client
Use when: Long-running agents (>5min), server-to-server integration
Pattern 4: Streaming (Server-Sent Events)
Stream agent progress in real-time via SSE.
```python
@app.post("/agent/stream")
async def stream_agent(request: AgentRequest):
    async def event_generator():
        agent = CodeReviewAgent()
        async for event in agent.run_streaming(request.pr_url):
            yield f"data: {json.dumps(event)}\n\n"
    return StreamingResponse(event_generator(), media_type="text/event-stream")
```
Pros: Real-time updates, better UX
Cons: Requires persistent connection, harder to load balance
Use when: User-facing UI, need to show incremental progress
Safety Guardrails
Tool Allowlist and Validation
Never let agents call arbitrary tools. Maintain an explicit allowlist:
```python
ALLOWED_TOOLS = {
    "search_web": {"schema": WebSearchSchema, "risk": "low"},
    "read_file": {"schema": FileReadSchema, "risk": "medium"},
    "execute_code": {"schema": CodeExecSchema, "risk": "high"},
}

def validate_tool_call(tool_name: str, args: dict) -> bool:
    if tool_name not in ALLOWED_TOOLS:
        raise ToolNotAllowedError(f"{tool_name} is not permitted")
    schema = ALLOWED_TOOLS[tool_name]["schema"]
    try:
        schema(**args)  # Pydantic validation
        return True
    except ValidationError as e:
        raise ToolCallValidationError(f"Invalid args: {e}")
```
Human-in-the-Loop for High-Risk Actions
For destructive or financial actions, require human approval:
```python
async def execute_tool(tool_name: str, args: dict):
    risk_level = ALLOWED_TOOLS[tool_name]["risk"]
    if risk_level == "high":
        approval = await request_human_approval(tool_name, args)
        if not approval:
            return {"status": "denied", "reason": "User rejected action"}
    return await tools[tool_name](**args)
```
Output Filtering
Scan agent outputs for PII, secrets, profanity:
```python
import re

PII_PATTERNS = {
    # Note: a character class like [A-Z|a-z] would also match a literal "|"
    "email": r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
    "phone": r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
    "ssn": r'\b\d{3}-\d{2}-\d{4}\b',
    "credit_card": r'\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b',
}

def redact_pii(text: str) -> str:
    for name, pattern in PII_PATTERNS.items():
        text = re.sub(pattern, f"[{name.upper()}_REDACTED]", text)
    return text

# Apply after LLM generation
response = llm.invoke(prompt)
safe_response = redact_pii(response.content)
```
Timeout and Loop Detection
Prevent infinite agent loops:
```python
import time

class AgentRunner:
    def __init__(self, agent, max_turns=20, timeout_seconds=300):
        self.agent = agent
        self.max_turns = max_turns
        self.timeout_seconds = timeout_seconds

    async def run(self, task):
        start_time = time.time()
        turns = 0
        while True:
            if turns >= self.max_turns:
                raise MaxTurnsExceededError(f"Agent exceeded {self.max_turns} turns")
            if time.time() - start_time > self.timeout_seconds:
                raise TimeoutError(f"Agent exceeded {self.timeout_seconds}s timeout")
            action = await self.agent.step(task)
            turns += 1
            if action.is_final:
                return action.result
```
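Max-turn and timeout limits bound runtime, but they don't detect the *shape* of a stuck loop: an agent repeatedly issuing the same tool call with identical arguments. A complementary check, sketched below under the assumption that each action exposes a tool name and an args dict (`LoopDetector` is an illustrative name, not from any framework):

```python
# Detect an agent re-issuing the same tool call with identical args,
# a common signature of a stuck loop. Illustrative sketch.
import json
from collections import Counter

class LoopDetector:
    def __init__(self, max_repeats: int = 3):
        self.max_repeats = max_repeats
        self.counts = Counter()

    def check(self, tool_name: str, args: dict) -> None:
        # Canonicalize args so {"a": 1, "b": 2} and {"b": 2, "a": 1} match
        key = (tool_name, json.dumps(args, sort_keys=True))
        self.counts[key] += 1
        if self.counts[key] > self.max_repeats:
            raise RuntimeError(
                f"Loop detected: {tool_name} called "
                f"{self.counts[key]} times with identical args"
            )
```

Calling `check` once per agent step (before executing the tool) lets the runner abort a session that is burning tokens without making progress.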
State Management
Stateless vs Stateful Agents
Stateless (Recommended for most use cases):
```python
# Each request is independent
@app.post("/agent")
async def run_agent(request: AgentRequest):
    agent = Agent()  # fresh instance
    result = await agent.run(request.task)
    return result
```
Stateful (For multi-turn conversations):
```python
# State persisted in DB
@app.post("/agent/message")
async def send_message(session_id: str, message: str):
    state = db.get_session_state(session_id)
    agent = Agent.from_state(state)
    result = await agent.step(message)
    db.save_session_state(session_id, agent.to_state())
    return result
```
State Serialization
For checkpointing and resume:
```python
from dataclasses import dataclass
from typing import Any, Dict, List

@dataclass
class AgentState:
    messages: List[Message]
    tool_results: Dict[str, Any]
    metadata: Dict[str, Any]

    def to_json(self) -> str:
        return json.dumps({
            "messages": [m.dict() for m in self.messages],
            "tool_results": self.tool_results,
            "metadata": self.metadata,
        })

    @classmethod
    def from_json(cls, json_str: str):
        data = json.loads(json_str)
        return cls(
            messages=[Message(**m) for m in data["messages"]],
            tool_results=data["tool_results"],
            metadata=data["metadata"],
        )
```
Cost Control Strategies
Per-User Token Budgets
```python
class TokenBudget:
    def __init__(self, user_id: str, limit: int):
        self.user_id = user_id
        self.limit = limit
        # redis.get returns bytes (or None); coerce to int before arithmetic
        self.used = int(redis.get(f"tokens:{user_id}") or 0)

    def consume(self, tokens: int):
        self.used += tokens
        redis.set(f"tokens:{self.user_id}", self.used, ex=86400)  # 24h TTL
        if self.used > self.limit:
            raise BudgetExceededError(f"User {self.user_id} exceeded {self.limit} tokens")

# Usage
budget = TokenBudget(user_id="alice", limit=100_000)
response = llm.invoke(prompt)
budget.consume(response.usage.total_tokens)
```
Prompt Caching
Cache common prompts to avoid recomputing:
```python
import hashlib

def get_cached_response(prompt: str, model: str):
    cache_key = hashlib.md5(f"{model}:{prompt}".encode()).hexdigest()
    cached = redis.get(cache_key)
    if cached:
        return json.loads(cached)
    response = llm.invoke(prompt, model=model)
    redis.set(cache_key, json.dumps(response.dict()), ex=3600)  # 1h TTL
    return response
```
Model Selection Logic
Route simple tasks to cheap models:
```python
def select_model(task_complexity: str) -> str:
    if task_complexity == "simple":
        return "gpt-4o-mini"  # $0.15/$0.60 per 1M input/output tokens
    elif task_complexity == "medium":
        return "claude-3-5-sonnet-20241022"  # $3/$15 per 1M tokens
    else:
        return "gpt-4o"  # $2.50/$10 per 1M tokens

# Usage
complexity = classify_task_complexity(task)
model = select_model(complexity)
response = llm.invoke(prompt, model=model)
```
Error Handling and Resilience
Retry with Exponential Backoff
```python
import asyncio
import random

async def retry_with_backoff(func, max_retries=3, base_delay=1):
    for attempt in range(max_retries):
        try:
            return await func()
        except (RateLimitError, TimeoutError):
            if attempt == max_retries - 1:
                raise
            # Exponential backoff with jitter to avoid thundering herds
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            await asyncio.sleep(delay)

# Usage
response = await retry_with_backoff(lambda: llm.invoke(prompt))
```
Circuit Breaker for Tools
```python
class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.last_failure_time = None
        self.state = "closed"  # closed, open, half_open

    async def call(self, func):
        if self.state == "open":
            if time.time() - self.last_failure_time > self.timeout:
                self.state = "half_open"  # probe with one request
            else:
                raise CircuitBreakerOpenError("Circuit breaker is open")
        try:
            result = await func()
            self.on_success()
            return result
        except Exception:
            self.on_failure()
            raise

    def on_success(self):
        self.failure_count = 0
        self.state = "closed"

    def on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = "open"
```
Deployment Platforms
| Platform | Best For | Pros | Cons |
|---|---|---|---|
| Modal | Async agents, batch jobs | Auto-scaling, GPU support, simple API | Vendor lock-in, cold starts |
| AWS Lambda | Lightweight agents (<15min) | Mature, cheap, integrates with AWS | 15min timeout, cold starts |
| Cloudflare Workers | Edge agents, low-latency | Global edge network, instant startup | Limited CPU time, no GPU |
| Kubernetes | Long-running agents, custom infra | Full control, any runtime | Ops overhead, complex setup |
| Fly.io | Always-on agents, stateful apps | Simple, persistent volumes, global | More expensive than serverless |
Monitoring and Alerting
Key Metrics to Track
- Agent success rate: % of sessions that completed successfully
- Average turns per session: How many LLM calls per task
- Tool success rate: % of tool calls that succeeded
- Cost per session: Total LLM cost per agent run
- Latency distribution: p50/p95/p99 end-to-end latency
- User satisfaction: Thumbs up/down rate
Alerts to Set Up
- Alert if agent error rate > 5%
- Alert if average cost per session spikes 3×
- Alert if any single session exceeds $10 cost
- Alert if p95 latency > 60s
- Alert if tool X fails > 50% of the time
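These rules can be evaluated over a sliding window of per-session metrics. A sketch, assuming each session record carries `error`, `cost`, and `latency_s` fields (names are illustrative), with thresholds mirroring the bullets above:

```python
# Evaluate simple alert rules over a window of session metrics.
# Field names (error, cost, latency_s) are illustrative assumptions.
def evaluate_alerts(sessions: list[dict]) -> list[str]:
    alerts = []
    n = len(sessions)
    if not n:
        return alerts

    errors = sum(1 for s in sessions if s["error"])
    if errors / n > 0.05:
        alerts.append(f"error rate {errors / n:.0%} > 5%")

    if any(s["cost"] > 10 for s in sessions):
        alerts.append("single session exceeded $10")

    latencies = sorted(s["latency_s"] for s in sessions)
    p95 = latencies[min(int(0.95 * n), n - 1)]  # nearest-rank p95
    if p95 > 60:
        alerts.append(f"p95 latency {p95}s > 60s")

    return alerts
```

In a real system you would run this on an interval (or let Prometheus/Datadog rules do it) and route the resulting alerts to paging.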
Operational Best Practices
1. Canary Deployments
Roll out new agent versions to 10% of traffic first:
```python
if random.random() < 0.1:
    agent = NewAgent()  # canary
else:
    agent = OldAgent()  # stable
```
Monitor metrics for 24 hours; if the canary performs worse, roll back.
2. A/B Testing Prompts
Test prompt changes with statistical rigor:
```python
import hashlib

# Python's built-in hash() is randomized per process (PYTHONHASHSEED),
# so derive the bucket from a stable digest for a deterministic split
variant = int(hashlib.md5(user_id.encode()).hexdigest(), 16) % 2
if variant == 0:
    prompt = PROMPT_A
else:
    prompt = PROMPT_B
log_experiment("prompt_ab_test", user_id, variant)
```
3. Shadow Mode
Run new agent in parallel, log results, but don't return to user:
```python
primary_result = await agent_v1.run(task)
# Shadow run, not awaited; have agent_v2 log its own result so it
# can be compared offline (a bare create_task discards return values)
asyncio.create_task(agent_v2.run(task))
return primary_result
```
Compare shadow results offline to validate the new version before promoting it.
Conclusion
Production agent deployment requires more than just wrapping an LLM in an API. You need safety guardrails (tool validation, output filtering, human-in-the-loop), reliability patterns (retries, circuit breakers, state management), cost controls (token budgets, model selection), and comprehensive observability. Start simple—synchronous execution, stateless agents—and add complexity only when needed.