Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions crates/ember-core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ pub enum Error {
#[error("state: {0}")]
State(String),

/// A state file already exists where a fresh create was expected.
#[error("{path}: already exists")]
AlreadyExists { path: PathBuf },

/// Storage pool error (dm-thin / btrfs / ZFS pool-level state, as
/// distinct from individual volume / dataset errors).
#[error("storage pool: {0}")]
Expand Down
73 changes: 42 additions & 31 deletions crates/ember-core/src/image/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,6 @@ impl ImageRegistry {
.map(|opt| opt.unwrap_or_default())
}

/// Save the registry to the state store.
pub fn save(&self, store: &StateStore) -> Result<()> {
let path = store.image_registry_path();
store.write(&path, self)
}

/// Add an image entry. Replaces any existing entry with the same local name.
pub fn add(&mut self, entry: ImageEntry) {
self.remove(&entry.local_name);
Expand Down Expand Up @@ -148,17 +142,25 @@ fn now_iso8601() -> String {
crate::state::vm::now_iso8601()
}

/// Load the registry, remove an entry by local name, save, and return
/// the removed entry. Returns [`Error::ImageNotFound`] if not present.
/// Atomically read-modify-write the persisted registry under an exclusive
/// lock, materializing an empty registry if none exists yet.
///
/// All registry mutations must go through here (rather than a load/save
/// pair) so concurrent `image` commands can't lose each other's changes.
pub fn update<R>(store: &StateStore, f: impl FnOnce(&mut ImageRegistry) -> Result<R>) -> Result<R> {
store.update_with(&store.image_registry_path(), ImageRegistry::default, f)
}

/// Remove an entry by local name and return it. Returns
/// [`Error::ImageNotFound`] if not present.
pub fn remove_image(store: &StateStore, local_name: &str) -> Result<ImageEntry> {
let mut registry = ImageRegistry::load(store)?;
let entry = registry
.remove(local_name)
.ok_or_else(|| Error::ImageNotFound {
name: local_name.to_string(),
})?;
registry.save(store)?;
Ok(entry)
update(store, |registry| {
registry
.remove(local_name)
.ok_or_else(|| Error::ImageNotFound {
name: local_name.to_string(),
})
})
}

#[cfg(test)]
Expand Down Expand Up @@ -259,28 +261,35 @@ mod tests {
fn round_trip_through_state_store() {
let (_dir, store) = test_store();

let mut reg = ImageRegistry::default();
reg.add(sample_entry("alpine"));
reg.add(sample_entry("ubuntu"));
reg.save(&store).unwrap();
update(&store, |reg| {
reg.add(sample_entry("alpine"));
reg.add(sample_entry("ubuntu"));
Ok(())
})
.unwrap();

let loaded = ImageRegistry::load(&store).unwrap();
assert_eq!(loaded, reg);
assert_eq!(loaded.len(), 2);
assert!(loaded.exists("library-alpine-latest"));
assert!(loaded.exists("library-ubuntu-latest"));
}

#[test]
fn save_and_reload_preserves_data() {
let (_dir, store) = test_store();

// Save, load, modify, save, load — verify consistency.
let mut reg = ImageRegistry::default();
reg.add(sample_entry("alpine"));
reg.save(&store).unwrap();
// Two independent update calls — verify consistency across reloads.
update(&store, |reg| {
reg.add(sample_entry("alpine"));
Ok(())
})
.unwrap();

let mut reg2 = ImageRegistry::load(&store).unwrap();
reg2.add(sample_entry("ubuntu"));
reg2.save(&store).unwrap();
update(&store, |reg| {
reg.add(sample_entry("ubuntu"));
Ok(())
})
.unwrap();

let final_reg = ImageRegistry::load(&store).unwrap();
assert_eq!(final_reg.len(), 2);
Expand Down Expand Up @@ -310,9 +319,11 @@ mod tests {
fn remove_image_from_store() {
let (_dir, store) = test_store();

let mut reg = ImageRegistry::default();
reg.add(sample_entry("alpine"));
reg.save(&store).unwrap();
update(&store, |reg| {
reg.add(sample_entry("alpine"));
Ok(())
})
.unwrap();

let removed = remove_image(&store, "library-alpine-latest").unwrap();
assert_eq!(removed.local_name, "library-alpine-latest");
Expand Down
173 changes: 101 additions & 72 deletions crates/ember-core/src/network/ip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,9 @@ fn block_ips(base: Ipv4Addr, block_index: u32) -> IpAllocation {
/// Allocate a /30 block for a VM.
///
/// Finds the lowest-numbered available block in the subnet, records the
/// allocation, and persists it to the state store. The state store's
/// flock ensures safe concurrent access.
/// allocation, and persists it. The find-and-insert runs inside a single
/// locked read-modify-write so concurrent allocators can't both claim the
/// same block.
pub fn allocate(store: &StateStore, subnet: &str, vm_name: &str) -> Result<IpAllocation> {
let path = store.network_allocations_path();
let (base, prefix) = parse_cidr(subnet)?;
Expand All @@ -130,35 +131,36 @@ pub fn allocate(store: &StateStore, subnet: &str, vm_name: &str) -> Result<IpAll
}
let max = max_blocks(prefix);

let mut allocs: IpAllocations = store
.read_optional(&path)?
.unwrap_or_else(|| IpAllocations {
let block_index = store.update_with(
&path,
|| IpAllocations {
base_subnet: subnet.to_string(),
allocations: HashMap::new(),
});

// Verify the subnet hasn't changed since allocations started.
if allocs.base_subnet != subnet {
return Err(Error::Network(format!(
"subnet mismatch: state has '{}', requested '{subnet}'",
allocs.base_subnet
)));
}

// Find the first free block.
let block_index = (0..max)
.find(|i| !allocs.allocations.contains_key(i))
.ok_or_else(|| {
Error::Network(format!(
"no free /30 blocks in {subnet} (all {max} blocks allocated)"
))
})?;

let allocation = block_ips(base, block_index);
allocs.allocations.insert(block_index, vm_name.to_string());
store.write(&path, &allocs)?;
},
|allocs| {
// Verify the subnet hasn't changed since allocations started.
if allocs.base_subnet != subnet {
return Err(Error::Network(format!(
"subnet mismatch: state has '{}', requested '{subnet}'",
allocs.base_subnet
)));
}

Ok(allocation)
// Find the first free block.
let block_index = (0..max)
.find(|i| !allocs.allocations.contains_key(i))
.ok_or_else(|| {
Error::Network(format!(
"no free /30 blocks in {subnet} (all {max} blocks allocated)"
))
})?;

allocs.allocations.insert(block_index, vm_name.to_string());
Ok(block_index)
},
)?;

Ok(block_ips(base, block_index))
}

/// Allocate a single /32 address for a VM in a shared subnet.
Expand All @@ -185,44 +187,46 @@ pub fn allocate_single(
let path = store.network_allocations_path();
let (base, prefix) = parse_cidr(subnet)?;
let max = 1u32 << (32 - prefix);
let base_u32 = u32::from(base);

let mut allocs: IpAllocations = store
.read_optional(&path)?
.unwrap_or_else(|| IpAllocations {
let block_index = store.update_with(
&path,
|| IpAllocations {
base_subnet: subnet.to_string(),
allocations: HashMap::new(),
});

if allocs.base_subnet != subnet {
return Err(Error::Network(format!(
"subnet mismatch: state has '{}', requested '{subnet}'",
allocs.base_subnet
)));
}

let base_u32 = u32::from(base);

// Walk the subnet looking for an unallocated, non-reserved slot.
// Skipping reserved addresses keeps the gateway (and the wider
// /24's network/broadcast when carved into /27s) un-handout-able
// without the caller having to seed allocations.json.
let block_index = (0..max)
.find(|i| {
if allocs.allocations.contains_key(i) {
return false;
},
|allocs| {
if allocs.base_subnet != subnet {
return Err(Error::Network(format!(
"subnet mismatch: state has '{}', requested '{subnet}'",
allocs.base_subnet
)));
}
let addr = Ipv4Addr::from(base_u32 + i);
!reserved.contains(&addr)
})
.ok_or_else(|| {
Error::Network(format!(
"no free addresses in {subnet} (all {max} candidates allocated or reserved)"
))
})?;

// Walk the subnet looking for an unallocated, non-reserved slot.
// Skipping reserved addresses keeps the gateway (and the wider
// /24's network/broadcast when carved into /27s) un-handout-able
// without the caller having to seed allocations.json.
let block_index = (0..max)
.find(|i| {
if allocs.allocations.contains_key(i) {
return false;
}
let addr = Ipv4Addr::from(base_u32 + i);
!reserved.contains(&addr)
})
.ok_or_else(|| {
Error::Network(format!(
"no free addresses in {subnet} (all {max} candidates allocated or reserved)"
))
})?;

allocs.allocations.insert(block_index, vm_name.to_string());
Ok(block_index)
},
)?;

let guest_ip = Ipv4Addr::from(base_u32 + block_index);
allocs.allocations.insert(block_index, vm_name.to_string());
store.write(&path, &allocs)?;

Ok(IpAllocation {
block_index,
Expand All @@ -239,20 +243,17 @@ pub fn allocate_single(
/// has no allocation or the allocations file doesn't exist.
pub fn release(store: &StateStore, vm_name: &str) -> Result<()> {
let path = store.network_allocations_path();
let mut allocs: IpAllocations = match store.read_optional(&path)? {
Some(a) => a,
None => return Ok(()),
};

let before = allocs.allocations.len();
allocs.allocations.retain(|_, name| name != vm_name);

// Only write back if something changed.
if allocs.allocations.len() != before {
store.write(&path, &allocs)?;
// Nothing to release before any allocation has been made. Avoid
// materializing an empty allocations file (which would also have no
// valid `base_subnet`).
if !path.exists() {
return Ok(());
}

Ok(())
store.update(&path, |allocs: &mut IpAllocations| {
allocs.allocations.retain(|_, name| name != vm_name);
Ok(())
})
}

#[cfg(test)]
Expand Down Expand Up @@ -358,6 +359,34 @@ mod tests {
assert_eq!(alloc.guest_ip, "10.100.0.2");
}

#[test]
fn concurrent_allocations_are_distinct() {
// Regression test for the double-allocation bug: N threads allocate
// concurrently against one subnet; every block index must be unique.
use std::collections::HashSet;
use std::sync::Arc;

let dir = tempfile::tempdir().unwrap();
let store = Arc::new(StateStore::new(dir.path().to_path_buf()));
store.init().unwrap();

const THREADS: u32 = 12;
let handles: Vec<_> = (0..THREADS)
.map(|i| {
let store = Arc::clone(&store);
std::thread::spawn(move || {
allocate(&store, "10.100.0.0/16", &format!("vm{i}"))
.unwrap()
.block_index
})
})
.collect();

let blocks: Vec<u32> = handles.into_iter().map(|h| h.join().unwrap()).collect();
let unique: HashSet<u32> = blocks.iter().copied().collect();
assert_eq!(unique.len(), THREADS as usize, "blocks: {blocks:?}");
}

#[test]
fn allocate_sequential() {
let (_dir, store) = test_store();
Expand Down
Loading
Loading