From db065178346526199626a9d0582853853733df14 Mon Sep 17 00:00:00 2001 From: Geoff Johnson Date: Sat, 28 Mar 2026 11:35:07 -0700 Subject: [PATCH 1/2] feat: add MtimePollingWatcher and WatchMode for FileWatch polling fallback From 84b549c5e11aaec215dc07b047f9bdb8da3cf800 Mon Sep 17 00:00:00 2001 From: Geoff Johnson Date: Sat, 28 Mar 2026 11:37:42 -0700 Subject: [PATCH 2/2] feat(scheduler): add WatchMode enum and MtimePollingWatcher fallback for FileWatchStrategy --- crates/orchestrator/src/scheduler/strategy.rs | 247 +++++++++++++++--- 1 file changed, 217 insertions(+), 30 deletions(-) diff --git a/crates/orchestrator/src/scheduler/strategy.rs b/crates/orchestrator/src/scheduler/strategy.rs index 9f3af685..071c71da 100644 --- a/crates/orchestrator/src/scheduler/strategy.rs +++ b/crates/orchestrator/src/scheduler/strategy.rs @@ -14,9 +14,9 @@ use croner::Cron; use globset::{Glob, GlobSet, GlobSetBuilder}; use notify::Watcher as _; use std::collections::HashMap; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, SystemTime}; use tokio::sync::{broadcast, mpsc, watch}; use tracing::{info, warn}; use uuid::Uuid; @@ -857,9 +857,153 @@ fn event_kind_to_str(kind: ¬ify::EventKind) -> &'static str { } } -/// A [`TriggerStrategy`] that watches filesystem paths for changes using the -/// OS-native notification API (inotify on Linux, FSEvents on macOS, ReadDirectoryChangesW -/// on Windows) via the `notify` crate. +/// Selects which backend [`FileWatchStrategy`] uses for detecting filesystem changes. +/// +/// | Variant | Description | +/// |-----------|-------------| +/// | `Native` | OS-native events (inotify / FSEvents / kqueue). Low latency, no polling. | +/// | `Polling` | mtime polling at a fixed interval. Works on any filesystem. | +/// | `Auto` | Tries native first; silently falls back to polling if native setup fails. | +#[derive(Debug, Clone)] +pub enum WatchMode { + /// Use OS-native filesystem events only. + Native, + /// Poll file mtimes at the given interval. + Polling { + /// Seconds between each scan of the watched directories. + interval_secs: u64, + }, + /// Try native; fall back to polling if native is unavailable. + Auto { + /// Polling interval (seconds) used when falling back to mtime polling. + poll_interval_secs: u64, + }, +} + +impl Default for WatchMode { + fn default() -> Self { + WatchMode::Auto { poll_interval_secs: 5 } + } +} + +/// Internal handle keeping the active watcher alive for the duration of a +/// [`FileWatchStrategy`]. +enum FileWatchHandle { + /// Native OS watcher — kept alive by holding the struct. + Native(notify::RecommendedWatcher), + /// Background mtime polling task — kept alive until abort. + Polling(tokio::task::JoinHandle<()>), +} + +/// Recursively walk `root` and collect the mtime for every regular file. +fn snapshot_mtimes(root: &Path) -> HashMap { + let mut map = HashMap::new(); + let Ok(walker) = std::fs::read_dir(root) else { return map }; + let mut stack = vec![walker]; + while let Some(dir) = stack.last_mut() { + match dir.next() { + None => { + stack.pop(); + } + Some(Ok(entry)) => { + let path = entry.path(); + if let Ok(meta) = std::fs::metadata(&path) { + if meta.is_dir() { + if let Ok(sub) = std::fs::read_dir(&path) { + stack.push(sub); + } + } else if meta.is_file() { + if let Ok(mtime) = meta.modified() { + map.insert(path, mtime); + } + } + } + } + Some(Err(_)) => {} + } + } + map +} + +/// Spawn a background task that polls file mtimes and forwards synthetic +/// [`notify::Event`]s through `tx`. +fn spawn_polling_watcher( + paths: Vec, + interval_secs: u64, + tx: mpsc::UnboundedSender>, +) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { + // Build initial snapshot. + let mut prev: HashMap = + paths.iter().flat_map(|p| snapshot_mtimes(p)).collect(); + + let interval = Duration::from_secs(interval_secs.max(1)); + let mut ticker = tokio::time::interval(interval); + ticker.tick().await; // consume the immediate first tick + + loop { + ticker.tick().await; + + // Build current snapshot. + let current: HashMap = + paths.iter().flat_map(|p| snapshot_mtimes(p)).collect(); + + // Detect created and modified files. + for (path, mtime) in ¤t { + match prev.get(path) { + None => { + // New file. + let ev = notify::Event { + kind: notify::EventKind::Create(notify::event::CreateKind::File), + paths: vec![path.clone()], + attrs: Default::default(), + }; + if tx.send(Ok(ev)).is_err() { + return; + } + } + Some(prev_mtime) if prev_mtime != mtime => { + // Modified file. + let ev = notify::Event { + kind: notify::EventKind::Modify(notify::event::ModifyKind::Data( + notify::event::DataChange::Content, + )), + paths: vec![path.clone()], + attrs: Default::default(), + }; + if tx.send(Ok(ev)).is_err() { + return; + } + } + _ => {} + } + } + + // Detect removed files. + for path in prev.keys() { + if !current.contains_key(path) { + let ev = notify::Event { + kind: notify::EventKind::Remove(notify::event::RemoveKind::File), + paths: vec![path.clone()], + attrs: Default::default(), + }; + if tx.send(Ok(ev)).is_err() { + return; + } + } + } + + prev = current; + } + }) +} + +/// A [`TriggerStrategy`] that watches filesystem paths for changes. +/// +/// By default uses the OS-native notification API (inotify on Linux, FSEvents +/// on macOS, ReadDirectoryChangesW on Windows) via the `notify` crate. When +/// native watching is unavailable, or when [`WatchMode::Polling`] is selected, +/// an mtime-based polling loop is used instead. /// /// When a matching filesystem event occurs the strategy fires a synthetic /// [`Task`] carrying the changed file's path, name, containing directory, and @@ -874,7 +1018,7 @@ fn event_kind_to_str(kind: ¬ify::EventKind) -> &'static str { /// # Event Kind Filtering /// /// `event_kinds` contains lowercase strings: `"create"`, `"modify"`, -/// `"delete"`, `"access"`, `"rename"`. An empty vec accepts all kinds. +/// `"delete"`, `"access"`. An empty vec accepts all kinds. /// /// # Debouncing /// @@ -892,20 +1036,20 @@ fn event_kind_to_str(kind: ¬ify::EventKind) -> &'static str { /// # Example /// /// ```rust,ignore -/// use orchestrator::scheduler::strategy::FileWatchStrategy; +/// use orchestrator::scheduler::strategy::{FileWatchStrategy, WatchMode}; /// /// let strategy = FileWatchStrategy::new( /// vec!["/tmp/watched".into()], /// Some(vec!["**/*.toml".into()]), /// vec!["create".into(), "modify".into()], -/// 200, // 200 ms debounce +/// 200, // 200 ms debounce +/// WatchMode::Auto { poll_interval_secs: 5 }, /// )?; /// ``` pub struct FileWatchStrategy { - /// The OS-level filesystem watcher — must be kept alive for the lifetime of - /// this strategy or events will stop being delivered. - _watcher: notify::RecommendedWatcher, - /// Async receiver end of the mpsc bridge from the sync `notify` callback. + /// Active watcher handle — kept alive for the lifetime of this strategy. + _handle: FileWatchHandle, + /// Async receiver end of the mpsc bridge. rx: mpsc::UnboundedReceiver>, /// Optional glob patterns — `None` means accept every path. include_patterns: Option, @@ -926,31 +1070,17 @@ impl FileWatchStrategy { /// * `event_kinds` — which event kinds to accept (`"create"`, `"modify"`, /// `"delete"`, `"access"`). An empty vec accepts all. /// * `debounce_ms` — debounce window in milliseconds (0 = no debounce). + /// * `mode` — which watching backend to use (native, polling, or auto). pub fn new( paths: Vec, patterns: Option>, event_kinds: Vec, debounce_ms: u64, + mode: WatchMode, ) -> anyhow::Result { let (tx, rx) = mpsc::unbounded_channel(); - // Build the OS-native watcher. The callback bridges sync notify events - // into the async mpsc channel. - let mut watcher = notify::RecommendedWatcher::new( - move |res: notify::Result| { - // Ignore send errors — receiver may have been dropped during shutdown. - let _ = tx.send(res); - }, - notify::Config::default(), - ) - .map_err(|e| anyhow::anyhow!("Failed to create filesystem watcher: {}", e))?; - - // Register each path for recursive watching. - for path in &paths { - watcher - .watch(path, notify::RecursiveMode::Recursive) - .map_err(|e| anyhow::anyhow!("Failed to watch path {:?}: {}", path, e))?; - } + let handle = Self::create_handle(paths.clone(), mode, tx)?; // Build the optional glob set. let include_patterns = match patterns { @@ -972,7 +1102,7 @@ impl FileWatchStrategy { }; Ok(Self { - _watcher: watcher, + _handle: handle, rx, include_patterns, event_kinds, @@ -981,6 +1111,63 @@ impl FileWatchStrategy { }) } + /// Create the appropriate [`FileWatchHandle`] for the given mode. + fn create_handle( + paths: Vec, + mode: WatchMode, + tx: mpsc::UnboundedSender>, + ) -> anyhow::Result { + match mode { + WatchMode::Native => { + let handle = Self::create_native_watcher(paths, tx)?; + Ok(FileWatchHandle::Native(handle)) + } + WatchMode::Polling { interval_secs } => { + let join = spawn_polling_watcher(paths, interval_secs, tx); + Ok(FileWatchHandle::Polling(join)) + } + WatchMode::Auto { poll_interval_secs } => { + // Attempt native first; fall back to polling on failure. + match Self::create_native_watcher(paths.clone(), tx.clone()) { + Ok(watcher) => { + info!("FileWatchStrategy: using native OS watcher"); + Ok(FileWatchHandle::Native(watcher)) + } + Err(e) => { + warn!( + %e, + interval_secs = poll_interval_secs, + "FileWatchStrategy: native watcher unavailable, falling back to mtime polling" + ); + let join = spawn_polling_watcher(paths, poll_interval_secs, tx); + Ok(FileWatchHandle::Polling(join)) + } + } + } + } + } + + /// Build and configure an OS-native `RecommendedWatcher`. + fn create_native_watcher( + paths: Vec, + tx: mpsc::UnboundedSender>, + ) -> anyhow::Result { + let mut watcher = notify::RecommendedWatcher::new( + move |res: notify::Result| { + let _ = tx.send(res); + }, + notify::Config::default(), + ) + .map_err(|e| anyhow::anyhow!("Failed to create filesystem watcher: {}", e))?; + + for path in &paths { + watcher + .watch(path, notify::RecursiveMode::Recursive) + .map_err(|e| anyhow::anyhow!("Failed to watch path {:?}: {}", path, e))?; + } + Ok(watcher) + } + /// Returns `true` if the event passes the kind and pattern filters. fn matches_event(&self, event: ¬ify::Event) -> bool { // Filter by event kind.