HMAC Signing & Audit Chains

spanforge provides a cryptographic audit trail based on HMAC-SHA256. Every signed event carries a payload checksum and a chain signature that links it to its predecessor, forming a tamper-evident sequence that can detect deletions, reorderings, and payload modifications.

How signing works

checksum  = sha256(canonical_payload_json)
sig_input = event_id + "|" + checksum + "|" + (prev_id or "")
signature = HMAC-SHA256(key=org_secret, message=sig_input)

The canonical payload JSON is compact (no whitespace) with sorted keys for determinism. The resulting checksum and signature values are stored directly on the event.
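
The three formulas above can be reproduced with the standard library alone. The sketch below is an illustration, not spanforge's implementation: the helper names are hypothetical, and the hex encoding, the "sha256:" and "hmac-sha256:" prefixes, and the choice to feed the prefixed checksum into sig_input are all assumptions.

```python
import hashlib
import hmac
import json


def canonical_json(payload: dict) -> str:
    # Compact separators and sorted keys give a deterministic encoding.
    return json.dumps(payload, sort_keys=True, separators=(",", ":"))


def compute_checksum(payload: dict) -> str:
    digest = hashlib.sha256(canonical_json(payload).encode("utf-8")).hexdigest()
    return "sha256:" + digest  # prefix is an assumption


def compute_signature(event_id: str, checksum: str, prev_id, org_secret: str) -> str:
    # sig_input = event_id + "|" + checksum + "|" + (prev_id or "")
    sig_input = f"{event_id}|{checksum}|{prev_id or ''}"
    mac = hmac.new(org_secret.encode("utf-8"), sig_input.encode("utf-8"), hashlib.sha256)
    return "hmac-sha256:" + mac.hexdigest()  # prefix is an assumption


# "evt-001" is a made-up event id used only for this example.
checksum = compute_checksum({"span_name": "chat", "index": 1})
signature = compute_signature("evt-001", checksum, None, "my-org-secret")
```

Because keys are sorted before hashing, two payloads with the same content but different key order produce the same checksum.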

Signing a single event

from spanforge import Event, EventType
from spanforge.signing import sign, verify, assert_verified

event = Event(
    event_type=EventType.TRACE_SPAN_COMPLETED,
    source="my-tool@1.0.0",
    payload={"span_name": "chat"},
)

signed = sign(event, org_secret="my-org-secret")

assert signed.checksum is not None      # "sha256:..."
assert signed.signature is not None     # "hmac-sha256:..."
assert signed.prev_id is None           # first in chain

# Verify
assert verify(signed, org_secret="my-org-secret") is True

# Strict variant — raises VerificationError on failure
assert_verified(signed, org_secret="my-org-secret")

Building an audit chain

Use AuditStream to build a chain where each event is linked to the previous one via prev_id:

from spanforge import Event, EventType
from spanforge.signing import AuditStream

stream = AuditStream(org_secret="my-org-secret", source="my-tool@1.0.0")

events_to_sign = [
    Event(event_type=EventType.TRACE_SPAN_COMPLETED,
          source="my-tool@1.0.0",
          payload={"index": i})
    for i in range(10)
]

for evt in events_to_sign:
    signed = stream.append(evt)     # returns signed event with prev_id set

print(len(stream))                  # 10
print(stream.events[0].prev_id)     # None — first event
print(stream.events[1].prev_id)     # == stream.events[0].event_id

Verifying a chain

from spanforge.signing import verify_chain

result = stream.verify()             # or: verify_chain(events, org_secret="...")

assert result.valid                  # True if no tampering or gaps
assert result.tampered_count == 0    # number of events with bad signatures
assert result.gaps == []             # event_ids where prev_id linkage broke
assert result.first_tampered is None # first tampered event_id, or None

Detecting tampering

from spanforge.signing import verify_chain

# Tamper with an event's payload after signing
signed_events = list(stream.events)
object.__setattr__(signed_events[3], "_payload", {"hacked": True})

result = verify_chain(signed_events, org_secret="my-org-secret")
assert not result.valid
assert result.tampered_count >= 1
assert result.first_tampered == signed_events[3].event_id

Detecting deletions (gaps)

# Remove event index 2 from the chain
events_with_gap = [e for i, e in enumerate(stream.events) if i != 2]

result = verify_chain(events_with_gap, org_secret="my-org-secret")
assert not result.valid
assert stream.events[3].event_id in result.gaps
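
To see why a deleted event surfaces as a gap while a modified payload surfaces as tampering, it can help to walk a chain by hand. The following is a minimal standard-library sketch, not the verify_chain() implementation: records are plain dicts, build_chain and check_chain are hypothetical helpers, and the event ids are invented.

```python
import hashlib
import hmac
import json


def _checksum(payload):
    body = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return "sha256:" + hashlib.sha256(body.encode("utf-8")).hexdigest()


def _signature(event_id, checksum, prev_id, secret):
    msg = f"{event_id}|{checksum}|{prev_id or ''}".encode("utf-8")
    return "hmac-sha256:" + hmac.new(secret.encode("utf-8"), msg, hashlib.sha256).hexdigest()


def build_chain(payloads, secret):
    """Sign payloads in order, linking each record to its predecessor."""
    chain, prev_id = [], None
    for i, payload in enumerate(payloads):
        event_id = f"evt-{i}"  # invented ids for the example
        checksum = _checksum(payload)
        chain.append({
            "event_id": event_id,
            "prev_id": prev_id,
            "payload": payload,
            "checksum": checksum,
            "signature": _signature(event_id, checksum, prev_id, secret),
        })
        prev_id = event_id
    return chain


def check_chain(chain, secret):
    """Return (tampered_ids, gap_ids) for a list of signed records."""
    tampered, gaps = [], []
    expected_prev = None
    for rec in chain:
        # Recompute checksum and signature from the record's current contents.
        checksum = _checksum(rec["payload"])
        sig = _signature(rec["event_id"], checksum, rec["prev_id"], secret)
        if checksum != rec["checksum"] or not hmac.compare_digest(sig, rec["signature"]):
            tampered.append(rec["event_id"])
        # A record whose prev_id does not name the record before it marks a gap.
        if rec["prev_id"] != expected_prev:
            gaps.append(rec["event_id"])
        expected_prev = rec["event_id"]
    return tampered, gaps


chain = build_chain([{"index": i} for i in range(5)], "my-org-secret")
without_third = [rec for i, rec in enumerate(chain) if i != 2]
```

In this sketch, removing the record at index 2 leaves every remaining signature intact, but the record at index 3 still names the missing record as its predecessor, so it is the one reported as a gap.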

Key rotation

For long-lived audit streams, rotate the signing key periodically. The rotation event itself is signed with the old key, providing continuity:

stream = AuditStream(org_secret="old-secret", source="my-tool@1.0.0")

# ... append events ...

rotation_event = stream.rotate_key(
    "new-secret-v2",
    metadata={"reason": "scheduled", "rotated_by": "ops-team"},
)

# Subsequent events are signed with "new-secret-v2"
# Verification still works across the rotation boundary:
result = stream.verify()
assert result.valid

Higher-level compliance wrapper

The spanforge.compliance module provides a richer wrapper over verify_chain() that includes gap reporting, violation objects, and timestamp monotonicity checks:

from spanforge.compliance import verify_chain_integrity

events = list(stream.events)  # signed events, e.g. from an AuditStream built as above
result = verify_chain_integrity(events, org_secret="my-org-secret")
if not result:
    for v in result.violations:
        print(f"[{v.violation_type}] {v.event_id}: {v.detail}")

See compliance.md for the full compliance API.


Key strength validation (new in v1.0.0)

Before using a key in production, validate it against security requirements:

from spanforge.signing import validate_key_strength

warnings = validate_key_strength("my-secret")
if warnings:
    for w in warnings:
        print(f"⚠ {w}")
    # Key length 9 < minimum 32 characters
    # Key uses only 2 character class(es); recommend at least 3 (upper, lower, digit, special)

validate_key_strength() is also called automatically by spanforge.configure() when a signing_key is set — warnings are logged via the spanforge.config logger.

Set the minimum key length with the SPANFORGE_SIGNING_KEY_MIN_BITS environment variable. The value is given in bits and divided by 8 to obtain the minimum character count:

export SPANFORGE_SIGNING_KEY_MIN_BITS=512   # requires 64-character key
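
The bits-to-characters conversion and the two checks mentioned in the warnings can be sketched as follows. This is not the validate_key_strength() implementation: the helper names are hypothetical, the 256-bit default is inferred from the 32-character minimum shown in the example warning, and "special" is assumed to mean ASCII punctuation.

```python
import os
import string

ENV_VAR = "SPANFORGE_SIGNING_KEY_MIN_BITS"


def min_key_chars(env=None, default_bits=256):
    """Convert the configured minimum (in bits) to a character count."""
    env = os.environ if env is None else env
    return int(env.get(ENV_VAR, default_bits)) // 8


def count_character_classes(key):
    """Count how many of upper, lower, digit, special appear in the key."""
    checks = (
        any(c.isupper() for c in key),
        any(c.islower() for c in key),
        any(c.isdigit() for c in key),
        any(c in string.punctuation for c in key),  # assumed definition of "special"
    )
    return sum(checks)


def length_warnings(key, env=None):
    """Return a list with one warning if the key is shorter than the minimum."""
    minimum = min_key_chars(env)
    if len(key) < minimum:
        return [f"Key length {len(key)} < minimum {minimum} characters"]
    return []
```

Passing a plain dict as env makes the conversion easy to exercise without touching the process environment, for example min_key_chars({"SPANFORGE_SIGNING_KEY_MIN_BITS": "512"}).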

Key expiry checking (new in v1.0.0)

Configure a key expiry date to prevent use of stale keys:

export SPANFORGE_SIGNING_KEY_EXPIRES_AT="2026-12-31T00:00:00Z"

Once configured, sign() will raise SigningError if the key has expired. Check key status programmatically:

from spanforge.signing import check_key_expiry

status, days = check_key_expiry("2026-12-31T00:00:00Z")
# status: "valid", "expiring_soon" (≤7 days), "expired", or "no_expiry"
# days:   days remaining (or days since expiry)
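
The status values described above amount to a small date comparison. The sketch below shows one way such a classification could work; it is not the check_key_expiry() implementation. The function name classify_expiry, the injectable now argument, treating naive timestamps as UTC, and returning 0 days for "no_expiry" are all assumptions.

```python
from datetime import datetime, timezone


def classify_expiry(expires_at, now=None, soon_days=7):
    """Return (status, days) for an ISO-8601 expiry timestamp."""
    if not expires_at:
        return "no_expiry", 0
    # datetime.fromisoformat() accepts a trailing "Z" only from Python 3.11 on.
    expiry = datetime.fromisoformat(expires_at.replace("Z", "+00:00"))
    if expiry.tzinfo is None:
        expiry = expiry.replace(tzinfo=timezone.utc)  # assume UTC when no offset is given
    now = now or datetime.now(timezone.utc)
    if expiry < now:
        return "expired", (now - expiry).days  # days since expiry
    remaining = (expiry - now).days
    if remaining <= soon_days:
        return "expiring_soon", remaining
    return "valid", remaining


# A fixed reference time keeps the example deterministic.
reference = datetime(2026, 12, 1, tzinfo=timezone.utc)
status, days = classify_expiry("2026-12-31T00:00:00Z", now=reference)
```

Injecting the reference time rather than reading the clock inside the function makes the boundary cases (exactly 7 days left, just expired) straightforward to test.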

Environment-isolated key derivation (new in v1.0.0)

Use the context parameter on derive_key() to ensure the same passphrase produces different keys for different environments:

from spanforge.signing import derive_key

prod_key, prod_salt = derive_key("my-passphrase", context="production")
stg_key, stg_salt = derive_key("my-passphrase", context="staging")

assert prod_key != stg_key  # different environments → different keys

Or set via environment variable:

export SPANFORGE_SIGNING_KEY_CONTEXT="production"
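
The idea behind the context parameter is domain separation: the context string is mixed into the derivation so that identical passphrases diverge per environment. The source does not say which key-derivation function derive_key() uses, so the sketch below assumes PBKDF2-HMAC-SHA256 and mixes the context into the salt; the function name, the iteration count, and the hex output are likewise illustrative choices.

```python
import hashlib
import os


def derive_key_sketch(passphrase, context="", salt=None, iterations=200_000):
    """Derive a hex key from a passphrase; returns (key, salt)."""
    salt = os.urandom(16) if salt is None else salt
    # Domain separation: same passphrase and salt, different context, different key.
    scoped_salt = salt + b"|" + context.encode("utf-8")
    key = hashlib.pbkdf2_hmac("sha256", passphrase.encode("utf-8"), scoped_salt, iterations)
    return key.hex(), salt


fixed_salt = bytes(16)  # fixed only so the example is reproducible; use a random salt in practice
prod_key, _ = derive_key_sketch("my-passphrase", context="production", salt=fixed_salt)
stg_key, _ = derive_key_sketch("my-passphrase", context="staging", salt=fixed_salt)
```

Even with the same salt, the two keys differ because the context is part of the derivation input. The returned salt has to be stored alongside the key metadata so the same key can be derived again later.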

Multi-tenant key resolution (new in v1.0.0)

In multi-tenant deployments, different organisations may use different signing keys. Use a KeyResolver to resolve the correct key per event:

from spanforge.signing import AuditStream, DictKeyResolver

resolver = DictKeyResolver({
    "org_acme": "acme-secret-key-that-is-long-enough",
    "org_beta": "beta-secret-key-that-is-long-enough",
})

stream = AuditStream(
    org_secret="fallback-key-for-events-without-org",
    source="multi-tenant-svc@1.0.0",
    key_resolver=resolver,
    require_org_id=True,  # enforce that every event has an org_id
)

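# Each event is signed with its org's key. event_for_acme and event_for_beta
# stand for events whose org_id is "org_acme" and "org_beta" respectively.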
stream.append(event_for_acme)   # signed with "acme-secret-key..."
stream.append(event_for_beta)   # signed with "beta-secret-key..."

Three resolvers are provided:

Resolver                     Description
StaticKeyResolver(secret)    Same key for all orgs (single-tenant / testing).
EnvKeyResolver(prefix)       Reads {prefix}{ORG_ID} from environment variables.
DictKeyResolver(keys)        In-memory {org_id: secret} dictionary.

You can also implement the KeyResolver protocol for custom resolution (e.g. from a secrets manager):

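from spanforge.signing import KeyResolver  # protocol: any object with resolve(org_id) -> str

class VaultKeyResolver:
    def __init__(self, vault_client):
        # vault_client is a placeholder for your secrets-manager client.
        self._vault = vault_client

    def resolve(self, org_id: str) -> str:
        return self._vault.get_secret(f"signing-key/{org_id}")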
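
Since a secrets-manager lookup is usually a network call, a custom resolver may want to cache results. The sketch below shows the pattern without depending on spanforge: KeyResolverLike is a hypothetical structural stand-in for the resolver interface (a single resolve(org_id) method, as in the example above), and the in-memory store replaces a real secrets-manager client.

```python
from typing import Callable, Dict, Protocol


class KeyResolverLike(Protocol):
    """Structural stand-in for the resolver interface: one resolve() method."""

    def resolve(self, org_id: str) -> str: ...


class CachingKeyResolver:
    """Wraps a slow lookup (for example a secrets-manager call) with a cache."""

    def __init__(self, fetch: Callable[[str], str]):
        self._fetch = fetch
        self._cache: Dict[str, str] = {}

    def resolve(self, org_id: str) -> str:
        # Only call the backing store the first time an org is seen.
        if org_id not in self._cache:
            self._cache[org_id] = self._fetch(org_id)
        return self._cache[org_id]


# Stand-in for a secrets-manager client; a real one would make a network call.
_fake_store = {"org_acme": "acme-secret-key-that-is-long-enough"}
calls = []


def fetch_from_store(org_id: str) -> str:
    calls.append(org_id)  # record each backend hit so caching is observable
    return _fake_store[org_id]


resolver: KeyResolverLike = CachingKeyResolver(fetch_from_store)
```

A cache like this never refreshes, so a key rotated in the backing store would not be picked up until the process restarts; a production resolver would likely need expiry or explicit invalidation.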

Async audit streams (new in v1.0.0)

For asyncio-based applications, use AsyncAuditStream which uses asyncio.Lock instead of threading.RLock:

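import asyncio
from spanforge import Event, EventType
from spanforge.signing import AsyncAuditStream

async def main():
    stream = AsyncAuditStream(org_secret="my-key", source="async-svc@1.0.0")

    event = Event(
        event_type=EventType.TRACE_SPAN_COMPLETED,
        source="async-svc@1.0.0",
        payload={"span_name": "chat"},
    )
    signed = await stream.append(event)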
    await stream.rotate_key("new-key", metadata={"reason": "scheduled"})

    result = await stream.verify()
    assert result.valid

asyncio.run(main())
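
The reason an append-only chain needs a lock under asyncio is that reading the last event, computing the link, and writing the new event must happen as one step even if the coroutine yields in between. The toy class below illustrates that pattern with asyncio.Lock; it is a hypothetical stand-in, not AsyncAuditStream, and it omits signing entirely.

```python
import asyncio


class TinyAsyncChain:
    """Toy append-only chain: the lock keeps read-link-write as one atomic step."""

    def __init__(self):
        self._lock = asyncio.Lock()
        self.records = []

    async def append(self, event_id):
        async with self._lock:
            prev_id = self.records[-1]["event_id"] if self.records else None
            # Simulated await point (e.g. I/O) inside the critical section.
            await asyncio.sleep(0)
            record = {"event_id": event_id, "prev_id": prev_id}
            self.records.append(record)
            return record


async def demo():
    chain = TinyAsyncChain()
    # Five concurrent appends; the lock serialises them.
    await asyncio.gather(*(chain.append(f"evt-{i}") for i in range(5)))
    return chain.records


records = asyncio.run(demo())
```

Without the lock, several coroutines could read the same last record before any of them wrote, producing multiple records that share one prev_id and breaking the chain.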