Skip to content

pyagent-context API Reference

pyagent_context

PyAgent Context — three-tier memory with trust-aware context ledger.

CompressionPolicy

Bases: StrEnum

Available compression strategies.

Source code in packages/pyagent-context/src/pyagent_context/compression.py
class CompressionPolicy(StrEnum):
    """Available compression strategies.

    Each member's value is the lowercase string that identifies the policy.
    """

    NONE = "none"  # compression disabled
    FIFO = "fifo"  # drop oldest items
    SEMANTIC_LOSSLESS = "semantic_lossless"  # compress text, keep Knowledge block
    SAWTOOTH = "sawtooth"  # compress to floor, grow again

ContextCompressor

Apply compression policies to a ContextLedger.

The compressor monitors token usage and compresses when a threshold is reached.

Parameters:

Name Type Description Default
policy CompressionPolicy

Compression strategy.

FIFO
threshold_tokens int

Token count that triggers compression.

10000
floor_tokens int

Token target after compression (for SAWTOOTH/FIFO).

5000
preserve_trust TrustLevel

Items at or above this trust level are never dropped.

VERIFIED
Source code in packages/pyagent-context/src/pyagent_context/compression.py
class ContextCompressor:
    """Apply compression policies to a ContextLedger.

    The compressor monitors token usage and compresses when a threshold
    is reached.

    Args:
        policy: Compression strategy.
        threshold_tokens: Token count that triggers compression.
        floor_tokens: Token target after compression (for SAWTOOTH/FIFO).
        preserve_trust: Items at or above this trust level are never dropped.
    """

    def __init__(
        self,
        policy: CompressionPolicy = CompressionPolicy.FIFO,
        threshold_tokens: int = 10_000,
        floor_tokens: int = 5_000,
        preserve_trust: TrustLevel = TrustLevel.VERIFIED,
    ) -> None:
        self._policy = policy
        self._threshold = threshold_tokens
        self._floor = floor_tokens
        self._preserve_trust = preserve_trust

    @property
    def policy(self) -> CompressionPolicy:
        return self._policy

    def should_compress(self, ledger: ContextLedger) -> bool:
        """Check if the ledger's token count exceeds the threshold."""
        if self._policy == CompressionPolicy.NONE:
            return False
        return ledger.total_tokens >= self._threshold

    def compress(self, ledger: ContextLedger) -> ContextLedger:
        """Apply the compression policy and return a new (compressed) ledger.

        The original ledger is not mutated.
        """
        if self._policy == CompressionPolicy.NONE:
            return ledger
        if self._policy == CompressionPolicy.FIFO:
            return self._compress_fifo(ledger)
        if self._policy == CompressionPolicy.SEMANTIC_LOSSLESS:
            return self._compress_semantic(ledger)
        if self._policy == CompressionPolicy.SAWTOOTH:
            return self._compress_sawtooth(ledger)
        return ledger

    def _compress_fifo(self, ledger: ContextLedger) -> ContextLedger:
        """Drop oldest items until under floor, preserving high-trust items."""
        items = list(ledger.items)
        tokens = ledger.total_tokens

        # Remove from front (oldest) until under floor
        new_items: list[ContextItem] = []
        for item in items:
            if tokens <= self._floor:
                new_items.append(item)
            elif item.trust_level >= self._preserve_trust:
                new_items.append(item)  # keep high-trust
            else:
                tokens -= item.token_estimate  # drop

        return ContextLedger(items=new_items)

    def _compress_semantic(self, ledger: ContextLedger) -> ContextLedger:
        """Compress item content text while preserving verified items unchanged.

        Uses simple sentence extraction — for full compression, integrate
        ``MessageCompressor`` from ``pyagent-compress``.
        """
        items = list(ledger.items)
        new_items: list[ContextItem] = []

        for item in items:
            if item.trust_level >= self._preserve_trust:
                new_items.append(item)
            else:
                # Simple compression: keep first sentence only
                sentences = item.content.split(". ")
                compressed = sentences[0] + ("." if len(sentences) > 1 else "")
                new_item = ContextItem(
                    content=compressed,
                    source=item.source,
                    timestamp=item.timestamp,
                    trust_level=item.trust_level,
                    sensitivity=item.sensitivity,
                    expires_at=item.expires_at,
                    derived_from=item.id,
                )
                new_items.append(new_item)

        return ContextLedger(items=new_items)

    def _compress_sawtooth(self, ledger: ContextLedger) -> ContextLedger:
        """Compress to floor, then allow growth again.

        Combines FIFO eviction with semantic compression for remaining items.
        """
        # First pass: FIFO eviction
        fifo_result = self._compress_fifo(ledger)

        # If still over floor, apply semantic compression
        if fifo_result.total_tokens > self._floor:
            return self._compress_semantic(fifo_result)

        return fifo_result

should_compress(ledger)

Check whether compression is enabled and the ledger's token count has reached the threshold (`total_tokens >= threshold_tokens`).

Source code in packages/pyagent-context/src/pyagent_context/compression.py
def should_compress(self, ledger: ContextLedger) -> bool:
    """Tell whether *ledger* has reached the compression threshold.

    Always ``False`` for the ``NONE`` policy; otherwise ``True`` once
    ``ledger.total_tokens`` is greater than or equal to the threshold.
    """
    if self._policy == CompressionPolicy.NONE:
        return False
    reached = ledger.total_tokens >= self._threshold
    return reached

compress(ledger)

Apply the compression policy and return a new (compressed) ledger.

The original ledger is not mutated.

Source code in packages/pyagent-context/src/pyagent_context/compression.py
def compress(self, ledger: ContextLedger) -> ContextLedger:
    """Apply the compression policy and return a new (compressed) ledger.

    The original ledger is not mutated.
    """
    if self._policy == CompressionPolicy.NONE:
        return ledger
    if self._policy == CompressionPolicy.FIFO:
        return self._compress_fifo(ledger)
    if self._policy == CompressionPolicy.SEMANTIC_LOSSLESS:
        return self._compress_semantic(ledger)
    if self._policy == CompressionPolicy.SAWTOOTH:
        return self._compress_sawtooth(ledger)
    return ledger

ContextItem dataclass

A single piece of context with trust and lifecycle metadata.

Attributes:

Name Type Description
content str

The text content.

source str

Origin — agent name, tool name, or "user".

timestamp float

Creation time (time.time()).

trust_level TrustLevel

How much to trust this item.

sensitivity Sensitivity

Data classification for redaction decisions.

expires_at float | None

Expiration timestamp, or None for never.

derived_from str | None

Parent item ID if this was derived from another.

token_estimate int

Rough token count. When left at 0 it is computed as max(1, len(content) // 4).

Source code in packages/pyagent-context/src/pyagent_context/item.py
@dataclass
class ContextItem:
    """One entry of context, carrying trust and lifecycle metadata.

    Attributes:
        content: The text content.
        source: Origin — agent name, tool name, or ``"user"``.
        timestamp: Creation time (``time.time()``).
        trust_level: How much to trust this item.
        sensitivity: Data classification for redaction decisions.
        expires_at: Expiration timestamp, or ``None`` for never.
        derived_from: Parent item ID if this was derived from another.
        token_estimate: Rough token count. A value of ``0`` means "not
            supplied" and is replaced by ``max(1, len(content) // 4)``.
    """

    content: str
    source: str
    timestamp: float = field(default_factory=time.time)
    trust_level: TrustLevel = TrustLevel.INFERRED
    sensitivity: Sensitivity = Sensitivity.INTERNAL
    expires_at: float | None = None
    derived_from: str | None = None
    token_estimate: int = 0
    _id: str = field(default_factory=lambda: uuid.uuid4().hex[:12])

    def __post_init__(self) -> None:
        # 0 is the "unset" sentinel: derive an estimate, never below 1.
        if self.token_estimate == 0:
            self.token_estimate = len(self.content) // 4 or 1

    @property
    def id(self) -> str:
        """Identifier of this item (12 hex characters when auto-generated)."""
        return self._id

    @property
    def is_expired(self) -> bool:
        """Whether the current time is past ``expires_at`` (never if ``None``)."""
        return self.expires_at is not None and time.time() > self.expires_at

    @property
    def age_seconds(self) -> float:
        """Seconds elapsed between ``timestamp`` and now."""
        now = time.time()
        return now - self.timestamp

    def to_dict(self) -> dict:
        """Return a JSON-compatible dict; enum fields are stored by value."""
        return dict(
            id=self._id,
            content=self.content,
            source=self.source,
            timestamp=self.timestamp,
            trust_level=self.trust_level.value,
            sensitivity=self.sensitivity.value,
            expires_at=self.expires_at,
            derived_from=self.derived_from,
            token_estimate=self.token_estimate,
        )

    @classmethod
    def from_dict(cls, data: dict) -> ContextItem:
        """Rebuild an item from a dict shaped like :meth:`to_dict` output.

        ``expires_at``, ``derived_from`` and ``token_estimate`` are optional;
        every other key (including ``id``) must be present.
        """
        init_kwargs = dict(
            content=data["content"],
            source=data["source"],
            timestamp=data["timestamp"],
            trust_level=TrustLevel(data["trust_level"]),
            sensitivity=Sensitivity(data["sensitivity"]),
            expires_at=data.get("expires_at"),
            derived_from=data.get("derived_from"),
            token_estimate=data.get("token_estimate", 0),
        )
        restored = cls(**init_kwargs)
        # Replace the freshly generated identifier with the stored one.
        restored._id = data["id"]
        return restored

is_expired property

Check if this item has passed its expiry time.

age_seconds property

Time since creation in seconds.

to_dict()

Serialize to a JSON-compatible dict.

Source code in packages/pyagent-context/src/pyagent_context/item.py
def to_dict(self) -> dict:
    """Return a JSON-compatible dict; enum fields are stored by value."""
    return dict(
        id=self._id,
        content=self.content,
        source=self.source,
        timestamp=self.timestamp,
        trust_level=self.trust_level.value,
        sensitivity=self.sensitivity.value,
        expires_at=self.expires_at,
        derived_from=self.derived_from,
        token_estimate=self.token_estimate,
    )

from_dict(data) classmethod

Deserialize from a dict.

Source code in packages/pyagent-context/src/pyagent_context/item.py
@classmethod
def from_dict(cls, data: dict) -> ContextItem:
    """Rebuild an item from a dict shaped like ``to_dict`` output.

    ``expires_at``, ``derived_from`` and ``token_estimate`` are optional;
    every other key (including ``id``) must be present.
    """
    init_kwargs = dict(
        content=data["content"],
        source=data["source"],
        timestamp=data["timestamp"],
        trust_level=TrustLevel(data["trust_level"]),
        sensitivity=Sensitivity(data["sensitivity"]),
        expires_at=data.get("expires_at"),
        derived_from=data.get("derived_from"),
        token_estimate=data.get("token_estimate", 0),
    )
    restored = cls(**init_kwargs)
    # Replace the freshly generated identifier with the stored one.
    restored._id = data["id"]
    return restored

Sensitivity

Bases: StrEnum

Data sensitivity classification.

Source code in packages/pyagent-context/src/pyagent_context/item.py
class Sensitivity(StrEnum):
    """Data sensitivity classification.

    Each member's value is the lowercase string form of the classification.
    """

    # NOTE(review): members appear to be listed from least to most
    # restrictive, but no ordering is defined in code — confirm before
    # relying on declaration order.
    PUBLIC = "public"
    INTERNAL = "internal"
    CONFIDENTIAL = "confidential"
    RESTRICTED = "restricted"

TrustLevel

Bases: StrEnum

Trust classification for context items.

Source code in packages/pyagent-context/src/pyagent_context/item.py
class TrustLevel(StrEnum):
    """Trust classification for context items."""

    VERIFIED = "verified"  # from trusted source, validated
    INFERRED = "inferred"  # LLM-generated, not validated
    USER_PROVIDED = "user"  # direct user input
    EXTERNAL = "external"  # from tool/API call

    def __ge__(self, other: TrustLevel) -> bool:
        order = {
            TrustLevel.INFERRED: 0,
            TrustLevel.EXTERNAL: 1,
            TrustLevel.USER_PROVIDED: 2,
            TrustLevel.VERIFIED: 3,
        }
        return order.get(self, 0) >= order.get(other, 0)

    def __gt__(self, other: TrustLevel) -> bool:
        return self != other and self.__ge__(other)

    def __le__(self, other: TrustLevel) -> bool:
        return other.__ge__(self)

    def __lt__(self, other: TrustLevel) -> bool:
        return self != other and self.__le__(other)

ContextLedger

Append-only ledger of context items.

Supports querying by trust level, age, and source. Converts items to Message lists for use with patterns.

Parameters:

Name Type Description Default
items list[ContextItem] | None

Optional initial items.

None
Source code in packages/pyagent-context/src/pyagent_context/ledger.py
class ContextLedger:
    """Ordered collection of context items.

    Items are kept in insertion order. The ledger can be filtered by trust
    level, age and source, rendered as a ``Message`` list for use with
    patterns, and round-tripped through a plain-dict snapshot.

    Args:
        items: Optional initial items (copied, not stored by reference).
    """

    def __init__(self, items: list[ContextItem] | None = None) -> None:
        self._items: list[ContextItem] = []
        if items:
            self._items.extend(items)

    def append(self, item: ContextItem) -> None:
        """Place *item* at the end of the ledger."""
        entries = self._items
        entries.append(item)

    def add(
        self,
        content: str,
        source: str,
        trust_level: TrustLevel = TrustLevel.INFERRED,
        **kwargs: Any,
    ) -> ContextItem:
        """Build a ``ContextItem`` from the arguments and append it.

        Extra keyword arguments are forwarded to the ``ContextItem``
        constructor.

        Returns:
            The created ``ContextItem``.
        """
        created = ContextItem(content=content, source=source, trust_level=trust_level, **kwargs)
        self._items.append(created)
        return created

    def query(
        self,
        *,
        min_trust: TrustLevel | None = None,
        max_age_seconds: float | None = None,
        source: str | None = None,
    ) -> list[ContextItem]:
        """Select the items that satisfy every supplied criterion.

        Args:
            min_trust: Only return items at or above this trust level.
            max_age_seconds: Only return items no older than this many seconds.
            source: Only return items from this source.

        Returns:
            Matching items in ledger order.
        """
        now = time.time()

        def _matches(entry: ContextItem) -> bool:
            if min_trust is not None and TRUST_ORDER.get(entry.trust_level, 0) < TRUST_ORDER.get(
                min_trust, 0
            ):
                return False
            if max_age_seconds is not None and (now - entry.timestamp) > max_age_seconds:
                return False
            if source is not None and entry.source != source:
                return False
            return True

        return [entry for entry in self._items if _matches(entry)]

    @property
    def total_tokens(self) -> int:
        """Combined ``token_estimate`` of every item in the ledger."""
        total = 0
        for entry in self._items:
            total += entry.token_estimate
        return total

    def to_messages(self, max_tokens: int | None = None) -> list[Message]:
        """Render ledger items as assistant ``Message`` objects.

        With a ``max_tokens`` budget, items are taken from the newest
        backward and selection stops at the first item that does not fit;
        the selected items are returned in ledger order.

        Args:
            max_tokens: Optional token budget.

        Returns:
            List of ``Message`` objects.
        """
        if max_tokens is None:
            chosen = self._items
        else:
            chosen = []
            remaining = max_tokens
            for entry in reversed(self._items):
                if entry.token_estimate > remaining:
                    break
                chosen.append(entry)
                remaining -= entry.token_estimate
            # Collected newest-first; restore ledger order.
            chosen.reverse()

        return [Message.assistant(entry.content, name=entry.source) for entry in chosen]

    def snapshot(self) -> dict:
        """Return a JSON-compatible dict of all items plus summary counts."""
        serialized = [entry.to_dict() for entry in self._items]
        return dict(
            items=serialized,
            total_tokens=self.total_tokens,
            count=len(self._items),
        )

    @classmethod
    def from_snapshot(cls, data: dict) -> ContextLedger:
        """Rebuild a ledger from the ``"items"`` list of a snapshot dict."""
        restored = list(map(ContextItem.from_dict, data["items"]))
        return cls(items=restored)

    @property
    def items(self) -> list[ContextItem]:
        """A shallow copy of the stored items, in ledger order."""
        return self._items.copy()

    def __len__(self) -> int:
        return len(self._items)

    def __bool__(self) -> bool:
        return bool(self._items)

    def clear(self) -> None:
        """Empty the ledger in place."""
        del self._items[:]

total_tokens property

Sum of token estimates across all items.

append(item)

Add an item to the ledger.

Source code in packages/pyagent-context/src/pyagent_context/ledger.py
def append(self, item: ContextItem) -> None:
    """Place *item* at the end of the ledger."""
    entries = self._items
    entries.append(item)

add(content, source, trust_level=TrustLevel.INFERRED, **kwargs)

Create and append a new item in one step.

Returns:

Type Description
ContextItem

The created ContextItem.

Source code in packages/pyagent-context/src/pyagent_context/ledger.py
def add(
    self,
    content: str,
    source: str,
    trust_level: TrustLevel = TrustLevel.INFERRED,
    **kwargs: Any,
) -> ContextItem:
    """Build a ``ContextItem`` from the arguments and append it.

    Extra keyword arguments are forwarded to the ``ContextItem``
    constructor.

    Returns:
        The created ``ContextItem``.
    """
    created = ContextItem(content=content, source=source, trust_level=trust_level, **kwargs)
    self._items.append(created)
    return created

query(*, min_trust=None, max_age_seconds=None, source=None)

Filter items by trust level, age, and/or source.

Parameters:

Name Type Description Default
min_trust TrustLevel | None

Only return items at or above this trust level.

None
max_age_seconds float | None

Only return items newer than this.

None
source str | None

Only return items from this source.

None

Returns:

Type Description
list[ContextItem]

Filtered list of items (chronological order).

Source code in packages/pyagent-context/src/pyagent_context/ledger.py
def query(
    self,
    *,
    min_trust: TrustLevel | None = None,
    max_age_seconds: float | None = None,
    source: str | None = None,
) -> list[ContextItem]:
    """Select the items that satisfy every supplied criterion.

    Args:
        min_trust: Only return items at or above this trust level.
        max_age_seconds: Only return items no older than this many seconds.
        source: Only return items from this source.

    Returns:
        Matching items in ledger order.
    """
    now = time.time()

    def _matches(entry: ContextItem) -> bool:
        if min_trust is not None and TRUST_ORDER.get(entry.trust_level, 0) < TRUST_ORDER.get(
            min_trust, 0
        ):
            return False
        if max_age_seconds is not None and (now - entry.timestamp) > max_age_seconds:
            return False
        if source is not None and entry.source != source:
            return False
        return True

    return [entry for entry in self._items if _matches(entry)]

to_messages(max_tokens=None)

Convert ledger items to a list of Messages.

If max_tokens is set, include items from most recent backward until the budget is exhausted.

Parameters:

Name Type Description Default
max_tokens int | None

Optional token budget.

None

Returns:

Type Description
list[Message]

List of Message objects.

Source code in packages/pyagent-context/src/pyagent_context/ledger.py
def to_messages(self, max_tokens: int | None = None) -> list[Message]:
    """Render ledger items as assistant ``Message`` objects.

    With a ``max_tokens`` budget, items are taken from the newest backward
    and selection stops at the first item that does not fit; the selected
    items are returned in ledger order.

    Args:
        max_tokens: Optional token budget.

    Returns:
        List of ``Message`` objects.
    """
    if max_tokens is None:
        chosen = self._items
    else:
        chosen = []
        remaining = max_tokens
        for entry in reversed(self._items):
            if entry.token_estimate > remaining:
                break
            chosen.append(entry)
            remaining -= entry.token_estimate
        # Collected newest-first; restore ledger order.
        chosen.reverse()

    return [Message.assistant(entry.content, name=entry.source) for entry in chosen]

snapshot()

Serialize the full ledger to a JSON-compatible dict.

Source code in packages/pyagent-context/src/pyagent_context/ledger.py
def snapshot(self) -> dict:
    """Return a JSON-compatible dict of all items plus summary counts."""
    serialized = [entry.to_dict() for entry in self._items]
    return dict(
        items=serialized,
        total_tokens=self.total_tokens,
        count=len(self._items),
    )

from_snapshot(data) classmethod

Restore a ledger from a snapshot dict.

Source code in packages/pyagent-context/src/pyagent_context/ledger.py
@classmethod
def from_snapshot(cls, data: dict) -> ContextLedger:
    """Rebuild a ledger from the ``"items"`` list of a snapshot dict."""
    restored = list(map(ContextItem.from_dict, data["items"]))
    return cls(items=restored)

clear()

Remove all items.

Source code in packages/pyagent-context/src/pyagent_context/ledger.py
def clear(self) -> None:
    """Empty the ledger in place."""
    del self._items[:]

ContextLifecycle

Manage the lifecycle of context items.

Provides:

- Expiry sweep: remove items past their expires_at.
- Freshness decay: reduce token budgets for old items.
- Consolidation: merge items from the same source with similar content.

Parameters:

Name Type Description Default
consolidation_threshold float

Minimum keyword overlap ratio (0.0-1.0) to consider two items similar enough to merge.

0.6
Source code in packages/pyagent-context/src/pyagent_context/lifecycle.py
class ContextLifecycle:
    """Manage the lifecycle of context items.

    Provides:
    - **Expiry sweep**: remove items past their ``expires_at``.
    - **Freshness decay**: reduce token budgets for old items.
    - **Consolidation**: merge items from the same source with similar content.

    Every operation returns a new ``ContextLedger``; the input ledger is
    only read.

    Args:
        consolidation_threshold: Minimum keyword overlap ratio (0.0-1.0)
            to consider two items similar enough to merge.
    """

    def __init__(self, consolidation_threshold: float = 0.6) -> None:
        self._consolidation_threshold = consolidation_threshold

    def sweep_expired(self, ledger: ContextLedger) -> tuple[ContextLedger, list[ContextItem]]:
        """Remove expired items from the ledger.

        An item is expired when it has an ``expires_at`` and the current
        time is strictly later than it.

        Args:
            ledger: The context ledger.

        Returns:
            Tuple of (new ledger without expired items, list of expired items).
        """
        now = time.time()
        kept: list[ContextItem] = []
        expired: list[ContextItem] = []

        for item in ledger.items:
            if item.expires_at is not None and now > item.expires_at:
                expired.append(item)
            else:
                kept.append(item)

        return ContextLedger(items=kept), expired

    @staticmethod
    def _decay_factor(age_seconds: float, half_life_seconds: float) -> float:
        """Return the fraction remaining after *age_seconds* of decay.

        The factor is ``0.5 ** (age / half_life)``, so it is exactly 0.5
        once an item is one half-life old. A non-positive half-life
        disables decay (factor 1.0). Negative ages (timestamps in the
        future) are treated as zero so the factor never exceeds 1.0.
        """
        if half_life_seconds > 0:
            return 0.5 ** (max(0.0, age_seconds) / half_life_seconds)
        return 1.0

    def apply_freshness_decay(
        self,
        ledger: ContextLedger,
        half_life_seconds: float = 3600.0,
        min_tokens: int = 1,
    ) -> ContextLedger:
        """Reduce token estimates based on age.

        Older items get smaller token budgets, making them less likely
        to survive compression. Items retain at least ``min_tokens``
        (and never less than 1). Item IDs are preserved.

        Args:
            ledger: The context ledger.
            half_life_seconds: Seconds for token estimate to halve. A
                non-positive value disables decay.
            min_tokens: Minimum token estimate after decay.

        Returns:
            New ledger with decayed token estimates.
        """
        now = time.time()
        new_items: list[ContextItem] = []

        for item in ledger.items:
            decay_factor = self._decay_factor(now - item.timestamp, half_life_seconds)
            # Floor at 1: ContextItem treats a token_estimate of 0 as "unset"
            # and would recompute the full, undecayed estimate from content.
            decayed_tokens = max(min_tokens, 1, int(item.token_estimate * decay_factor))

            new_item = ContextItem(
                content=item.content,
                source=item.source,
                timestamp=item.timestamp,
                trust_level=item.trust_level,
                sensitivity=item.sensitivity,
                expires_at=item.expires_at,
                derived_from=item.derived_from,
                token_estimate=decayed_tokens,
            )
            # Keep the identity of the original item.
            new_item._id = item.id
            new_items.append(new_item)

        return ContextLedger(items=new_items)

    def consolidate(self, ledger: ContextLedger) -> ContextLedger:
        """Merge similar items from the same source.

        Within each source, every not-yet-merged item is compared with the
        later items of that source; those whose keyword overlap with it
        reaches the consolidation threshold are folded into it. The
        resulting item has the contents joined by newlines, the highest
        trust level and the latest timestamp of the group. Every output
        item — merged or not — is a new ``ContextItem`` whose
        ``derived_from`` is the ID of the first item of its group.

        Args:
            ledger: The context ledger.

        Returns:
            New ledger with consolidated items, sorted by timestamp.
        """
        by_source: dict[str, list[ContextItem]] = defaultdict(list)
        for item in ledger.items:
            by_source[item.source].append(item)

        consolidated: list[ContextItem] = []

        for source, items in by_source.items():
            # Indices of items already folded into an earlier item.
            merged_indices: set[int] = set()

            for i, item_a in enumerate(items):
                if i in merged_indices:
                    continue
                merged_content = item_a.content
                best_trust = item_a.trust_level
                latest_time = item_a.timestamp

                for j in range(i + 1, len(items)):
                    if j in merged_indices:
                        continue
                    item_b = items[j]
                    # Similarity is always measured against the first item's
                    # own content, not the growing merged text.
                    if (
                        self._similarity(item_a.content, item_b.content)
                        >= self._consolidation_threshold
                    ):
                        merged_content = f"{merged_content}\n{item_b.content}"
                        if item_b.trust_level > best_trust:
                            best_trust = item_b.trust_level
                        latest_time = max(latest_time, item_b.timestamp)
                        merged_indices.add(j)

                # NOTE(review): sensitivity and expires_at are taken from the
                # first item only; merged-in items with a stricter
                # sensitivity or a different expiry are not reflected —
                # confirm this is intended.
                new_item = ContextItem(
                    content=merged_content,
                    source=source,
                    timestamp=latest_time,
                    trust_level=best_trust,
                    sensitivity=item_a.sensitivity,
                    expires_at=item_a.expires_at,
                    derived_from=item_a.id,
                )
                consolidated.append(new_item)

        # Sort by timestamp to maintain chronological order
        consolidated.sort(key=lambda x: x.timestamp)
        return ContextLedger(items=consolidated)

    @staticmethod
    def _similarity(text_a: str, text_b: str) -> float:
        """Jaccard overlap of the lowercase whitespace-split word sets.

        Returns 0.0 when either text contains no words.
        """
        words_a = set(text_a.lower().split())
        words_b = set(text_b.lower().split())
        if not words_a or not words_b:
            return 0.0
        intersection = words_a & words_b
        union = words_a | words_b
        return len(intersection) / len(union)

sweep_expired(ledger)

Remove expired items from the ledger.

Parameters:

Name Type Description Default
ledger ContextLedger

The context ledger.

required

Returns:

Type Description
tuple[ContextLedger, list[ContextItem]]

Tuple of (new ledger without expired items, list of expired items).

Source code in packages/pyagent-context/src/pyagent_context/lifecycle.py
def sweep_expired(self, ledger: ContextLedger) -> tuple[ContextLedger, list[ContextItem]]:
    """Split the ledger into live and expired items.

    An item is expired when it has an ``expires_at`` and the current time
    is strictly later than it. The input ledger is only read.

    Args:
        ledger: The context ledger.

    Returns:
        Tuple of (new ledger without expired items, list of expired items).
    """
    cutoff = time.time()
    survivors: list[ContextItem] = []
    expired: list[ContextItem] = []

    for entry in ledger.items:
        has_lapsed = entry.expires_at is not None and cutoff > entry.expires_at
        bucket = expired if has_lapsed else survivors
        bucket.append(entry)

    return ContextLedger(items=survivors), expired

apply_freshness_decay(ledger, half_life_seconds=3600.0, min_tokens=1)

Reduce token estimates based on age.

Older items get smaller token budgets, making them less likely to survive compression. Items retain at least min_tokens.

Parameters:

Name Type Description Default
ledger ContextLedger

The context ledger.

required
half_life_seconds float

Seconds for token estimate to halve.

3600.0
min_tokens int

Minimum token estimate after decay.

1

Returns:

Type Description
ContextLedger

New ledger with decayed token estimates.

Source code in packages/pyagent-context/src/pyagent_context/lifecycle.py
def apply_freshness_decay(
    self,
    ledger: ContextLedger,
    half_life_seconds: float = 3600.0,
    min_tokens: int = 1,
) -> ContextLedger:
    """Reduce token estimates based on age.

    Older items get smaller token budgets, making them less likely
    to survive compression. Each estimate is multiplied by
    ``0.5 ** (age / half_life_seconds)``, so it is halved once per
    half-life. Items retain at least ``min_tokens`` (and never less
    than 1). Item IDs are preserved.

    Args:
        ledger: The context ledger.
        half_life_seconds: Seconds for token estimate to halve. A
            non-positive value disables decay.
        min_tokens: Minimum token estimate after decay.

    Returns:
        New ledger with decayed token estimates.
    """
    now = time.time()
    new_items: list[ContextItem] = []

    for item in ledger.items:
        # Clamp at zero so a timestamp in the future cannot inflate the
        # estimate above its current value.
        age = max(0.0, now - item.timestamp)
        if half_life_seconds > 0:
            decay_factor = 0.5 ** (age / half_life_seconds)
        else:
            decay_factor = 1.0
        # Floor at 1: ContextItem treats a token_estimate of 0 as "unset"
        # and would recompute the full, undecayed estimate from content.
        decayed_tokens = max(min_tokens, 1, int(item.token_estimate * decay_factor))

        new_item = ContextItem(
            content=item.content,
            source=item.source,
            timestamp=item.timestamp,
            trust_level=item.trust_level,
            sensitivity=item.sensitivity,
            expires_at=item.expires_at,
            derived_from=item.derived_from,
            token_estimate=decayed_tokens,
        )
        # Keep the identity of the original item.
        new_item._id = item.id
        new_items.append(new_item)

    return ContextLedger(items=new_items)

consolidate(ledger)

Merge similar items from the same source.

When two items from the same source have high keyword overlap, they are merged into one with combined content and the higher trust level.

Parameters:

Name Type Description Default
ledger ContextLedger

The context ledger.

required

Returns:

Type Description
ContextLedger

New ledger with consolidated items.

Source code in packages/pyagent-context/src/pyagent_context/lifecycle.py
def consolidate(self, ledger: ContextLedger) -> ContextLedger:
    """Fold near-duplicate items of each source into single items.

    Within a source, every not-yet-absorbed item acts as an anchor and is
    compared with the later items of that source; those whose keyword
    overlap with the anchor reaches the consolidation threshold are
    absorbed into it. The resulting item has the contents joined by
    newlines, the highest trust level and the latest timestamp of the
    group. Every output item — merged or not — is a new ``ContextItem``
    whose ``derived_from`` is the anchor's ID.

    Args:
        ledger: The context ledger.

    Returns:
        New ledger with consolidated items, sorted by timestamp.
    """
    groups: dict[str, list[ContextItem]] = defaultdict(list)
    for entry in ledger.items:
        groups[entry.source].append(entry)

    result: list[ContextItem] = []

    for source, members in groups.items():
        # Positions already absorbed into an earlier anchor.
        absorbed: set[int] = set()

        for i, anchor in enumerate(members):
            if i in absorbed:
                continue

            text = anchor.content
            trust = anchor.trust_level
            newest = anchor.timestamp

            for j, candidate in enumerate(members[i + 1 :], start=i + 1):
                if j in absorbed:
                    continue
                # Overlap is measured against the anchor's own content,
                # not the growing merged text.
                overlap = self._similarity(anchor.content, candidate.content)
                if overlap >= self._consolidation_threshold:
                    text = f"{text}\n{candidate.content}"
                    if candidate.trust_level > trust:
                        trust = candidate.trust_level
                    newest = max(newest, candidate.timestamp)
                    absorbed.add(j)

            result.append(
                ContextItem(
                    content=text,
                    source=source,
                    timestamp=newest,
                    trust_level=trust,
                    sensitivity=anchor.sensitivity,
                    expires_at=anchor.expires_at,
                    derived_from=anchor.id,
                )
            )

    # Restore chronological order across sources.
    result.sort(key=lambda entry: entry.timestamp)
    return ContextLedger(items=result)

InMemorySemanticStore

In-memory semantic store using TF-IDF cosine similarity.

No external dependencies — suitable for testing and small datasets. For production, use a vector DB adapter (ChromaDB, Pinecone, etc.).

Parameters:

Name Type Description Default
stop_words set[str] | None

Optional set of words to ignore in scoring.

None
Source code in packages/pyagent-context/src/pyagent_context/memory/semantic.py
class InMemorySemanticStore:
    """In-memory semantic store using TF-IDF cosine similarity.

    No external dependencies — suitable for testing and small datasets.
    For production, use a vector DB adapter (ChromaDB, Pinecone, etc.).

    Args:
        stop_words: Optional set of words to ignore in scoring. ``None``
            selects the built-in English list; any explicit set (including
            an empty one, which disables stop-word filtering) is used as given.
    """

    # Characters stripped from both ends of every whitespace-separated token.
    _PUNCTUATION = ".,!?;:\"'()[]{}"

    # Built-in English stop words used when the caller supplies none.
    _DEFAULT_STOP_WORDS = frozenset(
        (
            "a an the is are was were be been being have has had do does did "
            "will would could should may might can shall to of in for on with "
            "at by from as into through during before after and but or nor not "
            "so yet both either neither each every all any few more most other "
            "some such no only own same than too very just because about "
            "between it its this that these those i me my we our you your he "
            "him his she her they them their what which who"
        ).split()
    )

    def __init__(self, stop_words: set[str] | None = None) -> None:
        self._items: dict[str, ContextItem] = {}
        # Term-frequency vector per item id, computed once when the item is added.
        self._tf_cache: dict[str, dict[str, float]] = {}
        # ``is None`` rather than truthiness, so an explicit empty set is honoured.
        # Each instance gets its own mutable copy of the defaults.
        self._stop_words = (
            set(self._DEFAULT_STOP_WORDS) if stop_words is None else stop_words
        )

    def add(self, item: ContextItem) -> None:
        """Store ``item`` under ``item.id`` and cache its term frequencies.

        Adding an item whose id is already present replaces the earlier entry.
        """
        self._items[item.id] = item
        self._tf_cache[item.id] = self._compute_tf(item.content)

    def search(self, query: str, top_k: int = 5) -> list[SearchResult]:
        """Return up to ``top_k`` stored items ranked by TF-IDF cosine similarity.

        Items whose similarity to ``query`` is not strictly positive are omitted.
        """
        if not self._items:
            return []

        query_tf = self._compute_tf(query)
        idf = self._compute_idf()

        # Query terms that occur in no stored document get weight 0.
        query_tfidf = {w: tf * idf.get(w, 0.0) for w, tf in query_tf.items()}

        results: list[SearchResult] = []
        for item_id, item in self._items.items():
            doc_tf = self._tf_cache.get(item_id, {})
            doc_tfidf = {w: tf * idf.get(w, 0.0) for w, tf in doc_tf.items()}
            score = self._cosine_similarity(query_tfidf, doc_tfidf)
            if score > 0:
                results.append(SearchResult(item=item, score=score))

        results.sort(key=lambda r: r.score, reverse=True)
        return results[:top_k]

    def remove(self, item_id: str) -> bool:
        """Delete the item with ``item_id``; return ``True`` if it was present."""
        if item_id in self._items:
            del self._items[item_id]
            self._tf_cache.pop(item_id, None)
            return True
        return False

    def clear(self) -> None:
        """Drop every stored item together with its cached term frequencies."""
        self._items.clear()
        self._tf_cache.clear()

    def __len__(self) -> int:
        return len(self._items)

    # ------------------------------------------------------------------
    # TF-IDF helpers
    # ------------------------------------------------------------------

    def _tokenize(self, text: str) -> list[str]:
        """Lower-case and split ``text``, strip edge punctuation, drop stop words.

        Tokens that consist only of punctuation collapse to the empty string
        and are discarded so they cannot act as a shared "term".
        """
        stripped = (w.strip(self._PUNCTUATION) for w in text.lower().split())
        return [w for w in stripped if w and w not in self._stop_words]

    def _compute_tf(self, text: str) -> dict[str, float]:
        """Return each token's relative frequency within ``text``."""
        tokens = self._tokenize(text)
        if not tokens:
            return {}
        counts = Counter(tokens)
        total = len(tokens)
        return {word: count / total for word, count in counts.items()}

    def _compute_idf(self) -> dict[str, float]:
        """Return a smoothed inverse document frequency for every indexed term.

        Uses ``log((1 + N) / (1 + df)) + 1``, which is strictly positive.
        The unsmoothed ``log(N / (1 + df))`` is zero or negative whenever
        ``df >= N - 1``; in a two-document store that zeroed the weight of
        every term unique to one document, making it unsearchable.
        """
        n_docs = len(self._items)
        if n_docs == 0:
            return {}
        doc_freq: Counter[str] = Counter()
        for tf in self._tf_cache.values():
            doc_freq.update(tf.keys())
        return {
            word: math.log((1 + n_docs) / (1 + freq)) + 1.0
            for word, freq in doc_freq.items()
        }

    @staticmethod
    def _cosine_similarity(a: dict[str, float], b: dict[str, float]) -> float:
        """Cosine similarity of two sparse vectors; ``0.0`` when undefined."""
        if not a or not b:
            return 0.0
        common_keys = set(a) & set(b)
        if not common_keys:
            return 0.0
        dot = sum(a[k] * b[k] for k in common_keys)
        mag_a = math.sqrt(sum(v**2 for v in a.values()))
        mag_b = math.sqrt(sum(v**2 for v in b.values()))
        if mag_a == 0 or mag_b == 0:
            return 0.0
        return dot / (mag_a * mag_b)

SemanticMemoryProtocol

Bases: Protocol

Interface for any vector-backed semantic memory store.

Source code in packages/pyagent-context/src/pyagent_context/memory/semantic.py
@runtime_checkable
class SemanticMemoryProtocol(Protocol):
    """Structural interface for a vector-backed semantic memory store.

    Because the protocol is ``runtime_checkable``, ``isinstance`` checks
    succeed for any object that provides all of the methods below.
    """

    def add(self, item: ContextItem) -> None:
        """Index ``item`` so that later searches can return it."""

    def search(self, query: str, top_k: int = 5) -> list[SearchResult]:
        """Return the items most relevant to ``query``, at most ``top_k``."""

    def remove(self, item_id: str) -> bool:
        """Remove the item with ``item_id``; return ``True`` if it was found."""

    def clear(self) -> None:
        """Remove every item from the store."""

    def __len__(self) -> int:
        """Return the length of the store."""

add(item)

Index an item for later retrieval.

Source code in packages/pyagent-context/src/pyagent_context/memory/semantic.py
def add(self, item: ContextItem) -> None:
    """Index ``item`` so that later searches can return it."""

search(query, top_k=5)

Find the most relevant items for a query.

Source code in packages/pyagent-context/src/pyagent_context/memory/semantic.py
def search(self, query: str, top_k: int = 5) -> list[SearchResult]:
    """Return the items most relevant to ``query``, at most ``top_k``."""

remove(item_id)

Remove an item by ID. Returns True if found.

Source code in packages/pyagent-context/src/pyagent_context/memory/semantic.py
def remove(self, item_id: str) -> bool:
    """Remove the item with ``item_id``; return ``True`` if it was found."""

clear()

Remove all items.

Source code in packages/pyagent-context/src/pyagent_context/memory/semantic.py
def clear(self) -> None:
    """Remove every item from the store."""

SessionMemory

Session-scoped memory that persists across turns.

Supports two backends: json, a simple JSON file per session (the default), and sqlite, a SQLite database for concurrent-safe access.

Parameters:

Name Type Description Default
session_id str

Unique session identifier.

required
backend str

"json" or "sqlite".

'json'
storage_path str | Path

Directory for storage files. Defaults to ".pyagent_sessions".

'.pyagent_sessions'
Source code in packages/pyagent-context/src/pyagent_context/memory/session.py
class SessionMemory:
    """Session-scoped memory that persists across turns.

    Supports two backends:
    - ``json``: simple JSON file per session (default), stored as
      ``<storage_path>/<session_id>.json``
    - ``sqlite``: SQLite database for concurrent-safe access; all sessions
      share ``<storage_path>/sessions.db`` and are told apart by ``session_id``

    Any ``backend`` value other than ``"sqlite"`` is handled as ``json``.
    Items are loaded lazily on first access and are only written to storage
    by an explicit :meth:`save`.

    Args:
        session_id: Unique session identifier.
        backend: ``"json"`` or ``"sqlite"``.
        storage_path: Directory for storage files. Defaults to ``".pyagent_sessions"``.
    """

    # Schema shared by every SQLite operation; created on demand.
    _CREATE_TABLE_SQL = (
        "CREATE TABLE IF NOT EXISTS context_items "
        "(session_id TEXT, item_id TEXT PRIMARY KEY, data TEXT)"
    )

    def __init__(
        self,
        session_id: str,
        backend: str = "json",
        storage_path: str | Path = ".pyagent_sessions",
    ) -> None:
        self._session_id = session_id
        self._backend = backend
        self._storage_path = Path(storage_path)
        # The storage directory is created eagerly, even if nothing is saved.
        self._storage_path.mkdir(parents=True, exist_ok=True)
        self._items: list[ContextItem] = []
        self._loaded = False

    @property
    def session_id(self) -> str:
        return self._session_id

    def add(self, item: ContextItem) -> None:
        """Add an item to the session."""
        self._ensure_loaded()
        self._items.append(item)

    def get_all(self) -> list[ContextItem]:
        """Return all items in this session."""
        self._ensure_loaded()
        return list(self._items)

    def save(self) -> None:
        """Persist current items to storage."""
        if self._backend == "sqlite":
            self._save_sqlite()
        else:
            self._save_json()

    def load(self) -> None:
        """Load items from storage."""
        if self._backend == "sqlite":
            self._load_sqlite()
        else:
            self._load_json()
        self._loaded = True

    def clear(self) -> None:
        """Remove all items and delete persisted data.

        For the ``sqlite`` backend only this session's rows are deleted from
        the shared ``sessions.db``; other sessions are left untouched. For the
        JSON backend the session's file is removed.
        """
        self._items.clear()

        if self._backend == "sqlite":
            # SQLite data lives in the shared sessions.db, not in
            # ``_file_path``; unlinking a per-session file would leave the
            # rows behind (or, for session_id "sessions", wipe every session).
            db_path = self._storage_path / "sessions.db"
            if not db_path.exists():
                return
            conn = sqlite3.connect(str(db_path))
            try:
                conn.execute(self._CREATE_TABLE_SQL)
                conn.execute(
                    "DELETE FROM context_items WHERE session_id = ?",
                    (self._session_id,),
                )
                conn.commit()
            finally:
                conn.close()
            return

        path = self._file_path
        if path.exists():
            path.unlink()

    # ------------------------------------------------------------------
    # JSON backend
    # ------------------------------------------------------------------

    def _save_json(self) -> None:
        """Write all items to the session's JSON file, replacing its contents."""
        path = self._file_path
        data = [item.to_dict() for item in self._items]
        path.write_text(json.dumps(data, indent=2))

    def _load_json(self) -> None:
        """Replace in-memory items with the JSON file's contents (empty if absent)."""
        path = self._file_path
        if path.exists():
            data = json.loads(path.read_text())
            self._items = [ContextItem.from_dict(d) for d in data]
        else:
            self._items = []

    # ------------------------------------------------------------------
    # SQLite backend
    # ------------------------------------------------------------------

    def _save_sqlite(self) -> None:
        """Replace this session's rows in ``sessions.db`` with the current items."""
        db_path = self._storage_path / "sessions.db"
        conn = sqlite3.connect(str(db_path))
        try:
            conn.execute(self._CREATE_TABLE_SQL)
            # Delete-then-insert so the stored rows mirror ``self._items`` exactly.
            conn.execute(
                "DELETE FROM context_items WHERE session_id = ?",
                (self._session_id,),
            )
            for item in self._items:
                conn.execute(
                    "INSERT INTO context_items (session_id, item_id, data) VALUES (?, ?, ?)",
                    (self._session_id, item.id, json.dumps(item.to_dict())),
                )
            conn.commit()
        finally:
            conn.close()

    def _load_sqlite(self) -> None:
        """Replace in-memory items with this session's rows, in insertion order."""
        db_path = self._storage_path / "sessions.db"
        if not db_path.exists():
            self._items = []
            return
        conn = sqlite3.connect(str(db_path))
        try:
            conn.execute(self._CREATE_TABLE_SQL)
            cursor = conn.execute(
                "SELECT data FROM context_items WHERE session_id = ? ORDER BY rowid",
                (self._session_id,),
            )
            self._items = [ContextItem.from_dict(json.loads(row[0])) for row in cursor]
        finally:
            conn.close()

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    @property
    def _file_path(self) -> Path:
        # Only the JSON backend reads or writes this path; the SQLite
        # backend keeps everything in the shared ``sessions.db``.
        suffix = ".db" if self._backend == "sqlite" else ".json"
        return self._storage_path / f"{self._session_id}{suffix}"

    def _ensure_loaded(self) -> None:
        if not self._loaded:
            self.load()

    def __len__(self) -> int:
        self._ensure_loaded()
        return len(self._items)

add(item)

Add an item to the session.

Source code in packages/pyagent-context/src/pyagent_context/memory/session.py
def add(self, item: ContextItem) -> None:
    """Append ``item`` to the session, loading persisted items first if needed."""
    self._ensure_loaded()
    session_items = self._items
    session_items.append(item)

get_all()

Return all items in this session.

Source code in packages/pyagent-context/src/pyagent_context/memory/session.py
def get_all(self) -> list[ContextItem]:
    """Return a new list holding every item in this session.

    Persisted items are loaded first if that has not happened yet. The
    result is a shallow copy, so mutating it leaves the session unchanged.
    """
    self._ensure_loaded()
    return [*self._items]

save()

Persist current items to storage.

Source code in packages/pyagent-context/src/pyagent_context/memory/session.py
def save(self) -> None:
    """Write the current items to storage using the configured backend.

    ``"sqlite"`` selects the SQLite writer; every other backend value
    falls through to the JSON writer.
    """
    persist = self._save_sqlite if self._backend == "sqlite" else self._save_json
    persist()

load()

Load items from storage.

Source code in packages/pyagent-context/src/pyagent_context/memory/session.py
def load(self) -> None:
    """Read items from storage using the configured backend, then mark as loaded.

    ``"sqlite"`` selects the SQLite reader; every other backend value
    falls through to the JSON reader.
    """
    restore = self._load_sqlite if self._backend == "sqlite" else self._load_json
    restore()
    self._loaded = True

clear()

Remove all items and delete persisted data.

Source code in packages/pyagent-context/src/pyagent_context/memory/session.py
def clear(self) -> None:
    """Remove all items and delete persisted data.

    For the ``sqlite`` backend only this session's rows are deleted from
    the shared ``sessions.db``; other sessions are left untouched. For the
    JSON backend the session's file is removed.
    """
    self._items.clear()

    if self._backend == "sqlite":
        # SQLite data lives in the shared sessions.db, not in
        # ``_file_path``; unlinking a per-session file would leave the
        # rows behind (or, for session_id "sessions", wipe every session).
        db_path = self._storage_path / "sessions.db"
        if not db_path.exists():
            return
        conn = sqlite3.connect(str(db_path))
        try:
            conn.execute(
                "CREATE TABLE IF NOT EXISTS context_items "
                "(session_id TEXT, item_id TEXT PRIMARY KEY, data TEXT)"
            )
            conn.execute(
                "DELETE FROM context_items WHERE session_id = ?",
                (self._session_id,),
            )
            conn.commit()
        finally:
            conn.close()
        return

    path = self._file_path
    if path.exists():
        path.unlink()

WorkingMemory

Bounded working memory for a single pattern run.

Evicts the oldest items when capacity is exceeded (either by count or by total token estimate).

Parameters:

Name Type Description Default
max_items int

Maximum number of items to keep.

100
max_tokens int

Maximum total token estimate before eviction.

50000
Source code in packages/pyagent-context/src/pyagent_context/memory/working.py
class WorkingMemory:
    """Bounded working memory for a single pattern run.

    Evicts the oldest items when capacity is exceeded (either by count
    or by total token estimate).

    Args:
        max_items: Maximum number of items to keep.
        max_tokens: Maximum total token estimate before eviction.
    """

    def __init__(
        self,
        max_items: int = 100,
        max_tokens: int = 50_000,
    ) -> None:
        self._max_items = max_items
        self._max_tokens = max_tokens
        # Oldest item sits at the left end, so eviction is an O(1) popleft().
        self._items: deque[ContextItem] = deque()

    def add(self, item: ContextItem) -> list[ContextItem]:
        """Add an item, evicting oldest if necessary.

        The count limit is enforced first, then the token budget. Token
        eviction always leaves at least one item in place, so a single item
        larger than ``max_tokens`` is still retained.

        Returns:
            List of evicted items (empty if none were evicted).
        """
        evicted: list[ContextItem] = []
        self._items.append(item)

        # Evict by count. The emptiness guard keeps a negative ``max_items``
        # from popping an empty deque (IndexError).
        while self._items and len(self._items) > self._max_items:
            evicted.append(self._items.popleft())

        # Evict by token budget. Keep a running total instead of re-summing
        # every remaining item on each iteration.
        running_total = self.total_tokens
        while running_total > self._max_tokens and len(self._items) > 1:
            oldest = self._items.popleft()
            running_total -= oldest.token_estimate
            evicted.append(oldest)

        return evicted

    @property
    def total_tokens(self) -> int:
        """Sum of ``token_estimate`` over all items currently held."""
        return sum(item.token_estimate for item in self._items)

    @property
    def items(self) -> list[ContextItem]:
        """Snapshot list of the held items, oldest first."""
        return list(self._items)

    @property
    def utilization(self) -> float:
        """Token utilization as a fraction of max_tokens."""
        return self.total_tokens / self._max_tokens if self._max_tokens > 0 else 0.0

    def clear(self) -> None:
        """Discard every held item."""
        self._items.clear()

    def __len__(self) -> int:
        return len(self._items)

    def __bool__(self) -> bool:
        return len(self._items) > 0

utilization property

Token utilization as a fraction of max_tokens.

add(item)

Add an item, evicting oldest if necessary.

Returns:

Type Description
list[ContextItem]

List of evicted items (empty if none were evicted).

Source code in packages/pyagent-context/src/pyagent_context/memory/working.py
def add(self, item: ContextItem) -> list[ContextItem]:
    """Add an item, evicting oldest if necessary.

    The count limit is enforced first, then the token budget. Token
    eviction always leaves at least one item in place, so a single item
    larger than ``max_tokens`` is still retained.

    Returns:
        List of evicted items (empty if none were evicted).
    """
    evicted: list[ContextItem] = []
    self._items.append(item)

    # Evict by count. The emptiness guard keeps a negative ``max_items``
    # from popping an empty deque (IndexError).
    while self._items and len(self._items) > self._max_items:
        evicted.append(self._items.popleft())

    # Evict by token budget. Keep a running total instead of re-summing
    # every remaining item on each iteration.
    running_total = self.total_tokens
    while running_total > self._max_tokens and len(self._items) > 1:
        oldest = self._items.popleft()
        running_total -= oldest.token_estimate
        evicted.append(oldest)

    return evicted

ContextRedactor

Redact or filter context items based on sensitivity.

Items above the allowed sensitivity threshold are either fully redacted (content replaced) or excluded entirely.

Parameters:

Name Type Description Default
max_sensitivity Sensitivity

Maximum allowed sensitivity level. Items above this are redacted.

INTERNAL
redaction_text str

Replacement text for redacted items.

'[REDACTED]'
exclude_above bool

If True, items above max_sensitivity are excluded instead of redacted.

False
Source code in packages/pyagent-context/src/pyagent_context/redaction.py
class ContextRedactor:
    """Redact or drop context items whose sensitivity exceeds a ceiling.

    Sensitivities are compared by their rank in ``SENSITIVITY_ORDER``; a
    value missing from that mapping is ranked ``0``. An item ranked above
    the ceiling is either replaced by a placeholder copy or left out
    altogether, depending on ``exclude_above``.

    Args:
        max_sensitivity: Highest sensitivity level allowed through
            unchanged. Items above it are redacted.
        redaction_text: Content given to redacted copies.
        exclude_above: When ``True``, items above ``max_sensitivity`` are
            dropped rather than redacted.
    """

    def __init__(
        self,
        max_sensitivity: Sensitivity = Sensitivity.INTERNAL,
        redaction_text: str = "[REDACTED]",
        exclude_above: bool = False,
    ) -> None:
        self._max_sensitivity = max_sensitivity
        self._redaction_text = redaction_text
        self._exclude_above = exclude_above

    def redact_item(self, item: ContextItem) -> ContextItem | None:
        """Apply the sensitivity ceiling to one item.

        Returns:
            ``item`` itself when it is within the ceiling; ``None`` when it
            is above the ceiling and ``exclude_above`` is set; otherwise a
            new ``ContextItem`` carrying the redaction text, the original's
            metadata, and ``derived_from`` pointing at the original's id.
        """
        rank = SENSITIVITY_ORDER.get(item.sensitivity, 0)
        ceiling = SENSITIVITY_ORDER.get(self._max_sensitivity, 0)

        if rank > ceiling:
            if self._exclude_above:
                return None

            placeholder = self._redaction_text
            return ContextItem(
                content=placeholder,
                source=item.source,
                timestamp=item.timestamp,
                trust_level=item.trust_level,
                sensitivity=item.sensitivity,
                expires_at=item.expires_at,
                derived_from=item.id,
                # Estimated from the placeholder's length, never below 1.
                token_estimate=max(1, len(placeholder) // 4),
            )

        return item

    def redact_ledger(self, ledger: ContextLedger) -> ContextLedger:
        """Build a new ledger from ``ledger`` with the ceiling applied to each item.

        Returns:
            A ``ContextLedger`` holding, in the original order, every item
            that :meth:`redact_item` did not drop.
        """
        survivors = [
            outcome
            for entry in ledger.items
            if (outcome := self.redact_item(entry)) is not None
        ]
        return ContextLedger(items=survivors)

redact_item(item)

Redact a single item if it exceeds the sensitivity threshold.

Returns:

Type Description
ContextItem | None

The original item if within threshold, a redacted copy if above, or None if exclude_above is True and the item exceeds the threshold.

Source code in packages/pyagent-context/src/pyagent_context/redaction.py
def redact_item(self, item: ContextItem) -> ContextItem | None:
    """Apply the sensitivity ceiling to one item.

    Sensitivities are compared by their rank in ``SENSITIVITY_ORDER``; a
    value missing from that mapping is ranked ``0``.

    Returns:
        ``item`` itself when it is within the ceiling; ``None`` when it
        is above the ceiling and ``exclude_above`` is set; otherwise a
        new ``ContextItem`` carrying the redaction text, the original's
        metadata, and ``derived_from`` pointing at the original's id.
    """
    rank = SENSITIVITY_ORDER.get(item.sensitivity, 0)
    ceiling = SENSITIVITY_ORDER.get(self._max_sensitivity, 0)

    if rank > ceiling:
        if self._exclude_above:
            return None

        placeholder = self._redaction_text
        return ContextItem(
            content=placeholder,
            source=item.source,
            timestamp=item.timestamp,
            trust_level=item.trust_level,
            sensitivity=item.sensitivity,
            expires_at=item.expires_at,
            derived_from=item.id,
            # Estimated from the placeholder's length, never below 1.
            token_estimate=max(1, len(placeholder) // 4),
        )

    return item

redact_ledger(ledger)

Apply redaction to all items in a ledger.

Returns:

Type Description
ContextLedger

New ledger with redacted/filtered items.

Source code in packages/pyagent-context/src/pyagent_context/redaction.py
def redact_ledger(self, ledger: ContextLedger) -> ContextLedger:
    """Build a new ledger from ``ledger`` with redaction applied to each item.

    Returns:
        A ``ContextLedger`` holding, in the original order, every item
        for which ``redact_item`` returned something other than ``None``.
    """
    survivors = [
        outcome
        for entry in ledger.items
        if (outcome := self.redact_item(entry)) is not None
    ]
    return ContextLedger(items=survivors)

ScoredItem dataclass

A context item with a composite retrieval score.

Attributes:

Name Type Description
item ContextItem

The context item.

score float

Composite score (0.0-1.0).

trust_score float

Trust component of the score.

recency_score float

Recency component of the score.

relevance_score float

Relevance component of the score.

Source code in packages/pyagent-context/src/pyagent_context/retrieval.py
@dataclass(frozen=True)
class ScoredItem:
    """Immutable pairing of a context item with its retrieval scores.

    Attributes:
        item: The context item that was scored.
        score: Composite score (0.0-1.0).
        trust_score: Trust component that fed into ``score``.
        recency_score: Recency component that fed into ``score``.
        relevance_score: Relevance component that fed into ``score``.
    """

    item: ContextItem
    score: float
    trust_score: float
    recency_score: float
    relevance_score: float

TrustAwareRetriever

Retrieve context items scored by trust, recency, and keyword relevance.

Scoring formula

score = w_trust * trust + w_recency * recency + w_relevance * relevance

Where trust = TRUST_ORDER[item.trust_level] / 3.0, recency = exp(-age_seconds / half_life), and relevance = the keyword overlap ratio with the query.

Parameters:

Name Type Description Default
weight_trust float

Weight for trust component.

0.3
weight_recency float

Weight for recency component.

0.3
weight_relevance float

Weight for relevance component.

0.4
recency_half_life float

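Decay time constant in seconds: the recency score is exp(-age / recency_half_life), so it falls to 1/e (about 37%), not one half, after this many seconds.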

3600.0
Source code in packages/pyagent-context/src/pyagent_context/retrieval.py
class TrustAwareRetriever:
    """Retrieve context items scored by trust, recency, and keyword relevance.

    Scoring formula:
        ``score = w_trust * trust + w_recency * recency + w_relevance * relevance``

    Where:
    - **trust** = ``TRUST_ORDER[item.trust_level] / 3.0`` (levels missing
      from ``TRUST_ORDER`` count as 0)
    - **recency** = ``exp(-age_seconds / half_life)``, with negative ages
      (timestamps in the future) treated as 0
    - **relevance** = keyword overlap ratio with query

    Args:
        weight_trust: Weight for trust component.
        weight_recency: Weight for recency component.
        weight_relevance: Weight for relevance component.
        recency_half_life: Decay time constant in seconds. Despite the
            name, the recency score falls to ``1/e`` (about 37%) — not one
            half — after this many seconds. A value ``<= 0`` makes the
            recency component 0 for every item.
    """

    def __init__(
        self,
        weight_trust: float = 0.3,
        weight_recency: float = 0.3,
        weight_relevance: float = 0.4,
        recency_half_life: float = 3600.0,
    ) -> None:
        self._w_trust = weight_trust
        self._w_recency = weight_recency
        self._w_relevance = weight_relevance
        self._half_life = recency_half_life

    def retrieve(
        self,
        ledger: ContextLedger,
        query: str,
        *,
        top_k: int = 10,
        min_score: float = 0.0,
    ) -> list[ScoredItem]:
        """Score and rank all items in the ledger against a query.

        Expired items (``item.is_expired``) are skipped entirely.

        Args:
            ledger: The context ledger to search.
            query: The query string for relevance scoring.
            top_k: Maximum results to return.
            min_score: Minimum composite score to include.

        Returns:
            Ranked list of ``ScoredItem`` objects, highest score first.
        """
        now = time.time()
        query_words = set(query.lower().split())
        results: list[ScoredItem] = []

        for item in ledger.items:
            if item.is_expired:
                continue

            trust = TRUST_ORDER.get(item.trust_level, 0) / 3.0
            # Clamp at zero: a timestamp ahead of the local clock would
            # otherwise give recency > 1 and, for a large enough skew,
            # make math.exp raise OverflowError.
            age = max(0.0, now - item.timestamp)
            recency = math.exp(-age / self._half_life) if self._half_life > 0 else 0.0
            relevance = self._keyword_relevance(item.content, query_words)

            score = (
                self._w_trust * trust + self._w_recency * recency + self._w_relevance * relevance
            )

            if score >= min_score:
                results.append(
                    ScoredItem(
                        item=item,
                        score=score,
                        trust_score=trust,
                        recency_score=recency,
                        relevance_score=relevance,
                    )
                )

        results.sort(key=lambda r: r.score, reverse=True)
        return results[:top_k]

    @staticmethod
    def _keyword_relevance(content: str, query_words: set[str]) -> float:
        """Fraction of ``query_words`` that occur in ``content``.

        ``content`` is lower-cased and split on whitespace; punctuation is
        not stripped. Returns 0.0 when ``query_words`` is empty.
        """
        if not query_words:
            return 0.0
        content_words = set(content.lower().split())
        return len(query_words & content_words) / len(query_words)

retrieve(ledger, query, *, top_k=10, min_score=0.0)

Score and rank all items in the ledger against a query.

Parameters:

Name Type Description Default
ledger ContextLedger

The context ledger to search.

required
query str

The query string for relevance scoring.

required
top_k int

Maximum results to return.

10
min_score float

Minimum composite score to include.

0.0

Returns:

Type Description
list[ScoredItem]

Ranked list of ScoredItem objects.

Source code in packages/pyagent-context/src/pyagent_context/retrieval.py
def retrieve(
    self,
    ledger: ContextLedger,
    query: str,
    *,
    top_k: int = 10,
    min_score: float = 0.0,
) -> list[ScoredItem]:
    """Score and rank all items in the ledger against a query.

    Expired items (``item.is_expired``) are skipped entirely.

    Args:
        ledger: The context ledger to search.
        query: The query string for relevance scoring.
        top_k: Maximum results to return.
        min_score: Minimum composite score to include.

    Returns:
        Ranked list of ``ScoredItem`` objects, highest score first.
    """
    now = time.time()
    query_words = set(query.lower().split())
    results: list[ScoredItem] = []

    for item in ledger.items:
        if item.is_expired:
            continue

        trust = TRUST_ORDER.get(item.trust_level, 0) / 3.0
        # Clamp at zero: a timestamp ahead of the local clock would
        # otherwise give recency > 1 and, for a large enough skew,
        # make math.exp raise OverflowError.
        age = max(0.0, now - item.timestamp)
        recency = math.exp(-age / self._half_life) if self._half_life > 0 else 0.0
        relevance = self._keyword_relevance(item.content, query_words)

        score = (
            self._w_trust * trust + self._w_recency * recency + self._w_relevance * relevance
        )

        if score >= min_score:
            results.append(
                ScoredItem(
                    item=item,
                    score=score,
                    trust_score=trust,
                    recency_score=recency,
                    relevance_score=relevance,
                )
            )

    results.sort(key=lambda r: r.score, reverse=True)
    return results[:top_k]