Multi-Agent Coordination
Cognitia provides building blocks for multi-agent systems: agent-as-tool invocation, task queues for work distribution, and an agent registry for lifecycle management. All components follow the protocol-first approach with swappable implementations.
Overview
Multi-agent coordination in Cognitia is built around three primitives:
| Primitive | Purpose | Protocol |
|---|---|---|
| Agent-as-Tool | Run one agent as a tool callable by another | AgentTool |
| Task Queue | Distribute work items between agents | TaskQueue |
| Agent Registry | Track agent lifecycle and metadata | AgentRegistry |
Each primitive has a protocol in cognitia.protocols.multi_agent and one or more implementations in cognitia.multi_agent.
Agent-as-Tool
The agent-as-tool pattern lets an orchestrating agent call a sub-agent as if it were a regular tool. The sub-agent runs to completion, and the orchestrator receives its output as a tool result.
Creating a Tool Spec
Use create_agent_tool_spec to define a tool that represents a sub-agent:
```python
from cognitia.multi_agent import create_agent_tool_spec

spec = create_agent_tool_spec(
    name="researcher",
    description="Research a topic and return a summary",
)
# spec is a ToolSpec with a single required "query" parameter
```
The returned ToolSpec has a JSON Schema with one required string parameter query -- the prompt sent to the sub-agent.
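The exact schema produced by create_agent_tool_spec is not shown on this page. As a rough illustration, assuming a conventional JSON Schema object layout, a tool whose only input is a required string query might look like the dictionary below. The helper agent_tool_schema and the "parameters" key are hypothetical, not Cognitia's ToolSpec structure.

```python
from typing import Any


def agent_tool_schema(name: str, description: str) -> dict[str, Any]:
    """Hypothetical helper: a tool definition whose only input is a
    required string "query" (the prompt forwarded to the sub-agent)."""
    return {
        "name": name,
        "description": description,
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "Prompt sent to the sub-agent",
                },
            },
            "required": ["query"],
        },
    }


schema = agent_tool_schema("researcher", "Research a topic and return a summary")
print(schema["parameters"]["required"])  # ['query']
```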
Executing the Sub-Agent
Use execute_agent_tool to run a sub-agent runtime and collect the final result:
```python
from cognitia.multi_agent import execute_agent_tool

# Run inside an async function. sub_agent_runtime is any runtime whose
# run() method matches the run_fn signature described below.
result = await execute_agent_tool(
    run_fn=sub_agent_runtime.run,
    query="Summarize recent advances in quantum computing",
    system_prompt="You are a research assistant. Be concise.",
    timeout_seconds=120.0,
)

if result.success:
    print(result.output)
else:
    print(f"Failed: {result.error}")
```
Parameters:
- run_fn -- async generator function with signature (messages, system_prompt, active_tools) -> AsyncIterator[RuntimeEvent]
- query -- the user message sent to the sub-agent
- system_prompt -- system prompt for the sub-agent (default: "You are a helpful assistant.")
- timeout_seconds -- maximum execution time in seconds (default: 60.0)
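To make the run_fn contract concrete, here is a self-contained sketch of a stub sub-agent and a simplified collector that enforces a timeout. The TextEvent class, the {"role", "content"} message format, and the names fake_run and collect_output are assumptions for illustration; Cognitia's real RuntimeEvent types and the internals of execute_agent_tool may differ.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Optional, Tuple


@dataclass
class TextEvent:
    """Hypothetical stand-in for a RuntimeEvent that carries assistant text."""
    text: str


async def fake_run(messages: list, system_prompt: str,
                   active_tools: list) -> AsyncIterator[TextEvent]:
    # Stub sub-agent: an async generator that streams three text chunks.
    for chunk in ("Quantum ", "error correction ", "improved."):
        await asyncio.sleep(0)  # yield control, as a real runtime would
        yield TextEvent(chunk)


async def collect_output(run_fn, query: str, system_prompt: str,
                         timeout_seconds: float) -> Tuple[bool, str, Optional[str]]:
    """Drain run_fn's event stream and return (success, output, error)."""

    async def drain() -> str:
        parts = []
        messages = [{"role": "user", "content": query}]  # assumed message shape
        async for event in run_fn(messages, system_prompt, []):
            if isinstance(event, TextEvent):
                parts.append(event.text)
        return "".join(parts)

    try:
        output = await asyncio.wait_for(drain(), timeout=timeout_seconds)
        return True, output, None
    except asyncio.TimeoutError:
        return False, "", f"timed out after {timeout_seconds}s"


ok, text, err = asyncio.run(
    collect_output(fake_run, "Summarize recent advances in quantum computing",
                   "You are a research assistant.", timeout_seconds=5.0)
)
print(ok, text)  # True Quantum error correction improved.
```

Wrapping the whole drain loop in asyncio.wait_for means the timeout bounds the entire sub-agent run, not just a single event.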
AgentToolResult
The result is a frozen dataclass with these fields:
```python
from cognitia.multi_agent import AgentToolResult

# Fields:
# success: bool        -- True if completed without error
# output: str          -- final text from the sub-agent
# error: str | None    -- error message if failed
# agent_id: str        -- identifier of the agent (default: "")
# tokens_used: int     -- token consumption (default: 0)
# cost_usd: float      -- cost in USD (default: 0.0)
```
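Because AgentToolResult is frozen, its fields cannot be reassigned after creation. The class below, LocalAgentToolResult, is a hypothetical local mirror built only from the field list above (it is not the library class); it shows what immutability means in practice and uses the standard dataclasses.replace() to derive a modified copy.

```python
import dataclasses
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class LocalAgentToolResult:
    """Local mirror of the documented AgentToolResult fields (illustrative only)."""
    success: bool
    output: str
    error: Optional[str]
    agent_id: str = ""
    tokens_used: int = 0
    cost_usd: float = 0.0


result = LocalAgentToolResult(success=True, output="Looks good.", error=None)

try:
    result.output = "changed"  # frozen dataclasses reject attribute assignment
except dataclasses.FrozenInstanceError:
    print("immutable")

# Derive a new instance instead of mutating the original.
tagged = dataclasses.replace(result, agent_id="reviewer-1")
print(tagged.agent_id, result.agent_id == "")  # reviewer-1 True
```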
Full Example
```python
from cognitia import Agent, AgentConfig
from cognitia.multi_agent import create_agent_tool_spec, execute_agent_tool

REVIEWER_PROMPT = "You are a code reviewer. Review code for bugs and style issues."

# 1. Create the sub-agent
sub_agent = Agent(AgentConfig(
    system_prompt=REVIEWER_PROMPT,
    runtime="thin",
    model="sonnet",
))

# 2. Create a tool spec for the orchestrator
reviewer_tool = create_agent_tool_spec(
    name="code_reviewer",
    description="Review code for bugs and style issues",
)

# 3. In the orchestrator's tool handler, execute the sub-agent
async def handle_tool_call(tool_name: str, args: dict) -> str:
    if tool_name == "code_reviewer":
        result = await execute_agent_tool(
            # _runtime is a private attribute; this example reaches into it
            # to obtain the runtime's run() async generator.
            run_fn=sub_agent._runtime.run,
            query=args["query"],
            # Reuse the same prompt so the sub-agent behaves as configured above.
            system_prompt=REVIEWER_PROMPT,
            timeout_seconds=90.0,
        )
        return result.output if result.success else f"Error: {result.error}"
    return f"Unknown tool: {tool_name}"
```
Task Queue
The task queue distributes work items between agents. Tasks have priority-based scheduling and lifecycle tracking.
Domain Types
```python
from cognitia.multi_agent import TaskItem, TaskStatus, TaskPriority, TaskFilter

# Create a task
task = TaskItem(
    id="task-001",
    title="Analyze sales data",
    description="Generate Q4 report from sales.csv",
    priority=TaskPriority.HIGH,
    assignee_agent_id="analyst-1",
)

# Filter tasks
pending = TaskFilter(status=TaskStatus.TODO)
my_tasks = TaskFilter(assignee_agent_id="analyst-1")
urgent = TaskFilter(priority=TaskPriority.CRITICAL)
```
TaskStatus values: TODO, IN_PROGRESS, DONE, CANCELLED.
TaskPriority values: LOW, MEDIUM, HIGH, CRITICAL.
InMemoryTaskQueue
Zero-dependency; concurrent access from coroutines on the same event loop is serialized with an asyncio.Lock (note that asyncio.Lock is not a thread lock):
```python
from cognitia.multi_agent import (
    InMemoryTaskQueue,
    TaskFilter,
    TaskItem,
    TaskPriority,
    TaskStatus,
)

queue = InMemoryTaskQueue()

# Add tasks
await queue.put(TaskItem(id="t1", title="Research", priority=TaskPriority.HIGH))
await queue.put(TaskItem(id="t2", title="Write report", priority=TaskPriority.MEDIUM))

# Claim highest-priority unassigned task
task = await queue.get()
# task.id == "t1" and task.status == TaskStatus.IN_PROGRESS

# Claim a task pre-assigned to a specific agent
# (None here: neither t1 nor t2 is assigned to "analyst-1")
assigned = await queue.get(TaskFilter(assignee_agent_id="analyst-1"))

# Mark complete
await queue.complete("t1")  # returns True

# List all tasks
all_tasks = await queue.list_tasks()

# List only tasks still waiting to be claimed
pending = await queue.list_tasks(TaskFilter(status=TaskStatus.TODO))
```
SqliteTaskQueue
File-based persistence using SQLite. Blocking SQLite calls run in a worker thread via asyncio.to_thread(), so they do not block the event loop:
```python
from cognitia.multi_agent import SqliteTaskQueue, TaskItem, TaskPriority

queue = SqliteTaskQueue(db_path="tasks.db")
await queue.put(TaskItem(id="t1", title="Process data", priority=TaskPriority.HIGH))
task = await queue.get()

# Clean up when done
queue.close()
```
The API is identical to InMemoryTaskQueue. The SQLite backend persists tasks across restarts.
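The asyncio.to_thread() approach can be sketched with the standard sqlite3 module: each blocking call runs in a worker thread so the event loop stays responsive. The table layout, the integer priority ranking, and the class name TinySqliteQueue are assumptions for illustration, not SqliteTaskQueue's actual schema or code.

```python
import asyncio
import sqlite3
from typing import Optional


class TinySqliteQueue:
    """Illustrative SQLite-backed queue; blocking calls run via asyncio.to_thread."""

    def __init__(self, db_path: str = ":memory:") -> None:
        # check_same_thread=False: to_thread may use different worker threads.
        self._conn = sqlite3.connect(db_path, check_same_thread=False)
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS tasks ("
            " id TEXT PRIMARY KEY, title TEXT, priority INTEGER,"
            " status TEXT DEFAULT 'todo')"
        )
        # Serialize claims so two coroutines never claim the same row.
        self._lock = asyncio.Lock()

    def _put_sync(self, task_id: str, title: str, priority: int) -> None:
        with self._conn:  # commits the transaction on success
            self._conn.execute(
                "INSERT INTO tasks (id, title, priority) VALUES (?, ?, ?)",
                (task_id, title, priority),
            )

    def _claim_sync(self) -> Optional[str]:
        with self._conn:
            row = self._conn.execute(
                "SELECT id FROM tasks WHERE status = 'todo'"
                " ORDER BY priority DESC LIMIT 1"
            ).fetchone()
            if row is None:
                return None
            self._conn.execute(
                "UPDATE tasks SET status = 'in_progress' WHERE id = ?", (row[0],)
            )
            return row[0]

    async def put(self, task_id: str, title: str, priority: int) -> None:
        await asyncio.to_thread(self._put_sync, task_id, title, priority)

    async def claim(self) -> Optional[str]:
        async with self._lock:
            return await asyncio.to_thread(self._claim_sync)

    def close(self) -> None:
        self._conn.close()


async def main() -> Optional[str]:
    queue = TinySqliteQueue()  # created inside the running event loop
    await queue.put("t1", "Process data", priority=3)
    await queue.put("t2", "Archive logs", priority=1)
    first = await queue.claim()
    queue.close()
    return first


print(asyncio.run(main()))  # t1
```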
Task Queue API
All implementations expose these 5 methods (ISP-compliant):
| Method | Signature | Description |
|---|---|---|
| put | (item: TaskItem) -> None | Add a task to the queue |
| get | (filters: TaskFilter \| None = None) -> TaskItem \| None | Claim the highest-priority matching TODO task |
| complete | (task_id: str) -> bool | Mark task as DONE |
| cancel | (task_id: str) -> bool | Mark task as CANCELLED |
| list_tasks | (filters: TaskFilter \| None = None) -> list[TaskItem] | List tasks matching filters |
get() only claims tasks with status TODO. Without assignee_agent_id, it only returns unassigned tasks. With TaskFilter(assignee_agent_id="..."), it only returns tasks pre-assigned to that agent. A claimed task is persisted as IN_PROGRESS. Tasks in DONE or CANCELLED status are terminal and cannot be completed or cancelled again.
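The claim rules above can be modelled in a few lines. This is a minimal sketch, assuming that priority ranks LOW < MEDIUM < HIGH < CRITICAL and that ties go to the task added first; MiniTask, Priority, Status, claim, and complete are hypothetical stand-ins, not Cognitia's TaskItem or InMemoryTaskQueue.

```python
from dataclasses import dataclass
from enum import Enum, IntEnum
from typing import Optional


class Priority(IntEnum):
    LOW = 0
    MEDIUM = 1
    HIGH = 2
    CRITICAL = 3


class Status(Enum):
    TODO = "todo"
    IN_PROGRESS = "in_progress"
    DONE = "done"
    CANCELLED = "cancelled"


@dataclass
class MiniTask:
    id: str
    priority: Priority = Priority.MEDIUM
    assignee_agent_id: Optional[str] = None
    status: Status = Status.TODO


def claim(tasks: list[MiniTask], assignee: Optional[str] = None) -> Optional[MiniTask]:
    """Claim the highest-priority TODO task for `assignee` (None = unassigned only)."""
    candidates = [
        t for t in tasks
        if t.status is Status.TODO and t.assignee_agent_id == assignee
    ]
    if not candidates:
        return None
    # max() returns the first maximal element, so ties keep insertion order.
    best = max(candidates, key=lambda t: t.priority)
    best.status = Status.IN_PROGRESS
    return best


def complete(tasks: list[MiniTask], task_id: str) -> bool:
    """Mark a task DONE; terminal tasks (DONE/CANCELLED) are left unchanged."""
    for t in tasks:
        if t.id == task_id and t.status not in (Status.DONE, Status.CANCELLED):
            t.status = Status.DONE
            return True
    return False


tasks = [
    MiniTask("t1", Priority.MEDIUM),
    MiniTask("t2", Priority.HIGH),
    MiniTask("t3", Priority.CRITICAL, assignee_agent_id="analyst-1"),
]
print(claim(tasks).id)               # t2 (t3 is pre-assigned, so it is skipped)
print(claim(tasks, "analyst-1").id)  # t3
print(complete(tasks, "t2"), complete(tasks, "t2"))  # True False
```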
Agent Registry
The agent registry tracks registered agents, their roles, statuses, and metadata.
Domain Types
```python
from cognitia.multi_agent import AgentRecord, AgentStatus, AgentFilter

# Describe an agent; pass the record to a registry's register() to add it
record = AgentRecord(
    id="researcher-1",
    name="Research Agent",
    role="researcher",
    runtime_name="thin",
    runtime_config={"model": "sonnet"},
    budget_limit_usd=5.0,
)

# Filter agents
idle_researchers = AgentFilter(role="researcher", status=AgentStatus.IDLE)
```
AgentStatus values: IDLE, RUNNING, STOPPED.
AgentRecord fields:
| Field | Type | Default | Description |
|---|---|---|---|
| id | str | required | Unique agent identifier |
| name | str | required | Human-readable name |
| role | str | required | Agent role (e.g. "researcher", "coder") |
| parent_id | str \| None | None | Parent agent ID for hierarchies |
| runtime_name | str | "thin" | Runtime to use |
| runtime_config | dict | {} | Runtime-specific config |
| status | AgentStatus | IDLE | Current lifecycle status |
| budget_limit_usd | float \| None | None | Spending cap in USD |
| metadata | dict | {} | Arbitrary key-value data |
InMemoryAgentRegistry
Thread-safe in-memory implementation:
```python
from cognitia.multi_agent import InMemoryAgentRegistry, AgentRecord, AgentStatus, AgentFilter

registry = InMemoryAgentRegistry()

# Register agents
await registry.register(AgentRecord(
    id="coder-1", name="Coder", role="coder",
))
await registry.register(AgentRecord(
    id="reviewer-1", name="Reviewer", role="reviewer",
))

# Look up by ID
agent = await registry.get("coder-1")

# List all agents with a specific role
coders = await registry.list_agents(AgentFilter(role="coder"))

# Update lifecycle status
await registry.update_status("coder-1", AgentStatus.RUNNING)

# Remove an agent
await registry.remove("reviewer-1")
```
Duplicate IDs: register() raises ValueError if an agent with the same id is already registered.
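A minimal registry sketch illustrates the duplicate-ID rule and how a filter might narrow results. The class MiniRegistry, its plain-dict records with string statuses, and the "a filter field left as None matches anything" semantics are assumptions for illustration; InMemoryAgentRegistry may implement these differently.

```python
import asyncio
from typing import Optional


class MiniRegistry:
    """Illustrative in-memory registry guarded by an asyncio.Lock."""

    def __init__(self) -> None:
        self._agents: dict[str, dict] = {}
        self._lock = asyncio.Lock()

    async def register(self, record: dict) -> None:
        async with self._lock:
            if record["id"] in self._agents:
                raise ValueError(f"Agent {record['id']!r} already registered")
            # New agents start as "idle" unless a status is given.
            self._agents[record["id"]] = dict(record, status=record.get("status", "idle"))

    async def list_agents(self, role: Optional[str] = None,
                          status: Optional[str] = None) -> list[dict]:
        async with self._lock:
            # A filter field left as None matches every agent.
            return [
                a for a in self._agents.values()
                if (role is None or a["role"] == role)
                and (status is None or a["status"] == status)
            ]

    async def remove(self, agent_id: str) -> bool:
        async with self._lock:
            return self._agents.pop(agent_id, None) is not None


async def main() -> list[str]:
    registry = MiniRegistry()
    await registry.register({"id": "coder-1", "name": "Coder", "role": "coder"})
    try:
        await registry.register({"id": "coder-1", "name": "Dup", "role": "coder"})
    except ValueError as exc:
        print(exc)  # Agent 'coder-1' already registered
    coders = await registry.list_agents(role="coder", status="idle")
    return [a["id"] for a in coders]


print(asyncio.run(main()))  # ['coder-1']
```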
Agent Registry API
All implementations expose these 5 methods (ISP-compliant):
| Method | Signature | Description |
|---|---|---|
| register | (record: AgentRecord) -> None | Register a new agent |
| get | (agent_id: str) -> AgentRecord \| None | Get agent by ID |
| list_agents | (filters: AgentFilter \| None = None) -> list[AgentRecord] | List agents matching filters |
| update_status | (agent_id: str, status: AgentStatus) -> bool | Update agent status |
| remove | (agent_id: str) -> bool | Remove agent from registry |
Protocols
All multi-agent components are defined as @runtime_checkable protocols in cognitia.protocols.multi_agent:
AgentTool Protocol
Single-method protocol for exposing an agent as a tool:
```python
@runtime_checkable
class AgentTool(Protocol):
    def as_tool(self, name: str, description: str) -> ToolSpec: ...
```
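Any object with a matching as_tool() method satisfies AgentTool structurally, with no inheritance required. The sketch below re-declares the protocol locally (as AgentToolLike) and uses a stand-in SimpleToolSpec dataclass, since the real ToolSpec fields are not documented on this page; SimpleToolSpec, AgentToolLike, and ResearchAgent are hypothetical names.

```python
from dataclasses import dataclass, field
from typing import Any, Protocol, runtime_checkable


@dataclass(frozen=True)
class SimpleToolSpec:
    """Hypothetical stand-in for cognitia's ToolSpec."""
    name: str
    description: str
    parameters: dict[str, Any] = field(default_factory=dict)


@runtime_checkable
class AgentToolLike(Protocol):
    def as_tool(self, name: str, description: str) -> SimpleToolSpec: ...


class ResearchAgent:
    """No base class: it satisfies the protocol only by defining as_tool()."""

    def as_tool(self, name: str, description: str) -> SimpleToolSpec:
        return SimpleToolSpec(
            name=name,
            description=description,
            parameters={
                "type": "object",
                "properties": {"query": {"type": "string"}},
                "required": ["query"],
            },
        )


agent = ResearchAgent()
print(isinstance(agent, AgentToolLike))                      # True
print(agent.as_tool("researcher", "Research a topic").name)  # researcher
```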
TaskQueue Protocol
Five async methods for task lifecycle:
```python
@runtime_checkable
class TaskQueue(Protocol):
    async def put(self, item: TaskItem) -> None: ...
    async def get(self, filters: TaskFilter | None = None) -> TaskItem | None: ...
    async def complete(self, task_id: str) -> bool: ...
    async def cancel(self, task_id: str) -> bool: ...
    async def list_tasks(self, filters: TaskFilter | None = None) -> list[TaskItem]: ...
```
AgentRegistry Protocol
Five async methods for agent lifecycle management:
```python
@runtime_checkable
class AgentRegistry(Protocol):
    async def register(self, record: AgentRecord) -> None: ...
    async def get(self, agent_id: str) -> AgentRecord | None: ...
    async def list_agents(self, filters: AgentFilter | None = None) -> list[AgentRecord]: ...
    async def update_status(self, agent_id: str, status: AgentStatus) -> bool: ...
    async def remove(self, agent_id: str) -> bool: ...
```
Custom Implementations
To create a custom implementation, write a class that provides every method of the corresponding protocol; no inheritance is required. Because the protocols are @runtime_checkable, you can test conformance with isinstance() at runtime. That check only confirms the methods exist, not that their signatures match:
```python
from cognitia.protocols.multi_agent import TaskQueue
from cognitia.multi_agent import TaskItem, TaskFilter


class RedisTaskQueue:
    """Redis-backed task queue (skeleton)."""

    def __init__(self, redis_url: str) -> None:
        self._url = redis_url
        # Initialize the Redis connection here

    async def put(self, item: TaskItem) -> None:
        # Store the task in a Redis sorted set scored by priority
        ...

    async def get(self, filters: TaskFilter | None = None) -> TaskItem | None:
        # Atomically claim the highest-priority matching TODO task
        ...

    async def complete(self, task_id: str) -> bool:
        # Move the task to the completed set
        ...

    async def cancel(self, task_id: str) -> bool:
        # Move the task to the cancelled set
        ...

    async def list_tasks(self, filters: TaskFilter | None = None) -> list[TaskItem]:
        # Query tasks matching filters
        ...


# Structural check: passes because all five methods are defined
# (their signatures are not verified)
assert isinstance(RedisTaskQueue("redis://localhost"), TaskQueue)
```
The same pattern applies for AgentRegistry -- implement all 5 methods with matching signatures.
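One caveat is worth demonstrating: isinstance() against a @runtime_checkable protocol only checks that the named attributes exist. It does not compare signatures or check that methods are async, so a static type checker such as mypy or pyright is still needed to confirm that signatures match. The tiny Pingable protocol below is a made-up example that shows this behaviour of Python's typing module.

```python
import inspect
from typing import Protocol, runtime_checkable


@runtime_checkable
class Pingable(Protocol):
    async def ping(self, target: str) -> bool: ...


class Correct:
    async def ping(self, target: str) -> bool:
        return True


class WrongSignature:
    # Synchronous, with a different parameter list and return type.
    def ping(self) -> str:
        return "pong"


class Missing:
    pass


print(isinstance(Correct(), Pingable))         # True
print(isinstance(WrongSignature(), Pingable))  # True: only presence is checked
print(isinstance(Missing(), Pingable))         # False
print(inspect.iscoroutinefunction(WrongSignature.ping))  # False
```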
Next Steps
- CLI Runtime -- run external CLI agents as subprocesses
- Orchestration -- planning mode, subagents, team coordination
- Runtimes -- available runtime backends
- Architecture -- Clean Architecture layers and protocol design