A comprehensive, production-ready Go framework for building AI agents with streaming support, tool integration, and multi-backend storage.
- Multi-Provider LLM Support: OpenAI, Anthropic Claude, Google Gemini
- Streaming Response Support: Real-time streaming for all LLM providers
- Agent Framework: Configurable agents with middleware, prompts, and memory
- Tool Integration: Register and execute tools/functions
- Model Context Protocol (MCP) Support: Connect over stdio or streamable HTTP (SSE), discover MCP tools, and invoke them through agents
- Multi-Backend Storage:
  - In-Memory (development)
  - PostgreSQL with full-text search
  - Redis with caching
  - MongoDB document storage
  - PGVector for embeddings
- Session Management: Conversation session tracking and management, supporting single-agent and multi-agent shared sessions, with serializable session records for persistence and analytics
- Runtime Executor Layer: Agent specs and executors decouple runtime behavior from stored conversations, enabling custom execution strategies and observability
- Tool Supervisor: A built-in supervisor keeps tool providers synchronized and refreshes tool schemas automatically
- Execution Graphs: Workflow orchestration with conditional branching
- Agentic RAG: Multi-agent Retrieval-Augmented Generation pipeline with planners, researchers, writers, and critics wired through `graph.Graph`
- RAG Building Blocks: Dedicated packages for documents, chunking, embedders, retrievers, and rerankers to compose custom pipelines
- Observability-Ready: Structured logging everywhere plus built-in OpenTelemetry tracing hooks (`pkg/telemetry`) for pipelines, agents, sessions, and runtime executors
- Thread-Safe Operations: `RWMutex`-protected concurrent access
- Configuration Validation: Environment-based configuration with validation
Install the framework:

```bash
go get github.com/sweetpotato0/ai-allin
```

Create and run a minimal agent:

```go
package main

import (
    "context"
    "fmt"

    "github.com/sweetpotato0/ai-allin/agent"
    "github.com/sweetpotato0/ai-allin/contrib/provider/openai"
)

func main() {
    // Create LLM provider
    llm := openai.New(&openai.Config{
        APIKey:      "your-api-key",
        Model:       "gpt-4",
        MaxTokens:   2000,
        Temperature: 0.7,
    })

    // Create agent
    ag := agent.New(
        agent.WithName("MyAgent"),
        agent.WithSystemPrompt("You are a helpful assistant"),
        agent.WithProvider(llm),
    )

    // Run the agent; Run returns a *message.Message, so you can inspect metadata as well.
    resp, err := ag.Run(context.Background(), "What is AI?")
    if err != nil {
        panic(err)
    }
    fmt.Println(resp.Text())
}
```

The `runtime` package lets you decouple persisted conversation state from the live `agent.Agent`. A runtime executor consumes a session transcript (`session.Record`) and produces a `runtime.TurnResult` with timing metadata plus the final assistant message:
```go
exec := runtime.NewAgentExecutor(ag)

result, err := exec.Execute(ctx, &runtime.Request{
    SessionID: "session-1",
    Input:     "What's next?",
    History:   existingMessages,
})
if err != nil {
    log.Fatalf("executor failed: %v", err)
}
fmt.Println("assistant:", result.Output, "took", result.Duration)
```

This is the same executor that backs `session.SingleAgentSession` and `SharedSession`, so you can swap in alternative executors (streaming, tracing, multi-agent) without touching session code.
Registering a tool provider with `agent.WithToolProvider` delegates to the runtime tool supervisor. The supervisor loads tools on demand, watches for provider updates, and refreshes the agent's registry automatically:
```go
ag := agent.New(
    agent.WithProvider(llm),
    agent.WithToolProvider(myProvider), // supervisor handles refresh & errors
)
```

You can inspect refresh failures by adding middleware or memory stores: the supervisor pushes errors back into the agent's conversation as system messages, so they can be logged or surfaced to observability pipelines.
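For example, a logging middleware can watch for those system messages. This is only a sketch: the handler shape is an assumption about the `middleware` package's contract, and `message.RoleSystem` is an assumed constant name:

```go
// Hypothetical middleware sketch: log supervisor-injected system messages
// before they reach the provider. Adapt the handler type to the real
// middleware package contract.
type handler func(ctx context.Context, msg *message.Message) (*message.Message, error)

func logToolErrors(next handler) handler {
    return func(ctx context.Context, msg *message.Message) (*message.Message, error) {
        if msg.Role == message.RoleSystem { // assumed role constant
            log.Printf("tool supervisor notice: %s", msg.Text())
        }
        return next(ctx, msg)
    }
}
```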
All core packages emit structured logs via `pkg/logging` and create OpenTelemetry spans for critical operations (agent runs, pipeline stages, retrieval, sessions, runtime execution). To enable tracing, initialize the shared telemetry package once at startup:
```go
package main

import (
    "context"
    "log"

    "github.com/sweetpotato0/ai-allin/pkg/telemetry"
)

func main() {
    ctx := context.Background()

    shutdown, err := telemetry.Init(ctx, telemetry.Config{
        ServiceName:    "ai-allin-example",
        ServiceVersion: "v1.0.0",
        Environment:    "dev",
    })
    if err != nil {
        log.Fatalf("init telemetry: %v", err)
    }
    defer shutdown(context.Background())

    // ... construct providers/agents/sessions/etc.
}
```

- Set `OTEL_EXPORTER_OTLP_ENDPOINT` to send traces to your collector; if the variable is empty, the framework falls back to a pretty-printed stdout exporter.
- Logs include `service` and `component` fields so log collectors can correlate them with spans using the trace/span IDs emitted by your OpenTelemetry backend.
- The instrumentation is already wired into the agent runtime, the agentic RAG pipeline, retriever/indexing paths, session manager operations, and the runtime executor; no extra wiring is needed once telemetry is initialized.
- See `examples/telemetry` for a runnable sample that initializes telemetry, wires a mock LLM provider, and demonstrates the logs/traces emitted by a single agent run.
Connect to an MCP server and expose its tools to an agent:

```go
package main

import (
    "context"
    "log"

    "github.com/sweetpotato0/ai-allin/agent"
    frameworkmcp "github.com/sweetpotato0/ai-allin/tool/mcp"
)

func main() {
    ctx := context.Background()

    provider, err := frameworkmcp.NewProvider(ctx, frameworkmcp.Config{
        Transport: frameworkmcp.TransportStreamable,
        Endpoint:  "https://example.com/mcp",
    })
    if err != nil {
        log.Fatalf("connect MCP: %v", err)
    }
    defer provider.Close()

    ag := agent.New(
        agent.WithName("mcp-agent"),
        agent.WithSystemPrompt("You are a helpful assistant."),
        agent.WithToolProvider(provider),
    )

    if _, err := ag.Run(ctx, "List available MCP tools."); err != nil {
        log.Fatalf("agent run failed: %v", err)
    }
}
```

Two runnable MCP servers live in `examples/mcp` so you can exercise both transports end-to-end:
```bash
# Streamable HTTP (Terminal 1 + 2)
go run ./examples/mcp/http --host 127.0.0.1 --port 8080 --path /mcp
go run ./examples/mcp -transport stream -endpoint http://127.0.0.1:8080/mcp -prompt "List the available tools."

# Stdio (build once, then point the agent at the binary)
go build -o ./bin/mcp-stdio ./examples/mcp/stdio
go run ./examples/mcp -transport stdio -command ./bin/mcp-stdio -prompt "Get the weather in Tokyo."
```

Create sessions to track conversations, including multi-agent shared sessions:

```go
package main

import (
    "context"
    "fmt"

    "github.com/sweetpotato0/ai-allin/agent"
    "github.com/sweetpotato0/ai-allin/contrib/provider/openai"
    "github.com/sweetpotato0/ai-allin/session"
    "github.com/sweetpotato0/ai-allin/session/store"
)

func main() {
    ctx := context.Background()

    // Create LLM provider
    llm := openai.New(&openai.Config{
        APIKey: "your-api-key",
        Model:  "gpt-4",
    })

    // Create a session manager with the Option pattern for store injection
    mgr := session.NewManager(session.WithStore(store.NewInMemoryStore()))

    // Create a single-agent session
    ag := agent.New(agent.WithProvider(llm))
    sess, err := mgr.Create(ctx, "session-1", ag)
    if err != nil {
        panic(err)
    }

    // Run the session
    response, err := sess.Run(ctx, "Hello")
    if err != nil {
        panic(err)
    }
    fmt.Println(response)

    // Create a shared session (multi-agent collaboration)
    sharedSess, err := mgr.CreateShared(ctx, "shared-session")
    if err != nil {
        panic(err)
    }

    // Run with different agents in the shared session
    agent1 := agent.New(agent.WithProvider(llm), agent.WithName("researcher"))
    agent2 := agent.New(agent.WithProvider(llm), agent.WithName("solver"))

    resp1, _ := sharedSess.RunWithAgent(ctx, agent1, "Collect information")
    resp2, _ := sharedSess.RunWithAgent(ctx, agent2, "Provide solution based on information")

    fmt.Println(resp1, resp2)
}
```

Every session exposes a serializable `session.Record` (via `session.Session.Snapshot()`), which includes the full message transcript, the last assistant message, and timing metadata for the most recent turn. Combine this with `manager.Save(ctx, session)` after each interaction to persist state into any `session/store` implementation (in-memory, Redis, Postgres, etc.).
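For instance, continuing the example above (a sketch: `Snapshot` and `Save` are the calls named here, while the `Record` field read below is an illustrative assumption):

```go
// Persist the transcript after a turn.
record := sess.Snapshot() // serializable session.Record
fmt.Printf("session has %d messages\n", len(record.Messages)) // Messages field is assumed
if err := mgr.Save(ctx, sess); err != nil {
    panic(err)
}
```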
The `rag/agentic` package ships a ready-to-use, multi-agent Retrieval-Augmented Generation workflow. It sits on top of dedicated building blocks so that the entire lifecycle stays explicit:
- Data preparation – model sources as `rag/document.Document` and chunk them with a `rag/chunking.Chunker`.
- Index construction – feed chunks to an `rag/embedder.Embedder` (e.g., `embedder.NewVectorAdapter`) and persist vectors via the `rag/retriever` package.
- Query & retrieval – `retriever.Search` embeds questions, queries the `vector.VectorStore`, and optionally reranks using `rag/reranker`.
- Generation integration – the Agentic pipeline plans, routes, and generates the final answer using the curated evidence.
A planner agent decomposes the task, a researcher agent issues searches, a writer agent uses the retrieved evidence, and an optional critic agent reviews the draft answer.
```go
package main

import (
    "context"
    "log"
    "os"

    "github.com/sweetpotato0/ai-allin/contrib/provider/openai"
    "github.com/sweetpotato0/ai-allin/rag/agentic"
    vectorstore "github.com/sweetpotato0/ai-allin/vector/store"
)

func main() {
    ctx := context.Background()

    apiKey := os.Getenv("OPENAI_API_KEY")
    if apiKey == "" {
        log.Fatal("missing OPENAI_API_KEY")
    }

    llm := openai.New(openai.DefaultConfig(apiKey))
    embedder := newKeywordEmbedder() // see examples/rag/agentic for a placeholder implementation
    store := vectorstore.NewInMemoryVectorStore()

    pipeline, err := agentic.NewPipeline(
        agentic.Clients{Default: llm},
        embedder,
        store,
        agentic.WithTopK(3),
    )
    if err != nil {
        log.Fatal(err)
    }

    _ = pipeline.IndexDocuments(ctx,
        agentic.Document{ID: "shipping", Title: "Shipping Policy", Content: "..."},
        agentic.Document{ID: "returns", Title: "Return Policy", Content: "..."},
    )

    resp, err := pipeline.Run(ctx, "Summarize shipping timelines and returns.")
    if err != nil {
        log.Fatal(err)
    }

    log.Println("Plan steps:", len(resp.Plan.Steps))
    log.Println("Answer:", resp.FinalAnswer)
}
```

See `docs/rag/overview.md` for a deeper dive and `examples/rag/agentic` for a runnable demo with OpenAI plus a toy embedder. Already have a production retrieval stack? Wrap it and hand it to the pipeline via `agentic.WithRetriever(...)`.
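For example, an adapter around an existing search service might look like the following sketch. The `Search` signature is an assumption about what `agentic.WithRetriever` expects, and `searchClient` is a hypothetical stand-in for your own stack:

```go
// myRetriever adapts a hypothetical searchClient to the pipeline.
type myRetriever struct {
    client *searchClient // your existing retrieval stack (hypothetical)
}

// Search's signature is assumed; match it to the interface WithRetriever expects.
func (r *myRetriever) Search(ctx context.Context, query string, topK int) ([]agentic.Document, error) {
    hits, err := r.client.Query(ctx, query, topK)
    if err != nil {
        return nil, err
    }
    docs := make([]agentic.Document, 0, len(hits))
    for _, h := range hits {
        docs = append(docs, agentic.Document{ID: h.ID, Title: h.Title, Content: h.Body})
    }
    return docs, nil
}
```

Construct the pipeline with `agentic.WithRetriever(&myRetriever{client: c})` in place of (or alongside) the built-in vector store.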
`examples/rag/postgres` shows how to ground the Agentic RAG pipeline in a pgvector-backed store using OpenAI embeddings. It ingests every Markdown file in `docs/` plus the top-level `README*.md`, `AGENTS.md`, and `CLAUDE.md`, so the agent can answer questions straight from the repository documentation.
```bash
# 1. Start pgvector locally (or point the env vars at an existing cluster)
docker run --rm -e POSTGRES_PASSWORD=postgres -p 5432:5432 ankane/pgvector

# 2. Export credentials for the example
export OPENAI_API_KEY=sk-...
export PGVECTOR_PASSWORD=postgres
export PGVECTOR_USER=postgres
export PGVECTOR_DATABASE=postgres

# 3. Run the example (indexes docs the first time, then reuses them)
go run ./examples/rag/postgres -question "How does ai-allin integrate with MCP?"
```

Set `PGVECTOR_HOST`, `PGVECTOR_PORT`, `PGVECTOR_SSLMODE`, or `PGVECTOR_TABLE` if you need to override the defaults, and pass `-reindex` whenever you want to rebuild the embeddings from disk.
If you need to rehydrate sessions from a persistent store in a new process, register an `AgentResolver` with `session.WithAgentResolver` so the manager knows how to rebuild the underlying agent prototype for any single-agent session.
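A sketch of wiring a resolver (the resolver's exact signature is an assumption based on the description above; `llm` is a provider constructed as in the earlier examples):

```go
mgr := session.NewManager(
    session.WithStore(store.NewInMemoryStore()), // swap in a persistent session/store backend
    session.WithAgentResolver(func(ctx context.Context, sessionID string) (*agent.Agent, error) {
        // Rebuild the agent prototype for the rehydrated session.
        return agent.New(
            agent.WithProvider(llm),
            agent.WithSystemPrompt("You are a helpful assistant"),
        ), nil
    }),
)
```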
Core packages:
- `agent`: Agent implementation with options pattern
- `context`: Conversation context management
- `graph`: Workflow graph execution
- `memory`: Memory storage interface and implementations
- `message`: Message and role definitions
- `middleware`: Middleware chain for request processing
- `prompt`: Prompt template management
- `runner`: Parallel task execution
- `session`: Session management, supporting single-agent and multi-agent shared sessions
- `session/store`: Session storage backends (InMemory, Redis, etc.)
- `tool`: Tool registration and execution
- `vector`: Vector embedding storage and search
- InMemory: Fast development storage
- PostgreSQL: Production-grade with full-text search indexes
- Redis: High-performance caching layer
- MongoDB: Document-based storage
- PGVector: Vector similarity search
```bash
# PostgreSQL Configuration
export POSTGRES_HOST=localhost
export POSTGRES_PORT=5432
export POSTGRES_USER=postgres
export POSTGRES_PASSWORD=your_password
export POSTGRES_DB=ai_allin
export POSTGRES_SSLMODE=disable

# Redis Configuration
export REDIS_ADDR=localhost:6379
export REDIS_PASSWORD=""
export REDIS_DB=0
export REDIS_PREFIX=ai-allin:memory:

# MongoDB Configuration
export MONGODB_URI=mongodb://localhost:27017
export MONGODB_DB=ai_allin
export MONGODB_COLLECTION=memories
```

| Operation | Before | After | Improvement |
|---|---|---|---|
| ID Generation | 1000 ns/op | 113 ns/op | 9x faster |
| Full-Text Search | O(n) scan | O(log n) index | 10-1000x faster |
| Concurrent Connections | Unlimited | 25 pooled | More stable |
| Query Timeouts | None | 30 seconds | Resource safe |
All concurrent operations are protected with `sync.RWMutex` (a minimal illustration of the pattern follows the list below):
- Context message management
- Tool registry operations
- Prompt template management
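The sketch below is illustrative only (not the framework's actual registry code): reads share the lock, writes take it exclusively.

```go
// ToolFunc stands in for a registered tool implementation.
type ToolFunc func(ctx context.Context, input string) (string, error)

type toolRegistry struct {
    mu    sync.RWMutex
    tools map[string]ToolFunc
}

// Get takes the shared read lock, so concurrent lookups don't block each other.
func (r *toolRegistry) Get(name string) (ToolFunc, bool) {
    r.mu.RLock()
    defer r.mu.RUnlock()
    fn, ok := r.tools[name]
    return fn, ok
}

// Register takes the exclusive write lock while mutating the map.
func (r *toolRegistry) Register(name string, fn ToolFunc) {
    r.mu.Lock()
    defer r.mu.Unlock()
    r.tools[name] = fn
}
```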
Run all tests:

```bash
go test ./...
```

Run specific package tests:

```bash
go test ./agent -v
go test ./config -v
go test ./memory -v
```

Requirements:
- PostgreSQL 12+ (optional, for production storage)
- Go 1.18+
- Set required environment variables
For production deployments:
- Set up environment variables for your database
- Run database migrations
- Configure connection pooling based on your load (see the sketch below)
- Enable query timeouts (default: 30 seconds)
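The storage backends apply these limits internally; if you manage your own `database/sql` pool, the standard-library knobs behind the numbers in the table above look like this (a sketch, assuming a registered Postgres driver such as `lib/pq`):

```go
db, err := sql.Open("postgres", dsn)
if err != nil {
    log.Fatal(err)
}
// Cap the pool at 25 connections, matching the benchmark table above.
db.SetMaxOpenConns(25)
db.SetMaxIdleConns(5)
db.SetConnMaxLifetime(30 * time.Minute)

// Enforce the 30-second query timeout via context.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
rows, err := db.QueryContext(ctx, "SELECT id FROM memories LIMIT 10")
if err != nil {
    log.Fatal(err)
}
defer rows.Close()
```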
Monitor these metrics:
- Active database connections
- Query execution times
- Memory usage (with pagination limits)
- Error rates by operation type
Contributions are welcome! Please ensure:
- Code passes `go build ./...`
- Tests pass `go test ./...`
- Code follows Go conventions
- Changes are well-documented
MIT
For issues, questions, or contributions, please refer to the project repository.