Python SDK
Integrate with Syncropel from Python — emit records, query threads, enforce grammar, fail open on transport errors.
Install
pip install syncropelPython 3.10+. Single runtime dependency (httpx).
Hello, Syncropel
Start a Syncropel instance locally (spl serve), then:
import asyncio
from syncropel import Client, Identity, Ref
async def main():
async with Client(
endpoint="http://localhost:9100",
identity=Identity.static("did:example:my-app"),
) as client:
result = await client.emit(
act="PUT",
kind="music.catalog.track",
body={"title": "Glow", "artists": ["Zonke"]},
refs=[Ref.music_track(isrc="USJI19810404")],
thread="music.library",
)
print(result.success, result.record_id)
asyncio.run(main())One call validates the body.kind grammar, builds the 8-field record envelope, retries on transient 5xx and network errors, and resolves cleanly on transport failures — so a flaky network never crashes your handler.
What you get
- Async client —
emit,query,query_thread,intend,fulfill, plus reserved-kind helpers - Sync helper —
emit_sync()for scripts and CLIs without an event loop - Grammar enforcement —
body.kindvalidated client-side before any network call - Canonical references — built-in constructors for cross-publisher correlation (music tracks, code files, incidents, calendar events, and more)
- Fail-open transport — every emit returns a result; transport errors never raise
- Identity-aware — records are signed with the configured DID
- In-memory
MockKernelatsyncropel.testing— write adapter tests without running an instance
Core API
Client
| Method | Purpose |
|---|---|
emit(act, kind, body, thread, refs=, parents=, data_type=, clock=) | Primary emit. Validates, envelopes, retries. Returns EmitResult. Never raises on transport failures. |
emit_sync(...) | Synchronous variant. Persistent httpx.Client across calls amortises TCP keep-alive for bulk emits. |
intend(goal, thread=, ...) | Open a thread with an INTEND record. Generates a random thread ID if none supplied. |
fulfill(thread, summary, fulfills=, ...) | Close a thread with a KNOW record. fulfills accepts one record ID or a list. |
query_thread(thread, limit=100, since=None) | All records in a thread. Fail-open (returns [] on transport error). |
query(kind=, actor=, thread=, since=, limit=100, where=None) | Filtered records. At least one of kind/actor/thread is required. |
health() | Server health probe. Fail-open (returns {} on failure). |
close() | Release the underlying HTTP client. |
Constructor kwargs: endpoint (required), identity (required), timeout=30.0, max_retries=2, backoff_ms=250.0, on_emit=None, api_key=None (bearer token for authenticated instances), transport=None (custom httpx transport for tests).
Identity
Identity.static("did:example:my-app") # Available today
Identity.key(path_or_bytes) # Planned
Identity.federated(...) # PlannedReserved-kind helpers
For records that participate in the protocol's coherence story:
await client.emit_correction(
corrects=[record_id],
revised_fields={"title": "Glow (Remastered)"},
reason="updated metadata",
thread="music.library",
)
await client.emit_erasure(
erases=[record_id],
reason="GDPR request",
thread="music.library",
)
await client.emit_alias(
old_kind="music.song",
new_kind="music.catalog.track",
reason="grammar migration",
thread="music.library",
)Each helper maps to a reserved core.* kind and enforces the appropriate body shape.
Canonical references (Ref)
Canonical refs let records about the same real-world entity correlate across apps and publishers:
Ref.music_track(isrc="USJI19810404") # → @music.track
Ref.code_file(repo="acme/myproject",
path="src/main.rs") # → @code.file
Ref.ops_incident(pagerduty="ABC123") # → @ops.incident
Ref.cal_event(uid="abc@google.com") # → @cal.event
Ref.social_person(email="alice@example.com") # → @social.personEleven categories total — full list in the inline help: help(Ref).
Authenticating to a remote instance
When the Syncropel instance has bearer-token auth enabled, pass the token via api_key:
import os
async with Client(
endpoint="https://your-host.example.com:9100",
identity=Identity.static("did:example:my-app"),
api_key=os.environ["SPL_TOKEN"],
) as client:
...The client sends Authorization: Bearer <token> on every outbound request. See Authentication & Service Accounts for how to mint the token.
Synchronous usage
For scripts and CLI tools without an event loop:
from syncropel import Client, Identity
client = Client(
endpoint="http://localhost:9100",
identity=Identity.static("did:example:cli-tool"),
)
result = client.emit_sync(
act="KNOW",
kind="ops.deploy.completed",
body={"version": "1.2.3", "duration_s": 42},
thread="ops.deploys",
)
print(result.success, result.record_id)
client.close()emit_sync() uses a persistent httpx.Client across calls, so bulk emits share a TCP connection.
Inference — client.infer()
client.infer() emits an infer.query.v1 INTEND and (by default) polls until the KNOW lands. The full query anatomy lives in the Inference guide; this section is the SDK surface.
from syncropel import Client
client = Client(endpoint="http://localhost:9100")
result = await client.infer({
"input": {"goal": "Summarise this paper in one paragraph"},
"responders": [
{"kind": "llm", "model": "~sonnet"},
{"kind": "llm", "model": "~gpt-4o"},
],
"fold": {"function": "consensus", "min_quorum": 2},
"answer_shape": {"kind": "core.summary.v1"},
"side_effects": {"max_cost_usd": 0.30, "max_latency_secs": 60},
})
print(result.answer)
print(f"Trust: {result.trust_summary['mean']:.2f}")
print(f"Cost: ${result.cost_actual_usd:.4f}")The request dict mirrors the schema field-for-field. See Query Anatomy for every field.
Fire-and-forget
Pass InferOptions(wait=False) to emit the INTEND and return immediately:
from syncropel import InferOptions
pending = await client.infer(request, InferOptions(wait=False))
print(pending.correlation_id, pending.thread_id)You get back InferPending with pending=True, correlation_id, and thread_id. Poll the thread yourself or subscribe for the KNOW.
Options
await client.infer(request, InferOptions(
wait=True, # default
poll_interval_ms=500, # default
poll_timeout_ms=300_000, # default — 5 minutes
thread="th_abc123...", # optional explicit thread
))Result shape
@dataclass
class InferResult:
answer: Any
provenance: list[ProvenanceEntry]
trust_summary: dict # mean, min, max, n_contributors
cost_actual_usd: float
fold_function: str
chosen_response_id: str | None
know_record_id: str
thread_id: strError handling — fail-hard (not fail-open)
Unlike client.emit(), client.infer() fails hard:
- Invalid request shape raises
ValueError/TypeErrorbefore any HTTP call. - Network or instance error on INTEND emission raises
RuntimeError. - Poll timeout raises
TimeoutError.
Catch timeouts explicitly if you want the thread id preserved for manual polling.
Testing without an instance
Use MockKernel to write unit tests against the SDK's interface without running spl serve:
from syncropel import Client, Identity
from syncropel.testing import MockKernel
def test_my_adapter():
kernel = MockKernel()
client = Client(
endpoint="mock://",
identity=Identity.static("did:example:test"),
transport=kernel.transport(),
)
# Call your code-under-test
my_adapter_function(client)
# Assert against what was emitted
records = kernel.records_for_thread("my-thread")
assert len(records) == 1
assert records[0].body["kind"] == "music.catalog.track"What's next
- Records concept — the 8-field envelope and why it's content-addressed
- Threads concept — how records group into workflows
- Authentication & Service Accounts — securing the instance your Python code talks to
- Agent integration guide — use the SDK inside an AI agent
Files & blobs (data plane)
Read and write files programmatically with the TypeScript SDK — the three-step write with compare-and-swap, streaming reads, directory operations, publishing durable artifacts, and material search.
React Components
A constrained palette of 21 React atoms and molecules that render Syncropel projection documents natively, with CSS-variable theming and no provider required.