Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
247 changes: 217 additions & 30 deletions crates/orchestrator/src/scheduler/strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ use croner::Cron;
use globset::{Glob, GlobSet, GlobSetBuilder};
use notify::Watcher as _;
use std::collections::HashMap;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, SystemTime};
use tokio::sync::{broadcast, mpsc, watch};
use tracing::{info, warn};
use uuid::Uuid;
Expand Down Expand Up @@ -857,9 +857,153 @@ fn event_kind_to_str(kind: &notify::EventKind) -> &'static str {
}
}

/// A [`TriggerStrategy`] that watches filesystem paths for changes using the
/// OS-native notification API (inotify on Linux, FSEvents on macOS, ReadDirectoryChangesW
/// on Windows) via the `notify` crate.
/// Selects which backend [`FileWatchStrategy`] uses for detecting filesystem changes.
///
/// | Variant | Description |
/// |-----------|-------------|
/// | `Native` | OS-native events (inotify / FSEvents / kqueue). Low latency, no polling. |
/// | `Polling` | mtime polling at a fixed interval. Works on any filesystem. |
/// | `Auto` | Tries native first; silently falls back to polling if native setup fails. |
#[derive(Debug, Clone)]
pub enum WatchMode {
/// Use OS-native filesystem events only.
Native,
/// Poll file mtimes at the given interval.
Polling {
/// Seconds between each scan of the watched directories.
interval_secs: u64,
},
/// Try native; fall back to polling if native is unavailable.
Auto {
/// Polling interval (seconds) used when falling back to mtime polling.
poll_interval_secs: u64,
},
}

impl Default for WatchMode {
fn default() -> Self {
WatchMode::Auto { poll_interval_secs: 5 }
}
}

/// Internal handle keeping the active watcher alive for the duration of a
/// [`FileWatchStrategy`].
enum FileWatchHandle {
/// Native OS watcher — kept alive by holding the struct.
Native(notify::RecommendedWatcher),
/// Background mtime polling task — kept alive until abort.
Polling(tokio::task::JoinHandle<()>),
}

/// Recursively walk `root` and collect the mtime for every regular file.
fn snapshot_mtimes(root: &Path) -> HashMap<PathBuf, SystemTime> {
let mut map = HashMap::new();
let Ok(walker) = std::fs::read_dir(root) else { return map };
let mut stack = vec![walker];
while let Some(dir) = stack.last_mut() {
match dir.next() {
None => {
stack.pop();
}
Some(Ok(entry)) => {
let path = entry.path();
if let Ok(meta) = std::fs::metadata(&path) {
if meta.is_dir() {
if let Ok(sub) = std::fs::read_dir(&path) {
stack.push(sub);
}
} else if meta.is_file() {
if let Ok(mtime) = meta.modified() {
map.insert(path, mtime);
}
}
}
}
Some(Err(_)) => {}
}
}
map
}

/// Spawn a background task that polls file mtimes and forwards synthetic
/// [`notify::Event`]s through `tx`.
fn spawn_polling_watcher(
paths: Vec<PathBuf>,
interval_secs: u64,
tx: mpsc::UnboundedSender<notify::Result<notify::Event>>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
// Build initial snapshot.
let mut prev: HashMap<PathBuf, SystemTime> =
paths.iter().flat_map(|p| snapshot_mtimes(p)).collect();

let interval = Duration::from_secs(interval_secs.max(1));
let mut ticker = tokio::time::interval(interval);
ticker.tick().await; // consume the immediate first tick

loop {
ticker.tick().await;

// Build current snapshot.
let current: HashMap<PathBuf, SystemTime> =
paths.iter().flat_map(|p| snapshot_mtimes(p)).collect();

// Detect created and modified files.
for (path, mtime) in &current {
match prev.get(path) {
None => {
// New file.
let ev = notify::Event {
kind: notify::EventKind::Create(notify::event::CreateKind::File),
paths: vec![path.clone()],
attrs: Default::default(),
};
if tx.send(Ok(ev)).is_err() {
return;
}
}
Some(prev_mtime) if prev_mtime != mtime => {
// Modified file.
let ev = notify::Event {
kind: notify::EventKind::Modify(notify::event::ModifyKind::Data(
notify::event::DataChange::Content,
)),
paths: vec![path.clone()],
attrs: Default::default(),
};
if tx.send(Ok(ev)).is_err() {
return;
}
}
_ => {}
}
}

// Detect removed files.
for path in prev.keys() {
if !current.contains_key(path) {
let ev = notify::Event {
kind: notify::EventKind::Remove(notify::event::RemoveKind::File),
paths: vec![path.clone()],
attrs: Default::default(),
};
if tx.send(Ok(ev)).is_err() {
return;
}
}
}

prev = current;
}
})
}

/// A [`TriggerStrategy`] that watches filesystem paths for changes.
///
/// By default uses the OS-native notification API (inotify on Linux, FSEvents
/// on macOS, ReadDirectoryChangesW on Windows) via the `notify` crate. When
/// native watching is unavailable, or when [`WatchMode::Polling`] is selected,
/// an mtime-based polling loop is used instead.
///
/// When a matching filesystem event occurs the strategy fires a synthetic
/// [`Task`] carrying the changed file's path, name, containing directory, and
Expand All @@ -874,7 +1018,7 @@ fn event_kind_to_str(kind: &notify::EventKind) -> &'static str {
/// # Event Kind Filtering
///
/// `event_kinds` contains lowercase strings: `"create"`, `"modify"`,
/// `"delete"`, `"access"`, `"rename"`. An empty vec accepts all kinds.
/// `"delete"`, `"access"`. An empty vec accepts all kinds.
///
/// # Debouncing
///
Expand All @@ -892,20 +1036,20 @@ fn event_kind_to_str(kind: &notify::EventKind) -> &'static str {
/// # Example
///
/// ```rust,ignore
/// use orchestrator::scheduler::strategy::FileWatchStrategy;
/// use orchestrator::scheduler::strategy::{FileWatchStrategy, WatchMode};
///
/// let strategy = FileWatchStrategy::new(
/// vec!["/tmp/watched".into()],
/// Some(vec!["**/*.toml".into()]),
/// vec!["create".into(), "modify".into()],
/// 200, // 200 ms debounce
/// 200, // 200 ms debounce
/// WatchMode::Auto { poll_interval_secs: 5 },
/// )?;
/// ```
pub struct FileWatchStrategy {
/// The OS-level filesystem watcher — must be kept alive for the lifetime of
/// this strategy or events will stop being delivered.
_watcher: notify::RecommendedWatcher,
/// Async receiver end of the mpsc bridge from the sync `notify` callback.
/// Active watcher handle — kept alive for the lifetime of this strategy.
_handle: FileWatchHandle,
/// Async receiver end of the mpsc bridge.
rx: mpsc::UnboundedReceiver<notify::Result<notify::Event>>,
/// Optional glob patterns — `None` means accept every path.
include_patterns: Option<GlobSet>,
Expand All @@ -926,31 +1070,17 @@ impl FileWatchStrategy {
/// * `event_kinds` — which event kinds to accept (`"create"`, `"modify"`,
/// `"delete"`, `"access"`). An empty vec accepts all.
/// * `debounce_ms` — debounce window in milliseconds (0 = no debounce).
/// * `mode` — which watching backend to use (native, polling, or auto).
pub fn new(
paths: Vec<PathBuf>,
patterns: Option<Vec<String>>,
event_kinds: Vec<String>,
debounce_ms: u64,
mode: WatchMode,
) -> anyhow::Result<Self> {
let (tx, rx) = mpsc::unbounded_channel();

// Build the OS-native watcher. The callback bridges sync notify events
// into the async mpsc channel.
let mut watcher = notify::RecommendedWatcher::new(
move |res: notify::Result<notify::Event>| {
// Ignore send errors — receiver may have been dropped during shutdown.
let _ = tx.send(res);
},
notify::Config::default(),
)
.map_err(|e| anyhow::anyhow!("Failed to create filesystem watcher: {}", e))?;

// Register each path for recursive watching.
for path in &paths {
watcher
.watch(path, notify::RecursiveMode::Recursive)
.map_err(|e| anyhow::anyhow!("Failed to watch path {:?}: {}", path, e))?;
}
let handle = Self::create_handle(paths.clone(), mode, tx)?;

// Build the optional glob set.
let include_patterns = match patterns {
Expand All @@ -972,7 +1102,7 @@ impl FileWatchStrategy {
};

Ok(Self {
_watcher: watcher,
_handle: handle,
rx,
include_patterns,
event_kinds,
Expand All @@ -981,6 +1111,63 @@ impl FileWatchStrategy {
})
}

/// Create the appropriate [`FileWatchHandle`] for the given mode.
fn create_handle(
paths: Vec<PathBuf>,
mode: WatchMode,
tx: mpsc::UnboundedSender<notify::Result<notify::Event>>,
) -> anyhow::Result<FileWatchHandle> {
match mode {
WatchMode::Native => {
let handle = Self::create_native_watcher(paths, tx)?;
Ok(FileWatchHandle::Native(handle))
}
WatchMode::Polling { interval_secs } => {
let join = spawn_polling_watcher(paths, interval_secs, tx);
Ok(FileWatchHandle::Polling(join))
}
WatchMode::Auto { poll_interval_secs } => {
// Attempt native first; fall back to polling on failure.
match Self::create_native_watcher(paths.clone(), tx.clone()) {
Ok(watcher) => {
info!("FileWatchStrategy: using native OS watcher");
Ok(FileWatchHandle::Native(watcher))
}
Err(e) => {
warn!(
%e,
interval_secs = poll_interval_secs,
"FileWatchStrategy: native watcher unavailable, falling back to mtime polling"
);
let join = spawn_polling_watcher(paths, poll_interval_secs, tx);
Ok(FileWatchHandle::Polling(join))
}
}
}
}
}

/// Build and configure an OS-native `RecommendedWatcher`.
fn create_native_watcher(
paths: Vec<PathBuf>,
tx: mpsc::UnboundedSender<notify::Result<notify::Event>>,
) -> anyhow::Result<notify::RecommendedWatcher> {
let mut watcher = notify::RecommendedWatcher::new(
move |res: notify::Result<notify::Event>| {
let _ = tx.send(res);
},
notify::Config::default(),
)
.map_err(|e| anyhow::anyhow!("Failed to create filesystem watcher: {}", e))?;

for path in &paths {
watcher
.watch(path, notify::RecursiveMode::Recursive)
.map_err(|e| anyhow::anyhow!("Failed to watch path {:?}: {}", path, e))?;
}
Ok(watcher)
}

/// Returns `true` if the event passes the kind and pattern filters.
fn matches_event(&self, event: &notify::Event) -> bool {
// Filter by event kind.
Expand Down