From 4b30084c16c79ae64eff906198cc140108d6ee34 Mon Sep 17 00:00:00 2001 From: yozhgoor Date: Thu, 23 Apr 2026 16:24:34 +0200 Subject: [PATCH 01/19] Add `Watch::run_with_hooks` --- src/lib.rs | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 51b9a06..dfe2521 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -297,7 +297,27 @@ impl Watch { /// command when changes are detected. /// /// Workspace's `target` directory and hidden paths are excluded by default. - pub fn run(mut self, commands: impl Into) -> Result<()> { + pub fn run(self, commands: impl Into) -> Result<()> { + self.run_with_hooks(commands, || {}, || {}) + } + + /// Like [`run`], but calls `on_start` and `on_finish` around each command-batch execution. + /// + /// `on_start` is called in the build thread immediately before the first command in the batch + /// is spawned. `on_finish` is called in the same thread immediately after the last command has + /// exited (whether it succeeded or not). + /// + /// Workspace's `target` directory and hidden paths are excluded by default. + pub fn run_with_hooks( + mut self, + commands: impl Into, + on_start: F, + on_finish: G, + ) -> Result<()> + where + F: Fn() + Send + Sync + 'static, + G: Fn() + Send + Sync + 'static, + { let metadata = metadata(); let list = commands.into(); @@ -357,13 +377,19 @@ impl Watch { } } + let on_start = Arc::new(on_start); + let on_finish = Arc::new(on_finish); + let mut current_child = SharedChild::new(); loop { { log::info!("Re-running command"); let mut current_child = current_child.clone(); let mut list = list.clone(); + let on_start = Arc::clone(&on_start); + let on_finish = Arc::clone(&on_finish); thread::spawn(move || { + on_start(); let mut status = ExitStatus::default(); list.spawn(|res| match res { Err(err) => { @@ -377,6 +403,7 @@ impl Watch { status.success() } }); + on_finish(); if status.success() { log::info!("Command succeeded."); } else if let Some(code) = status.code() { From af2f25cbec1509049d4c77a782a49f4145d07c52 Mon Sep 17 00:00:00 2001 From: yozhgoor Date: Thu, 23 Apr 2026 18:11:57 +0200 Subject: [PATCH 02/19] Replace `run_with_hooks` by `run_with_lock` --- src/lib.rs | 69 +++++++++++++++++++++++++++++------------------------- 1 file changed, 37 insertions(+), 32 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index dfe2521..912d501 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -298,26 +298,27 @@ impl Watch { /// /// Workspace's `target` directory and hidden paths are excluded by default. pub fn run(self, commands: impl Into) -> Result<()> { - self.run_with_hooks(commands, || {}, || {}) + self.run_inner(commands, None) } - /// Like [`run`], but calls `on_start` and `on_finish` around each command-batch execution. + /// Like [`run`], but executes each command batch while holding `lock`. /// - /// `on_start` is called in the build thread immediately before the first command in the batch - /// is spawned. `on_finish` is called in the same thread immediately after the last command has - /// exited (whether it succeeded or not). + /// This can be used as a binary semaphore between two execution paths that must not overlap. /// /// Workspace's `target` directory and hidden paths are excluded by default. - pub fn run_with_hooks( + pub fn run_with_lock( + self, + commands: impl Into, + lock: Arc>, + ) -> Result<()> { + self.run_inner(commands, Some(lock)) + } + + fn run_inner( mut self, commands: impl Into, - on_start: F, - on_finish: G, - ) -> Result<()> - where - F: Fn() + Send + Sync + 'static, - G: Fn() + Send + Sync + 'static, - { + lock: Option>>, + ) -> Result<()> { let metadata = metadata(); let list = commands.into(); @@ -377,33 +378,37 @@ impl Watch { } } - let on_start = Arc::new(on_start); - let on_finish = Arc::new(on_finish); - let mut current_child = SharedChild::new(); loop { { log::info!("Re-running command"); let mut current_child = current_child.clone(); let mut list = list.clone(); - let on_start = Arc::clone(&on_start); - let on_finish = Arc::clone(&on_finish); + let lock = lock.as_ref().map(Arc::clone); thread::spawn(move || { - on_start(); let mut status = ExitStatus::default(); - list.spawn(|res| match res { - Err(err) => { - log::error!("Could not execute command: {err}"); - false - } - Ok(child) => { - log::trace!("new child: {}", child.id()); - current_child.replace(child); - status = current_child.wait(); - status.success() - } - }); - on_finish(); + let mut run_batch = || { + list.spawn(|res| match res { + Err(err) => { + log::error!("Could not execute command: {err}"); + false + } + Ok(child) => { + log::trace!("new child: {}", child.id()); + current_child.replace(child); + status = current_child.wait(); + status.success() + } + }); + }; + + if let Some(lock) = lock { + let _guard = lock.lock().expect("not poisoned"); + run_batch(); + } else { + run_batch(); + } + if status.success() { log::info!("Command succeeded."); } else if let Some(code) = status.code() { From b3d3d152c3a3f832b74a8e489c53c5b7eff37909 Mon Sep 17 00:00:00 2001 From: yozhgoor Date: Thu, 23 Apr 2026 20:13:33 +0200 Subject: [PATCH 03/19] Improve lock API --- src/lib.rs | 39 ++++++++++++++++++++++++++------------- 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 912d501..e59af0b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -167,7 +167,7 @@ use std::{ env, io, path::{Path, PathBuf}, process::{Child, Command, ExitStatus}, - sync::{Arc, Mutex, mpsc}, + sync::{Arc, Mutex, MutexGuard, PoisonError, mpsc}, thread, time::{Duration, Instant}, }; @@ -306,19 +306,11 @@ impl Watch { /// This can be used as a binary semaphore between two execution paths that must not overlap. /// /// Workspace's `target` directory and hidden paths are excluded by default. - pub fn run_with_lock( - self, - commands: impl Into, - lock: Arc>, - ) -> Result<()> { + pub fn run_with_lock(self, commands: impl Into, lock: Lock) -> Result<()> { self.run_inner(commands, Some(lock)) } - fn run_inner( - mut self, - commands: impl Into, - lock: Option>>, - ) -> Result<()> { + fn run_inner(mut self, commands: impl Into, lock: Option) -> Result<()> { let metadata = metadata(); let list = commands.into(); @@ -384,7 +376,7 @@ impl Watch { log::info!("Re-running command"); let mut current_child = current_child.clone(); let mut list = list.clone(); - let lock = lock.as_ref().map(Arc::clone); + let lock = lock.clone(); thread::spawn(move || { let mut status = ExitStatus::default(); let mut run_batch = || { @@ -403,7 +395,7 @@ impl Watch { }; if let Some(lock) = lock { - let _guard = lock.lock().expect("not poisoned"); + let _guard = lock.lock(); run_batch(); } else { run_batch(); @@ -707,6 +699,27 @@ impl CommandList { } } +/// A synchronization primitive shared between watch-driven command execution and +/// external code that must not run concurrently. +/// +/// Clone this type to share the same lock across threads/components. +#[derive(Clone, Debug, Default)] +pub struct Lock(Arc>); + +impl Lock { + /// Create a new lock instance. + pub fn new() -> Self { + Self::default() + } + + /// Acquire the lock, blocking the current thread until it is available. + /// + /// The lock is released when the returned guard is dropped. + pub fn lock(&self) -> Result, PoisonError>> { + self.0.lock() + } +} + #[cfg(test)] mod test { use super::*; From 9f19a6a18cc02799d523e0a840efca70c51bf442 Mon Sep 17 00:00:00 2001 From: yozhgoor Date: Thu, 23 Apr 2026 20:37:36 +0200 Subject: [PATCH 04/19] Clean up --- src/lib.rs | 30 +++++++++++++----------------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index e59af0b..6925654 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -379,28 +379,24 @@ impl Watch { let lock = lock.clone(); thread::spawn(move || { let mut status = ExitStatus::default(); - let mut run_batch = || { - list.spawn(|res| match res { - Err(err) => { - log::error!("Could not execute command: {err}"); - false - } - Ok(child) => { - log::trace!("new child: {}", child.id()); - current_child.replace(child); - status = current_child.wait(); - status.success() - } - }); - }; if let Some(lock) = lock { let _guard = lock.lock(); - run_batch(); - } else { - run_batch(); } + list.spawn(|res| match res { + Err(err) => { + log::error!("Could not execute command: {err}"); + false + } + Ok(child) => { + log::trace!("new child: {}", child.id()); + current_child.replace(child); + status = current_child.wait(); + status.success() + } + }); + if status.success() { log::info!("Command succeeded."); } else if let Some(code) = status.code() { From f64c1641c54aede88618e1ded44fafeaa4fcfef9 Mon Sep 17 00:00:00 2001 From: yozhgoor Date: Thu, 23 Apr 2026 23:10:48 +0200 Subject: [PATCH 05/19] Replace `Mutex` by `RwLock` --- src/lib.rs | 55 +++++++++++++++++++++++++++++++++++------------------- 1 file changed, 36 insertions(+), 19 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 6925654..25e19e1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -167,7 +167,7 @@ use std::{ env, io, path::{Path, PathBuf}, process::{Child, Command, ExitStatus}, - sync::{Arc, Mutex, MutexGuard, PoisonError, mpsc}, + sync::{Arc, Mutex, PoisonError, RwLock, RwLockReadGuard, RwLockWriteGuard, mpsc}, thread, time::{Duration, Instant}, }; @@ -380,22 +380,32 @@ impl Watch { thread::spawn(move || { let mut status = ExitStatus::default(); - if let Some(lock) = lock { - let _guard = lock.lock(); - } + let mut run_batch = || { + list.spawn(|res| match res { + Err(err) => { + log::error!("Could not execute command: {err}"); + false + } + Ok(child) => { + log::trace!("new child: {}", child.id()); + current_child.replace(child); + status = current_child.wait(); + status.success() + } + }); + }; - list.spawn(|res| match res { - Err(err) => { - log::error!("Could not execute command: {err}"); - false - } - Ok(child) => { - log::trace!("new child: {}", child.id()); - current_child.replace(child); - status = current_child.wait(); - status.success() + if let Some(lock) = lock { + match lock.write() { + Ok(_guard) => run_batch(), + Err(err) => { + log::error!("could not acquire write lock: {err}"); + return; + } } - }); + } else { + run_batch(); + } if status.success() { log::info!("Command succeeded."); @@ -700,7 +710,7 @@ impl CommandList { /// /// Clone this type to share the same lock across threads/components. #[derive(Clone, Debug, Default)] -pub struct Lock(Arc>); +pub struct Lock(Arc>); impl Lock { /// Create a new lock instance. @@ -708,11 +718,18 @@ impl Lock { Self::default() } - /// Acquire the lock, blocking the current thread until it is available. + /// Acquire a shared read lock, blocking the current thread until it is available. + /// + /// Multiple readers may hold the lock concurrently as long as no writer holds it. + pub fn read(&self) -> Result, PoisonError>> { + self.0.read() + } + + /// Acquire an exclusive write lock, blocking the current thread until it is available. /// /// The lock is released when the returned guard is dropped. - pub fn lock(&self) -> Result, PoisonError>> { - self.0.lock() + pub fn write(&self) -> Result, PoisonError>> { + self.0.write() } } From 4b96dd4015cbb52eaa2ecb1ee1d3026cf89d0034 Mon Sep 17 00:00:00 2001 From: yozhgoor Date: Thu, 23 Apr 2026 23:23:22 +0200 Subject: [PATCH 06/19] Update documentation --- src/lib.rs | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 25e19e1..e556217 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -301,9 +301,11 @@ impl Watch { self.run_inner(commands, None) } - /// Like [`run`], but executes each command batch while holding `lock`. + /// Like [`run`], but executes each command batch while holding a write lock on `lock`. /// - /// This can be used as a binary semaphore between two execution paths that must not overlap. + /// This allows external code to coordinate with the watch loop using the same [`Lock`]: + /// - command batches acquire a write lock (exclusive), + /// - external readers can acquire a read lock and will block will a command batch is running. /// /// Workspace's `target` directory and hidden paths are excluded by default. pub fn run_with_lock(self, commands: impl Into, lock: Lock) -> Result<()> { @@ -705,8 +707,8 @@ impl CommandList { } } -/// A synchronization primitive shared between watch-driven command execution and -/// external code that must not run concurrently. +/// Shared reader/writer synchronization primitive used to coordinate watch-driven +/// command execution with external code. /// /// Clone this type to share the same lock across threads/components. #[derive(Clone, Debug, Default)] @@ -718,16 +720,16 @@ impl Lock { Self::default() } - /// Acquire a shared read lock, blocking the current thread until it is available. + /// Acquire a shared read lock, blocking until no writer holds the lock. /// - /// Multiple readers may hold the lock concurrently as long as no writer holds it. + /// Multiple readers may hold this lock concurrently. pub fn read(&self) -> Result, PoisonError>> { self.0.read() } - /// Acquire an exclusive write lock, blocking the current thread until it is available. + /// Acquire an exclusive write lock, blocking until all readers/writers release it. /// - /// The lock is released when the returned guard is dropped. + /// Use this for operations that mutate shared state (e.g. rebuild output files). pub fn write(&self) -> Result, PoisonError>> { self.0.write() } From c6694408980869215617900a91a84a9e7890fe98 Mon Sep 17 00:00:00 2001 From: yozhgoor Date: Fri, 24 Apr 2026 03:29:30 +0200 Subject: [PATCH 07/19] Rename `Lock` in `WatchLock` --- src/lib.rs | 33 ++++++++++++++------------------- 1 file changed, 14 insertions(+), 19 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index e556217..d58a5c6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -235,6 +235,8 @@ pub struct Watch { exclude_globs: Vec, #[clap(skip)] workspace_exclude_globs: Vec, + #[clap(skip)] + watch_lock: Option, } impl Watch { @@ -287,6 +289,14 @@ impl Watch { self } + /// Return the shared lock for this watcher, creating it if it does not exist yet. + /// + /// Clone and share this lock with external code (e.g. HTTP handlers) to coordinate with + /// watch-driven command execution. + pub fn lock(&mut self) -> WatchLock { + self.watch_lock.get_or_insert_with(WatchLock::new).clone() + } + /// Set the debounce duration after relaunching the command. pub fn debounce(mut self, duration: Duration) -> Self { self.debounce = duration; @@ -297,22 +307,7 @@ impl Watch { /// command when changes are detected. /// /// Workspace's `target` directory and hidden paths are excluded by default. - pub fn run(self, commands: impl Into) -> Result<()> { - self.run_inner(commands, None) - } - - /// Like [`run`], but executes each command batch while holding a write lock on `lock`. - /// - /// This allows external code to coordinate with the watch loop using the same [`Lock`]: - /// - command batches acquire a write lock (exclusive), - /// - external readers can acquire a read lock and will block will a command batch is running. - /// - /// Workspace's `target` directory and hidden paths are excluded by default. - pub fn run_with_lock(self, commands: impl Into, lock: Lock) -> Result<()> { - self.run_inner(commands, Some(lock)) - } - - fn run_inner(mut self, commands: impl Into, lock: Option) -> Result<()> { + pub fn run(mut self, commands: impl Into) -> Result<()> { let metadata = metadata(); let list = commands.into(); @@ -378,7 +373,7 @@ impl Watch { log::info!("Re-running command"); let mut current_child = current_child.clone(); let mut list = list.clone(); - let lock = lock.clone(); + let lock = self.watch_lock.clone(); thread::spawn(move || { let mut status = ExitStatus::default(); @@ -712,9 +707,9 @@ impl CommandList { /// /// Clone this type to share the same lock across threads/components. #[derive(Clone, Debug, Default)] -pub struct Lock(Arc>); +pub struct WatchLock(Arc>); -impl Lock { +impl WatchLock { /// Create a new lock instance. pub fn new() -> Self { Self::default() From c79ed5cd57048a17af7355a7ed587c34eaac5403 Mon Sep 17 00:00:00 2001 From: yozhgoor Date: Fri, 24 Apr 2026 13:38:26 +0200 Subject: [PATCH 08/19] After review --- src/lib.rs | 65 +++++++++++++++++++++++++++--------------------------- 1 file changed, 32 insertions(+), 33 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index d58a5c6..e136684 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -167,7 +167,7 @@ use std::{ env, io, path::{Path, PathBuf}, process::{Child, Command, ExitStatus}, - sync::{Arc, Mutex, PoisonError, RwLock, RwLockReadGuard, RwLockWriteGuard, mpsc}, + sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard, mpsc}, thread, time::{Duration, Instant}, }; @@ -236,7 +236,7 @@ pub struct Watch { #[clap(skip)] workspace_exclude_globs: Vec, #[clap(skip)] - watch_lock: Option, + watch_lock: WatchLock, } impl Watch { @@ -289,12 +289,13 @@ impl Watch { self } - /// Return the shared lock for this watcher, creating it if it does not exist yet. + /// Return the shared lock used by this watcher. /// /// Clone and share this lock with external code (e.g. HTTP handlers) to coordinate with /// watch-driven command execution. - pub fn lock(&mut self) -> WatchLock { - self.watch_lock.get_or_insert_with(WatchLock::new).clone() + #[must_use = "store and share the lock with readers that must coordinate with rebuilds"] + pub fn lock(&self) -> WatchLock { + self.watch_lock.clone() } /// Set the debounce duration after relaunching the command. @@ -392,17 +393,8 @@ impl Watch { }); }; - if let Some(lock) = lock { - match lock.write() { - Ok(_guard) => run_batch(), - Err(err) => { - log::error!("could not acquire write lock: {err}"); - return; - } - } - } else { - run_batch(); - } + let _guard = lock.write(); + run_batch(); if status.success() { log::info!("Command succeeded."); @@ -702,31 +694,38 @@ impl CommandList { } } -/// Shared reader/writer synchronization primitive used to coordinate watch-driven -/// command execution with external code. +/// Guard returned by [`WatchLock::acquire`]. /// -/// Clone this type to share the same lock across threads/components. +/// Keep this value alive for the duration of the protected read section. +/// The lock is released automatically when the guard is dropped. +pub struct WatchLockGuard<'a> { + _guard: RwLockReadGuard<'a, ()>, +} + +/// A lock handle used to coordinate file reads with watch-driven rebuilds. +/// +/// Obtain it from [`Watch::lock`], clone it, and call [`WatchLock::acquire`] while +/// reading files that must not race with rebuild writes. #[derive(Clone, Debug, Default)] pub struct WatchLock(Arc>); impl WatchLock { - /// Create a new lock instance. - pub fn new() -> Self { - Self::default() - } - - /// Acquire a shared read lock, blocking until no writer holds the lock. + /// Acquire shared access to the protected section. /// - /// Multiple readers may hold this lock concurrently. - pub fn read(&self) -> Result, PoisonError>> { - self.0.read() + /// Multiple readers may hold this guard concurrently. + pub fn acquire(&self) -> WatchLockGuard<'_> { + WatchLockGuard { + _guard: self + .0 + .read() + .expect("watch lock poisoned while acquiring read access"), + } } - /// Acquire an exclusive write lock, blocking until all readers/writers release it. - /// - /// Use this for operations that mutate shared state (e.g. rebuild output files). - pub fn write(&self) -> Result, PoisonError>> { - self.0.write() + fn write(&self) -> RwLockWriteGuard<'_, ()> { + self.0 + .write() + .expect("watch lock poisoned while acquiring write access") } } From 06025aaffdb92017a4f40ac4b1bb2be612e8639a Mon Sep 17 00:00:00 2001 From: yozhgoor Date: Fri, 24 Apr 2026 17:22:08 +0200 Subject: [PATCH 09/19] Remove closure --- src/lib.rs | 28 ++++++++++++---------------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index e136684..55b0e04 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -378,23 +378,19 @@ impl Watch { thread::spawn(move || { let mut status = ExitStatus::default(); - let mut run_batch = || { - list.spawn(|res| match res { - Err(err) => { - log::error!("Could not execute command: {err}"); - false - } - Ok(child) => { - log::trace!("new child: {}", child.id()); - current_child.replace(child); - status = current_child.wait(); - status.success() - } - }); - }; - let _guard = lock.write(); - run_batch(); + list.spawn(|res| match res { + Err(err) => { + log::error!("Could not execute command: {err}"); + false + } + Ok(child) => { + log::trace!("new child: {}", child.id()); + current_child.replace(child); + status = current_child.wait(); + status.success() + } + }); if status.success() { log::info!("Command succeeded."); From 53bf865fb61464dd3ead53c24631389638de70d7 Mon Sep 17 00:00:00 2001 From: yozhgoor Date: Sat, 25 Apr 2026 12:07:54 +0200 Subject: [PATCH 10/19] Add `WatchMsg` to differentiate file changes and build statuses --- src/lib.rs | 92 +++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 67 insertions(+), 25 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 55b0e04..35b9cd9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -350,11 +350,11 @@ impl Watch { }) .collect::, _>>()?; - let (tx, rx) = mpsc::channel(); + let (tx, rx) = mpsc::channel::(); let handler = WatchEventHandler { watch: self.clone(), - tx, + tx: tx.clone(), command_start: Instant::now(), }; @@ -369,46 +369,81 @@ impl Watch { } let mut current_child = SharedChild::new(); + let mut guard = Some(self.watch_lock.write()); + let mut dirty = true; + let mut running = false; + loop { - { + if dirty && !running { log::info!("Re-running command"); + + if guard.is_none() { + guard = Some(self.watch_lock.write()); + } + + dirty = false; + running = true; + + let tx_build = tx.clone(); let mut current_child = current_child.clone(); let mut list = list.clone(); - let lock = self.watch_lock.clone(); + thread::spawn(move || { - let mut status = ExitStatus::default(); + let mut success = true; - let _guard = lock.write(); list.spawn(|res| match res { Err(err) => { log::error!("Could not execute command: {err}"); + success = false; false } Ok(child) => { log::trace!("new child: {}", child.id()); current_child.replace(child); - status = current_child.wait(); - status.success() + + let status = current_child.wait(); + if status.success() { + true + } else { + if let Some(code) = status.code() { + log::error!("Command failed (exit code: {code})"); + } else { + log::error!("Command failed."); + } + success = false; + false + } } }); - if status.success() { - log::info!("Command succeeded."); - } else if let Some(code) = status.code() { - log::error!("Command failed (exit code: {code})"); - } else { - log::error!("Command failed."); - } + let _ = tx_build.send(WatchMsg::BuildDone { success }); }); } - let res = rx.recv(); - if res.is_ok() { - log::trace!("Changes detected, re-generating"); - } - current_child.terminate(); - if res.is_err() { - break; + match rx.recv() { + Ok(WatchMsg::FsChange) => { + log::trace!("Changes detected, re-generating"); + dirty = true; + + if guard.is_none() { + guard = Some(self.watch_lock.write()); + } + + current_child.terminate(); + } + Ok(WatchMsg::BuildDone { success }) => { + running = false; + + if success { + if dirty { + log::trace!("Changes detected during build, re-generating"); + } else { + log::info!("Command succeeded."); + guard = None; + } + } + } + Err(_) => break, } } @@ -527,17 +562,24 @@ impl Watch { } } +#[derive(Debug)] struct WatchEventHandler { watch: Watch, - tx: mpsc::Sender<()>, + tx: mpsc::Sender, command_start: Instant, } +#[derive(Debug)] +enum WatchMsg { + FsChange, + BuildDone { success: bool }, +} + impl EventHandler for WatchEventHandler { fn handle_event(&mut self, event: Result) { match event { Ok(event) => { - if (event.kind.is_modify() || event.kind.is_create() || event.kind.is_create()) + if (event.kind.is_modify() || event.kind.is_create()) && event.paths.iter().any(|x| { !self.watch.is_excluded_path(x) && x.exists() @@ -549,7 +591,7 @@ impl EventHandler for WatchEventHandler { log::trace!("Changes detected in {event:?}"); self.command_start = Instant::now(); - self.tx.send(()).expect("can send"); + self.tx.send(WatchMsg::FsChange).expect("can send"); } else { log::trace!("Ignoring changes in {event:?}"); } From b1c881a0a818c837ec7bda0456af8a40ccf34a5a Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Sat, 25 Apr 2026 12:49:33 +0200 Subject: [PATCH 11/19] CLEANUP --- src/lib.rs | 104 +++++++++++++++++++++-------------------------------- 1 file changed, 40 insertions(+), 64 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 35b9cd9..d0f0613 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -162,7 +162,7 @@ use anyhow::{Context, Result}; use clap::Parser; use glob::Pattern; use lazy_static::lazy_static; -use notify::{Event, EventHandler, RecursiveMode, Watcher}; +use notify::Watcher as _; use std::{ env, io, path::{Path, PathBuf}, @@ -350,7 +350,7 @@ impl Watch { }) .collect::, _>>()?; - let (tx, rx) = mpsc::channel::(); + let (tx, rx) = mpsc::channel(); let handler = WatchEventHandler { watch: self.clone(), @@ -362,88 +362,65 @@ impl Watch { notify::recommended_watcher(handler).context("could not initialize watcher")?; for path in &self.watch_paths { - match watcher.watch(path, RecursiveMode::Recursive) { + match watcher.watch(path, notify::RecursiveMode::Recursive) { Ok(()) => log::trace!("Watching {}", path.display()), Err(err) => log::error!("cannot watch {}: {err}", path.display()), } } let mut current_child = SharedChild::new(); - let mut guard = Some(self.watch_lock.write()); - let mut dirty = true; - let mut running = false; - + let mut lock_guard = None; loop { - if dirty && !running { - log::info!("Re-running command"); - - if guard.is_none() { - guard = Some(self.watch_lock.write()); + { + if lock_guard.is_none() { + log::info!("Running command"); + lock_guard.replace(self.watch_lock.write()); + } else { + log::info!("Re-running command"); } - dirty = false; - running = true; - - let tx_build = tx.clone(); let mut current_child = current_child.clone(); let mut list = list.clone(); - + let tx = tx.clone(); thread::spawn(move || { - let mut success = true; + let mut status = ExitStatus::default(); list.spawn(|res| match res { Err(err) => { log::error!("Could not execute command: {err}"); - success = false; false } Ok(child) => { - log::trace!("new child: {}", child.id()); + log::trace!("Child spawned PID: {}", child.id()); current_child.replace(child); - - let status = current_child.wait(); - if status.success() { - true - } else { - if let Some(code) = status.code() { - log::error!("Command failed (exit code: {code})"); - } else { - log::error!("Command failed."); - } - success = false; - false - } + status = current_child.wait(); + status.success() } }); - let _ = tx_build.send(WatchMsg::BuildDone { success }); + if status.success() { + log::info!("Command succeeded."); + tx.send(Event::CommandSucceded).expect("can send"); + } else if let Some(code) = status.code() { + log::error!("Command failed (exit code: {code})"); + } else { + log::error!("Command failed."); + } }); } match rx.recv() { - Ok(WatchMsg::FsChange) => { + Ok(Event::ChangeDetected) => { log::trace!("Changes detected, re-generating"); - dirty = true; - - if guard.is_none() { - guard = Some(self.watch_lock.write()); - } - current_child.terminate(); } - Ok(WatchMsg::BuildDone { success }) => { - running = false; - - if success { - if dirty { - log::trace!("Changes detected during build, re-generating"); - } else { - log::info!("Command succeeded."); - guard = None; - } - } + Ok(Event::CommandSucceded) => { + lock_guard.take(); + } + Err(_) => { + current_child.terminate(); + break; } - Err(_) => break, } } @@ -562,21 +539,14 @@ impl Watch { } } -#[derive(Debug)] struct WatchEventHandler { watch: Watch, - tx: mpsc::Sender, + tx: mpsc::Sender, command_start: Instant, } -#[derive(Debug)] -enum WatchMsg { - FsChange, - BuildDone { success: bool }, -} - -impl EventHandler for WatchEventHandler { - fn handle_event(&mut self, event: Result) { +impl notify::EventHandler for WatchEventHandler { + fn handle_event(&mut self, event: Result) { match event { Ok(event) => { if (event.kind.is_modify() || event.kind.is_create()) @@ -591,7 +561,7 @@ impl EventHandler for WatchEventHandler { log::trace!("Changes detected in {event:?}"); self.command_start = Instant::now(); - self.tx.send(WatchMsg::FsChange).expect("can send"); + self.tx.send(Event::ChangeDetected).expect("can send"); } else { log::trace!("Ignoring changes in {event:?}"); } @@ -767,6 +737,12 @@ impl WatchLock { } } +#[derive(Debug)] +enum Event { + CommandSucceded, + ChangeDetected, +} + #[cfg(test)] mod test { use super::*; From 2d369d49a708a14a64dd5e200debe52332838161 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Sat, 25 Apr 2026 13:38:01 +0200 Subject: [PATCH 12/19] Avoid building if the command succeeded --- src/lib.rs | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index d0f0613..f7bd32e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -369,16 +369,10 @@ impl Watch { } let mut current_child = SharedChild::new(); - let mut lock_guard = None; + let mut lock_guard = Some(self.watch_lock.write()); loop { - { - if lock_guard.is_none() { - log::info!("Running command"); - lock_guard.replace(self.watch_lock.write()); - } else { - log::info!("Re-running command"); - } - + if lock_guard.is_some() { + log::info!("Running command"); let mut current_child = current_child.clone(); let mut list = list.clone(); let tx = tx.clone(); @@ -413,6 +407,7 @@ impl Watch { Ok(Event::ChangeDetected) => { log::trace!("Changes detected, re-generating"); current_child.terminate(); + lock_guard = lock_guard.or_else(|| Some(self.watch_lock.write())); } Ok(Event::CommandSucceded) => { lock_guard.take(); From 39f0033ae787579b7043282b852ae7dac385867a Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Sat, 25 Apr 2026 13:44:13 +0200 Subject: [PATCH 13/19] CLEANUP --- src/lib.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index f7bd32e..b836d69 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -407,7 +407,9 @@ impl Watch { Ok(Event::ChangeDetected) => { log::trace!("Changes detected, re-generating"); current_child.terminate(); - lock_guard = lock_guard.or_else(|| Some(self.watch_lock.write())); + if lock_guard.is_none() { + lock_guard = Some(self.watch_lock.write()); + } } Ok(Event::CommandSucceded) => { lock_guard.take(); From 7a9ac4691ca55be3416461893588afbf7deecbc7 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Sat, 25 Apr 2026 13:45:18 +0200 Subject: [PATCH 14/19] CLEANUP --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index b836d69..24556e1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -406,10 +406,10 @@ impl Watch { match rx.recv() { Ok(Event::ChangeDetected) => { log::trace!("Changes detected, re-generating"); - current_child.terminate(); if lock_guard.is_none() { lock_guard = Some(self.watch_lock.write()); } + current_child.terminate(); } Ok(Event::CommandSucceded) => { lock_guard.take(); From ba2292068daef63aec98c0fd99c73e4c1353ca30 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Sat, 25 Apr 2026 17:23:29 +0200 Subject: [PATCH 15/19] Fix race condition with generation counter for WatchLock --- src/lib.rs | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 24556e1..b8c6bb6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -370,12 +370,14 @@ impl Watch { let mut current_child = SharedChild::new(); let mut lock_guard = Some(self.watch_lock.write()); + let mut generation: u64 = 0; loop { if lock_guard.is_some() { log::info!("Running command"); let mut current_child = current_child.clone(); let mut list = list.clone(); let tx = tx.clone(); + let build_id = generation; thread::spawn(move || { let mut status = ExitStatus::default(); @@ -394,7 +396,8 @@ impl Watch { if status.success() { log::info!("Command succeeded."); - tx.send(Event::CommandSucceded).expect("can send"); + tx.send(Event::CommandSucceeded(build_id)) + .expect("can send"); } else if let Some(code) = status.code() { log::error!("Command failed (exit code: {code})"); } else { @@ -409,11 +412,17 @@ impl Watch { if lock_guard.is_none() { lock_guard = Some(self.watch_lock.write()); } + generation += 1; current_child.terminate(); } - Ok(Event::CommandSucceded) => { + Ok(Event::CommandSucceeded(build_id)) if build_id == generation => { lock_guard.take(); } + Ok(Event::CommandSucceeded(build_id)) => { + log::trace!( + "Ignoring stale success from build {build_id} (current: {generation})" + ); + } Err(_) => { current_child.terminate(); break; @@ -736,7 +745,7 @@ impl WatchLock { #[derive(Debug)] enum Event { - CommandSucceded, + CommandSucceeded(u64), ChangeDetected, } From 676b1bd5b823ea5e0dfee534e88a1fab92059b03 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Sat, 25 Apr 2026 17:27:01 +0200 Subject: [PATCH 16/19] Document that Watch::run() holds the write lock immediately on start --- src/lib.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index b8c6bb6..b052890 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -293,6 +293,15 @@ impl Watch { /// /// Clone and share this lock with external code (e.g. HTTP handlers) to coordinate with /// watch-driven command execution. + /// + /// # Lock lifecycle + /// + /// [`run`](Self::run) acquires the **write lock immediately** when it is called — before the + /// first command is even spawned. Any code that calls [`WatchLock::acquire`] will therefore + /// block until the first build completes. This is intentional: it prevents readers from + /// observing an empty or incomplete dist directory before the initial build has finished. + /// The write lock is then re-acquired on every subsequent rebuild and released once the + /// command sequence succeeds. #[must_use = "store and share the lock with readers that must coordinate with rebuilds"] pub fn lock(&self) -> WatchLock { self.watch_lock.clone() From d8104fbc3a0fd9513d3bfea98e6cdda2ac14dbb3 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Sat, 25 Apr 2026 17:47:23 +0200 Subject: [PATCH 17/19] Update stale version in crate-level doc example --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index b052890..dc29831 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -79,7 +79,7 @@ //! //! ```toml //! [dependencies] -//! xtask-watch = "0.1.0" +//! xtask-watch = "0.3" //! ``` //! //! # Examples From 1a3d1afe2bd4d33abb8295cda2ca63a4fa074e26 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Sat, 25 Apr 2026 17:47:40 +0200 Subject: [PATCH 18/19] Add WatchLock discoverability note to Watch struct doc --- src/lib.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index dc29831..9cca742 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -200,6 +200,9 @@ pub fn xtask_command() -> Command { /// Watches over your project's source code, relaunching a given command when /// changes are detected. +/// +/// Use [`Watch::lock`] to obtain a [`WatchLock`] that can be shared with external +/// code (e.g. an HTTP server) to coordinate reads with ongoing rebuilds. #[non_exhaustive] #[derive(Clone, Debug, Default, Parser)] #[clap(about = "Watches over your project's source code.")] From 393d3da3023c21a6902f2381b782045d95c8d337 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Sat, 25 Apr 2026 17:55:04 +0200 Subject: [PATCH 19/19] Document lock poisoning handling: unwrap_or_else for WatchLock, clarify Mutex invariants --- src/lib.rs | 54 ++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 40 insertions(+), 14 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 9cca742..08c0f54 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -325,7 +325,10 @@ impl Watch { let list = commands.into(); { - let mut commands = list.commands.lock().expect("not poisoned"); + let mut commands = list + .commands + .lock() + .expect("no panic-prone code runs while this lock is held"); commands.extend(self.shell_commands.iter().map(|x| { let mut command = @@ -602,12 +605,18 @@ impl SharedChild { } fn replace(&mut self, child: impl Into>) { - *self.child.lock().expect("not poisoned") = child.into(); + *self + .child + .lock() + .expect("no panic-prone code runs while this lock is held") = child.into(); } fn wait(&mut self) -> ExitStatus { loop { - let mut child = self.child.lock().expect("not poisoned"); + let mut child = self + .child + .lock() + .expect("no panic-prone code runs while this lock is held"); match child.as_mut().map(|child| child.try_wait()) { Some(Ok(Some(status))) => { break status; @@ -628,7 +637,12 @@ impl SharedChild { } fn terminate(&mut self) { - if let Some(child) = self.child.lock().expect("not poisoned").as_mut() { + if let Some(child) = self + .child + .lock() + .expect("no panic-prone code runs while this lock is held") + .as_mut() + { #[cfg(unix)] { let killing_start = Instant::now(); @@ -693,14 +707,22 @@ impl From<[Command; SIZE]> for CommandList { impl CommandList { /// Returns `true` if the list is empty. pub fn is_empty(&self) -> bool { - self.commands.lock().expect("not poisoned").is_empty() + self.commands + .lock() + .expect("no panic-prone code runs while this lock is held") + .is_empty() } /// Spawn each command of the list one after the other. /// /// The caller is responsible to wait the commands. pub fn spawn(&mut self, mut callback: impl FnMut(io::Result) -> bool) { - for process in self.commands.lock().expect("not poisoned").iter_mut() { + for process in self + .commands + .lock() + .expect("no panic-prone code runs while this lock is held") + .iter_mut() + { if !callback(process.spawn()) { break; } @@ -710,7 +732,12 @@ impl CommandList { /// Run all the commands sequentially using [`std::process::Command::status`] and stop at the /// first failure. pub fn status(&mut self) -> io::Result { - for process in self.commands.lock().expect("not poisoned").iter_mut() { + for process in self + .commands + .lock() + .expect("no panic-prone code runs while this lock is held") + .iter_mut() + { let exit_status = process.status()?; if !exit_status.success() { return Ok(exit_status); @@ -741,17 +768,16 @@ impl WatchLock { /// Multiple readers may hold this guard concurrently. pub fn acquire(&self) -> WatchLockGuard<'_> { WatchLockGuard { - _guard: self - .0 - .read() - .expect("watch lock poisoned while acquiring read access"), + // The inner value is `()` — there is no data to corrupt, so we can + // always recover from a poisoned lock. + _guard: self.0.read().unwrap_or_else(|e| e.into_inner()), } } fn write(&self) -> RwLockWriteGuard<'_, ()> { - self.0 - .write() - .expect("watch lock poisoned while acquiring write access") + // The inner value is `()` — there is no data to corrupt, so we can + // always recover from a poisoned lock. + self.0.write().unwrap_or_else(|e| e.into_inner()) } }