Skip to content

pyagent-compress API Reference

pyagent_compress.compressor.MessageCompressor

Compress inter-agent messages by removing filler and extracting key sentences.

Parameters:

Name Type Description Default
target_ratio float

Target compression ratio (0.3 = keep 30% of tokens).

0.5
min_sentence_length int

Minimum characters for a sentence to be kept.

20
remove_filler bool

Whether to strip filler phrases.

True
Source code in packages/pyagent-compress/src/pyagent_compress/compressor.py
class MessageCompressor:
    """Compress inter-agent messages by removing filler and extracting key sentences.

    Args:
        target_ratio: Target compression ratio (0.3 = keep 30% of tokens).
        min_sentence_length: Minimum characters for a sentence to be kept.
        remove_filler: Whether to strip filler phrases.
    """

    def __init__(
        self,
        target_ratio: float = 0.5,
        min_sentence_length: int = 20,
        remove_filler: bool = True,
    ) -> None:
        self._target_ratio = target_ratio
        self._min_sentence_length = min_sentence_length
        self._remove_filler = remove_filler

    def compress(self, text: str) -> CompressionResult:
        """Compress a message text.

        Strategy:
        1. Remove filler phrases
        2. Split into sentences
        3. Score sentences by information density
        4. Keep top sentences until target ratio met
        """
        original_tokens = len(text) // 4

        if original_tokens <= 20:
            # Too short to compress meaningfully
            return CompressionResult(text, text, original_tokens, original_tokens)

        working = text

        # Step 1: Remove filler
        if self._remove_filler:
            for pattern in _FILLER_PATTERNS:
                working = re.sub(pattern, "", working)

        # Step 2: Split into sentences
        sentences = re.split(r"(?<=[.!?])\s+", working)
        sentences = [s.strip() for s in sentences if len(s.strip()) >= self._min_sentence_length]

        if not sentences:
            compressed_tokens = len(working) // 4
            return CompressionResult(text, working.strip(), original_tokens, compressed_tokens)

        # Step 3: Score sentences
        scored = [(self._score_sentence(s), s) for s in sentences]
        scored.sort(key=lambda x: x[0], reverse=True)

        # Step 4: Keep top sentences until target ratio
        target_tokens = int(original_tokens * self._target_ratio)
        kept: list[str] = []
        running_tokens = 0

        for _score, sentence in scored:
            sent_tokens = len(sentence) // 4
            if running_tokens + sent_tokens > target_tokens and kept:
                break
            kept.append(sentence)
            running_tokens += sent_tokens

        # Restore original order
        ordered = [s for s in sentences if s in kept]
        compressed = " ".join(ordered)
        compressed_tokens = len(compressed) // 4

        return CompressionResult(text, compressed, original_tokens, compressed_tokens)

    @staticmethod
    def _score_sentence(sentence: str) -> float:
        """Score a sentence by information density (higher = more informative)."""
        score = 0.0

        # Longer sentences tend to carry more information (diminishing returns)
        score += min(len(sentence.split()) / 20, 1.0) * 0.3

        # Sentences with numbers/data are more informative
        numbers = len(re.findall(r"\d+\.?\d*", sentence))
        score += min(numbers / 3, 1.0) * 0.3

        # Sentences with technical terms
        technical = len(
            re.findall(
                r"(?i)\b(result|conclusion|therefore|because|shows|indicates|found|"
                r"evidence|data|percent|increase|decrease|significant)\b",
                sentence,
            )
        )
        score += min(technical / 3, 1.0) * 0.2

        # Shorter sentences with substance are preferred
        if 10 <= len(sentence.split()) <= 25:
            score += 0.2

        return score

compress(text)

Compress a message text.

Strategy: 1. Remove filler phrases 2. Split into sentences 3. Score sentences by information density 4. Keep top sentences until target ratio met

Source code in packages/pyagent-compress/src/pyagent_compress/compressor.py
def compress(self, text: str) -> CompressionResult:
    """Compress a message text.

    Strategy:
    1. Remove filler phrases
    2. Split into sentences
    3. Score sentences by information density
    4. Keep top sentences until target ratio met
    """
    original_tokens = len(text) // 4

    if original_tokens <= 20:
        # Too short to compress meaningfully
        return CompressionResult(text, text, original_tokens, original_tokens)

    working = text

    # Step 1: Remove filler
    if self._remove_filler:
        for pattern in _FILLER_PATTERNS:
            working = re.sub(pattern, "", working)

    # Step 2: Split into sentences
    sentences = re.split(r"(?<=[.!?])\s+", working)
    sentences = [s.strip() for s in sentences if len(s.strip()) >= self._min_sentence_length]

    if not sentences:
        compressed_tokens = len(working) // 4
        return CompressionResult(text, working.strip(), original_tokens, compressed_tokens)

    # Step 3: Score sentences
    scored = [(self._score_sentence(s), s) for s in sentences]
    scored.sort(key=lambda x: x[0], reverse=True)

    # Step 4: Keep top sentences until target ratio
    target_tokens = int(original_tokens * self._target_ratio)
    kept: list[str] = []
    running_tokens = 0

    for _score, sentence in scored:
        sent_tokens = len(sentence) // 4
        if running_tokens + sent_tokens > target_tokens and kept:
            break
        kept.append(sentence)
        running_tokens += sent_tokens

    # Restore original order
    ordered = [s for s in sentences if s in kept]
    compressed = " ".join(ordered)
    compressed_tokens = len(compressed) // 4

    return CompressionResult(text, compressed, original_tokens, compressed_tokens)

pyagent_compress.compressor.CompressionResult dataclass

Result of message compression.

Attributes:

Name Type Description
original str

The original text.

compressed str

The compressed text.

original_tokens int

Estimated original token count.

compressed_tokens int

Estimated compressed token count.

savings_pct float

Percentage of tokens saved (0-1).

Source code in packages/pyagent-compress/src/pyagent_compress/compressor.py
@dataclass(frozen=True)
class CompressionResult:
    """Result of message compression.

    Attributes:
        original: The original text.
        compressed: The compressed text.
        original_tokens: Estimated original token count.
        compressed_tokens: Estimated compressed token count.
        savings_pct: Percentage of tokens saved (0-1).
    """

    original: str
    compressed: str
    original_tokens: int
    compressed_tokens: int

    @property
    def savings_pct(self) -> float:
        if self.original_tokens == 0:
            return 0.0
        return 1.0 - (self.compressed_tokens / self.original_tokens)

pyagent_compress.pruner.AgentPruner

Detect and flag non-contributing agents for removal.

Scores each agent's contribution based on: - Unique information (not repeated from other agents) - Response diversity (different from own previous responses) - Task relevance (overlap with original task keywords)

Parameters:

Name Type Description Default
min_contribution float

Minimum contribution score (0-1) to keep an agent.

0.3
window_size int

Number of recent messages to analyze per agent.

5
Source code in packages/pyagent-compress/src/pyagent_compress/pruner.py
class AgentPruner:
    """Detect and flag non-contributing agents for removal.

    Scores each agent's contribution based on:
    - Unique information (not repeated from other agents)
    - Response diversity (different from own previous responses)
    - Task relevance (overlap with original task keywords)

    Args:
        min_contribution: Minimum contribution score (0-1) to keep an agent.
        window_size: Number of recent messages to analyze per agent.
    """

    def __init__(self, min_contribution: float = 0.3, window_size: int = 5) -> None:
        self._min_contribution = min_contribution
        self._window_size = window_size

    def score_agents(
        self,
        messages: list[Message],
        task: str,
    ) -> list[ContributionScore]:
        """Score each agent's contribution from message history."""
        # Group messages by agent name
        by_agent: dict[str, list[str]] = {}
        for msg in messages:
            name = msg.name or "unknown"
            by_agent.setdefault(name, []).append(msg.content)

        [m.content for m in messages]
        task_words = set(task.lower().split())

        scores: list[ContributionScore] = []
        for agent_name, contents in by_agent.items():
            recent = contents[-self._window_size :]

            # Unique info: how much of this agent's content is NOT in other agents' content
            other_text = " ".join(
                c for name, cs in by_agent.items() if name != agent_name for c in cs
            ).lower()
            unique_words = set()
            for content in recent:
                for word in content.lower().split():
                    if word not in other_text and len(word) > 3:
                        unique_words.add(word)

            total_words = sum(len(c.split()) for c in recent)
            unique_ratio = len(unique_words) / max(total_words, 1)

            # Task relevance
            agent_words = set(" ".join(recent).lower().split())
            relevance = len(agent_words & task_words) / max(len(task_words), 1)

            # Self-diversity (are recent messages different from each other?)
            diversity = 1.0 - self._similarity(recent[-1], recent[-2]) if len(recent) >= 2 else 1.0

            score = 0.4 * unique_ratio + 0.3 * relevance + 0.3 * diversity
            scores.append(
                ContributionScore(
                    agent_name=agent_name,
                    score=min(score, 1.0),
                    unique_info=unique_ratio,
                    message_count=len(contents),
                )
            )

        return scores

    def should_prune(self, scores: list[ContributionScore]) -> list[str]:
        """Return agent names that should be pruned (below min contribution)."""
        return [s.agent_name for s in scores if s.score < self._min_contribution]

    @staticmethod
    def _similarity(a: str, b: str) -> float:
        """Simple word-overlap similarity between two texts."""
        words_a = set(a.lower().split())
        words_b = set(b.lower().split())
        if not words_a or not words_b:
            return 0.0
        return len(words_a & words_b) / len(words_a | words_b)

score_agents(messages, task)

Score each agent's contribution from message history.

Source code in packages/pyagent-compress/src/pyagent_compress/pruner.py
def score_agents(
    self,
    messages: list[Message],
    task: str,
) -> list[ContributionScore]:
    """Score each agent's contribution from message history."""
    # Group messages by agent name
    by_agent: dict[str, list[str]] = {}
    for msg in messages:
        name = msg.name or "unknown"
        by_agent.setdefault(name, []).append(msg.content)

    [m.content for m in messages]
    task_words = set(task.lower().split())

    scores: list[ContributionScore] = []
    for agent_name, contents in by_agent.items():
        recent = contents[-self._window_size :]

        # Unique info: how much of this agent's content is NOT in other agents' content
        other_text = " ".join(
            c for name, cs in by_agent.items() if name != agent_name for c in cs
        ).lower()
        unique_words = set()
        for content in recent:
            for word in content.lower().split():
                if word not in other_text and len(word) > 3:
                    unique_words.add(word)

        total_words = sum(len(c.split()) for c in recent)
        unique_ratio = len(unique_words) / max(total_words, 1)

        # Task relevance
        agent_words = set(" ".join(recent).lower().split())
        relevance = len(agent_words & task_words) / max(len(task_words), 1)

        # Self-diversity (are recent messages different from each other?)
        diversity = 1.0 - self._similarity(recent[-1], recent[-2]) if len(recent) >= 2 else 1.0

        score = 0.4 * unique_ratio + 0.3 * relevance + 0.3 * diversity
        scores.append(
            ContributionScore(
                agent_name=agent_name,
                score=min(score, 1.0),
                unique_info=unique_ratio,
                message_count=len(contents),
            )
        )

    return scores

should_prune(scores)

Return agent names that should be pruned (below min contribution).

Source code in packages/pyagent-compress/src/pyagent_compress/pruner.py
def should_prune(self, scores: list[ContributionScore]) -> list[str]:
    """Return agent names that should be pruned (below min contribution)."""
    return [s.agent_name for s in scores if s.score < self._min_contribution]

pyagent_compress.pruner.InteractionPruner

Detect early consensus and skip remaining communication rounds.

Analyzes agent outputs to determine if consensus has been reached, allowing patterns like Debate or Voting to terminate early.

Parameters:

Name Type Description Default
consensus_threshold float

Minimum similarity between agent outputs to consider consensus reached (0-1).

0.7
min_rounds int

Minimum rounds before early termination is allowed.

1
Source code in packages/pyagent-compress/src/pyagent_compress/pruner.py
class InteractionPruner:
    """Detect early consensus and skip remaining communication rounds.

    Analyzes agent outputs to determine if consensus has been reached,
    allowing patterns like Debate or Voting to terminate early.

    Args:
        consensus_threshold: Minimum similarity between agent outputs
            to consider consensus reached (0-1).
        min_rounds: Minimum rounds before early termination is allowed.
    """

    def __init__(
        self,
        consensus_threshold: float = 0.7,
        min_rounds: int = 1,
    ) -> None:
        self._threshold = consensus_threshold
        self._min_rounds = min_rounds

    def has_consensus(self, outputs: list[str], current_round: int) -> bool:
        """Check if agents have reached consensus.

        Args:
            outputs: Current round's outputs from all agents.
            current_round: The current round number (1-indexed).

        Returns:
            True if consensus is reached and we can stop early.
        """
        if current_round < self._min_rounds:
            return False

        if len(outputs) < 2:
            return True

        # Check pairwise similarity
        similarities: list[float] = []
        for i in range(len(outputs)):
            for j in range(i + 1, len(outputs)):
                sim = self._similarity(outputs[i], outputs[j])
                similarities.append(sim)

        avg_similarity = sum(similarities) / len(similarities) if similarities else 0.0
        return avg_similarity >= self._threshold

    @staticmethod
    def _similarity(a: str, b: str) -> float:
        words_a = set(a.lower().split())
        words_b = set(b.lower().split())
        if not words_a or not words_b:
            return 0.0
        return len(words_a & words_b) / len(words_a | words_b)

has_consensus(outputs, current_round)

Check if agents have reached consensus.

Parameters:

Name Type Description Default
outputs list[str]

Current round's outputs from all agents.

required
current_round int

The current round number (1-indexed).

required

Returns:

Type Description
bool

True if consensus is reached and we can stop early.

Source code in packages/pyagent-compress/src/pyagent_compress/pruner.py
def has_consensus(self, outputs: list[str], current_round: int) -> bool:
    """Check if agents have reached consensus.

    Args:
        outputs: Current round's outputs from all agents.
        current_round: The current round number (1-indexed).

    Returns:
        True if consensus is reached and we can stop early.
    """
    if current_round < self._min_rounds:
        return False

    if len(outputs) < 2:
        return True

    # Check pairwise similarity
    similarities: list[float] = []
    for i in range(len(outputs)):
        for j in range(i + 1, len(outputs)):
            sim = self._similarity(outputs[i], outputs[j])
            similarities.append(sim)

    avg_similarity = sum(similarities) / len(similarities) if similarities else 0.0
    return avg_similarity >= self._threshold

pyagent_compress.pruner.ContributionScore dataclass

How much an agent is contributing to the conversation.

Source code in packages/pyagent-compress/src/pyagent_compress/pruner.py
@dataclass(frozen=True)
class ContributionScore:
    """How much an agent is contributing to the conversation."""

    agent_name: str
    score: float  # 0-1
    unique_info: float  # 0-1 — how much unique information vs repetition
    message_count: int

pyagent_compress.budget.TokenBudget dataclass

Manage token budgets across a multi-agent workflow.

Parameters:

Name Type Description Default
workflow_limit int

Total token limit across all agents.

100000
per_agent_limit int

Default per-agent token limit.

20000
strict bool

If True, raises BudgetExceededError on limit violations.

True
Source code in packages/pyagent-compress/src/pyagent_compress/budget.py
@dataclass
class TokenBudget:
    """Manage token budgets across a multi-agent workflow.

    Args:
        workflow_limit: Total token limit across all agents.
        per_agent_limit: Default per-agent token limit.
        strict: If True, raises BudgetExceededError on limit violations.
    """

    workflow_limit: int = 100_000
    per_agent_limit: int = 20_000
    strict: bool = True
    _agents: dict[str, AgentBudget] = field(default_factory=dict)
    _total_used: int = 0

    def register_agent(self, name: str, limit: int | None = None) -> None:
        """Register an agent with an optional custom limit."""
        self._agents[name] = AgentBudget(name=name, limit=limit or self.per_agent_limit)

    def consume(self, agent_name: str, tokens: int) -> None:
        """Record token consumption for an agent.

        Args:
            agent_name: The consuming agent's name.
            tokens: Number of tokens consumed.
        """
        if agent_name not in self._agents:
            self.register_agent(agent_name)

        self._agents[agent_name].consume(tokens, strict=self.strict)
        self._total_used += tokens

        if self.strict and self._total_used > self.workflow_limit:
            raise BudgetExceededError("workflow", self._total_used, self.workflow_limit)

    def remaining(self, agent_name: str | None = None) -> int:
        """Get remaining tokens for an agent or the whole workflow."""
        if agent_name:
            budget = self._agents.get(agent_name)
            return budget.remaining if budget else self.per_agent_limit
        return max(0, self.workflow_limit - self._total_used)

    @property
    def total_used(self) -> int:
        return self._total_used

    @property
    def workflow_utilization(self) -> float:
        return self._total_used / self.workflow_limit if self.workflow_limit > 0 else 0.0

    def summary(self) -> dict[str, dict[str, int | float]]:
        """Return a summary of all agent budgets."""
        result: dict[str, dict[str, int | float]] = {
            "workflow": {
                "limit": self.workflow_limit,
                "used": self._total_used,
                "remaining": self.remaining(),
                "utilization": self.workflow_utilization,
            }
        }
        for name, budget in self._agents.items():
            result[name] = {
                "limit": budget.limit,
                "used": budget.used,
                "remaining": budget.remaining,
                "utilization": budget.utilization,
            }
        return result

consume(agent_name, tokens)

Record token consumption for an agent.

Parameters:

Name Type Description Default
agent_name str

The consuming agent's name.

required
tokens int

Number of tokens consumed.

required
Source code in packages/pyagent-compress/src/pyagent_compress/budget.py
def consume(self, agent_name: str, tokens: int) -> None:
    """Record token consumption for an agent.

    Args:
        agent_name: The consuming agent's name.
        tokens: Number of tokens consumed.
    """
    if agent_name not in self._agents:
        self.register_agent(agent_name)

    self._agents[agent_name].consume(tokens, strict=self.strict)
    self._total_used += tokens

    if self.strict and self._total_used > self.workflow_limit:
        raise BudgetExceededError("workflow", self._total_used, self.workflow_limit)

register_agent(name, limit=None)

Register an agent with an optional custom limit.

Source code in packages/pyagent-compress/src/pyagent_compress/budget.py
def register_agent(self, name: str, limit: int | None = None) -> None:
    """Register an agent with an optional custom limit."""
    self._agents[name] = AgentBudget(name=name, limit=limit or self.per_agent_limit)

remaining(agent_name=None)

Get remaining tokens for an agent or the whole workflow.

Source code in packages/pyagent-compress/src/pyagent_compress/budget.py
def remaining(self, agent_name: str | None = None) -> int:
    """Get remaining tokens for an agent or the whole workflow."""
    if agent_name:
        budget = self._agents.get(agent_name)
        return budget.remaining if budget else self.per_agent_limit
    return max(0, self.workflow_limit - self._total_used)

summary()

Return a summary of all agent budgets.

Source code in packages/pyagent-compress/src/pyagent_compress/budget.py
def summary(self) -> dict[str, dict[str, int | float]]:
    """Return a summary of all agent budgets."""
    result: dict[str, dict[str, int | float]] = {
        "workflow": {
            "limit": self.workflow_limit,
            "used": self._total_used,
            "remaining": self.remaining(),
            "utilization": self.workflow_utilization,
        }
    }
    for name, budget in self._agents.items():
        result[name] = {
            "limit": budget.limit,
            "used": budget.used,
            "remaining": budget.remaining,
            "utilization": budget.utilization,
        }
    return result

pyagent_compress.budget.AgentBudget dataclass

Budget tracking for a single agent.

Source code in packages/pyagent-compress/src/pyagent_compress/budget.py
@dataclass
class AgentBudget:
    """Budget tracking for a single agent."""

    name: str
    limit: int
    used: int = 0

    @property
    def remaining(self) -> int:
        return max(0, self.limit - self.used)

    @property
    def utilization(self) -> float:
        return self.used / self.limit if self.limit > 0 else 0.0

    def consume(self, tokens: int, strict: bool = True) -> None:
        """Consume tokens from budget.

        Args:
            tokens: Number of tokens consumed.
            strict: If True, raises BudgetExceededError. If False, just tracks.
        """
        self.used += tokens
        if strict and self.used > self.limit:
            raise BudgetExceededError(self.name, self.used, self.limit)

consume(tokens, strict=True)

Consume tokens from budget.

Parameters:

Name Type Description Default
tokens int

Number of tokens consumed.

required
strict bool

If True, raises BudgetExceededError. If False, just tracks.

True
Source code in packages/pyagent-compress/src/pyagent_compress/budget.py
def consume(self, tokens: int, strict: bool = True) -> None:
    """Consume tokens from budget.

    Args:
        tokens: Number of tokens consumed.
        strict: If True, raises BudgetExceededError. If False, just tracks.
    """
    self.used += tokens
    if strict and self.used > self.limit:
        raise BudgetExceededError(self.name, self.used, self.limit)

pyagent_compress.budget.BudgetExceededError

Bases: Exception

Raised when a token budget is exceeded.

Source code in packages/pyagent-compress/src/pyagent_compress/budget.py
class BudgetExceededError(Exception):
    """Raised when a token budget is exceeded."""

    def __init__(self, agent: str, used: int, limit: int) -> None:
        self.agent = agent
        self.used = used
        self.limit = limit
        super().__init__(f"Agent '{agent}' exceeded budget: {used}/{limit} tokens")

pyagent_compress.middleware.CompressMiddleware

Middleware that wraps agents with automatic output compression.

Usage

middleware = CompressMiddleware(target_ratio=0.5) compressed_agent = middleware.wrap(my_agent)

Parameters:

Name Type Description Default
compressor MessageCompressor | None

Optional MessageCompressor. Created with defaults if None.

None
budget TokenBudget | None

Optional TokenBudget for workflow-wide tracking.

None
target_ratio float

Target compression ratio (used if compressor not provided).

0.5
Source code in packages/pyagent-compress/src/pyagent_compress/middleware.py
class CompressMiddleware:
    """Middleware that wraps agents with automatic output compression.

    Usage:
        middleware = CompressMiddleware(target_ratio=0.5)
        compressed_agent = middleware.wrap(my_agent)

    Args:
        compressor: Optional MessageCompressor. Created with defaults if None.
        budget: Optional TokenBudget for workflow-wide tracking.
        target_ratio: Target compression ratio (used if compressor not provided).
    """

    def __init__(
        self,
        compressor: MessageCompressor | None = None,
        budget: TokenBudget | None = None,
        target_ratio: float = 0.5,
    ) -> None:
        self._compressor = compressor or MessageCompressor(target_ratio=target_ratio)
        self._budget = budget

    def wrap(self, agent: Agent) -> CompressedAgent:
        """Wrap an agent with compression."""
        return CompressedAgent(
            agent=agent,
            compressor=self._compressor,
            budget=self._budget,
        )

    def wrap_all(self, agents: list[Agent]) -> list[CompressedAgent]:
        """Wrap multiple agents."""
        return [self.wrap(a) for a in agents]

wrap(agent)

Wrap an agent with compression.

Source code in packages/pyagent-compress/src/pyagent_compress/middleware.py
def wrap(self, agent: Agent) -> CompressedAgent:
    """Wrap an agent with compression."""
    return CompressedAgent(
        agent=agent,
        compressor=self._compressor,
        budget=self._budget,
    )

wrap_all(agents)

Wrap multiple agents.

Source code in packages/pyagent-compress/src/pyagent_compress/middleware.py
def wrap_all(self, agents: list[Agent]) -> list[CompressedAgent]:
    """Wrap multiple agents."""
    return [self.wrap(a) for a in agents]

pyagent_compress.middleware.CompressedAgent

Bases: Agent

Agent wrapper that compresses output messages.

Parameters:

Name Type Description Default
agent Agent

The original agent.

required
compressor MessageCompressor

MessageCompressor instance.

required
budget TokenBudget | None

Optional TokenBudget for tracking.

None
Source code in packages/pyagent-compress/src/pyagent_compress/middleware.py
class CompressedAgent(Agent):
    """Agent wrapper that compresses output messages.

    Args:
        agent: The original agent.
        compressor: MessageCompressor instance.
        budget: Optional TokenBudget for tracking.
    """

    def __init__(
        self,
        agent: Agent,
        compressor: MessageCompressor,
        budget: TokenBudget | None = None,
    ) -> None:
        super().__init__(
            name=agent.name,
            llm=agent.llm,
            system_prompt=agent.system_prompt,
            description=agent.description,
        )
        self._original = agent
        self._compressor = compressor
        self._budget = budget
        self.compression_log: list[dict[str, int | float]] = []

    async def run(self, messages: list[Message]) -> Message:
        """Run the agent and compress the output."""
        result = await self._original.run(messages)

        # Compress the output
        compressed = self._compressor.compress(result.content)
        self.compression_log.append(
            {
                "original_tokens": compressed.original_tokens,
                "compressed_tokens": compressed.compressed_tokens,
                "savings_pct": compressed.savings_pct,
            }
        )

        # Track budget if available
        if self._budget:
            self._budget.consume(self._original.name, compressed.compressed_tokens)

        return Message(
            role=result.role,
            content=compressed.compressed,
            name=result.name,
            metadata={
                **result.metadata,
                "compressed": True,
                "original_tokens": compressed.original_tokens,
                "compressed_tokens": compressed.compressed_tokens,
                "savings_pct": compressed.savings_pct,
            },
        )

run(messages) async

Run the agent and compress the output.

Source code in packages/pyagent-compress/src/pyagent_compress/middleware.py
async def run(self, messages: list[Message]) -> Message:
    """Run the agent and compress the output."""
    result = await self._original.run(messages)

    # Compress the output
    compressed = self._compressor.compress(result.content)
    self.compression_log.append(
        {
            "original_tokens": compressed.original_tokens,
            "compressed_tokens": compressed.compressed_tokens,
            "savings_pct": compressed.savings_pct,
        }
    )

    # Track budget if available
    if self._budget:
        self._budget.consume(self._original.name, compressed.compressed_tokens)

    return Message(
        role=result.role,
        content=compressed.compressed,
        name=result.name,
        metadata={
            **result.metadata,
            "compressed": True,
            "original_tokens": compressed.original_tokens,
            "compressed_tokens": compressed.compressed_tokens,
            "savings_pct": compressed.savings_pct,
        },
    )