Observability¶
Lightweight event bus and structured tracing for runtime instrumentation.
Event Bus¶
A pub-sub event bus for internal runtime events. Subscribers (tracing, metrics, UI) receive events without coupling to the runtime.
from cognitia.observability.event_bus import InMemoryEventBus
from cognitia.runtime.types import RuntimeConfig
bus = InMemoryEventBus()
# Subscribe to events
metrics = []
bus.subscribe("llm_call_end", lambda data: metrics.append(data))
# Wire into runtime
config = RuntimeConfig(runtime_name="thin", event_bus=bus)
Automatic Events¶
When event_bus is set in RuntimeConfig, ThinRuntime emits these events automatically:
| Event | When | Data fields |
|---|---|---|
llm_call_start | Before LLM request | model |
llm_call_end | After LLM response | model, error (if failed) |
tool_call_start | Before tool execution | name, correlation_id |
tool_call_end | After tool execution | name, ok, correlation_id |
EventBus Protocol¶
class EventBus(Protocol):
def subscribe(self, event_type: str, callback) -> str: ... # returns subscription ID
def unsubscribe(self, subscription_id: str) -> None: ...
async def emit(self, event_type: str, data: dict) -> None: ...
- Supports both sync and async callbacks
- Errors in callbacks are caught and ignored (fire-and-forget semantics)
- Unsubscribing a non-existent ID is a no-op
Tracing¶
Span-based structured tracing via the Tracer protocol. TracingSubscriber bridges EventBus events to Tracer spans automatically.
from cognitia.observability.event_bus import InMemoryEventBus
from cognitia.observability.tracer import ConsoleTracer, TracingSubscriber
from cognitia.runtime.types import RuntimeConfig
bus = InMemoryEventBus()
tracer = ConsoleTracer()
# Bridge: EventBus events โ Tracer spans
subscriber = TracingSubscriber(bus, tracer)
subscriber.attach()
config = RuntimeConfig(runtime_name="thin", event_bus=bus, tracer=tracer)
# After execution, inspect spans:
# tracer._spans contains all recorded spans with timing
subscriber.detach()
Built-in Tracers¶
| Tracer | Description |
|---|---|
NoopTracer | Zero-overhead stub for production without tracing |
ConsoleTracer | Logs spans via structlog with duration_ms timing |
Tracer Protocol¶
class Tracer(Protocol):
def start_span(self, name: str, attrs: dict | None = None) -> str: ... # span_id
def end_span(self, span_id: str) -> None: ...
def add_event(self, span_id: str, name: str, attrs: dict | None = None) -> None: ...
Custom Tracers¶
Bridge to OpenTelemetry, Datadog, or any observability platform:
from opentelemetry import trace
class OTelTracer:
def __init__(self):
self._tracer = trace.get_tracer("cognitia")
self._spans = {}
def start_span(self, name, attrs=None):
span = self._tracer.start_span(name, attributes=attrs or {})
span_id = f"otel_{id(span)}"
self._spans[span_id] = span
return span_id
def end_span(self, span_id):
if span_id in self._spans:
self._spans.pop(span_id).end()
def add_event(self, span_id, name, attrs=None):
if span_id in self._spans:
self._spans[span_id].add_event(name, attributes=attrs or {})
TracingSubscriber Lifecycle¶
subscriber = TracingSubscriber(bus, tracer)
subscriber.attach() # subscribes to llm_call_start/end, tool_call_start/end
# ... runtime execution ...
subscriber.detach() # removes all subscriptions
attach() / detach() provide explicit lifecycle control โ the subscriber can be reused across multiple runs.
Combining with Other Features¶
Event bus integrates naturally with other Cognitia features:
from cognitia.runtime.cost import CostBudget
from cognitia.observability.event_bus import InMemoryEventBus
from cognitia.observability.tracer import ConsoleTracer, TracingSubscriber
bus = InMemoryEventBus()
tracer = ConsoleTracer()
TracingSubscriber(bus, tracer).attach()
config = RuntimeConfig(
runtime_name="thin",
event_bus=bus,
tracer=tracer,
cost_budget=CostBudget(max_cost_usd=10.0),
)
# Cost events, tool calls, and LLM calls are all traced automatically
OpenTelemetry Integration¶
OTelExporter bridges EventBus events directly to OpenTelemetry spans, following the GenAI Semantic Conventions. Every LLM call and tool call becomes a first-class OTel span with standardized attributes.
Installation¶
This installs opentelemetry-api and opentelemetry-sdk.
Quick Start¶
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor, ConsoleSpanExporter
from cognitia.observability.event_bus import InMemoryEventBus
from cognitia.observability.otel_exporter import OTelExporter
from cognitia.runtime.types import RuntimeConfig
bus = InMemoryEventBus()
provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
otel = OTelExporter(bus, tracer_provider=provider, service_name="my-agent")
otel.attach()
config = RuntimeConfig(runtime_name="thin", event_bus=bus)
# ... run your agent โ spans are exported automatically ...
otel.detach()
Configuration¶
| Parameter | Type | Default | Description |
|---|---|---|---|
bus | EventBus | (required) | EventBus instance to subscribe to |
tracer_provider | TracerProvider | global provider | OTel TracerProvider for span creation |
service_name | str | "cognitia" | Value for gen_ai.system on every span |
runtime_name | str \| None | None | Runtime identifier (e.g. "thin", "claude_code") |
session_id | str \| None | None | Session ID for correlating spans across turns |
GenAI Semantic Convention Attributes¶
Every span created by OTelExporter includes standardized attributes:
| Attribute | Span Type | Description |
|---|---|---|
gen_ai.system | all | Service name (always present) |
gen_ai.request.model | LLM | Model requested (e.g. "sonnet") |
gen_ai.response.model | LLM | Model that responded |
gen_ai.usage.input_tokens | LLM | Input token count (when available) |
gen_ai.usage.output_tokens | LLM | Output token count (when available) |
gen_ai.runtime | all | Runtime name (when configured) |
session.id | all | Session identifier (when configured) |
tool.name | Tool | Tool name (e.g. "web_search") |
Exporting to Observability Backends¶
Datadog -- Use the OTLP exporter. Datadog Agent (v6.32+/v7.32+) accepts OTLP over gRPC on port 4317 by default:
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
processor = SimpleSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317"))
Jaeger -- Jaeger supports OTLP natively since v1.35. Point the OTLP exporter at Jaeger's collector endpoint (default localhost:4317).
Grafana Tempo -- Tempo accepts OTLP traces directly. Configure OTLPSpanExporter(endpoint="http://tempo:4317") and spans appear in Grafana's Explore view.
Coexistence with ConsoleTracer / TracingSubscriber¶
OTelExporter and TracingSubscriber can both subscribe to the same EventBus simultaneously. They operate independently -- events are delivered to all subscribers with no interference:
from cognitia.observability.event_bus import InMemoryEventBus
from cognitia.observability.tracer import ConsoleTracer, TracingSubscriber
from cognitia.observability.otel_exporter import OTelExporter
bus = InMemoryEventBus()
# Internal tracing (structlog)
tracer = ConsoleTracer()
TracingSubscriber(bus, tracer).attach()
# OpenTelemetry export (production)
otel = OTelExporter(bus, tracer_provider=provider, service_name="my-agent")
otel.attach()
# Both receive events from the same bus
config = RuntimeConfig(runtime_name="thin", event_bus=bus, tracer=tracer)
Lifecycle¶
otel = OTelExporter(bus, tracer_provider=provider)
otel.attach() # subscribes to llm_call_start/end, tool_call_start/end
# ... agent execution ...
otel.detach() # unsubscribes, ends any orphaned spans
attach() / detach() mirror the TracingSubscriber API. The exporter can be reattached after detaching.