runReactive
Execute signal-based agent workflows
runReactive
The runReactive() function executes signal-based agent workflows.
Basic Usage
import { createWorkflow, ClaudeHarness } from "@open-harness/core";
type State = { input: string; result: string | null };
const { agent, runReactive } = createWorkflow<State>();
const analyzer = agent({
prompt: "Analyze: {{ state.input }}",
activateOn: ["workflow:start"],
emits: ["analysis:complete"],
updates: "result",
});
const result = await runReactive({
agents: { analyzer },
state: { input: "Hello", result: null },
harness: new ClaudeHarness(),
endWhen: (s) => s.result !== null,
});Configuration
interface RunReactiveConfig<TState> {
// Required
agents: Record<string, Agent>;
state: TState;
harness: Harness;
// Optional
endWhen?: (state: TState) => boolean;
recording?: RecordingConfig;
reducers?: Record<string, SignalReducer<TState>>;
timeout?: number;
}agents
Record of named agents to include in the workflow:
agents: {
analyzer,
reviewer,
fixer,
}state
Initial state object. Must match the TState type parameter:
state: {
input: "data to analyze",
analysis: null,
review: null,
}harness
Default harness for agents without per-agent override:
harness: new ClaudeHarness({ model: "claude-sonnet-4-20250514" })endWhen
Termination condition. Workflow ends when this returns true:
endWhen: (state) => state.result !== nullIf not provided, workflow runs until quiescence (no pending signals).
recording
Recording/replay configuration:
recording: {
mode: "record", // "record" | "replay"
store: new MemorySignalStore(),
recordingId?: "rec_123", // Required for replay
name?: "my-workflow", // Recording name
tags?: ["test"], // Recording tags
}reducers
Custom state reducers for complex signal→state updates:
reducers: {
"trade:proposed": (state, signal) => ({
...state,
trades: [...state.trades, signal.payload],
}),
}timeout
Maximum execution time in milliseconds:
timeout: 30000 // 30 secondsReturn Value
interface WorkflowResult<TState> {
state: TState; // Final state
signals: Signal[]; // Full signal trace
metrics: {
durationMs: number; // Execution time
activations: number; // Agent activation count
};
terminatedEarly: boolean; // endWhen triggered
recordingId?: string; // If recording
}state
The final state after all agents have completed:
console.log(result.state.analysis);signals
Complete signal trace for debugging and testing:
result.signals.forEach(s => {
console.log(`[${s.timestamp}] ${s.name}`, s.payload);
});metrics
Execution metrics:
console.log(`Duration: ${result.metrics.durationMs}ms`);
console.log(`Activations: ${result.metrics.activations}`);terminatedEarly
true if endWhen condition triggered termination:
if (result.terminatedEarly) {
console.log("Workflow completed via endWhen condition");
}recordingId
Recording ID if recording was enabled:
if (result.recordingId) {
console.log(`Saved recording: ${result.recordingId}`);
}Examples
Basic Workflow
const result = await runReactive({
agents: { analyzer },
state: { input: "Hello", result: null },
harness: new ClaudeHarness(),
endWhen: (s) => s.result !== null,
});Multi-Agent Workflow
const result = await runReactive({
agents: { planner, coder, reviewer },
state: { task: "Build a TODO app", plan: null, code: null, review: null },
harness: new ClaudeHarness(),
endWhen: (s) => s.review?.passed === true,
});With Recording
const store = new MemorySignalStore();
// Record
const result = await runReactive({
agents: { analyzer },
state: initialState,
harness: new ClaudeHarness(),
recording: { mode: "record", store, name: "test-run" },
});
// Replay
const replay = await runReactive({
agents: { analyzer },
state: initialState,
harness: new ClaudeHarness(), // Not called
recording: { mode: "replay", store, recordingId: result.recordingId! },
});With Custom Reducers
const result = await runReactive({
agents: { analyzer, trader },
state: { analysis: null, trades: [] },
harness,
reducers: {
"trade:proposed": (state, signal) => ({
...state,
trades: [...state.trades, signal.payload.trade],
}),
},
});Error Handling
try {
const result = await runReactive({ /* ... */ });
} catch (error) {
if (error instanceof TimeoutError) {
console.error("Workflow timed out");
} else if (error instanceof HarnessError) {
console.error("Harness failed:", error.message);
}
}