DarkMatter Documentation
Everything you need to add execution lineage, replay, and cryptographic audit trails to your AI agent pipelines. No SDK required. Works with any language, any model, any framework.
Overview
DarkMatter is an execution history layer for multi-agent AI systems. Every agent action becomes a context commit, an immutable node chained to its parent via SHA-256 hash linkage. The chain is independently verifiable and can be replayed, forked, or exported as a compliance artifact.
| Concept | What it does | Endpoint |
|---|---|---|
| Commit | Record what an agent received and produced | POST /api/commit |
| Pull | Inherit verified context from upstream agent | GET /api/pull |
| Replay | Walk the full decision chain root to tip | GET /api/replay/:id |
| Fork | Branch from any checkpoint without touching the original | POST /api/fork/:id |
| Verify | Prove the chain was never tampered with | GET /api/verify/:id |
| Export | Download a portable proof artifact | GET /api/export/:id |
Quickstart
The fastest path from install to first replay:
pip install darkmatter-sdk
darkmatter demo # see it locally — no signup
darkmatter init # create account + agent, write .env
You'll see a 3-step chain committed, verified, replayed, forked, and diffed, all locally. Then get your free API key to run it in production.
Single developer (no multi-agent needed)
You don't need multiple agents to use DarkMatter. One developer, one pipeline, full replay of every step. Commit to yourself:
import darkmatter as dm
MY_AGENT = "dm_agent_your_id"
parent_id = None
# Record each step of your pipeline
for step in pipeline_steps:
    result = run_step(step)
    ctx = dm.commit(
        to_agent_id=MY_AGENT,
        payload={"input": step, "output": result},
        parent_id=parent_id,
    )
    parent_id = ctx["id"]
# Replay it
replay = dm.replay(parent_id)
for s in replay["replay"]:
    print(s["step"], s["payload"]["output"])
From zero to verified chain
Try it first with darkmatter demo. No API key needed.
Run pip install darkmatter-sdk. Then run darkmatter demo to see it locally (no signup), then darkmatter init to create your agent and API key.
Set DARKMATTER_API_KEY=dm_sk_...
Call dm.commit() with the input and output. Pass parent_id to link commits into a chain. That's it.
dm.replay(ctx_id) walks the chain root→tip. dm.fork(ctx_id) branches without modifying the original. dm.verify(ctx_id) proves the chain is intact.
curl (no SDK)
# Agent A commits output to Agent B
curl -X POST https://darkmatterhub.ai/api/commit -H "Authorization: Bearer AGENT_A_KEY" -H "Content-Type: application/json" -d '{
"toAgentId": "dm_agent_b_id",
"payload": {"input": "Analyze Q1", "output": "Revenue up 34%"},
"agent": {"role": "researcher", "model": "claude-opus-4-6"}
}'
# Returns: {"id": "ctx_...", "integrity": {"payload_hash": "sha256:..."}}
# Agent B pulls verified context
curl https://darkmatterhub.ai/api/pull -H "Authorization: Bearer AGENT_B_KEY"
# Replay the full chain
curl https://darkmatterhub.ai/api/replay/ctx_... -H "Authorization: Bearer ANY_KEY"
Authentication
All API requests require a Bearer token.
Authorization: Bearer dm_sk_your_api_key_here
| Property | Detail |
|---|---|
| Format | dm_sk_ prefix followed by hex string |
| Scope | Each key belongs to one agent |
| Rate limit | 120 requests per minute per key |
| Rotation | Rotate anytime from the dashboard without losing history |
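To stay under the 120 requests per minute limit, a client can throttle itself before sending. The sketch below shows one way to do that with a sliding window. The class `SlidingWindowLimiter` and its methods are hypothetical helpers for illustration and are not part of the DarkMatter SDK.

```python
import time
from collections import deque


class SlidingWindowLimiter:
    """Client-side throttle: allow at most `max_calls` per `window` seconds.

    Hypothetical helper, not part of the DarkMatter SDK.
    """

    def __init__(self, max_calls: int = 120, window: float = 60.0, clock=time.monotonic):
        self.max_calls = max_calls
        self.window = window
        self._clock = clock
        self._sent = deque()  # timestamps of recent calls, oldest first

    def wait_time(self) -> float:
        """Seconds to wait before the next call is allowed (0.0 if allowed now)."""
        now = self._clock()
        # Drop timestamps that have left the window.
        while self._sent and now - self._sent[0] >= self.window:
            self._sent.popleft()
        if len(self._sent) < self.max_calls:
            return 0.0
        return self.window - (now - self._sent[0])

    def record(self) -> None:
        """Note that a request was just sent."""
        self._sent.append(self._clock())

    def acquire(self) -> None:
        """Block until a request is allowed, then record it."""
        delay = self.wait_time()
        if delay > 0:
            time.sleep(delay)
        self.record()
```

Calling `acquire()` immediately before each API request keeps a single key within its budget; since the limit is per key, each agent would need its own limiter instance.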
Core Concepts
Context commit
An immutable node containing a structured payload, agent attribution, a parent reference, and an integrity hash. Once stored, it cannot be modified. Every commit gets a globally unique ctx_ ID.
Parent linkage
Pass parentId in each commit to link it to the previous step. This builds a lineage graph. Each node stores parent_hash which chains integrity hashes together. Tampering at any node breaks every downstream hash.
Trace ID
Use traceId to group commits into one pipeline run. Pass the same trace ID across all agents in one execution to group them for compliance reports.
Fork
Creates a new branch from any existing checkpoint without modifying the original chain. The fork node carries fork_of, fork_point, and lineage_root. Continue by committing with parentId set to the fork ID.
Integration: LangGraph
LangGraph handles internal graph checkpointing. DarkMatter adds cross-framework lineage and an independent record that lives outside your infrastructure. They're complementary.
One-line wrapper (recommended)
Zero changes to your existing graph. DarkMatterTracer intercepts every node completion and commits automatically:
from darkmatter.integrations.langgraph import DarkMatterTracer
# Your existing compiled app — unchanged
app = workflow.compile()
# One line to add DarkMatter
app = DarkMatterTracer(app, agent_id="MY_AGENT_ID", to_agent_id="MY_AGENT_ID")
# Use exactly as before — every node auto-commits
result = app.invoke({"input": "Write a report"})
print(result)
Manual pattern (more control)
Add a dm_commit call at the end of each node. Pass the returned ctx_id through state:
import os, requests
from langgraph.graph import StateGraph, END
from typing import TypedDict, Optional
DM_URL = "https://darkmatterhub.ai"
DM_KEY = os.environ["DARKMATTER_API_KEY"]
# Add dm_ctx_id and dm_trace_id to your graph state
class AgentState(TypedDict):
    input: str
    output: Optional[str]
    dm_ctx_id: Optional[str]    # carries the last committed ctx ID
    dm_trace_id: Optional[str]  # groups all nodes into one trace
# POST one commit and return its ctx ID
def dm_commit(payload: dict, to_agent_id: str, parent_id: Optional[str] = None, trace_id: Optional[str] = None, role: Optional[str] = None) -> str:
    body = {
        "toAgentId": to_agent_id,
        "payload": payload,
        "agent": {"role": role} if role else {},
    }
    if parent_id: body["parentId"] = parent_id
    if trace_id: body["traceId"] = trace_id
    return requests.post(
        f"{DM_URL}/api/commit",
        headers={"Authorization": f"Bearer {DM_KEY}"},
        json=body, timeout=5
    ).json()["id"]
# llm, writer_llm, WRITER_AGENT_ID, and REVIEWER_AGENT_ID come from your own app
def researcher_node(state: AgentState) -> AgentState:
    output = llm.invoke(state["input"])
    ctx_id = dm_commit(
        payload     = {"input": state["input"], "output": output.content},
        to_agent_id = WRITER_AGENT_ID,
        parent_id   = state.get("dm_ctx_id"),
        trace_id    = state.get("dm_trace_id"),
        role        = "researcher"
    )
    return {"output": output.content, "dm_ctx_id": ctx_id}
def writer_node(state: AgentState) -> AgentState:
    draft = writer_llm.invoke(state["output"])
    ctx_id = dm_commit(
        payload     = {"input": state["output"], "output": draft.content},
        to_agent_id = REVIEWER_AGENT_ID,
        parent_id   = state["dm_ctx_id"],  # chains to researcher
        trace_id    = state.get("dm_trace_id"),
        role        = "writer"
    )
    return {"output": draft.content, "dm_ctx_id": ctx_id}
# Build and run
import uuid
graph = StateGraph(AgentState)
graph.add_node("researcher", researcher_node)
graph.add_node("writer", writer_node)
graph.set_entry_point("researcher")  # the graph needs an entry point before compile()
graph.add_edge("researcher", "writer")
graph.add_edge("writer", END)
app = graph.compile()
result = app.invoke({
    "input": "Analyze Q1 earnings",
    "dm_trace_id": f"trc_{uuid.uuid4().hex[:12]}"
})
Integration: LangChain
Use the DarkMatterCallbackHandler pattern to automatically commit context after any LangChain chain or LLM call without modifying your chain logic.
import os, requests
from langchain_core.callbacks.base import BaseCallbackHandler
from langchain_core.outputs import LLMResult
from langchain_anthropic import ChatAnthropic
from langchain_core.prompts import ChatPromptTemplate
DM_URL = "https://darkmatterhub.ai"
DM_KEY = os.environ["DARKMATTER_API_KEY"]
class DarkMatterCallbackHandler(BaseCallbackHandler):
    def __init__(self, agent_id: str, to_agent_id: str, trace_id: str = None, role: str = None):
        self.agent_id = agent_id
        self.to_agent_id = to_agent_id
        self.trace_id = trace_id
        self.role = role
        self.last_input = None
        self.last_ctx_id = None
    def on_llm_start(self, serialized, prompts, **kwargs):
        # Remember the prompt so it can be paired with the output
        self.last_input = prompts[0] if prompts else None
    def on_llm_end(self, response: LLMResult, **kwargs):
        output = response.generations[0][0].text
        body = {
            "toAgentId": self.to_agent_id,
            "payload": {"input": self.last_input, "output": output},
            "agent": {"role": self.role, "provider": "anthropic"},
        }
        if self.last_ctx_id: body["parentId"] = self.last_ctx_id
        if self.trace_id: body["traceId"] = self.trace_id
        result = requests.post(f"{DM_URL}/api/commit",
            headers={"Authorization": f"Bearer {DM_KEY}"},
            json=body, timeout=5).json()
        self.last_ctx_id = result.get("id")
# Use with any LangChain chain
dm_handler = DarkMatterCallbackHandler(
    agent_id    = AGENT_A_ID,
    to_agent_id = AGENT_B_ID,
    trace_id    = "trc_my_run_001",
    role        = "analyst"
)
llm = ChatAnthropic(model="claude-opus-4-6", callbacks=[dm_handler])
prompt = ChatPromptTemplate.from_template("Analyze: {topic}")
chain = prompt | llm
result = chain.invoke({"topic": "Q1 earnings"})
# Context Passport automatically committed after LLM responds
Integration: CrewAI
Commit after each task using CrewAI's output_json or by wrapping the task execution. Pass dm_ctx_id between tasks to build the lineage chain.
import os, requests, uuid
from crewai import Agent, Task, Crew
from langchain_anthropic import ChatAnthropic
DM_URL = "https://darkmatterhub.ai"
DM_KEY = os.environ["DARKMATTER_API_KEY"]
TRACE = f"trc_{uuid.uuid4().hex[:12]}"
def dm_commit(task_desc: str, output: str, to_id: str, parent_id: str = None, role: str = None) -> str:
    body = {
        "toAgentId": to_id,
        "payload": {"input": task_desc, "output": output},
        "agent": {"role": role},
        "traceId": TRACE,
    }
    if parent_id: body["parentId"] = parent_id
    return requests.post(f"{DM_URL}/api/commit",
        headers={"Authorization": f"Bearer {DM_KEY}"},
        json=body, timeout=5).json()["id"]
llm = ChatAnthropic(model="claude-opus-4-6")
# backstory and expected_output are required by recent CrewAI releases
researcher = Agent(role="Researcher", goal="Find key facts", backstory="Financial research analyst", llm=llm)
writer = Agent(role="Writer", goal="Write the report", backstory="Business report writer", llm=llm)
task1 = Task(description="Research Q1 earnings trends", expected_output="Key facts about Q1 earnings trends", agent=researcher)
task2 = Task(description="Write a summary report", expected_output="A short summary report", agent=writer)
crew = Crew(agents=[researcher, writer], tasks=[task1, task2])
result = crew.kickoff()
# Commit after each task output
ctx1 = dm_commit(task1.description, result.tasks_output[0].raw, WRITER_AGENT_ID, role="researcher")
ctx2 = dm_commit(task2.description, result.tasks_output[1].raw, REVIEWER_AGENT_ID, parent_id=ctx1, role="writer")
Integration: Anthropic SDK
Wrap your existing Anthropic client in one line. Every messages.create() call auto-commits:
import anthropic
from darkmatter.integrations.anthropic import dm_client
# Wrap once — use everywhere
client = dm_client(
anthropic.Anthropic(),
agent_id="agent-01",
to_agent_id="agent-01",
)
# Identical interface to anthropic.Anthropic()
response = client.messages.create(
model="claude-opus-4-6",
max_tokens=1024,
messages=[{"role": "user", "content": "Analyze Q1 earnings"}]
)
print(client.last_ctx_id) # ctx_1234567890_abc123
Manual pattern
If you prefer not to use the SDK integration, a small wrapper class can commit a Context Passport after every call. The wrapper below exposes messages_create() rather than messages.create(), so call sites change by one name.
import os, requests
import anthropic
DM_URL = "https://darkmatterhub.ai"
DM_KEY = os.environ["DARKMATTER_API_KEY"]
class DarkMatterAnthropicClient:
    """Wrapper for anthropic.Anthropic() that commits a Context Passport after each call."""
    def __init__(self, anthropic_client, dm_api_key: str, agent_id: str, to_agent_id: str,
                 trace_id: str = None, role: str = None):
        self._client = anthropic_client
        self.dm_key = dm_api_key
        self.agent_id = agent_id
        self.to_agent_id = to_agent_id
        self.trace_id = trace_id
        self.role = role
        self.last_ctx_id = None
    def messages_create(self, **kwargs) -> anthropic.types.Message:
        response = self._client.messages.create(**kwargs)
        output = response.content[0].text
        # Extract input from messages kwarg
        messages = kwargs.get("messages", [])
        inp = messages[-1]["content"] if messages else None
        body = {
            "toAgentId": self.to_agent_id,
            "payload": {"input": inp, "output": output},
            "agent": {
                "provider": "anthropic",
                "model": kwargs.get("model"),
                "role": self.role,
            },
        }
        if self.last_ctx_id: body["parentId"] = self.last_ctx_id
        if self.trace_id: body["traceId"] = self.trace_id
        # Use the key passed to the constructor, not the module-level one
        result = requests.post(f"{DM_URL}/api/commit",
            headers={"Authorization": f"Bearer {self.dm_key}"},
            json=body, timeout=5).json()
        self.last_ctx_id = result.get("id")
        return response
# Usage: one line to wrap
dm_client = DarkMatterAnthropicClient(
    anthropic_client = anthropic.Anthropic(),
    dm_api_key       = DM_KEY,
    agent_id         = RESEARCHER_ID,
    to_agent_id      = WRITER_ID,
    trace_id         = "trc_q1_analysis",
    role             = "researcher"
)
# Same arguments as the normal Anthropic SDK call; the Context Passport is committed automatically
response = dm_client.messages_create(
    model      = "claude-opus-4-6",
    max_tokens = 1024,
    messages   = [{"role": "user", "content": "Analyze Q1 earnings"}]
)
Integration: OpenAI SDK
Same wrapper pattern for OpenAI. Works with GPT-4o, o1, o3, and any OpenAI-compatible endpoint.
import os, requests
from openai import OpenAI
DM_URL = "https://darkmatterhub.ai"
DM_KEY = os.environ["DARKMATTER_API_KEY"]
class DarkMatterOpenAIClient:
    def __init__(self, openai_client, dm_api_key: str, agent_id: str, to_agent_id: str,
                 trace_id: str = None, role: str = None):
        self._client = openai_client
        self.dm_key = dm_api_key
        self.agent_id = agent_id
        self.to_agent_id = to_agent_id
        self.trace_id = trace_id
        self.role = role
        self.last_ctx_id = None
    def chat_completions_create(self, **kwargs):
        response = self._client.chat.completions.create(**kwargs)
        output = response.choices[0].message.content
        messages = kwargs.get("messages", [])
        inp = messages[-1]["content"] if messages else None
        body = {
            "toAgentId": self.to_agent_id,
            "payload": {"input": inp, "output": output},
            "agent": {"provider": "openai", "model": kwargs.get("model"), "role": self.role},
        }
        if self.last_ctx_id: body["parentId"] = self.last_ctx_id
        if self.trace_id: body["traceId"] = self.trace_id
        # Use the key passed to the constructor, not the module-level one
        result = requests.post(f"{DM_URL}/api/commit",
            headers={"Authorization": f"Bearer {self.dm_key}"},
            json=body, timeout=5).json()
        self.last_ctx_id = result.get("id")
        return response
# Usage
dm_client = DarkMatterOpenAIClient(
    openai_client = OpenAI(),
    dm_api_key    = DM_KEY,
    agent_id      = AGENT_ID,
    to_agent_id   = NEXT_AGENT_ID,
    trace_id      = "trc_pipeline_001",
    role          = "analyst"
)
response = dm_client.chat_completions_create(
    model    = "gpt-4o",
    messages = [{"role": "user", "content": "Summarize the findings"}]
)
Integration: Local models (Ollama, llama.cpp, vLLM)
Works with any local inference server. Use the same wrapper pattern — commit after each generation.
import os, requests as req
DM_URL = "https://darkmatterhub.ai"
DM_KEY = os.environ["DARKMATTER_API_KEY"]
def run_ollama_and_commit(prompt: str, model: str, to_agent_id: str,
                          parent_id: str = None, trace_id: str = None) -> tuple[str, str]:
    # Run local model via Ollama
    response = req.post("http://localhost:11434/api/generate",
        json={"model": model, "prompt": prompt, "stream": False}).json()["response"]
    # Commit Context Passport
    body = {
        "toAgentId": to_agent_id,
        "payload": {
            "input": prompt,
            "output": response,
            "memory": {"model": model, "provider": "local"}
        },
        "agent": {"provider": "local", "model": model},
    }
    if parent_id: body["parentId"] = parent_id
    if trace_id: body["traceId"] = trace_id
    ctx_id = req.post(f"{DM_URL}/api/commit",
        headers={"Authorization": f"Bearer {DM_KEY}"},
        json=body, timeout=5).json()["id"]
    return response, ctx_id
# vLLM: same pattern, different endpoint
def run_vllm_and_commit(prompt, model, to_agent_id, parent_id=None, trace_id=None):
    response = req.post("http://localhost:8000/v1/completions",
        json={"model": model, "prompt": prompt, "max_tokens": 512}
    ).json()["choices"][0]["text"]
    # ... same commit pattern as above
Integration: AWS Bedrock
Wrap the Bedrock runtime client to commit a Context Passport after each agent invocation. Works with Bedrock Agents, Bedrock Flows, and direct model invocations.
import os, json, boto3, requests
DM_URL = "https://darkmatterhub.ai"
DM_KEY = os.environ["DARKMATTER_API_KEY"]
class DarkMatterBedrockClient:
    def __init__(self, region: str, dm_api_key: str, agent_id: str, to_agent_id: str,
                 trace_id: str = None, role: str = None):
        self._client = boto3.client("bedrock-runtime", region_name=region)
        self.dm_key = dm_api_key
        self.to_agent_id = to_agent_id
        self.trace_id = trace_id
        self.role = role
        self.last_ctx_id = None
    def invoke_model(self, model_id: str, prompt: str, **kwargs) -> dict:
        # Invoke Bedrock model
        body = json.dumps({"prompt": prompt, "max_tokens": kwargs.get("max_tokens", 1024)})
        response = self._client.invoke_model(modelId=model_id, body=body)
        result = json.loads(response["body"].read())
        output = result.get("completion") or result.get("content", [{}])[0].get("text", "")
        # Commit Context Passport
        commit_body = {
            "toAgentId": self.to_agent_id,
            "payload": {"input": prompt, "output": output},
            "agent": {"provider": "aws_bedrock", "model": model_id, "role": self.role},
        }
        if self.last_ctx_id: commit_body["parentId"] = self.last_ctx_id
        if self.trace_id: commit_body["traceId"] = self.trace_id
        r = requests.post(f"{DM_URL}/api/commit",
            headers={"Authorization": f"Bearer {self.dm_key}"},
            json=commit_body, timeout=5).json()
        self.last_ctx_id = r.get("id")
        return result
# Usage
bedrock = DarkMatterBedrockClient(
    region      = "us-east-1",
    dm_api_key  = DM_KEY,
    agent_id    = AGENT_ID,
    to_agent_id = NEXT_AGENT_ID,
    trace_id    = "trc_bedrock_001",
    role        = "processor"
)
result = bedrock.invoke_model(
    model_id = "anthropic.claude-opus-4-6-v1:0",
    prompt   = "Analyze this contract for risk clauses"
)
# Context Passport committed; retrieve with GET /api/replay/{ctx_id}
Integration: Google ADK
Google Agent Development Kit uses an event-based model. Commit a Context Passport in your agent's response handler.
import os, requests
from google.adk.agents import LlmAgent
from google.adk.sessions import InMemorySessionService
DM_URL = "https://darkmatterhub.ai"
DM_KEY = os.environ["DARKMATTER_API_KEY"]
class DarkMatterADKAgent(LlmAgent):
    def __init__(self, *args, dm_api_key: str, agent_id: str, to_agent_id: str,
                 trace_id: str = None, **kwargs):
        super().__init__(*args, **kwargs)
        self.dm_key = dm_api_key
        self.agent_id = agent_id
        self.to_agent_id = to_agent_id
        self.trace_id = trace_id
        self.last_ctx_id = None
    async def _run_async_impl(self, ctx):
        async for event in super()._run_async_impl(ctx):
            yield event
            # Commit after final response
            if event.is_final_response() and event.content:
                # Skip parts with no text (for example, function calls)
                output = " ".join(p.text for p in event.content.parts if getattr(p, "text", None))
                body = {
                    "toAgentId": self.to_agent_id,
                    "payload": {"input": str(ctx.user_content), "output": output},
                    "agent": {"provider": "google", "model": self.model, "role": self.name},
                }
                if self.last_ctx_id: body["parentId"] = self.last_ctx_id
                if self.trace_id: body["traceId"] = self.trace_id
                r = requests.post(f"{DM_URL}/api/commit",
                    headers={"Authorization": f"Bearer {self.dm_key}"},
                    json=body, timeout=5).json()
                self.last_ctx_id = r.get("id")
# Usage
agent = DarkMatterADKAgent(
    name        = "research_agent",
    model       = "gemini-2.0-flash",
    dm_api_key  = DM_KEY,
    agent_id    = AGENT_ID,
    to_agent_id = NEXT_AGENT_ID,
    trace_id    = "trc_adk_run_001"
)
Integration: OpenTelemetry
Export DarkMatter commits as OTEL spans so they flow into Datadog, Grafana, Honeycomb, or any OTEL-compatible backend. Alternatively, receive OTEL spans from your pipeline and convert them to Context Passports.
import os, requests
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
DM_URL = "https://darkmatterhub.ai"
DM_KEY = os.environ["DARKMATTER_API_KEY"]
class DarkMatterSpanExporter(SpanExporter):
    """Converts OTEL spans to Context Passports and commits them to DarkMatter."""
    def __init__(self, dm_api_key: str, agent_id: str, to_agent_id: str, trace_id: str = None):
        self.dm_key = dm_api_key
        self.agent_id = agent_id
        self.to_agent_id = to_agent_id
        self.trace_id = trace_id
    def export(self, spans: list[ReadableSpan]) -> SpanExportResult:
        for span in spans:
            attrs = dict(span.attributes or {})
            inp = attrs.get("llm.input") or attrs.get("gen_ai.prompt")
            output = attrs.get("llm.output") or attrs.get("gen_ai.completion")
            model = attrs.get("llm.model_name") or attrs.get("gen_ai.response.model")
            # Skip spans that carry no LLM input or output
            if not (inp or output):
                continue
            body = {
                "toAgentId": self.to_agent_id,
                "payload": {"input": inp, "output": output},
                "agent": {"model": model, "role": span.name},
                "eventType": "commit",
            }
            if self.trace_id: body["traceId"] = self.trace_id
            requests.post(f"{DM_URL}/api/commit",
                headers={"Authorization": f"Bearer {self.dm_key}"},
                json=body, timeout=5)
        return SpanExportResult.SUCCESS
    def shutdown(self): pass
# Register with OTEL tracer provider
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
provider = TracerProvider()
provider.add_span_processor(
    BatchSpanProcessor(DarkMatterSpanExporter(
        dm_api_key  = DM_KEY,
        agent_id    = AGENT_ID,
        to_agent_id = NEXT_AGENT_ID,
        trace_id    = "trc_otel_pipeline"
    ))
)
# All OTEL spans now automatically produce Context Passports in DarkMatter
The exporter maps gen_ai.prompt to payload.input, gen_ai.completion to payload.output, and gen_ai.response.model to agent.model. Custom attributes in the llm.* namespace are also supported.
POST /api/commit
Commit agent context. Returns a canonical v2 context object.
| Field | Type | Required | Description |
|---|---|---|---|
| toAgentId | string | required | Recipient agent ID |
| payload.input | string/object | optional | What the agent received |
| payload.output | string/object | optional | What the agent produced |
| payload.memory | object | optional | Durable working state |
| parentId | string | optional | Parent ctx ID, chains commits into lineage |
| traceId | string | optional | Groups commits into a run |
| eventType | string | optional | Defaults to commit |
| agent.role | string | optional | researcher, writer, reviewer, etc. |
| agent.provider | string | optional | anthropic, openai, local |
| agent.model | string | optional | Model name |
GET /api/pull
Returns all verified contexts addressed to the authenticated agent, newest first.
GET /api/replay/:ctxId
Walks the parent chain from ctxId back to root, reverses to chronological order, returns each step with full payload and integrity verification. Add ?mode=summary for metadata only.
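The walk described above can be sketched locally. This is a minimal illustration over an in-memory dictionary, not the server's implementation: the `store` layout, the `replay_chain` name, and the `step` numbering are assumptions made for the example.

```python
from typing import Dict, List, Optional


def replay_chain(store: Dict[str, dict], ctx_id: str) -> List[dict]:
    """Walk parent links from `ctx_id` to the root, then return root-to-tip order.

    `store` is a hypothetical in-memory map of ctx ID -> commit dict, where each
    commit has an "id" and a "parent_id" (None for the root).
    """
    chain: List[dict] = []
    seen = set()
    current: Optional[str] = ctx_id
    while current is not None:
        if current in seen:
            raise ValueError(f"cycle detected at {current}")
        seen.add(current)
        commit = store[current]  # raises KeyError if a parent is missing
        chain.append(commit)
        current = commit.get("parent_id")
    chain.reverse()  # tip-to-root becomes chronological root-to-tip
    # Number the steps from 1 (assumed response shape).
    return [{"step": i + 1, **c} for i, c in enumerate(chain)]


store = {
    "ctx_a": {"id": "ctx_a", "parent_id": None},
    "ctx_b": {"id": "ctx_b", "parent_id": "ctx_a"},
    "ctx_c": {"id": "ctx_c", "parent_id": "ctx_b"},
}
steps = replay_chain(store, "ctx_c")
print([s["id"] for s in steps])
```

The cycle check is a defensive choice for the sketch; a well-formed chain of immutable commits should never contain one.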
POST /api/fork/:ctxId
Branch from any checkpoint. Returns new ctx ID with fork_of, fork_point, lineage_root. Original chain never modified. Continue by committing with parentId set to returned ID.
curl -X POST https://darkmatterhub.ai/api/fork/ctx_... -H "Authorization: Bearer YOUR_KEY" -d '{"branchKey": "experiment-1"}'
# Response: id (use as parentId for next commit), fork_of, fork_point, lineage_root
GET /api/verify/:ctxId
Returns a standalone trust object for the chain.
| Field | Description |
|---|---|
| chain_intact | Boolean, true if all hashes verify |
| length | Number of commits in chain |
| root_hash | sha256 hash of root commit |
| tip_hash | sha256 hash of tip commit |
| broken_at | ctx ID where chain first breaks, or null |
| fork_points | Array of ctx IDs where forks occurred |
| verified_at | Timestamp of this verification |
GET /api/export/:ctxId
Downloads a portable JSON proof artifact. chain_hash is deterministic across exports of the same unchanged chain. export_hash is unique per export instance.
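The difference between the two hashes can be illustrated with a small sketch. How DarkMatter actually derives them is not specified here, so the construction below is purely an assumption: `chain_hash` covers only the ordered integrity hashes, while `export_hash` also covers a timestamp and a random nonce. The `build_export` name and the artifact layout are hypothetical.

```python
import hashlib
import json
import uuid
from datetime import datetime, timezone
from typing import List


def build_export(commits: List[dict]) -> dict:
    """Assemble a proof artifact for a root-to-tip list of commits.

    Assumptions (not confirmed by the API): chain_hash covers only the ordered
    integrity hashes; export_hash additionally covers a timestamp and a nonce.
    """
    joined = "".join(c["integrity_hash"] for c in commits)
    chain_hash = hashlib.sha256(joined.encode("utf-8")).hexdigest()
    exported_at = datetime.now(timezone.utc).isoformat()
    nonce = uuid.uuid4().hex  # makes every export instance distinct
    export_hash = hashlib.sha256(
        json.dumps([chain_hash, exported_at, nonce]).encode("utf-8")
    ).hexdigest()
    return {
        "chain_hash": chain_hash,
        "export_hash": export_hash,
        "exported_at": exported_at,
        "commits": commits,
    }


commits = [{"id": "ctx_a", "integrity_hash": "aa"}, {"id": "ctx_b", "integrity_hash": "bb"}]
first, second = build_export(commits), build_export(commits)
print(first["chain_hash"] == second["chain_hash"])
```

Two exports of the same chain share a `chain_hash`, so auditors can match them, while the differing `export_hash` values identify each download.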
GET /api/lineage/:ctxId
Returns ancestor chain metadata (no payloads) from tip to root. Faster than replay for visualization.
GET /api/me
Returns the identity of the authenticated agent.
POST /api/agents/register
Allows an authenticated agent to programmatically spawn a new agent without requiring a dashboard login. Useful for pipelines that create agents dynamically at runtime.
Cap: 10 new agents per account per day. This prevents runaway agent spawning. The limit resets at midnight UTC.
# Spawn a new agent from an existing agent's API key
curl -X POST https://darkmatterhub.ai/api/agents/register \
-H "Authorization: Bearer EXISTING_AGENT_KEY" \
-H "Content-Type: application/json" \
-d '{
"agentName": "summarizer-v2",
"role": "summarizer",
"provider": "anthropic",
"model": "claude-opus-4-6"
}'
# Returns: { agentId, agentName, apiKey, spawnedBy, createdAt }
# Store the returned apiKey — it will not be shown again
Context Passport
The Context Passport is DarkMatter's proposed open standard for AI agent context handoffs. It is a structured, model-agnostic schema that any LLM or agent framework can produce and consume — making agent handoffs interoperable, verifiable, and auditable across systems.
The goal is for schema_version: "1.0" to mean something to any developer building multi-agent systems, the same way Content-Type: application/json means something to any HTTP client. Every context commit in DarkMatter is a Context Passport.
Why a standard schema matters
When Agent A hands work to Agent B today, the format is arbitrary — a string, a dict, or a JSON blob with no fixed shape. Agent B has to guess where the output is, who produced it, and whether it was tampered with. The Context Passport answers all of those questions in a consistent structure, regardless of which model or framework is on either end of the handoff.
This also directly addresses the schema drift problem: because every commit uses the same payload.output field for structured outputs, Agent B always knows exactly where to look for what Agent A produced. Commit structured JSON as your output — not a stringified version — and the schema is preserved and inspectable at every step in the chain.
Full schema (v1.0)
{
  "$schema": "https://darkmatterhub.ai/schema/context-passport/v1.json",
  "schema_version": "1.0",
  "id": "ctx_{timestamp}_{hex}",
  "parent_id": "ctx_... | null",
  "trace_id": "trc_... | null",
  "branch_key": "main",
  "created_by": {
    "agent_id": "dm_...",
    "agent_name": "research-agent",
    "role": "researcher",
    "provider": "anthropic | openai | google | mistral | local",
    "model": "claude-opus-4-6"
  },
  "event": {
    "type": "commit | fork | checkpoint | audit | ...",
    "to_agent_id": "dm_... | null",
    "timestamp": "ISO 8601"
  },
  "payload": {
    "input": "string or structured object",
    "output": "string or structured object",
    "memory": {},
    "variables": {}
  },
  "integrity": {
    "payload_hash": "sha256:hex",
    "parent_hash": "sha256:hex | null",
    "integrity_hash": "sha256:hex",
    "verification_status": "valid | broken | unverified"
  },
  "lineage": {
    "fork_of": "ctx_... | null",
    "fork_point": "ctx_... | null",
    "lineage_root": "ctx_... | null"
  },
  "created_at": "ISO 8601"
}
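A consumer might sanity-check an incoming passport before trusting it. The sketch below is one possible check, not an official validator: the schema does not say which keys are mandatory, so the `REQUIRED` map and the `passport_problems` function are assumptions chosen for illustration.

```python
from typing import List

# Assumption: these fields are treated as mandatory; the schema itself
# does not state which keys are required.
REQUIRED = {
    "schema_version": (),
    "id": (),
    "created_by": ("agent_id",),
    "event": ("type",),
    "payload": (),
    "integrity": ("payload_hash",),
    "created_at": (),
}


def passport_problems(passport: dict) -> List[str]:
    """Return a list of human-readable problems; an empty list means it looks OK."""
    problems = []
    for key, nested in REQUIRED.items():
        if key not in passport:
            problems.append(f"missing {key}")
            continue
        for sub in nested:
            if sub not in passport[key]:
                problems.append(f"missing {key}.{sub}")
    # Prefix checks follow the ID and hash formats shown in the schema.
    if not str(passport.get("id", "ctx_")).startswith("ctx_"):
        problems.append("id should start with ctx_")
    p_hash = passport.get("integrity", {}).get("payload_hash", "sha256:")
    if not str(p_hash).startswith("sha256:"):
        problems.append("payload_hash should start with sha256:")
    return problems
```

Returning a list of problems rather than raising on the first one lets a receiving agent log everything that is wrong with a handoff in a single pass.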
Payload field guidance
Use structured JSON for payload.output wherever possible. This preserves the schema across the chain and makes replay and inspection meaningful:
# Good: structured output is inspectable at every step
"payload": {
  "input": "Analyze Q1 earnings",
  "output": {
    "summary": "APAC up 34%, margin compressed",
    "confidence": 0.94,
    "sources": ["Q1_report.pdf", "earnings_call.txt"]
  }
}
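Models often return JSON as a string, which would end up stringified in payload.output. One way to avoid that is to parse the string before committing. The helpers below are a sketch; `as_structured` and `build_payload` are hypothetical names, not SDK functions.

```python
import json
from typing import Any


def as_structured(output: Any) -> Any:
    """Return parsed JSON if `output` is a string holding a JSON object or array.

    Plain strings and already-structured values are returned unchanged.
    """
    if isinstance(output, str):
        try:
            parsed = json.loads(output)
        except ValueError:  # json.JSONDecodeError is a subclass of ValueError
            return output
        if isinstance(parsed, (dict, list)):
            return parsed
    return output


def build_payload(input_value: Any, output: Any) -> dict:
    """Build a commit payload, keeping structured output structured."""
    return {"input": input_value, "output": as_structured(output)}


payload = build_payload("Analyze Q1 earnings", '{"summary": "APAC up 34%", "confidence": 0.94}')
print(payload["output"]["confidence"])
```

Strings that parse to a bare number or boolean are deliberately left as strings, so prose such as "42" is not silently converted.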
Context object (v2)
{
  "id": "ctx_1774358291224_bad9b976",
  "schema_version": "1.0",
  "parent_id": "ctx_previous_or_null",
  "trace_id": "trc_...",
  "branch_key": "main",
  "created_by": {
    "agent_id": "dm_...", "agent_name": "research-agent",
    "role": "researcher", "provider": "anthropic", "model": "claude-opus-4-6"
  },
  "event": {"type": "commit", "to_agent_id": "dm_..."},
  "payload": {"input": "...", "output": "...", "memory": {}, "variables": {}},
  "integrity": {
    "payload_hash": "sha256:e7733904...",
    "parent_hash": "sha256:f4a2b1c3...",
    "verification_status": "valid"
  },
  "created_at": "2026-03-24T13:18:11Z"
}
Event types
| Type | Category | Use when |
|---|---|---|
| commit | Developer | Standard agent handoff |
| fork | Developer | Branching from a checkpoint |
| checkpoint | Developer | Mid-task save point |
| revert | Developer | Rollback to previous state |
| retry | Developer | Retrying a failed step |
| error | Developer | Recording a failure |
| spawn | Developer | Creating a child agent |
| merge | Developer | Combining branches |
| override | Compliance | Human overrode agent decision |
| consent | Compliance | User consent recorded |
| escalate | Compliance | Escalated to human review |
| redact | Compliance | Sensitive data redacted |
| audit | Compliance | Explicit audit event |
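A client can catch typos in eventType before sending a commit. The sketch below assumes the table above is the complete list of event types, which may not hold if the API accepts others; `event_category` is a hypothetical helper.

```python
# Event types taken from the table above, grouped by category.
DEVELOPER_EVENTS = {
    "commit", "fork", "checkpoint", "revert", "retry", "error", "spawn", "merge",
}
COMPLIANCE_EVENTS = {"override", "consent", "escalate", "redact", "audit"}


def event_category(event_type: str = "commit") -> str:
    """Return "Developer" or "Compliance", or raise for an unknown type.

    The default mirrors the API, where eventType defaults to commit.
    """
    if event_type in DEVELOPER_EVENTS:
        return "Developer"
    if event_type in COMPLIANCE_EVENTS:
        return "Compliance"
    raise ValueError(f"unknown event type: {event_type!r}")


print(event_category("redact"))
```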
Integrity model
Each commit computes two hashes:
- payload_hash = sha256(normalized JSON payload)
- integrity_hash = sha256(payload_hash + parent integrity_hash). For root commits, parent integrity_hash is the string "root".
Modifying any commit breaks every downstream hash. Use GET /api/verify to check chain_intact and find the exact break point.
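The two hashes and the downstream effect of tampering can be reproduced in a few lines. This is a sketch under stated assumptions: "normalized JSON" is taken to mean sorted keys with compact separators, and "+" is taken to mean string concatenation of hex digests. The function names are illustrative.

```python
import hashlib
import json
from typing import List, Optional


def sha256_hex(data: str) -> str:
    return hashlib.sha256(data.encode("utf-8")).hexdigest()


def payload_hash(payload: dict) -> str:
    # Assumption: "normalized JSON" means sorted keys and compact separators.
    normalized = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return sha256_hex(normalized)


def integrity_hash(p_hash: str, parent_integrity: Optional[str]) -> str:
    # Root commits use the literal string "root" in place of a parent hash.
    # Assumption: the two values are joined by plain string concatenation.
    return sha256_hex(p_hash + (parent_integrity or "root"))


def chain_hashes(payloads: List[dict]) -> List[str]:
    """Integrity hash of every commit in a root-to-tip list of payloads."""
    hashes, parent = [], None
    for p in payloads:
        parent = integrity_hash(payload_hash(p), parent)
        hashes.append(parent)
    return hashes


# Build a three-commit chain, then tamper with the first payload.
payloads = [{"output": "a"}, {"output": "b"}, {"output": "c"}]
original = chain_hashes(payloads)
tampered = chain_hashes([{"output": "A"}] + payloads[1:])
print(sum(o != t for o, t in zip(original, tampered)), "of", len(original), "hashes changed")
```

Only the first payload was altered, yet every integrity hash after it differs, because each hash folds in its parent's hash.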
Query and search
Search your execution history by model, provider, event type, trace ID, or full-text across payloads. Free on all plans.
import darkmatter as dm
# Find all checkpoints produced by a given model
results = dm.search(model="claude-opus-4-6", event="checkpoint")
for ctx in results["results"]:
    print(ctx["id"], ctx["payload"]["output"])
# Full-text search across payloads
results = dm.search(q="APAC earnings", limit=20)
# Filter by trace ID and date range
results = dm.search(
    trace_id="trc_abc123",
    from_date="2026-03-01",
    to_date="2026-03-31",
)
| Parameter | Description |
|---|---|
| q | Full-text search across payload JSON |
| model | Filter by model name |
| provider | Filter by provider (anthropic, openai, local) |
| event | Filter by event type (commit, checkpoint, fork…) |
| traceId | Filter by trace ID (group of commits from one run) |
| from / to | ISO date range |
| limit | Max results (default 50, max 200) |
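If you call the search API without the SDK, the parameters above become a query string. The sketch below only builds that string and enforces the documented limit bounds; the search endpoint path is not given in this section, so none is assumed, and `build_search_query` is a hypothetical helper.

```python
from typing import Optional
from urllib.parse import urlencode


def build_search_query(
    q: Optional[str] = None,
    model: Optional[str] = None,
    provider: Optional[str] = None,
    event: Optional[str] = None,
    trace_id: Optional[str] = None,
    from_: Optional[str] = None,
    to: Optional[str] = None,
    limit: int = 50,
) -> str:
    """Return a URL-encoded query string using the parameter names in the table."""
    if not 1 <= limit <= 200:
        raise ValueError("limit must be between 1 and 200")
    params = {
        "q": q,
        "model": model,
        "provider": provider,
        "event": event,
        "traceId": trace_id,
        "from": from_,
        "to": to,
        "limit": limit,
    }
    # Leave out filters that were not set.
    return urlencode({k: v for k, v in params.items() if v is not None})


print(build_search_query(model="claude-opus-4-6", event="checkpoint"))
```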
Chain diff
Compare two execution chains step by step. Use this to compare two pipeline runs, an original vs a fork, or the same pipeline run with different models.
import darkmatter as dm
# Compare original run vs fork
d = dm.diff(original_ctx_id, fork_ctx_id)
print(f"{d['changedSteps']} steps changed")
print(f"Models A: {d['summary']['modelsA']}")
print(f"Models B: {d['summary']['modelsB']}")
for step in d["steps"]:
    if step["diff"]["modelChanged"]:
        print(f"Step {step['step']}: model changed")
        print(f" {step['a']['model']} → {step['b']['model']}")
    if step["diff"]["payloadChanged"]:
        print(f"Step {step['step']}: payload changed")
See also: fork a checkpoint, then diff to compare the branches.
BYOK encryption
Payloads are encrypted with AES-256-GCM using a key you provide. DarkMatter stores only ciphertext and never has access to your content. Available on all plans including Free.
# Generate a secure AES-256 key
python3 -c "import secrets; print(secrets.token_hex(32))"
# Encrypted commit
curl -X POST https://darkmatterhub.ai/enterprise/commit -H "Authorization: Bearer YOUR_KEY" -d '{"toAgentId": "dm_...", "payload": {...}, "byokKey": "your-64-hex-key"}'
# Decrypt
curl -X POST https://darkmatterhub.ai/enterprise/decrypt/ctx_... -H "Authorization: Bearer YOUR_KEY" -d '{"byokKey": "your-64-hex-key"}'
W3C DID agent identity
Register a W3C Decentralized Identifier for any agent. Commits carry a verifiable DID independently resolvable without trusting DarkMatter.
# Register DID (uses dashboard JWT auth)
curl -X POST https://darkmatterhub.ai/enterprise/did/register -H "Authorization: Bearer DASHBOARD_JWT" -d '{"agentId": "dm_...", "didId": "did:web:yourco.com:agents:researcher", "publicKey": "z6Mk..."}'
# Resolve DID (public, no auth)
curl https://darkmatterhub.ai/enterprise/did/AGENT_ID
Enterprise: Compliance reports
Generate a structured compliance report for an entire trace. Pass the same traceId to all commits in one pipeline run, then export.
curl https://darkmatterhub.ai/enterprise/report/trc_... -H "Authorization: Bearer YOUR_KEY" -o compliance_report.json
The report includes: agent registry, full audit trail with per-step integrity verification, timestamp range, chain intact status, and a regulatory note covering regulations such as EU AI Act Art. 12 and 19.
Self-hosting
git clone https://github.com/darkmatter-hub/darkmatter then npm install.
Copy .env.example to .env. Fill in SUPABASE_URL, SUPABASE_KEY, SUPABASE_SERVICE_KEY, RESEND_API_KEY.
Paste supabase/schema.sql into your Supabase SQL Editor and run it.
Set PORT=3000 and env vars on your host. Auto-deploys on git push.
Guide: Three-agent demo
The canonical DarkMatter demo shows a Claude to GPT-4o to Claude pipeline with a fork mid-chain. See it live at darkmatterhub.ai/demo with no login required. The full Python script is in examples/three-agent-demo/demo.py in the GitHub repo.
Guide: Forking workflows
Fork from any checkpoint to try a different approach without destroying the original chain. Both branches are independently replayable.
FORK=$(curl -s -X POST https://darkmatterhub.ai/api/fork/$CHECKPOINT_ID -H "Authorization: Bearer YOUR_KEY" -d '{"branchKey": "alt-approach"}')
FORK_ID=$(echo $FORK | python3 -c "import sys,json;print(json.load(sys.stdin)['id'])")
# Continue on fork branch
curl -X POST https://darkmatterhub.ai/api/commit -H "Authorization: Bearer YOUR_KEY" -d "{\"toAgentId\": \"dm_...\", \"parentId\": \"$FORK_ID\", \"payload\": {}}"
Guide: Audit trail guide
Use traceId to group all commits in one pipeline run, then export a compliance report covering the full execution.
import uuid, requests
TRACE_ID = f"trc_{uuid.uuid4().hex[:12]}"
def commit(dm_key, to_agent_id, payload, parent_id=None):
    return requests.post("https://darkmatterhub.ai/api/commit",
        headers={"Authorization": f"Bearer {dm_key}"},
        json={"toAgentId": to_agent_id, "payload": payload,
              "traceId": TRACE_ID, "parentId": parent_id}
    ).json()["id"]
# After pipeline completes:
# GET /enterprise/report/{TRACE_ID} (Enterprise plan)