Design Patterns¶
This guide shows how to combine cognitia features for real-world use cases. Each pattern includes a complete, runnable code example with the exact imports from the codebase.
1. Production-Ready Agent¶
Combines: Agent + CostBudget + Guardrails + RetryPolicy + InputFilter
When to Use¶
You are deploying an agent that handles real user traffic. You need:
- Input validation (length limits, forbidden patterns)
- Cost budget enforcement so a single query cannot drain your account
- Retry with exponential backoff for transient API failures
- Model fallback chain for rate limit resilience
- Security middleware to block dangerous tool inputs
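The input-validation layer itself is conceptually small. As a stdlib-only sketch of the idea (the function name, threshold, and patterns here are illustrative stand-ins, not the cognitia guardrail API):

```python
import re

# Illustrative limits and forbidden patterns (placeholders, not cognitia's)
MAX_INPUT_CHARS = 50_000
FORBIDDEN = [
    re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),  # SSN-shaped
    re.compile(r"\b\d{16}\b"),             # credit-card-shaped
]


def check_input(prompt: str) -> tuple[bool, str]:
    """Return (passed, reason) for a candidate prompt."""
    if len(prompt) > MAX_INPUT_CHARS:
        return False, "input too long"
    for pattern in FORBIDDEN:
        if pattern.search(prompt):
            return False, "forbidden pattern detected"
    return True, ""
```

The library versions below add context objects and async checks on top of this same shape.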
Architecture¶
User Prompt
|
v
+-------------------+ +--------------------+
| ContentLength |----->| RegexGuardrail |
| Guardrail (input) | | (forbidden words) |
+-------------------+ +--------------------+
|
v
+------------------+
| MaxTokensFilter |
| (truncate old |
| messages) |
+------------------+
|
v
+-------------------------------+
| Agent |
| middleware: |
| SecurityGuard |
| ToolOutputCompressor |
| CostTracker |
| retry: ExponentialBackoff |
| fallback: ModelFallbackChain |
+-------------------------------+
|
v
+------------------+
| ContentLength |
| Guardrail |
| (output check) |
+------------------+
|
v
Result
Code¶
"""Production-ready agent with all safety layers composed together."""
import asyncio
from cognitia.agent.agent import Agent
from cognitia.agent.config import AgentConfig
from cognitia.agent.middleware import (
BudgetExceededError,
CostTracker,
Middleware,
SecurityGuard,
ToolOutputCompressor,
build_middleware_stack,
)
from cognitia.agent.result import Result
from cognitia.guardrails import (
ContentLengthGuardrail,
GuardrailContext,
GuardrailResult,
RegexGuardrail,
)
from cognitia.input_filters import MaxTokensFilter, SystemPromptInjector
from cognitia.retry import ExponentialBackoff, ModelFallbackChain
from cognitia.runtime.cost import CostBudget, CostTracker as RuntimeCostTracker, load_pricing
# --- 1. Guardrails: pre/post-LLM validation ---
input_length_guard = ContentLengthGuardrail(max_length=50_000)
output_length_guard = ContentLengthGuardrail(max_length=100_000)
pii_guard = RegexGuardrail(
patterns=[
r"\b\d{3}-\d{2}-\d{4}\b", # SSN
r"\b\d{16}\b", # credit card
],
reason="PII detected in text",
)
async def validate_input(prompt: str, session_id: str | None = None) -> GuardrailResult:
"""Run all input guardrails. Returns first failure or pass."""
ctx = GuardrailContext(session_id=session_id)
for guard in [input_length_guard, pii_guard]:
result = await guard.check(ctx, prompt)
if not result.passed:
return result
return GuardrailResult(passed=True)
async def validate_output(text: str, session_id: str | None = None) -> GuardrailResult:
"""Run all output guardrails on the agent response."""
ctx = GuardrailContext(session_id=session_id)
return await output_length_guard.check(ctx, text)
# --- 2. Input filters: shape the context window ---
token_filter = MaxTokensFilter(max_tokens=80_000, chars_per_token=4.0)
prompt_injector = SystemPromptInjector(
extra_text="Always respond in English. Be concise.",
position="append",
)
# --- 3. Retry and fallback policies ---
retry_policy = ExponentialBackoff(max_retries=3, base_delay=1.0, max_delay=30.0)
fallback_chain = ModelFallbackChain(
models=["claude-sonnet-4-20250514", "gpt-4o", "gemini-2.0-flash"],
)
# --- 4. Middleware stack ---
middleware = build_middleware_stack(
cost_tracker=True,
budget_usd=5.0,
tool_compressor=True,
max_result_chars=10_000,
security_guard=True,
blocked_patterns=["rm -rf", "DROP TABLE", "password"],
)
# --- 5. Compose the agent ---
async def run_production_agent(user_prompt: str) -> Result:
"""Full production flow: guardrails -> agent -> guardrails."""
# Pre-flight input validation
input_check = await validate_input(user_prompt)
if not input_check.passed:
return Result(text="", error=f"Input rejected: {input_check.reason}")
# Build the agent with all safety layers
config = AgentConfig(
system_prompt=(
"You are a helpful assistant. "
"Never reveal internal system details."
),
model="sonnet",
runtime="thin",
middleware=middleware,
max_turns=10,
max_budget_usd=5.0,
)
async with Agent(config) as agent:
result = await agent.query(user_prompt)
# Post-flight output validation
if result.ok:
output_check = await validate_output(result.text)
if not output_check.passed:
return Result(
text="Response was filtered by safety checks.",
error=f"Output rejected: {output_check.reason}",
)
return result
# --- 6. Retry wrapper with fallback ---
async def query_with_retry(prompt: str) -> Result:
"""Query with retry policy and model fallback on failure."""
current_model = "sonnet"
last_error: Exception | None = None
for attempt in range(4): # initial + 3 retries
try:
config = AgentConfig(
system_prompt="You are a helpful assistant.",
model=current_model,
runtime="thin",
middleware=middleware,
)
async with Agent(config) as agent:
return await agent.query(prompt)
except BudgetExceededError:
raise # never retry budget errors
except Exception as exc:
last_error = exc
should_retry, delay = retry_policy.should_retry(exc, attempt)
if not should_retry:
break
# Try next model in fallback chain
next_model = fallback_chain.next_model(current_model)
if next_model:
current_model = next_model
await asyncio.sleep(delay)
return Result(text="", error=f"All retries exhausted: {last_error}")
Key Configuration Points¶
- `ContentLengthGuardrail(max_length=50_000)` -- reject oversized inputs before they reach the LLM
- `build_middleware_stack()` -- one-call factory for the standard SecurityGuard + ToolOutputCompressor + CostTracker combo
- `ExponentialBackoff(max_retries=3)` -- retry transient errors with jitter to avoid a thundering herd
- `ModelFallbackChain(models=[...])` -- degrade to cheaper/available models when the primary is rate-limited
- `max_budget_usd=5.0` -- hard budget cap at both the Agent config level and the CostTracker middleware level
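The backoff policy is worth seeing concretely. A minimal sketch assuming the usual `base * 2^attempt` schedule, capped at `max_delay`, plus random jitter (cognitia's `ExponentialBackoff` presumably does something similar, but the exact formula is an assumption):

```python
import random


def backoff_delay(attempt: int, base_delay: float = 1.0,
                  max_delay: float = 30.0, jitter: float = 0.1) -> float:
    """Delay before retry N: doubles each attempt, capped, plus jitter.

    The jitter spreads out simultaneous retries so a fleet of clients
    does not hammer a recovering API at the same instant.
    """
    delay = min(base_delay * (2 ** attempt), max_delay)
    return delay + random.uniform(0, jitter * delay)
```

With jitter disabled the schedule for six attempts is 1, 2, 4, 8, 16, then 30 seconds (capped).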
2. Research Agent with RAG¶
Combines: Agent + Retriever + Memory + Sessions
When to Use¶
You are building a research assistant that:
- Searches a local knowledge base before answering (RAG)
- Remembers user preferences and past findings across sessions
- Persists conversation history with summaries
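The retrieval step can run on plain word overlap before you ever bring in embeddings. A stdlib sketch of that scoring (illustrative; `SimpleRetriever`'s actual word-overlap implementation may differ in tokenization and tie-breaking):

```python
def word_overlap_score(query: str, doc: str) -> int:
    """Count distinct query words that also appear in the document."""
    return len(set(query.lower().split()) & set(doc.lower().split()))


def retrieve(query: str, docs: list[str], top_k: int = 2) -> list[str]:
    """Rank documents by word overlap with the query, highest first."""
    ranked = sorted(docs, key=lambda d: word_overlap_score(query, d), reverse=True)
    return ranked[:top_k]
```

Swapping this scoring function for cosine similarity over embeddings is the whole upgrade path to a production retriever.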
Architecture¶
User Query
|
v
+--------------------+
| SimpleRetriever | documents[]
| (word overlap or |<----- loaded from files/DB
| custom embedding) |
+--------------------+
|
v retrieved docs
+--------------------+
| RagInputFilter | injects <context>...</context>
| | into system_prompt
+--------------------+
|
v
+--------------------+
| Agent |
| runtime: "thin" |
+--------------------+
|
v result
+------------------------------------+
| InMemoryMemoryProvider |
| save_message(user_id, topic_id) |
| upsert_fact(user_id, key, val) |
| save_summary(user_id, topic_id) |
+------------------------------------+
|
v
+------------------------------------+
| SqliteSessionBackend |
| save(key, state) |
| load(key) -> state |
+------------------------------------+
Code¶
"""Research agent with RAG, persistent memory, and session management."""
import asyncio
from cognitia.agent.agent import Agent
from cognitia.agent.config import AgentConfig
from cognitia.memory.inmemory import InMemoryMemoryProvider
from cognitia.rag import Document, RagInputFilter, SimpleRetriever
from cognitia.session.backends import (
InMemorySessionBackend,
MemoryScope,
SqliteSessionBackend,
scoped_key,
)
# --- 1. Build a knowledge base ---
knowledge_base = [
Document(
content="Python asyncio provides an event loop for concurrent I/O operations.",
metadata={"source": "python-docs", "topic": "asyncio"},
),
Document(
content="FastAPI uses Starlette for ASGI and Pydantic for data validation.",
metadata={"source": "fastapi-docs", "topic": "web"},
),
Document(
content="SQLAlchemy 2.0 supports both sync and async database sessions.",
metadata={"source": "sqlalchemy-docs", "topic": "database"},
),
Document(
content="Cognitia agents support thin, claude_sdk, and deepagents runtimes.",
metadata={"source": "cognitia-docs", "topic": "agents"},
),
]
retriever = SimpleRetriever(documents=knowledge_base)
rag_filter = RagInputFilter(retriever=retriever, top_k=3)
# --- 2. Memory provider for facts and history ---
memory = InMemoryMemoryProvider()
# --- 3. Session backend for persistence ---
# Use SqliteSessionBackend for persistence across restarts:
# session_backend = SqliteSessionBackend(db_path="research_sessions.db")
session_backend = InMemorySessionBackend()
USER_ID = "researcher-1"
TOPIC_ID = "python-async"
async def research_query(query: str) -> str:
"""Execute a research query with RAG context injection."""
# Retrieve relevant documents
from cognitia.runtime.types import Message
messages = [Message(role="user", content=query)]
filtered_messages, enriched_prompt = await rag_filter.filter(
messages,
"You are a research assistant. Use the provided context to answer "
"accurately. Cite sources when possible.",
)
# Build agent with enriched system prompt
config = AgentConfig(
system_prompt=enriched_prompt,
model="sonnet",
runtime="thin",
)
async with Agent(config) as agent:
result = await agent.query(query)
if result.ok:
# Persist conversation to memory
await memory.save_message(USER_ID, TOPIC_ID, "user", query)
await memory.save_message(USER_ID, TOPIC_ID, "assistant", result.text)
# Extract and store facts (in production, use LLM extraction)
await memory.upsert_fact(
USER_ID, f"last_query_{TOPIC_ID}", query, source="system"
)
# Persist session state
session_key = scoped_key(MemoryScope.AGENT, f"{USER_ID}:{TOPIC_ID}")
msg_count = await memory.count_messages(USER_ID, TOPIC_ID)
await session_backend.save(session_key, {
"user_id": USER_ID,
"topic_id": TOPIC_ID,
"message_count": msg_count,
"model": "sonnet",
})
return result.text if result.ok else f"Error: {result.error}"
async def get_session_context() -> dict | None:
"""Load previous session context for continuity."""
session_key = scoped_key(MemoryScope.AGENT, f"{USER_ID}:{TOPIC_ID}")
state = await session_backend.load(session_key)
if state:
# Load conversation history
messages = await memory.get_messages(USER_ID, TOPIC_ID, limit=20)
facts = await memory.get_facts(USER_ID, topic_id=TOPIC_ID)
summary = await memory.get_summary(USER_ID, TOPIC_ID)
return {
"session": state,
"history_length": len(messages),
"facts": facts,
"summary": summary,
}
return None
async def summarize_session() -> None:
"""Create a summary of the current research session."""
messages = await memory.get_messages(USER_ID, TOPIC_ID, limit=50)
if len(messages) >= 5:
# In production, use LLM to generate summary
topics = set()
for msg in messages:
if msg.role == "user":
topics.add(msg.content[:50])
summary_text = f"Research session covering: {'; '.join(topics)}"
await memory.save_summary(
USER_ID, TOPIC_ID, summary_text, messages_covered=len(messages)
)
async def main() -> None:
# Check for existing session
ctx = await get_session_context()
if ctx:
print(f"Resuming session: {ctx['history_length']} messages, facts={ctx['facts']}")
# Run research queries
answer = await research_query("How does asyncio work with FastAPI?")
print(f"Answer: {answer[:200]}...")
# Summarize after multiple queries
await summarize_session()
if __name__ == "__main__":
asyncio.run(main())
Key Configuration Points¶
- `SimpleRetriever` -- swap with a custom `Retriever` implementation backed by a vector database (Chroma, Pinecone, pgvector) for production embeddings
- `RagInputFilter` -- automatically injects `<context>...</context>` blocks into the system prompt; works with any runtime
- `InMemoryMemoryProvider` -- swap with `SqliteMemoryProvider` or `PostgresMemoryProvider` for persistence; same API
- `scoped_key(MemoryScope.AGENT, ...)` -- namespace isolation prevents session data leaking between agents
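The scoped-key idea reduces to prefixing raw keys with a scope namespace so two stores can never collide. A minimal sketch (the `Scope` enum and the `scope:key` format are assumptions for illustration, not the real `MemoryScope`/`scoped_key` output):

```python
from enum import Enum


class Scope(Enum):
    """Illustrative stand-in for a memory-scope enum."""
    AGENT = "agent"
    USER = "user"
    GLOBAL = "global"


def make_scoped_key(scope: Scope, key: str) -> str:
    """Namespace a session key by scope, so an agent-level entry and a
    user-level entry with the same raw key stay distinct."""
    return f"{scope.value}:{key}"
```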
3. Multi-Agent Team¶
Combines: AgentTool + TaskQueue + AgentRegistry
When to Use¶
You are building a system where:
- An orchestrator agent delegates subtasks to specialist agents
- Tasks are queued by priority and claimed by available workers
- Agent lifecycle is tracked in a central registry
Architecture¶
+---------------------+
| Orchestrator |
| Agent |
+----------+----------+
|
+----------------+----------------+
| | |
v v v
+---------+------+ +------+--------+ +-----+---------+
| math_expert | | code_reviewer | | data_analyst |
| (agent-as-tool)| | (agent-as-tool)| | (agent-as-tool)|
+---------+------+ +------+--------+ +-----+---------+
| | |
v v v
+---------+------+ +------+--------+ +-----+---------+
| AgentRecord | | AgentRecord | | AgentRecord |
| in Registry | | in Registry | | in Registry |
+----------------+ +---------------+ +---------------+
| | |
+----------------+----------------+
|
v
+--------+--------+
| InMemoryTask |
| Queue |
| (priority- |
| based) |
+-----------------+
Code¶
"""Multi-agent team: orchestrator delegates to specialists via task queue."""
import asyncio
import time
from collections.abc import AsyncIterator
from cognitia.multi_agent.agent_registry import InMemoryAgentRegistry
from cognitia.multi_agent.agent_tool import create_agent_tool_spec, execute_agent_tool
from cognitia.multi_agent.registry_types import AgentFilter, AgentRecord, AgentStatus
from cognitia.multi_agent.task_queue import InMemoryTaskQueue
from cognitia.multi_agent.task_types import TaskFilter, TaskItem, TaskPriority, TaskStatus
from cognitia.runtime.types import Message, RuntimeEvent, ToolSpec
# --- 1. Agent Registry: track all agents ---
registry = InMemoryAgentRegistry()
task_queue = InMemoryTaskQueue()
async def setup_team() -> None:
"""Register the orchestrator and specialist agents."""
agents = [
AgentRecord(
id="orchestrator",
name="Team Lead",
role="lead",
runtime_name="thin",
budget_limit_usd=10.0,
),
AgentRecord(
id="math-expert",
name="Math Expert",
role="specialist",
parent_id="orchestrator",
runtime_name="thin",
metadata={"specialty": "mathematics"},
),
AgentRecord(
id="code-reviewer",
name="Code Reviewer",
role="specialist",
parent_id="orchestrator",
runtime_name="thin",
metadata={"specialty": "code-review"},
),
AgentRecord(
id="data-analyst",
name="Data Analyst",
role="specialist",
parent_id="orchestrator",
runtime_name="thin",
metadata={"specialty": "data-analysis"},
),
]
for agent in agents:
await registry.register(agent)
# --- 2. Mock specialist runtimes ---
async def math_expert_run(
*, messages: list[Message], system_prompt: str, active_tools: list[ToolSpec], **kw
) -> AsyncIterator[RuntimeEvent]:
"""Simulated math expert agent."""
query = messages[-1].content if messages else ""
response = f"[Math Expert] Analysis of '{query}': The derivative is 2x + 3."
yield RuntimeEvent.assistant_delta(text=response)
yield RuntimeEvent.final(text=response, new_messages=[])
async def code_reviewer_run(
*, messages: list[Message], system_prompt: str, active_tools: list[ToolSpec], **kw
) -> AsyncIterator[RuntimeEvent]:
"""Simulated code reviewer agent."""
query = messages[-1].content if messages else ""
response = f"[Code Reviewer] Review of '{query}': Code looks clean. Consider adding type hints."
yield RuntimeEvent.assistant_delta(text=response)
yield RuntimeEvent.final(text=response, new_messages=[])
# Map agent IDs to their runtime functions
AGENT_RUNTIMES = {
"math-expert": math_expert_run,
"code-reviewer": code_reviewer_run,
}
# --- 3. Create tool specs for agent-as-tool ---
agent_tool_specs = {
"math-expert": create_agent_tool_spec(
name="math_expert",
description="Delegate a math problem to the Math Expert agent.",
),
"code-reviewer": create_agent_tool_spec(
name="code_reviewer",
description="Request a code review from the Code Reviewer agent.",
),
}
# --- 4. Orchestrator workflow ---
async def orchestrate(task_description: str) -> dict[str, str]:
"""Orchestrator: create tasks, dispatch to specialists, collect results."""
results: dict[str, str] = {}
# Create tasks in the queue
tasks = [
TaskItem(
id="task-math-1",
title="Solve the math component",
description=task_description,
priority=TaskPriority.HIGH,
assignee_agent_id="math-expert",
created_at=time.time(),
),
TaskItem(
id="task-review-1",
title="Review the solution code",
description=task_description,
priority=TaskPriority.MEDIUM,
assignee_agent_id="code-reviewer",
created_at=time.time(),
),
]
for task in tasks:
await task_queue.put(task)
# Process tasks by dispatching to specialist agents
for task in tasks:
agent_id = task.assignee_agent_id
if agent_id is None or agent_id not in AGENT_RUNTIMES:
continue
# Update agent status in registry
await registry.update_status(agent_id, AgentStatus.RUNNING)
# Execute the specialist agent as a tool
run_fn = AGENT_RUNTIMES[agent_id]
agent_result = await execute_agent_tool(
run_fn=run_fn,
query=task.description,
system_prompt=f"You are a specialist agent. Task: {task.title}",
timeout_seconds=30.0,
)
# Record result and update task status
if agent_result.success:
results[agent_id] = agent_result.output
await task_queue.complete(task.id)
else:
results[agent_id] = f"ERROR: {agent_result.error}"
await task_queue.cancel(task.id)
await registry.update_status(agent_id, AgentStatus.IDLE)
return results
async def main() -> None:
await setup_team()
# Show registered team
team = await registry.list_agents(AgentFilter(parent_id="orchestrator"))
print("Team members:")
for agent in team:
print(f" {agent.name} (role={agent.role}, specialty={agent.metadata.get('specialty')})")
# Run orchestration
results = await orchestrate("Analyze the function f(x) = x^2 + 3x and review the implementation")
print("\nResults:")
for agent_id, output in results.items():
print(f" {agent_id}: {output}")
# Show final task states
all_tasks = await task_queue.list_tasks()
print("\nTask states:")
for task in all_tasks:
print(f" [{task.status.value}] {task.title}")
if __name__ == "__main__":
asyncio.run(main())
Key Configuration Points¶
- `create_agent_tool_spec(name, description)` -- generates a `ToolSpec` with a single `query` parameter so the orchestrator LLM can call specialist agents like regular tools
- `execute_agent_tool(run_fn, query, timeout_seconds=30)` -- runs any async runtime generator and collects the final result with a timeout
- `InMemoryTaskQueue` -- swap with `SqliteTaskQueue(db_path="tasks.db")` for persistence across restarts; same API
- `AgentRecord(parent_id="orchestrator")` -- establishes the hierarchy; query with `AgentFilter(parent_id=...)` to find a leader's team
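Draining a runtime event stream down to its final text, bounded by a timeout, is the core of the agent-as-tool trick. A self-contained sketch of that mechanic (the `(kind, text)` tuples are simplified stand-ins for `RuntimeEvent`, and this is an assumption about how `execute_agent_tool` behaves, not its implementation):

```python
import asyncio
from collections.abc import AsyncIterator


async def fake_specialist(query: str) -> AsyncIterator[tuple[str, str]]:
    """Yields (event_type, text) pairs, ending with a 'final' event."""
    yield ("delta", f"working on {query}")
    yield ("final", f"answer for {query}")


async def collect_final(stream: AsyncIterator[tuple[str, str]],
                        timeout_seconds: float = 5.0) -> str:
    """Drain an event stream and return the final text, under a timeout."""
    async def _drain() -> str:
        final = ""
        async for kind, text in stream:
            if kind == "final":
                final = text
        return final
    # wait_for cancels the drain coroutine if the specialist hangs
    return await asyncio.wait_for(_drain(), timeout=timeout_seconds)


result = asyncio.run(collect_final(fake_specialist("2+2")))
```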
4. Conversational Agent with Memory¶
Combines: Conversation + SessionBackend + MemoryProvider
When to Use¶
You are building a chatbot or assistant that:
- Maintains multi-turn conversation context within a session
- Persists sessions to disk (survives restarts)
- Remembers user facts and preferences across sessions
- Summarizes old conversations to manage context window size
Architecture¶
User
|
| say("Hello")
v
+---------------------+
| Conversation |
| session_id: "abc" |
| history: Message[] |
+----------+----------+
|
| executes via Agent
v
+---------------------+
| Agent |
| runtime: "thin" |
| middleware: [...] |
+----------+----------+
|
v Result
+---------------------+ +-------------------------+
| Conversation | | SqliteSessionBackend |
| appends to history |---->| save(key, state) |
+---------------------+ | load(key) -> state |
+-------------------------+
|
v
+-------------------------+
| InMemoryMemoryProvider |
| save_message() |
| upsert_fact() |
| save_summary() |
+-------------------------+
Code¶
"""Conversational agent with session persistence and long-term memory."""
import asyncio
from cognitia.agent.agent import Agent
from cognitia.agent.config import AgentConfig
from cognitia.agent.conversation import Conversation
from cognitia.agent.middleware import CostTracker, ToolOutputCompressor
from cognitia.memory.inmemory import InMemoryMemoryProvider
from cognitia.session.backends import (
InMemorySessionBackend,
MemoryScope,
SqliteSessionBackend,
scoped_key,
)
# --- 1. Long-term memory ---
memory = InMemoryMemoryProvider()
# For production persistence:
# from cognitia.memory.sqlite import SqliteMemoryProvider
# memory = SqliteMemoryProvider(db_path="memory.db")
# --- 2. Session persistence ---
# InMemorySessionBackend for dev, SqliteSessionBackend for production
session_backend = InMemorySessionBackend()
# session_backend = SqliteSessionBackend(db_path="chat_sessions.db")
# --- 3. Build the conversational agent ---
USER_ID = "user-42"
TOPIC_ID = "general-chat"
async def build_system_prompt() -> str:
"""Build a system prompt enriched with remembered facts."""
base_prompt = "You are a friendly assistant. Remember details the user shares."
# Load known facts about this user
facts = await memory.get_facts(USER_ID, topic_id=TOPIC_ID)
if facts:
facts_text = "\n".join(f"- {k}: {v}" for k, v in facts.items())
base_prompt += f"\n\nKnown facts about this user:\n{facts_text}"
# Load previous summary if available
summary = await memory.get_summary(USER_ID, TOPIC_ID)
if summary:
base_prompt += f"\n\nPrevious conversation summary:\n{summary}"
return base_prompt
async def chat_session(user_messages: list[str]) -> None:
"""Run a multi-turn chat session with persistence."""
system_prompt = await build_system_prompt()
cost_tracker = CostTracker(budget_usd=2.0)
config = AgentConfig(
system_prompt=system_prompt,
model="sonnet",
runtime="thin",
middleware=(
ToolOutputCompressor(max_result_chars=5000),
cost_tracker,
),
)
agent = Agent(config)
session_key = scoped_key(MemoryScope.AGENT, f"{USER_ID}:{TOPIC_ID}")
# Load previous session state
prev_state = await session_backend.load(session_key)
turn_offset = prev_state.get("turn_count", 0) if prev_state else 0
async with agent.conversation(session_id=f"{USER_ID}:{TOPIC_ID}") as conv:
for i, user_msg in enumerate(user_messages):
turn_num = turn_offset + i + 1
# Send message and get response
result = await conv.say(user_msg)
if result.ok:
print(f"[Turn {turn_num}] User: {user_msg}")
print(f"[Turn {turn_num}] Assistant: {result.text[:200]}")
# Persist to long-term memory
await memory.save_message(USER_ID, TOPIC_ID, "user", user_msg)
await memory.save_message(USER_ID, TOPIC_ID, "assistant", result.text)
else:
print(f"[Turn {turn_num}] Error: {result.error}")
# Save session state for next time
await session_backend.save(session_key, {
"user_id": USER_ID,
"topic_id": TOPIC_ID,
"turn_count": turn_offset + len(user_messages),
"cost_usd": cost_tracker.total_cost_usd,
})
# Summarize if conversation is getting long
msg_count = await memory.count_messages(USER_ID, TOPIC_ID)
if msg_count > 20:
await memory.save_summary(
USER_ID, TOPIC_ID,
summary=f"Extended conversation with {msg_count} messages.",
messages_covered=msg_count,
)
# Trim old messages, keep last 10
await memory.delete_messages_before(USER_ID, TOPIC_ID, keep_last=10)
async def remember_fact(key: str, value: str) -> None:
"""Store a user fact for future sessions."""
await memory.upsert_fact(USER_ID, key, value, topic_id=TOPIC_ID, source="user")
async def main() -> None:
# Session 1: introduce yourself
await remember_fact("name", "Alice")
await remember_fact("language", "Python")
await chat_session([
"Hi, my name is Alice. I write Python.",
"What async frameworks do you recommend?",
"Tell me more about FastAPI.",
])
# Session 2: agent remembers Alice
print("\n--- New session (facts persisted) ---")
await chat_session([
"What was my name again?",
"What language do I use?",
])
if __name__ == "__main__":
asyncio.run(main())
Key Configuration Points¶
- `agent.conversation(session_id=...)` -- creates a `Conversation` that maintains message history across turns within the same session
- `SqliteSessionBackend` -- drop-in replacement for `InMemorySessionBackend`; persists across process restarts with zero config
- `memory.save_summary()` + `memory.delete_messages_before(keep_last=10)` -- sliding-window pattern to keep context manageable
- `build_system_prompt()` -- injects remembered facts and summaries into each new session's system prompt
5. Cost-Controlled Pipeline¶
Combines: CostBudget + ModelFallbackChain + CancellationToken
When to Use¶
You are running a multi-step LLM pipeline (e.g., research, summarize, format) and need:
- A total cost budget across all steps
- Graceful degradation to cheaper models when budget runs low
- Cooperative cancellation if budget is exceeded mid-pipeline
Architecture¶
Pipeline Start
|
v
+------------------+
| CostBudget | max_cost_usd=2.0
| CostTracker | tracks cumulative spend
+--------+---------+
|
v
+--------+---------+ budget OK?
| Step 1: Research |-----------> continue
| model: sonnet | exceeded?
+--------+---------+-----------> fallback to cheaper model
| or cancel pipeline
v
+--------+---------+
| Step 2: Analyze |
| model: (current) |
+--------+---------+
|
v
+--------+---------+
| Step 3: Format |
| model: (current) |
+--------+---------+
|
v
+------------------+
| CancellationToken| checked between steps
| .is_cancelled | callbacks fire on cancel
+------------------+
|
v
Pipeline Result
Code¶
"""Cost-controlled pipeline with model fallback and cooperative cancellation."""
import asyncio
from cognitia.agent.agent import Agent
from cognitia.agent.config import AgentConfig
from cognitia.agent.middleware import BudgetExceededError, CostTracker
from cognitia.agent.result import Result
from cognitia.retry import ModelFallbackChain
from cognitia.runtime.cancellation import CancellationToken
from cognitia.runtime.cost import CostBudget, CostTracker as RuntimeCostTracker, load_pricing
# --- 1. Budget and pricing setup ---
pricing = load_pricing()
budget = CostBudget(max_cost_usd=2.0, action_on_exceed="error")
cost_tracker = RuntimeCostTracker(budget=budget, pricing=pricing)
# --- 2. Model fallback chain: expensive -> cheap ---
fallback = ModelFallbackChain(
models=[
"claude-sonnet-4-20250514", # best quality
"gpt-4o-mini", # cheaper alternative
"gemini-2.0-flash", # cheapest fallback
],
)
# --- 3. Pipeline steps ---
async def run_step(
step_name: str,
prompt: str,
model: str,
cancel_token: CancellationToken,
middleware_tracker: CostTracker,
) -> tuple[str, str]:
"""Execute a single pipeline step. Returns (output_text, model_used).
Tries the given model first. If budget exceeded, falls back to cheaper model.
Checks cancellation token before executing.
"""
if cancel_token.is_cancelled:
return f"[{step_name}] CANCELLED", model
current_model = model
for _attempt in range(3): # up to 3 fallback attempts
try:
config = AgentConfig(
system_prompt=f"You are executing step: {step_name}. Be concise.",
model=current_model,
runtime="thin",
middleware=(middleware_tracker,),
)
async with Agent(config) as agent:
result = await agent.query(prompt)
if result.ok:
return result.text, current_model
return f"[{step_name}] Error: {result.error}", current_model
except BudgetExceededError:
# Try a cheaper model
next_model = fallback.next_model(current_model)
if next_model is None:
# No more fallbacks -- cancel the entire pipeline
cancel_token.cancel()
return f"[{step_name}] Budget exceeded, no cheaper model available", current_model
print(f" [{step_name}] Budget pressure, falling back: {current_model} -> {next_model}")
current_model = next_model
return f"[{step_name}] All fallbacks exhausted", current_model
async def run_pipeline(topic: str) -> dict[str, str]:
"""Execute a 3-step research pipeline with budget control."""
cancel_token = CancellationToken()
middleware_tracker = CostTracker(budget_usd=2.0)
# Register cleanup callback
cleanup_done = False
def on_pipeline_cancel() -> None:
nonlocal cleanup_done
cleanup_done = True
print(" [Pipeline] Cancellation triggered -- cleaning up resources")
cancel_token.on_cancel(on_pipeline_cancel)
results: dict[str, str] = {}
current_model = "claude-sonnet-4-20250514"
# Step 1: Research
print("Step 1: Research")
output, current_model = await run_step(
"research",
f"Research the topic: {topic}. List 3 key findings.",
current_model,
cancel_token,
middleware_tracker,
)
results["research"] = output
# Step 2: Analyze (uses model from step 1 -- may have degraded)
print("Step 2: Analyze")
output, current_model = await run_step(
"analyze",
f"Based on this research, provide analysis:\n{results['research'][:500]}",
current_model,
cancel_token,
middleware_tracker,
)
results["analyze"] = output
# Step 3: Format
print("Step 3: Format")
output, current_model = await run_step(
"format",
f"Format this analysis as a brief executive summary:\n{results['analyze'][:500]}",
current_model,
cancel_token,
middleware_tracker,
)
results["format"] = output
# Summary
results["_metadata"] = (
f"model_used={current_model}, "
f"total_cost=${middleware_tracker.total_cost_usd:.4f}, "
f"cancelled={cancel_token.is_cancelled}, "
f"cleanup_ran={cleanup_done}"
)
return results
# --- 4. Budget monitoring (standalone) ---
def check_budget_health() -> str:
"""Check budget status using the runtime-level cost tracker."""
status = cost_tracker.check_budget()
remaining = (budget.max_cost_usd or 0) - cost_tracker.total_cost_usd
return f"Status: {status}, spent: ${cost_tracker.total_cost_usd:.4f}, remaining: ${remaining:.4f}"
async def main() -> None:
results = await run_pipeline("The impact of LLMs on software development")
print("\n=== Pipeline Results ===")
for step, output in results.items():
if step.startswith("_"):
print(f"\nMetadata: {output}")
else:
print(f"\n[{step}]\n{output[:300]}")
if __name__ == "__main__":
asyncio.run(main())
Key Configuration Points¶
- `CostBudget(max_cost_usd=2.0, action_on_exceed="error")` -- hard budget cap; `"warn"` mode logs but continues
- `ModelFallbackChain(models=[...])` -- ordered from most expensive to cheapest; `next_model()` returns `None` when the chain is exhausted
- `CancellationToken` -- cooperative: each step checks `is_cancelled` before running; `on_cancel()` callbacks handle cleanup
- `CostTracker` (middleware) -- accumulates cost across all Agent calls in the pipeline; raises `BudgetExceededError` when the limit is hit
- `RuntimeCostTracker` -- lower-level tracker that works with `load_pricing()` for per-model token cost accounting
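The cooperative token itself is a small state machine: a flag that work checks between steps, plus callbacks that fire exactly once. A stdlib sketch of that contract (illustrative; cognitia's `CancellationToken` presumably behaves similarly, but this is not its source):

```python
from collections.abc import Callable


class SimpleCancellationToken:
    """Cooperative cancellation: nothing is interrupted forcibly.

    Long-running work checks `is_cancelled` at safe points; `on_cancel`
    callbacks run once, when `cancel()` is first called.
    """

    def __init__(self) -> None:
        self.is_cancelled = False
        self._callbacks: list[Callable[[], None]] = []

    def on_cancel(self, fn: Callable[[], None]) -> None:
        self._callbacks.append(fn)

    def cancel(self) -> None:
        if self.is_cancelled:
            return  # idempotent: callbacks fire only on the first cancel
        self.is_cancelled = True
        for fn in self._callbacks:
            fn()
```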
Combining Patterns¶
These patterns compose naturally. A production system might combine all five:
+------------------+
| Cost Budget | (Pattern 5)
| CancellationToken|
+--------+---------+
|
v
+--------+---------+
| Orchestrator | (Pattern 3)
| AgentRegistry |
| TaskQueue |
+--------+---------+
|
+---------------+---------------+
| |
v v
+---------+----------+ +---------+----------+
| Research Agent | | Review Agent |
| RAG + Memory | | Guardrails |
| (Pattern 2) | | (Pattern 1) |
+--------------------+ +--------------------+
| |
v v
+---------+----------+ +---------+----------+
| Conversation | | Conversation |
| SessionBackend | | SessionBackend |
| (Pattern 4) | | (Pattern 4) |
+--------------------+ +--------------------+
Each layer is independent and swappable. Start with the pattern that matches your immediate need, then add layers as requirements grow.