Production-grade LLM agent orchestration for extracting validated value propositions from unstructured customer feedback.
Not a toy chatbot. A production-ready system with real observability, cost tracking, and quality controls.
A FastAPI service that orchestrates multi-step LLM agent workflows to transform unstructured sales notes into structured, validated value propositions.
Key Features:
- 🤖 Multi-provider support: OpenAI, Anthropic, Google
- ⚡ Async-first: FastAPI + Gunicorn + Uvicorn workers
- 📊 Full observability: Prometheus metrics, LangSmith tracing, structured logging
- 🔄 Production patterns: Retries, fallbacks, circuit breakers
- 💾 Cost optimization: Redis caching, token accounting
- 🛡️ Quality controls: PII detection, hallucination prevention
- 🐳 Production-ready: Docker, Postgres, Redis, MinIO
5-minute setup with Docker:
# 1. Clone and setup
git clone <your-repo>
cd agentops-studio
cp .env.example .env
# 2. Add your API keys to .env
# Required: OPENAI_API_KEY, ANTHROPIC_API_KEY, or GOOGLE_API_KEY
# 3. Start the stack
docker-compose up -d
# 4. Test the API
curl -X POST "http://localhost:8000/api/v1/workflow/execute" \
-H "Content-Type: application/json" \
-d '{
"content": "Customer struggling with 5 hours/day manual data entry from PDFs. Budget approved for Q1.",
"source": "sales_call"
}'Services now running:
- API: http://localhost:8000 (Docs)
- Prometheus: http://localhost:9090
- Grafana: http://localhost:3000
See QUICKSTART.md for detailed setup instructions.
Sales teams have unstructured notes from customer conversations. They need to extract validated, actionable value propositions quickly and accurately.
Input Example:
"Had a call with Acme Corp. They're manually entering data from PDFs into
spreadsheets - takes 5 hours/day. Looking for automation. Budget approved
for Q1. CFO Sarah mentioned compliance is critical."
Output Example:
{
"headline": "Eliminate 5 hours of daily manual data entry with compliant automation",
"quantified_value": "Save 25 hours/week per employee",
"key_talking_points": [
"Reduce manual data entry from 5 hours to minutes",
"Built-in compliance for CFO approval",
"Ready for Q1 deployment"
]
}4-Step Agent Pipeline:
Input → [1. Ingest] → [2. Extract] → [3. Self-Check] → [4. Rewrite] → Output
↓ ↓ ↓ ↓
Normalize Structured Validate Value Prop
Detect PII Insights Prevent Errors Per Persona
Each step:
- Runs async for performance
- Has retry logic with exponential backoff
- Records metrics (cost, latency, tokens)
- Traces execution in LangSmith
- Handles provider-specific quirks
See ARCHITECTURE.md for deep technical dive.
# All I/O is async - handle multiple requests concurrently
async def execute_workflow(input: RawInput):
# LLM calls are high-latency (0.5-5s)
# Async lets one worker handle many concurrent requests
result = await provider.generate(...)# Same interface, different implementations
class BaseProvider(ABC):
async def generate(prompt, schema) -> ProviderResponse
def calculate_cost(tokens_in, tokens_out) -> float
# OpenAI uses function calling
# Anthropic uses tool calling
# Google embeds schema in prompt# Every request produces:
# - Prometheus metrics (latency, cost, errors)
# - Structured JSON logs
# - LangSmith traces (prompt -> response chain)
# - Database record for analytics
@track_workflow_execution
async def execute(...):
logger.info("workflow_started", provider="openai", model="gpt-4")
# ... execution ...
metrics.record(cost_usd=0.005, latency_ms=1200)# Self-check agent validates extraction
self_check = await verify_extraction(original, extracted)
if not self_check.approved:
# Reject workflow, don't pass hallucinated data
raise ValidationError(self_check.rejection_reason)# Redis cache by content hash
cache_key = sha256(content + provider + model)
if cached := await redis.get(cache_key):
return cached # Skip expensive LLM call
# Track costs in real-time
total_cost = sum(step.cost for step in workflow.steps)app/
├── api/ # FastAPI routes and handlers
├── agents/ # 4-step workflow orchestration
├── providers/ # OpenAI, Anthropic, Google abstractions
├── prompts/ # System prompts per agent step
├── schemas/ # Pydantic models for validation
├── workflows/ # Redis caching
├── observability/ # Metrics, logging, tracing
├── config.py # Settings management
└── main.py # FastAPI application
tests/ # pytest test suite
docker/ # Docker configurations
config/ # Prometheus, Grafana configs
examples/ # Sample API calls
curl -X POST "http://localhost:8000/api/v1/workflow/execute" \
-H "Content-Type: application/json" \
-d '{
"content": "Your sales notes here...",
"source": "sales_call"
}'curl -X POST "http://localhost:8000/api/v1/workflow/execute?provider=anthropic" \
-H "Content-Type: application/json" \
-d '{ ... }'curl http://localhost:8000/api/v1/metricsPer-Workflow Costs (4 LLM calls per run):
- OpenAI GPT-4o-mini: ~$0.001-0.003
- Anthropic Claude Sonnet: ~$0.05-0.10
- Google Gemini Flash Lite: ~$0.001-0.002
Performance:
- Average latency: 2-5 seconds (4 LLM calls)
- Cache hit rate: ~70% in dev/test
- Self-check rejection rate: ~5% (prevents bad extractions)
Observability:
- Prometheus metrics: 15+ custom metrics
- Structured logs: All events in JSON
- LangSmith traces: Full prompt/response chains
This project showcases production engineering patterns for LLM applications:
✅ Async Python - FastAPI, asyncio, concurrent LLM calls ✅ Multi-provider integration - OpenAI, Anthropic, Google with unified interface ✅ Production infrastructure - Docker, Gunicorn, PostgreSQL, Redis ✅ Observability - Prometheus, Grafana, LangSmith, structured logging ✅ Safety - PII detection, hallucination prevention, retries ✅ Cost awareness - Token accounting, caching, model selection ✅ Testing - pytest, mocks, integration tests ✅ Documentation - Architecture diagrams, design decisions, examples
This is NOT:
- ❌ A toy chatbot
- ❌ A single-provider wrapper
- ❌ An unmonitored black box
- ❌ Production-deployed (but architecture-ready)
# Install dependencies
poetry install
# Run tests
make test
# Format code
make format
# Lint
make lint
# Run locally
make run
# Docker
make docker-up- QUICKSTART.md - Get running in 5 minutes
- README_DETAILED.md - Full feature documentation
- ARCHITECTURE.md - System design deep-dive
- API Docs - Interactive Swagger UI
LLM API calls have high latency (0.5-5s). With sync code:
# BAD: Blocks worker for 10+ seconds per request
def execute_workflow():
step1 = openai.call() # 2s
step2 = openai.call() # 2s
step3 = openai.call() # 3s
step4 = openai.call() # 3s
# Total: 10s, worker is blocked entire timeWith async:
# GOOD: Worker can handle other requests while waiting for I/O
async def execute_workflow():
step1 = await openai.call() # 2s (worker free during wait)
step2 = await openai.call() # 2s (worker free during wait)
# Worker can process 10+ concurrent requestsDifferent providers have different APIs:
OpenAI (Function Calling):
response = await openai.chat.completions.create(
functions=[{"name": "extract", "parameters": schema}],
function_call={"name": "extract"}
)Anthropic (Tool Calling):
response = await anthropic.messages.create(
tools=[{"name": "extract", "input_schema": schema}],
tool_choice={"type": "tool", "name": "extract"}
)Our Abstraction:
# Same interface for all providers
response = await provider.generate(
prompt=prompt,
json_schema=schema
)Prevents hallucinations by validating extractions:
# Extract data
extracted = await extract_agent.run(input)
# Verify against original
verification = await self_check_agent.run(
original=input,
extracted=extracted
)
# Only proceed if verified
if verification.hallucination_risk > 0.3:
raise ValidationError("High hallucination risk")This catches ~5% of extractions that would be inaccurate.
Development Costs (1000 workflows):
- Without caching: ~$4-8
- With caching (70% hit rate): ~$1.20-2.40
- Savings: 70%
Production Optimizations:
- Use GPT-3.5 for simple steps: 5x cheaper
- Tune prompts to reduce input tokens
- Batch similar requests
- Cache results for 1 hour (configurable)
Current capacity (single pod):
- 4 Uvicorn workers
- ~40 concurrent requests
- ~400 requests/minute
Horizontal scaling:
# Kubernetes HPA
replicas: 3-10 # Auto-scale on CPU
targetCPU: 70%Bottlenecks:
- LLM API rate limits (provider-specific)
- Database writes (use async + batching)
- Redis memory (increase for larger cache)
Current:
- Non-root Docker user
- Environment-based secrets
- PII detection and redaction
- Input validation (Pydantic)
TODO (pre-production):
- HTTPS/TLS termination
- JWT authentication
- Rate limiting per client
- SQL injection protection (SQLAlchemy handles)
MIT License - see LICENSE
Built using production patterns from:
- Stripe's API design
- Anthropic's tool calling
- OpenAI's function calling
- FastAPI best practices
Prompt engineering patterns from OpenAI Cookbook.
This demonstrates production-grade engineering for LLM agent systems. It shows:
- How to architect multi-step agentic workflows
- How to integrate multiple LLM providers
- How to build production observability
- How to manage costs and quality at scale
Not just code - a complete system.