Syncropel Docs

Python SDK

Integrate with Syncropel from Python — emit records, query threads, enforce grammar, fail open on transport errors.

Install

pip install syncropel

Requires Python 3.10 or later. The only runtime dependency is httpx.

Hello, Syncropel

Start a Syncropel instance locally (spl serve), then:

import asyncio
from syncropel import Client, Identity, Ref

async def main():
    async with Client(
        endpoint="http://localhost:9100",
        identity=Identity.static("did:example:my-app"),
    ) as client:
        result = await client.emit(
            act="PUT",
            kind="music.catalog.track",
            body={"title": "Glow", "artists": ["Zonke"]},
            refs=[Ref.music_track(isrc="USJI19810404")],
            thread="music.library",
        )
        print(result.success, result.record_id)

asyncio.run(main())

One call validates the body.kind grammar, builds the 8-field record envelope, retries on transient 5xx and network errors, and returns a result instead of raising when the transport fails, so a flaky network never crashes your handler.
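
The retry-then-fail-open behaviour described above can be pictured with a small standard-library sketch. This is not the SDK's implementation: SketchResult, its error field, fail_open_call, and the doubling backoff schedule are assumptions made for illustration. Only the parameter names max_retries and backoff_ms are borrowed from the constructor kwargs listed under Core API.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


@dataclass
class SketchResult:
    """Hypothetical stand-in for the SDK's result object."""
    success: bool
    record_id: Optional[str] = None
    error: Optional[str] = None  # assumed field, for illustration only


async def fail_open_call(
    send: Callable[[], Awaitable[str]],
    max_retries: int = 2,
    backoff_ms: float = 250.0,
) -> SketchResult:
    """Try `send` up to 1 + max_retries times and never raise on transport errors."""
    last_error = "unknown"
    for attempt in range(max_retries + 1):
        try:
            return SketchResult(success=True, record_id=await send())
        except (ConnectionError, TimeoutError) as exc:
            last_error = f"{type(exc).__name__}: {exc}"
            if attempt < max_retries:
                # Assumed schedule: double the delay after each failed attempt.
                await asyncio.sleep(backoff_ms * (2 ** attempt) / 1000)
    # Fail open: report the failure in the result instead of raising.
    return SketchResult(success=False, error=last_error)
```

The caller's contract is the point of the sketch: the handler inspects result.success rather than wrapping every call in try/except.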

What you get

  • Async client: emit, query, query_thread, intend, fulfill, plus reserved-kind helpers
  • Sync helper: emit_sync() for scripts and CLIs without an event loop
  • Grammar enforcement: body.kind validated client-side before any network call
  • Canonical references: built-in constructors for cross-publisher correlation (music tracks, code files, incidents, calendar events, and more)
  • Fail-open transport: every emit returns a result; transport errors never raise
  • Identity-aware: records are signed with the configured DID
  • In-memory MockKernel at syncropel.testing: write adapter tests without running an instance

Core API

Client

Method | Purpose
emit(act, kind, body, thread, refs=, parents=, data_type=, clock=) | Primary emit. Validates, envelopes, retries. Returns EmitResult. Never raises on transport failures.
emit_sync(...) | Synchronous variant. Persistent httpx.Client across calls amortises TCP keep-alive for bulk emits.
intend(goal, thread=, ...) | Open a thread with an INTEND record. Generates a random thread ID if none supplied.
fulfill(thread, summary, fulfills=, ...) | Close a thread with a KNOW record. fulfills accepts one record ID or a list.
query_thread(thread, limit=100, since=None) | All records in a thread. Fail-open (returns [] on transport error).
query(kind=, actor=, thread=, since=, limit=100, where=None) | Filtered records. At least one of kind/actor/thread is required.
health() | Server health probe. Fail-open (returns {} on failure).
close() | Release the underlying HTTP client.

Constructor kwargs: endpoint (required), identity (required), timeout=30.0, max_retries=2, backoff_ms=250.0, on_emit=None, api_key=None (bearer token for authenticated instances), transport=None (custom httpx transport for tests).

Identity

Identity.static("did:example:my-app")     # Available today
Identity.key(path_or_bytes)                # Planned
Identity.federated(...)                    # Planned

Reserved-kind helpers

For records that participate in the protocol's coherence story:

await client.emit_correction(
    corrects=[record_id],
    revised_fields={"title": "Glow (Remastered)"},
    reason="updated metadata",
    thread="music.library",
)

await client.emit_erasure(
    erases=[record_id],
    reason="GDPR request",
    thread="music.library",
)

await client.emit_alias(
    old_kind="music.song",
    new_kind="music.catalog.track",
    reason="grammar migration",
    thread="music.library",
)

Each helper maps to a reserved core.* kind and enforces the appropriate body shape.

Canonical references (Ref)

Canonical refs let records about the same real-world entity correlate across apps and publishers:

Ref.music_track(isrc="USJI19810404")            # → @music.track
Ref.code_file(repo="acme/myproject",
              path="src/main.rs")               # → @code.file
Ref.ops_incident(pagerduty="ABC123")            # → @ops.incident
Ref.cal_event(uid="abc@google.com")             # → @cal.event
Ref.social_person(email="alice@example.com")   # → @social.person

Eleven categories total — full list in the inline help: help(Ref).

Authenticating to a remote instance

When the Syncropel instance has bearer-token auth enabled, pass the token via api_key:

import os

async with Client(
    endpoint="https://your-host.example.com:9100",
    identity=Identity.static("did:example:my-app"),
    api_key=os.environ["SPL_TOKEN"],
) as client:
    ...

The client sends Authorization: Bearer <token> on every outbound request. See Authentication & Service Accounts for how to mint the token.
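
To make the wire format concrete, the sketch below builds a request carrying the same header using only the standard library, without sending anything. The helper name build_authenticated_request, the /health path, and the token value are hypothetical; the SDK itself uses httpx and its own routes.

```python
import urllib.request


def build_authenticated_request(endpoint: str, path: str, token: str) -> urllib.request.Request:
    """Build (but do not send) a request with a bearer-token Authorization header."""
    url = endpoint.rstrip("/") + path
    return urllib.request.Request(url, headers={"Authorization": f"Bearer {token}"})


# Hypothetical path and token, used only to show the resulting header.
req = build_authenticated_request("https://your-host.example.com:9100", "/health", "s3cr3t")
print(req.get_header("Authorization"))  # prints "Bearer s3cr3t"
```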

Synchronous usage

For scripts and CLI tools without an event loop:

from syncropel import Client, Identity

client = Client(
    endpoint="http://localhost:9100",
    identity=Identity.static("did:example:cli-tool"),
)

result = client.emit_sync(
    act="KNOW",
    kind="ops.deploy.completed",
    body={"version": "1.2.3", "duration_s": 42},
    thread="ops.deploys",
)
print(result.success, result.record_id)

client.close()

emit_sync() uses a persistent httpx.Client across calls, so bulk emits share a TCP connection.
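
For bulk emits, one possible pattern is to loop over the payloads with a single client and release it in a finally block. The sketch below is duck-typed so it runs without an instance: emit_all is a hypothetical helper, and client is assumed to be any object exposing emit_sync() and close() as described above, with results that carry success and record_id.

```python
from typing import Any, Iterable


def emit_all(client: Any, deploys: Iterable[dict], thread: str = "ops.deploys") -> list[str]:
    """Emit one KNOW record per deploy and return the IDs of records that succeeded."""
    record_ids = []
    try:
        for body in deploys:
            result = client.emit_sync(
                act="KNOW",
                kind="ops.deploy.completed",
                body=body,
                thread=thread,
            )
            # Fail-open: a failed emit is reported in the result, not raised.
            if result.success:
                record_ids.append(result.record_id)
    finally:
        # Release the shared HTTP client even if the loop is interrupted.
        client.close()
    return record_ids
```

Because this helper closes the client it was given, it suits one-shot scripts; a long-lived process would more likely keep the client open and close it at shutdown.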

Inference — client.infer()

client.infer() emits an infer.query.v1 INTEND and (by default) polls until the KNOW lands. The full query anatomy lives in the Inference guide; this section is the SDK surface.

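from syncropel import Client, Identity

# identity is a required constructor argument (see Core API)
client = Client(
    endpoint="http://localhost:9100",
    identity=Identity.static("did:example:my-app"),
)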

result = await client.infer({
    "input": {"goal": "Summarise this paper in one paragraph"},
    "responders": [
        {"kind": "llm", "model": "~sonnet"},
        {"kind": "llm", "model": "~gpt-4o"},
    ],
    "fold": {"function": "consensus", "min_quorum": 2},
    "answer_shape": {"kind": "core.summary.v1"},
    "side_effects": {"max_cost_usd": 0.30, "max_latency_secs": 60},
})

print(result.answer)
print(f"Trust: {result.trust_summary['mean']:.2f}")
print(f"Cost: ${result.cost_actual_usd:.4f}")

The request dict mirrors the schema field-for-field. See Query Anatomy for every field.

Fire-and-forget

Pass InferOptions(wait=False) to emit the INTEND and return immediately:

from syncropel import InferOptions

pending = await client.infer(request, InferOptions(wait=False))
print(pending.correlation_id, pending.thread_id)

You get back InferPending with pending=True, correlation_id, and thread_id. Poll the thread yourself or subscribe for the KNOW.
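
Polling the thread yourself might look like the sketch below. It relies only on query_thread(thread) returning a list of records, as documented in Core API. The helper name wait_for_record and its parameters are hypothetical, and the caller supplies the is_answer predicate, so the sketch makes no assumption about how a KNOW record is shaped.

```python
import asyncio
import time
from typing import Any, Callable, Optional


async def wait_for_record(
    client: Any,
    thread_id: str,
    is_answer: Callable[[Any], bool],
    poll_interval_s: float = 0.5,
    timeout_s: float = 300.0,
) -> Optional[Any]:
    """Poll a thread until a record matches `is_answer`; return None on timeout."""
    deadline = time.monotonic() + timeout_s
    while True:
        # query_thread is fail-open, so a transport error simply yields [].
        for record in await client.query_thread(thread_id):
            if is_answer(record):
                return record
        if time.monotonic() >= deadline:
            return None
        await asyncio.sleep(poll_interval_s)
```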

Options

await client.infer(request, InferOptions(
    wait=True,                # default
    poll_interval_ms=500,     # default
    poll_timeout_ms=300_000,  # default — 5 minutes
    thread="th_abc123...",    # optional explicit thread
))

Result shape

@dataclass
class InferResult:
    answer: Any
    provenance: list[ProvenanceEntry]
    trust_summary: dict  # mean, min, max, n_contributors
    cost_actual_usd: float
    fold_function: str
    chosen_response_id: str | None
    know_record_id: str
    thread_id: str
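
As an illustration of consuming these fields, a caller could gate on trust and cost before using an answer. accept_answer and both thresholds are hypothetical; the sketch assumes only that trust_summary has a "mean" key and that cost_actual_usd is a float, as shown above, and it makes no claim about the scale of the trust values.

```python
from typing import Any


def accept_answer(result: Any, min_mean_trust: float, max_cost_usd: float) -> bool:
    """Return True when the result meets caller-chosen trust and cost limits."""
    # A missing "mean" is treated as zero trust in this sketch.
    mean_trust = result.trust_summary.get("mean", 0.0)
    return mean_trust >= min_mean_trust and result.cost_actual_usd <= max_cost_usd
```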

Error handling — fail-hard (not fail-open)

Unlike client.emit(), client.infer() fails hard:

  • Invalid request shape raises ValueError / TypeError before any HTTP call.
  • Network or instance error on INTEND emission raises RuntimeError.
  • Poll timeout raises TimeoutError.

Catch timeouts explicitly if you want the thread id preserved for manual polling.
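
One way to keep the thread id after a timeout is to choose the thread yourself and catch TimeoutError around the call. In the sketch below, infer_or_thread is a hypothetical helper, and options is assumed to be an InferOptions-like object whose thread attribute the caller has already set. ValueError, TypeError, and RuntimeError are deliberately left to propagate.

```python
from typing import Any, Optional, Tuple


async def infer_or_thread(
    client: Any, request: dict, options: Any
) -> Tuple[Optional[Any], Optional[str]]:
    """Return (result, None) on success, or (None, thread_id) if polling timed out."""
    try:
        return await client.infer(request, options), None
    except TimeoutError:
        # The caller set options.thread, so the thread can still be polled manually.
        return None, options.thread
```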

Testing without an instance

Use MockKernel to write unit tests against the SDK's interface without running spl serve:

from syncropel import Client, Identity
from syncropel.testing import MockKernel

def test_my_adapter():
    kernel = MockKernel()

    client = Client(
        endpoint="mock://",
        identity=Identity.static("did:example:test"),
        transport=kernel.transport(),
    )

    # Call your code-under-test
    my_adapter_function(client)

    # Assert against what was emitted
    records = kernel.records_for_thread("my-thread")
    assert len(records) == 1
    assert records[0].body["kind"] == "music.catalog.track"
