Agent Graph System¶
Cognitia's Agent Graph System models hierarchical multi-agent organizations as directed graphs. Each agent occupies a node with a role, capabilities, and position in a chain of command. A root agent decomposes goals into subtasks, delegates them down the hierarchy, and collects results -- all with governance guardrails, DAG-based task scheduling, and inter-agent messaging.
Overview¶
The Agent Graph System is built around six primitives:
| Primitive | Purpose | Protocol |
|---|---|---|
| Agent Graph | Hierarchical tree of agent nodes | AgentGraphStore, AgentGraphQuery |
| Task Board | Hierarchical tasks with atomic checkout and DAG deps | GraphTaskBoard, GraphTaskScheduler, GraphTaskBlocker |
| Orchestrator | Execution engine: goal decomposition, delegation, retry | GraphOrchestrator, GraphTaskWaiter |
| Communication | Direct, broadcast, and escalation messaging | GraphCommunication |
| Governance | Capability-based permissions and global limits | AgentCapabilities, GraphGovernanceConfig |
| Graph Tools | Agent-callable tools: hire, delegate, escalate | create_graph_tools factory |
Each primitive has a @runtime_checkable protocol in cognitia.protocols and one or more implementations in cognitia.multi_agent.
When to use: You need a structured multi-agent system with reporting lines, permission controls, and task hierarchies -- rather than flat agent-as-tool invocation. Typical use cases: autonomous software teams, research organizations, multi-step workflows with approval gates.
Quick Start¶
Build a minimal two-agent graph and run a task:
from cognitia.multi_agent import InMemoryAgentGraph, InMemoryGraphTaskBoard
from cognitia.multi_agent.graph_builder import GraphBuilder
from cognitia.multi_agent.graph_orchestrator import DefaultGraphOrchestrator
from cognitia.multi_agent.graph_types import AgentCapabilities
# 1. Create storage backends
graph = InMemoryAgentGraph()
task_board = InMemoryGraphTaskBoard()
# 2. Build the agent hierarchy
builder = GraphBuilder(graph)
builder.add_root(
"lead", "Tech Lead", "lead",
system_prompt="You decompose tasks and delegate to engineers.",
capabilities=AgentCapabilities(can_hire=True, can_delegate=True),
)
builder.add_child(
"eng1", "lead", "Engineer", "engineer",
system_prompt="You write Python code.",
allowed_tools=("file_write", "run_tests"),
)
snapshot = await builder.build()
# 3. Define the agent runner (your LLM call)
async def run_agent(agent_id: str, task_id: str, goal: str, system_prompt: str) -> str:
# Call your LLM here with the system_prompt and goal
return f"Result from {agent_id}: done"
# 4. Start the orchestrator
orchestrator = DefaultGraphOrchestrator(
graph=graph,
task_board=task_board,
agent_runner=run_agent,
max_concurrent=5,
max_retries=2,
)
run_id = await orchestrator.start("Build a REST API for user management")
# 5. Wait for the root task and get results
status = await orchestrator.get_status(run_id)
result = await orchestrator.wait_for_task(status.root_task_id, timeout=120.0)
AgentNode¶
AgentNode is a frozen dataclass representing an agent in the graph:
from cognitia.multi_agent.graph_types import AgentNode, AgentCapabilities
node = AgentNode(
id="eng1",
name="Backend Engineer",
role="engineer",
system_prompt="You are a backend engineer. Write clean Python code.",
parent_id="lead",
allowed_tools=("file_write", "run_tests", "git_commit"),
skills=("python", "fastapi"),
mcp_servers=("filesystem", "git"),
capabilities=AgentCapabilities(
can_hire=False,
can_delegate=True,
max_children=3,
),
runtime_config={"model": "sonnet", "temperature": 0.2},
budget_limit_usd=5.0,
metadata={"team": "backend"},
)
AgentNode Fields¶
| Field | Type | Default | Description |
|---|---|---|---|
id | str | required | Unique node identifier |
name | str | required | Human-readable name |
role | str | required | Agent role (e.g. "engineer", "designer") |
system_prompt | str | "" | Instructions for the agent |
parent_id | str \| None | None | Parent node ID (None = root) |
allowed_tools | tuple[str, ...] | () | Tools this agent can use |
skills | tuple[str, ...] | () | Skill identifiers |
mcp_servers | tuple[str, ...] | () | MCP server names available to agent |
capabilities | AgentCapabilities | defaults | Permission flags |
runtime_config | dict \| None | None | Runtime config (None = inherit from parent) |
budget_limit_usd | float \| None | None | Spending cap |
status | AgentStatus | IDLE | Lifecycle status: IDLE, RUNNING, STOPPED |
metadata | dict | {} | Arbitrary key-value data |
AgentCapabilities¶
Per-agent permission flags configured on creation:
from cognitia.multi_agent.graph_types import AgentCapabilities
caps = AgentCapabilities(
can_hire=True, # can create new agent nodes
can_delegate=True, # can delegate tasks to children
max_children=5, # max direct reports (None = unlimited)
can_use_subagents=True, # can spawn subagents outside the graph
allowed_subagent_ids=("researcher-1",), # restrict to specific subagents
can_use_team_mode=False, # can use team coordination mode
)
GraphBuilder¶
The GraphBuilder provides a fluent API for constructing agent hierarchies. It supports programmatic construction, dict-based configuration, and YAML files.
Fluent API¶
from cognitia.multi_agent.graph_builder import GraphBuilder
from cognitia.multi_agent.graph_types import AgentCapabilities
builder = GraphBuilder(store)
# Chain calls fluently
snapshot = await (
builder
.add_root("ceo", "CEO", "executive",
system_prompt="You lead the organization.",
capabilities=AgentCapabilities(can_hire=True, can_delegate=True))
.add_child("cto", "ceo", "CTO", "tech_lead",
capabilities=AgentCapabilities(can_hire=True, can_delegate=True))
.add_child("eng1", "cto", "Engineer 1", "engineer",
allowed_tools=("file_write",))
.add_child("eng2", "cto", "Engineer 2", "engineer",
allowed_tools=("file_write",))
.add_child("designer", "ceo", "Designer", "designer",
skills=("figma",), mcp_servers=("figma-console",))
.build()
)
# snapshot.root_id == "ceo"
# snapshot.nodes has 5 nodes
# snapshot.edges has 4 REPORTS_TO edges
From Dict¶
Build a graph from a nested dictionary structure:
config = {
"id": "ceo", "name": "CEO", "role": "executive",
"system_prompt": "You lead the organization.",
"capabilities": {"can_hire": True, "can_delegate": True},
"children": [
{
"id": "cto", "name": "CTO", "role": "tech_lead",
"children": [
{"id": "eng1", "name": "Engineer", "role": "engineer",
"allowed_tools": ["file_write", "run_tests"]},
],
},
],
}
snapshot = await GraphBuilder.from_dict(config, store)
From YAML¶
Example org.yaml:
id: ceo
name: CEO
role: executive
system_prompt: You lead the organization.
capabilities:
can_hire: true
can_delegate: true
children:
- id: cto
name: CTO
role: tech_lead
children:
- id: eng1
name: Engineer
role: engineer
allowed_tools: [file_write, run_tests]
GraphBuilder API¶
| Method | Signature | Description |
|---|---|---|
add_root | (id, name, role, **kwargs) -> GraphBuilder | Add the root node |
add_child | (id, parent_id, name, role, **kwargs) -> GraphBuilder | Add a child node |
build | () -> GraphSnapshot | Flush nodes to store, return snapshot |
from_dict | (config, store) -> GraphSnapshot | Class method: build from nested dict |
from_yaml | (path, store) -> GraphSnapshot | Class method: build from YAML file |
Governance¶
Governance enforces global limits and per-agent permissions before graph mutations.
GraphGovernanceConfig¶
Global limits for the entire graph:
from cognitia.multi_agent.graph_governance import GraphGovernanceConfig
governance = GraphGovernanceConfig(
max_agents=50, # maximum total nodes in the graph
max_depth=5, # maximum hierarchy depth
default_capabilities=AgentCapabilities(),
allow_dynamic_hiring=True, # can agents create new nodes at runtime?
allow_dynamic_delegation=True, # can agents delegate tasks at runtime?
)
Governance Checks¶
Two enforcement functions validate operations before they execute:
from cognitia.multi_agent.graph_governance import (
check_hire_allowed,
check_delegate_allowed,
GovernanceError,
)
# Check if a parent agent can hire a new child
error = await check_hire_allowed(governance, parent_node, graph)
if error:
print(f"Denied: {error}")
# e.g. "Agent 'Engineer' does not have can_hire permission"
# e.g. "Max graph depth (5) would be exceeded"
# e.g. "Max agents (50) would be exceeded"
# Check if an agent can delegate tasks
error = check_delegate_allowed(governance, agent_node)
if error:
print(f"Denied: {error}")
# e.g. "Dynamic delegation is globally disabled"
Checks performed by check_hire_allowed:
- Global
allow_dynamic_hiringis enabled - Parent agent has
can_hire=Truein capabilities - Parent has not reached
max_childrenlimit - Adding a child would not exceed
max_depth - Total agent count would not exceed
max_agents
Task Board¶
The GraphTaskBoard manages hierarchical tasks with parent-child relationships, atomic checkout, DAG dependencies, and automatic progress propagation.
GraphTaskItem¶
A task in the hierarchical board:
from cognitia.multi_agent.graph_task_types import GraphTaskItem
from cognitia.multi_agent.task_types import TaskStatus, TaskPriority
task = GraphTaskItem(
id="task-001",
title="Implement user authentication",
description="Add JWT-based auth with refresh tokens",
priority=TaskPriority.HIGH,
assignee_agent_id="eng1",
parent_task_id="task-root",
dependencies=("task-db-setup", "task-models"), # DAG edges
dod_criteria=("Unit tests pass", "Integration test with DB"),
estimated_effort="M", # XS/S/M/L/XL
stage="development",
)
GraphTaskItem Fields¶
| Field | Type | Default | Description |
|---|---|---|---|
id | str | required | Unique task identifier |
title | str | required | Task title |
description | str | "" | Detailed description |
status | TaskStatus | TODO | TODO, IN_PROGRESS, DONE, CANCELLED, BLOCKED |
priority | TaskPriority | MEDIUM | LOW, MEDIUM, HIGH, CRITICAL |
assignee_agent_id | str \| None | None | Assigned agent |
parent_task_id | str \| None | None | Parent task for hierarchy |
dependencies | tuple[str, ...] | () | Task IDs that must be DONE first |
dod_criteria | tuple[str, ...] | () | Definition of Done checklist |
checkout_agent_id | str \| None | None | Atomic lock -- agent currently working |
progress | float | 0.0 | 0.0 to 1.0, auto-propagated from children |
blocked_reason | str | "" | Why the task is blocked |
stage | str | "" | Custom workflow stage name |
Task Lifecycle¶
from cognitia.multi_agent import InMemoryGraphTaskBoard
from cognitia.multi_agent.graph_task_types import GraphTaskItem
board = InMemoryGraphTaskBoard()
# Create a task
await board.create_task(GraphTaskItem(id="t1", title="Build API"))
# Atomic checkout -- only one agent can claim a task
claimed = await board.checkout_task("t1", "eng1")
# claimed is the updated task with status=IN_PROGRESS, or None if already claimed
# Complete -- auto-propagates progress to parent
await board.complete_task("t1")
# List with filters
from cognitia.multi_agent.task_types import TaskStatus
tasks = await board.list_tasks(status=TaskStatus.TODO, assignee_agent_id="eng1")
DAG Dependencies¶
Tasks can declare dependencies that must be DONE before the task becomes ready:
# Task "deploy" depends on "build" and "test"
await board.create_task(GraphTaskItem(id="build", title="Build"))
await board.create_task(GraphTaskItem(id="test", title="Test"))
await board.create_task(GraphTaskItem(
id="deploy", title="Deploy",
dependencies=("build", "test"),
))
# Query ready tasks (all deps DONE, status TODO, not checked out)
ready = await board.get_ready_tasks()
# "deploy" will NOT appear until "build" and "test" are DONE
# See what blocks a task
blockers = await board.get_blocked_by("deploy")
# Returns the GraphTaskItem objects for "build" and "test" if not DONE
Blocking and Unblocking¶
Explicitly block a task with a mandatory reason:
# Block a task -- releases checkout, sets status to BLOCKED
await board.block_task("t1", reason="Waiting for API credentials")
# Unblock -- returns task to TODO status
await board.unblock_task("t1")
Progress Propagation¶
When a child task is completed, the parent's progress is automatically recalculated as the average of its children's progress. If all children are DONE, the parent is also marked DONE -- recursively up the tree.
WorkflowConfig¶
Map custom workflow stages to core TaskStatus values:
from cognitia.multi_agent.graph_task_types import WorkflowConfig, WorkflowStage
from cognitia.multi_agent.task_types import TaskStatus
workflow = WorkflowConfig(
name="software-development",
stages=(
WorkflowStage(name="backlog", maps_to=TaskStatus.TODO, order=0),
WorkflowStage(name="development", maps_to=TaskStatus.IN_PROGRESS, order=1),
WorkflowStage(name="code-review", maps_to=TaskStatus.IN_PROGRESS, order=2),
WorkflowStage(name="testing", maps_to=TaskStatus.IN_PROGRESS, order=3),
WorkflowStage(name="deployed", maps_to=TaskStatus.DONE, order=4),
),
)
# Lookup
stage = workflow.stage_for("code-review")
# stage.maps_to == TaskStatus.IN_PROGRESS
# Get all stages for a status
in_progress = workflow.stages_for_status(TaskStatus.IN_PROGRESS)
# Returns development, code-review, testing
Task Comments¶
Attach audit-trail comments to tasks:
from cognitia.multi_agent.graph_task_types import TaskComment
await board.add_comment(TaskComment(
id="c1", task_id="t1", author_agent_id="eng1",
content="Started implementation. Using FastAPI.",
))
comments = await board.get_comments("t1")
# All comments on this specific task
thread = await board.get_thread("t1")
# All comments on t1 AND its subtasks (recursive)
Communication¶
Inter-agent messaging with three channel types: direct, broadcast, and escalation.
Channel Types¶
| Channel | Description | Scope |
|---|---|---|
DIRECT | Point-to-point message | One sender, one recipient |
BROADCAST | Downward announcement | From agent to entire subtree |
ESCALATION | Upward issue report | From agent to all ancestors |
InMemoryGraphCommunication¶
from cognitia.multi_agent.graph_communication import InMemoryGraphCommunication
from cognitia.multi_agent.graph_comm_types import GraphMessage, ChannelType
comm = InMemoryGraphCommunication(graph_query=graph)
# Direct message between agents
await comm.send_direct(GraphMessage(
id="msg-1",
from_agent_id="lead",
to_agent_id="eng1",
content="Please prioritize the auth module.",
task_id="task-001",
))
# Broadcast to all descendants
await comm.broadcast_subtree(
"lead", "Sprint goal updated: focus on security.",
task_id="task-root",
)
# Escalate to all ancestors in chain of command
await comm.escalate(
"eng1", "Blocked: missing API credentials.",
task_id="task-001",
)
# Read inbox
messages = await comm.get_inbox("eng1")
# Get all messages for a task
thread = await comm.get_thread("task-001")
GraphMessage Fields¶
| Field | Type | Default | Description |
|---|---|---|---|
id | str | required | Unique message ID |
from_agent_id | str | required | Sender agent ID |
to_agent_id | str \| None | None | Recipient (None for broadcast) |
channel | ChannelType | DIRECT | DIRECT, BROADCAST, ESCALATION |
content | str | "" | Message body |
task_id | str \| None | None | Related task for threading |
metadata | dict | {} | Arbitrary key-value data |
All communication methods optionally emit events via EventBus if one is provided during construction.
Context Builder¶
The GraphContextBuilder enriches each agent's system prompt with its position in the graph, chain of command, team, available tools, skills, MCP servers, and goal ancestry. Context is built automatically by the orchestrator before each agent execution.
AgentExecutionContext¶
The structured context passed to the agent runner:
from cognitia.multi_agent.graph_execution_context import AgentExecutionContext
# Built automatically by the orchestrator, but can be constructed manually:
ctx = AgentExecutionContext(
agent_id="eng1",
task_id="task-001",
goal="Implement JWT authentication",
system_prompt="## Your Identity\nYou are Engineer 1, role: engineer.\n...",
tools=("file_write", "run_tests"),
skills=("python", "fastapi"),
mcp_servers=("filesystem",),
runtime_config={"model": "sonnet"},
budget_limit_usd=5.0,
)
Context Propagation¶
Tools, skills, and MCP servers are inherited from ancestors: an agent receives its own plus all ancestors' tools/skills/MCP servers (deduplicated). Runtime config uses nearest-ancestor inheritance: the first non-None config walking up the chain is used.
from cognitia.multi_agent.graph_context import GraphContextBuilder
ctx_builder = GraphContextBuilder(
graph_query=graph,
task_board=task_board,
token_budget=4000, # truncates shared knowledge to fit
)
# Build a full context snapshot
snapshot = await ctx_builder.build_context("eng1", task_id="task-001")
# snapshot.chain_of_command == ("CEO", "CTO", "Engineer 1")
# snapshot.sibling_agents == ("Engineer 2",)
# snapshot.available_tools == ("file_write", "run_tests", ...) # own + inherited
# Render as a system prompt
prompt = ctx_builder.render_system_prompt(snapshot)
# Includes: Identity, Chain of Command, Goal Ancestry, Team, Tools, Skills,
# MCP Servers, Permissions, Instructions
Context-Aware Runner¶
The orchestrator auto-detects whether your runner accepts the full AgentExecutionContext or the legacy 4-argument signature:
# Context-aware runner (recommended) -- 1 required parameter
async def run_agent(ctx: AgentExecutionContext) -> str:
# ctx.system_prompt includes graph position, team, permissions
# ctx.tools, ctx.skills, ctx.mcp_servers are ready to use
# ctx.runtime_config has the resolved model/temperature
return await call_llm(
system_prompt=ctx.system_prompt,
user_message=ctx.goal,
model=ctx.runtime_config.get("model", "sonnet"),
)
# Legacy runner -- 4 positional parameters (still supported)
async def run_agent_legacy(
agent_id: str, task_id: str, goal: str, system_prompt: str,
) -> str:
return await call_llm(system_prompt=system_prompt, user_message=goal)
Graph Tools¶
The create_graph_tools factory produces tool definitions that agents can invoke to dynamically modify the organization graph, delegate work, or escalate issues.
from cognitia.multi_agent.graph_tools import create_graph_tools
tools = create_graph_tools(
graph=graph,
task_board=task_board,
orchestrator=orchestrator,
governance=governance, # optional: enforce global limits
approval_gate=approval_gate, # optional: human-in-the-loop
communication=comm, # optional: escalation messaging
)
# Returns list of 3 ToolDefinition objects
graph_hire_agent¶
Dynamically create a new agent node under an existing parent:
# Called by an agent as a tool:
# graph_hire_agent(
# name="Security Auditor",
# role="auditor",
# parent_id="cto",
# system_prompt="You audit code for security vulnerabilities.",
# allowed_tools="file_read,security_scan",
# )
Governance checks before hiring: - Parent has can_hire=True - max_children not exceeded - max_depth not exceeded - max_agents not exceeded - Approval gate passes (if configured)
graph_delegate_task¶
Delegate a task to a specific agent via the orchestrator:
# Called by an agent as a tool:
# graph_delegate_task(
# agent_id="eng1",
# goal="Write unit tests for the auth module",
# parent_task_id="task-001",
# caller_agent_id="lead",
# stage="testing",
# )
Creates a new GraphTaskItem and launches async execution through the orchestrator.
graph_escalate¶
Escalate an issue up the chain of command:
# Called by an agent as a tool:
# graph_escalate(
# from_agent_id="eng1",
# message="Cannot proceed: database connection is down.",
# task_id="task-001",
# )
Sends escalation messages to all ancestors in the chain of command via GraphCommunication.
Storage Backends¶
InMemoryAgentGraph¶
Zero-dependency, thread-safe via asyncio.Lock. Implements both AgentGraphStore and AgentGraphQuery:
from cognitia.multi_agent import InMemoryAgentGraph
graph = InMemoryAgentGraph()
await graph.add_node(root_node)
await graph.add_node(child_node)
# Query
root = await graph.get_root()
children = await graph.get_children("ceo")
chain = await graph.get_chain_of_command("eng1") # [eng1, cto, ceo]
subtree = await graph.get_subtree("cto") # cto + all descendants
engineers = await graph.find_by_role("engineer")
# Mutation
await graph.update_node("eng1", status=AgentStatus.RUNNING)
await graph.remove_node("eng1") # cascades to subtree
# Snapshot
snapshot = await graph.snapshot()
# GraphSnapshot(nodes=(...), edges=(...), root_id="ceo")
SqliteAgentGraph¶
File-based persistence using SQLite with recursive CTEs for efficient tree traversal. Uses asyncio.to_thread() for non-blocking I/O:
from cognitia.multi_agent.graph_store_sqlite import SqliteAgentGraph
# File-based (persists across restarts)
graph = SqliteAgentGraph(db_path="agents.db")
# In-memory (for tests)
graph = SqliteAgentGraph(db_path=":memory:")
# API is identical to InMemoryAgentGraph
await graph.add_node(root_node)
chain = await graph.get_chain_of_command("eng1")
The SQLite backend uses PRAGMA journal_mode=WAL for concurrent read performance and recursive CTEs for get_chain_of_command and get_subtree queries.
InMemoryGraphTaskBoard¶
Implements GraphTaskBoard, GraphTaskScheduler, GraphTaskBlocker, and TaskCommentStore:
Orchestrator¶
The DefaultGraphOrchestrator ties all components together into an execution engine:
from cognitia.multi_agent.graph_orchestrator import DefaultGraphOrchestrator
orchestrator = DefaultGraphOrchestrator(
graph=graph,
task_board=task_board,
agent_runner=run_agent, # your LLM call
event_bus=event_bus, # optional: lifecycle events
communication=comm, # optional: escalation on failure
max_concurrent=5, # semaphore-bounded parallelism
max_retries=2, # retry per agent before escalation
approval_gate=approval_gate, # optional: HITL for delegation
)
Execution Flow¶
start(goal)-- finds the graph root, creates a root task, launches root agent- Root agent decomposes the goal, calls
graph_delegate_taskfor subtasks - Each delegated task launches an agent execution (bounded by semaphore)
- On success, results are stored and task is marked
DONE - On failure after retries, the agent escalates to its parent
- Results bubble up -- parent task auto-completes when all children are
DONE
Run Status¶
from cognitia.multi_agent.graph_orchestrator_types import OrchestratorRunState
status = await orchestrator.get_status(run_id)
# status.state: PENDING | RUNNING | COMPLETED | FAILED | STOPPED
# status.executions: tuple of AgentExecution snapshots
# status.completed_count: how many agents finished successfully
# status.failed_count: how many agents failed
# Wait for a specific task
result = await orchestrator.wait_for_task("task-001", timeout=60.0)
# Stop a run (cancels pending tasks)
await orchestrator.stop(run_id)
Protocols¶
All graph components are defined as @runtime_checkable protocols in cognitia.protocols:
AgentGraphStore¶
Mutation operations on the agent graph (5 methods):
from cognitia.protocols.agent_graph import AgentGraphStore
@runtime_checkable
class AgentGraphStore(Protocol):
async def add_node(self, node: AgentNode) -> None: ...
async def remove_node(self, node_id: str) -> bool: ...
async def get_node(self, node_id: str) -> AgentNode | None: ...
async def get_children(self, node_id: str) -> list[AgentNode]: ...
async def snapshot(self) -> GraphSnapshot: ...
AgentGraphQuery¶
Read-only traversal of the agent graph (4 methods):
from cognitia.protocols.agent_graph import AgentGraphQuery
@runtime_checkable
class AgentGraphQuery(Protocol):
async def get_chain_of_command(self, node_id: str) -> list[AgentNode]: ...
async def get_subtree(self, node_id: str) -> list[AgentNode]: ...
async def get_root(self) -> AgentNode | None: ...
async def find_by_role(self, role: str) -> list[AgentNode]: ...
AgentNodeUpdater¶
Partial update without remove+add (1 method):
from cognitia.protocols.agent_graph import AgentNodeUpdater
@runtime_checkable
class AgentNodeUpdater(Protocol):
async def update_node(self, node_id: str, **updates: Any) -> AgentNode | None: ...
GraphTaskBoard¶
Hierarchical task management with atomic checkout (5 methods):
from cognitia.protocols.graph_task import GraphTaskBoard
@runtime_checkable
class GraphTaskBoard(Protocol):
async def create_task(self, task: GraphTaskItem) -> None: ...
async def checkout_task(self, task_id: str, agent_id: str) -> GraphTaskItem | None: ...
async def complete_task(self, task_id: str) -> bool: ...
async def get_subtasks(self, task_id: str) -> list[GraphTaskItem]: ...
async def list_tasks(self, **filters: Any) -> list[GraphTaskItem]: ...
GraphTaskScheduler¶
DAG-aware task scheduling (2 methods):
from cognitia.protocols.graph_task import GraphTaskScheduler
@runtime_checkable
class GraphTaskScheduler(Protocol):
async def get_ready_tasks(self) -> list[GraphTaskItem]: ...
async def get_blocked_by(self, task_id: str) -> list[GraphTaskItem]: ...
GraphTaskBlocker¶
Block/unblock tasks (2 methods):
from cognitia.protocols.graph_task import GraphTaskBlocker
@runtime_checkable
class GraphTaskBlocker(Protocol):
async def block_task(self, task_id: str, reason: str) -> bool: ...
async def unblock_task(self, task_id: str) -> bool: ...
TaskCommentStore¶
Persistent comment threads on tasks (3 methods):
from cognitia.protocols.graph_task import TaskCommentStore
@runtime_checkable
class TaskCommentStore(Protocol):
async def add_comment(self, comment: TaskComment) -> None: ...
async def get_comments(self, task_id: str) -> list[TaskComment]: ...
async def get_thread(self, task_id: str) -> list[TaskComment]: ...
GraphCommunication¶
Inter-agent messaging (5 methods):
from cognitia.protocols.graph_comm import GraphCommunication
@runtime_checkable
class GraphCommunication(Protocol):
async def send_direct(self, msg: GraphMessage) -> None: ...
async def broadcast_subtree(self, from_id: str, content: str, *, task_id: str | None = None) -> None: ...
async def escalate(self, from_id: str, content: str, *, task_id: str | None = None) -> None: ...
async def get_inbox(self, agent_id: str) -> list[GraphMessage]: ...
async def get_thread(self, task_id: str) -> list[GraphMessage]: ...
GraphOrchestrator¶
Hierarchical execution engine (5 methods):
from cognitia.protocols.graph_orchestrator import GraphOrchestrator
@runtime_checkable
class GraphOrchestrator(Protocol):
async def start(self, goal: str) -> str: ...
async def delegate(self, request: DelegationRequest) -> None: ...
async def collect_result(self, task_id: str) -> str | None: ...
async def get_status(self, run_id: str) -> OrchestratorRunStatus: ...
async def stop(self, run_id: str) -> None: ...
GraphTaskWaiter¶
Wait for task completion (1 method, separated from GraphOrchestrator for ISP compliance):
from cognitia.protocols.graph_orchestrator import GraphTaskWaiter
@runtime_checkable
class GraphTaskWaiter(Protocol):
async def wait_for_task(self, task_id: str, timeout: float | None = None) -> str | None: ...
Custom Implementations¶
To create a custom backend (e.g. PostgreSQL, Redis), implement the corresponding protocol. Use isinstance() checks at runtime thanks to @runtime_checkable:
from cognitia.protocols.agent_graph import AgentGraphStore, AgentGraphQuery
from cognitia.multi_agent.graph_types import AgentNode, GraphSnapshot
class PostgresAgentGraph:
"""PostgreSQL-backed agent graph with CTE traversal."""
def __init__(self, pool) -> None:
self._pool = pool
# AgentGraphStore methods
async def add_node(self, node: AgentNode) -> None: ...
async def remove_node(self, node_id: str) -> bool: ...
async def get_node(self, node_id: str) -> AgentNode | None: ...
async def get_children(self, node_id: str) -> list[AgentNode]: ...
async def snapshot(self) -> GraphSnapshot: ...
# AgentGraphQuery methods
async def get_chain_of_command(self, node_id: str) -> list[AgentNode]: ...
async def get_subtree(self, node_id: str) -> list[AgentNode]: ...
async def get_root(self) -> AgentNode | None: ...
async def find_by_role(self, role: str) -> list[AgentNode]: ...
# Verify compliance
pg = PostgresAgentGraph(pool)
assert isinstance(pg, AgentGraphStore)
assert isinstance(pg, AgentGraphQuery)
Next Steps¶
- Multi-Agent Coordination -- flat agent-as-tool, task queues, agent registry
- Orchestration -- planning mode, subagents, team coordination
- Tools and Skills -- tool decorator, MCP skills
- Architecture -- Clean Architecture layers and protocol design