diff --git a/rust/crates/runtime/src/lane_events.rs b/rust/crates/runtime/src/lane_events.rs index 603a375..03d8ace 100644 --- a/rust/crates/runtime/src/lane_events.rs +++ b/rust/crates/runtime/src/lane_events.rs @@ -1,4 +1,4 @@ -#![allow(clippy::similar_names)] +#![allow(clippy::similar_names, clippy::cast_possible_truncation)] use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -73,6 +73,316 @@ pub enum LaneFailureClass { Infra, } +/// Provenance labels for event source classification. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum EventProvenance { + /// Event from a live, active lane + LiveLane, + /// Event from a synthetic test + Test, + /// Event from a healthcheck probe + Healthcheck, + /// Event from a replay/log replay + Replay, + /// Event from the transport layer itself + Transport, +} + +/// Session identity metadata captured at creation time. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct SessionIdentity { + /// Stable title for the session + pub title: String, + /// Workspace/worktree path + pub workspace: String, + /// Lane/session purpose + pub purpose: String, + /// Placeholder reason if any field is unknown + #[serde(skip_serializing_if = "Option::is_none")] + pub placeholder_reason: Option, +} + +impl SessionIdentity { + /// Create complete session identity + #[must_use] + pub fn new( + title: impl Into, + workspace: impl Into, + purpose: impl Into, + ) -> Self { + Self { + title: title.into(), + workspace: workspace.into(), + purpose: purpose.into(), + placeholder_reason: None, + } + } + + /// Create session identity with placeholder for missing fields + #[must_use] + pub fn with_placeholder( + title: impl Into, + workspace: impl Into, + purpose: impl Into, + reason: impl Into, + ) -> Self { + Self { + title: title.into(), + workspace: workspace.into(), + purpose: purpose.into(), + placeholder_reason: Some(reason.into()), + } + } +} + +/// Lane ownership and workflow scope binding. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct LaneOwnership { + /// Owner/assignee identity + pub owner: String, + /// Workflow scope (e.g., claw-code-dogfood, external-git-maintenance) + pub workflow_scope: String, + /// Whether the watcher is expected to act, observe, or ignore + pub watcher_action: WatcherAction, +} + +/// Watcher action expectation for a lane event. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum WatcherAction { + /// Watcher should take action on this event + Act, + /// Watcher should only observe + Observe, + /// Watcher should ignore this event + Ignore, +} + +/// Event metadata for ordering, provenance, deduplication, and ownership. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct LaneEventMetadata { + /// Monotonic sequence number for event ordering + pub seq: u64, + /// Event provenance source + pub provenance: EventProvenance, + /// Session identity at creation + #[serde(skip_serializing_if = "Option::is_none")] + pub session_identity: Option, + /// Lane ownership and scope + #[serde(skip_serializing_if = "Option::is_none")] + pub ownership: Option, + /// Nudge ID for deduplication cycles + #[serde(skip_serializing_if = "Option::is_none")] + pub nudge_id: Option, + /// Event fingerprint for terminal event deduplication + #[serde(skip_serializing_if = "Option::is_none")] + pub event_fingerprint: Option, + /// Timestamp when event was observed/created + pub timestamp_ms: u64, +} + +impl LaneEventMetadata { + /// Create new event metadata + #[must_use] + pub fn new(seq: u64, provenance: EventProvenance) -> Self { + Self { + seq, + provenance, + session_identity: None, + ownership: None, + nudge_id: None, + event_fingerprint: None, + timestamp_ms: std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64, + } + } + + /// Add session identity + #[must_use] + pub fn with_session_identity(mut self, identity: SessionIdentity) -> Self { + self.session_identity = Some(identity); + self + } + + /// Add ownership info + #[must_use] + pub fn with_ownership(mut self, ownership: LaneOwnership) -> Self { + self.ownership = Some(ownership); + self + } + + /// Add nudge ID for dedupe + #[must_use] + pub fn with_nudge_id(mut self, nudge_id: impl Into) -> Self { + self.nudge_id = Some(nudge_id.into()); + self + } + + /// Compute and add event fingerprint for terminal events + #[must_use] + pub fn with_fingerprint(mut self, fingerprint: impl Into) -> Self { + self.event_fingerprint = Some(fingerprint.into()); + self + } +} + +/// Builder for constructing [`LaneEvent`]s with proper metadata. +#[derive(Debug, Clone)] +pub struct LaneEventBuilder { + event: LaneEventName, + status: LaneEventStatus, + emitted_at: String, + metadata: LaneEventMetadata, + detail: Option, + failure_class: Option, + data: Option, +} + +impl LaneEventBuilder { + /// Start building a new lane event + #[must_use] + pub fn new( + event: LaneEventName, + status: LaneEventStatus, + emitted_at: impl Into, + seq: u64, + provenance: EventProvenance, + ) -> Self { + Self { + event, + status, + emitted_at: emitted_at.into(), + metadata: LaneEventMetadata::new(seq, provenance), + detail: None, + failure_class: None, + data: None, + } + } + + /// Add session identity + #[must_use] + pub fn with_session_identity(mut self, identity: SessionIdentity) -> Self { + self.metadata = self.metadata.with_session_identity(identity); + self + } + + /// Add ownership info + #[must_use] + pub fn with_ownership(mut self, ownership: LaneOwnership) -> Self { + self.metadata = self.metadata.with_ownership(ownership); + self + } + + /// Add nudge ID + #[must_use] + pub fn with_nudge_id(mut self, nudge_id: impl Into) -> Self { + self.metadata = self.metadata.with_nudge_id(nudge_id); + self + } + + /// Add detail + #[must_use] + pub fn with_detail(mut self, detail: impl Into) -> Self { + self.detail = Some(detail.into()); + self + } + + /// Add failure class + #[must_use] + pub fn with_failure_class(mut self, failure_class: LaneFailureClass) -> Self { + self.failure_class = Some(failure_class); + self + } + + /// Add data payload + #[must_use] + pub fn with_data(mut self, data: serde_json::Value) -> Self { + self.data = Some(data); + self + } + + /// Compute fingerprint and build terminal event + #[must_use] + pub fn build_terminal(mut self) -> LaneEvent { + let fingerprint = compute_event_fingerprint(&self.event, &self.status, self.data.as_ref()); + self.metadata = self.metadata.with_fingerprint(fingerprint); + self.build() + } + + /// Build the event + #[must_use] + pub fn build(self) -> LaneEvent { + LaneEvent { + event: self.event, + status: self.status, + emitted_at: self.emitted_at, + failure_class: self.failure_class, + detail: self.detail, + data: self.data, + metadata: self.metadata, + } + } +} + +/// Check if an event kind is terminal (completed, failed, superseded, closed). +#[must_use] +pub fn is_terminal_event(event: LaneEventName) -> bool { + matches!( + event, + LaneEventName::Finished + | LaneEventName::Failed + | LaneEventName::Superseded + | LaneEventName::Closed + | LaneEventName::Merged + ) +} + +/// Compute a fingerprint for terminal event deduplication. +#[must_use] +pub fn compute_event_fingerprint( + event: &LaneEventName, + status: &LaneEventStatus, + data: Option<&serde_json::Value>, +) -> String { + use std::collections::hash_map::DefaultHasher; + use std::hash::{Hash, Hasher}; + + let mut hasher = DefaultHasher::new(); + format!("{event:?}").hash(&mut hasher); + format!("{status:?}").hash(&mut hasher); + if let Some(d) = data { + serde_json::to_string(d) + .unwrap_or_default() + .hash(&mut hasher); + } + format!("{:016x}", hasher.finish()) +} + +/// Deduplicate terminal events within a reconciliation window. +/// Returns only the first occurrence of each terminal fingerprint. +#[must_use] +pub fn dedupe_terminal_events(events: &[LaneEvent]) -> Vec { + let mut seen_fingerprints = std::collections::HashSet::new(); + let mut result = Vec::new(); + + for event in events { + if is_terminal_event(event.event) { + if let Some(fp) = &event.metadata.event_fingerprint { + if seen_fingerprints.contains(fp) { + continue; // Skip duplicate terminal event + } + seen_fingerprints.insert(fp.clone()); + } + } + result.push(event.clone()); + } + + result +} + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct LaneEventBlocker { #[serde(rename = "failureClass")] @@ -106,9 +416,13 @@ pub struct LaneEvent { pub detail: Option, #[serde(skip_serializing_if = "Option::is_none")] pub data: Option, + /// Event metadata for ordering, provenance, dedupe, and ownership + pub metadata: LaneEventMetadata, } impl LaneEvent { + /// Create a new lane event with minimal metadata (seq=0, provenance=LiveLane) + /// Use `LaneEventBuilder` for events requiring full metadata. #[must_use] pub fn new( event: LaneEventName, @@ -122,6 +436,7 @@ impl LaneEvent { failure_class: None, detail: None, data: None, + metadata: LaneEventMetadata::new(0, EventProvenance::LiveLane), } } @@ -254,8 +569,10 @@ mod tests { use serde_json::json; use super::{ - dedupe_superseded_commit_events, LaneCommitProvenance, LaneEvent, LaneEventBlocker, - LaneEventName, LaneEventStatus, LaneFailureClass, + compute_event_fingerprint, dedupe_superseded_commit_events, dedupe_terminal_events, + is_terminal_event, EventProvenance, LaneCommitProvenance, LaneEvent, LaneEventBlocker, + LaneEventBuilder, LaneEventMetadata, LaneEventName, LaneEventStatus, LaneFailureClass, + LaneOwnership, SessionIdentity, WatcherAction, }; #[test] @@ -420,4 +737,222 @@ mod tests { assert_eq!(retained.len(), 1); assert_eq!(retained[0].detail.as_deref(), Some("new")); } + + #[test] + fn lane_event_metadata_includes_monotonic_sequence() { + let meta1 = LaneEventMetadata::new(0, EventProvenance::LiveLane); + let meta2 = LaneEventMetadata::new(1, EventProvenance::LiveLane); + let meta3 = LaneEventMetadata::new(2, EventProvenance::Test); + + assert_eq!(meta1.seq, 0); + assert_eq!(meta2.seq, 1); + assert_eq!(meta3.seq, 2); + assert!(meta1.timestamp_ms <= meta2.timestamp_ms); + } + + #[test] + fn event_provenance_round_trips_through_serialization() { + let cases = [ + (EventProvenance::LiveLane, "live_lane"), + (EventProvenance::Test, "test"), + (EventProvenance::Healthcheck, "healthcheck"), + (EventProvenance::Replay, "replay"), + (EventProvenance::Transport, "transport"), + ]; + + for (provenance, expected) in cases { + let json = serde_json::to_value(provenance).expect("should serialize"); + assert_eq!(json, serde_json::json!(expected)); + + let round_trip: EventProvenance = + serde_json::from_value(json).expect("should deserialize"); + assert_eq!(round_trip, provenance); + } + } + + #[test] + fn session_identity_is_complete_at_creation() { + let identity = SessionIdentity::new("my-lane", "/tmp/repo", "implement feature X"); + + assert_eq!(identity.title, "my-lane"); + assert_eq!(identity.workspace, "/tmp/repo"); + assert_eq!(identity.purpose, "implement feature X"); + assert!(identity.placeholder_reason.is_none()); + + // Test with placeholder + let with_placeholder = SessionIdentity::with_placeholder( + "untitled", + "/tmp/unknown", + "unknown", + "session created before title was known", + ); + assert_eq!( + with_placeholder.placeholder_reason, + Some("session created before title was known".to_string()) + ); + } + + #[test] + fn lane_ownership_binding_includes_workflow_scope() { + let ownership = LaneOwnership { + owner: "claw-1".to_string(), + workflow_scope: "claw-code-dogfood".to_string(), + watcher_action: WatcherAction::Act, + }; + + assert_eq!(ownership.owner, "claw-1"); + assert_eq!(ownership.workflow_scope, "claw-code-dogfood"); + assert_eq!(ownership.watcher_action, WatcherAction::Act); + } + + #[test] + fn watcher_action_round_trips_through_serialization() { + let cases = [ + (WatcherAction::Act, "act"), + (WatcherAction::Observe, "observe"), + (WatcherAction::Ignore, "ignore"), + ]; + + for (action, expected) in cases { + let json = serde_json::to_value(action).expect("should serialize"); + assert_eq!(json, serde_json::json!(expected)); + + let round_trip: WatcherAction = + serde_json::from_value(json).expect("should deserialize"); + assert_eq!(round_trip, action); + } + } + + #[test] + fn is_terminal_event_detects_terminal_states() { + assert!(is_terminal_event(LaneEventName::Finished)); + assert!(is_terminal_event(LaneEventName::Failed)); + assert!(is_terminal_event(LaneEventName::Superseded)); + assert!(is_terminal_event(LaneEventName::Closed)); + assert!(is_terminal_event(LaneEventName::Merged)); + + assert!(!is_terminal_event(LaneEventName::Started)); + assert!(!is_terminal_event(LaneEventName::Ready)); + assert!(!is_terminal_event(LaneEventName::Blocked)); + } + + #[test] + fn compute_event_fingerprint_is_deterministic() { + let fp1 = compute_event_fingerprint( + &LaneEventName::Finished, + &LaneEventStatus::Completed, + Some(&json!({"commit": "abc123"})), + ); + let fp2 = compute_event_fingerprint( + &LaneEventName::Finished, + &LaneEventStatus::Completed, + Some(&json!({"commit": "abc123"})), + ); + + assert_eq!(fp1, fp2, "same inputs should produce same fingerprint"); + assert!(!fp1.is_empty()); + assert_eq!(fp1.len(), 16, "fingerprint should be 16 hex chars"); + } + + #[test] + fn compute_event_fingerprint_differs_for_different_inputs() { + let fp1 = + compute_event_fingerprint(&LaneEventName::Finished, &LaneEventStatus::Completed, None); + let fp2 = compute_event_fingerprint(&LaneEventName::Failed, &LaneEventStatus::Failed, None); + let fp3 = compute_event_fingerprint( + &LaneEventName::Finished, + &LaneEventStatus::Completed, + Some(&json!({"commit": "abc123"})), + ); + + assert_ne!(fp1, fp2, "different event/status should differ"); + assert_ne!(fp1, fp3, "different data should differ"); + } + + #[test] + fn dedupe_terminal_events_suppresses_duplicates() { + let event1 = LaneEventBuilder::new( + LaneEventName::Finished, + LaneEventStatus::Completed, + "2026-04-04T00:00:00Z", + 0, + EventProvenance::LiveLane, + ) + .build_terminal(); + + let event2 = LaneEventBuilder::new( + LaneEventName::Started, + LaneEventStatus::Running, + "2026-04-04T00:00:01Z", + 1, + EventProvenance::LiveLane, + ) + .build(); + + let event3 = LaneEventBuilder::new( + LaneEventName::Finished, + LaneEventStatus::Completed, + "2026-04-04T00:00:02Z", + 2, + EventProvenance::LiveLane, + ) + .build_terminal(); // Same fingerprint as event1 + + let deduped = dedupe_terminal_events(&[event1.clone(), event2.clone(), event3.clone()]); + + assert_eq!(deduped.len(), 2, "should have 2 events after dedupe"); + assert_eq!(deduped[0].event, LaneEventName::Finished); + assert_eq!(deduped[1].event, LaneEventName::Started); + // event3 should be suppressed as duplicate of event1 + } + + #[test] + fn lane_event_builder_constructs_event_with_metadata() { + let event = LaneEventBuilder::new( + LaneEventName::Started, + LaneEventStatus::Running, + "2026-04-04T00:00:00Z", + 42, + EventProvenance::Test, + ) + .with_session_identity(SessionIdentity::new("test-lane", "/tmp", "test")) + .with_ownership(LaneOwnership { + owner: "bot-1".to_string(), + workflow_scope: "test-suite".to_string(), + watcher_action: WatcherAction::Observe, + }) + .with_nudge_id("nudge-123") + .with_detail("starting test run") + .build(); + + assert_eq!(event.event, LaneEventName::Started); + assert_eq!(event.metadata.seq, 42); + assert_eq!(event.metadata.provenance, EventProvenance::Test); + assert_eq!( + event.metadata.session_identity.as_ref().unwrap().title, + "test-lane" + ); + assert_eq!(event.metadata.ownership.as_ref().unwrap().owner, "bot-1"); + assert_eq!(event.metadata.nudge_id, Some("nudge-123".to_string())); + assert_eq!(event.detail, Some("starting test run".to_string())); + } + + #[test] + fn lane_event_metadata_round_trips_through_serialization() { + let meta = LaneEventMetadata::new(5, EventProvenance::Healthcheck) + .with_session_identity(SessionIdentity::new("lane-1", "/tmp", "purpose")) + .with_nudge_id("nudge-abc"); + + let json = serde_json::to_value(&meta).expect("should serialize"); + assert_eq!(json["seq"], 5); + assert_eq!(json["provenance"], "healthcheck"); + assert_eq!(json["nudge_id"], "nudge-abc"); + assert!(json["timestamp_ms"].as_u64().is_some()); + + let round_trip: LaneEventMetadata = + serde_json::from_value(json).expect("should deserialize"); + assert_eq!(round_trip.seq, 5); + assert_eq!(round_trip.provenance, EventProvenance::Healthcheck); + assert_eq!(round_trip.nudge_id, Some("nudge-abc".to_string())); + } } diff --git a/rust/crates/runtime/src/lib.rs b/rust/crates/runtime/src/lib.rs index e691df2..432e1c1 100644 --- a/rust/crates/runtime/src/lib.rs +++ b/rust/crates/runtime/src/lib.rs @@ -83,8 +83,10 @@ pub use hooks::{ HookAbortSignal, HookEvent, HookProgressEvent, HookProgressReporter, HookRunResult, HookRunner, }; pub use lane_events::{ - dedupe_superseded_commit_events, LaneCommitProvenance, LaneEvent, LaneEventBlocker, - LaneEventName, LaneEventStatus, LaneFailureClass, + compute_event_fingerprint, dedupe_superseded_commit_events, dedupe_terminal_events, + is_terminal_event, EventProvenance, LaneCommitProvenance, LaneEvent, LaneEventBlocker, + LaneEventBuilder, LaneEventMetadata, LaneEventName, LaneEventStatus, LaneFailureClass, + LaneOwnership, SessionIdentity, WatcherAction, }; pub use mcp::{ mcp_server_signature, mcp_tool_name, mcp_tool_prefix, normalize_name_for_mcp,