Python SDK
Integrate with Syncropel from Python — emit records, query threads, enforce grammar, fail open on transport errors.
Install
pip install syncropelPython 3.10+. Single runtime dependency (httpx).
Hello, Syncropel
Start a Syncropel daemon locally (spl serve --daemon), then:
import asyncio
from syncropel import Client, Identity, Ref
async def main():
async with Client(
endpoint="http://localhost:9100",
identity=Identity.static("did:example:my-app"),
) as client:
result = await client.emit(
act="PUT",
kind="music.catalog.track",
body={"title": "Glow", "artists": ["Zonke"]},
refs=[Ref.music_track(isrc="USJI19810404")],
thread="music.library",
)
print(result.success, result.record_id)
asyncio.run(main())One call validates the body.kind grammar, builds the 8-field record envelope, retries on transient 5xx and network errors, and resolves cleanly on transport failures — so a flaky network never crashes your handler.
What you get
- Async client —
emit,query,query_thread,intend,fulfill, plus reserved-kind helpers - Sync helper —
emit_sync()for scripts and CLIs without an event loop - Grammar enforcement —
body.kindvalidated client-side before any network call - Canonical references — built-in constructors for cross-publisher correlation (music tracks, code files, incidents, calendar events, and more)
- Fail-open transport — every emit returns a result; transport errors never raise
- Identity-aware — records are signed with the configured DID
- In-memory
MockKernelatsyncropel.testing— write adapter tests without running a daemon
Core API
Client
| Method | Purpose |
|---|---|
emit(act, kind, body, thread, refs=, parents=, data_type=, clock=) | Primary emit. Validates, envelopes, retries. Returns EmitResult. Never raises on transport failures. |
emit_sync(...) | Synchronous variant. Persistent httpx.Client across calls amortises TCP keep-alive for bulk emits. |
intend(goal, thread=, ...) | Open a thread with an INTEND record. Generates a random thread ID if none supplied. |
fulfill(thread, summary, fulfills=, ...) | Close a thread with a KNOW record. fulfills accepts one record ID or a list. |
query_thread(thread, limit=100, since=None) | All records in a thread. Fail-open (returns [] on transport error). |
query(kind=, actor=, thread=, since=, limit=100, where=None) | Filtered records. At least one of kind/actor/thread is required. |
health() | Server health probe. Fail-open (returns {} on failure). |
close() | Release the underlying HTTP client. |
Constructor kwargs: endpoint (required), identity (required), timeout=30.0, max_retries=2, backoff_ms=250.0, on_emit=None, api_key=None (bearer token for authenticated daemons), transport=None (custom httpx transport for tests).
Identity
Identity.static("did:example:my-app") # Available today
Identity.key(path_or_bytes) # Planned
Identity.federated(...) # PlannedReserved-kind helpers
For records that participate in the protocol's coherence story:
await client.emit_correction(
corrects=[record_id],
revised_fields={"title": "Glow (Remastered)"},
reason="updated metadata",
thread="music.library",
)
await client.emit_erasure(
erases=[record_id],
reason="GDPR request",
thread="music.library",
)
await client.emit_alias(
old_kind="music.song",
new_kind="music.catalog.track",
reason="grammar migration",
thread="music.library",
)Each helper maps to a reserved core.* kind and enforces the appropriate body shape.
Canonical references (Ref)
Canonical refs let records about the same real-world entity correlate across apps and publishers:
Ref.music_track(isrc="USJI19810404") # → @music.track
Ref.code_file(repo="acme/myproject",
path="src/main.rs") # → @code.file
Ref.ops_incident(pagerduty="ABC123") # → @ops.incident
Ref.cal_event(uid="abc@google.com") # → @cal.event
Ref.social_person(email="alice@example.com") # → @social.personEleven categories total — full list in the inline help: help(Ref).
Authenticating to a remote daemon
When the Syncropel daemon has bearer-token auth enabled, pass the token via api_key:
import os
async with Client(
endpoint="https://your-host.example.com:9100",
identity=Identity.static("did:example:my-app"),
api_key=os.environ["SPL_TOKEN"],
) as client:
...The client sends Authorization: Bearer <token> on every outbound request. See Authentication & Service Accounts for how to mint the token.
Synchronous usage
For scripts and CLI tools without an event loop:
from syncropel import Client, Identity
client = Client(
endpoint="http://localhost:9100",
identity=Identity.static("did:example:cli-tool"),
)
result = client.emit_sync(
act="KNOW",
kind="ops.deploy.completed",
body={"version": "1.2.3", "duration_s": 42},
thread="ops.deploys",
)
print(result.success, result.record_id)
client.close()emit_sync() uses a persistent httpx.Client across calls, so bulk emits share a TCP connection.
Inference — client.infer()
client.infer() emits an infer.query.v1 INTEND and (by default) polls until the KNOW lands. The full query anatomy lives in the Inference guide; this section is the SDK surface.
from syncropel import Client
client = Client(endpoint="http://localhost:9100")
result = await client.infer({
"input": {"goal": "Summarise this paper in one paragraph"},
"responders": [
{"kind": "llm", "model": "~sonnet"},
{"kind": "llm", "model": "~gpt-4o"},
],
"fold": {"function": "consensus", "min_quorum": 2},
"answer_shape": {"kind": "core.summary.v1"},
"side_effects": {"max_cost_usd": 0.30, "max_latency_secs": 60},
})
print(result.answer)
print(f"Trust: {result.trust_summary['mean']:.2f}")
print(f"Cost: ${result.cost_actual_usd:.4f}")The request dict mirrors the schema field-for-field. See Query Anatomy for every field.
Fire-and-forget
Pass InferOptions(wait=False) to emit the INTEND and return immediately:
from syncropel import InferOptions
pending = await client.infer(request, InferOptions(wait=False))
print(pending.correlation_id, pending.thread_id)You get back InferPending with pending=True, correlation_id, and thread_id. Poll the thread yourself or subscribe for the KNOW.
Options
await client.infer(request, InferOptions(
wait=True, # default
poll_interval_ms=500, # default
poll_timeout_ms=300_000, # default — 5 minutes
thread="th_abc123...", # optional explicit thread
))Result shape
@dataclass
class InferResult:
answer: Any
provenance: list[ProvenanceEntry]
trust_summary: dict # mean, min, max, n_contributors
cost_actual_usd: float
fold_function: str
chosen_response_id: str | None
know_record_id: str
thread_id: strError handling — fail-hard (not fail-open)
Unlike client.emit(), client.infer() fails hard:
- Invalid request shape raises
ValueError/TypeErrorbefore any HTTP call. - Network or daemon error on INTEND emission raises
RuntimeError. - Poll timeout raises
TimeoutError.
Catch timeouts explicitly if you want the thread id preserved for manual polling.
Testing without a daemon
Use MockKernel to write unit tests against the SDK's interface without running spl serve:
from syncropel import Client, Identity
from syncropel.testing import MockKernel
def test_my_adapter():
kernel = MockKernel()
client = Client(
endpoint="mock://",
identity=Identity.static("did:example:test"),
transport=kernel.transport(),
)
# Call your code-under-test
my_adapter_function(client)
# Assert against what was emitted
records = kernel.records_for_thread("my-thread")
assert len(records) == 1
assert records[0].body["kind"] == "music.catalog.track"What's next
- Records concept — the 8-field envelope and why it's content-addressed
- Threads concept — how records group into workflows
- Authentication & Service Accounts — securing the daemon your Python code talks to
- Agent integration guide — use the SDK inside an AI agent
TypeScript SDK
Integrate with Syncropel from JavaScript or TypeScript — emit records, query threads, enforce grammar, fail open on transport errors. Works in Node, Deno, Bun, Cloudflare Workers, and modern browsers.
Projections
Schema, validators, and a markdown-subset parser for the Syncropel Rendering Protocol (SRP) — the declarative document format for query-driven and AI-generated UI.