From e4b6505413383f871cb84b87e1e6025faf529920 Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Wed, 3 Jun 2026 22:01:41 +0200 Subject: [PATCH 1/6] state: transactional update/create API and per-VM lock Hold the exclusive flock across the whole read-modify-write so concurrent processes can't lose each other's updates. Add update/update_with for read-modify-write, create for write-once, and lock_vm for serializing whole lifecycle commands on one VM name. write remains a thin internal shim until all callers migrate. --- crates/ember-core/src/error.rs | 4 + crates/ember-core/src/state/store.rs | 363 +++++++++++++++++++++++++-- 2 files changed, 347 insertions(+), 20 deletions(-) diff --git a/crates/ember-core/src/error.rs b/crates/ember-core/src/error.rs index 3307a7f..4e94918 100644 --- a/crates/ember-core/src/error.rs +++ b/crates/ember-core/src/error.rs @@ -49,6 +49,10 @@ pub enum Error { #[error("state: {0}")] State(String), + /// A state file already exists where a fresh create was expected. + #[error("{path}: already exists")] + AlreadyExists { path: PathBuf }, + /// Storage pool error (dm-thin / btrfs / ZFS pool-level state, as /// distinct from individual volume / dataset errors). #[error("storage pool: {0}")] diff --git a/crates/ember-core/src/state/store.rs b/crates/ember-core/src/state/store.rs index 91f3d11..c14528b 100644 --- a/crates/ember-core/src/state/store.rs +++ b/crates/ember-core/src/state/store.rs @@ -4,6 +4,13 @@ //! Shared locks (`LOCK_SH`) for concurrent readers, exclusive locks //! (`LOCK_EX`) for writers. Writes use temp file + `rename()` for //! atomicity — readers never see partial data. +//! +//! Mutations go through [`StateStore::update`] / [`StateStore::update_with`] +//! (read-modify-write) or [`StateStore::create`] (write-once). These hold a +//! single exclusive lock across the whole transaction, so concurrent +//! processes cannot lose each other's updates. There is deliberately no +//! fire-and-forget `write`: an unlocked read-then-write would reopen the +//! lost-update window these methods exist to close. use std::fs::{self, File, OpenOptions}; use std::path::{Path, PathBuf}; @@ -73,6 +80,26 @@ impl StateStore { source: e, })?; } + + // Pre-create companion lock files for the shared, append-heavy + // state files so concurrent readers take a shared lock from the + // first access rather than the lock-free path (see `FileLock::shared`). + for data in [ + self.config_path(), + self.image_registry_path(), + self.network_allocations_path(), + ] { + let lock = lock_path_for(&data); + OpenOptions::new() + .create(true) + .truncate(false) + .write(true) + .open(&lock) + .map_err(|e| Error::Io { + path: lock, + source: e, + })?; + } Ok(()) } @@ -111,18 +138,59 @@ impl StateStore { self.root.join("kernels") } + /// Acquire the exclusive per-VM operation lock for `name`. + /// + /// Serializes whole lifecycle commands (create / start / stop / delete / + /// rename / …) for a single VM: while one process holds it, another that + /// targets the same name blocks until the first finishes. This prevents + /// interleavings the per-file content locks can't — double-start, start + /// racing delete, or two creates of the same name. Different names lock + /// independently. + /// + /// The lock is released when the returned guard is dropped. This is + /// distinct from the content locks taken inside [`update`](Self::update) / + /// [`create`](Self::create); a command holds the op lock for its whole + /// duration and takes content locks briefly within. Do not acquire it + /// twice for the same name in one process — `flock` would self-block. + /// + /// The lock file lives in a dedicated `locks/` directory, not inside the + /// per-VM directory, so deleting or renaming a VM never removes a lock + /// that another process is blocked on (which would defeat the exclusion). + /// Lock files are intentionally never removed. + pub fn lock_vm(&self, name: &str) -> Result { + let path = self.root.join("locks").join(format!("{name}.lock")); + if let Some(parent) = path.parent() { + fs::create_dir_all(parent).map_err(|e| Error::Io { + path: parent.to_path_buf(), + source: e, + })?; + } + let file = OpenOptions::new() + .create(true) + .read(true) + .write(true) + .truncate(false) + .open(&path) + .map_err(|e| Error::Io { + path: path.clone(), + source: e, + })?; + let flock = Flock::lock(file, FlockArg::LockExclusive).map_err(|(_, errno)| Error::Io { + path, + source: errno.into(), + })?; + Ok(VmOpLock { _flock: flock }) + } + /// Read and deserialize a JSON file, using a shared (read) lock. /// /// Returns an error if the file does not exist or cannot be parsed. pub fn read(&self, path: &Path) -> Result { let _lock = FileLock::shared(path)?; - - let contents = fs::read_to_string(path).map_err(|e| Error::Io { + self.read_unlocked(path)?.ok_or_else(|| Error::Io { path: path.to_path_buf(), - source: e, - })?; - - serde_json::from_str(&contents).map_err(Into::into) + source: std::io::Error::from(std::io::ErrorKind::NotFound), + }) } /// Read and deserialize a JSON file, returning `None` if it doesn't exist. @@ -130,18 +198,101 @@ impl StateStore { /// Uses a shared (read) lock. Returns an error only on I/O failures /// other than "not found" or on parse errors. pub fn read_optional(&self, path: &Path) -> Result> { - if !path.exists() { - return Ok(None); + let _lock = FileLock::shared(path)?; + self.read_unlocked(path) + } + + /// Atomically read-modify-write a JSON file under a single exclusive lock. + /// + /// The lock is held across the read, the closure, and the write, so two + /// concurrent callers serialize and neither loses the other's change. + /// Errors if the file does not exist — use [`update_with`](Self::update_with) + /// for state that materializes on first write. + /// + /// The closure must be cheap: it runs while the exclusive lock is held, + /// blocking every other reader and writer of this file. Do not perform + /// I/O, downloads, or process spawns inside it — do that work first and + /// apply only the resulting field changes here. + pub fn update(&self, path: &Path, f: impl FnOnce(&mut T) -> Result) -> Result + where + T: Serialize + DeserializeOwned, + { + let _lock = FileLock::exclusive(path)?; + let mut value: T = self + .read_unlocked(path)? + .ok_or_else(|| Error::State(format!("{}: not found", path.display())))?; + let result = f(&mut value)?; + self.write_locked(path, &value)?; + Ok(result) + } + + /// Like [`update`](Self::update), but seeds the value from `default()` + /// when the file does not exist yet. + /// + /// For shared accumulators (image registry, IP allocations) that are + /// created lazily on their first mutation. The same cheap-closure + /// contract as [`update`](Self::update) applies. + pub fn update_with( + &self, + path: &Path, + default: impl FnOnce() -> T, + f: impl FnOnce(&mut T) -> Result, + ) -> Result + where + T: Serialize + DeserializeOwned, + { + let _lock = FileLock::exclusive(path)?; + let mut value: T = self.read_unlocked(path)?.unwrap_or_else(default); + let result = f(&mut value)?; + self.write_locked(path, &value)?; + Ok(result) + } + + /// Atomically create a JSON file, failing if it already exists. + /// + /// The existence check and the write happen under one exclusive lock, so + /// two concurrent creators cannot both succeed — exactly one wins and the + /// other gets [`Error::AlreadyExists`]. Use for write-once state (the + /// global config, a VM's initial metadata). + pub fn create(&self, path: &Path, data: &T) -> Result<()> { + let _lock = FileLock::exclusive(path)?; + if path.exists() { + return Err(Error::AlreadyExists { + path: path.to_path_buf(), + }); } - self.read(path).map(Some) + self.write_locked(path, data) } - /// Serialize and write a JSON file atomically, using an exclusive lock. + /// Serialize and write a JSON file atomically, taking the exclusive lock. /// - /// Writes to a temporary file first, then renames to the target path. - /// Parent directories are created if needed. + /// Fire-and-forget overwrite, kept while callers migrate to + /// [`update`](Self::update) / [`create`](Self::create); it does not guard + /// against a lost update across a preceding read. pub fn write(&self, path: &Path, data: &T) -> Result<()> { - // Ensure parent directory exists. + let _lock = FileLock::exclusive(path)?; + self.write_locked(path, data) + } + + /// Deserialize the JSON file at `path`, returning `None` if it is absent. + /// + /// Takes no lock — the caller must already hold one. + fn read_unlocked(&self, path: &Path) -> Result> { + match fs::read_to_string(path) { + Ok(contents) => Ok(Some(serde_json::from_str(&contents)?)), + Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None), + Err(e) => Err(Error::Io { + path: path.to_path_buf(), + source: e, + }), + } + } + + /// Serialize `data` to `path` via temp file + atomic `rename`, creating + /// parent directories as needed. + /// + /// Takes no lock — the caller must already hold the exclusive lock. + fn write_locked(&self, path: &Path, data: &T) -> Result<()> { if let Some(parent) = path.parent() { fs::create_dir_all(parent).map_err(|e| Error::Io { path: parent.to_path_buf(), @@ -149,9 +300,7 @@ impl StateStore { })?; } - let _lock = FileLock::exclusive(path)?; - - // Write to temp file in the same directory (same filesystem for rename). + // Write to a temp file in the same directory (same filesystem for rename). let tmp_path = tmp_path_for(path); let json = serde_json::to_string_pretty(data)?; @@ -217,6 +366,12 @@ fn lock_path_for(path: &Path) -> PathBuf { PathBuf::from(lock) } +/// RAII guard for the per-VM operation lock returned by +/// [`StateStore::lock_vm`]. Releases the lock when dropped. +pub struct VmOpLock { + _flock: Flock, +} + /// RAII guard that holds an `flock` on a companion `.lock` file. /// /// The lock is released automatically when the inner `Flock` is dropped. @@ -290,6 +445,174 @@ impl FileLock { mod tests { use super::*; use std::collections::HashMap; + use std::sync::Arc; + + #[test] + fn create_rejects_existing() { + let dir = tempfile::tempdir().unwrap(); + let store = StateStore::new(dir.path().to_path_buf()); + let path = dir.path().join("once.json"); + + store.create(&path, &1u32).unwrap(); + let err = store.create(&path, &2u32).unwrap_err(); + assert!(matches!(err, Error::AlreadyExists { .. })); + + // The original value is untouched. + let loaded: u32 = store.read(&path).unwrap(); + assert_eq!(loaded, 1); + } + + #[test] + fn update_errors_on_missing() { + let dir = tempfile::tempdir().unwrap(); + let store = StateStore::new(dir.path().to_path_buf()); + let path = dir.path().join("missing.json"); + + let err = store.update(&path, |_: &mut u32| Ok(())).unwrap_err(); + assert!(matches!(err, Error::State(_))); + } + + #[test] + fn update_with_seeds_default_then_persists() { + let dir = tempfile::tempdir().unwrap(); + let store = StateStore::new(dir.path().to_path_buf()); + let path = dir.path().join("acc.json"); + + // First call materializes from the default. + store + .update_with( + &path, + || vec![1u32], + |v| { + v.push(2); + Ok(()) + }, + ) + .unwrap(); + // Second call reads back and appends. + store + .update_with(&path, Vec::new, |v| { + v.push(3); + Ok(()) + }) + .unwrap(); + + let loaded: Vec = store.read(&path).unwrap(); + assert_eq!(loaded, vec![1, 2, 3]); + } + + #[test] + fn concurrent_updates_do_not_lose_increments() { + // The core lost-update regression test: N threads each do M locked + // read-modify-write increments against the same file. Without the + // lock spanning the whole transaction, increments would be lost. + let dir = tempfile::tempdir().unwrap(); + let store = Arc::new(StateStore::new(dir.path().to_path_buf())); + store.init().unwrap(); + let path = Arc::new(dir.path().join("counter.json")); + + store.create(&path, &0u64).unwrap(); + + const THREADS: u64 = 8; + const PER_THREAD: u64 = 50; + + let handles: Vec<_> = (0..THREADS) + .map(|_| { + let store = Arc::clone(&store); + let path = Arc::clone(&path); + std::thread::spawn(move || { + for _ in 0..PER_THREAD { + store + .update(&path, |n: &mut u64| { + *n += 1; + Ok(()) + }) + .unwrap(); + } + }) + }) + .collect(); + + for h in handles { + h.join().unwrap(); + } + + let final_count: u64 = store.read(&path).unwrap(); + assert_eq!(final_count, THREADS * PER_THREAD); + } + + #[test] + fn concurrent_creates_have_exactly_one_winner() { + // Many threads race to create the same file; exactly one succeeds. + let dir = tempfile::tempdir().unwrap(); + let store = Arc::new(StateStore::new(dir.path().to_path_buf())); + store.init().unwrap(); + let path = Arc::new(dir.path().join("unique.json")); + + const THREADS: usize = 16; + let handles: Vec<_> = (0..THREADS) + .map(|i| { + let store = Arc::clone(&store); + let path = Arc::clone(&path); + std::thread::spawn(move || store.create(&path, &(i as u32)).is_ok()) + }) + .collect(); + + let winners = handles + .into_iter() + .map(|h| h.join().unwrap()) + .filter(|&ok| ok) + .count(); + assert_eq!(winners, 1); + } + + #[test] + fn lock_vm_serializes_same_name() { + // Two threads taking the op-lock for the same VM name must not hold it + // simultaneously. We assert mutual exclusion by counting overlaps. + use std::sync::atomic::{AtomicI32, Ordering}; + + let dir = tempfile::tempdir().unwrap(); + let store = Arc::new(StateStore::new(dir.path().to_path_buf())); + store.init().unwrap(); + + let active = Arc::new(AtomicI32::new(0)); + let max_seen = Arc::new(AtomicI32::new(0)); + + let handles: Vec<_> = (0..6) + .map(|_| { + let store = Arc::clone(&store); + let active = Arc::clone(&active); + let max_seen = Arc::clone(&max_seen); + std::thread::spawn(move || { + for _ in 0..20 { + let _guard = store.lock_vm("shared").unwrap(); + let now = active.fetch_add(1, Ordering::SeqCst) + 1; + max_seen.fetch_max(now, Ordering::SeqCst); + // Tiny critical section; any overlap would push `now` past 1. + active.fetch_sub(1, Ordering::SeqCst); + } + }) + }) + .collect(); + + for h in handles { + h.join().unwrap(); + } + assert_eq!(max_seen.load(Ordering::SeqCst), 1); + } + + #[test] + fn lock_vm_distinct_names_are_independent() { + // Different names must be lockable at the same time (no global lock). + let dir = tempfile::tempdir().unwrap(); + let store = StateStore::new(dir.path().to_path_buf()); + store.init().unwrap(); + + let a = store.lock_vm("alpha").unwrap(); + let b = store.lock_vm("beta").unwrap(); + drop((a, b)); + } #[test] fn tmp_path_contains_pid() { @@ -315,7 +638,7 @@ mod tests { let data: HashMap = [("key".to_string(), "value".to_string())].into(); let path = dir.path().join("test.json"); - store.write(&path, &data).unwrap(); + store.create(&path, &data).unwrap(); let loaded: HashMap = store.read(&path).unwrap(); assert_eq!(loaded, data); @@ -338,7 +661,7 @@ mod tests { let data = vec![1u32, 2, 3]; let path = dir.path().join("list.json"); - store.write(&path, &data).unwrap(); + store.create(&path, &data).unwrap(); let loaded: Option> = store.read_optional(&path).unwrap(); assert_eq!(loaded, Some(vec![1, 2, 3])); @@ -367,7 +690,7 @@ mod tests { store.remove(&path).unwrap(); // Write then remove. - store.write(&path, &"hello").unwrap(); + store.create(&path, &"hello").unwrap(); assert!(path.exists()); store.remove(&path).unwrap(); assert!(!path.exists()); @@ -396,7 +719,7 @@ mod tests { let store = StateStore::new(dir.path().to_path_buf()); let path = dir.path().join("deep").join("nested").join("file.json"); - store.write(&path, &42u32).unwrap(); + store.create(&path, &42u32).unwrap(); let loaded: u32 = store.read(&path).unwrap(); assert_eq!(loaded, 42); From f35b282f982861ec54e19d9518531f9fc738cdf2 Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Thu, 4 Jun 2026 10:24:04 +0200 Subject: [PATCH 2/6] state: vm metadata create/update helpers Add vm::create (write-once, fails if the VM already exists) and vm::update (delta closure applied under the lock), the metadata-side counterparts to the StateStore primitives. vm::save stays as a shim until its callers migrate. --- crates/ember-core/src/state/vm.rs | 90 ++++++++++++++++++++++++------- 1 file changed, 70 insertions(+), 20 deletions(-) diff --git a/crates/ember-core/src/state/vm.rs b/crates/ember-core/src/state/vm.rs index 2ea3f52..46742ea 100644 --- a/crates/ember-core/src/state/vm.rs +++ b/crates/ember-core/src/state/vm.rs @@ -233,13 +233,44 @@ pub fn load(store: &StateStore, name: &str) -> Result { }) } -/// Save VM metadata to the state store. +/// Create a VM's metadata file, failing if one already exists for this name. /// -/// Creates the per-VM directory and writes `vm.json`. Overwrites -/// any existing metadata for this VM. -pub fn save(store: &StateStore, vm: &VmMetadata) -> Result<()> { +/// Atomically reserves the VM name: two concurrent creates of the same name +/// cannot both succeed (the loser gets [`Error::AlreadyExists`]). Creates the +/// per-VM directory as a side effect. +pub fn create(store: &StateStore, vm: &VmMetadata) -> Result<()> { let path = store.vm_metadata_path(&vm.name); - store.write(&path, vm) + store.create(&path, vm) +} + +/// Atomically apply a delta to a VM's metadata under an exclusive lock. +/// +/// The closure runs against the freshly-read on-disk state with the lock +/// held across read and write, so field updates computed after a long-running +/// operation (boot, network setup) are applied without clobbering a +/// concurrent change. Returns [`Error::VmNotFound`] if no metadata exists. +/// +/// Do the expensive work (process spawn, network setup) before calling this +/// and only assign the resulting fields inside the closure — the exclusive +/// lock is held for the closure's whole duration. +pub fn update( + store: &StateStore, + name: &str, + f: impl FnOnce(&mut VmMetadata) -> Result, +) -> Result { + let path = store.vm_metadata_path(name); + if !path.exists() { + return Err(Error::VmNotFound { + name: name.to_string(), + }); + } + store.update(&path, f) +} + +/// Overwrite a VM's metadata file. Fire-and-forget shim kept while callers +/// migrate to [`create`] / [`update`]. +pub fn save(store: &StateStore, vm: &VmMetadata) -> Result<()> { + store.write(&store.vm_metadata_path(&vm.name), vm) } /// List all VMs by reading metadata from each subdirectory under `vms/`. @@ -397,7 +428,7 @@ mod tests { let (_dir, store) = test_store(); let vm = sample_vm("testvm"); - save(&store, &vm).unwrap(); + create(&store, &vm).unwrap(); let loaded = load(&store, "testvm").unwrap(); assert_eq!(loaded, vm); } @@ -410,15 +441,27 @@ mod tests { } #[test] - fn save_overwrites_existing() { + fn create_rejects_duplicate() { let (_dir, store) = test_store(); - let mut vm = sample_vm("testvm"); - save(&store, &vm).unwrap(); + let vm = sample_vm("testvm"); + create(&store, &vm).unwrap(); - vm.cpus = 4; - vm.status = VmStatus::Running; - vm.pid = Some(12345); - save(&store, &vm).unwrap(); + let err = create(&store, &vm).unwrap_err(); + assert!(matches!(err, Error::AlreadyExists { .. })); + } + + #[test] + fn update_applies_delta() { + let (_dir, store) = test_store(); + create(&store, &sample_vm("testvm")).unwrap(); + + update(&store, "testvm", |m| { + m.cpus = 4; + m.status = VmStatus::Running; + m.pid = Some(12345); + Ok(()) + }) + .unwrap(); let loaded = load(&store, "testvm").unwrap(); assert_eq!(loaded.cpus, 4); @@ -426,6 +469,13 @@ mod tests { assert_eq!(loaded.pid, Some(12345)); } + #[test] + fn update_nonexistent_returns_not_found() { + let (_dir, store) = test_store(); + let err = update(&store, "nope", |_: &mut VmMetadata| Ok(())).unwrap_err(); + assert!(matches!(err, Error::VmNotFound { name } if name == "nope")); + } + #[test] fn list_empty() { let (_dir, store) = test_store(); @@ -436,9 +486,9 @@ mod tests { #[test] fn list_multiple_vms() { let (_dir, store) = test_store(); - save(&store, &sample_vm("beta")).unwrap(); - save(&store, &sample_vm("alpha")).unwrap(); - save(&store, &sample_vm("gamma")).unwrap(); + create(&store, &sample_vm("beta")).unwrap(); + create(&store, &sample_vm("alpha")).unwrap(); + create(&store, &sample_vm("gamma")).unwrap(); let vms = list(&store).unwrap(); assert_eq!(vms.len(), 3); @@ -453,14 +503,14 @@ mod tests { let (_dir, store) = test_store(); assert!(!exists(&store, "testvm")); - save(&store, &sample_vm("testvm")).unwrap(); + create(&store, &sample_vm("testvm")).unwrap(); assert!(exists(&store, "testvm")); } #[test] fn delete_removes_vm_dir() { let (_dir, store) = test_store(); - save(&store, &sample_vm("testvm")).unwrap(); + create(&store, &sample_vm("testvm")).unwrap(); assert!(exists(&store, "testvm")); delete(&store, "testvm").unwrap(); @@ -489,7 +539,7 @@ mod tests { vm.status = VmStatus::Running; vm.pid = Some(42); - save(&store, &vm).unwrap(); + create(&store, &vm).unwrap(); let loaded = load(&store, "netvm").unwrap(); assert_eq!(loaded, vm); assert_eq!(loaded.network.as_ref().unwrap().guest_ip, "10.100.0.2"); @@ -513,7 +563,7 @@ mod tests { #[test] fn list_skips_invalid_entries() { let (_dir, store) = test_store(); - save(&store, &sample_vm("good")).unwrap(); + create(&store, &sample_vm("good")).unwrap(); // Create a bogus VM directory with invalid JSON. let bad_dir = store.root().join("vms").join("bad"); From 58c0efe4bf1f64b2957dcda9ec0d6ed68dd6d0f1 Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Thu, 4 Jun 2026 10:24:30 +0200 Subject: [PATCH 3/6] net: make IP allocation lock-safe Route allocate/allocate_single/release through StateStore::update(_with) so the find-free-block-and-insert runs under one lock. Closes the double-allocation race where two concurrent vm creates could claim the same /30 block. --- crates/ember-core/src/network/ip.rs | 173 ++++++++++++++++------------ 1 file changed, 101 insertions(+), 72 deletions(-) diff --git a/crates/ember-core/src/network/ip.rs b/crates/ember-core/src/network/ip.rs index f29d8d0..3626346 100644 --- a/crates/ember-core/src/network/ip.rs +++ b/crates/ember-core/src/network/ip.rs @@ -118,8 +118,9 @@ fn block_ips(base: Ipv4Addr, block_index: u32) -> IpAllocation { /// Allocate a /30 block for a VM. /// /// Finds the lowest-numbered available block in the subnet, records the -/// allocation, and persists it to the state store. The state store's -/// flock ensures safe concurrent access. +/// allocation, and persists it. The find-and-insert runs inside a single +/// locked read-modify-write so concurrent allocators can't both claim the +/// same block. pub fn allocate(store: &StateStore, subnet: &str, vm_name: &str) -> Result { let path = store.network_allocations_path(); let (base, prefix) = parse_cidr(subnet)?; @@ -130,35 +131,36 @@ pub fn allocate(store: &StateStore, subnet: &str, vm_name: &str) -> Result Result<()> { let path = store.network_allocations_path(); - let mut allocs: IpAllocations = match store.read_optional(&path)? { - Some(a) => a, - None => return Ok(()), - }; - - let before = allocs.allocations.len(); - allocs.allocations.retain(|_, name| name != vm_name); - - // Only write back if something changed. - if allocs.allocations.len() != before { - store.write(&path, &allocs)?; + // Nothing to release before any allocation has been made. Avoid + // materializing an empty allocations file (which would also have no + // valid `base_subnet`). + if !path.exists() { + return Ok(()); } - Ok(()) + store.update(&path, |allocs: &mut IpAllocations| { + allocs.allocations.retain(|_, name| name != vm_name); + Ok(()) + }) } #[cfg(test)] @@ -358,6 +359,34 @@ mod tests { assert_eq!(alloc.guest_ip, "10.100.0.2"); } + #[test] + fn concurrent_allocations_are_distinct() { + // Regression test for the double-allocation bug: N threads allocate + // concurrently against one subnet; every block index must be unique. + use std::collections::HashSet; + use std::sync::Arc; + + let dir = tempfile::tempdir().unwrap(); + let store = Arc::new(StateStore::new(dir.path().to_path_buf())); + store.init().unwrap(); + + const THREADS: u32 = 12; + let handles: Vec<_> = (0..THREADS) + .map(|i| { + let store = Arc::clone(&store); + std::thread::spawn(move || { + allocate(&store, "10.100.0.0/16", &format!("vm{i}")) + .unwrap() + .block_index + }) + }) + .collect(); + + let blocks: Vec = handles.into_iter().map(|h| h.join().unwrap()).collect(); + let unique: HashSet = blocks.iter().copied().collect(); + assert_eq!(unique.len(), THREADS as usize, "blocks: {blocks:?}"); + } + #[test] fn allocate_sequential() { let (_dir, store) = test_store(); From 4e8510e70bbcc47d7d371673d867bd7b7bc0dbc4 Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Thu, 4 Jun 2026 10:24:37 +0200 Subject: [PATCH 4/6] image: route registry mutations through a locked update Add registry::update and migrate pull/build/rename/remove to it so concurrent image commands can't lose each other's registry entries. Drop ImageRegistry::save; the rename path's storage work stays outside the lock and dependent VM records are updated via vm::update. --- crates/ember-core/src/image/registry.rs | 73 ++++++++++++++----------- src/cli/image.rs | 34 +++++++----- 2 files changed, 63 insertions(+), 44 deletions(-) diff --git a/crates/ember-core/src/image/registry.rs b/crates/ember-core/src/image/registry.rs index 17447a1..fe62457 100644 --- a/crates/ember-core/src/image/registry.rs +++ b/crates/ember-core/src/image/registry.rs @@ -48,12 +48,6 @@ impl ImageRegistry { .map(|opt| opt.unwrap_or_default()) } - /// Save the registry to the state store. - pub fn save(&self, store: &StateStore) -> Result<()> { - let path = store.image_registry_path(); - store.write(&path, self) - } - /// Add an image entry. Replaces any existing entry with the same local name. pub fn add(&mut self, entry: ImageEntry) { self.remove(&entry.local_name); @@ -148,17 +142,25 @@ fn now_iso8601() -> String { crate::state::vm::now_iso8601() } -/// Load the registry, remove an entry by local name, save, and return -/// the removed entry. Returns [`Error::ImageNotFound`] if not present. +/// Atomically read-modify-write the persisted registry under an exclusive +/// lock, materializing an empty registry if none exists yet. +/// +/// All registry mutations must go through here (rather than a load/save +/// pair) so concurrent `image` commands can't lose each other's changes. +pub fn update(store: &StateStore, f: impl FnOnce(&mut ImageRegistry) -> Result) -> Result { + store.update_with(&store.image_registry_path(), ImageRegistry::default, f) +} + +/// Remove an entry by local name and return it. Returns +/// [`Error::ImageNotFound`] if not present. pub fn remove_image(store: &StateStore, local_name: &str) -> Result { - let mut registry = ImageRegistry::load(store)?; - let entry = registry - .remove(local_name) - .ok_or_else(|| Error::ImageNotFound { - name: local_name.to_string(), - })?; - registry.save(store)?; - Ok(entry) + update(store, |registry| { + registry + .remove(local_name) + .ok_or_else(|| Error::ImageNotFound { + name: local_name.to_string(), + }) + }) } #[cfg(test)] @@ -259,28 +261,35 @@ mod tests { fn round_trip_through_state_store() { let (_dir, store) = test_store(); - let mut reg = ImageRegistry::default(); - reg.add(sample_entry("alpine")); - reg.add(sample_entry("ubuntu")); - reg.save(&store).unwrap(); + update(&store, |reg| { + reg.add(sample_entry("alpine")); + reg.add(sample_entry("ubuntu")); + Ok(()) + }) + .unwrap(); let loaded = ImageRegistry::load(&store).unwrap(); - assert_eq!(loaded, reg); assert_eq!(loaded.len(), 2); + assert!(loaded.exists("library-alpine-latest")); + assert!(loaded.exists("library-ubuntu-latest")); } #[test] fn save_and_reload_preserves_data() { let (_dir, store) = test_store(); - // Save, load, modify, save, load — verify consistency. - let mut reg = ImageRegistry::default(); - reg.add(sample_entry("alpine")); - reg.save(&store).unwrap(); + // Two independent update calls — verify consistency across reloads. + update(&store, |reg| { + reg.add(sample_entry("alpine")); + Ok(()) + }) + .unwrap(); - let mut reg2 = ImageRegistry::load(&store).unwrap(); - reg2.add(sample_entry("ubuntu")); - reg2.save(&store).unwrap(); + update(&store, |reg| { + reg.add(sample_entry("ubuntu")); + Ok(()) + }) + .unwrap(); let final_reg = ImageRegistry::load(&store).unwrap(); assert_eq!(final_reg.len(), 2); @@ -310,9 +319,11 @@ mod tests { fn remove_image_from_store() { let (_dir, store) = test_store(); - let mut reg = ImageRegistry::default(); - reg.add(sample_entry("alpine")); - reg.save(&store).unwrap(); + update(&store, |reg| { + reg.add(sample_entry("alpine")); + Ok(()) + }) + .unwrap(); let removed = remove_image(&store, "library-alpine-latest").unwrap(); assert_eq!(removed.local_name, "library-alpine-latest"); diff --git a/src/cli/image.rs b/src/cli/image.rs index b26d87f..440d2df 100644 --- a/src/cli/image.rs +++ b/src/cli/image.rs @@ -146,9 +146,10 @@ fn pull(args: &PullArgs, state_dir: &Path) -> anyhow::Result<()> { // Step 5: Register in local image registry. let disk = handle.disk_path.to_string_lossy().to_string(); let entry = new_entry(&reference, &disk, size_mib, handle.thin_id); - let mut registry = ImageRegistry::load(&store)?; - registry.add(entry); - registry.save(&store)?; + image::registry::update(&store, |registry| { + registry.add(entry); + Ok(()) + })?; rollback.commit(); @@ -222,9 +223,10 @@ fn build(args: &BuildArgs, state_dir: &Path) -> anyhow::Result<()> { // Step 5: Register in local image registry. let disk = handle.disk_path.to_string_lossy().to_string(); let entry = new_build_entry(&args.name, &local_name, &disk, size_mib, handle.thin_id); - let mut registry = ImageRegistry::load(&store)?; - registry.add(entry); - registry.save(&store)?; + image::registry::update(&store, |registry| { + registry.add(entry); + Ok(()) + })?; rollback.commit(); @@ -349,7 +351,9 @@ fn rename(args: &RenameArgs, state_dir: &Path) -> anyhow::Result<()> { // valid for ZFS datasets / dm device names / APFS files. let new_local_name = image::build::sanitize_name(&args.new_name)?; - let mut registry = ImageRegistry::load(&store)?; + // Snapshot the registry for validation and to read the source entry. + // The actual mutation is applied later under an exclusive lock. + let registry = ImageRegistry::load(&store)?; let old_local_name = resolve_local_name(®istry, &args.name)?; if old_local_name == new_local_name { @@ -386,18 +390,22 @@ fn rename(args: &RenameArgs, state_dir: &Path) -> anyhow::Result<()> { }; entry.reference = new_reference.clone(); - registry.remove(&old_local_name); - registry.add(entry); - registry.save(&store)?; + image::registry::update(&store, |reg| { + reg.remove(&old_local_name); + reg.add(entry); + Ok(()) + })?; // If the user-facing reference changed (built-image case), bring // dependent VM records along so `image delete` and similar // reference-matched lookups keep working. if new_reference != old_reference { - for mut v in vm::list(&store)? { + for v in vm::list(&store)? { if v.image == old_reference { - v.image = new_reference.clone(); - vm::save(&store, &v)?; + vm::update(&store, &v.name, |m| { + m.image = new_reference.clone(); + Ok(()) + })?; } } } From 1adda3ddd70b863a62cc00d090a37d24e9b8439e Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Thu, 4 Jun 2026 10:24:50 +0200 Subject: [PATCH 5/6] cli: persist config and vm metadata via locked updates Migrate every lifecycle command to vm::create / vm::update / store::create / store::update, take the per-VM operation lock for the whole command (fixing the create TOCTOU and double-start), download kernels outside the config lock, and make 'ember init' fail rather than clobber an initialized store. Drop the now-unused vm::save. --- crates/ember-core/src/state/vm.rs | 6 - crates/ember-linux/src/reconcile.rs | 19 +-- crates/ember-macos/src/reconcile.rs | 19 +-- src/cli/init.rs | 52 ++++++-- src/cli/vm.rs | 194 ++++++++++++++++++++-------- 5 files changed, 204 insertions(+), 86 deletions(-) diff --git a/crates/ember-core/src/state/vm.rs b/crates/ember-core/src/state/vm.rs index 46742ea..bc457af 100644 --- a/crates/ember-core/src/state/vm.rs +++ b/crates/ember-core/src/state/vm.rs @@ -267,12 +267,6 @@ pub fn update( store.update(&path, f) } -/// Overwrite a VM's metadata file. Fire-and-forget shim kept while callers -/// migrate to [`create`] / [`update`]. -pub fn save(store: &StateStore, vm: &VmMetadata) -> Result<()> { - store.write(&store.vm_metadata_path(&vm.name), vm) -} - /// List all VMs by reading metadata from each subdirectory under `vms/`. /// /// Skips directories that don't contain a valid `vm.json` (e.g., partially diff --git a/crates/ember-linux/src/reconcile.rs b/crates/ember-linux/src/reconcile.rs index 5abcac1..03bcde8 100644 --- a/crates/ember-linux/src/reconcile.rs +++ b/crates/ember-linux/src/reconcile.rs @@ -51,7 +51,7 @@ pub fn run(state_dir: &Path) { let mut active_tap_devices = HashSet::new(); // Phase 1: Reconcile VMs whose processes have died. - for mut metadata in vms { + for metadata in vms { match metadata.status { VmStatus::Running | VmStatus::Paused => {} _ => { @@ -68,7 +68,7 @@ pub fn run(state_dir: &Path) { "Warning: VM '{}' is {} but has no PID, marking stopped", metadata.name, metadata.status ); - mark_stopped(&store, &mut metadata); + mark_stopped(&store, &metadata); continue; } }; @@ -89,7 +89,7 @@ pub fn run(state_dir: &Path) { network::cleanup(&store, cfg, &metadata.name, net_info); } } - mark_stopped(&store, &mut metadata); + mark_stopped(&store, &metadata); } } @@ -117,11 +117,14 @@ pub fn run(state_dir: &Path) { } /// Mark a VM as Stopped, clearing its PID and network info. -fn mark_stopped(store: &StateStore, metadata: &mut vm::VmMetadata) { - metadata.status = VmStatus::Stopped; - metadata.pid = None; - metadata.network = None; - if let Err(e) = vm::save(store, metadata) { +fn mark_stopped(store: &StateStore, metadata: &vm::VmMetadata) { + let result = vm::update(store, &metadata.name, |m| { + m.status = VmStatus::Stopped; + m.pid = None; + m.network = None; + Ok(()) + }); + if let Err(e) = result { eprintln!( "Warning: failed to update VM '{}' state: {e}", metadata.name diff --git a/crates/ember-macos/src/reconcile.rs b/crates/ember-macos/src/reconcile.rs index 1adad0f..6c23ef6 100644 --- a/crates/ember-macos/src/reconcile.rs +++ b/crates/ember-macos/src/reconcile.rs @@ -35,7 +35,7 @@ pub fn run(state_dir: &Path) { } }; - for mut metadata in vms { + for metadata in vms { match metadata.status { VmStatus::Running | VmStatus::Paused => {} _ => continue, @@ -48,7 +48,7 @@ pub fn run(state_dir: &Path) { "Warning: VM '{}' is {} but has no PID, marking stopped", metadata.name, metadata.status ); - mark_stopped(&store, &mut metadata); + mark_stopped(&store, &metadata); continue; } }; @@ -59,19 +59,22 @@ pub fn run(state_dir: &Path) { "Warning: VM '{}' process (pid {pid}) is dead, marking stopped", metadata.name ); - mark_stopped(&store, &mut metadata); + mark_stopped(&store, &metadata); } } } /// Mark a VM as Stopped, clearing its PID and network info, /// and releasing its IP allocation. -fn mark_stopped(store: &StateStore, metadata: &mut vm::VmMetadata) { +fn mark_stopped(store: &StateStore, metadata: &vm::VmMetadata) { let _ = network::ip::release(store, &metadata.name); - metadata.status = VmStatus::Stopped; - metadata.pid = None; - metadata.network = None; - if let Err(e) = vm::save(store, metadata) { + let result = vm::update(store, &metadata.name, |m| { + m.status = VmStatus::Stopped; + m.pid = None; + m.network = None; + Ok(()) + }); + if let Err(e) = result { eprintln!( "Warning: failed to update VM '{}' state: {e}", metadata.name diff --git a/src/cli/init.rs b/src/cli/init.rs index cca3b01..f95243e 100644 --- a/src/cli/init.rs +++ b/src/cli/init.rs @@ -230,7 +230,15 @@ pub fn run(args: &InitArgs, state_dir: &Path) -> anyhow::Result<()> { dm_thin_block_size: resolved_block_size, dm_thin_mode: resolved_dm_thin_mode, }; - store.write(&store.config_path(), &config)?; + store + .create(&store.config_path(), &config) + .map_err(|e| match e { + ember_core::error::Error::AlreadyExists { .. } => anyhow::anyhow!( + "ember is already initialized at {} — run 'ember deinit' first to reconfigure", + store.config_path().display() + ), + other => other.into(), + })?; println!("Configuration written to {}", store.config_path().display()); println!("Instance id: {instance_id}"); println!("VM IP subnet: {ip_subnet}"); @@ -317,7 +325,7 @@ mod tests { state_dir: dir.path().to_path_buf(), ..zfs_config("testpool", "ember") }; - store.write(&store.config_path(), &config).unwrap(); + store.create(&store.config_path(), &config).unwrap(); let loaded: GlobalConfig = store.read(&store.config_path()).unwrap(); assert_eq!(loaded.pool, "testpool"); @@ -327,7 +335,7 @@ mod tests { } #[test] - fn global_config_overwritten_on_reinit() { + fn global_config_create_rejects_reinit() { let dir = tempfile::tempdir().unwrap(); let store = StateStore::new(dir.path().to_path_buf()); store.init().unwrap(); @@ -337,18 +345,46 @@ mod tests { state_dir: dir.path().to_path_buf(), ..zfs_config("pool1", "ds1") }; - store.write(&store.config_path(), &config1).unwrap(); + store.create(&store.config_path(), &config1).unwrap(); + // A second create must fail rather than clobber the existing config. let config2 = GlobalConfig { - kernel_path: Some(PathBuf::from("/kernels/vmlinux")), - wan_iface: Some("wlp2s0".to_string()), state_dir: dir.path().to_path_buf(), ..zfs_config("pool2", "ds2") }; - store.write(&store.config_path(), &config2).unwrap(); + let err = store.create(&store.config_path(), &config2).unwrap_err(); + assert!(matches!( + err, + ember_core::error::Error::AlreadyExists { .. } + )); + + // The original config is untouched. + let loaded: GlobalConfig = store.read(&store.config_path()).unwrap(); + assert_eq!(loaded, config1); + } + + #[test] + fn global_config_kernel_path_updated_via_delta() { + let dir = tempfile::tempdir().unwrap(); + let store = StateStore::new(dir.path().to_path_buf()); + store.init().unwrap(); + + let config = GlobalConfig { + state_dir: dir.path().to_path_buf(), + ..zfs_config("tank", "ember") + }; + store.create(&store.config_path(), &config).unwrap(); + + store + .update(&store.config_path(), |c: &mut GlobalConfig| { + c.kernel_path = Some(PathBuf::from("/kernels/vmlinux")); + Ok(()) + }) + .unwrap(); let loaded: GlobalConfig = store.read(&store.config_path()).unwrap(); - assert_eq!(loaded, config2); + assert_eq!(loaded.kernel_path, Some(PathBuf::from("/kernels/vmlinux"))); + assert_eq!(loaded.pool, "tank"); } #[test] diff --git a/src/cli/vm.rs b/src/cli/vm.rs index af271bc..3eaae2e 100644 --- a/src/cli/vm.rs +++ b/src/cli/vm.rs @@ -445,13 +445,20 @@ fn ensure_kernel( return Ok(path.clone()); } - // No kernel configured — download the default preset. + // No kernel configured — download the default preset. This runs + // outside the config lock; downloads must never be held under it. let default_spec = ember_core::kernel::KernelSpec::Preset(ember_core::kernel::DEFAULT_PRESET); let dest = default_spec.resolve(store)?; - // Persist so future creates skip the download. + // Persist so future creates skip the download. Re-read under the lock and + // only set the path if a concurrent create didn't already record one. + store.update(&store.config_path(), |cfg: &mut GlobalConfig| { + if cfg.kernel_path.is_none() { + cfg.kernel_path = Some(dest.clone()); + } + Ok(()) + })?; config.kernel_path = Some(dest.clone()); - store.write(&store.config_path(), config)?; Ok(dest) } @@ -482,6 +489,11 @@ fn create(args: &CreateArgs, state_dir: &Path) -> anyhow::Result<()> { // Resolve configuration: program defaults < YAML config < CLI flags. let resolved = resolve_create_config(args, yaml_config.as_ref())?; + // Serialize all lifecycle operations on this name for the whole command, + // so the exists-check below and the metadata create below are atomic with + // respect to a concurrent create/delete of the same VM. + let op = store.lock_vm(&resolved.name)?; + // Check VM doesn't already exist. if vm::exists(&store, &resolved.name) { anyhow::bail!("vm '{}' already exists", resolved.name); @@ -532,6 +544,10 @@ fn create(args: &CreateArgs, state_dir: &Path) -> anyhow::Result<()> { rollback.commit(); + // Release the per-VM op lock before delegating to `start`, which + // re-acquires it for the same name; holding it here would self-deadlock. + drop(op); + if !resolved.no_start { start( &StartArgs { @@ -623,7 +639,7 @@ fn create_post_clone( thin_id: pending.thin_id, }; - vm::save(store, &metadata)?; + vm::create(store, &metadata)?; println!("VM '{}' created successfully.", resolved.name); @@ -642,6 +658,9 @@ fn cp(args: &CpArgs, state_dir: &Path) -> anyhow::Result<()> { let store = StateStore::new(state_dir.to_path_buf()); let mut global_config: GlobalConfig = store.read(&store.config_path())?; + // Serialize lifecycle operations on the new VM name for the whole command. + let op = store.lock_vm(&args.name)?; + // Source must exist and be stopped. let source = vm::require_stopped(&store, &args.source, "copying")?; @@ -743,12 +762,15 @@ fn cp(args: &CpArgs, state_dir: &Path) -> anyhow::Result<()> { thin_id: pending.thin_id, }; - vm::save(&store, &metadata)?; + vm::create(&store, &metadata)?; rollback.commit(); println!("VM '{}' copied from '{}'.", args.name, args.source); + // Release the op lock before delegating to `start` (same name). + drop(op); + if !args.no_start { start( &StartArgs { @@ -777,9 +799,12 @@ fn rename(args: &RenameArgs, state_dir: &Path) -> anyhow::Result<()> { let store = StateStore::new(state_dir.to_path_buf()); + // Serialize lifecycle operations on the source name for the whole rename. + let _op = store.lock_vm(&args.name)?; + // Source must exist and be stopped (renaming touches paths the // hypervisor has open while running). - let mut metadata = vm::require_stopped(&store, &args.name, "renaming")?; + let metadata = vm::require_stopped(&store, &args.name, "renaming")?; // Target must not exist. if vm::exists(&store, &args.new_name) { @@ -811,19 +836,25 @@ fn rename(args: &RenameArgs, state_dir: &Path) -> anyhow::Result<()> { )); } - // Update metadata to point at the new name/paths and persist. - metadata.name = args.new_name.clone(); - metadata.disk_path = new_handle.disk_path.to_string_lossy().into_owned(); - metadata.thin_id = new_handle.thin_id; - metadata.api_socket = store.vm_dir(&args.new_name).join("firecracker.sock"); - vm::save(&store, &metadata)?; + // Update the moved metadata to point at the new name/paths. The + // vm.json now lives under the new directory (moved above), so this + // is a delta on the existing file rather than a fresh create. + vm::update(&store, &args.new_name, |m| { + m.name = args.new_name.clone(); + m.disk_path = new_handle.disk_path.to_string_lossy().into_owned(); + m.thin_id = new_handle.thin_id; + m.api_socket = store.vm_dir(&args.new_name).join("firecracker.sock"); + Ok(()) + })?; // Rewrite `parent_vm` on any forked children so the dependency // graph keeps resolving to a real VM. - for mut child in vm::list(&store)? { + for child in vm::list(&store)? { if child.parent_vm.as_deref() == Some(&args.name) { - child.parent_vm = Some(args.new_name.clone()); - vm::save(&store, &child)?; + vm::update(&store, &child.name, |m| { + m.parent_vm = Some(args.new_name.clone()); + Ok(()) + })?; } } @@ -884,6 +915,7 @@ fn start(args: &StartArgs, state_dir: &Path) -> anyhow::Result<()> { use ember_core::cleanup::Rollback; let store = StateStore::new(state_dir.to_path_buf()); + let _op = store.lock_vm(&args.name)?; let config: GlobalConfig = store.read(&store.config_path())?; // Load and validate VM state. @@ -948,9 +980,16 @@ fn start(args: &StartArgs, state_dir: &Path) -> anyhow::Result<()> { // ── Persist state ───────────────────────────────────────────── - metadata.status = VmStatus::Running; - metadata.pid = Some(pid); - vm::save(&store, &metadata)?; + // Apply the running-state delta under the lock. The expensive work + // (network setup, boot) is already done; only the field assignments + // happen while the lock is held. + let net = metadata.network.clone(); + vm::update(&store, &args.name, |m| { + m.status = VmStatus::Running; + m.pid = Some(pid); + m.network = net; + Ok(()) + })?; // Everything succeeded — keep all resources. rollback.commit(); @@ -966,9 +1005,10 @@ fn start(args: &StartArgs, state_dir: &Path) -> anyhow::Result<()> { /// socket → update metadata. fn stop(args: &StopArgs, state_dir: &Path) -> anyhow::Result<()> { let store = StateStore::new(state_dir.to_path_buf()); + let _op = store.lock_vm(&args.name)?; // Load and validate VM state. - let mut metadata = vm::load(&store, &args.name)?; + let metadata = vm::load(&store, &args.name)?; match metadata.status { VmStatus::Running | VmStatus::Paused => {} _ => { @@ -1008,10 +1048,12 @@ fn stop(args: &StopArgs, state_dir: &Path) -> anyhow::Result<()> { let _ = net_backend.teardown(&metadata, &config); // Update metadata. - metadata.status = VmStatus::Stopped; - metadata.pid = None; - metadata.network = None; - vm::save(&store, &metadata)?; + vm::update(&store, &args.name, |m| { + m.status = VmStatus::Stopped; + m.pid = None; + m.network = None; + Ok(()) + })?; println!("VM '{}' stopped.", args.name); Ok(()) @@ -1023,9 +1065,10 @@ fn stop(args: &StopArgs, state_dir: &Path) -> anyhow::Result<()> { /// Network and PID are preserved — the VM can be resumed or stopped from this state. fn pause(args: &PauseArgs, state_dir: &Path) -> anyhow::Result<()> { let store = StateStore::new(state_dir.to_path_buf()); + let _op = store.lock_vm(&args.name)?; // Load and validate VM state — only running VMs can be paused. - let mut metadata = vm::load(&store, &args.name)?; + let metadata = vm::load(&store, &args.name)?; match metadata.status { VmStatus::Running => {} _ => { @@ -1044,8 +1087,10 @@ fn pause(args: &PauseArgs, state_dir: &Path) -> anyhow::Result<()> { println!("Pausing VM '{}'...", args.name); Vm::pause(&metadata)?; - metadata.status = VmStatus::Paused; - vm::save(&store, &metadata)?; + vm::update(&store, &args.name, |m| { + m.status = VmStatus::Paused; + Ok(()) + })?; println!("VM '{}' paused.", args.name); Ok(()) @@ -1057,9 +1102,10 @@ fn pause(args: &PauseArgs, state_dir: &Path) -> anyhow::Result<()> { /// Network and PID were preserved during pause, so the VM resumes exactly where it left off. fn resume(args: &ResumeArgs, state_dir: &Path) -> anyhow::Result<()> { let store = StateStore::new(state_dir.to_path_buf()); + let _op = store.lock_vm(&args.name)?; // Load and validate VM state — only paused VMs can be resumed. - let mut metadata = vm::load(&store, &args.name)?; + let metadata = vm::load(&store, &args.name)?; match metadata.status { VmStatus::Paused => {} _ => { @@ -1078,8 +1124,10 @@ fn resume(args: &ResumeArgs, state_dir: &Path) -> anyhow::Result<()> { println!("Resuming VM '{}'...", args.name); Vm::resume(&metadata)?; - metadata.status = VmStatus::Running; - vm::save(&store, &metadata)?; + vm::update(&store, &args.name, |m| { + m.status = VmStatus::Running; + Ok(()) + })?; println!("VM '{}' resumed.", args.name); Ok(()) @@ -1091,7 +1139,8 @@ fn resume(args: &ResumeArgs, state_dir: &Path) -> anyhow::Result<()> { /// → grow disk → expand ext4 → update metadata. fn resize(args: &ResizeArgs, state_dir: &Path) -> anyhow::Result<()> { let store = StateStore::new(state_dir.to_path_buf()); - let mut metadata = vm::require_stopped(&store, &args.name, "resizing")?; + let _op = store.lock_vm(&args.name)?; + let metadata = vm::require_stopped(&store, &args.name, "resizing")?; // Convert size with unit to GiB. let new_gib = args @@ -1119,8 +1168,10 @@ fn resize(args: &ResizeArgs, state_dir: &Path) -> anyhow::Result<()> { storage.resize(&metadata, args.disk_size)?; // Update metadata. - metadata.disk_size_gib = new_gib; - vm::save(&store, &metadata)?; + vm::update(&store, &args.name, |m| { + m.disk_size_gib = new_gib; + Ok(()) + })?; println!( "VM '{}' disk resized from {} to {}.", @@ -1138,7 +1189,8 @@ fn resize(args: &ResizeArgs, state_dir: &Path) -> anyhow::Result<()> { /// VM to be stopped. fn update_config(args: &UpdateConfigArgs, state_dir: &Path) -> anyhow::Result<()> { let store = StateStore::new(state_dir.to_path_buf()); - let mut metadata = vm::require_stopped(&store, &args.name, "updating configuration")?; + let _op = store.lock_vm(&args.name)?; + vm::require_stopped(&store, &args.name, "updating configuration")?; // Require at least one field to update. if args.cpus.is_none() @@ -1151,53 +1203,82 @@ fn update_config(args: &UpdateConfigArgs, state_dir: &Path) -> anyhow::Result<() anyhow::bail!("no configuration changes specified"); } + // Compute the new field values up front, outside the metadata lock — + // validation may fail and the kernel branch may download. The locked + // delta below is pure field assignment. let mut changes = Vec::new(); - if let Some(cpus) = args.cpus { - if cpus == 0 { - anyhow::bail!("cpus must be at least 1"); + let new_cpus = match args.cpus { + Some(0) => anyhow::bail!("cpus must be at least 1"), + Some(cpus) => { + changes.push(format!("cpus: {cpus}")); + Some(cpus) } - metadata.cpus = cpus; - changes.push(format!("cpus: {cpus}")); - } + None => None, + }; - if let Some(ref memory) = args.memory { + let new_memory_mib = if let Some(ref memory) = args.memory { let mib = memory .to_mib() .map_err(|e| anyhow::anyhow!("invalid memory size: {e}"))?; - metadata.memory_mib = mib; changes.push(format!("memory: {}", format_bytes_binary(mib as u64 * MIB))); - } + Some(mib) + } else { + None + }; - if let Some(ref kernel) = args.kernel { + let new_kernel = if let Some(ref kernel) = args.kernel { let mut config: GlobalConfig = store.read(&store.config_path())?; let kernel_path = ensure_kernel(&Some(kernel.clone()), &mut config, &store)?; - metadata.kernel_path = kernel_path.clone(); changes.push(format!("kernel: {}", kernel_path.display())); - } + Some(kernel_path) + } else { + None + }; - if let Some(ref boot_args) = args.boot_args { + // `None` = unchanged, `Some(None)` = clear, `Some(Some(_))` = set. + let new_boot_args = args.boot_args.as_ref().map(|boot_args| { if boot_args.is_empty() { - metadata.boot_args = None; changes.push("boot-args: cleared".to_string()); + None } else { - metadata.boot_args = Some(boot_args.clone()); changes.push(format!("boot-args: {boot_args}")); + Some(boot_args.clone()) } - } + }); - if let Some(ref user) = args.ssh_user { - metadata.ssh.user = user.clone(); + let new_ssh_user = args.ssh_user.as_ref().map(|user| { changes.push(format!("ssh-user: {user}")); - } + user.clone() + }); - if let Some(ref key) = args.ssh_key { + let new_ssh_key = args.ssh_key.as_ref().map(|key| { let expanded = config::vm::expand_tilde(key); - metadata.ssh.key = expanded.clone(); changes.push(format!("ssh-key: {}", expanded.display())); - } + expanded + }); - vm::save(&store, &metadata)?; + vm::update(&store, &args.name, |m| { + if let Some(cpus) = new_cpus { + m.cpus = cpus; + } + if let Some(mib) = new_memory_mib { + m.memory_mib = mib; + } + if let Some(kernel_path) = new_kernel { + m.kernel_path = kernel_path; + } + if let Some(boot_args) = new_boot_args { + m.boot_args = boot_args; + } + if let Some(user) = new_ssh_user { + m.ssh.user = user; + } + if let Some(key) = new_ssh_key { + m.ssh.key = key; + } + Ok(()) + })?; println!("Updated VM '{}':", args.name); for change in &changes { @@ -1215,6 +1296,7 @@ fn update_config(args: &UpdateConfigArgs, state_dir: &Path) -> anyhow::Result<() /// Each cleanup step is idempotent — continues if the resource is already gone. fn rm(args: &RmArgs, state_dir: &Path) -> anyhow::Result<()> { let store = StateStore::new(state_dir.to_path_buf()); + let _op = store.lock_vm(&args.name)?; // Load VM metadata (must exist). let metadata = vm::load(&store, &args.name)?; From ce81a0778fedeb4e53649d5b2b38c3808b4419c0 Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Thu, 4 Jun 2026 10:24:57 +0200 Subject: [PATCH 6/6] state: remove fire-and-forget write All mutations now go through update/update_with/create, which hold the lock across the whole transaction. Remove the public write so an unlocked read-then-write can't reintroduce the lost-update window. --- crates/ember-core/src/state/store.rs | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/crates/ember-core/src/state/store.rs b/crates/ember-core/src/state/store.rs index c14528b..677106e 100644 --- a/crates/ember-core/src/state/store.rs +++ b/crates/ember-core/src/state/store.rs @@ -264,16 +264,6 @@ impl StateStore { self.write_locked(path, data) } - /// Serialize and write a JSON file atomically, taking the exclusive lock. - /// - /// Fire-and-forget overwrite, kept while callers migrate to - /// [`update`](Self::update) / [`create`](Self::create); it does not guard - /// against a lost update across a preceding read. - pub fn write(&self, path: &Path, data: &T) -> Result<()> { - let _lock = FileLock::exclusive(path)?; - self.write_locked(path, data) - } - /// Deserialize the JSON file at `path`, returning `None` if it is absent. /// /// Takes no lock — the caller must already hold one.