spanforge.consumer

Consumer registry for tracking which tools and services depend on which schema namespaces, and enforcing schema-version compatibility at startup.

See the Governance & Consumer Registry user guide for usage patterns.


ConsumerRecord

@dataclass(frozen=True)
class ConsumerRecord:
    tool_name: str
    namespaces: Tuple[str, ...]
    schema_version: str
    contact: str | None = None
    metadata: Dict[str, str] = field(default_factory=dict)

An immutable record representing one registered consumer of the schema.

Attributes:

Attribute       Type             Description
tool_name       str              The name of the tool or service consuming events.
namespaces      Tuple[str, ...]  Namespaces this consumer reads (e.g. ("llm.trace.*", "llm.cost.*")).
schema_version  str              The minimum schema version this consumer requires (e.g. "1.0").
contact         str | None       Optional contact info for compatibility escalations. Defaults to None.
metadata        Dict[str, str]   Arbitrary extra metadata.
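
To illustrate what frozen=True means for such a record, here is a minimal sketch using a stand-in class that copies the documented fields. ConsumerRecordSketch is a hypothetical name used only for this illustration; it is not imported from spanforge.

```python
from dataclasses import FrozenInstanceError, dataclass, field
from typing import Dict, Optional, Tuple


# Stand-in mirroring the documented fields; NOT the library's own class.
@dataclass(frozen=True)
class ConsumerRecordSketch:
    tool_name: str
    namespaces: Tuple[str, ...]
    schema_version: str
    contact: Optional[str] = None
    metadata: Dict[str, str] = field(default_factory=dict)


record = ConsumerRecordSketch(
    tool_name="my-analytics-agent",
    namespaces=("llm.trace.*", "llm.cost.*"),
    schema_version="1.0",
)

try:
    # Frozen dataclasses reject attribute reassignment.
    record.schema_version = "1.1"
    reassigned = True
except FrozenInstanceError:
    reassigned = False
```

Note that frozen=True only blocks rebinding attributes; a dict stored in metadata can still be mutated in place, so treat it as read-only by convention.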

ConsumerRegistry

class ConsumerRegistry

Thread-safe registry that tracks all registered ConsumerRecord objects.

Example:

from spanforge.consumer import ConsumerRegistry

registry = ConsumerRegistry()
# register() takes the tool name positionally and everything else as
# keyword arguments; it builds and returns the ConsumerRecord.
record = registry.register(
    "my-analytics-agent",
    namespaces=("llm.trace.*", "llm.cost.*"),
    schema_version="1.0",
    contact="analytics-team@example.com",
)

Methods

register(tool_name: str, *, namespaces: Sequence[str], schema_version: str, contact: str | None = None, metadata: dict[str, str] | None = None) -> ConsumerRecord

Register a consumer's schema requirements.

Args:

Parameter       Type                   Description
tool_name       str                    Name of the consuming tool or service.
namespaces      Sequence[str]          Event namespaces required (e.g. ["trace", "eval"]).
schema_version  str                    Minimum schema version required ("MAJOR.MINOR").
contact         str | None             Optional contact info for compatibility escalations.
metadata        dict[str, str] | None  Optional freeform metadata dict.

Returns: ConsumerRecord

Raises: ValueError — if tool_name is empty, namespaces is empty, or schema_version is not in MAJOR.MINOR format.
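
The MAJOR.MINOR check described above could look roughly like the following. This is a sketch under assumptions: parse_major_minor is a hypothetical helper, and the library's actual validation (for example, whether it tolerates whitespace or a leading "v") is not documented here.

```python
import re
from typing import Tuple

# Exactly two dot-separated runs of ASCII digits, e.g. "1.0" or "2.10".
_VERSION_RE = re.compile(r"([0-9]+)\.([0-9]+)")


def parse_major_minor(version: str) -> Tuple[int, int]:
    """Return (major, minor) or raise ValueError for any other shape."""
    match = _VERSION_RE.fullmatch(version)
    if match is None:
        raise ValueError(
            f"schema_version must be MAJOR.MINOR, got {version!r}"
        )
    return int(match.group(1)), int(match.group(2))
```

Parsing into integers up front also avoids comparing versions as strings later, where "1.10" would sort before "1.9".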


all() -> List[ConsumerRecord]

Return a snapshot of all registered consumers.

Returns: List[ConsumerRecord]
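
One common way to get both thread safety and snapshot semantics is to guard the internal list with a lock and hand out copies. The sketch below is illustrative only: RegistrySketch is a hypothetical class storing plain strings, and nothing here is confirmed to match the library's internals.

```python
import threading
from typing import List


class RegistrySketch:
    """Hypothetical lock-protected registry; not spanforge's implementation."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._records: List[str] = []

    def register(self, record: str) -> None:
        with self._lock:
            self._records.append(record)

    def all(self) -> List[str]:
        # Return a copy so callers cannot mutate internal state.
        with self._lock:
            return list(self._records)


registry = RegistrySketch()
registry.register("billing-agent")

snapshot = registry.all()
snapshot.append("not-registered")  # only the caller's copy changes
```

Because all() returns a copy, later registrations do not appear in a snapshot taken earlier, and edits to the snapshot do not leak back into the registry.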


by_namespace(namespace: str) -> List[ConsumerRecord]

Return all consumers that list namespace in their namespaces tuple. Supports exact matches and wildcard patterns (e.g. "llm.trace.*").

Args:

Parameter  Type  Description
namespace  str   Namespace string to look up.

Returns: List[ConsumerRecord]
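
The matching semantics are not spelled out above, so the following is only one plausible reading: a registered entry matches when it equals the queried namespace or, treated as a glob pattern, matches it. namespace_matches is a hypothetical helper built on the standard library's fnmatch module.

```python
from fnmatch import fnmatchcase
from typing import Iterable


def namespace_matches(registered: Iterable[str], namespace: str) -> bool:
    """True if any registered entry equals or glob-matches `namespace`.

    Assumption: registered entries such as "llm.trace.*" are patterns and
    the queried namespace is the candidate string.
    """
    return any(
        entry == namespace or fnmatchcase(namespace, entry)
        for entry in registered
    )
```

Under this reading, a consumer registered for "llm.trace.*" is returned for a lookup of "llm.trace.span" but not for "llm.cost.total" or for the bare prefix "llm.trace".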


by_tool(tool_name: str) -> ConsumerRecord | None

Return the first record registered under the given tool_name, or None if not found.

Args:

Parameter  Type  Description
tool_name  str   Exact tool name string.

Returns: ConsumerRecord | None — the first matching record, or None if not found.


check_compatible(installed_version: str) -> List[Tuple[str, str]]

Check every registered consumer against installed_version.

A consumer is compatible if:

  • its major version equals the installed major version, and
  • its minor version is ≤ the installed minor version.

Args:

Parameter          Type  Description
installed_version  str   The currently installed schema version (e.g. "1.1").

Returns: List[Tuple[str, str]] — list of (tool_name, schema_version) tuples for incompatible consumers. An empty list means all consumers are satisfied.
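
The compatibility rule above can be sketched in a few lines. The helper names (is_compatible, find_incompatible) are hypothetical, and consumers are represented as plain (tool_name, schema_version) tuples rather than ConsumerRecord objects.

```python
from typing import List, Tuple


def _parse(version: str) -> Tuple[int, int]:
    # Assumes a well-formed "MAJOR.MINOR" string.
    major, minor = version.split(".")
    return int(major), int(minor)


def is_compatible(required: str, installed: str) -> bool:
    """Same major version, and required minor <= installed minor."""
    req_major, req_minor = _parse(required)
    inst_major, inst_minor = _parse(installed)
    return req_major == inst_major and req_minor <= inst_minor


def find_incompatible(
    consumers: List[Tuple[str, str]], installed: str
) -> List[Tuple[str, str]]:
    """Return (tool_name, schema_version) pairs that fail the rule."""
    return [
        (name, required)
        for name, required in consumers
        if not is_compatible(required, installed)
    ]
```

With installed version "1.1", a consumer requiring "1.0" passes, while consumers requiring "1.2" (minor too high) or "2.0" (different major) are reported.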


assert_compatible(installed_version: str) -> None

Like check_compatible, but raises IncompatibleSchemaError instead of returning the list of incompatible consumers.

Args:

Parameter          Type  Description
installed_version  str   The currently installed schema version.

Raises: IncompatibleSchemaError — if one or more consumers require a version the installation cannot satisfy.


clear() -> None

Remove all registered consumers. Useful in tests.


IncompatibleSchemaError

class IncompatibleSchemaError(Exception):
    incompatible: List[Tuple[str, str]]

Raised by ConsumerRegistry.assert_compatible() when one or more registered consumers require a schema version the installation cannot satisfy.

Attributes:

Attribute     Type                   Description
incompatible  List[Tuple[str, str]]  List of (tool_name, required_version) pairs that are incompatible.
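
A caller will typically catch this error at startup and report each offending consumer. The sketch below uses a stand-in exception so it runs without the library; IncompatibleSchemaErrorSketch and describe are hypothetical names, and the real constructor signature is not documented here.

```python
from typing import Iterable, List, Tuple


class IncompatibleSchemaErrorSketch(Exception):
    """Stand-in carrying the documented `incompatible` attribute."""

    def __init__(self, incompatible: Iterable[Tuple[str, str]]) -> None:
        self.incompatible: List[Tuple[str, str]] = list(incompatible)
        super().__init__(
            f"{len(self.incompatible)} consumer(s) incompatible "
            "with the installed schema"
        )


def describe(error: IncompatibleSchemaErrorSketch) -> List[str]:
    """Turn each (tool_name, required_version) pair into a log line."""
    return [
        f"{tool} requires schema {required}"
        for tool, required in error.incompatible
    ]


try:
    raise IncompatibleSchemaErrorSketch([("billing-agent", "2.1")])
except IncompatibleSchemaErrorSketch as exc:
    report = describe(exc)
```

With the real library, the same pattern would presumably wrap the assert_compatible() call and read exc.incompatible in the except branch.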

Module-level helpers

The module exposes a global registry singleton and convenience wrappers so most callers never need to instantiate ConsumerRegistry directly.

get_registry() -> ConsumerRegistry

Return the global ConsumerRegistry singleton.


register_consumer(tool_name: str, *, namespaces: Sequence[str], schema_version: str, contact: str | None = None, metadata: dict[str, str] | None = None) -> ConsumerRecord

Register a consumer in the global registry.

Args:

Parameter       Type                   Description
tool_name       str                    Name of the consuming tool or service.
namespaces      Sequence[str]          Event namespaces required.
schema_version  str                    Minimum schema version required ("MAJOR.MINOR").
contact         str | None             Optional contact info.
metadata        dict[str, str] | None  Optional freeform metadata.

Returns: ConsumerRecord


assert_compatible(installed_version: str = "2.0") -> None

Assert all globally registered consumers are compatible with installed_version.

Defaults to the current library schema version ("2.0").

Raises: IncompatibleSchemaError — if any registered consumer is incompatible.

Example:

from spanforge.consumer import register_consumer, assert_compatible

register_consumer(
    "billing-agent",
    namespaces=("llm.cost.*",),
    # Must share the installed major version ("2.0" by default) to pass.
    schema_version="2.0",
    contact="billing-team@example.com",
)

# Call at startup. Raises IncompatibleSchemaError if any registered consumer
# requires a different major version, or a higher minor version, than installed.
assert_compatible()