Skip to content

pyagent-trace API Reference

Event Bus

pyagent_trace.events.TraceEvent dataclass

Contract: every trace event pyagent emits.

Attributes:

Name Type Description
timestamp float

Unix timestamp of the event.

event_type str

One of pattern_start, pattern_end, agent_start, agent_end, llm_call, llm_response, routing_decision, compression, error, cost_record.

agent_name str

Name of the agent involved (empty string if N/A).

pattern_type str

Name of the pattern involved (empty string if N/A).

payload dict[str, Any]

Event-specific data (tokens, cost, duration, model, messages, etc.).

Source code in packages/pyagent-trace/src/pyagent_trace/events.py
@dataclass(frozen=True)
class TraceEvent:
    """Contract: every trace event pyagent emits.

    The dataclass is frozen, so fields cannot be reassigned after
    construction. ``payload`` is still an ordinary ``dict``: its contents can
    be mutated in place even though the field itself cannot be rebound.

    Attributes:
        timestamp: Unix timestamp of the event.
        event_type: One of pattern_start, pattern_end, agent_start, agent_end,
            llm_call, llm_response, routing_decision, compression, error, cost_record.
        agent_name: Name of the agent involved (empty string if N/A).
        pattern_type: Name of the pattern involved (empty string if N/A).
        payload: Event-specific data (tokens, cost, duration, model, messages, etc.).
    """

    # Required fields: when the event happened and what kind of event it is.
    timestamp: float
    event_type: str
    # Optional context; the empty-string default means "not applicable".
    agent_name: str = ""
    pattern_type: str = ""
    # default_factory gives each event its own dict rather than a shared one.
    payload: dict[str, Any] = field(default_factory=dict)

pyagent_trace.events.TraceEventBus

Pub/sub event bus — the contract between trace producers and studio consumers.

Usage

bus = TraceEventBus()
sub_id = bus.subscribe(lambda event: print(event))
bus.emit(TraceEvent(timestamp=time.time(), event_type="llm_call", ...))
bus.unsubscribe(sub_id)

Source code in packages/pyagent-trace/src/pyagent_trace/events.py
class TraceEventBus:
    """Pub/sub event bus — the contract between trace producers and studio consumers.

    Two kinds of subscription are kept in separate registries, both keyed by
    an opaque subscription ID: unfiltered subscribers receive every event,
    filtered subscribers receive only events whose ``event_type`` is in their
    set. For each emitted event, unfiltered subscribers are called first,
    then matching filtered subscribers.

    Callbacks may subscribe or unsubscribe (themselves or others) while an
    event is being delivered: each registry is snapshotted right before it is
    walked, so such changes take effect from the next registry walk onwards.

    Usage:
        bus = TraceEventBus()
        sub_id = bus.subscribe(lambda event: print(event))
        bus.emit(TraceEvent(timestamp=time.time(), event_type="llm_call", ...))
        bus.unsubscribe(sub_id)
    """

    def __init__(self) -> None:
        self._subscribers: dict[str, Callable[[TraceEvent], None]] = {}
        self._filtered_subscribers: dict[str, tuple[set[str], Callable[[TraceEvent], None]]] = {}

    def subscribe(self, callback: Callable[[TraceEvent], None]) -> str:
        """Subscribe to all trace events.

        Args:
            callback: Called with each emitted event.

        Returns:
            The subscription ID to pass to ``unsubscribe``.
        """
        sub_id = uuid.uuid4().hex
        self._subscribers[sub_id] = callback
        return sub_id

    def subscribe_filter(
        self, event_types: set[str], callback: Callable[[TraceEvent], None]
    ) -> str:
        """Subscribe to specific event types only.

        Args:
            event_types: Event type names the callback wants. The set is
                stored as given (not copied).
            callback: Called with each emitted event whose ``event_type`` is
                in ``event_types``.

        Returns:
            The subscription ID to pass to ``unsubscribe``.
        """
        sub_id = uuid.uuid4().hex
        self._filtered_subscribers[sub_id] = (event_types, callback)
        return sub_id

    def unsubscribe(self, subscription_id: str) -> None:
        """Remove a subscription by ID; unknown IDs are ignored."""
        self._subscribers.pop(subscription_id, None)
        self._filtered_subscribers.pop(subscription_id, None)

    def emit(self, event: TraceEvent) -> None:
        """Emit event to all matching subscribers (sync).

        Return values of callbacks are ignored, so coroutine callbacks are
        not awaited here; use ``emit_async`` for those. An exception raised
        by a callback propagates and stops delivery.
        """
        # Iterate over snapshots: a callback that (un)subscribes would
        # otherwise resize the dict mid-iteration and raise RuntimeError.
        for callback in list(self._subscribers.values()):
            callback(event)
        for event_types, callback in list(self._filtered_subscribers.values()):
            if event.event_type in event_types:
                callback(event)

    async def emit_async(self, event: TraceEvent) -> None:
        """Emit event to all matching subscribers, awaiting any coroutine callbacks.

        Callbacks are invoked one at a time, in order; a coroutine returned
        by a callback is awaited before the next callback is called.
        """
        # Snapshots for the same reason as in emit(); awaiting also gives
        # other tasks a chance to modify the registries.
        for callback in list(self._subscribers.values()):
            result = callback(event)
            if asyncio.iscoroutine(result):
                await result
        for event_types, callback in list(self._filtered_subscribers.values()):
            if event.event_type in event_types:
                result = callback(event)
                if asyncio.iscoroutine(result):
                    await result

emit(event)

Emit event to all matching subscribers (sync).

Source code in packages/pyagent-trace/src/pyagent_trace/events.py
def emit(self, event: TraceEvent) -> None:
    """Emit event to all matching subscribers (sync).

    Unfiltered subscribers are called first, then filtered subscribers whose
    event-type set contains ``event.event_type``. Callback return values are
    ignored; an exception raised by a callback propagates and stops delivery.
    """
    # Iterate over snapshots: a callback that (un)subscribes would otherwise
    # resize the dict mid-iteration and raise RuntimeError.
    for callback in list(self._subscribers.values()):
        callback(event)
    for event_types, callback in list(self._filtered_subscribers.values()):
        if event.event_type in event_types:
            callback(event)

emit_async(event) async

Emit event to all matching subscribers, awaiting any coroutine callbacks.

Source code in packages/pyagent-trace/src/pyagent_trace/events.py
async def emit_async(self, event: TraceEvent) -> None:
    """Emit event to all matching subscribers, awaiting any coroutine callbacks.

    Callbacks are invoked one at a time: unfiltered subscribers first, then
    filtered subscribers whose event-type set contains ``event.event_type``.
    When a callback returns a coroutine it is awaited before moving on.
    """
    # Iterate over snapshots: a callback (or another task running while we
    # await) may (un)subscribe, which would otherwise raise RuntimeError.
    for callback in list(self._subscribers.values()):
        result = callback(event)
        if asyncio.iscoroutine(result):
            await result
    for event_types, callback in list(self._filtered_subscribers.values()):
        if event.event_type in event_types:
            result = callback(event)
            if asyncio.iscoroutine(result):
                await result

subscribe(callback)

Subscribe to all trace events. Returns subscription ID.

Source code in packages/pyagent-trace/src/pyagent_trace/events.py
def subscribe(self, callback: Callable[[TraceEvent], None]) -> str:
    """Register ``callback`` for every trace event.

    Returns:
        A freshly generated subscription ID (hex form of a random UUID).
    """
    token = uuid.uuid4().hex
    registry = self._subscribers
    registry[token] = callback
    return token

subscribe_filter(event_types, callback)

Subscribe to specific event types only. Returns subscription ID.

Source code in packages/pyagent-trace/src/pyagent_trace/events.py
def subscribe_filter(
    self, event_types: set[str], callback: Callable[[TraceEvent], None]
) -> str:
    """Register ``callback`` for the given event types only.

    The ``event_types`` set is stored as passed, paired with the callback.

    Returns:
        A freshly generated subscription ID (hex form of a random UUID).
    """
    token = uuid.uuid4().hex
    entry = (event_types, callback)
    self._filtered_subscribers[token] = entry
    return token

unsubscribe(subscription_id)

Remove a subscription by ID.

Source code in packages/pyagent-trace/src/pyagent_trace/events.py
def unsubscribe(self, subscription_id: str) -> None:
    """Drop the subscription with this ID from both registries.

    An ID that is not registered is silently ignored.
    """
    for registry in (self._subscribers, self._filtered_subscribers):
        registry.pop(subscription_id, None)

Exporters

pyagent_trace.exporters.base.TraceExporter

Bases: Protocol

Portal-agnostic exporter contract.

Any class implementing these three methods can receive pyagent trace events. Built-in: ConsoleExporter, JsonlExporter, OTelExporter, LangfuseExporter. Users can implement custom exporters for any backend.

Source code in packages/pyagent-trace/src/pyagent_trace/exporters/base.py
@runtime_checkable
class TraceExporter(Protocol):
    """Portal-agnostic exporter contract.

    Any class implementing these three methods can receive pyagent trace events.
    Built-in: ConsoleExporter, JsonlExporter, OTelExporter, LangfuseExporter.
    Users can implement custom exporters for any backend.

    The protocol is structural: implementers do not need to inherit from it.
    Because it is ``@runtime_checkable``, ``isinstance(obj, TraceExporter)``
    works, but such a check only verifies that the methods exist, not their
    signatures.
    """

    def export_event(self, event: TraceEvent) -> None:
        """Export a single trace event to the backend.

        Args:
            event: The trace event to export.
        """
        ...

    def flush(self) -> None:
        """Flush any buffered events to the backend."""
        ...

    def shutdown(self) -> None:
        """Shutdown the exporter, flushing remaining events."""
        ...

export_event(event)

Export a single trace event to the backend.

Source code in packages/pyagent-trace/src/pyagent_trace/exporters/base.py
def export_event(self, event: TraceEvent) -> None:
    """Export a single trace event to the backend.

    Protocol stub: the body is intentionally empty; implementations supply
    the behavior.

    Args:
        event: The trace event to export.
    """
    ...

flush()

Flush any buffered events to the backend.

Source code in packages/pyagent-trace/src/pyagent_trace/exporters/base.py
def flush(self) -> None:
    """Flush any buffered events to the backend.

    Protocol stub: the body is intentionally empty; implementations supply
    the behavior.
    """
    ...

shutdown()

Shutdown the exporter, flushing remaining events.

Source code in packages/pyagent-trace/src/pyagent_trace/exporters/base.py
def shutdown(self) -> None:
    """Shutdown the exporter, flushing remaining events.

    Protocol stub: the body is intentionally empty; implementations supply
    the behavior.
    """
    ...

pyagent_trace.exporters.console.ConsoleExporter

Export trace events to stdout (or any file-like object).

Usage

exporter = ConsoleExporter()
bus.subscribe(exporter.export_event)

Source code in packages/pyagent-trace/src/pyagent_trace/exporters/console.py
class ConsoleExporter:
    """Export trace events to stdout (or any file-like object).

    Each event becomes one space-separated line:
    ``[event_type] pattern=<pattern_type> agent=<agent_name> key=value ...``
    where the ``pattern=``/``agent=`` parts and the payload pairs are omitted
    when the corresponding field is empty.

    Usage:
        exporter = ConsoleExporter()
        bus.subscribe(exporter.export_event)
    """

    def __init__(self, output: IO[str] | None = None) -> None:
        """Create the exporter.

        Args:
            output: Text stream to write to. ``None`` selects ``sys.stdout``
                (resolved once, at construction time).
        """
        # Compare against None explicitly: a valid stream object may be falsy
        # (e.g. a buffer type defining __len__), and `output or sys.stdout`
        # would silently discard it.
        self._output = output if output is not None else sys.stdout

    def export_event(self, event: TraceEvent) -> None:
        """Print a formatted trace event as a single line."""
        parts = [f"[{event.event_type}]"]
        if event.pattern_type:
            parts.append(f"pattern={event.pattern_type}")
        if event.agent_name:
            parts.append(f"agent={event.agent_name}")
        if event.payload:
            # Payload values are rendered via their str() form, unescaped.
            parts.append(" ".join(f"{k}={v}" for k, v in event.payload.items()))
        self._output.write(" ".join(parts) + "\n")

    def flush(self) -> None:
        """Flush the output stream."""
        self._output.flush()

    def shutdown(self) -> None:
        """Flush remaining output. The stream itself is left open."""
        self.flush()

export_event(event)

Print a formatted trace event.

Source code in packages/pyagent-trace/src/pyagent_trace/exporters/console.py
def export_event(self, event: TraceEvent) -> None:
    """Write one formatted line describing ``event`` to the output stream.

    The line starts with ``[event_type]``; ``pattern=...``, ``agent=...`` and
    the payload's ``key=value`` pairs follow, each only when non-empty.
    """
    segments = [f"[{event.event_type}]"]
    if event.pattern_type:
        segments.append(f"pattern={event.pattern_type}")
    if event.agent_name:
        segments.append(f"agent={event.agent_name}")
    if event.payload:
        pairs = (f"{key}={value}" for key, value in event.payload.items())
        segments.append(" ".join(pairs))
    self._output.write(" ".join(segments) + "\n")

flush()

Flush the output stream.

Source code in packages/pyagent-trace/src/pyagent_trace/exporters/console.py
def flush(self) -> None:
    """Flush the output stream.

    Delegates directly to the ``flush()`` method of the configured stream.
    """
    self._output.flush()

shutdown()

Flush remaining output.

Source code in packages/pyagent-trace/src/pyagent_trace/exporters/console.py
def shutdown(self) -> None:
    """Flush remaining output.

    Only flushes; the output stream is not closed here.
    """
    self.flush()

pyagent_trace.exporters.jsonl.JsonlExporter

Export trace events to a JSONL file.

Usage

exporter = JsonlExporter("traces/run_001.jsonl")
bus.subscribe(exporter.export_event)

Source code in packages/pyagent-trace/src/pyagent_trace/exporters/jsonl.py
class JsonlExporter:
    """Append trace events to a JSONL file, one JSON object per line.

    The parent directory is created when the exporter is constructed; the
    file itself is opened lazily (append mode) on the first exported event
    and reopened if it has been closed in the meantime.

    Usage:
        exporter = JsonlExporter("traces/run_001.jsonl")
        bus.subscribe(exporter.export_event)
    """

    def __init__(self, path: str | Path) -> None:
        target = Path(path)
        self._path = target
        target.parent.mkdir(parents=True, exist_ok=True)
        self._file: IO[str] | None = None

    def _ensure_open(self) -> IO[str]:
        """Return the open file handle, (re)opening it in append mode if needed."""
        handle = self._file
        if handle is None or handle.closed:
            handle = self._path.open("a")
            self._file = handle
        return handle

    def export_event(self, event: TraceEvent) -> None:
        """Serialize ``event`` to JSON and append it as one line.

        Values that ``json`` cannot encode natively are written as ``str(value)``.
        """
        handle = self._ensure_open()
        record = asdict(event)
        handle.write(json.dumps(record, default=str) + "\n")

    def flush(self) -> None:
        """Push buffered writes to disk; no-op when no file is open."""
        handle = self._file
        if not handle or handle.closed:
            return
        handle.flush()

    def shutdown(self) -> None:
        """Flush, then close the file; no-op when no file is open."""
        handle = self._file
        if not handle or handle.closed:
            return
        handle.flush()
        handle.close()

export_event(event)

Append a trace event as a JSON line.

Source code in packages/pyagent-trace/src/pyagent_trace/exporters/jsonl.py
def export_event(self, event: TraceEvent) -> None:
    """Serialize ``event`` to JSON and append it to the file as one line.

    The file handle is obtained first; values ``json`` cannot encode natively
    are written as ``str(value)``.
    """
    sink = self._ensure_open()
    record = asdict(event)
    sink.write(json.dumps(record, default=str) + "\n")

flush()

Flush buffered writes to disk.

Source code in packages/pyagent-trace/src/pyagent_trace/exporters/jsonl.py
def flush(self) -> None:
    """Push buffered writes to disk; does nothing if no file is currently open."""
    handle = self._file
    if not handle or handle.closed:
        return
    handle.flush()

shutdown()

Flush and close the file.

Source code in packages/pyagent-trace/src/pyagent_trace/exporters/jsonl.py
def shutdown(self) -> None:
    """Flush, then close the file; does nothing if no file is currently open."""
    handle = self._file
    if not handle or handle.closed:
        return
    handle.flush()
    handle.close()

pyagent_trace.exporters.langfuse.LangfuseExporter

Export trace events to Langfuse.

Usage

exporter = LangfuseExporter(
    public_key="pk-...",
    secret_key="sk-...",
    host="https://cloud.langfuse.com",
)
bus.subscribe(exporter.export_event)

Requires: pip install pyagent-trace[langfuse]

Source code in packages/pyagent-trace/src/pyagent_trace/exporters/langfuse.py
class LangfuseExporter:
    """Export trace events to Langfuse.

    Pattern events are mapped to Langfuse traces, agent events to spans
    under the matching trace, and LLM calls to generations under the agent's
    span (or directly under the trace when the agent has no open span).

    Open traces are tracked by ``payload["trace_id"]``, falling back to the
    event's ``pattern_type``; open spans are tracked by ``agent_name``.
    Events whose trace/span is not currently tracked are dropped silently.

    Usage:
        exporter = LangfuseExporter(
            public_key="pk-...",
            secret_key="sk-...",
            host="https://cloud.langfuse.com",
        )
        bus.subscribe(exporter.export_event)

    Requires: pip install pyagent-trace[langfuse]
    """

    def __init__(
        self,
        public_key: str = "",
        secret_key: str = "",
        host: str = "https://cloud.langfuse.com",
        client: Any | None = None,
    ) -> None:
        """Create the exporter.

        Args:
            public_key: Langfuse public key (ignored when ``client`` is given).
            secret_key: Langfuse secret key (ignored when ``client`` is given).
            host: Langfuse host URL (ignored when ``client`` is given).
            client: Pre-built client object to use instead of constructing one.

        Raises:
            ImportError: If no ``client`` is given and the langfuse package
                is not installed.
        """
        if client is not None:
            self._client = client
        elif Langfuse is not None:
            self._client = Langfuse(
                public_key=public_key,
                secret_key=secret_key,
                host=host,
            )
        else:
            raise ImportError(
                "langfuse package not installed. Install with: pip install pyagent-trace[langfuse]"
            )
        self._active_traces: dict[str, Any] = {}
        self._active_spans: dict[str, Any] = {}

    @staticmethod
    def _trace_key(event: TraceEvent) -> Any:
        """Return the key under which the event's trace is tracked."""
        return event.payload.get("trace_id", event.pattern_type)

    def export_event(self, event: TraceEvent) -> None:
        """Export a trace event to Langfuse.

        Dispatches on ``event.event_type`` through the module-level handler
        table; event types without a handler are ignored.
        """
        handler = _EVENT_HANDLERS.get(event.event_type)
        if handler:
            handler(self, event)

    def _handle_pattern_start(self, event: TraceEvent) -> None:
        """Open a Langfuse trace for the pattern and remember it."""
        trace_obj = self._client.trace(
            name=event.pattern_type or "unknown_pattern",
            metadata=event.payload,
        )
        self._active_traces[self._trace_key(event)] = trace_obj

    def _handle_pattern_end(self, event: TraceEvent) -> None:
        """Mark the pattern's trace as completed and stop tracking it."""
        trace_obj = self._active_traces.pop(self._trace_key(event), None)
        if trace_obj:
            trace_obj.update(metadata={**event.payload, "completed": True})

    def _handle_agent_start(self, event: TraceEvent) -> None:
        """Open a span for the agent under its trace, if that trace is open."""
        trace_obj = self._active_traces.get(self._trace_key(event))
        if trace_obj:
            span = trace_obj.span(
                name=event.agent_name,
                metadata=event.payload,
            )
            # Keyed by agent name only: a later start for the same agent
            # name replaces the tracked span.
            self._active_spans[event.agent_name] = span

    def _handle_agent_end(self, event: TraceEvent) -> None:
        """End the agent's open span, if any."""
        span = self._active_spans.pop(event.agent_name, None)
        if span:
            span.end(metadata=event.payload)

    def _handle_llm_call(self, event: TraceEvent) -> None:
        """Record a generation under the agent's span, else under the trace."""
        # Prefer the agent's open span as parent; fall back to the trace.
        parent = self._active_traces.get(self._trace_key(event))
        if event.agent_name in self._active_spans:
            parent = self._active_spans[event.agent_name]
        if parent:
            parent.generation(
                name=f"llm_call.{event.agent_name}" if event.agent_name else "llm_call",
                model=event.payload.get("model", ""),
                input=event.payload.get("messages_in", ""),
                output=event.payload.get("response", ""),
                usage={
                    "input": event.payload.get("input_tokens", 0),
                    "output": event.payload.get("output_tokens", 0),
                },
                metadata=event.payload,
            )

    def _handle_cost_record(self, event: TraceEvent) -> None:
        """Attach cost and token figures to the open trace as metadata."""
        trace_obj = self._active_traces.get(self._trace_key(event))
        if trace_obj:
            trace_obj.update(
                metadata={
                    "cost_usd": event.payload.get("cost_usd", 0.0),
                    "model": event.payload.get("model", ""),
                    "input_tokens": event.payload.get("input_tokens", 0),
                    "output_tokens": event.payload.get("output_tokens", 0),
                }
            )

    def flush(self) -> None:
        """Flush pending events to Langfuse."""
        self._client.flush()

    def shutdown(self) -> None:
        """Flush and shutdown the Langfuse client.

        The client is flushed first, then its ``shutdown()`` is called when
        it provides one (injected clients are only required to offer
        ``flush()``).
        """
        self._client.flush()
        closer = getattr(self._client, "shutdown", None)
        if callable(closer):
            closer()

export_event(event)

Export a trace event to Langfuse.

Source code in packages/pyagent-trace/src/pyagent_trace/exporters/langfuse.py
def export_event(self, event: TraceEvent) -> None:
    """Dispatch ``event`` to the handler registered for its event type.

    Event types without a handler in the module-level table are ignored.
    """
    handler = _EVENT_HANDLERS.get(event.event_type)
    if not handler:
        return
    handler(self, event)

flush()

Flush pending events to Langfuse.

Source code in packages/pyagent-trace/src/pyagent_trace/exporters/langfuse.py
def flush(self) -> None:
    """Flush pending events to Langfuse.

    Delegates directly to the client's ``flush()`` method.
    """
    self._client.flush()

shutdown()

Flush and shutdown the Langfuse client.

Source code in packages/pyagent-trace/src/pyagent_trace/exporters/langfuse.py
def shutdown(self) -> None:
    """Flush and shutdown the Langfuse client.

    The client is flushed first, then its ``shutdown()`` is called when it
    provides one (injected clients are only required to offer ``flush()``).
    """
    self._client.flush()
    closer = getattr(self._client, "shutdown", None)
    if callable(closer):
        closer()

Spans

pyagent_trace.spans.PatternSpanEmitter

Emit OTel spans for pattern executions.

Usage

emitter = PatternSpanEmitter()
with emitter.pattern_span("debate", {"rounds": 3}) as span:
    # ... pattern logic ...
    emitter.set_pattern_result(span, output_length=len(output))

Source code in packages/pyagent-trace/src/pyagent_trace/spans.py
class PatternSpanEmitter:
    """Emit OTel spans for pattern executions.

    ``pattern_span``/``agent_span`` start spans via the module-level tracer;
    the ``set_*`` static helpers write pyagent attributes onto a span the
    caller already holds.

    Usage:
        emitter = PatternSpanEmitter()
        with emitter.pattern_span("debate", {"rounds": 3}) as span:
            # ... pattern logic ...
            emitter.set_pattern_result(span, output_length=len(output))
    """

    def pattern_span(
        self,
        pattern_type: str,
        attributes: dict[str, Any] | None = None,
    ) -> Span:
        """Start a span for a pattern execution.

        Args:
            pattern_type: Pattern name; used in the span name
                (``pyagent.pattern.<pattern_type>``) and as an attribute.
            attributes: Extra span attributes; on key collision these win
                over the pattern-type attribute.

        Returns:
            The started span (started with ``start_span``, i.e. the caller
            is responsible for ending it).
        """
        span = _tracer.start_span(
            f"pyagent.pattern.{pattern_type}",
            attributes={
                PyAgentAttributes.PATTERN_TYPE: pattern_type,
                **(attributes or {}),
            },
        )
        return span

    def agent_span(
        self,
        agent_name: str,
        parent_span: Span | None = None,
        attributes: dict[str, Any] | None = None,
    ) -> Span:
        """Start a span for an individual agent call.

        Args:
            agent_name: Agent name; used in the span name
                (``pyagent.agent.<agent_name>``) and as an attribute.
            parent_span: Span to parent the new span under; when ``None`` no
                explicit context is passed to the tracer.
            attributes: Extra span attributes; on key collision these win
                over the agent-name attribute.

        Returns:
            The started span.
        """
        # Explicit None check: whether a parent was supplied should not
        # depend on the truthiness of the span object.
        ctx = trace.set_span_in_context(parent_span) if parent_span is not None else None
        span = _tracer.start_span(
            f"pyagent.agent.{agent_name}",
            context=ctx,
            attributes={
                PyAgentAttributes.AGENT_NAME: agent_name,
                **(attributes or {}),
            },
        )
        return span

    @staticmethod
    def set_pattern_result(
        span: Span,
        output_length: int,
        rounds: int | None = None,
        consensus: float | None = None,
        escalated: bool = False,
        duration_ms: float = 0.0,
        token_estimate: int = 0,
        cost_estimate: float = 0.0,
    ) -> None:
        """Set result attributes on a pattern span and mark it OK.

        Args:
            span: Span to annotate.
            output_length: Accepted for API compatibility; currently not
                written to the span.
            rounds: Recorded only when not ``None``.
            consensus: Recorded only when not ``None``.
            escalated: Always recorded.
            duration_ms: Always recorded.
            token_estimate: Always recorded.
            cost_estimate: Always recorded (as the total-cost attribute).
        """
        if rounds is not None:
            span.set_attribute(PyAgentAttributes.PATTERN_ROUNDS, rounds)
        if consensus is not None:
            span.set_attribute(PyAgentAttributes.PATTERN_CONSENSUS, consensus)
        span.set_attribute(PyAgentAttributes.PATTERN_ESCALATED, escalated)
        span.set_attribute(PyAgentAttributes.EXEC_DURATION_MS, duration_ms)
        span.set_attribute(PyAgentAttributes.EXEC_TOKEN_ESTIMATE, token_estimate)
        span.set_attribute(PyAgentAttributes.COST_TOTAL_USD, cost_estimate)
        span.set_status(StatusCode.OK)

    @staticmethod
    def set_routing_info(
        span: Span,
        difficulty: int,
        selected_model: str,
        cost_estimate: float,
        category: str = "",
    ) -> None:
        """Set routing attributes on a span.

        ``category`` is recorded only when it is a non-empty string.
        """
        span.set_attribute(PyAgentAttributes.ROUTER_DIFFICULTY, difficulty)
        span.set_attribute(PyAgentAttributes.ROUTER_SELECTED_MODEL, selected_model)
        span.set_attribute(PyAgentAttributes.ROUTER_COST_ESTIMATE, cost_estimate)
        if category:
            span.set_attribute(PyAgentAttributes.ROUTER_DIFFICULTY_CATEGORY, category)

    @staticmethod
    def set_compression_info(
        span: Span,
        input_tokens: int,
        output_tokens: int,
        savings_pct: float,
    ) -> None:
        """Set compression attributes (token counts and savings) on a span."""
        span.set_attribute(PyAgentAttributes.COMPRESS_INPUT_TOKENS, input_tokens)
        span.set_attribute(PyAgentAttributes.COMPRESS_OUTPUT_TOKENS, output_tokens)
        span.set_attribute(PyAgentAttributes.COMPRESS_SAVINGS_PCT, savings_pct)

    @staticmethod
    def set_error(span: Span, error: Exception) -> None:
        """Record an error on a span: ERROR status plus an exception event."""
        span.set_status(StatusCode.ERROR, str(error))
        span.record_exception(error)

agent_span(agent_name, parent_span=None, attributes=None)

Start a span for an individual agent call.

Source code in packages/pyagent-trace/src/pyagent_trace/spans.py
def agent_span(
    self,
    agent_name: str,
    parent_span: Span | None = None,
    attributes: dict[str, Any] | None = None,
) -> Span:
    """Start a span for an individual agent call.

    Args:
        agent_name: Agent name; used in the span name
            (``pyagent.agent.<agent_name>``) and as an attribute.
        parent_span: Span to parent the new span under; when ``None`` no
            explicit context is passed to the tracer.
        attributes: Extra span attributes; on key collision these win over
            the agent-name attribute.

    Returns:
        The started span.
    """
    # Explicit None check: whether a parent was supplied should not depend
    # on the truthiness of the span object.
    ctx = trace.set_span_in_context(parent_span) if parent_span is not None else None
    span = _tracer.start_span(
        f"pyagent.agent.{agent_name}",
        context=ctx,
        attributes={
            PyAgentAttributes.AGENT_NAME: agent_name,
            **(attributes or {}),
        },
    )
    return span

pattern_span(pattern_type, attributes=None)

Start a span for a pattern execution.

Source code in packages/pyagent-trace/src/pyagent_trace/spans.py
def pattern_span(
    self,
    pattern_type: str,
    attributes: dict[str, Any] | None = None,
) -> Span:
    """Start and return a span named ``pyagent.pattern.<pattern_type>``.

    The pattern type is set as a span attribute; entries in ``attributes``
    are merged on top of it.
    """
    span_name = f"pyagent.pattern.{pattern_type}"
    span_attributes = {PyAgentAttributes.PATTERN_TYPE: pattern_type}
    span_attributes.update(attributes or {})
    return _tracer.start_span(span_name, attributes=span_attributes)

set_compression_info(span, input_tokens, output_tokens, savings_pct) staticmethod

Set compression attributes on a span.

Source code in packages/pyagent-trace/src/pyagent_trace/spans.py
@staticmethod
def set_compression_info(
    span: Span,
    input_tokens: int,
    output_tokens: int,
    savings_pct: float,
) -> None:
    """Write the compression figures (token counts, savings) onto ``span``."""
    recorded = (
        (PyAgentAttributes.COMPRESS_INPUT_TOKENS, input_tokens),
        (PyAgentAttributes.COMPRESS_OUTPUT_TOKENS, output_tokens),
        (PyAgentAttributes.COMPRESS_SAVINGS_PCT, savings_pct),
    )
    for key, value in recorded:
        span.set_attribute(key, value)

set_error(span, error) staticmethod

Record an error on a span.

Source code in packages/pyagent-trace/src/pyagent_trace/spans.py
@staticmethod
def set_error(span: Span, error: Exception) -> None:
    """Mark ``span`` as failed and attach ``error`` as an exception event.

    The status description is the string form of ``error``.
    """
    description = str(error)
    span.set_status(StatusCode.ERROR, description)
    span.record_exception(error)

set_pattern_result(span, output_length, rounds=None, consensus=None, escalated=False, duration_ms=0.0, token_estimate=0, cost_estimate=0.0) staticmethod

Set result attributes on a pattern span.

Source code in packages/pyagent-trace/src/pyagent_trace/spans.py
@staticmethod
def set_pattern_result(
    span: Span,
    output_length: int,
    rounds: int | None = None,
    consensus: float | None = None,
    escalated: bool = False,
    duration_ms: float = 0.0,
    token_estimate: int = 0,
    cost_estimate: float = 0.0,
) -> None:
    """Write a pattern's result figures onto ``span`` and mark it OK.

    ``rounds`` and ``consensus`` are recorded only when not ``None``; the
    remaining figures are always recorded. ``output_length`` is accepted but
    not written to the span.
    """
    optional_values = (
        (PyAgentAttributes.PATTERN_ROUNDS, rounds),
        (PyAgentAttributes.PATTERN_CONSENSUS, consensus),
    )
    for key, value in optional_values:
        if value is not None:
            span.set_attribute(key, value)

    required_values = (
        (PyAgentAttributes.PATTERN_ESCALATED, escalated),
        (PyAgentAttributes.EXEC_DURATION_MS, duration_ms),
        (PyAgentAttributes.EXEC_TOKEN_ESTIMATE, token_estimate),
        (PyAgentAttributes.COST_TOTAL_USD, cost_estimate),
    )
    for key, value in required_values:
        span.set_attribute(key, value)

    span.set_status(StatusCode.OK)

set_routing_info(span, difficulty, selected_model, cost_estimate, category='') staticmethod

Set routing attributes on a span.

Source code in packages/pyagent-trace/src/pyagent_trace/spans.py
@staticmethod
def set_routing_info(
    span: Span,
    difficulty: int,
    selected_model: str,
    cost_estimate: float,
    category: str = "",
) -> None:
    """Write the routing decision onto ``span``.

    Difficulty, selected model and cost estimate are always recorded; the
    difficulty category only when ``category`` is non-empty.
    """
    recorded = (
        (PyAgentAttributes.ROUTER_DIFFICULTY, difficulty),
        (PyAgentAttributes.ROUTER_SELECTED_MODEL, selected_model),
        (PyAgentAttributes.ROUTER_COST_ESTIMATE, cost_estimate),
    )
    for key, value in recorded:
        span.set_attribute(key, value)
    if not category:
        return
    span.set_attribute(PyAgentAttributes.ROUTER_DIFFICULTY_CATEGORY, category)

pyagent_trace.attributes.PyAgentAttributes

Attribute key constants for pyagent OTel spans.

Source code in packages/pyagent-trace/src/pyagent_trace/attributes.py
class PyAgentAttributes:
    """Attribute key constants for pyagent OTel spans.

    A plain namespace of string constants; every key is prefixed with
    ``pyagent.`` followed by its group (pattern, router, compress, agent,
    cost, exec).
    """

    # Pattern attributes
    PATTERN_TYPE = "pyagent.pattern.type"
    PATTERN_ROUNDS = "pyagent.pattern.rounds"
    PATTERN_CONSENSUS = "pyagent.pattern.consensus"
    PATTERN_ESCALATED = "pyagent.pattern.escalated"
    PATTERN_ESCALATION_LEVEL = "pyagent.pattern.escalation_level"

    # Router attributes
    ROUTER_DIFFICULTY = "pyagent.router.difficulty"
    ROUTER_DIFFICULTY_CATEGORY = "pyagent.router.difficulty_category"
    ROUTER_SELECTED_MODEL = "pyagent.router.selected_model"
    ROUTER_COST_ESTIMATE = "pyagent.router.cost_estimate"
    ROUTER_ALTERNATIVES = "pyagent.router.alternatives"

    # Compression attributes
    COMPRESS_INPUT_TOKENS = "pyagent.compress.input_tokens"
    COMPRESS_OUTPUT_TOKENS = "pyagent.compress.output_tokens"
    COMPRESS_SAVINGS_PCT = "pyagent.compress.savings_pct"

    # Agent attributes
    AGENT_NAME = "pyagent.agent.name"
    AGENT_ROLE = "pyagent.agent.role"
    AGENT_CONTRIBUTION = "pyagent.agent.contribution"

    # Cost attributes
    COST_TOTAL_USD = "pyagent.cost.total_usd"
    COST_INPUT_TOKENS = "pyagent.cost.input_tokens"
    COST_OUTPUT_TOKENS = "pyagent.cost.output_tokens"
    COST_MODEL = "pyagent.cost.model"

    # Execution attributes
    EXEC_DURATION_MS = "pyagent.exec.duration_ms"
    EXEC_TOKEN_ESTIMATE = "pyagent.exec.token_estimate"
    EXEC_LLM_CALLS = "pyagent.exec.llm_calls"

Decorators

pyagent_trace.decorators.traced_pattern(cls)

Class decorator: auto-emit OTel spans for every pattern.run() call.

Usage

@traced_pattern
class MyPattern(Pattern):
    ...

Source code in packages/pyagent-trace/src/pyagent_trace/decorators.py
def traced_pattern(cls: type[Pattern]) -> type[Pattern]:
    """Class decorator: auto-emit OTel spans for every pattern.run() call.

    Replaces ``cls.run`` in place with an async wrapper that runs the
    original inside a ``pyagent.pattern.<pattern_type>`` span. On success
    the span gets duration, token estimate, total cost and (when present in
    ``result.metadata``) the round count, and is marked OK. On failure the
    span is marked ERROR, the exception is recorded once, and the exception
    is re-raised unchanged.

    Usage:
        @traced_pattern
        class MyPattern(Pattern):
            ...

    Args:
        cls: The pattern class to instrument.

    Returns:
        The same class object, with ``run`` wrapped.
    """
    original_run = cls.run

    @functools.wraps(original_run)
    async def traced_run(self: Pattern, task: str, context: Context | None = None) -> Result:
        # Errors are recorded manually in the except block below. Disable the
        # context manager's own exception recording/status handling so the
        # exception is not recorded a second time when it propagates out of
        # the `with` block.
        with _tracer.start_as_current_span(
            f"pyagent.pattern.{self.pattern_type}",
            attributes={PyAgentAttributes.PATTERN_TYPE: self.pattern_type},
            record_exception=False,
            set_status_on_exception=False,
        ) as span:
            start = time.perf_counter()
            try:
                result = await original_run(self, task, context)

                duration_ms = (time.perf_counter() - start) * 1000
                span.set_attribute(PyAgentAttributes.EXEC_DURATION_MS, duration_ms)
                span.set_attribute(PyAgentAttributes.EXEC_TOKEN_ESTIMATE, result.token_estimate)
                span.set_attribute(PyAgentAttributes.COST_TOTAL_USD, result.cost_estimate)

                if "rounds" in result.metadata:
                    span.set_attribute(PyAgentAttributes.PATTERN_ROUNDS, result.metadata["rounds"])

                span.set_status(trace.StatusCode.OK)
                return result
            except Exception as e:
                span.set_status(trace.StatusCode.ERROR, str(e))
                span.record_exception(e)
                raise

    cls.run = traced_run  # type: ignore[assignment]
    return cls

pyagent_trace.decorators.traced_agent(agent)

Wrap an Agent instance to emit OTel spans on every run() call.

Usage

agent = traced_agent(Agent("my_agent", llm))

Source code in packages/pyagent-trace/src/pyagent_trace/decorators.py
def traced_agent(agent: Agent) -> Agent:
    """Wrap an Agent instance to emit OTel spans on every run() call.

    Replaces ``agent.run`` on this instance with an async wrapper that runs
    the original inside a ``pyagent.agent.<name>`` span. On success the span
    gets the call duration and is marked OK. On failure the span is marked
    ERROR, the exception is recorded once, and the exception is re-raised
    unchanged.

    Usage:
        agent = traced_agent(Agent("my_agent", llm))

    Args:
        agent: The agent instance to instrument (mutated in place).

    Returns:
        The same agent object, with ``run`` wrapped.
    """
    original_run = agent.run

    @functools.wraps(original_run)
    async def traced_run(messages: list[Message]) -> Message:
        # Errors are recorded manually in the except block below. Disable the
        # context manager's own exception recording/status handling so the
        # exception is not recorded a second time when it propagates out of
        # the `with` block.
        with _tracer.start_as_current_span(
            f"pyagent.agent.{agent.name}",
            attributes={PyAgentAttributes.AGENT_NAME: agent.name},
            record_exception=False,
            set_status_on_exception=False,
        ) as span:
            start = time.perf_counter()
            try:
                result = await original_run(messages)
                duration_ms = (time.perf_counter() - start) * 1000
                span.set_attribute(PyAgentAttributes.EXEC_DURATION_MS, duration_ms)
                span.set_status(trace.StatusCode.OK)
                return result
            except Exception as e:
                span.set_status(trace.StatusCode.ERROR, str(e))
                span.record_exception(e)
                raise

    agent.run = traced_run  # type: ignore[assignment]
    return agent

Cost Tracking

pyagent_trace.cost.CostTracker

Track costs across an entire workflow.

Usage

tracker = CostTracker()
tracker.record("debate", "bull_agent", "gpt-4o", 500, 200, 0.003)
tracker.record("debate", "bear_agent", "gpt-4o-mini", 500, 200, 0.0004)
print(tracker.summary())

Source code in packages/pyagent-trace/src/pyagent_trace/cost.py
class CostTracker:
    """Track costs across an entire workflow.

    Entries are kept in memory in the order they were recorded. When an event
    bus is supplied, every recorded entry is also published on it as a
    ``cost_record`` trace event.

    Usage:
        tracker = CostTracker()
        tracker.record("debate", "bull_agent", "gpt-4o", 500, 200, 0.003)
        tracker.record("debate", "bear_agent", "gpt-4o-mini", 500, 200, 0.0004)
        print(tracker.summary())
    """

    def __init__(self, event_bus: TraceEventBus | None = None) -> None:
        """Create an empty tracker.

        Args:
            event_bus: Optional bus that receives a ``cost_record`` event for
                every call to ``record()``. ``None`` disables publishing.
        """
        self._entries: list[CostEntry] = []
        self._event_bus = event_bus

    def record(
        self,
        pattern_type: str,
        agent_name: str,
        model: str,
        input_tokens: int,
        output_tokens: int,
        cost_usd: float,
    ) -> None:
        """Record a cost entry.

        Args:
            pattern_type: Pattern the cost is attributed to.
            agent_name: Agent the cost is attributed to.
            model: Model identifier the cost is attributed to.
            input_tokens: Input token count for this entry.
            output_tokens: Output token count for this entry.
            cost_usd: Cost of this entry in USD.
        """
        self._entries.append(
            CostEntry(
                pattern_type=pattern_type,
                agent_name=agent_name,
                model=model,
                input_tokens=input_tokens,
                output_tokens=output_tokens,
                cost_usd=cost_usd,
            )
        )
        # Compare against None explicitly: a bus object that happens to be
        # falsy (e.g. one defining __len__ or __bool__) must still be notified.
        if self._event_bus is not None:
            from pyagent_trace.events import TraceEvent

            self._event_bus.emit(
                TraceEvent(
                    timestamp=time.time(),
                    event_type="cost_record",
                    agent_name=agent_name,
                    pattern_type=pattern_type,
                    payload={
                        "model": model,
                        "input_tokens": input_tokens,
                        "output_tokens": output_tokens,
                        "cost_usd": cost_usd,
                    },
                )
            )

    @property
    def total_cost(self) -> float:
        """Sum of ``cost_usd`` over all recorded entries."""
        return sum(e.cost_usd for e in self._entries)

    @property
    def total_tokens(self) -> int:
        """Sum of input and output tokens over all recorded entries."""
        return sum(e.input_tokens + e.output_tokens for e in self._entries)

    def _cost_by(self, attr: str) -> dict[str, float]:
        """Sum ``cost_usd`` per distinct value of the entry attribute *attr*."""
        totals: dict[str, float] = {}
        for e in self._entries:
            key = getattr(e, attr)
            totals[key] = totals.get(key, 0.0) + e.cost_usd
        return totals

    def by_pattern(self) -> dict[str, float]:
        """Cost breakdown by pattern type."""
        return self._cost_by("pattern_type")

    def by_agent(self) -> dict[str, float]:
        """Cost breakdown by agent name."""
        return self._cost_by("agent_name")

    def by_model(self) -> dict[str, float]:
        """Cost breakdown by model."""
        return self._cost_by("model")

    def summary(self) -> dict[str, object]:
        """Full cost summary.

        Returns:
            A dict with the totals (``total_cost_usd``, ``total_tokens``), the
            number of recorded ``entries``, and the three per-key breakdowns
            (``by_pattern``, ``by_agent``, ``by_model``).
        """
        return {
            "total_cost_usd": self.total_cost,
            "total_tokens": self.total_tokens,
            "entries": len(self._entries),
            "by_pattern": self.by_pattern(),
            "by_agent": self.by_agent(),
            "by_model": self.by_model(),
        }

by_agent()

Cost breakdown by agent name.

Source code in packages/pyagent-trace/src/pyagent_trace/cost.py
def by_agent(self) -> dict[str, float]:
    """Return the summed ``cost_usd`` of the recorded entries per agent name."""
    totals: dict[str, float] = {}
    for entry in self._entries:
        name = entry.agent_name
        running = totals.get(name, 0.0)
        totals[name] = running + entry.cost_usd
    return totals

by_model()

Cost breakdown by model.

Source code in packages/pyagent-trace/src/pyagent_trace/cost.py
def by_model(self) -> dict[str, float]:
    """Return the summed ``cost_usd`` of the recorded entries per model."""
    totals: dict[str, float] = {}
    for entry in self._entries:
        model_name = entry.model
        if model_name not in totals:
            # Start every model at 0.0 so the accumulation below is uniform.
            totals[model_name] = 0.0
        totals[model_name] = totals[model_name] + entry.cost_usd
    return totals

by_pattern()

Cost breakdown by pattern type.

Source code in packages/pyagent-trace/src/pyagent_trace/cost.py
def by_pattern(self) -> dict[str, float]:
    """Return the summed ``cost_usd`` of the recorded entries per pattern type."""
    totals: dict[str, float] = {}
    for entry in self._entries:
        pattern = entry.pattern_type
        previous = totals.setdefault(pattern, 0.0)
        totals[pattern] = previous + entry.cost_usd
    return totals

record(pattern_type, agent_name, model, input_tokens, output_tokens, cost_usd)

Record a cost entry.

Source code in packages/pyagent-trace/src/pyagent_trace/cost.py
def record(
    self,
    pattern_type: str,
    agent_name: str,
    model: str,
    input_tokens: int,
    output_tokens: int,
    cost_usd: float,
) -> None:
    """Record a cost entry.

    Appends a ``CostEntry`` to the tracker and, when the tracker was created
    with an event bus, publishes a matching ``cost_record`` trace event.

    Args:
        pattern_type: Pattern the cost is attributed to.
        agent_name: Agent the cost is attributed to.
        model: Model identifier the cost is attributed to.
        input_tokens: Input token count for this entry.
        output_tokens: Output token count for this entry.
        cost_usd: Cost of this entry in USD.
    """
    self._entries.append(
        CostEntry(
            pattern_type=pattern_type,
            agent_name=agent_name,
            model=model,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cost_usd=cost_usd,
        )
    )
    # Compare against None explicitly: a bus object that happens to be falsy
    # (e.g. one defining __len__ or __bool__) must still be notified.
    if self._event_bus is not None:
        from pyagent_trace.events import TraceEvent

        self._event_bus.emit(
            TraceEvent(
                timestamp=time.time(),
                event_type="cost_record",
                agent_name=agent_name,
                pattern_type=pattern_type,
                payload={
                    "model": model,
                    "input_tokens": input_tokens,
                    "output_tokens": output_tokens,
                    "cost_usd": cost_usd,
                },
            )
        )

summary()

Full cost summary.

Source code in packages/pyagent-trace/src/pyagent_trace/cost.py
def summary(self) -> dict[str, object]:
    """Assemble the totals, entry count and per-key breakdowns into one dict."""
    report: dict[str, object] = {}
    # Scalar totals first, then the three grouped views, in a fixed key order.
    report["total_cost_usd"] = self.total_cost
    report["total_tokens"] = self.total_tokens
    report["entries"] = len(self._entries)
    report["by_pattern"] = self.by_pattern()
    report["by_agent"] = self.by_agent()
    report["by_model"] = self.by_model()
    return report

pyagent_trace.cost.CostEntry dataclass

A single cost record.

Source code in packages/pyagent-trace/src/pyagent_trace/cost.py
@dataclass
class CostEntry:
    """A single cost record.

    One instance is created per ``CostTracker.record()`` call.

    Attributes:
        pattern_type: Pattern label; the grouping key of
            ``CostTracker.by_pattern()``.
        agent_name: Agent label; the grouping key of ``CostTracker.by_agent()``.
        model: Model label; the grouping key of ``CostTracker.by_model()``.
        input_tokens: Input token count; added to ``output_tokens`` in
            ``CostTracker.total_tokens``.
        output_tokens: Output token count.
        cost_usd: Cost of this record in USD; summed by
            ``CostTracker.total_cost`` and the ``by_*`` breakdowns.
    """

    pattern_type: str
    agent_name: str
    model: str
    input_tokens: int
    output_tokens: int
    cost_usd: float

Record & Replay

pyagent_trace.recorder.Recorder

Record pattern executions for debugging and replay.

Usage

recorder = Recorder()
recorder.start("debate")
recorder.record_llm_call("bull", messages, response)
recorder.save("debug_trace.jsonl")

Source code in packages/pyagent-trace/src/pyagent_trace/recorder.py
class Recorder:
    """Record pattern executions for debugging and replay.

    Entries are accumulated in memory in call order and can be written to, or
    read back from, a JSONL file (one JSON object per line). When an event bus
    is supplied, each recorded entry is also published as a trace event.

    Usage:
        recorder = Recorder()
        recorder.start("debate")
        recorder.record_llm_call("bull", messages, response)
        recorder.save("debug_trace.jsonl")
    """

    def __init__(self, event_bus: TraceEventBus | None = None) -> None:
        """Create an empty recorder.

        Args:
            event_bus: Optional bus that receives a trace event for every
                recorded entry. ``None`` disables publishing.
        """
        self._entries: list[RecordEntry] = []
        self._start_time: float = 0.0
        self._event_bus = event_bus

    def start(self, pattern_type: str) -> None:
        """Mark the start of a pattern execution.

        Args:
            pattern_type: Name of the pattern that is starting.
        """
        self._start_time = time.time()
        self._entries.append(
            RecordEntry(
                timestamp=self._start_time,
                event_type="pattern_start",
                agent_name="",
                messages_in=[],
                response="",
                metadata={"pattern_type": pattern_type},
            )
        )
        # Explicit None check: a falsy-but-present bus must still be notified.
        if self._event_bus is not None:
            from pyagent_trace.events import TraceEvent

            self._event_bus.emit(
                TraceEvent(
                    timestamp=self._start_time,
                    event_type="pattern_start",
                    pattern_type=pattern_type,
                    payload={"pattern_type": pattern_type},
                )
            )

    def record_llm_call(
        self,
        agent_name: str,
        messages: list[Message],
        response: str,
        metadata: dict[str, Any] | None = None,
    ) -> None:
        """Record an LLM call and its response.

        Args:
            agent_name: Name of the agent that made the call.
            messages: Messages sent to the LLM; each is stored as a dict with
                ``role``, ``content`` and ``name`` keys.
            response: Text returned by the LLM.
            metadata: Optional extra data stored with the entry and merged
                into the emitted event payload.
        """
        ts = time.time()
        # Snapshot the caller's dict so mutating it after this call cannot
        # silently rewrite what was recorded.
        meta = dict(metadata) if metadata else {}
        self._entries.append(
            RecordEntry(
                timestamp=ts,
                event_type="llm_call",
                agent_name=agent_name,
                messages_in=[
                    {"role": m.role.value, "content": m.content, "name": m.name} for m in messages
                ],
                response=response,
                metadata=meta,
            )
        )
        # Explicit None check: a falsy-but-present bus must still be notified.
        if self._event_bus is not None:
            from pyagent_trace.events import TraceEvent

            self._event_bus.emit(
                TraceEvent(
                    timestamp=ts,
                    event_type="llm_call",
                    agent_name=agent_name,
                    payload={
                        "response": response,
                        # Guarantees a "model" key even when metadata has none.
                        "model": meta.get("model", ""),
                        **meta,
                    },
                )
            )

    def end(self, result_output: str) -> None:
        """Mark the end of a pattern execution.

        The recorded duration is measured from the most recent ``start()``.

        Args:
            result_output: Final output of the pattern.
        """
        ts = time.time()
        duration = ts - self._start_time
        self._entries.append(
            RecordEntry(
                timestamp=ts,
                event_type="pattern_end",
                agent_name="",
                messages_in=[],
                response=result_output,
                metadata={"duration_seconds": duration},
            )
        )
        # Explicit None check: a falsy-but-present bus must still be notified.
        if self._event_bus is not None:
            from pyagent_trace.events import TraceEvent

            self._event_bus.emit(
                TraceEvent(
                    timestamp=ts,
                    event_type="pattern_end",
                    payload={
                        "result_output": result_output,
                        "duration_seconds": duration,
                    },
                )
            )

    def save(self, path: str | Path) -> None:
        """Save recorded entries to a JSONL file.

        Missing parent directories are created; an existing file is
        overwritten.

        Args:
            path: Destination file.
        """
        p = Path(path)
        p.parent.mkdir(parents=True, exist_ok=True)
        # Fixed encoding keeps the file independent of the platform locale.
        with p.open("w", encoding="utf-8") as f:
            for entry in self._entries:
                f.write(json.dumps(asdict(entry), default=str) + "\n")

    @classmethod
    def load(cls, path: str | Path) -> list[RecordEntry]:
        """Load recorded entries from a JSONL file.

        Args:
            path: File previously written by ``save()``.

        Returns:
            One ``RecordEntry`` per non-blank line, in file order.
        """
        entries: list[RecordEntry] = []
        with Path(path).open(encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    # Tolerate blank lines (e.g. a trailing newline added by an
                    # editor) instead of failing with a JSON decode error.
                    continue
                entries.append(RecordEntry(**json.loads(line)))
        return entries

    @property
    def entries(self) -> list[RecordEntry]:
        """A shallow copy of all recorded entries, in recording order."""
        return list(self._entries)

    @property
    def llm_calls(self) -> list[RecordEntry]:
        """Only the entries whose ``event_type`` is ``"llm_call"``."""
        return [e for e in self._entries if e.event_type == "llm_call"]

end(result_output)

Mark the end of a pattern execution.

Source code in packages/pyagent-trace/src/pyagent_trace/recorder.py
def end(self, result_output: str) -> None:
    """Mark the end of a pattern execution.

    Appends a ``pattern_end`` entry whose metadata holds the seconds elapsed
    since ``self._start_time``, and publishes a matching trace event when the
    recorder has an event bus.

    Args:
        result_output: Final output of the pattern; stored as the entry's
            ``response``.
    """
    ts = time.time()
    duration = ts - self._start_time
    self._entries.append(
        RecordEntry(
            timestamp=ts,
            event_type="pattern_end",
            agent_name="",
            messages_in=[],
            response=result_output,
            metadata={"duration_seconds": duration},
        )
    )
    # Compare against None explicitly: a bus object that happens to be falsy
    # (e.g. one defining __len__ or __bool__) must still be notified.
    if self._event_bus is not None:
        from pyagent_trace.events import TraceEvent

        self._event_bus.emit(
            TraceEvent(
                timestamp=ts,
                event_type="pattern_end",
                payload={
                    "result_output": result_output,
                    "duration_seconds": duration,
                },
            )
        )

load(path) classmethod

Load recorded entries from a JSONL file.

Source code in packages/pyagent-trace/src/pyagent_trace/recorder.py
@classmethod
def load(cls, path: str | Path) -> list[RecordEntry]:
    """Load recorded entries from a JSONL file.

    Args:
        path: File containing one JSON object per line, each with the fields
            of ``RecordEntry``.

    Returns:
        One ``RecordEntry`` per non-blank line, in file order.
    """
    entries: list[RecordEntry] = []
    # Fixed encoding keeps reading independent of the platform locale.
    with Path(path).open(encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                # Tolerate blank lines (e.g. a trailing newline added by an
                # editor) instead of failing with a JSON decode error.
                continue
            entries.append(RecordEntry(**json.loads(line)))
    return entries

record_llm_call(agent_name, messages, response, metadata=None)

Record an LLM call and its response.

Source code in packages/pyagent-trace/src/pyagent_trace/recorder.py
def record_llm_call(
    self,
    agent_name: str,
    messages: list[Message],
    response: str,
    metadata: dict[str, Any] | None = None,
) -> None:
    """Record an LLM call and its response.

    Appends an ``llm_call`` entry and publishes a matching trace event when
    the recorder has an event bus.

    Args:
        agent_name: Name of the agent that made the call.
        messages: Messages sent to the LLM; each is stored as a dict with
            ``role``, ``content`` and ``name`` keys.
        response: Text returned by the LLM.
        metadata: Optional extra data stored with the entry and merged into
            the emitted event payload.
    """
    ts = time.time()
    # Snapshot the caller's dict so mutating it after this call cannot
    # silently rewrite what was recorded.
    meta = dict(metadata) if metadata else {}
    self._entries.append(
        RecordEntry(
            timestamp=ts,
            event_type="llm_call",
            agent_name=agent_name,
            messages_in=[
                {"role": m.role.value, "content": m.content, "name": m.name} for m in messages
            ],
            response=response,
            metadata=meta,
        )
    )
    # Compare against None explicitly: a bus object that happens to be falsy
    # (e.g. one defining __len__ or __bool__) must still be notified.
    if self._event_bus is not None:
        from pyagent_trace.events import TraceEvent

        self._event_bus.emit(
            TraceEvent(
                timestamp=ts,
                event_type="llm_call",
                agent_name=agent_name,
                payload={
                    "response": response,
                    # Guarantees a "model" key even when metadata has none.
                    "model": meta.get("model", ""),
                    **meta,
                },
            )
        )

save(path)

Save recorded entries to a JSONL file.

Source code in packages/pyagent-trace/src/pyagent_trace/recorder.py
def save(self, path: str | Path) -> None:
    """Save recorded entries to a JSONL file.

    Each entry is written as one JSON object per line; values that JSON
    cannot encode natively are stringified. Missing parent directories are
    created and an existing file is overwritten.

    Args:
        path: Destination file.
    """
    p = Path(path)
    p.parent.mkdir(parents=True, exist_ok=True)
    # Fixed encoding keeps the file independent of the platform locale.
    with p.open("w", encoding="utf-8") as f:
        for entry in self._entries:
            f.write(json.dumps(asdict(entry), default=str) + "\n")

start(pattern_type)

Mark the start of a pattern execution.

Source code in packages/pyagent-trace/src/pyagent_trace/recorder.py
def start(self, pattern_type: str) -> None:
    """Mark the start of a pattern execution.

    Stores the current time as ``self._start_time``, appends a
    ``pattern_start`` entry, and publishes a matching trace event when the
    recorder has an event bus.

    Args:
        pattern_type: Name of the pattern that is starting.
    """
    self._start_time = time.time()
    self._entries.append(
        RecordEntry(
            timestamp=self._start_time,
            event_type="pattern_start",
            agent_name="",
            messages_in=[],
            response="",
            metadata={"pattern_type": pattern_type},
        )
    )
    # Compare against None explicitly: a bus object that happens to be falsy
    # (e.g. one defining __len__ or __bool__) must still be notified.
    if self._event_bus is not None:
        from pyagent_trace.events import TraceEvent

        self._event_bus.emit(
            TraceEvent(
                timestamp=self._start_time,
                event_type="pattern_start",
                pattern_type=pattern_type,
                payload={"pattern_type": pattern_type},
            )
        )

pyagent_trace.recorder.RecordEntry dataclass

A single recorded event.

Source code in packages/pyagent-trace/src/pyagent_trace/recorder.py
@dataclass
class RecordEntry:
    """A single recorded event.

    Attributes:
        timestamp: Unix time at which ``Recorder`` created the entry.
        event_type: Kind of event (see the inline list on the field).
        agent_name: Agent that made the call for ``llm_call`` entries;
            ``Recorder`` leaves it empty for ``pattern_start`` and
            ``pattern_end``.
        messages_in: For ``llm_call`` entries, one dict per input message with
            ``role``, ``content`` and ``name`` keys; empty otherwise.
        response: LLM response text for ``llm_call`` entries, the pattern's
            result output for ``pattern_end``, and empty for ``pattern_start``.
        metadata: Event-specific extras: ``{"pattern_type": ...}`` on
            ``pattern_start``, ``{"duration_seconds": ...}`` on
            ``pattern_end``, and the caller-supplied metadata on ``llm_call``.
    """

    timestamp: float
    event_type: str  # "llm_call", "message", "pattern_start", "pattern_end"
    agent_name: str
    messages_in: list[dict[str, Any]]
    response: str
    metadata: dict[str, Any]