Production Agent Deployment: Infrastructure, Safety, and Operations

Production agent deployment: infrastructure patterns, safety guardrails, cost control, state management, and operational best practices for LLM agents.

Introduction

Deploying LLM agents to production is harder than deploying standard APIs because agents are autonomous, stateful, and non-deterministic. They make decisions, call external tools, and run for minutes instead of milliseconds. This guide covers the infrastructure, safety patterns, and operational practices needed to run agents reliably at scale.

The Production Readiness Checklist

Before deploying an agent to production, verify:

Safety

  • ✅ Tool allowlist (only permitted tools can be called)
  • ✅ Tool call validation (schema check before execution)
  • ✅ Output filtering (PII redaction, profanity filter)
  • ✅ Human-in-the-loop for high-risk actions (delete, payment)
  • ✅ Timeout limits (agent can't run indefinitely)

Reliability

  • ✅ Retry logic with exponential backoff
  • ✅ Circuit breaker for failing tools
  • ✅ Graceful degradation (fallback to simpler agent)
  • ✅ State persistence (resume from checkpoint on crash)
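Graceful degradation from the list above can be as simple as a wrapper that catches failures from the primary agent and retries with a cheaper or simpler one. A minimal sketch, assuming both agents expose an async `run(task)` method (a hypothetical interface for illustration):

```python
import asyncio

class FallbackAgent:
    """Try the primary agent first; on any failure, degrade to a simpler one."""

    def __init__(self, primary, fallback):
        self.primary = primary
        self.fallback = fallback

    async def run(self, task):
        try:
            return await self.primary.run(task)
        except Exception:
            # Degrade gracefully instead of failing the whole session.
            return await self.fallback.run(task)
```

In production you would typically log the primary failure and tag the response as degraded so it shows up in your metrics.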

Observability

  • ✅ Trace every LLM call (prompt, response, tokens, cost)
  • ✅ Trace every tool call (name, args, result, latency)
  • ✅ Session-level logging (full conversation history)
  • ✅ Alerting on cost spikes, error spikes, latency spikes
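One lightweight way to get the per-call tracing above is a decorator that records name, latency, and payload size for every LLM and tool call. This is a minimal sketch that appends to an in-memory list; a real deployment would ship these records to a tracing backend (OpenTelemetry, Langfuse, and similar) instead:

```python
import functools
import json
import time

TRACE_LOG = []  # in production, ship these records to a tracing backend

def traced(call_type: str):
    """Decorator that records name, latency, and result size for each call."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            start = time.time()
            result = fn(*args, **kwargs)
            TRACE_LOG.append({
                "type": call_type,  # e.g. "llm" or "tool"
                "name": fn.__name__,
                "latency_s": round(time.time() - start, 4),
                "result_chars": len(json.dumps(result, default=str)),
            })
            return result
        return wrapper
    return decorator

@traced("tool")
def search_web(query: str) -> dict:
    return {"hits": [query]}  # stand-in tool for illustration
```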

Cost Control

  • ✅ Per-user token budgets (kill session if exceeded)
  • ✅ Prompt caching for repeated inputs
  • ✅ Model selection (use GPT-4o-mini for simple tasks)
  • ✅ Max turns per session (prevent infinite loops)

Infrastructure Patterns

Pattern 1: Synchronous (Request-Response)

Agent runs inline with HTTP request, returns result synchronously.

@app.post("/agent")
async def run_agent(request: AgentRequest):
    agent = CodeReviewAgent()
    result = await agent.run(request.pr_url)
    return {"status": "complete", "result": result}

Pros: Simple, stateless

Cons: HTTP timeout if agent takes >30s, no progress updates

Use when: Agent completes in <10s (simple RAG, single-turn Q&A)

Pattern 2: Async with Polling (Job Queue)

Submit job, get job_id, poll for completion.

@app.post("/agent/submit")
async def submit_job(request: AgentRequest):
    job_id = queue.enqueue(agent_task, request)
    return {"job_id": job_id, "status": "pending"}

@app.get("/agent/status/{job_id}")
async def get_status(job_id: str):
    job = queue.get_job(job_id)
    return {"status": job.status, "result": job.result}

Pros: Handles long-running agents, decouples HTTP from execution

Cons: Polling overhead, no real-time updates

Use when: Agent takes 10s-5min (code generation, research)

Pattern 3: Async with Webhooks (Event-Driven)

Submit job, agent sends result to callback URL when done.

@app.post("/agent/submit")
async def submit_job(request: AgentRequest):
    job_id = queue.enqueue(agent_task, request, callback_url=request.webhook_url)
    return {"job_id": job_id, "status": "pending"}

# Worker
async def agent_task(job_id, request, callback_url):
    result = await agent.run(request)
    # httpx.post is synchronous; use the async client inside async code
    async with httpx.AsyncClient() as client:
        await client.post(callback_url, json={"job_id": job_id, "result": result})

Pros: No polling, efficient

Cons: Requires webhook endpoint from client

Use when: Long-running agents (>5min), server-to-server integration

Pattern 4: Streaming (Server-Sent Events)

Stream agent progress in real-time via SSE.

@app.post("/agent/stream")
async def stream_agent(request: AgentRequest):
    async def event_generator():
        agent = CodeReviewAgent()
        async for event in agent.run_streaming(request.pr_url):
            yield f"data: {json.dumps(event)}\n\n"

    return StreamingResponse(event_generator(), media_type="text/event-stream")

Pros: Real-time updates, better UX

Cons: Requires persistent connection, harder to load balance

Use when: User-facing UI, need to show incremental progress

Safety Guardrails

Tool Allowlist and Validation

Never let agents call arbitrary tools. Maintain an explicit allowlist:

ALLOWED_TOOLS = {
    "search_web": {"schema": WebSearchSchema, "risk": "low"},
    "read_file": {"schema": FileReadSchema, "risk": "medium"},
    "execute_code": {"schema": CodeExecSchema, "risk": "high"},
}

def validate_tool_call(tool_name: str, args: dict) -> bool:
    if tool_name not in ALLOWED_TOOLS:
        raise ToolNotAllowedError(f"{tool_name} is not permitted")

    schema = ALLOWED_TOOLS[tool_name]["schema"]
    try:
        schema(**args)  # Pydantic validation
        return True
    except ValidationError as e:
        raise ToolCallValidationError(f"Invalid args: {e}")

Human-in-the-Loop for High-Risk Actions

For destructive or financial actions, require human approval:

async def execute_tool(tool_name: str, args: dict):
    risk_level = ALLOWED_TOOLS[tool_name]["risk"]

    if risk_level == "high":
        approval = await request_human_approval(tool_name, args)
        if not approval:
            return {"status": "denied", "reason": "User rejected action"}

    return await tools[tool_name](**args)
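The `request_human_approval` call above is left undefined; one common implementation is a pending-approval registry of futures that a reviewer endpoint or UI resolves. A sketch under those assumptions, failing closed (deny) on timeout:

```python
import asyncio

PENDING_APPROVALS: dict = {}  # request_id -> Future; a reviewer UI resolves these

async def request_human_approval(tool_name: str, args: dict,
                                 timeout_s: float = 300.0) -> bool:
    """Block until a human approves or denies; deny on timeout (fail closed)."""
    request_id = f"{tool_name}:{len(PENDING_APPROVALS)}"
    future = asyncio.get_running_loop().create_future()
    PENDING_APPROVALS[request_id] = future
    # notify_reviewers(request_id, tool_name, args)  # e.g. Slack message, dashboard row
    try:
        return await asyncio.wait_for(future, timeout=timeout_s)
    except asyncio.TimeoutError:
        return False  # no response means denied
    finally:
        PENDING_APPROVALS.pop(request_id, None)

def resolve_approval(request_id: str, approved: bool) -> None:
    """Called from the reviewer's endpoint or UI."""
    future = PENDING_APPROVALS.get(request_id)
    if future is not None and not future.done():
        future.set_result(approved)
```

The `notify_reviewers` hook is hypothetical; the key design point is that denial is the default, so an unattended approval queue can never silently allow a high-risk action.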

Output Filtering

Scan agent outputs for PII, secrets, profanity:

import re

PII_PATTERNS = {
    "email": r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
    "phone": r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
    "ssn": r'\b\d{3}-\d{2}-\d{4}\b',
    "credit_card": r'\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b'
}

def redact_pii(text: str) -> str:
    for name, pattern in PII_PATTERNS.items():
        text = re.sub(pattern, f"[{name.upper()}_REDACTED]", text)
    return text

# Apply after LLM generation
response = llm.invoke(prompt)
safe_response = redact_pii(response.content)

Timeout and Loop Detection

Prevent infinite agent loops:

import time

class AgentRunner:
    def __init__(self, agent, max_turns=20, timeout_seconds=300):
        self.agent = agent
        self.max_turns = max_turns
        self.timeout_seconds = timeout_seconds

    async def run(self, task):
        start_time = time.time()
        turns = 0

        while True:
            if turns >= self.max_turns:
                raise MaxTurnsExceededError(f"Agent exceeded {self.max_turns} turns")

            if time.time() - start_time > self.timeout_seconds:
                raise TimeoutError(f"Agent exceeded {self.timeout_seconds}s timeout")

            action = await self.agent.step(task)
            turns += 1

            if action.is_final:
                return action.result

State Management

Stateless vs Stateful Agents

Stateless (Recommended for most use cases):

# Each request is independent
@app.post("/agent")
async def run_agent(request: AgentRequest):
    agent = Agent()  # fresh instance
    result = await agent.run(request.task)
    return result

Stateful (For multi-turn conversations):

# State persisted in DB
@app.post("/agent/message")
async def send_message(session_id: str, message: str):
    state = db.get_session_state(session_id)
    agent = Agent.from_state(state)

    result = await agent.step(message)

    db.save_session_state(session_id, agent.to_state())
    return result

State Serialization

For checkpointing and resume:

from dataclasses import dataclass

@dataclass  # without this, the annotations alone give the class no __init__
class AgentState:
    messages: List[Message]
    tool_results: Dict[str, Any]
    metadata: Dict[str, Any]

    def to_json(self) -> str:
        return json.dumps({
            "messages": [m.dict() for m in self.messages],
            "tool_results": self.tool_results,
            "metadata": self.metadata
        })

    @classmethod
    def from_json(cls, json_str: str):
        data = json.loads(json_str)
        return cls(
            messages=[Message(**m) for m in data["messages"]],
            tool_results=data["tool_results"],
            metadata=data["metadata"]
        )

Cost Control Strategies

Per-User Token Budgets

class TokenBudget:
    def __init__(self, user_id: str, limit: int):
        self.user_id = user_id
        self.limit = limit
        self.used = int(redis.get(f"tokens:{user_id}") or 0)  # redis returns bytes/None

    def consume(self, tokens: int):
        self.used += tokens
        redis.set(f"tokens:{self.user_id}", self.used, ex=86400)  # 24h TTL

        if self.used > self.limit:
            raise BudgetExceededError(f"User {self.user_id} exceeded {self.limit} tokens")

# Usage
budget = TokenBudget(user_id="alice", limit=100000)
response = llm.invoke(prompt)
budget.consume(response.usage.total_tokens)

Prompt Caching

Cache common prompts to avoid recomputing:

import hashlib

def get_cached_response(prompt: str, model: str):
    cache_key = hashlib.md5(f"{model}:{prompt}".encode()).hexdigest()
    cached = redis.get(cache_key)

    if cached:
        return json.loads(cached)

    response = llm.invoke(prompt, model=model)
    redis.set(cache_key, json.dumps(response.dict()), ex=3600)  # 1h TTL
    return response

Model Selection Logic

Route simple tasks to cheap models:

def select_model(task_complexity: str) -> str:
    if task_complexity == "simple":
        return "gpt-4o-mini"  # $0.15/$0.60 per 1M tokens
    elif task_complexity == "medium":
        return "gpt-4o"  # $2.50/$10 per 1M tokens
    else:
        return "claude-3-5-sonnet-20241022"  # $3/$15 per 1M tokens

# Usage
complexity = classify_task_complexity(task)
model = select_model(complexity)
response = llm.invoke(prompt, model=model)

Error Handling and Resilience

Retry with Exponential Backoff

import asyncio
import random

async def retry_with_backoff(func, max_retries=3, base_delay=1):
    for attempt in range(max_retries):
        try:
            return await func()
        except (RateLimitError, TimeoutError) as e:
            if attempt == max_retries - 1:
                raise

            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            await asyncio.sleep(delay)

# Usage
response = await retry_with_backoff(lambda: llm.invoke(prompt))

Circuit Breaker for Tools

import time

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.last_failure_time = None
        self.state = "closed"  # closed, open, half_open

    async def call(self, func):
        if self.state == "open":
            if time.time() - self.last_failure_time > self.timeout:
                self.state = "half_open"
            else:
                raise CircuitBreakerOpenError("Circuit breaker is open")

        try:
            result = await func()
            self.on_success()
            return result
        except Exception as e:
            self.on_failure()
            raise

    def on_success(self):
        self.failure_count = 0
        self.state = "closed"

    def on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = "open"

Deployment Platforms

  • Modal — Best for: async agents, batch jobs. Pros: auto-scaling, GPU support, simple API. Cons: vendor lock-in, cold starts.
  • AWS Lambda — Best for: lightweight agents (<15min). Pros: mature, cheap, integrates with AWS. Cons: 15min timeout, cold starts.
  • Cloudflare Workers — Best for: edge agents, low latency. Pros: global edge network, instant startup. Cons: limited CPU time, no GPU.
  • Kubernetes — Best for: long-running agents, custom infra. Pros: full control, any runtime. Cons: ops overhead, complex setup.
  • Fly.io — Best for: always-on agents, stateful apps. Pros: simple, persistent volumes, global. Cons: more expensive than serverless.

Monitoring and Alerting

Key Metrics to Track

  • Agent success rate: % of sessions that completed successfully
  • Average turns per session: How many LLM calls per task
  • Tool success rate: % of tool calls that succeeded
  • Cost per session: Total LLM cost per agent run
  • Latency distribution: p50/p95/p99 end-to-end latency
  • User satisfaction: Thumbs up/down rate
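The metrics above can be computed from per-session log records. A sketch assuming a hypothetical record schema with `success`, `turns`, `cost_usd`, and `latency_s` fields:

```python
def summarize_sessions(sessions: list) -> dict:
    """Compute headline agent metrics from per-session log records."""
    n = len(sessions)
    latencies = sorted(s["latency_s"] for s in sessions)
    return {
        "success_rate": sum(s["success"] for s in sessions) / n,
        "avg_turns": sum(s["turns"] for s in sessions) / n,
        "avg_cost_usd": sum(s["cost_usd"] for s in sessions) / n,
        # nearest-rank p95; a real pipeline would use a proper quantile sketch
        "p95_latency_s": latencies[min(n - 1, int(0.95 * n))],
    }
```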

Alerts to Set Up

  • Alert if agent error rate > 5%
  • Alert if average cost per session spikes 3×
  • Alert if any single session exceeds $10 cost
  • Alert if p95 latency > 60s
  • Alert if tool X fails > 50% of the time
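These rules are straightforward to encode as threshold checks over a metrics snapshot. A sketch assuming a hypothetical snapshot dict; in practice you would express the same rules in your alerting system (Prometheus, Datadog, etc.) rather than application code:

```python
def check_alerts(metrics: dict, baseline_cost_usd: float) -> list:
    """Evaluate the alert rules above against a metrics snapshot."""
    alerts = []
    if metrics["error_rate"] > 0.05:
        alerts.append("error rate above 5%")
    if metrics["avg_cost_usd"] > 3 * baseline_cost_usd:
        alerts.append("avg cost per session spiked 3x")
    if metrics["max_session_cost_usd"] > 10:
        alerts.append("single session exceeded $10")
    if metrics["p95_latency_s"] > 60:
        alerts.append("p95 latency above 60s")
    return alerts
```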

Operational Best Practices

1. Canary Deployments

Roll out new agent versions to 10% of traffic first:

if random.random() < 0.1:
    agent = NewAgent()  # canary
else:
    agent = OldAgent()  # stable

Monitor metrics for 24h. If canary performs worse, rollback.

2. A/B Testing Prompts

Test prompt changes with statistical rigor:

import hashlib

# Built-in hash() varies per process (PYTHONHASHSEED), so use a stable hash
variant = int(hashlib.md5(user_id.encode()).hexdigest(), 16) % 2  # deterministic A/B split
if variant == 0:
    prompt = PROMPT_A
else:
    prompt = PROMPT_B

log_experiment("prompt_ab_test", user_id, variant)

3. Shadow Mode

Run new agent in parallel, log results, but don't return to user:

primary_result = await agent_v1.run(task)

shadow_task = asyncio.create_task(agent_v2.run(task))  # shadow, don't await
# Hold a reference (e.g. in a module-level set) so the task isn't garbage-collected mid-run

return primary_result

Compare shadow results offline to validate new version.
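For the offline comparison, a crude but useful starting point is text similarity between the two agents' outputs; real evaluations often layer task-specific checks or an LLM judge on top. A sketch with an assumed divergence threshold of 0.8:

```python
import difflib

def compare_shadow(primary: str, shadow: str) -> dict:
    """Score how closely the shadow agent's output tracks the primary's."""
    ratio = difflib.SequenceMatcher(None, primary, shadow).ratio()
    return {"similarity": round(ratio, 3), "diverged": ratio < 0.8}
```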

Conclusion

Production agent deployment requires more than just wrapping an LLM in an API. You need safety guardrails (tool validation, output filtering, human-in-the-loop), reliability patterns (retries, circuit breakers, state management), cost controls (token budgets, model selection), and comprehensive observability. Start simple—synchronous execution, stateless agents—and add complexity only when needed.
