pyagent-providers API Reference

pyagent_providers

PyAgent Providers — multi-provider abstraction with capability negotiation and fallback chains.

HealthStatus

Bases: StrEnum

Health status of a provider.

Source code in packages/pyagent-providers/src/pyagent_providers/base.py
class HealthStatus(StrEnum):
    """Health status of a provider."""

    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"
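
Because HealthStatus derives from StrEnum, its members compare equal to their string values and can be rebuilt from plain strings. This is handy when a status crosses a JSON or config boundary. The sketch below redefines the enum locally so that it runs on its own. The fallback base class for interpreters older than Python 3.11 is an assumption of this example, not something the package is known to provide.

```python
import json
import sys
from enum import Enum

if sys.version_info >= (3, 11):
    from enum import StrEnum
else:
    # Fallback for older interpreters; an assumption of this sketch.
    class StrEnum(str, Enum):
        pass


class HealthStatus(StrEnum):
    """Local copy of the enum shown above, so the example is self-contained."""

    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"


# Members compare equal to their plain string values.
is_healthy = HealthStatus.HEALTHY == "healthy"

# A status read from JSON can be turned back into a member by value lookup.
payload = json.loads('{"status": "degraded"}')
parsed = HealthStatus(payload["status"])

# Members are str instances, so json.dumps needs no custom encoder.
encoded = json.dumps({"status": HealthStatus.UNHEALTHY})
```

Looking up an unknown value, such as HealthStatus("offline"), raises ValueError, so input from outside the process is worth validating at the boundary.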

ProviderCapabilities dataclass

Declares what a provider can do.

Attributes:

models (list[str]): List of model identifiers this provider exposes.
capabilities (set[Capability]): Set of capability tags (reuses pyagent_router.Capability).
max_context (int): Maximum context window in tokens across all models.
supports_streaming (bool): Whether the provider supports token-level streaming.
supports_tools (bool): Whether the provider supports tool/function calling.
supports_vision (bool): Whether the provider supports image inputs.

Source code in packages/pyagent-providers/src/pyagent_providers/base.py
@dataclass(frozen=True)
class ProviderCapabilities:
    """Declares what a provider can do.

    Attributes:
        models: List of model identifiers this provider exposes.
        capabilities: Set of capability tags (reuses pyagent_router.Capability).
        max_context: Maximum context window in tokens across all models.
        supports_streaming: Whether the provider supports token-level streaming.
        supports_tools: Whether the provider supports tool/function calling.
        supports_vision: Whether the provider supports image inputs.
    """

    models: list[str]
    capabilities: set[Capability] = field(default_factory=lambda: {Capability.GENERAL})
    max_context: int = 128_000
    supports_streaming: bool = True
    supports_tools: bool = False
    supports_vision: bool = False
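
As a rough illustration of how such a declaration might be built, the sketch below copies the dataclass locally and constructs two instances. The Capability enum here is a stand-in: only GENERAL appears in the source above, and CODE is a hypothetical member added for the example.

```python
from dataclasses import FrozenInstanceError, dataclass, field
from enum import Enum


class Capability(Enum):
    """Stand-in for pyagent_router.Capability; only GENERAL appears in the source."""

    GENERAL = "general"
    CODE = "code"  # hypothetical member, for illustration only


@dataclass(frozen=True)
class ProviderCapabilities:
    """Local copy of the dataclass shown above."""

    models: list[str]
    capabilities: set[Capability] = field(default_factory=lambda: {Capability.GENERAL})
    max_context: int = 128_000
    supports_streaming: bool = True
    supports_tools: bool = False
    supports_vision: bool = False


# Only `models` is required; everything else falls back to the defaults.
basic = ProviderCapabilities(models=["small-model"])

# Override the defaults for a provider that can call tools.
coder = ProviderCapabilities(
    models=["small-model", "large-model"],
    capabilities={Capability.GENERAL, Capability.CODE},
    max_context=200_000,
    supports_tools=True,
)

# frozen=True rejects attribute reassignment.
try:
    basic.max_context = 1  # type: ignore[misc]
    reassigned = True
except FrozenInstanceError:
    reassigned = False

# default_factory gives every instance its own default set.
independent_defaults = (
    basic.capabilities is not ProviderCapabilities(models=[]).capabilities
)
```

Note that frozen=True only blocks rebinding of attributes. The models list and capabilities set are still ordinary mutable containers, so code that needs a truly immutable declaration should avoid mutating them in place.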

ProviderInfo dataclass

Snapshot of provider state returned by the registry.

Attributes:

name (str): Provider name.
capabilities (ProviderCapabilities): Provider capabilities.
health (HealthStatus): Current health status.
metadata (dict[str, Any]): Arbitrary extra metadata (e.g. region, version).

Source code in packages/pyagent-providers/src/pyagent_providers/base.py
@dataclass(frozen=True)
class ProviderInfo:
    """Snapshot of provider state returned by the registry.

    Attributes:
        name: Provider name.
        capabilities: Provider capabilities.
        health: Current health status.
        metadata: Arbitrary extra metadata (e.g. region, version).
    """

    name: str
    capabilities: ProviderCapabilities
    health: HealthStatus
    metadata: dict[str, Any] = field(default_factory=dict)

ProviderProtocol

Bases: Protocol

Interface that every LLM provider must implement.

Also satisfies LLMCallable via __call__ which delegates to complete with a default model.

Source code in packages/pyagent-providers/src/pyagent_providers/base.py
@runtime_checkable
class ProviderProtocol(Protocol):
    """Interface that every LLM provider must implement.

    Also satisfies ``LLMCallable`` via ``__call__`` which delegates to
    ``complete`` with a default model.
    """

    @property
    def name(self) -> str:
        """Unique identifier for this provider."""
        ...

    @property
    def capabilities(self) -> ProviderCapabilities:
        """Declared capabilities."""
        ...

    async def health(self) -> HealthStatus:
        """Check current health of the provider endpoint."""
        ...

    async def complete(self, messages: list[Message], model: str | None = None) -> str:
        """Generate a completion.

        Args:
            messages: Conversation history.
            model: Optional model override. Uses provider default if ``None``.

        Returns:
            The assistant response text.
        """
        ...

    async def __call__(self, messages: list[Message]) -> str:
        """LLMCallable-compatible entry point — delegates to ``complete``."""
        ...

name property

Unique identifier for this provider.

capabilities property

Declared capabilities.

health() async

Check current health of the provider endpoint.

Source code in packages/pyagent-providers/src/pyagent_providers/base.py
async def health(self) -> HealthStatus:
    """Check current health of the provider endpoint."""
    ...

complete(messages, model=None) async

Generate a completion.

Parameters:

messages (list[Message], required): Conversation history.
model (str | None, default None): Optional model override. Uses provider default if None.

Returns:

str: The assistant response text.

Source code in packages/pyagent-providers/src/pyagent_providers/base.py
async def complete(self, messages: list[Message], model: str | None = None) -> str:
    """Generate a completion.

    Args:
        messages: Conversation history.
        model: Optional model override. Uses provider default if ``None``.

    Returns:
        The assistant response text.
    """
    ...

__call__(messages) async

LLMCallable-compatible entry point — delegates to complete.

Source code in packages/pyagent-providers/src/pyagent_providers/base.py
async def __call__(self, messages: list[Message]) -> str:
    """LLMCallable-compatible entry point — delegates to ``complete``."""
    ...
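
To make the protocol concrete, here is a minimal sketch of a class that satisfies it structurally, without inheriting from it. Everything in the block is a local stand-in so that it runs on its own: Message is approximated as a plain dict, HealthStatus and ProviderCapabilities are trimmed copies, and EchoProvider is a hypothetical provider that echoes input instead of calling an LLM.

```python
import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Protocol, runtime_checkable


class HealthStatus(str, Enum):  # stand-in for the StrEnum above
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"


@dataclass(frozen=True)
class ProviderCapabilities:  # trimmed stand-in; only the fields used here
    models: list
    max_context: int = 128_000


@runtime_checkable
class ProviderProtocol(Protocol):
    """Local copy of the protocol; messages are plain dicts in this sketch."""

    @property
    def name(self) -> str: ...

    @property
    def capabilities(self) -> ProviderCapabilities: ...

    async def health(self) -> HealthStatus: ...

    async def complete(self, messages: list, model: Optional[str] = None) -> str: ...

    async def __call__(self, messages: list) -> str: ...


class EchoProvider:
    """Hypothetical provider that echoes the last message back."""

    def __init__(self, default_model: str = "echo-1") -> None:
        self._default_model = default_model

    @property
    def name(self) -> str:
        return "echo"

    @property
    def capabilities(self) -> ProviderCapabilities:
        return ProviderCapabilities(models=[self._default_model])

    async def health(self) -> HealthStatus:
        return HealthStatus.HEALTHY

    async def complete(self, messages: list, model: Optional[str] = None) -> str:
        chosen = model or self._default_model  # fall back to the provider default
        return f"[{chosen}] {messages[-1]['content']}"

    async def __call__(self, messages: list) -> str:
        # LLMCallable-style entry point: delegate to complete() with the default model.
        return await self.complete(messages)


provider = EchoProvider()
conforms = isinstance(provider, ProviderProtocol)  # structural check, no inheritance
reply = asyncio.run(provider([{"role": "user", "content": "ping"}]))
```

The isinstance check only verifies that the members exist. It does not validate signatures or confirm that the methods are coroutines, so a static type checker remains the stronger guarantee.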

CostOptimizer

Compare costs across all registered providers.

Wraps CostEstimator from pyagent-router and iterates over every provider's model list to find the cheapest option.

Parameters:

registry (ProviderRegistry, required): Provider registry to draw models from.
cost_estimator (CostEstimator | None, default None): Optional CostEstimator instance.

Source code in packages/pyagent-providers/src/pyagent_providers/cost.py
class CostOptimizer:
    """Compare costs across all registered providers.

    Wraps ``CostEstimator`` from ``pyagent-router`` and iterates over every
    provider's model list to find the cheapest option.

    Args:
        registry: Provider registry to draw models from.
        cost_estimator: Optional ``CostEstimator`` instance.
    """

    def __init__(
        self,
        registry: ProviderRegistry,
        cost_estimator: CostEstimator | None = None,
    ) -> None:
        self._registry = registry
        self._estimator = cost_estimator or CostEstimator()

    def compare(
        self,
        task: str,
        *,
        healthy_only: bool = True,
        limit: int = 10,
    ) -> list[ProviderCostEstimate]:
        """Estimate cost for every provider + model pair and rank cheapest first.

        Args:
            task: The task text to estimate.
            healthy_only: If ``True`` (default), only include healthy providers.
            limit: Maximum number of results to return.

        Returns:
            Sorted list of ``ProviderCostEstimate`` objects, cheapest first.
        """
        candidates = self._registry.discover(healthy_only=healthy_only)
        results: list[ProviderCostEstimate] = []

        for provider in candidates:
            for model_name in provider.capabilities.models:
                try:
                    est = self._estimator.estimate_from_text(model_name, task)
                    results.append(
                        ProviderCostEstimate(
                            provider_name=provider.name,
                            model=model_name,
                            estimate=est,
                        )
                    )
                except KeyError:
                    continue

        results.sort(key=lambda x: x.estimate.total_cost)
        return results[:limit]

    def cheapest(
        self,
        task: str,
        *,
        healthy_only: bool = True,
    ) -> ProviderCostEstimate | None:
        """Return the single cheapest provider + model for a task.

        Returns:
            The cheapest option, or ``None`` if no estimates could be computed.
        """
        estimates = self.compare(task, healthy_only=healthy_only, limit=1)
        return estimates[0] if estimates else None

    def cheapest_provider(
        self,
        task: str,
        *,
        healthy_only: bool = True,
    ) -> tuple[ProviderProtocol, str] | None:
        """Return the cheapest provider object + model name.

        Convenience method for passing directly to ``Agent`` or patterns.
        """
        est = self.cheapest(task, healthy_only=healthy_only)
        if est is None:
            return None
        provider = self._registry.get(est.provider_name)
        if provider is None:
            return None
        return provider, est.model

compare(task, *, healthy_only=True, limit=10)

Estimate cost for every provider + model pair and rank cheapest first. Models for which the estimator raises KeyError are silently skipped.

Parameters:

task (str, required): The task text to estimate.
healthy_only (bool, default True): If True, only include healthy providers.
limit (int, default 10): Maximum number of results to return.

Returns:

list[ProviderCostEstimate]: Sorted list of ProviderCostEstimate objects, cheapest first.

Source code in packages/pyagent-providers/src/pyagent_providers/cost.py
def compare(
    self,
    task: str,
    *,
    healthy_only: bool = True,
    limit: int = 10,
) -> list[ProviderCostEstimate]:
    """Estimate cost for every provider + model pair and rank cheapest first.

    Args:
        task: The task text to estimate.
        healthy_only: If ``True`` (default), only include healthy providers.
        limit: Maximum number of results to return.

    Returns:
        Sorted list of ``ProviderCostEstimate`` objects, cheapest first.
    """
    candidates = self._registry.discover(healthy_only=healthy_only)
    results: list[ProviderCostEstimate] = []

    for provider in candidates:
        for model_name in provider.capabilities.models:
            try:
                est = self._estimator.estimate_from_text(model_name, task)
                results.append(
                    ProviderCostEstimate(
                        provider_name=provider.name,
                        model=model_name,
                        estimate=est,
                    )
                )
            except KeyError:
                continue

    results.sort(key=lambda x: x.estimate.total_cost)
    return results[:limit]
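
The ranking logic above can be sketched in isolation. The block below is not the package's implementation: the price table, the whitespace-based token count, and the Estimate dataclass are all assumptions made so that the example is self-contained. It keeps the same three steps as compare: estimate each provider + model pair, skip pairs whose estimate raises KeyError, then sort and truncate.

```python
from dataclasses import dataclass

# Hypothetical per-1k-token prices; the real CostEstimator's pricing data is not shown.
PRICES_PER_1K = {"small-model": 0.5, "large-model": 3.0}


@dataclass(frozen=True)
class Estimate:
    """Simplified stand-in for ProviderCostEstimate."""

    provider_name: str
    model: str
    total_cost: float


def estimate_from_text(model: str, text: str) -> float:
    """Toy estimator: whitespace token count times a table price; KeyError if unpriced."""
    tokens = len(text.split())
    return PRICES_PER_1K[model] * tokens / 1000


def compare(providers: dict[str, list[str]], task: str, limit: int = 10) -> list[Estimate]:
    """Rank every provider + model pair cheapest first, skipping unpriced models."""
    results = []
    for provider_name, models in providers.items():
        for model in models:
            try:
                cost = estimate_from_text(model, task)
            except KeyError:
                continue  # no price for this model: leave it out of the ranking
            results.append(Estimate(provider_name, model, cost))
    results.sort(key=lambda e: e.total_cost)
    return results[:limit]


providers = {
    "alpha": ["large-model", "unpriced-model"],
    "beta": ["small-model"],
}
ranking = compare(providers, "summarise this short paragraph for me")
cheapest = ranking[0] if ranking else None
```

One consequence of skipping on KeyError is that a model missing from the pricing data never shows up in the ranking, so an empty result can mean "nothing is priced" as well as "nothing is healthy".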

cheapest(task, *, healthy_only=True)

Return the single cheapest provider + model for a task.

Returns:

ProviderCostEstimate | None: The cheapest option, or None if no estimates could be computed.

Source code in packages/pyagent-providers/src/pyagent_providers/cost.py
def cheapest(
    self,
    task: str,
    *,
    healthy_only: bool = True,
) -> ProviderCostEstimate | None:
    """Return the single cheapest provider + model for a task.

    Returns:
        The cheapest option, or ``None`` if no estimates could be computed.
    """
    estimates = self.compare(task, healthy_only=healthy_only, limit=1)
    return estimates[0] if estimates else None

cheapest_provider(task, *, healthy_only=True)

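Return the cheapest provider object + model name.

Convenience method for passing directly to Agent or patterns. Returns None if no estimate could be computed or if the cheapest provider can no longer be found in the registry.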

Source code in packages/pyagent-providers/src/pyagent_providers/cost.py
def cheapest_provider(
    self,
    task: str,
    *,
    healthy_only: bool = True,
) -> tuple[ProviderProtocol, str] | None:
    """Return the cheapest provider object + model name.

    Convenience method for passing directly to ``Agent`` or patterns.
    """
    est = self.cheapest(task, healthy_only=healthy_only)
    if est is None:
        return None
    provider = self._registry.get(est.provider_name)
    if provider is None:
        return None
    return provider, est.model

ProviderCostEstimate dataclass

Cost estimate for a specific provider + model pair.

Attributes:

provider_name (str): Provider that would serve this request.
model (str): Model within that provider.
estimate (CostEstimate): The underlying cost estimate.

Source code in packages/pyagent-providers/src/pyagent_providers/cost.py
@dataclass(frozen=True)
class ProviderCostEstimate:
    """Cost estimate for a specific provider + model pair.

    Attributes:
        provider_name: Provider that would serve this request.
        model: Model within that provider.
        estimate: The underlying cost estimate.
    """

    provider_name: str
    model: str
    estimate: CostEstimate

FallbackChain

Try providers in order until one succeeds.

Optionally integrates with CircuitBreaker from pyagent-patterns to skip providers whose circuits are open.

Parameters:

providers (list[ProviderProtocol], required): Ordered list of providers to try.
circuit_breakers (dict[str, Any] | None, default None): Optional mapping from provider name to CircuitBreaker. If a provider's circuit is open, it is skipped.

Source code in packages/pyagent-providers/src/pyagent_providers/fallback.py
class FallbackChain:
    """Try providers in order until one succeeds.

    Optionally integrates with ``CircuitBreaker`` from ``pyagent-patterns``
    to skip providers whose circuits are open.

    Args:
        providers: Ordered list of providers to try.
        circuit_breakers: Optional mapping of provider name → CircuitBreaker.
            If a provider's circuit is open, it is skipped.
    """

    def __init__(
        self,
        providers: list[ProviderProtocol],
        circuit_breakers: dict[str, Any] | None = None,
    ) -> None:
        if not providers:
            raise ValueError("FallbackChain requires at least one provider")
        self._providers = list(providers)
        self._circuit_breakers = circuit_breakers or {}

    async def complete(
        self,
        messages: list[Message],
        model: str | None = None,
    ) -> FallbackResult:
        """Try each provider in order.

        Args:
            messages: Conversation messages.
            model: Optional model override passed to each provider.

        Returns:
            ``FallbackResult`` with the first successful output.

        Raises:
            RuntimeError: If all providers fail.
        """
        attempts: list[FallbackAttempt] = []

        for provider in self._providers:
            # Check circuit breaker
            cb = self._circuit_breakers.get(provider.name)
            if cb is not None:
                from pyagent_patterns.recovery import CircuitState

                if hasattr(cb, "state") and cb.state == CircuitState.OPEN:
                    attempts.append(
                        FallbackAttempt(provider.name, success=False, error="circuit_open")
                    )
                    logger.debug("Skipping %s — circuit open", provider.name)
                    continue

            try:
                output = await provider.complete(messages, model)
                attempts.append(FallbackAttempt(provider.name, success=True))

                # Record success on circuit breaker
                if cb is not None and hasattr(cb, "_on_success"):
                    cb._on_success()

                return FallbackResult(
                    output=output,
                    provider_name=provider.name,
                    attempts=attempts,
                )
            except Exception as exc:
                error_msg = f"{type(exc).__name__}: {exc}"
                attempts.append(FallbackAttempt(provider.name, success=False, error=error_msg))
                logger.warning("Provider %s failed: %s", provider.name, error_msg)

                # Record failure on circuit breaker
                if cb is not None and hasattr(cb, "_on_failure"):
                    cb._on_failure()

        failed_names = [a.provider_name for a in attempts]
        raise RuntimeError(
            f"All providers failed in fallback chain: {failed_names}. "
            f"Errors: {[a.error for a in attempts if a.error]}"
        )

    async def __call__(self, messages: list[Message]) -> str:
        """LLMCallable-compatible interface."""
        result = await self.complete(messages)
        return result.output

complete(messages, model=None) async

Try each provider in order.

Parameters:

messages (list[Message], required): Conversation messages.
model (str | None, default None): Optional model override passed to each provider.

Returns:

FallbackResult: FallbackResult with the first successful output.

Raises:

RuntimeError: If no provider succeeds, whether because each one raised or because it was skipped with an open circuit.

Source code in packages/pyagent-providers/src/pyagent_providers/fallback.py
async def complete(
    self,
    messages: list[Message],
    model: str | None = None,
) -> FallbackResult:
    """Try each provider in order.

    Args:
        messages: Conversation messages.
        model: Optional model override passed to each provider.

    Returns:
        ``FallbackResult`` with the first successful output.

    Raises:
        RuntimeError: If all providers fail.
    """
    attempts: list[FallbackAttempt] = []

    for provider in self._providers:
        # Check circuit breaker
        cb = self._circuit_breakers.get(provider.name)
        if cb is not None:
            from pyagent_patterns.recovery import CircuitState

            if hasattr(cb, "state") and cb.state == CircuitState.OPEN:
                attempts.append(
                    FallbackAttempt(provider.name, success=False, error="circuit_open")
                )
                logger.debug("Skipping %s — circuit open", provider.name)
                continue

        try:
            output = await provider.complete(messages, model)
            attempts.append(FallbackAttempt(provider.name, success=True))

            # Record success on circuit breaker
            if cb is not None and hasattr(cb, "_on_success"):
                cb._on_success()

            return FallbackResult(
                output=output,
                provider_name=provider.name,
                attempts=attempts,
            )
        except Exception as exc:
            error_msg = f"{type(exc).__name__}: {exc}"
            attempts.append(FallbackAttempt(provider.name, success=False, error=error_msg))
            logger.warning("Provider %s failed: %s", provider.name, error_msg)

            # Record failure on circuit breaker
            if cb is not None and hasattr(cb, "_on_failure"):
                cb._on_failure()

    failed_names = [a.provider_name for a in attempts]
    raise RuntimeError(
        f"All providers failed in fallback chain: {failed_names}. "
        f"Errors: {[a.error for a in attempts if a.error]}"
    )

__call__(messages) async

LLMCallable-compatible interface.

Source code in packages/pyagent-providers/src/pyagent_providers/fallback.py
async def __call__(self, messages: list[Message]) -> str:
    """LLMCallable-compatible interface."""
    result = await self.complete(messages)
    return result.output
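
The control flow of the chain can be illustrated with a reduced sketch. It is a simplification under stated assumptions: circuit breakers are replaced by a plain set of provider names whose circuits are considered open, Attempt stands in for FallbackAttempt (whose definition is not shown on this page), and the two provider classes are hypothetical.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Attempt:  # stand-in for FallbackAttempt
    provider_name: str
    success: bool
    error: Optional[str] = None


@dataclass
class Result:  # mirrors the FallbackResult fields
    output: str
    provider_name: str
    attempts: list = field(default_factory=list)


class FlakyProvider:
    """Hypothetical provider that always raises, to force a fallback."""

    name = "flaky"

    async def complete(self, messages, model=None):
        raise TimeoutError("upstream timed out")


class StableProvider:
    """Hypothetical provider that always answers."""

    name = "stable"

    async def complete(self, messages, model=None):
        return "ok"


async def complete_with_fallback(providers, messages, open_circuits=frozenset()):
    """Try providers in order, skip names in open_circuits, raise if none succeed."""
    attempts = []
    for provider in providers:
        if provider.name in open_circuits:
            attempts.append(Attempt(provider.name, False, "circuit_open"))
            continue
        try:
            output = await provider.complete(messages)
        except Exception as exc:  # any failure moves on to the next provider
            attempts.append(Attempt(provider.name, False, f"{type(exc).__name__}: {exc}"))
            continue
        attempts.append(Attempt(provider.name, True))
        return Result(output, provider.name, attempts)
    raise RuntimeError(f"All providers failed: {[a.provider_name for a in attempts]}")


messages = [{"role": "user", "content": "hello"}]
result = asyncio.run(
    complete_with_fallback([FlakyProvider(), StableProvider()], messages)
)
```

The attempts list records failures as well as the final success, so a caller can log which providers were tried before the one that answered.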

FallbackResult dataclass

Result of a fallback chain execution.

Attributes:

output (str): The completion text from the first successful provider.
provider_name (str): Name of the provider that succeeded.
attempts (list[FallbackAttempt]): Ordered log of every attempt.

Source code in packages/pyagent-providers/src/pyagent_providers/fallback.py
@dataclass
class FallbackResult:
    """Result of a fallback chain execution.

    Attributes:
        output: The completion text from the first successful provider.
        provider_name: Name of the provider that succeeded.
        attempts: Ordered log of every attempt.
    """

    output: str
    provider_name: str
    attempts: list[FallbackAttempt] = field(default_factory=list)

CapabilityNegotiator

Find the best provider for a given set of task requirements.

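Providers that miss a hard requirement (minimum context window, streaming, tools, vision) are filtered out. The remaining providers are scored by capability overlap, with small bonuses for extra capabilities and for context window size.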

Parameters:

registry (ProviderRegistry, required): Provider registry to search.

Source code in packages/pyagent-providers/src/pyagent_providers/negotiation.py
class CapabilityNegotiator:
    """Find the best provider for a given set of task requirements.

    Scores providers by capability overlap, context window adequacy,
    and feature support (streaming, tools, vision).

    Args:
        registry: Provider registry to search.
    """

    def __init__(self, registry: ProviderRegistry) -> None:
        self._registry = registry

    def negotiate(
        self,
        required_capabilities: set[Capability] | None = None,
        *,
        min_context: int = 0,
        needs_streaming: bool = False,
        needs_tools: bool = False,
        needs_vision: bool = False,
    ) -> NegotiationResult | None:
        """Find the best provider matching the requirements.

        Args:
            required_capabilities: Must-have capabilities.
            min_context: Minimum context window size in tokens.
            needs_streaming: Whether streaming is required.
            needs_tools: Whether tool/function calling is required.
            needs_vision: Whether image input is required.

        Returns:
            Best ``NegotiationResult``, or ``None`` if no provider qualifies.
        """
        required = required_capabilities or set()
        candidates = self._registry.discover(healthy_only=True)

        if not candidates:
            return None

        scored: list[tuple[ProviderProtocol, float, set[Capability], set[Capability]]] = []

        for provider in candidates:
            caps = provider.capabilities

            # Hard filters
            if min_context > 0 and caps.max_context < min_context:
                continue
            if needs_streaming and not caps.supports_streaming:
                continue
            if needs_tools and not caps.supports_tools:
                continue
            if needs_vision and not caps.supports_vision:
                continue

            # Score: capability match ratio
            if required:
                matched = required & caps.capabilities
                missing = required - caps.capabilities
                cap_score = len(matched) / len(required) if required else 1.0
            else:
                matched = caps.capabilities
                missing = set()
                cap_score = 1.0

            # Bonus for extra capabilities
            bonus = min(len(caps.capabilities - required) * 0.05, 0.2) if required else 0.0

            # Bonus for large context
            context_bonus = min(caps.max_context / 1_000_000, 0.1)

            total_score = min(cap_score + bonus + context_bonus, 1.0)
            scored.append((provider, total_score, matched, missing))

        if not scored:
            return None

        scored.sort(key=lambda x: x[1], reverse=True)
        best_provider, best_score, best_matched, best_missing = scored[0]

        # Pick the last listed model, which this code treats as the most capable
        models = best_provider.capabilities.models
        model = models[-1] if models else ""

        return NegotiationResult(
            provider=best_provider,
            model=model,
            match_score=best_score,
            matched_capabilities=best_matched,
            missing_capabilities=best_missing,
        )

    def negotiate_all(
        self,
        required_capabilities: set[Capability] | None = None,
        *,
        min_context: int = 0,
        needs_streaming: bool = False,
        needs_tools: bool = False,
        needs_vision: bool = False,
        limit: int = 5,
    ) -> list[NegotiationResult]:
        """Return all matching providers ranked by score.

        Same args as ``negotiate`` plus ``limit`` to cap results.
        """
        required = required_capabilities or set()
        candidates = self._registry.discover(healthy_only=True)
        results: list[NegotiationResult] = []

        for provider in candidates:
            caps = provider.capabilities

            if min_context > 0 and caps.max_context < min_context:
                continue
            if needs_streaming and not caps.supports_streaming:
                continue
            if needs_tools and not caps.supports_tools:
                continue
            if needs_vision and not caps.supports_vision:
                continue

            if required:
                matched = required & caps.capabilities
                missing = required - caps.capabilities
                cap_score = len(matched) / len(required)
            else:
                matched = caps.capabilities
                missing = set()
                cap_score = 1.0

            bonus = min(len(caps.capabilities - required) * 0.05, 0.2) if required else 0.0
            context_bonus = min(caps.max_context / 1_000_000, 0.1)
            total_score = min(cap_score + bonus + context_bonus, 1.0)

            models = provider.capabilities.models
            model = models[-1] if models else ""

            results.append(
                NegotiationResult(
                    provider=provider,
                    model=model,
                    match_score=total_score,
                    matched_capabilities=matched,
                    missing_capabilities=missing,
                )
            )

        results.sort(key=lambda r: r.match_score, reverse=True)
        return results[:limit]

negotiate(required_capabilities=None, *, min_context=0, needs_streaming=False, needs_tools=False, needs_vision=False)

Find the best provider matching the requirements.

Parameters:

required_capabilities (set[Capability] | None, default None): Capabilities the task asks for. These drive the match score rather than acting as a hard filter: a provider that lacks some of them can still be returned, with the gaps listed in missing_capabilities.
min_context (int, default 0): Minimum context window size in tokens.
needs_streaming (bool, default False): Whether streaming is required.
needs_tools (bool, default False): Whether tool/function calling is required.
needs_vision (bool, default False): Whether image input is required.

Returns:

NegotiationResult | None: Best NegotiationResult, or None if no provider qualifies.

Source code in packages/pyagent-providers/src/pyagent_providers/negotiation.py
def negotiate(
    self,
    required_capabilities: set[Capability] | None = None,
    *,
    min_context: int = 0,
    needs_streaming: bool = False,
    needs_tools: bool = False,
    needs_vision: bool = False,
) -> NegotiationResult | None:
    """Find the best provider matching the requirements.

    Args:
        required_capabilities: Must-have capabilities.
        min_context: Minimum context window size in tokens.
        needs_streaming: Whether streaming is required.
        needs_tools: Whether tool/function calling is required.
        needs_vision: Whether image input is required.

    Returns:
        Best ``NegotiationResult``, or ``None`` if no provider qualifies.
    """
    required = required_capabilities or set()
    candidates = self._registry.discover(healthy_only=True)

    if not candidates:
        return None

    scored: list[tuple[ProviderProtocol, float, set[Capability], set[Capability]]] = []

    for provider in candidates:
        caps = provider.capabilities

        # Hard filters
        if min_context > 0 and caps.max_context < min_context:
            continue
        if needs_streaming and not caps.supports_streaming:
            continue
        if needs_tools and not caps.supports_tools:
            continue
        if needs_vision and not caps.supports_vision:
            continue

        # Score: capability match ratio
        if required:
            matched = required & caps.capabilities
            missing = required - caps.capabilities
            cap_score = len(matched) / len(required) if required else 1.0
        else:
            matched = caps.capabilities
            missing = set()
            cap_score = 1.0

        # Bonus for extra capabilities
        bonus = min(len(caps.capabilities - required) * 0.05, 0.2) if required else 0.0

        # Bonus for large context
        context_bonus = min(caps.max_context / 1_000_000, 0.1)

        total_score = min(cap_score + bonus + context_bonus, 1.0)
        scored.append((provider, total_score, matched, missing))

    if not scored:
        return None

    scored.sort(key=lambda x: x[1], reverse=True)
    best_provider, best_score, best_matched, best_missing = scored[0]

    # Pick the last listed model, which this code treats as the most capable
    models = best_provider.capabilities.models
    model = models[-1] if models else ""

    return NegotiationResult(
        provider=best_provider,
        model=model,
        match_score=best_score,
        matched_capabilities=best_matched,
        missing_capabilities=best_missing,
    )
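
The scoring arithmetic in negotiate is easier to follow with numbers. The helper below reproduces the formula for a single provider that has already passed the hard filters. Capability names are plain strings here for brevity; the real code uses Capability members, and the function name match_score is an invention of this sketch.

```python
def match_score(required: set, offered: set, max_context: int) -> float:
    """Reproduce the scoring arithmetic from negotiate() for one provider."""
    if required:
        cap_score = len(required & offered) / len(required)
        # 0.05 per capability beyond the required set, capped at 0.2
        bonus = min(len(offered - required) * 0.05, 0.2)
    else:
        cap_score = 1.0
        bonus = 0.0
    # The context bonus reaches its 0.1 cap at 100,000 tokens.
    context_bonus = min(max_context / 1_000_000, 0.1)
    return min(cap_score + bonus + context_bonus, 1.0)


required = {"code", "reasoning"}

# 1 of 2 matched (0.5) + 2 extras (0.1) + capped context bonus (0.1): about 0.7
partial = match_score(required, {"code", "general", "vision"}, max_context=128_000)

# 2 of 2 matched (1.0) + no extras + 0.032, clamped to 1.0
full = match_score(required, {"code", "reasoning"}, max_context=32_000)

# No requirements: 1.0 + 0.008, clamped to 1.0
unconstrained = match_score(set(), {"general"}, max_context=8_000)
```

Because the total is clamped to 1.0, every provider that covers all requested capabilities ends up with the same score, as does every provider when nothing is requested. Python's sort is stable, so such ties keep the order in which the registry's discover call returned the providers.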

negotiate_all(required_capabilities=None, *, min_context=0, needs_streaming=False, needs_tools=False, needs_vision=False, limit=5)

Return all matching providers ranked by score.

Takes the same arguments as negotiate, plus limit to cap the number of results.

Source code in packages/pyagent-providers/src/pyagent_providers/negotiation.py
def negotiate_all(
    self,
    required_capabilities: set[Capability] | None = None,
    *,
    min_context: int = 0,
    needs_streaming: bool = False,
    needs_tools: bool = False,
    needs_vision: bool = False,
    limit: int = 5,
) -> list[NegotiationResult]:
    """Return all matching providers ranked by score.

    Same args as ``negotiate`` plus ``limit`` to cap results.
    """
    required = required_capabilities or set()
    candidates = self._registry.discover(healthy_only=True)
    results: list[NegotiationResult] = []

    for provider in candidates:
        caps = provider.capabilities

        if min_context > 0 and caps.max_context < min_context:
            continue
        if needs_streaming and not caps.supports_streaming:
            continue
        if needs_tools and not caps.supports_tools:
            continue
        if needs_vision and not caps.supports_vision:
            continue

        if required:
            matched = required & caps.capabilities
            missing = required - caps.capabilities
            cap_score = len(matched) / len(required)
        else:
            matched = caps.capabilities
            missing = set()
            cap_score = 1.0

        bonus = min(len(caps.capabilities - required) * 0.05, 0.2) if required else 0.0
        context_bonus = min(caps.max_context / 1_000_000, 0.1)
        total_score = min(cap_score + bonus + context_bonus, 1.0)

        models = provider.capabilities.models
        model = models[-1] if models else ""

        results.append(
            NegotiationResult(
                provider=provider,
                model=model,
                match_score=total_score,
                matched_capabilities=matched,
                missing_capabilities=missing,
            )
        )

    results.sort(key=lambda r: r.match_score, reverse=True)
    return results[:limit]

NegotiationResult dataclass

Result of capability negotiation.

Attributes:

provider (ProviderProtocol): The best-matched provider.
model (str): Recommended model from that provider.
match_score (float): How well the provider matches (0.0-1.0).
matched_capabilities (set[Capability]): Which requested capabilities the provider satisfies.
missing_capabilities (set[Capability]): Which requested capabilities the provider lacks.

Source code in packages/pyagent-providers/src/pyagent_providers/negotiation.py
@dataclass(frozen=True)
class NegotiationResult:
    """Result of capability negotiation.

    Attributes:
        provider: The best-matched provider.
        model: Recommended model from that provider.
        match_score: How well the provider matches (0.0-1.0).
        matched_capabilities: Which requested capabilities the provider satisfies.
        missing_capabilities: Which requested capabilities the provider lacks.
    """

    provider: ProviderProtocol
    model: str
    match_score: float
    matched_capabilities: set[Capability] = field(default_factory=set)
    missing_capabilities: set[Capability] = field(default_factory=set)

ProviderRegistry

Central registry of all available LLM providers.

Supports registration, discovery by capability, and on-demand health checks whose results are cached per provider.

Parameters:

Name Type Description Default
auto_health_check bool

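If True, the async register() runs a health check when a provider is added. register_sync() never runs one. Without a check, the provider is recorded as HEALTHY.

False

Source code in packages/pyagent-providers/src/pyagent_providers/registry.py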
class ProviderRegistry:
    """Central registry of all available LLM providers.

    Supports registration, discovery by capability, and on-demand health checks.

    Args:
        auto_health_check: If ``True``, run a health check on registration.
    """

    def __init__(self, *, auto_health_check: bool = False) -> None:
        self._providers: dict[str, ProviderProtocol] = {}
        self._health_cache: dict[str, HealthStatus] = {}
        self._metadata: dict[str, dict[str, Any]] = {}
        self._auto_health_check = auto_health_check

    # ------------------------------------------------------------------
    # Registration
    # ------------------------------------------------------------------

    async def register(
        self,
        provider: ProviderProtocol,
        metadata: dict[str, Any] | None = None,
    ) -> None:
        """Register a provider.

        Args:
            provider: Any object satisfying ``ProviderProtocol``.
            metadata: Optional extra metadata (region, version, etc.).
        """
        self._providers[provider.name] = provider
        self._metadata[provider.name] = metadata or {}
        if self._auto_health_check:
            self._health_cache[provider.name] = await provider.health()
        else:
            self._health_cache[provider.name] = HealthStatus.HEALTHY

    def register_sync(
        self,
        provider: ProviderProtocol,
        metadata: dict[str, Any] | None = None,
    ) -> None:
        """Synchronous registration (skips health check)."""
        self._providers[provider.name] = provider
        self._metadata[provider.name] = metadata or {}
        self._health_cache[provider.name] = HealthStatus.HEALTHY

    def remove(self, name: str) -> None:
        """Remove a provider by name."""
        self._providers.pop(name, None)
        self._health_cache.pop(name, None)
        self._metadata.pop(name, None)

    # ------------------------------------------------------------------
    # Discovery
    # ------------------------------------------------------------------

    def get(self, name: str) -> ProviderProtocol | None:
        """Look up a provider by name."""
        return self._providers.get(name)

    def list_providers(self) -> list[ProviderInfo]:
        """Return info snapshots for all registered providers."""
        return [
            ProviderInfo(
                name=p.name,
                capabilities=p.capabilities,
                health=self._health_cache.get(p.name, HealthStatus.HEALTHY),
                metadata=self._metadata.get(p.name, {}),
            )
            for p in self._providers.values()
        ]

    def discover(
        self,
        required_capabilities: set[Capability] | None = None,
        *,
        healthy_only: bool = True,
    ) -> list[ProviderProtocol]:
        """Find providers matching capability and health requirements.

        Args:
            required_capabilities: If given, only return providers whose
                capabilities are a superset of this set.
            healthy_only: If ``True`` (default), exclude unhealthy providers.

        Returns:
            List of matching providers.
        """
        results: list[ProviderProtocol] = []
        for p in self._providers.values():
            if healthy_only and self._health_cache.get(p.name) == HealthStatus.UNHEALTHY:
                continue
            if required_capabilities and not required_capabilities.issubset(
                p.capabilities.capabilities
            ):
                continue
            results.append(p)
        return results

    # ------------------------------------------------------------------
    # Health
    # ------------------------------------------------------------------

    async def check_health(self, name: str | None = None) -> dict[str, HealthStatus]:
        """Run health checks.

        Args:
            name: If given and registered, check only this provider. Otherwise
                (including an unregistered name) check all providers.

        Returns:
            Mapping of provider name → health status.
        """
        targets = (
            {name: self._providers[name]}
            if name and name in self._providers
            else dict(self._providers)
        )

        async def _check(n: str, p: ProviderProtocol) -> tuple[str, HealthStatus]:
            try:
                status = await p.health()
            except Exception:
                status = HealthStatus.UNHEALTHY
            return n, status

        results = await asyncio.gather(*[_check(n, p) for n, p in targets.items()])
        for n, status in results:
            self._health_cache[n] = status
        return dict(results)

    async def remove_unhealthy(self) -> list[str]:
        """Run health checks and remove all unhealthy providers.

        Returns:
            List of removed provider names.
        """
        statuses = await self.check_health()
        removed: list[str] = []
        for name, status in statuses.items():
            if status == HealthStatus.UNHEALTHY:
                self.remove(name)
                removed.append(name)
        return removed

    # ------------------------------------------------------------------
    # Properties
    # ------------------------------------------------------------------

    @property
    def count(self) -> int:
        """Number of registered providers."""
        return len(self._providers)

    def __contains__(self, name: str) -> bool:
        return name in self._providers

    def __len__(self) -> int:
        return len(self._providers)

register(provider, metadata=None) async

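Register a provider, replacing any provider already registered under the same name. The health check runs only if the registry was created with auto_health_check=True; otherwise the provider is recorded as HEALTHY.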

Parameters:

Name Type Description Default
provider ProviderProtocol

Any object satisfying ProviderProtocol.

required
metadata dict[str, Any] | None

Optional extra metadata (region, version, etc.).

None

Source code in packages/pyagent-providers/src/pyagent_providers/registry.py
async def register(
    self,
    provider: ProviderProtocol,
    metadata: dict[str, Any] | None = None,
) -> None:
    """Register a provider.

    Args:
        provider: Any object satisfying ``ProviderProtocol``.
        metadata: Optional extra metadata (region, version, etc.).
    """
    self._providers[provider.name] = provider
    self._metadata[provider.name] = metadata or {}
    if self._auto_health_check:
        self._health_cache[provider.name] = await provider.health()
    else:
        self._health_cache[provider.name] = HealthStatus.HEALTHY
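
To make "any object satisfying ProviderProtocol" concrete, here is a minimal sketch of a provider stub. It exposes only the members that the registry and router code in this reference actually read (`name`, `capabilities`, `health()`, `complete()`). `Health`, `Caps`, and `EchoProvider` are hypothetical stand-ins so the example runs without the package, and messages are plain strings rather than `Message` objects.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass, field
from enum import Enum


class Health(str, Enum):
    """Stand-in for HealthStatus (hypothetical, for this sketch only)."""

    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"


@dataclass(frozen=True)
class Caps:
    """Stand-in for ProviderCapabilities with just the fields used here."""

    models: list[str]
    capabilities: set[str] = field(default_factory=lambda: {"general"})
    max_context: int = 128_000


class EchoProvider:
    """Hypothetical provider that echoes the last message back."""

    def __init__(self, name: str, caps: Caps) -> None:
        self.name = name
        self.capabilities = caps

    async def health(self) -> Health:
        # A real provider would probe its backend here.
        return Health.HEALTHY

    async def complete(self, messages: list[str], model: str | None = None) -> str:
        # Fall back to the first listed model when none is requested.
        chosen = model or self.capabilities.models[0]
        return f"[{chosen}] {messages[-1]}"


provider = EchoProvider("echo", Caps(models=["small", "large"]))
print(asyncio.run(provider.complete(["hello"])))
```

With the real package, an object of this shape would be passed to `await registry.register(provider, metadata=...)`, using `ProviderCapabilities` and `HealthStatus` in place of the stand-ins.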

register_sync(provider, metadata=None)

Synchronous registration. Skips the health check and records the provider as HEALTHY, regardless of auto_health_check.

Source code in packages/pyagent-providers/src/pyagent_providers/registry.py
def register_sync(
    self,
    provider: ProviderProtocol,
    metadata: dict[str, Any] | None = None,
) -> None:
    """Synchronous registration (skips health check)."""
    self._providers[provider.name] = provider
    self._metadata[provider.name] = metadata or {}
    self._health_cache[provider.name] = HealthStatus.HEALTHY

remove(name)

Remove a provider by name, along with its cached health status and metadata. Unknown names are ignored.

Source code in packages/pyagent-providers/src/pyagent_providers/registry.py
def remove(self, name: str) -> None:
    """Remove a provider by name."""
    self._providers.pop(name, None)
    self._health_cache.pop(name, None)
    self._metadata.pop(name, None)

get(name)

Look up a provider by name. Returns None if no provider is registered under that name.

Source code in packages/pyagent-providers/src/pyagent_providers/registry.py
def get(self, name: str) -> ProviderProtocol | None:
    """Look up a provider by name."""
    return self._providers.get(name)

list_providers()

Return info snapshots for all registered providers.

Source code in packages/pyagent-providers/src/pyagent_providers/registry.py
def list_providers(self) -> list[ProviderInfo]:
    """Return info snapshots for all registered providers."""
    return [
        ProviderInfo(
            name=p.name,
            capabilities=p.capabilities,
            health=self._health_cache.get(p.name, HealthStatus.HEALTHY),
            metadata=self._metadata.get(p.name, {}),
        )
        for p in self._providers.values()
    ]

discover(required_capabilities=None, *, healthy_only=True)

Find providers matching capability and health requirements.

Parameters:

Name Type Description Default
required_capabilities set[Capability] | None

If given, only return providers whose capabilities are a superset of this set. None or an empty set disables the capability filter.

None
healthy_only bool

If True (default), exclude providers whose cached status is UNHEALTHY. DEGRADED providers are still returned.

True

Returns:

Type Description
list[ProviderProtocol]

List of matching providers.

Source code in packages/pyagent-providers/src/pyagent_providers/registry.py
def discover(
    self,
    required_capabilities: set[Capability] | None = None,
    *,
    healthy_only: bool = True,
) -> list[ProviderProtocol]:
    """Find providers matching capability and health requirements.

    Args:
        required_capabilities: If given, only return providers whose
            capabilities are a superset of this set.
        healthy_only: If ``True`` (default), exclude unhealthy providers.

    Returns:
        List of matching providers.
    """
    results: list[ProviderProtocol] = []
    for p in self._providers.values():
        if healthy_only and self._health_cache.get(p.name) == HealthStatus.UNHEALTHY:
            continue
        if required_capabilities and not required_capabilities.issubset(
            p.capabilities.capabilities
        ):
            continue
        results.append(p)
    return results
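
The two filters in `discover` can be illustrated with plain data. This is a sketch under stated assumptions: `Entry` is a hypothetical flattened view of a registered provider, and capabilities and health values are plain strings rather than `Capability` and `HealthStatus` members.

```python
from __future__ import annotations

from typing import NamedTuple


class Entry(NamedTuple):
    """Hypothetical flattened view of a registered provider."""

    name: str
    capabilities: frozenset[str]
    health: str  # "healthy", "degraded" or "unhealthy"


def discover(
    entries: list[Entry],
    required: set[str] | None = None,
    *,
    healthy_only: bool = True,
) -> list[str]:
    """Return the names that pass the same two filters as the method above."""
    names = []
    for e in entries:
        if healthy_only and e.health == "unhealthy":
            continue  # only "unhealthy" is excluded; "degraded" passes
        if required and not required.issubset(e.capabilities):
            continue  # the provider must cover every required capability
        names.append(e.name)
    return names


entries = [
    Entry("a", frozenset({"general", "code"}), "healthy"),
    Entry("b", frozenset({"general"}), "degraded"),
    Entry("c", frozenset({"general", "code", "vision"}), "unhealthy"),
]
print(discover(entries, {"code"}))
print(discover(entries, {"code"}, healthy_only=False))
```

Note that the capability check is all-or-nothing: a provider missing a single required capability is dropped, unlike the graded score used in negotiation.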

check_health(name=None) async

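Run health checks concurrently and store the results in the registry's health cache. A provider whose health() call raises is recorded as UNHEALTHY.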

Parameters:

Name Type Description Default
name str | None

If given and registered, check only this provider. Otherwise, including when the name is not registered, check all providers.

None

Returns:

Type Description
dict[str, HealthStatus]

Mapping of provider name → health status.

Source code in packages/pyagent-providers/src/pyagent_providers/registry.py
async def check_health(self, name: str | None = None) -> dict[str, HealthStatus]:
    """Run health checks.

    Args:
        name: If given and registered, check only this provider. Otherwise
            (including an unregistered name) check all providers.

    Returns:
        Mapping of provider name → health status.
    """
    targets = (
        {name: self._providers[name]}
        if name and name in self._providers
        else dict(self._providers)
    )

    async def _check(n: str, p: ProviderProtocol) -> tuple[str, HealthStatus]:
        try:
            status = await p.health()
        except Exception:
            status = HealthStatus.UNHEALTHY
        return n, status

    results = await asyncio.gather(*[_check(n, p) for n, p in targets.items()])
    for n, status in results:
        self._health_cache[n] = status
    return dict(results)
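
The pattern used by `check_health`, running all probes concurrently while converting any exception into an unhealthy status, can be sketched in isolation. The probe functions `ok` and `boom` and the helper `check_all` are hypothetical, and statuses are plain strings instead of `HealthStatus` members.

```python
import asyncio


async def ok() -> str:
    """Hypothetical probe for a reachable backend."""
    return "healthy"


async def boom() -> str:
    """Hypothetical probe whose backend cannot be reached."""
    raise ConnectionError("endpoint unreachable")


async def check_all(probes: dict) -> dict:
    """Run every probe concurrently; a raising probe counts as unhealthy."""

    async def _check(name, probe):
        try:
            status = await probe()
        except Exception:
            # Trap the error inside the task so gather() never sees it.
            status = "unhealthy"
        return name, status

    results = await asyncio.gather(*(_check(n, p) for n, p in probes.items()))
    return dict(results)


statuses = asyncio.run(check_all({"primary": ok, "backup": boom}))
print(statuses)
```

Catching the exception inside each task is what lets one failing provider be reported without cancelling or hiding the results of the others.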

remove_unhealthy() async

Run health checks and remove all unhealthy providers.

Returns:

Type Description
list[str]

List of removed provider names.

Source code in packages/pyagent-providers/src/pyagent_providers/registry.py
async def remove_unhealthy(self) -> list[str]:
    """Run health checks and remove all unhealthy providers.

    Returns:
        List of removed provider names.
    """
    statuses = await self.check_health()
    removed: list[str] = []
    for name, status in statuses.items():
        if status == HealthStatus.UNHEALTHY:
            self.remove(name)
            removed.append(name)
    return removed

ProviderRouter

Route requests to the best provider + model pair.

Parameters:

Name Type Description Default
registry ProviderRegistry

The provider registry to draw from.

required
strategy RoutingStrategy

Routing strategy to use.

CAPABILITY_FIRST
cost_estimator CostEstimator | None

Cost estimator used by the COST_FIRST strategy. If None, a new CostEstimator() is created.

None
scorer DifficultyScorer | None

Difficulty scorer used by the CAPABILITY_FIRST strategy to choose a model. If None, a new DifficultyScorer() is created.

None

Source code in packages/pyagent-providers/src/pyagent_providers/router.py
class ProviderRouter:
    """Route requests to the best provider + model pair.

    Args:
        registry: The provider registry to draw from.
        strategy: Routing strategy to use.
        cost_estimator: Optional cost estimator for ``COST_FIRST`` strategy.
        scorer: Optional difficulty scorer for capability-based strategies.
    """

    def __init__(
        self,
        registry: ProviderRegistry,
        strategy: RoutingStrategy = RoutingStrategy.CAPABILITY_FIRST,
        cost_estimator: CostEstimator | None = None,
        scorer: DifficultyScorer | None = None,
    ) -> None:
        self._registry = registry
        self._strategy = strategy
        self._estimator = cost_estimator or CostEstimator()
        self._scorer = scorer or DifficultyScorer()
        self._rr_cycle: itertools.cycle[str] | None = None
        # Candidate names the current round-robin cycle was built from.
        self._rr_names: set[str] = set()

    async def route(
        self,
        messages: list[Message],
        required: set[Capability] | None = None,
    ) -> tuple[ProviderProtocol, str]:
        """Select the best provider and model for a request.

        Args:
            messages: Conversation messages (last user message used for scoring).
            required: Optional capability requirements.

        Returns:
            Tuple of (provider, model_name).

        Raises:
            RuntimeError: If no suitable provider is found.
        """
        candidates = self._registry.discover(required, healthy_only=True)
        if not candidates:
            raise RuntimeError(f"No healthy provider found matching capabilities={required}")

        if self._strategy == RoutingStrategy.ROUND_ROBIN:
            return self._route_round_robin(candidates)
        if self._strategy == RoutingStrategy.COST_FIRST:
            return self._route_cost_first(candidates, messages)
        if self._strategy == RoutingStrategy.LATENCY_FIRST:
            return self._route_latency_first(candidates)
        # Default: CAPABILITY_FIRST
        return self._route_capability_first(candidates, messages)

    # ------------------------------------------------------------------
    # Strategy implementations
    # ------------------------------------------------------------------

    def _route_capability_first(
        self,
        candidates: list[ProviderProtocol],
        messages: list[Message],
    ) -> tuple[ProviderProtocol, str]:
        """Pick the provider with the broadest capability set, then its best model."""
        task_text = self._extract_task(messages)
        difficulty = self._scorer.score(task_text)

        best: ProviderProtocol | None = None
        best_score = -1
        for p in candidates:
            score = len(p.capabilities.capabilities) + p.capabilities.max_context // 100_000
            if score > best_score:
                best_score = score
                best = p

        provider = best or candidates[0]
        model = self._pick_model(provider, difficulty.score)
        return provider, model

    def _route_cost_first(
        self,
        candidates: list[ProviderProtocol],
        messages: list[Message],
    ) -> tuple[ProviderProtocol, str]:
        """Pick the cheapest provider + model combo."""
        task_text = self._extract_task(messages)
        cheapest_cost = float("inf")
        chosen_provider = candidates[0]
        chosen_model = (
            candidates[0].capabilities.models[0] if candidates[0].capabilities.models else ""
        )

        for p in candidates:
            for model_name in p.capabilities.models:
                try:
                    est = self._estimator.estimate_from_text(model_name, task_text)
                    if est.total_cost < cheapest_cost:
                        cheapest_cost = est.total_cost
                        chosen_provider = p
                        chosen_model = model_name
                except KeyError:
                    continue

        return chosen_provider, chosen_model

    def _route_latency_first(
        self,
        candidates: list[ProviderProtocol],
    ) -> tuple[ProviderProtocol, str]:
        """Pick the provider with the smallest ``max_context`` (a rough proxy for low latency)."""
        best: ProviderProtocol | None = None
        best_ctx = float("inf")
        for p in candidates:
            if p.capabilities.max_context < best_ctx:
                best_ctx = p.capabilities.max_context
                best = p
        provider = best or candidates[0]
        model = provider.capabilities.models[0] if provider.capabilities.models else ""
        return provider, model

    def _route_round_robin(
        self,
        candidates: list[ProviderProtocol],
    ) -> tuple[ProviderProtocol, str]:
        """Cycle through providers in order."""
        names = [p.name for p in candidates]
        if self._rr_cycle is None or set(names) != getattr(self, "_rr_names", set()):
            self._rr_cycle = itertools.cycle(names)
            self._rr_names = set(names)
        chosen_name = next(self._rr_cycle)
        provider = next(p for p in candidates if p.name == chosen_name)
        model = provider.capabilities.models[0] if provider.capabilities.models else ""
        return provider, model

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _extract_task(messages: list[Message]) -> str:
        """Get the last user message as a task string."""
        for m in reversed(messages):
            if m.role.value == "user":
                return m.content
        return ""

    @staticmethod
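    def _pick_model(provider: ProviderProtocol, difficulty: int) -> str:
        """Pick a model from the provider's model list based on difficulty.

        Returns the last model when ``difficulty >= 7`` and more than one model
        is listed, otherwise the first. Returns ``""`` if the list is empty.
        A more sophisticated version could match difficulty ranges per model.
        """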
        models = provider.capabilities.models
        if not models:
            return ""
        # Simple heuristic: last model is most capable
        if difficulty >= 7 and len(models) > 1:
            return models[-1]
        return models[0]
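
The round-robin strategy rebuilds its cycle whenever the set of candidates changes, which restarts the rotation from the first candidate. The sketch below isolates that behavior; `RoundRobin` is a hypothetical helper that works on provider names only, not part of the package.

```python
from __future__ import annotations

import itertools


class RoundRobin:
    """Hypothetical helper mirroring the cycle-reset logic of the router."""

    def __init__(self) -> None:
        self._cycle = None
        self._names: set[str] = set()

    def pick(self, names: list[str]) -> str:
        # Rebuild the cycle when the candidate set differs from the one
        # the current cycle was created for (e.g. a provider went unhealthy).
        if self._cycle is None or set(names) != self._names:
            self._cycle = itertools.cycle(names)
            self._names = set(names)
        return next(self._cycle)


rr = RoundRobin()
first_three = [rr.pick(["a", "b"]) for _ in range(3)]
after_change = rr.pick(["a", "b", "c"])
print(first_three, after_change)
```

One consequence of this design is that a provider flapping between healthy and unhealthy resets the rotation each time, so providers early in the list may receive more traffic than later ones.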

route(messages, required=None) async

Select the best provider and model for a request.

Parameters:

Name Type Description Default
messages list[Message]

Conversation messages (last user message used for scoring).

required
required set[Capability] | None

Optional capability requirements.

None

Returns:

Type Description
tuple[ProviderProtocol, str]

Tuple of (provider, model_name).

Raises:

Type Description
RuntimeError

If no provider that is not UNHEALTHY covers all required capabilities.

Source code in packages/pyagent-providers/src/pyagent_providers/router.py
async def route(
    self,
    messages: list[Message],
    required: set[Capability] | None = None,
) -> tuple[ProviderProtocol, str]:
    """Select the best provider and model for a request.

    Args:
        messages: Conversation messages (last user message used for scoring).
        required: Optional capability requirements.

    Returns:
        Tuple of (provider, model_name).

    Raises:
        RuntimeError: If no suitable provider is found.
    """
    candidates = self._registry.discover(required, healthy_only=True)
    if not candidates:
        raise RuntimeError(f"No healthy provider found matching capabilities={required}")

    if self._strategy == RoutingStrategy.ROUND_ROBIN:
        return self._route_round_robin(candidates)
    if self._strategy == RoutingStrategy.COST_FIRST:
        return self._route_cost_first(candidates, messages)
    if self._strategy == RoutingStrategy.LATENCY_FIRST:
        return self._route_latency_first(candidates)
    # Default: CAPABILITY_FIRST
    return self._route_capability_first(candidates, messages)
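
For the default CAPABILITY_FIRST path, the ranking and model choice reduce to two small formulas. The sketch below restates them as standalone functions with hypothetical names (`capability_first_score`, `pick_model`), taking plain integers and strings in place of provider objects.

```python
from __future__ import annotations


def capability_first_score(n_capabilities: int, max_context: int) -> int:
    """One point per capability tag plus one point per 100,000 context tokens."""
    return n_capabilities + max_context // 100_000


def pick_model(models: list[str], difficulty: int) -> str:
    """Use the last listed model for hard tasks, otherwise the first."""
    if not models:
        return ""
    if difficulty >= 7 and len(models) > 1:
        return models[-1]
    return models[0]


# A provider with few tags but a 1M-token window outranks one with more tags.
wide_context = capability_first_score(1, 1_000_000)
many_tags = capability_first_score(4, 128_000)
print(wide_context, many_tags)
print(pick_model(["small", "large"], 8), pick_model(["small", "large"], 3))
```

Because context contributes one point per 100,000 tokens, a very large context window can dominate the ranking. The model heuristic also assumes providers list their models from least to most capable.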

RoutingStrategy

Bases: StrEnum

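How the router picks a provider + model pair. CAPABILITY_FIRST favors the provider with the most capability tags and the largest context window. COST_FIRST picks the provider and model with the lowest estimated cost. LATENCY_FIRST picks the provider with the smallest context window as a rough latency proxy. ROUND_ROBIN cycles through the candidates in order.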

Source code in packages/pyagent-providers/src/pyagent_providers/router.py
class RoutingStrategy(StrEnum):
    """How the router picks a provider + model pair."""

    CAPABILITY_FIRST = "capability_first"
    COST_FIRST = "cost_first"
    LATENCY_FIRST = "latency_first"
    ROUND_ROBIN = "round_robin"

TracedProvider

Wrapper that emits trace events for every complete() / __call__ invocation.

Wraps any ProviderProtocol implementation without modifying the original.

Parameters:

Name Type Description Default
provider ProviderProtocol

The underlying provider to delegate calls to.

required
trace_bus Any

A TraceEventBus instance (from pyagent-trace).

required

Source code in packages/pyagent-providers/src/pyagent_providers/traced.py
class TracedProvider:
    """Wrapper that emits trace events for every ``complete()`` / ``__call__`` invocation.

    Wraps any ``ProviderProtocol`` implementation without modifying the original.

    Args:
        provider: The underlying provider to delegate calls to.
        trace_bus: A ``TraceEventBus`` instance (from pyagent-trace).
    """

    def __init__(self, provider: ProviderProtocol, trace_bus: Any) -> None:
        self._provider = provider
        self._trace_bus = trace_bus

    @property
    def name(self) -> str:
        return self._provider.name

    @property
    def capabilities(self) -> ProviderCapabilities:
        return self._provider.capabilities

    async def health(self) -> HealthStatus:
        return await self._provider.health()

    async def complete(self, messages: list[Message], model: str | None = None) -> str:
        """Delegate to wrapped provider and emit trace events."""
        start = time.time()
        self._emit(
            "provider_call_start",
            {
                "provider": self.name,
                "model": model or "default",
                "input_messages": len(messages),
            },
        )

        try:
            result = await self._provider.complete(messages, model)
        except Exception as exc:
            self._emit(
                "provider_call_error",
                {
                    "provider": self.name,
                    "error": str(exc),
                    "duration_seconds": time.time() - start,
                },
            )
            raise

        duration = time.time() - start
        # Rough token estimate (about 4 characters per token), not a tokenizer count.
        input_tokens = sum(len(m.content) for m in messages) // 4
        output_tokens = len(result) // 4

        self._emit(
            "provider_call_end",
            {
                "provider": self.name,
                "model": model or "default",
                "input_tokens": input_tokens,
                "output_tokens": output_tokens,
                "duration_seconds": duration,
            },
        )

        return result

    async def __call__(self, messages: list[Message]) -> str:
        """LLMCallable-compatible entry point — delegates to ``complete``."""
        return await self.complete(messages)

    def _emit(self, event_type: str, payload: dict[str, Any]) -> None:
        """Emit a trace event to the bus."""
        try:
            from pyagent_trace.events import TraceEvent

            self._trace_bus.emit(
                TraceEvent(
                    timestamp=time.time(),
                    event_type=event_type,
                    agent_name=payload.get("provider", ""),
                    payload=payload,
                )
            )
        except Exception:
            pass  # trace must never break provider execution

complete(messages, model=None) async

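Delegate to the wrapped provider and emit trace events: provider_call_start before the call, then provider_call_end on success or provider_call_error on failure (the exception is re-raised). Token counts in the end event are estimated as character count // 4.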

Source code in packages/pyagent-providers/src/pyagent_providers/traced.py
async def complete(self, messages: list[Message], model: str | None = None) -> str:
    """Delegate to wrapped provider and emit trace events."""
    start = time.time()
    self._emit(
        "provider_call_start",
        {
            "provider": self.name,
            "model": model or "default",
            "input_messages": len(messages),
        },
    )

    try:
        result = await self._provider.complete(messages, model)
    except Exception as exc:
        self._emit(
            "provider_call_error",
            {
                "provider": self.name,
                "error": str(exc),
                "duration_seconds": time.time() - start,
            },
        )
        raise

    duration = time.time() - start
    # Rough token estimate (about 4 characters per token), not a tokenizer count.
    input_tokens = sum(len(m.content) for m in messages) // 4
    output_tokens = len(result) // 4

    self._emit(
        "provider_call_end",
        {
            "provider": self.name,
            "model": model or "default",
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "duration_seconds": duration,
        },
    )

    return result
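
The event sequence produced by `complete` can be observed with a small stand-in bus. This is a sketch, not the package's tracing API: `ListBus`, `traced_call`, `echo`, and `fail` are hypothetical, the bus takes an event type and payload directly instead of a `TraceEvent`, and durations use `time.perf_counter()` (a monotonic clock) where the source above uses `time.time()`.

```python
from __future__ import annotations

import asyncio
import time


class ListBus:
    """Hypothetical stand-in for a trace bus that records emitted events."""

    def __init__(self) -> None:
        self.events: list[tuple[str, dict]] = []

    def emit(self, event_type: str, payload: dict) -> None:
        self.events.append((event_type, payload))


async def traced_call(bus: ListBus, provider_name: str, call, prompt: str) -> str:
    """Emit start, then end or error, around one awaited provider call."""
    start = time.perf_counter()
    bus.emit("provider_call_start", {"provider": provider_name})
    try:
        result = await call(prompt)
    except Exception as exc:
        bus.emit("provider_call_error", {"provider": provider_name, "error": str(exc)})
        raise  # tracing must not swallow the provider's failure
    bus.emit(
        "provider_call_end",
        {
            "provider": provider_name,
            # Same rough heuristic as above: about four characters per token.
            "input_tokens": len(prompt) // 4,
            "output_tokens": len(result) // 4,
            "duration_seconds": time.perf_counter() - start,
        },
    )
    return result


async def echo(prompt: str) -> str:
    return prompt.upper()


async def fail(prompt: str) -> str:
    raise TimeoutError("upstream timed out")


bus = ListBus()
asyncio.run(traced_call(bus, "echo", echo, "hello world!"))
print([event_type for event_type, _ in bus.events])
```

A successful call yields a start event followed by an end event; a failing call yields a start event followed by an error event, and the original exception still propagates to the caller.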

__call__(messages) async

LLMCallable-compatible entry point — delegates to complete.

Source code in packages/pyagent-providers/src/pyagent_providers/traced.py
async def __call__(self, messages: list[Message]) -> str:
    """LLMCallable-compatible entry point — delegates to ``complete``."""
    return await self.complete(messages)
