Signal Persistence
Understanding signal stores and recording
Open Harness persists signals for recording, replay, and debugging.
Event Sourcing
The system uses event sourcing principles:
State is derived from a sequence of signals.
Instead of storing snapshots, we store the complete signal trace. This enables:
- Replay: Reconstruct any execution deterministically
- Audit: Complete history of all agent activations
- Debug: See exactly what signals fired and when
- Testing: Run tests without API calls
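To make "state is derived from a sequence of signals" concrete, here is a minimal sketch of folding a trace into state with a pure reducer. The `Signal` shape is modeled on the trace entries shown on this page; `CounterState`, `applySignal`, and `deriveState` are hypothetical names for illustration and are not part of Open Harness.

```typescript
// Hypothetical signal shape, modeled on the trace entries in this page.
interface Signal {
  name: string;
  timestamp?: string;
  payload?: unknown;
}

// Hypothetical state, for illustration only.
interface CounterState {
  activations: number;
  finished: boolean;
}

// Pure reducer: the next state depends only on the previous state and the signal.
function applySignal(state: CounterState, signal: Signal): CounterState {
  switch (signal.name) {
    case "agent:activated":
      return { ...state, activations: state.activations + 1 };
    case "workflow:end":
      return { ...state, finished: true };
    default:
      return state; // Signals that do not affect this state are ignored.
  }
}

// Rebuild state by folding the reducer over the full trace.
function deriveState(signals: Signal[]): CounterState {
  return signals.reduce(applySignal, { activations: 0, finished: false });
}

const trace: Signal[] = [
  { name: "workflow:start" },
  { name: "agent:activated", payload: { agent: "analyzer" } },
  { name: "harness:start" },
  { name: "workflow:end" },
];

const derived = deriveState(trace);
console.log(derived); // { activations: 1, finished: true }
```

Because the reducer is pure, folding the same trace always yields the same state, which is what makes replay and auditing possible.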
Signal Trace
Every runReactive execution produces a signal trace:
const result = await runReactive({
  agents: { analyzer },
  state: initialState,
  harness,
});

// Full signal trace
console.log(result.signals);
// [
//   { name: "workflow:start", timestamp: "...", payload: { state: {...} } },
//   { name: "agent:activated", payload: { agent: "analyzer" } },
//   { name: "harness:start", payload: { model: "..." } },
//   { name: "text:delta", payload: { content: "..." } },
//   { name: "harness:end", payload: { ... } },
//   { name: "analysis:complete", payload: { ... } },
//   { name: "workflow:end", payload: { state: {...}, metrics: {...} } },
// ]
Signal Store Interface
All persistence goes through the SignalStore:
interface SignalStore {
  // Save a recording
  saveRecording(recording: Recording): Promise<void>;
  // Load a recording by ID
  loadRecording(id: string): Promise<Recording | null>;
  // List recordings with optional filters
  listRecordings(options?: ListOptions): Promise<RecordingMetadata[]>;
  // Delete a recording
  deleteRecording(id: string): Promise<void>;
}

interface Recording {
  id: string;
  name?: string;
  tags?: string[];
  signals: Signal[];
  createdAt: string;
  metadata?: {
    durationMs: number;
    activations: number;
    finalState: unknown;
  };
}
Built-in Stores
MemorySignalStore
In-memory storage for development and testing:
import { MemorySignalStore } from "@open-harness/core";

const store = new MemorySignalStore();
- Fast, no setup
- Data lost on restart
- Perfect for tests
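To illustrate what satisfying the SignalStore contract involves, here is a rough sketch of a Map-backed store. It is not the library's `MemorySignalStore` implementation. `MapSignalStore` is a hypothetical name, the types are redeclared locally so the block is self-contained, and the shapes of `ListOptions` and `RecordingMetadata` are assumptions because this page does not define them.

```typescript
// Types redeclared locally so the sketch is self-contained.
interface Signal {
  name: string;
  timestamp?: string;
  payload?: unknown;
}

interface Recording {
  id: string;
  name?: string;
  tags?: string[];
  signals: Signal[];
  createdAt: string;
}

// Assumed shapes: the page does not define these two types.
interface ListOptions {
  tag?: string;
}
type RecordingMetadata = Omit<Recording, "signals">;

// Hypothetical Map-backed store with the same four methods as SignalStore.
class MapSignalStore {
  private recordings = new Map<string, Recording>();

  async saveRecording(recording: Recording): Promise<void> {
    // JSON round-trip copy so later mutation by the caller cannot alter the stored trace.
    // Assumes payloads are JSON-serializable.
    this.recordings.set(recording.id, JSON.parse(JSON.stringify(recording)));
  }

  async loadRecording(id: string): Promise<Recording | null> {
    return this.recordings.get(id) ?? null;
  }

  async listRecordings(options?: ListOptions): Promise<RecordingMetadata[]> {
    const tag = options?.tag;
    return [...this.recordings.values()]
      .filter((r) => tag === undefined || (r.tags ?? []).includes(tag))
      .map(({ signals, ...metadata }) => metadata); // Drop the signal trace from listings.
  }

  async deleteRecording(id: string): Promise<void> {
    this.recordings.delete(id);
  }
}
```

Everything lives in a single Map owned by the instance, which is why a store of this kind needs no setup and loses its data when the process exits.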
Recording Mode
Enable recording during execution:
const result = await runReactive({
  agents: { analyzer },
  state: initialState,
  harness,
  recording: {
    mode: "record",
    store: new MemorySignalStore(),
    name: "my-workflow-run",
    tags: ["test", "analyzer"],
  },
});

console.log("Recording ID:", result.recordingId);
Replay Mode
Replay a recorded execution:
const result = await runReactive({
  agents: { analyzer },
  state: initialState,
  harness, // Not called during replay
  recording: {
    mode: "replay",
    store,
    recordingId: "rec_abc123",
  },
});
During replay:
- Harness signals are injected from the recording
- Agents receive the same inputs as the original run
- No API calls are made
- Result is deterministic
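The replay behavior described above can be pictured with a small stand-in harness that serves recorded signals instead of calling a model. This is only a conceptual sketch, not how Open Harness implements replay. `ReplayHarness`, `nextCall`, and the rule that harness signals are those prefixed `harness:` or `text:` are all assumptions made for illustration.

```typescript
interface Signal {
  name: string;
  payload?: unknown;
}

// Assumption: harness-originated signals are identified by these name prefixes.
const HARNESS_PREFIXES = ["harness:", "text:"];

function isHarnessSignal(signal: Signal): boolean {
  return HARNESS_PREFIXES.some((prefix) => signal.name.startsWith(prefix));
}

// Hypothetical stand-in harness: serves recorded signals, makes no API calls.
class ReplayHarness {
  private cursor = 0;
  private readonly recorded: Signal[];

  constructor(signals: Signal[]) {
    this.recorded = signals.filter(isHarnessSignal);
  }

  // Returns the recorded signals for the next harness call,
  // up to and including its "harness:end" marker.
  nextCall(): Signal[] {
    const out: Signal[] = [];
    while (this.cursor < this.recorded.length) {
      const signal = this.recorded[this.cursor++];
      out.push(signal);
      if (signal.name === "harness:end") break;
    }
    return out;
  }
}

const recordedTrace: Signal[] = [
  { name: "workflow:start" },
  { name: "agent:activated", payload: { agent: "analyzer" } },
  { name: "harness:start" },
  { name: "text:delta", payload: { content: "..." } },
  { name: "harness:end" },
  { name: "analysis:complete" },
  { name: "workflow:end" },
];

const replay = new ReplayHarness(recordedTrace);
console.log(replay.nextCall().map((s) => s.name));
// [ 'harness:start', 'text:delta', 'harness:end' ]
```

Since the agent sees exactly the signals captured in the original run, its outputs repeat as well, provided the agent logic itself is deterministic.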
VCR-Style Debugging
Use the Player for step-through debugging:
import { Player } from "@open-harness/core";

const recording = await store.loadRecording(recordingId);
const player = new Player(recording);

// Step through signals one by one
while (player.hasNext()) {
  const signal = player.next();
  console.log(`[${signal.timestamp}] ${signal.name}`);
}

// Jump to a specific signal
player.seekTo("agent:completed");

// Get current position
console.log(`Position: ${player.position}/${player.length}`);
Testing Pattern
import { describe, it, expect } from "vitest";
import { toContainSignal, toHaveSignalsInOrder } from "@open-harness/vitest";

// Extend matchers
expect.extend({ toContainSignal, toHaveSignalsInOrder });

describe("Workflow", () => {
  it("emits signals in order", async () => {
    const result = await runReactive({ /* ... */ });

    expect(result.signals).toContainSignal("workflow:start");
    expect(result.signals).toContainSignal("analysis:complete");
    expect(result.signals).toHaveSignalsInOrder([
      "workflow:start",
      "agent:activated",
      "analysis:complete",
      "workflow:end",
    ]);
  });
});
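For intuition about what an ordering assertion checks, here is a sketch of a subsequence test over signal names. It is not the implementation of `toHaveSignalsInOrder`. The helper `hasSignalsInOrder` is hypothetical, and it assumes that other signals may appear between the expected ones, which is consistent with the test above passing against a trace that also contains harness signals.

```typescript
interface Signal {
  name: string;
}

// True when every expected name appears in the trace in the given order.
// Assumption: unrelated signals may occur in between (subsequence, not contiguous match).
function hasSignalsInOrder(signals: Signal[], expected: string[]): boolean {
  let next = 0; // Index of the next expected name still to be found.
  for (const signal of signals) {
    if (next < expected.length && signal.name === expected[next]) {
      next++;
    }
  }
  return next === expected.length;
}

const sampleTrace: Signal[] = [
  { name: "workflow:start" },
  { name: "agent:activated" },
  { name: "harness:start" },
  { name: "harness:end" },
  { name: "analysis:complete" },
  { name: "workflow:end" },
];

console.log(hasSignalsInOrder(sampleTrace, ["workflow:start", "analysis:complete", "workflow:end"])); // true
console.log(hasSignalsInOrder(sampleTrace, ["workflow:end", "workflow:start"])); // false
```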