Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
40e2184
design: buffer-managed dataflow state
DAlperin Jun 10, 2026
22ccdb4
design: integrate buffer-managed state with differential's Chunk abst…
DAlperin Jun 10, 2026
d8b7b23
design: incremental migration and swap coexistence
DAlperin Jun 10, 2026
25a722a
design: swap-backed extent store for filesystem-less nodes
DAlperin Jun 10, 2026
d66f677
prototype: swap-extent buffer pool, layers 2 and 3
DAlperin Jun 10, 2026
59da42d
column-pager: wire buffer pool behind dyncfgs for staging
DAlperin Jun 10, 2026
db9e26f
column-pager: log spill mechanism resolution for debugging
DAlperin Jun 10, 2026
a1c30b7
column-pager: keep tiered singleton configured in pool mode; resolve …
DAlperin Jun 10, 2026
0e1074e
design: batched-seek cursors as the probe prefetch story
DAlperin Jun 10, 2026
df13c7a
pool: off-worker spill threads for eviction I/O
DAlperin Jun 11, 2026
41d3da2
compute-types: default pool spill threads to 2
DAlperin Jun 11, 2026
1a5c0b5
pool: insert_with writes serialized columns directly into the slot
DAlperin Jun 11, 2026
c920501
pool: 64 GiB class reservations; degrade to heap on slot exhaustion
DAlperin Jun 11, 2026
123ed18
pool: scope slot lifetime to residency, dropping the stable-address i…
DAlperin Jun 11, 2026
095852c
pool: single-flight budget enforcement over a resident-only queue
DAlperin Jun 11, 2026
48803bc
pool: size extents to the compressed payload, not lz4 worst case
DAlperin Jun 11, 2026
a0e6489
pool: add 4 and 8 MiB size classes for boundary-overshooting chunks
DAlperin Jun 11, 2026
777f725
storage: drain upsert stash seal chunk-at-a-time
DAlperin Jun 11, 2026
f0efa89
design: record staging prototype findings; add remote-extents direction
DAlperin Jun 11, 2026
568a0dd
pool: cleanup pass from complexity review
DAlperin Jun 11, 2026
4ac9f0f
storage: tidy upsert drain seal path
DAlperin Jun 11, 2026
a390c75
timely-util: collapse column pager config globals
DAlperin Jun 11, 2026
5729723
pool: rename elided frees to writes elided
DAlperin Jun 11, 2026
053a1d9
timely-util: keep short merge-batcher chains resident
DAlperin Jun 11, 2026
e9cf871
compute: rename pool spill-threads dyncfg to column_paged_batcher_spi…
DAlperin Jun 11, 2026
6277181
pool: warm slot reuse and hugepage-backed large classes
DAlperin Jun 12, 2026
a287d49
pool: eager backing to compressed-but-resident off the workers
DAlperin Jun 12, 2026
c9c685b
pool: compressed-resident extent tier under an RSS target
DAlperin Jun 12, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

671 changes: 671 additions & 0 deletions doc/developer/design/20260610_buffer_managed_state.md

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,8 @@ def get_default_system_parameters(
"column_paged_batcher_budget_fraction",
"column_paged_batcher_lz4",
"column_paged_batcher_swap_pageout",
# Buffer-pool dyncfgs. Names must match the strings registered in
# src/compute-types/src/dyncfgs.rs; a stale name here refers to a system
# parameter that does not exist.
"column_paged_batcher_use_pool",
"column_paged_batcher_spill_worker_count",
"column_paged_batcher_eager_backing",
"column_paged_batcher_pool_rss_target_fraction",
"enable_upsert_paged_spill",
"enable_lgalloc_eager_reclamation",
"lgalloc_background_interval",
Expand Down
6 changes: 6 additions & 0 deletions misc/python/materialize/parallel_workload/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -1645,6 +1645,12 @@ def __init__(
self.flags_with_values["column_paged_batcher_swap_pageout"] = (
BOOLEAN_FLAG_VALUES
)
self.flags_with_values["column_paged_batcher_use_pool"] = BOOLEAN_FLAG_VALUES
# The dyncfg is registered as `column_paged_batcher_spill_worker_count` in
# src/compute-types/src/dyncfgs.rs; the key here must match that name exactly.
# "0" exercises inline eviction on the calling thread, the other values
# exercise off-worker spill threads.
self.flags_with_values["column_paged_batcher_spill_worker_count"] = [
    "0",
    "2",
    "4",
]
self.flags_with_values["enable_upsert_paged_spill"] = BOOLEAN_FLAG_VALUES

# If you are adding a new config flag in Materialize, consider using it
Expand Down
73 changes: 71 additions & 2 deletions src/compute-types/src/dyncfgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ pub const ENABLE_COLUMN_PAGED_BATCHER_SPILL: Config<bool> = Config::new(
pub const COLUMN_PAGED_BATCHER_BUDGET_FRACTION: Config<f64> = Config::new(
"column_paged_batcher_budget_fraction",
0.05,
"Fraction of replica memory the column-paged batcher's tiered policy may hold resident \
before spilling to the backend. Total budget = max(mem_limit * fraction, 128 MiB).",
"Fraction of physical RAM the column-paged batcher may hold resident before spilling \
(resident budgets derive from RAM, never from announced limits that include swap). \
Total budget = max(ram * fraction, 128 MiB).",
);

/// Compress chunks the column-paged batcher spills, using lz4. Only
Expand Down Expand Up @@ -115,6 +116,70 @@ pub const COLUMN_PAGED_BATCHER_SWAP_PAGEOUT: Config<bool> = Config::new(
meaningful when `column_paged_batcher_lz4 = true` and the swap backend is active.",
);

// NOTE(review): an earlier wording of this comment promised that resident
// chunks live "at stable addresses". The pool appears to scope slot lifetime to
// residency now, so that guarantee is not restated here. Confirm against
// `mz_ore::pool` before relying on address stability.
/// Route column-paged batcher spill through the buffer pool
/// (`mz_ore::pool`, swap-backed extents) instead of the tiered pager
/// backends.
///
/// The pool owns residency: chunks stay resident until its budget (the same
/// fraction-derived total as the tiered policy's) forces compression into
/// swap-backed extents, and chunks consumed before eviction never cost a
/// write at all.
///
/// The pool's own configuration carries no backend or codec, so the backend
/// and lz4 configs do not affect pool spill; the pool always compresses at
/// the eviction boundary. Those configs are still applied to the tiered
/// singleton in pool mode, so that consumers which captured a tiered pager
/// see the operator's budget and codec.
///
/// Falls back to the tiered path if the pool cannot be installed (for
/// example, when its virtual reservation fails). Only meaningful when
/// `enable_column_paged_batcher_spill` is also set.
pub const COLUMN_PAGED_BATCHER_USE_POOL: Config<bool> = Config::new(
"column_paged_batcher_use_pool",
false,
"Route column-paged batcher spill through the buffer pool (swap-backed extents) instead of \
the tiered pager backends. Only meaningful when `enable_column_paged_batcher_spill = true`.",
);

/// Number of buffer-pool spill threads performing eviction I/O (lz4
/// compression plus the synchronous-reclaim `MADV_PAGEOUT`) off the threads
/// that trip the budget.
///
/// Zero evicts inline on the calling thread, which measurably convoys
/// workers behind eviction I/O at hydration eviction rates.
///
/// Threads are spawned once per process. Raising the value afterwards does
/// not add threads; it only re-enables the ones that were already spawned.
/// Lowering it to zero falls back to inline eviction while the spawned
/// threads idle.
pub const COLUMN_PAGED_BATCHER_SPILL_WORKER_COUNT: Config<usize> = Config::new(
"column_paged_batcher_spill_worker_count",
2,
"Buffer-pool spill threads for off-worker eviction I/O; 0 evicts inline on the caller.",
);

/// Eagerly compress unbacked buffer-pool chunks to `BackedResident` on idle
/// spill threads (write-behind).
///
/// The chunk stays readable in its slot while a compressed extent
/// accumulates on the swap device, so budget-driven eviction becomes a pure
/// page release instead of a compression.
///
/// Trade-off: spends background CPU (compressing chunks that may die before
/// pressure reaches them) in exchange for near-free pressure response.
///
/// Only meaningful in pool mode (`column_paged_batcher_use_pool`) with spill
/// workers available; presumably that means a non-zero
/// `column_paged_batcher_spill_worker_count`, since the work runs on idle
/// spill threads.
pub const COLUMN_PAGED_BATCHER_EAGER_BACKING: Config<bool> = Config::new(
"column_paged_batcher_eager_backing",
false,
"Eagerly compress buffer-pool chunks to compressed-but-resident on idle spill threads, so \
budget-driven eviction is a pure page release. Only meaningful in pool mode with spill \
workers.",
);

/// Ceiling on the buffer pool's total RSS, as a fraction of *physical RAM*
/// (never the announced limit, which includes swap on swap-provisioned
/// nodes).
///
/// The compressed-but-resident extent tier is the headroom above the slot
/// budget and warm cap: chunks evicted from the budget stay in RAM
/// compressed (~5.6x denser; reads decompress without faulting) until this
/// ceiling forces the oldest extents out to the swap device via
/// `MADV_PAGEOUT`. Zero collapses the tier: extents page out as soon as
/// they are written.
///
/// Where the config is applied, negative values are clamped to zero and the
/// byte target is computed as `ram * fraction`.
///
/// The default pairs with the 0.05 budget default to leave ~20% of RAM for
/// the compressed tier — the same share zswap's default compressed pool
/// takes, and roughly RAM-sized logical coverage at the measured ~5.6x
/// ratio — while keeping three quarters of RAM for everything else in the
/// process.
pub const COLUMN_PAGED_BATCHER_POOL_RSS_TARGET_FRACTION: Config<f64> = Config::new(
"column_paged_batcher_pool_rss_target_fraction",
0.25,
"Ceiling on the buffer pool's total RSS as a fraction of physical RAM; the headroom above \
the slot budget holds compressed-but-resident extents. Zero pages extents out immediately.",
);

/// Whether rendering should use `mz_join_core` rather than DD's `JoinCore::join_core`.
pub const ENABLE_MZ_JOIN_CORE: Config<bool> = Config::new(
"enable_mz_join_core",
Expand Down Expand Up @@ -536,4 +601,8 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
.add(&COLUMN_PAGED_BATCHER_BUDGET_FRACTION)
.add(&COLUMN_PAGED_BATCHER_LZ4)
.add(&COLUMN_PAGED_BATCHER_SWAP_PAGEOUT)
.add(&COLUMN_PAGED_BATCHER_USE_POOL)
.add(&COLUMN_PAGED_BATCHER_SPILL_WORKER_COUNT)
.add(&COLUMN_PAGED_BATCHER_EAGER_BACKING)
.add(&COLUMN_PAGED_BATCHER_POOL_RSS_TARGET_FRACTION)
}
81 changes: 63 additions & 18 deletions src/compute/src/compute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,39 +324,84 @@ impl ComputeState {
// available, swap otherwise.
{
use mz_ore::pager::Backend;
use mz_timely_util::column_pager::{Codec, apply_tiered_config};
use mz_timely_util::column_pager::{
Codec, PoolPagerConfig, apply_pool_config, apply_tiered_config,
};

let enabled = ENABLE_COLUMN_PAGED_BATCHER_SPILL.get(config);
let use_pool = COLUMN_PAGED_BATCHER_USE_POOL.get(config);
let spill_threads = COLUMN_PAGED_BATCHER_SPILL_WORKER_COUNT.get(config);
let eager_backing = COLUMN_PAGED_BATCHER_EAGER_BACKING.get(config);
let codec = COLUMN_PAGED_BATCHER_LZ4.get(config).then_some(Codec::Lz4);
let swap_pageout = COLUMN_PAGED_BATCHER_SWAP_PAGEOUT.get(config);

// Budget derivation: fraction × announced memory limit, with a
// 128 MiB floor so the no-pressure case doesn't page per chunk.
// Falls back to a 4 GiB assumption if no limit was announced
// (e.g. dev environments).
// Budget derivation: fraction × physical RAM, with a 128 MiB
// floor so the no-pressure case doesn't page per chunk. Resident
// budgets derive from RAM — never from the announced memory
// limit, which on swap-provisioned nodes deliberately includes
// swap for the memory limiter's purposes. Falls back to a 4 GiB
// assumption if detection fails. The pool and tiered paths share
// the derivation: the budget bounds resident bytes either way.
const MIB: usize = 1024 * 1024;
const DEFAULT_MEM_LIMIT: usize = 4 * 1024 * MIB;
let mem_limit = crate::memory_limiter::get_memory_limit().unwrap_or(DEFAULT_MEM_LIMIT);
const DEFAULT_RAM: usize = 4 * 1024 * MIB;
let ram = mz_ore::memory::physical_memory_bytes().unwrap_or(DEFAULT_RAM);
let fraction = COLUMN_PAGED_BATCHER_BUDGET_FRACTION.get(config).max(0.0);
let total = usize::cast_lossy(f64::cast_lossy(mem_limit) * fraction).max(128 * MIB);
let total = usize::cast_lossy(f64::cast_lossy(ram) * fraction).max(128 * MIB);
let target_fraction = COLUMN_PAGED_BATCHER_POOL_RSS_TARGET_FRACTION
.get(config)
.max(0.0);
let rss_target = usize::cast_lossy(f64::cast_lossy(ram) * target_fraction);

let backend = if self.context.scratch_directory.is_some() {
Backend::File
} else {
Backend::Swap
};

debug!(
let pool_config = PoolPagerConfig {
enabled,
?backend,
?codec,
swap_pageout,
fraction,
mem_limit,
budget_bytes = total,
"column-paged batcher: applying tiered config",
);
apply_tiered_config(enabled, total, backend, codec, swap_pageout);
budget_bytes: total,
spill_threads,
eager_backing,
rss_target_bytes: rss_target,
};
if use_pool && apply_pool_config(pool_config) {
// Keep the tiered singleton configured even though the pool
// is the installed mechanism: consumers that captured a
// tiered pager (boot-time config ordering between the
// compute and storage protocols is unconstrained) must see
// the operator's budget and codec, not the singleton's
// zero-budget, codec-less boot state.
mz_timely_util::column_pager::tiered_policy().reconfigure(total, backend, codec);
info!(
enabled,
fraction,
ram,
budget_bytes = total,
spill_threads,
eager_backing,
rss_target_bytes = rss_target,
"column-paged batcher: applying pool config",
);
} else {
if use_pool {
warn!(
"column-paged batcher: buffer pool unavailable; \
falling back to tiered config",
);
}
info!(
enabled,
?backend,
?codec,
swap_pageout,
fraction,
ram,
budget_bytes = total,
"column-paged batcher: applying tiered config",
);
apply_tiered_config(enabled, total, backend, codec, swap_pageout);
}
}

// Remember the maintenance interval locally to avoid reading it from the config set on
Expand Down
5 changes: 1 addition & 4 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,7 @@ pub async fn serve(
assert_eq!(storage_log_readers.len(), workers_per_process);
storage_log_readers.into_iter().map(Some).collect()
};
mz_timely_util::column_pager::metrics::register(
metrics_registry,
mz_timely_util::column_pager::tiered_policy(),
);
mz_timely_util::column_pager::metrics::register(metrics_registry);

let config = Config {
persist_clients,
Expand Down
32 changes: 8 additions & 24 deletions src/compute/src/sink/correction_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2038,32 +2038,16 @@ mod tests {
);
}

/// A [`PagingPolicy`] that always spills to the swap backend, uncompressed.
///
/// The default global pager keeps every chunk resident; installing this drives the actual
/// spill path so the tests exercise [`Chunk::column`]'s page-in through [`mz_ore::pager`].
///
/// [`PagingPolicy`]: column_pager::PagingPolicy
struct ForceSwap;

impl column_pager::PagingPolicy for ForceSwap {
fn decide(&self, _hint: column_pager::PageHint) -> column_pager::PageDecision {
column_pager::PageDecision::Page {
backend: mz_ore::pager::Backend::Swap,
codec: None,
}
}
fn record(&self, _event: column_pager::PageEvent) {}
}

/// Install a global pager that spills every chunk to swap for the duration of `f`, then
/// restore the default (disabled) pager. The global pager is process-wide; concurrent tests
/// only ever observe a correct round-trip regardless of backend, so racing on it is benign.
/// Configure the global pager to spill every chunk to swap (uncompressed) for the duration
/// of `f`, then restore the default (disabled) pager. A zero tiered budget makes every
/// `decide` answer `Page`, driving the actual spill path so the tests exercise
/// [`Chunk::column`]'s page-in through [`mz_ore::pager`]. The pager configuration is
/// process-wide; concurrent tests only ever observe a correct round-trip regardless of
/// backend, so racing on it is benign.
fn with_swap_pager<R>(f: impl FnOnce() -> R) -> R {
use std::sync::Arc;
column_pager::set_global_pager(column_pager::ColumnPager::new(Arc::new(ForceSwap)));
column_pager::apply_tiered_config(true, 0, mz_ore::pager::Backend::Swap, None, false);
let result = f();
column_pager::set_global_pager(column_pager::ColumnPager::disabled());
column_pager::apply_tiered_config(false, 0, mz_ore::pager::Backend::Swap, None, false);
result
}

Expand Down
3 changes: 2 additions & 1 deletion src/ore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ ipnet.workspace = true
itertools.workspace = true
lgalloc = { workspace = true, optional = true }
libc = { workspace = true, optional = true }
lz4_flex = { workspace = true, optional = true }
mz-ore-proc = { path = "../ore-proc", default-features = false }
num.workspace = true
num-traits = { workspace = true, optional = true }
Expand Down Expand Up @@ -146,7 +147,7 @@ assert-no-tracing = ["ctor"]
assert = ["assert-no-tracing", "ctor", "tracing"]
proptest = ["dep:proptest", "proptest-derive"]
overflowing = ["assert"]
pager = ["dep:bytemuck", "libc", "rand", "dep:tracing"]
pager = ["dep:bytemuck", "libc", "rand", "dep:tracing", "dep:lz4_flex"]

[[test]]
name = "future"
Expand Down
4 changes: 4 additions & 0 deletions src/ore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ pub mod hint;
pub mod id_gen;
pub mod iter;
pub mod lex;
pub mod memory;
#[cfg_attr(nightly_doc_features, doc(cfg(feature = "metrics")))]
#[cfg(feature = "metrics")]
pub mod metrics;
Expand All @@ -70,6 +71,9 @@ pub mod pager;
pub mod panic;
pub mod path;
pub mod permutations;
#[cfg_attr(nightly_doc_features, doc(cfg(all(feature = "pager", unix))))]
#[cfg(all(feature = "pager", unix))]
pub mod pool;
#[cfg(feature = "process")]
pub mod process;
#[cfg(feature = "region")]
Expand Down
Loading
Loading