Implement LaneEvent schema extensions for event ordering, provenance, and dedupe (US-002)

Adds comprehensive metadata support to LaneEvent for the canonical lane event schema:

- EventProvenance enum: live_lane, test, healthcheck, replay, transport
- SessionIdentity: title, workspace, purpose, with placeholder support
- LaneOwnership: owner, workflow_scope, watcher_action (Act/Observe/Ignore)
- LaneEventMetadata: seq, provenance, session_identity, ownership, nudge_id,
  event_fingerprint, timestamp_ms
- LaneEventBuilder: fluent API for constructing events with full metadata
- is_terminal_event(): detects Finished, Failed, Superseded, Closed, Merged
- compute_event_fingerprint(): deterministic fingerprint for terminal events
- dedupe_terminal_events(): suppresses duplicate terminal events by fingerprint

Provides machine-readable event provenance, session identity at creation,
monotonic sequence ordering, nudge deduplication, and terminal event suppression.

Adds 10 regression tests covering:
- Monotonic sequence ordering
- Provenance serialization round-trip
- Session identity completeness
- Ownership and workflow scope binding
- Watcher action variants
- Terminal event detection
- Fingerprint determinism and uniqueness
- Terminal event deduplication
- Builder construction with metadata
- Metadata serialization round-trip

Closes Phase 2 (partial) from ROADMAP.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Yeachan-Heo
2026-04-16 09:12:31 +00:00
parent 21909da0b5
commit 77fb62a9f1
2 changed files with 542 additions and 5 deletions

View File

@@ -1,4 +1,4 @@
#![allow(clippy::similar_names)] #![allow(clippy::similar_names, clippy::cast_possible_truncation)]
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::Value; use serde_json::Value;
@@ -73,6 +73,316 @@ pub enum LaneFailureClass {
Infra, Infra,
} }
/// Provenance labels for event source classification.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum EventProvenance {
/// Event from a live, active lane
LiveLane,
/// Event from a synthetic test
Test,
/// Event from a healthcheck probe
Healthcheck,
/// Event from a replay/log replay
Replay,
/// Event from the transport layer itself
Transport,
}
/// Session identity metadata captured at creation time.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SessionIdentity {
/// Stable title for the session
pub title: String,
/// Workspace/worktree path
pub workspace: String,
/// Lane/session purpose
pub purpose: String,
/// Placeholder reason if any field is unknown
#[serde(skip_serializing_if = "Option::is_none")]
pub placeholder_reason: Option<String>,
}
impl SessionIdentity {
/// Create complete session identity
#[must_use]
pub fn new(
title: impl Into<String>,
workspace: impl Into<String>,
purpose: impl Into<String>,
) -> Self {
Self {
title: title.into(),
workspace: workspace.into(),
purpose: purpose.into(),
placeholder_reason: None,
}
}
/// Create session identity with placeholder for missing fields
#[must_use]
pub fn with_placeholder(
title: impl Into<String>,
workspace: impl Into<String>,
purpose: impl Into<String>,
reason: impl Into<String>,
) -> Self {
Self {
title: title.into(),
workspace: workspace.into(),
purpose: purpose.into(),
placeholder_reason: Some(reason.into()),
}
}
}
/// Lane ownership and workflow scope binding.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LaneOwnership {
/// Owner/assignee identity
pub owner: String,
/// Workflow scope (e.g., claw-code-dogfood, external-git-maintenance)
pub workflow_scope: String,
/// Whether the watcher is expected to act, observe, or ignore
pub watcher_action: WatcherAction,
}
/// Watcher action expectation for a lane event.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WatcherAction {
/// Watcher should take action on this event
Act,
/// Watcher should only observe
Observe,
/// Watcher should ignore this event
Ignore,
}
/// Event metadata for ordering, provenance, deduplication, and ownership.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LaneEventMetadata {
/// Monotonic sequence number for event ordering
pub seq: u64,
/// Event provenance source
pub provenance: EventProvenance,
/// Session identity at creation
#[serde(skip_serializing_if = "Option::is_none")]
pub session_identity: Option<SessionIdentity>,
/// Lane ownership and scope
#[serde(skip_serializing_if = "Option::is_none")]
pub ownership: Option<LaneOwnership>,
/// Nudge ID for deduplication cycles
#[serde(skip_serializing_if = "Option::is_none")]
pub nudge_id: Option<String>,
/// Event fingerprint for terminal event deduplication
#[serde(skip_serializing_if = "Option::is_none")]
pub event_fingerprint: Option<String>,
/// Timestamp when event was observed/created
pub timestamp_ms: u64,
}
impl LaneEventMetadata {
/// Create new event metadata
#[must_use]
pub fn new(seq: u64, provenance: EventProvenance) -> Self {
Self {
seq,
provenance,
session_identity: None,
ownership: None,
nudge_id: None,
event_fingerprint: None,
timestamp_ms: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64,
}
}
/// Add session identity
#[must_use]
pub fn with_session_identity(mut self, identity: SessionIdentity) -> Self {
self.session_identity = Some(identity);
self
}
/// Add ownership info
#[must_use]
pub fn with_ownership(mut self, ownership: LaneOwnership) -> Self {
self.ownership = Some(ownership);
self
}
/// Add nudge ID for dedupe
#[must_use]
pub fn with_nudge_id(mut self, nudge_id: impl Into<String>) -> Self {
self.nudge_id = Some(nudge_id.into());
self
}
/// Compute and add event fingerprint for terminal events
#[must_use]
pub fn with_fingerprint(mut self, fingerprint: impl Into<String>) -> Self {
self.event_fingerprint = Some(fingerprint.into());
self
}
}
/// Builder for constructing [`LaneEvent`]s with proper metadata.
#[derive(Debug, Clone)]
pub struct LaneEventBuilder {
event: LaneEventName,
status: LaneEventStatus,
emitted_at: String,
metadata: LaneEventMetadata,
detail: Option<String>,
failure_class: Option<LaneFailureClass>,
data: Option<serde_json::Value>,
}
impl LaneEventBuilder {
/// Start building a new lane event
#[must_use]
pub fn new(
event: LaneEventName,
status: LaneEventStatus,
emitted_at: impl Into<String>,
seq: u64,
provenance: EventProvenance,
) -> Self {
Self {
event,
status,
emitted_at: emitted_at.into(),
metadata: LaneEventMetadata::new(seq, provenance),
detail: None,
failure_class: None,
data: None,
}
}
/// Add session identity
#[must_use]
pub fn with_session_identity(mut self, identity: SessionIdentity) -> Self {
self.metadata = self.metadata.with_session_identity(identity);
self
}
/// Add ownership info
#[must_use]
pub fn with_ownership(mut self, ownership: LaneOwnership) -> Self {
self.metadata = self.metadata.with_ownership(ownership);
self
}
/// Add nudge ID
#[must_use]
pub fn with_nudge_id(mut self, nudge_id: impl Into<String>) -> Self {
self.metadata = self.metadata.with_nudge_id(nudge_id);
self
}
/// Add detail
#[must_use]
pub fn with_detail(mut self, detail: impl Into<String>) -> Self {
self.detail = Some(detail.into());
self
}
/// Add failure class
#[must_use]
pub fn with_failure_class(mut self, failure_class: LaneFailureClass) -> Self {
self.failure_class = Some(failure_class);
self
}
/// Add data payload
#[must_use]
pub fn with_data(mut self, data: serde_json::Value) -> Self {
self.data = Some(data);
self
}
/// Compute fingerprint and build terminal event
#[must_use]
pub fn build_terminal(mut self) -> LaneEvent {
let fingerprint = compute_event_fingerprint(&self.event, &self.status, self.data.as_ref());
self.metadata = self.metadata.with_fingerprint(fingerprint);
self.build()
}
/// Build the event
#[must_use]
pub fn build(self) -> LaneEvent {
LaneEvent {
event: self.event,
status: self.status,
emitted_at: self.emitted_at,
failure_class: self.failure_class,
detail: self.detail,
data: self.data,
metadata: self.metadata,
}
}
}
/// Check if an event kind is terminal (completed, failed, superseded, closed).
#[must_use]
pub fn is_terminal_event(event: LaneEventName) -> bool {
matches!(
event,
LaneEventName::Finished
| LaneEventName::Failed
| LaneEventName::Superseded
| LaneEventName::Closed
| LaneEventName::Merged
)
}
/// Compute a fingerprint for terminal event deduplication.
#[must_use]
pub fn compute_event_fingerprint(
event: &LaneEventName,
status: &LaneEventStatus,
data: Option<&serde_json::Value>,
) -> String {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
format!("{event:?}").hash(&mut hasher);
format!("{status:?}").hash(&mut hasher);
if let Some(d) = data {
serde_json::to_string(d)
.unwrap_or_default()
.hash(&mut hasher);
}
format!("{:016x}", hasher.finish())
}
/// Deduplicate terminal events within a reconciliation window.
/// Returns only the first occurrence of each terminal fingerprint.
#[must_use]
pub fn dedupe_terminal_events(events: &[LaneEvent]) -> Vec<LaneEvent> {
let mut seen_fingerprints = std::collections::HashSet::new();
let mut result = Vec::new();
for event in events {
if is_terminal_event(event.event) {
if let Some(fp) = &event.metadata.event_fingerprint {
if seen_fingerprints.contains(fp) {
continue; // Skip duplicate terminal event
}
seen_fingerprints.insert(fp.clone());
}
}
result.push(event.clone());
}
result
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LaneEventBlocker { pub struct LaneEventBlocker {
#[serde(rename = "failureClass")] #[serde(rename = "failureClass")]
@@ -106,9 +416,13 @@ pub struct LaneEvent {
pub detail: Option<String>, pub detail: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub data: Option<Value>, pub data: Option<Value>,
/// Event metadata for ordering, provenance, dedupe, and ownership
pub metadata: LaneEventMetadata,
} }
impl LaneEvent { impl LaneEvent {
/// Create a new lane event with minimal metadata (seq=0, provenance=LiveLane)
/// Use `LaneEventBuilder` for events requiring full metadata.
#[must_use] #[must_use]
pub fn new( pub fn new(
event: LaneEventName, event: LaneEventName,
@@ -122,6 +436,7 @@ impl LaneEvent {
failure_class: None, failure_class: None,
detail: None, detail: None,
data: None, data: None,
metadata: LaneEventMetadata::new(0, EventProvenance::LiveLane),
} }
} }
@@ -254,8 +569,10 @@ mod tests {
use serde_json::json; use serde_json::json;
use super::{ use super::{
dedupe_superseded_commit_events, LaneCommitProvenance, LaneEvent, LaneEventBlocker, compute_event_fingerprint, dedupe_superseded_commit_events, dedupe_terminal_events,
LaneEventName, LaneEventStatus, LaneFailureClass, is_terminal_event, EventProvenance, LaneCommitProvenance, LaneEvent, LaneEventBlocker,
LaneEventBuilder, LaneEventMetadata, LaneEventName, LaneEventStatus, LaneFailureClass,
LaneOwnership, SessionIdentity, WatcherAction,
}; };
#[test] #[test]
@@ -420,4 +737,222 @@ mod tests {
assert_eq!(retained.len(), 1); assert_eq!(retained.len(), 1);
assert_eq!(retained[0].detail.as_deref(), Some("new")); assert_eq!(retained[0].detail.as_deref(), Some("new"));
} }
#[test]
fn lane_event_metadata_includes_monotonic_sequence() {
let meta1 = LaneEventMetadata::new(0, EventProvenance::LiveLane);
let meta2 = LaneEventMetadata::new(1, EventProvenance::LiveLane);
let meta3 = LaneEventMetadata::new(2, EventProvenance::Test);
assert_eq!(meta1.seq, 0);
assert_eq!(meta2.seq, 1);
assert_eq!(meta3.seq, 2);
assert!(meta1.timestamp_ms <= meta2.timestamp_ms);
}
#[test]
fn event_provenance_round_trips_through_serialization() {
let cases = [
(EventProvenance::LiveLane, "live_lane"),
(EventProvenance::Test, "test"),
(EventProvenance::Healthcheck, "healthcheck"),
(EventProvenance::Replay, "replay"),
(EventProvenance::Transport, "transport"),
];
for (provenance, expected) in cases {
let json = serde_json::to_value(provenance).expect("should serialize");
assert_eq!(json, serde_json::json!(expected));
let round_trip: EventProvenance =
serde_json::from_value(json).expect("should deserialize");
assert_eq!(round_trip, provenance);
}
}
#[test]
fn session_identity_is_complete_at_creation() {
let identity = SessionIdentity::new("my-lane", "/tmp/repo", "implement feature X");
assert_eq!(identity.title, "my-lane");
assert_eq!(identity.workspace, "/tmp/repo");
assert_eq!(identity.purpose, "implement feature X");
assert!(identity.placeholder_reason.is_none());
// Test with placeholder
let with_placeholder = SessionIdentity::with_placeholder(
"untitled",
"/tmp/unknown",
"unknown",
"session created before title was known",
);
assert_eq!(
with_placeholder.placeholder_reason,
Some("session created before title was known".to_string())
);
}
#[test]
fn lane_ownership_binding_includes_workflow_scope() {
let ownership = LaneOwnership {
owner: "claw-1".to_string(),
workflow_scope: "claw-code-dogfood".to_string(),
watcher_action: WatcherAction::Act,
};
assert_eq!(ownership.owner, "claw-1");
assert_eq!(ownership.workflow_scope, "claw-code-dogfood");
assert_eq!(ownership.watcher_action, WatcherAction::Act);
}
#[test]
fn watcher_action_round_trips_through_serialization() {
let cases = [
(WatcherAction::Act, "act"),
(WatcherAction::Observe, "observe"),
(WatcherAction::Ignore, "ignore"),
];
for (action, expected) in cases {
let json = serde_json::to_value(action).expect("should serialize");
assert_eq!(json, serde_json::json!(expected));
let round_trip: WatcherAction =
serde_json::from_value(json).expect("should deserialize");
assert_eq!(round_trip, action);
}
}
#[test]
fn is_terminal_event_detects_terminal_states() {
assert!(is_terminal_event(LaneEventName::Finished));
assert!(is_terminal_event(LaneEventName::Failed));
assert!(is_terminal_event(LaneEventName::Superseded));
assert!(is_terminal_event(LaneEventName::Closed));
assert!(is_terminal_event(LaneEventName::Merged));
assert!(!is_terminal_event(LaneEventName::Started));
assert!(!is_terminal_event(LaneEventName::Ready));
assert!(!is_terminal_event(LaneEventName::Blocked));
}
#[test]
fn compute_event_fingerprint_is_deterministic() {
let fp1 = compute_event_fingerprint(
&LaneEventName::Finished,
&LaneEventStatus::Completed,
Some(&json!({"commit": "abc123"})),
);
let fp2 = compute_event_fingerprint(
&LaneEventName::Finished,
&LaneEventStatus::Completed,
Some(&json!({"commit": "abc123"})),
);
assert_eq!(fp1, fp2, "same inputs should produce same fingerprint");
assert!(!fp1.is_empty());
assert_eq!(fp1.len(), 16, "fingerprint should be 16 hex chars");
}
#[test]
fn compute_event_fingerprint_differs_for_different_inputs() {
let fp1 =
compute_event_fingerprint(&LaneEventName::Finished, &LaneEventStatus::Completed, None);
let fp2 = compute_event_fingerprint(&LaneEventName::Failed, &LaneEventStatus::Failed, None);
let fp3 = compute_event_fingerprint(
&LaneEventName::Finished,
&LaneEventStatus::Completed,
Some(&json!({"commit": "abc123"})),
);
assert_ne!(fp1, fp2, "different event/status should differ");
assert_ne!(fp1, fp3, "different data should differ");
}
#[test]
fn dedupe_terminal_events_suppresses_duplicates() {
let event1 = LaneEventBuilder::new(
LaneEventName::Finished,
LaneEventStatus::Completed,
"2026-04-04T00:00:00Z",
0,
EventProvenance::LiveLane,
)
.build_terminal();
let event2 = LaneEventBuilder::new(
LaneEventName::Started,
LaneEventStatus::Running,
"2026-04-04T00:00:01Z",
1,
EventProvenance::LiveLane,
)
.build();
let event3 = LaneEventBuilder::new(
LaneEventName::Finished,
LaneEventStatus::Completed,
"2026-04-04T00:00:02Z",
2,
EventProvenance::LiveLane,
)
.build_terminal(); // Same fingerprint as event1
let deduped = dedupe_terminal_events(&[event1.clone(), event2.clone(), event3.clone()]);
assert_eq!(deduped.len(), 2, "should have 2 events after dedupe");
assert_eq!(deduped[0].event, LaneEventName::Finished);
assert_eq!(deduped[1].event, LaneEventName::Started);
// event3 should be suppressed as duplicate of event1
}
#[test]
fn lane_event_builder_constructs_event_with_metadata() {
let event = LaneEventBuilder::new(
LaneEventName::Started,
LaneEventStatus::Running,
"2026-04-04T00:00:00Z",
42,
EventProvenance::Test,
)
.with_session_identity(SessionIdentity::new("test-lane", "/tmp", "test"))
.with_ownership(LaneOwnership {
owner: "bot-1".to_string(),
workflow_scope: "test-suite".to_string(),
watcher_action: WatcherAction::Observe,
})
.with_nudge_id("nudge-123")
.with_detail("starting test run")
.build();
assert_eq!(event.event, LaneEventName::Started);
assert_eq!(event.metadata.seq, 42);
assert_eq!(event.metadata.provenance, EventProvenance::Test);
assert_eq!(
event.metadata.session_identity.as_ref().unwrap().title,
"test-lane"
);
assert_eq!(event.metadata.ownership.as_ref().unwrap().owner, "bot-1");
assert_eq!(event.metadata.nudge_id, Some("nudge-123".to_string()));
assert_eq!(event.detail, Some("starting test run".to_string()));
}
#[test]
fn lane_event_metadata_round_trips_through_serialization() {
let meta = LaneEventMetadata::new(5, EventProvenance::Healthcheck)
.with_session_identity(SessionIdentity::new("lane-1", "/tmp", "purpose"))
.with_nudge_id("nudge-abc");
let json = serde_json::to_value(&meta).expect("should serialize");
assert_eq!(json["seq"], 5);
assert_eq!(json["provenance"], "healthcheck");
assert_eq!(json["nudge_id"], "nudge-abc");
assert!(json["timestamp_ms"].as_u64().is_some());
let round_trip: LaneEventMetadata =
serde_json::from_value(json).expect("should deserialize");
assert_eq!(round_trip.seq, 5);
assert_eq!(round_trip.provenance, EventProvenance::Healthcheck);
assert_eq!(round_trip.nudge_id, Some("nudge-abc".to_string()));
}
} }

View File

@@ -83,8 +83,10 @@ pub use hooks::{
HookAbortSignal, HookEvent, HookProgressEvent, HookProgressReporter, HookRunResult, HookRunner, HookAbortSignal, HookEvent, HookProgressEvent, HookProgressReporter, HookRunResult, HookRunner,
}; };
pub use lane_events::{ pub use lane_events::{
dedupe_superseded_commit_events, LaneCommitProvenance, LaneEvent, LaneEventBlocker, compute_event_fingerprint, dedupe_superseded_commit_events, dedupe_terminal_events,
LaneEventName, LaneEventStatus, LaneFailureClass, is_terminal_event, EventProvenance, LaneCommitProvenance, LaneEvent, LaneEventBlocker,
LaneEventBuilder, LaneEventMetadata, LaneEventName, LaneEventStatus, LaneFailureClass,
LaneOwnership, SessionIdentity, WatcherAction,
}; };
pub use mcp::{ pub use mcp::{
mcp_server_signature, mcp_tool_name, mcp_tool_prefix, normalize_name_for_mcp, mcp_server_signature, mcp_tool_name, mcp_tool_prefix, normalize_name_for_mcp,