Architecture
Understanding signals, agents, and reactive workflows
Architecture
Open Harness v0.3.0 is built around a signal-based reactive architecture. Instead of explicitly defining edges between agents, agents declare what signals they react to and what signals they emit. Workflow emerges from signal flow.
Core Principle
State changes emit signals. Agents subscribe to signals. Workflow is emergent.
Core Components
Signals
A signal is an immutable event that flows through the system:
type Signal = {
id: string; // "sig_abc123"
name: string; // "analysis:complete"
payload: unknown; // The data
timestamp: string; // ISO timestamp
source?: {
agent?: string; // Which agent emitted
parent?: string; // Causality chain
};
};Signal Categories:
| Category | Examples | Purpose |
|---|---|---|
| Workflow | workflow:start, workflow:end | Lifecycle events |
| Agent | agent:activated, agent:skipped | Agent execution |
| Harness | harness:start, harness:end, text:delta | LLM interaction |
| State | state:analysis:changed | State mutations |
| Custom | analysis:complete, trade:proposed | Your signals |
Agents
An agent is a reactive unit that subscribes to signals and produces output:
import { createWorkflow } from "@open-harness/core";
const { agent } = createWorkflow<MyState>();
const analyzer = agent({
// The prompt template ({{ state.x }} syntax)
prompt: "Analyze: {{ state.input }}",
// What signals trigger this agent
activateOn: ["workflow:start"],
// What signals this agent emits when done
emits: ["analysis:complete"],
// Optional guard - only activate if true
when: (ctx) => ctx.state.input !== null,
// Automatically update state with output
updates: "result",
});Workflow
A workflow coordinates multiple agents and manages state:
const { agent, runReactive } = createWorkflow<State>();
const result = await runReactive({
agents: { analyzer, reviewer },
state: initialState,
harness: new ClaudeHarness(),
endWhen: (state) => state.complete,
});Harnesses
Harnesses bridge AI SDKs to the signal architecture:
import { ClaudeHarness } from "@open-harness/core";
const harness = new ClaudeHarness({
model: "claude-sonnet-4-20250514",
});
// Harness emits signals during execution:
// harness:start → text:delta → text:delta → harness:endSignal Flow
Here's how signals flow through a typical workflow:
Parallel Execution
When multiple agents subscribe to the same signal, they run in parallel:
// Both react to workflow:start → run concurrently
const riskAnalyzer = agent({
activateOn: ["workflow:start"],
emits: ["risk:assessed"],
});
const costEstimator = agent({
activateOn: ["workflow:start"],
emits: ["cost:estimated"],
});
// Both complete → downstream agents can react
const decider = agent({
activateOn: ["risk:assessed", "cost:estimated"],
// Activates when EITHER signal fires
});State Management
State updates emit signals automatically:
type State = {
analysis: string | null;
trades: Trade[];
};
// When agent updates state.analysis:
// → emits state:analysis:changed
const trader = agent({
activateOn: ["state:analysis:changed"],
// Reacts to state changes
});Recording & Replay
The signal log is the recording:
const result = await runReactive({
// ... agents, state, harness ...
recording: {
mode: "record",
store: new MemorySignalStore(),
name: "my-workflow",
},
});
// Result contains full signal trace
console.log(result.signals);
// [
// { name: "workflow:start", ... },
// { name: "agent:activated", payload: { agent: "analyzer" } },
// { name: "harness:start", ... },
// { name: "text:delta", payload: { content: "The " } },
// ...
// ]Replay without harness calls:
const result = await runReactive({
// ... same config ...
recording: {
mode: "replay",
store,
recordingId: "rec_abc123",
},
});
// Harness signals injected from recordingAPI Summary
| Function | Purpose |
|---|---|
createWorkflow<TState>() | Factory that returns typed agent() and runReactive() |
agent(config) | Define a reactive agent |
runReactive(config) | Execute the workflow |
ClaudeHarness | Claude model adapter |
MemorySignalStore | In-memory signal storage |
Player | VCR-style debugging for recordings |
Result Shape
All executions return:
interface WorkflowResult<TState> {
state: TState; // Final state
signals: Signal[]; // Full signal trace
metrics: {
durationMs: number; // Total execution time
activations: number; // Number of agent activations
};
terminatedEarly: boolean; // endWhen triggered
recordingId?: string; // If recording
}