Skip to content

spanforge.sdk.observe — Observability Client

Module: spanforge.sdk.observe
Added in: 2.0.5 (Phase 6: Observability Named SDK)

spanforge.sdk.observe provides the Phase 6 observability SDK client. It handles span export (local buffer + OTLP / Datadog / Grafana / Splunk / Elastic), structured annotation storage, W3C TraceContext / Baggage injection, OTel GenAI semantic conventions, deterministic sampling, and health probes.

The pre-built sf_observe singleton is available at the top level:

from spanforge.sdk import sf_observe

Quick example

from spanforge.sdk import sf_observe

# Emit a span for an LLM call
span_id = sf_observe.emit_span(
    "chat.completion",
    {
        "gen_ai.system": "openai",
        "gen_ai.request.model": "gpt-4o",
        "gen_ai.usage.input_tokens": 512,
        "gen_ai.usage.output_tokens": 64,
    },
)
print(span_id)  # 16-hex span ID, e.g. "a3f1b2c4d5e6f708"

# Add a deploy annotation
annotation_id = sf_observe.add_annotation(
    "model_deployed",
    {"model": "gpt-4o", "version": "2024-11", "environment": "production"},
    project_id="my-project",
)
print(annotation_id)  # UUID string

# Retrieve recent annotations
from datetime import datetime, timedelta, timezone
now = datetime.now(timezone.utc)
annotations = sf_observe.get_annotations(
    "model_deployed",
    (now - timedelta(hours=1)).isoformat(),
    now.isoformat(),
    project_id="my-project",
)
for ann in annotations:
    print(ann.event_type, ann.created_at)

# Export a batch of pre-built spans to an external OTLP endpoint
from spanforge.sdk import sf_observe, ReceiverConfig
result = sf_observe.export_spans(
    my_span_list,
    receiver_config=ReceiverConfig(
        endpoint="https://otel.collector.example.com/v1/traces",
        headers={"Authorization": "Bearer my-token"},
        timeout_seconds=10.0,
    ),
)
print(result.exported_count, result.backend)

# Health check
print(sf_observe.healthy)        # True / False
print(sf_observe.last_export_at) # ISO-8601 string or None

SFObserveClient

class SFObserveClient(SFServiceClient)

All public methods are thread-safe. A threading.Lock() guards the in-memory annotation store and session statistics.

Constructor

SFObserveClient(config: SFClientConfig)

Reads the following environment variables at construction time:

VariableMeaningDefault
SPANFORGE_OBSERVE_BACKENDExport backend"local"
SPANFORGE_OBSERVE_SAMPLERSampling strategy"always_on"
SPANFORGE_OBSERVE_SAMPLE_RATERatio sampler rate [0.0, 1.0]1.0
SPANFORGE_ENVdeployment.environment OTel resource attribute"production"

export_spans

def export_spans(
    spans: list[dict[str, Any]],
    *,
    receiver_config: ReceiverConfig | None = None,
) -> ExportResult

Export a list of span dicts to the active backend (OBS-001).

ParameterDescription
spansList of OTLP-compatible span dicts. Empty list returns ExportResult(0, 0, backend, ...).
receiver_configOptional per-call override. When set, spans are POSTed to receiver_config.endpoint as OTLP JSON, ignoring the global backend.

Returns: ExportResult(exported_count, failed_count, backend, exported_at).

Raises: SFObserveExportError on transport/HTTP failure (unless local_fallback_enabled=True on the config, in which case spans are buffered locally).


emit_span

def emit_span(name: str, attributes: dict[str, Any]) -> str

Build and export a single span with OTel GenAI conventions (OBS-004).

  • Injects W3C traceparent and baggage (OBS-011, OBS-012).
  • Adds OTel resource attributes (OBS-013).
  • Applies the active sampling strategy (OBS-031).
  • Inherits traceId from attributes["traceparent"] when present.
  • Sets status.code = STATUS_CODE_ERROR on error (OBS-015).

Returns: 16-hex span ID (always returned even when sampled out).

Raises: SFObserveEmitError on invalid name (empty) or attributes (non-dict).


add_annotation

def add_annotation(
    event_type: str,
    payload: dict[str, Any],
    *,
    project_id: str,
) -> str

Store a structured annotation in the in-memory log (OBS-002).

Returns: UUID annotation ID string.

Raises: SFObserveAnnotationError if event_type is empty or payload is not a dict.


get_annotations

def get_annotations(
    event_type: str,
    from_dt: str,
    to_dt: str,
    *,
    project_id: str = "",
) -> list[Annotation]

Retrieve annotations from the in-memory store (OBS-003).

ParameterDescription
event_typeFilter by event type. Use "*" to return all types.
from_dtISO-8601 datetime string (inclusive lower bound).
to_dtISO-8601 datetime string (inclusive upper bound).
project_idOptional project ID filter. Empty string = no filter.

Raises: SFObserveAnnotationError if from_dt or to_dt is not a valid ISO-8601 datetime.


get_status

def get_status() -> ObserveStatusInfo

Return current service health and session statistics.


healthy

@property
def healthy(self) -> bool

True unless the most recent export attempt raised an unrecovered error (OBS-043).


last_export_at

@property
def last_export_at(self) -> str | None

ISO-8601 timestamp of the last successful export_spans call, or None if no export has occurred yet (OBS-043).


W3C TraceContext helpers

from spanforge.sdk.observe import make_traceparent, extract_traceparent

make_traceparent

def make_traceparent(
    trace_id_hex: str,
    span_id_hex: str,
    *,
    sampled: bool,
) -> str

Encode a W3C traceparent header value (OBS-011).

00-<32 hex trace_id>-<16 hex span_id>-{01|00}

Raises: ValueError if trace_id_hex is not 32 hex chars or span_id_hex is not 16 hex chars.

extract_traceparent

def extract_traceparent(traceparent: str) -> tuple[str, str, bool]

Parse a traceparent header. Returns (trace_id_hex, span_id_hex, sampled).

Raises: ValueError on malformed input.


Sampling strategies

Set via SPANFORGE_OBSERVE_SAMPLER environment variable or by assigning client._sampler_strategy = SamplerStrategy.XXX before use.

StrategyEnv valueBehaviour
ALWAYS_ON"always_on"Every span is exported (default)
ALWAYS_OFF"always_off"No spans are exported
PARENT_BASED"parent_based"Follows parent's sampled flag; samples by default when no parent
TRACE_ID_RATIO"trace_id_ratio"Deterministic fraction based on SHA-256 hash of trace_id

Backend exporters

BackendEnv valueProtocol
Local buffer"local" (default)Bounded in-memory deque (≤ 10 000 spans)
OTLP"otlp"HTTP POST to {endpoint}/v1/traces as OTLP JSON
Datadog"datadog"HTTP POST to {endpoint}/api/v0.2/traces
Grafana Tempo"grafana"HTTP POST to {endpoint}/api/v1/push
Splunk HEC"splunk"HTTP POST to {endpoint}/services/collector
Elastic APM"elastic"HTTP POST to {endpoint}/_bulk as ECS documents

Types

ReceiverConfig

@dataclass(frozen=True)
class ReceiverConfig:
    endpoint: str
    headers: dict[str, str] = {}
    timeout_seconds: float = 30.0

Per-call OTLP receiver override for export_spans.

ExportResult

@dataclass(frozen=True)
class ExportResult:
    exported_count: int
    failed_count: int
    backend: str
    exported_at: str        # ISO-8601

Annotation

@dataclass(frozen=True)
class Annotation:
    annotation_id: str      # UUID
    event_type: str
    payload: dict[str, Any]
    project_id: str
    created_at: str         # ISO-8601

ObserveStatusInfo

@dataclass(frozen=True)
class ObserveStatusInfo:
    status: str             # "ok" | "error"
    backend: str
    sampler_strategy: str
    span_count: int
    annotation_count: int
    export_count: int
    last_export_at: str | None
    healthy: bool

SamplerStrategy

class SamplerStrategy(enum.Enum):
    ALWAYS_ON = "always_on"
    ALWAYS_OFF = "always_off"
    PARENT_BASED = "parent_based"
    TRACE_ID_RATIO = "trace_id_ratio"

emit_span_async

async def emit_span_async(name: str, attributes: dict[str, Any]) -> str

Async variant of emit_span. Runs the span build-and-export operation in a thread-pool executor so it does not block the event loop.

import asyncio
from spanforge.sdk import sf_observe

async def trace_request(req):
    span_id = await sf_observe.emit_span_async(
        "llm.request",
        {"model": "gpt-4o", "prompt_tokens": 512},
    )
    return span_id

Accepts the same parameters and returns the same 16-hex span ID as emit_span.


Exceptions

ExceptionInheritsRaised by
SFObserveErrorSFErrorBase for all observe errors
SFObserveExportErrorSFObserveErrorexport_spans on transport / HTTP failure
SFObserveEmitErrorSFObserveErroremit_span / emit_span_async on invalid input or export failure
SFObserveAnnotationErrorSFObserveErroradd_annotation / get_annotations on invalid input