From 99f2a388879cd720371c723fc212a7368dc0ec13 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 27 Jun 2026 12:40:34 -0400 Subject: [PATCH] Remove `TraceReader::Cursor`; hard-wire `CursorList` Replace the `Cursor` and `Storage` associated types with a single `cursor_storage(upper) -> Option>` primitive, and default `cursor`/`cursor_through` to assemble a `CursorList` over those batches' cursors. The cursor is now always `CursorList<::Cursor>` and the storage `Vec`. The time wrappers (enter, enter_at, frontier) consequently wrap each *batch* cursor INSIDE the `CursorList` rather than wrapping the whole cursor outside it; their now-dead outside cursors (`CursorEnter`, `CursorFrontier`) are removed. `Spine::cursor_through` becomes `cursor_storage` (returns the batch vector), and `TraceAgent` delegates `cursor_storage` to its inner trace. Results are bit-identical and the full test suite passes; the change is performance-neutral on the enter/enter_at and general cursor paths. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../src/operators/arrange/agent.rs | 6 +- differential-dataflow/src/operators/reduce.rs | 4 +- .../src/trace/implementations/spine_fueled.rs | 19 +---- differential-dataflow/src/trace/mod.rs | 49 ++++++------ .../src/trace/wrappers/enter.rs | 70 +---------------- .../src/trace/wrappers/enter_at.rs | 77 +------------------ .../src/trace/wrappers/frontier.rs | 72 +---------------- 7 files changed, 40 insertions(+), 257 deletions(-) diff --git a/differential-dataflow/src/operators/arrange/agent.rs b/differential-dataflow/src/operators/arrange/agent.rs index d58957356..41f964072 100644 --- a/differential-dataflow/src/operators/arrange/agent.rs +++ b/differential-dataflow/src/operators/arrange/agent.rs @@ -43,8 +43,6 @@ impl WithLayout for TraceAgent { impl TraceReader for TraceAgent { type Batch = Tr::Batch; - type Storage = Tr::Storage; - type Cursor = Tr::Cursor; fn set_logical_compaction(&mut self, frontier: AntichainRef) { // This method does not enforce that `frontier` is greater or equal to `self.logical_compaction`. @@ -68,8 +66,8 @@ impl TraceReader for TraceAgent { fn get_physical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.physical_compaction.borrow() } - fn cursor_through(&mut self, frontier: AntichainRef<'_, Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> { - self.trace.borrow_mut().trace.cursor_through(frontier) + fn cursor_storage(&mut self, frontier: AntichainRef<'_, Tr::Time>) -> Option> { + self.trace.borrow_mut().trace.cursor_storage(frontier) } fn map_batches(&self, f: F) { self.trace.borrow().trace.map_batches(f) } } diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs index 60b4406c5..592c360a2 100644 --- a/differential-dataflow/src/operators/reduce.rs +++ b/differential-dataflow/src/operators/reduce.rs @@ -127,8 +127,8 @@ where if capabilities.iter().any(|c| !upper_limit.less_equal(c.time())) { // cursors for navigating input and output traces. - let (mut source_cursor, ref source_storage): (Tr1::Cursor, _) = source_trace.cursor_through(lower_limit.borrow()).expect("failed to acquire source cursor"); - let (mut output_cursor, ref output_storage): (Tr2::Cursor, _) = output_reader.cursor_through(lower_limit.borrow()).expect("failed to acquire output cursor"); + let (mut source_cursor, ref source_storage) = source_trace.cursor_through(lower_limit.borrow()).expect("failed to acquire source cursor"); + let (mut output_cursor, ref output_storage) = output_reader.cursor_through(lower_limit.borrow()).expect("failed to acquire output cursor"); let (mut batch_cursor, ref batch_storage) = (CursorList::new(batch_cursors, &batch_storage), batch_storage); // Prepare an output buffer and builder for each capability. diff --git a/differential-dataflow/src/trace/implementations/spine_fueled.rs b/differential-dataflow/src/trace/implementations/spine_fueled.rs index 0df6b3bc8..5cdb115da 100644 --- a/differential-dataflow/src/trace/implementations/spine_fueled.rs +++ b/differential-dataflow/src/trace/implementations/spine_fueled.rs @@ -70,8 +70,7 @@ use crate::logging::Logger; -use crate::trace::{Batch, BatchReader, Trace, TraceReader, ExertionLogic}; -use crate::trace::cursor::CursorList; +use crate::trace::{Batch, Trace, TraceReader, ExertionLogic}; use crate::trace::Merger; use ::timely::dataflow::operators::generic::OperatorInfo; @@ -107,18 +106,14 @@ impl WithLayout for Spine { impl TraceReader for Spine { type Batch = B; - type Storage = Vec; - type Cursor = CursorList<::Cursor>; - fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, Self::Storage)> { + fn cursor_storage(&mut self, upper: AntichainRef) -> Option> { // If `upper` is the minimum frontier, we can return an empty cursor. // This can happen with operators that are written to expect the ability to acquire cursors // for their prior frontiers, and which start at `[T::minimum()]`, such as `Reduce`, sadly. if upper.less_equal(&::minimum()) { - let cursors = Vec::new(); - let storage = Vec::new(); - return Some((CursorList::new(cursors, &storage), storage)); + return Some(Vec::new()); } // The supplied `upper` should have the property that for each of our @@ -139,7 +134,6 @@ impl TraceReader for Spine { // assert!(upper.iter().all(|t1| self.physical_frontier.iter().any(|t2| t2.less_equal(t1)))); assert!(PartialOrder::less_equal(&self.physical_frontier.borrow(), &upper)); - let mut cursors = Vec::new(); let mut storage = Vec::new(); for merge_state in self.merging.iter().rev() { @@ -148,17 +142,14 @@ impl TraceReader for Spine { match variant { MergeVariant::InProgress(batch1, batch2, _) => { if !batch1.is_empty() { - cursors.push(batch1.cursor()); storage.push(batch1.clone()); } if !batch2.is_empty() { - cursors.push(batch2.cursor()); storage.push(batch2.clone()); } }, MergeVariant::Complete(Some((batch, _))) => { if !batch.is_empty() { - cursors.push(batch.cursor()); storage.push(batch.clone()); } } @@ -167,7 +158,6 @@ impl TraceReader for Spine { }, MergeState::Single(Some(batch)) => { if !batch.is_empty() { - cursors.push(batch.cursor()); storage.push(batch.clone()); } }, @@ -197,13 +187,12 @@ impl TraceReader for Spine { // include pending batches if include_upper { - cursors.push(batch.cursor()); storage.push(batch.clone()); } } } - Some((CursorList::new(cursors, &storage), storage)) + Some(storage) } #[inline] fn set_logical_compaction(&mut self, frontier: AntichainRef) { diff --git a/differential-dataflow/src/trace/mod.rs b/differential-dataflow/src/trace/mod.rs index 61ee30d65..65e7abdd8 100644 --- a/differential-dataflow/src/trace/mod.rs +++ b/differential-dataflow/src/trace/mod.rs @@ -19,6 +19,7 @@ use timely::progress::Timestamp; use crate::logging::Logger; pub use self::cursor::Cursor; +use self::cursor::CursorList; pub use self::description::Description; use crate::trace::implementations::LayoutExt; @@ -69,30 +70,21 @@ pub trait TraceReader : LayoutExt { >; - /// Storage type for `Self::Cursor`. Likely related to `Self::Batch`. - type Storage; - - /// The type used to enumerate the collections contents. - type Cursor: - Cursor + - WithLayout + - for<'a> LayoutExt< - Key<'a> = Self::Key<'a>, - Val<'a> = Self::Val<'a>, - ValOwn = Self::ValOwn, - Time = Self::Time, - TimeGat<'a> = Self::TimeGat<'a>, - Diff = Self::Diff, - DiffGat<'a> = Self::DiffGat<'a>, - KeyContainer = Self::KeyContainer, - ValContainer = Self::ValContainer, - TimeContainer = Self::TimeContainer, - DiffContainer = Self::DiffContainer, - >; - + /// Acquires the non-empty sequence of batches a cursor would draw from, restricted to updates + /// at times not greater or equal to an element of `upper`. + /// + /// This is the sole primitive each `TraceReader` must implement to expose its contents: the + /// `cursor` and `cursor_through` methods assemble a [`CursorList`] over these batches' cursors. + /// The returned `Vec` plays the role of the cursor's storage (the cursor borrows from it). + /// + /// This method is expected to work if called with an `upper` that (i) was an observed bound in batches from + /// the trace, and (ii) the trace has not been advanced beyond `upper`. Practically, the implementation should + /// be expected to look for a "clean cut" using `upper`, and if it finds such a cut can return the batches. This + /// should allow `upper` such as `&[]` as used by `self.cursor()`, though it is difficult to imagine other uses. + fn cursor_storage(&mut self, upper: AntichainRef) -> Option>; /// Provides a cursor over updates contained in the trace. - fn cursor(&mut self) -> (Self::Cursor, Self::Storage) { + fn cursor(&mut self) -> (CursorList<::Cursor>, Vec) { if let Some(cursor) = self.cursor_through(Antichain::new().borrow()) { cursor } @@ -104,11 +96,14 @@ pub trait TraceReader : LayoutExt { /// Acquires a cursor to the restriction of the collection's contents to updates at times not greater or /// equal to an element of `upper`. /// - /// This method is expected to work if called with an `upper` that (i) was an observed bound in batches from - /// the trace, and (ii) the trace has not been advanced beyond `upper`. Practically, the implementation should - /// be expected to look for a "clean cut" using `upper`, and if it finds such a cut can return a cursor. This - /// should allow `upper` such as `&[]` as used by `self.cursor()`, though it is difficult to imagine other uses. - fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, Self::Storage)>; + /// The cursor is a [`CursorList`] that merges the cursors of the batches returned by + /// [`cursor_storage`](TraceReader::cursor_storage); see that method for the contract on `upper`. + fn cursor_through(&mut self, upper: AntichainRef) -> Option<(CursorList<::Cursor>, Vec)> { + let storage = self.cursor_storage(upper)?; + let cursors = storage.iter().map(|batch| batch.cursor()).collect::>(); + let cursor = CursorList::new(cursors, &storage); + Some((cursor, storage)) + } /// Advances the frontier that constrains logical compaction. /// diff --git a/differential-dataflow/src/trace/wrappers/enter.rs b/differential-dataflow/src/trace/wrappers/enter.rs index 137fe5496..f4f9909fd 100644 --- a/differential-dataflow/src/trace/wrappers/enter.rs +++ b/differential-dataflow/src/trace/wrappers/enter.rs @@ -45,8 +45,6 @@ where TInner: Refines+Lattice, { type Batch = BatchEnter; - type Storage = Tr::Storage; - type Cursor = CursorEnter; fn map_batches(&self, mut f: F) { self.trace.map_batches(|batch| { @@ -84,12 +82,13 @@ where self.stash2.borrow() } - fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, Self::Storage)> { + fn cursor_storage(&mut self, upper: AntichainRef) -> Option> { self.stash1.clear(); for time in upper.iter() { self.stash1.insert(time.clone().to_outer()); } - self.trace.cursor_through(self.stash1.borrow()).map(|(x,y)| (CursorEnter::new(x), y)) + let storage = self.trace.cursor_storage(self.stash1.borrow())?; + Some(storage.into_iter().map(|batch| BatchEnter::make_from(batch)).collect()) } } @@ -162,70 +161,7 @@ where } } -/// Wrapper to provide cursor to nested scope. -pub struct CursorEnter { - phantom: ::std::marker::PhantomData, - cursor: C, -} - use crate::trace::implementations::{Layout, WithLayout}; -impl WithLayout for CursorEnter -where - C: Cursor, - TInner: Refines+Lattice, -{ - type Layout = ( - ::KeyContainer, - ::ValContainer, - Vec, - ::DiffContainer, - ::OffsetContainer, - ); -} - -impl CursorEnter { - fn new(cursor: C) -> Self { - CursorEnter { - phantom: ::std::marker::PhantomData, - cursor, - } - } -} - -impl Cursor for CursorEnter -where - C: Cursor, - TInner: Refines+Lattice, -{ - type Storage = C::Storage; - - #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) } - #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) } - - #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) } - #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) } - - #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option> { self.cursor.get_key(storage) } - #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option> { self.cursor.get_val(storage) } - - #[inline] - fn map_times)>(&mut self, storage: &Self::Storage, mut logic: L) { - self.cursor.map_times(storage, |time, diff| { - logic(&TInner::to_inner(C::owned_time(time)), diff) - }) - } - - #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) } - #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(storage, key) } - - #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) } - #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(storage, val) } - - #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) } - #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) } -} - - /// Wrapper to provide cursor to nested scope. pub struct BatchCursorEnter { diff --git a/differential-dataflow/src/trace/wrappers/enter_at.rs b/differential-dataflow/src/trace/wrappers/enter_at.rs index 2851eaf38..84fcbaf68 100644 --- a/differential-dataflow/src/trace/wrappers/enter_at.rs +++ b/differential-dataflow/src/trace/wrappers/enter_at.rs @@ -65,8 +65,6 @@ where G: FnMut(&TInner)->Tr::Time+Clone+'static, { type Batch = BatchEnter; - type Storage = Tr::Storage; - type Cursor = CursorEnter; fn map_batches(&self, mut f: F2) { let logic = self.logic.clone(); @@ -105,12 +103,14 @@ where self.stash2.borrow() } - fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, Self::Storage)> { + fn cursor_storage(&mut self, upper: AntichainRef) -> Option> { self.stash1.clear(); for time in upper.iter() { self.stash1.insert(time.clone().to_outer()); } - self.trace.cursor_through(self.stash1.borrow()).map(|(x,y)| (CursorEnter::new(x, self.logic.clone()), y)) + let logic = self.logic.clone(); + let storage = self.trace.cursor_storage(self.stash1.borrow())?; + Some(storage.into_iter().map(|batch| BatchEnter::make_from(batch, logic.clone())).collect()) } } @@ -189,76 +189,7 @@ where } } -/// Wrapper to provide cursor to nested scope. -pub struct CursorEnter { - phantom: ::std::marker::PhantomData, - cursor: C, - logic: F, -} - use crate::trace::implementations::{Layout, WithLayout}; -impl WithLayout for CursorEnter -where - C: Cursor, - TInner: Refines+Lattice, -{ - type Layout = ( - ::KeyContainer, - ::ValContainer, - Vec, - ::DiffContainer, - ::OffsetContainer, - ); -} - -impl CursorEnter { - fn new(cursor: C, logic: F) -> Self { - CursorEnter { - phantom: ::std::marker::PhantomData, - cursor, - logic, - } - } -} - -impl Cursor for CursorEnter -where - C: Cursor, - TInner: Refines+Lattice, - F: FnMut(C::Key<'_>, C::Val<'_>, C::TimeGat<'_>)->TInner, -{ - type Storage = C::Storage; - - #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) } - #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) } - - #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) } - #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) } - - #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option> { self.cursor.get_key(storage) } - #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option> { self.cursor.get_val(storage) } - - #[inline] - fn map_times)>(&mut self, storage: &Self::Storage, mut logic: L) { - let key = self.key(storage); - let val = self.val(storage); - let logic2 = &mut self.logic; - self.cursor.map_times(storage, |time, diff| { - logic(&logic2(key, val, time), diff) - }) - } - - #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) } - #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(storage, key) } - - #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) } - #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(storage, val) } - - #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) } - #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) } -} - - /// Wrapper to provide cursor to nested scope. pub struct BatchCursorEnter { diff --git a/differential-dataflow/src/trace/wrappers/frontier.rs b/differential-dataflow/src/trace/wrappers/frontier.rs index 5d521831b..de8b29c35 100644 --- a/differential-dataflow/src/trace/wrappers/frontier.rs +++ b/differential-dataflow/src/trace/wrappers/frontier.rs @@ -44,8 +44,6 @@ impl WithLayout for TraceFrontier { impl TraceReader for TraceFrontier { type Batch = BatchFrontier; - type Storage = Tr::Storage; - type Cursor = CursorFrontier; fn map_batches(&self, mut f: F) { let since = self.since.borrow(); @@ -59,10 +57,11 @@ impl TraceReader for TraceFrontier { fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) { self.trace.set_physical_compaction(frontier) } fn get_physical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.trace.get_physical_compaction() } - fn cursor_through(&mut self, upper: AntichainRef<'_, Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> { + fn cursor_storage(&mut self, upper: AntichainRef<'_, Tr::Time>) -> Option> { + let storage = self.trace.cursor_storage(upper)?; let since = self.since.borrow(); let until = self.until.borrow(); - self.trace.cursor_through(upper).map(|(x,y)| (CursorFrontier::new(x, since, until), y)) + Some(storage.into_iter().map(|batch| BatchFrontier::make_from(batch, since, until)).collect()) } } @@ -118,72 +117,7 @@ impl BatchFrontier { } } -/// Wrapper to provide cursor to nested scope. -pub struct CursorFrontier { - cursor: C, - since: Antichain, - until: Antichain -} - use crate::trace::implementations::{Layout, WithLayout}; -impl WithLayout for CursorFrontier { - type Layout = ( - ::KeyContainer, - ::ValContainer, - Vec, - ::DiffContainer, - ::OffsetContainer, - ); -} - -impl CursorFrontier { - fn new(cursor: C, since: AntichainRef, until: AntichainRef) -> Self { - CursorFrontier { - cursor, - since: since.to_owned(), - until: until.to_owned(), - } - } -} - -impl Cursor for CursorFrontier { - - type Storage = C::Storage; - - #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) } - #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) } - - #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) } - #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) } - - #[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option> { self.cursor.get_key(storage) } - #[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option> { self.cursor.get_val(storage) } - - #[inline] - fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) { - let since = self.since.borrow(); - let until = self.until.borrow(); - let mut temp: C::Time = ::minimum(); - self.cursor.map_times(storage, |time, diff| { - C::clone_time_onto(time, &mut temp); - temp.advance_by(since); - if !until.less_equal(&temp) { - logic(&temp, diff); - } - }) - } - - #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) } - #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(storage, key) } - - #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) } - #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(storage, val) } - - #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) } - #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) } -} - - /// Wrapper to provide cursor to nested scope. pub struct BatchCursorFrontier {