SSyncropel Docs

Python SDK

Integrate with Syncropel from Python — emit records, query threads, enforce grammar, fail open on transport errors.

Install

pip install syncropel

Python 3.10+. Single runtime dependency (httpx).

Hello, Syncropel

Start a Syncropel daemon locally (spl serve --daemon), then:

import asyncio
from syncropel import Client, Identity, Ref

async def main():
    async with Client(
        endpoint="http://localhost:9100",
        identity=Identity.static("did:example:my-app"),
    ) as client:
        result = await client.emit(
            act="PUT",
            kind="music.catalog.track",
            body={"title": "Glow", "artists": ["Zonke"]},
            refs=[Ref.music_track(isrc="USJI19810404")],
            thread="music.library",
        )
        print(result.success, result.record_id)

asyncio.run(main())

One call validates the body.kind grammar, builds the 8-field record envelope, retries on transient 5xx and network errors, and resolves cleanly on transport failures — so a flaky network never crashes your handler.

What you get

  • Async clientemit, query, query_thread, intend, fulfill, plus reserved-kind helpers
  • Sync helperemit_sync() for scripts and CLIs without an event loop
  • Grammar enforcementbody.kind validated client-side before any network call
  • Canonical references — built-in constructors for cross-publisher correlation (music tracks, code files, incidents, calendar events, and more)
  • Fail-open transport — every emit returns a result; transport errors never raise
  • Identity-aware — records are signed with the configured DID
  • In-memory MockKernel at syncropel.testing — write adapter tests without running a daemon

Core API

Client

MethodPurpose
emit(act, kind, body, thread, refs=, parents=, data_type=, clock=)Primary emit. Validates, envelopes, retries. Returns EmitResult. Never raises on transport failures.
emit_sync(...)Synchronous variant. Persistent httpx.Client across calls amortises TCP keep-alive for bulk emits.
intend(goal, thread=, ...)Open a thread with an INTEND record. Generates a random thread ID if none supplied.
fulfill(thread, summary, fulfills=, ...)Close a thread with a KNOW record. fulfills accepts one record ID or a list.
query_thread(thread, limit=100, since=None)All records in a thread. Fail-open (returns [] on transport error).
query(kind=, actor=, thread=, since=, limit=100, where=None)Filtered records. At least one of kind/actor/thread is required.
health()Server health probe. Fail-open (returns {} on failure).
close()Release the underlying HTTP client.

Constructor kwargs: endpoint (required), identity (required), timeout=30.0, max_retries=2, backoff_ms=250.0, on_emit=None, api_key=None (bearer token for authenticated daemons), transport=None (custom httpx transport for tests).

Identity

Identity.static("did:example:my-app")     # Available today
Identity.key(path_or_bytes)                # Planned
Identity.federated(...)                    # Planned

Reserved-kind helpers

For records that participate in the protocol's coherence story:

await client.emit_correction(
    corrects=[record_id],
    revised_fields={"title": "Glow (Remastered)"},
    reason="updated metadata",
    thread="music.library",
)

await client.emit_erasure(
    erases=[record_id],
    reason="GDPR request",
    thread="music.library",
)

await client.emit_alias(
    old_kind="music.song",
    new_kind="music.catalog.track",
    reason="grammar migration",
    thread="music.library",
)

Each helper maps to a reserved core.* kind and enforces the appropriate body shape.

Canonical references (Ref)

Canonical refs let records about the same real-world entity correlate across apps and publishers:

Ref.music_track(isrc="USJI19810404")            # → @music.track
Ref.code_file(repo="acme/myproject",
              path="src/main.rs")               # → @code.file
Ref.ops_incident(pagerduty="ABC123")            # → @ops.incident
Ref.cal_event(uid="abc@google.com")             # → @cal.event
Ref.social_person(email="alice@example.com")   # → @social.person

Eleven categories total — full list in the inline help: help(Ref).

Authenticating to a remote daemon

When the Syncropel daemon has bearer-token auth enabled, pass the token via api_key:

import os

async with Client(
    endpoint="https://your-host.example.com:9100",
    identity=Identity.static("did:example:my-app"),
    api_key=os.environ["SPL_TOKEN"],
) as client:
    ...

The client sends Authorization: Bearer <token> on every outbound request. See Authentication & Service Accounts for how to mint the token.

Synchronous usage

For scripts and CLI tools without an event loop:

from syncropel import Client, Identity

client = Client(
    endpoint="http://localhost:9100",
    identity=Identity.static("did:example:cli-tool"),
)

result = client.emit_sync(
    act="KNOW",
    kind="ops.deploy.completed",
    body={"version": "1.2.3", "duration_s": 42},
    thread="ops.deploys",
)
print(result.success, result.record_id)

client.close()

emit_sync() uses a persistent httpx.Client across calls, so bulk emits share a TCP connection.

Inference — client.infer()

client.infer() emits an infer.query.v1 INTEND and (by default) polls until the KNOW lands. The full query anatomy lives in the Inference guide; this section is the SDK surface.

from syncropel import Client

client = Client(endpoint="http://localhost:9100")

result = await client.infer({
    "input": {"goal": "Summarise this paper in one paragraph"},
    "responders": [
        {"kind": "llm", "model": "~sonnet"},
        {"kind": "llm", "model": "~gpt-4o"},
    ],
    "fold": {"function": "consensus", "min_quorum": 2},
    "answer_shape": {"kind": "core.summary.v1"},
    "side_effects": {"max_cost_usd": 0.30, "max_latency_secs": 60},
})

print(result.answer)
print(f"Trust: {result.trust_summary['mean']:.2f}")
print(f"Cost: ${result.cost_actual_usd:.4f}")

The request dict mirrors the schema field-for-field. See Query Anatomy for every field.

Fire-and-forget

Pass InferOptions(wait=False) to emit the INTEND and return immediately:

from syncropel import InferOptions

pending = await client.infer(request, InferOptions(wait=False))
print(pending.correlation_id, pending.thread_id)

You get back InferPending with pending=True, correlation_id, and thread_id. Poll the thread yourself or subscribe for the KNOW.

Options

await client.infer(request, InferOptions(
    wait=True,                # default
    poll_interval_ms=500,     # default
    poll_timeout_ms=300_000,  # default — 5 minutes
    thread="th_abc123...",    # optional explicit thread
))

Result shape

@dataclass
class InferResult:
    answer: Any
    provenance: list[ProvenanceEntry]
    trust_summary: dict  # mean, min, max, n_contributors
    cost_actual_usd: float
    fold_function: str
    chosen_response_id: str | None
    know_record_id: str
    thread_id: str

Error handling — fail-hard (not fail-open)

Unlike client.emit(), client.infer() fails hard:

  • Invalid request shape raises ValueError / TypeError before any HTTP call.
  • Network or daemon error on INTEND emission raises RuntimeError.
  • Poll timeout raises TimeoutError.

Catch timeouts explicitly if you want the thread id preserved for manual polling.

Testing without a daemon

Use MockKernel to write unit tests against the SDK's interface without running spl serve:

from syncropel import Client, Identity
from syncropel.testing import MockKernel

def test_my_adapter():
    kernel = MockKernel()

    client = Client(
        endpoint="mock://",
        identity=Identity.static("did:example:test"),
        transport=kernel.transport(),
    )

    # Call your code-under-test
    my_adapter_function(client)

    # Assert against what was emitted
    records = kernel.records_for_thread("my-thread")
    assert len(records) == 1
    assert records[0].body["kind"] == "music.catalog.track"

What's next

On this page