From f2202a67e296c467ddd7ba68b93e7c626763643d Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 27 Jun 2026 16:17:03 -0400 Subject: [PATCH 1/5] Initial tidy --- differential-dataflow/src/operators/count.rs | 5 +--- differential-dataflow/src/operators/reduce.rs | 5 +--- .../src/operators/threshold.rs | 5 +--- differential-dataflow/src/trace/cursor/mod.rs | 28 +++++++++++++++++++ differential-dataflow/src/trace/mod.rs | 20 +------------ 5 files changed, 32 insertions(+), 31 deletions(-) diff --git a/differential-dataflow/src/operators/count.rs b/differential-dataflow/src/operators/count.rs index b0de75c62..90cfe43d9 100644 --- a/differential-dataflow/src/operators/count.rs +++ b/differential-dataflow/src/operators/count.rs @@ -74,7 +74,6 @@ where move |(input, _frontier), output| { - let mut batch_cursors = Vec::new(); let mut batch_storage = Vec::new(); // Downgrade previous upper limit to be current lower limit. @@ -88,7 +87,6 @@ where } for batch in batches.drain(..) { upper_limit.clone_from(batch.upper()); // NB: Assumes batches are in-order - batch_cursors.push(batch.cursor()); batch_storage.push(batch); } }); @@ -97,8 +95,7 @@ where let mut session = output.session(&capability); - use crate::trace::cursor::CursorList; - let mut batch_cursor = CursorList::new(batch_cursors, &batch_storage); + let (mut batch_cursor, batch_storage) = crate::trace::cursor::cursor_list(batch_storage); let (mut trace_cursor, trace_storage) = trace.cursor_through(lower_limit.borrow()).unwrap(); while let Some(key) = batch_cursor.get_key(&batch_storage) { diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs index 592c360a2..3331932eb 100644 --- a/differential-dataflow/src/operators/reduce.rs +++ b/differential-dataflow/src/operators/reduce.rs @@ -14,7 +14,6 @@ use timely::dataflow::channels::pact::Pipeline; use crate::operators::arrange::{Arranged, TraceAgent}; use crate::trace::{BatchReader, Cursor, Trace, Builder, ExertionLogic, Description}; -use crate::trace::cursor::CursorList; use crate::trace::implementations::containers::BatchContainer; use crate::trace::TraceReader; @@ -94,7 +93,6 @@ where // may not be seen in its input. The standard example is that updates at `(0, 1)` and `(1, 0)` // may result in outputs at `(1, 1)` as well, even with no input at that time. - let mut batch_cursors = Vec::new(); let mut batch_storage = Vec::new(); // Downgrade previous upper limit to be current lower limit. @@ -106,7 +104,6 @@ where capabilities.insert(capability.retain(0)); for batch in batches.drain(..) { upper_limit.clone_from(batch.upper()); - batch_cursors.push(batch.cursor()); batch_storage.push(batch); } }); @@ -129,7 +126,7 @@ where // cursors for navigating input and output traces. let (mut source_cursor, ref source_storage) = source_trace.cursor_through(lower_limit.borrow()).expect("failed to acquire source cursor"); let (mut output_cursor, ref output_storage) = output_reader.cursor_through(lower_limit.borrow()).expect("failed to acquire output cursor"); - let (mut batch_cursor, ref batch_storage) = (CursorList::new(batch_cursors, &batch_storage), batch_storage); + let (mut batch_cursor, ref batch_storage) = crate::trace::cursor::cursor_list(batch_storage); // Prepare an output buffer and builder for each capability. // TODO: It would be better if all updates went into one batch, but timely dataflow prevents diff --git a/differential-dataflow/src/operators/threshold.rs b/differential-dataflow/src/operators/threshold.rs index f712ae7ba..8be3887aa 100644 --- a/differential-dataflow/src/operators/threshold.rs +++ b/differential-dataflow/src/operators/threshold.rs @@ -124,7 +124,6 @@ where move |(input, _frontier), output| { - let mut batch_cursors = Vec::new(); let mut batch_storage = Vec::new(); // Downgrde previous upper limit to be current lower limit. @@ -138,7 +137,6 @@ where } for batch in batches.drain(..) { upper_limit.clone_from(batch.upper()); // NB: Assumes batches are in-order - batch_cursors.push(batch.cursor()); batch_storage.push(batch); } }); @@ -147,8 +145,7 @@ where let mut session = output.session(&capability); - use crate::trace::cursor::CursorList; - let mut batch_cursor = CursorList::new(batch_cursors, &batch_storage); + let (mut batch_cursor, batch_storage) = crate::trace::cursor::cursor_list(batch_storage); let (mut trace_cursor, trace_storage) = trace.cursor_through(lower_limit.borrow()).unwrap(); while let Some(key) = batch_cursor.get_key(&batch_storage) { diff --git a/differential-dataflow/src/trace/cursor/mod.rs b/differential-dataflow/src/trace/cursor/mod.rs index 71ebc0966..aea507d6f 100644 --- a/differential-dataflow/src/trace/cursor/mod.rs +++ b/differential-dataflow/src/trace/cursor/mod.rs @@ -11,6 +11,34 @@ pub use self::cursor_list::CursorList; use crate::trace::implementations::LayoutExt; +/// A batch type that has a cursor for navigation. +/// +/// This is the entry point for accessing batch data through cursors, and the place that opinions +/// about keys and values are introduced (via the `Cursor` associated type). Cut-and-merge assembly +/// is the trace's concern: [`TraceReader::cursor_storage`](crate::trace::TraceReader::cursor_storage) +/// selects the batches and the defaulted +/// [`TraceReader::cursor_through`](crate::trace::TraceReader::cursor_through) builds a [`CursorList`] +/// over their per-batch cursors. +pub trait Navigable { + + /// The cursor type. + type Cursor: Cursor; + + /// Acquire a cursor suitable for the instance. + fn cursor(&self) -> Self::Cursor; +} + +/// Assembles a merged cursor over a sequence of batches. +/// +/// The batches become the cursor's storage and are returned alongside the cursor; they must be kept +/// alive and handed to the cursor's navigation methods. This is the shared assembly behind +/// `TraceReader::cursor_through` and the per-round input cursors in `reduce` / `count` / `threshold`. +pub fn cursor_list(batches: Vec) -> (CursorList, Vec) { + let cursors = batches.iter().map(|batch| batch.cursor()).collect::>(); + let cursor = CursorList::new(cursors, &batches); + (cursor, batches) +} + /// A cursor for navigating ordered `(key, val, time, diff)` updates. pub trait Cursor : LayoutExt { diff --git a/differential-dataflow/src/trace/mod.rs b/differential-dataflow/src/trace/mod.rs index 65e7abdd8..5c5c8407d 100644 --- a/differential-dataflow/src/trace/mod.rs +++ b/differential-dataflow/src/trace/mod.rs @@ -27,21 +27,6 @@ use crate::trace::implementations::LayoutExt; /// A type used to express how much effort a trace should exert even in the absence of updates. pub type ExertionLogic = std::sync::Arc Fn(&'a [(usize, usize, usize)])->Option+Send+Sync>; -// The traces and batch and cursors want the flexibility to appear as if they manage certain types of keys and -// values and such, while perhaps using other representations, I'm thinking mostly of wrappers around the keys -// and vals that change the `Ord` implementation, or stash hash codes, or the like. -// -// This complicates what requirements we make so that the trace is still usable by someone who knows only about -// the base key and value types. For example, the complex types should likely dereference to the simpler types, -// so that the user can make sense of the result as if they were given references to the simpler types. At the -// same time, the collection should be formable from base types (perhaps we need an `Into` or `From` constraint) -// and we should, somehow, be able to take a reference to the simple types to compare against the more complex -// types. This second one is also like an `Into` or `From` constraint, except that we start with a reference and -// really don't need anything more complex than a reference, but we can't form an owned copy of the complex type -// without cloning it. -// -// We could just start by cloning things. Worry about wrapping references later on. - /// A trace whose contents may be read. /// /// This is a restricted interface to the more general `Trace` trait, which extends this trait with further methods @@ -99,10 +84,7 @@ pub trait TraceReader : LayoutExt { /// The cursor is a [`CursorList`] that merges the cursors of the batches returned by /// [`cursor_storage`](TraceReader::cursor_storage); see that method for the contract on `upper`. fn cursor_through(&mut self, upper: AntichainRef) -> Option<(CursorList<::Cursor>, Vec)> { - let storage = self.cursor_storage(upper)?; - let cursors = storage.iter().map(|batch| batch.cursor()).collect::>(); - let cursor = CursorList::new(cursors, &storage); - Some((cursor, storage)) + Some(self::cursor::cursor_list(self.cursor_storage(upper)?)) } /// Advances the frontier that constrains logical compaction. From fab483ae35ee238a7835d9fd8894bda39d71d9a0 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 27 Jun 2026 21:44:34 -0400 Subject: [PATCH 2/5] Move layout off Trace/Batch onto the Cursor; remove LayoutExt Trace and Batch are now key/val-blind, carrying only Time. The borrowed/owned key, val, time, and diff types and their containers move onto the Cursor (and Chunk), reached via `::Cursor`. The LayoutExt trait and its blanket impl are deleted; Layout and WithLayout remain for batch construction only. This is behavior-preserving: every cursor still exists and is found the same way at runtime; only where the key/val/layout types are named has moved. All lib tests pass unchanged. Operators read key/val/diff from the batch cursor's GATs rather than the trace, so `as LayoutExt` becomes `as Cursor` throughout. join_core (Arranged and Collection) gained a diff type-param to pin a Multiply-on-projection bound the solver would not otherwise connect. A shared trace::BatchCursor alias names a trace's cursor type. Co-Authored-By: Claude Opus 4.8 --- .../src/algorithms/graphs/bfs.rs | 7 +- .../src/algorithms/graphs/bijkstra.rs | 7 +- .../src/algorithms/graphs/propagate.rs | 7 +- differential-dataflow/src/collection.rs | 37 ++++--- .../src/columnar/collection/operators.rs | 2 +- .../src/columnar/trace/chunk.rs | 31 +++++- .../src/operators/arrange/agent.rs | 6 +- .../src/operators/arrange/arrangement.rs | 72 +++++++------ .../src/operators/arrange/upsert.rs | 15 +-- differential-dataflow/src/operators/count.rs | 28 ++--- differential-dataflow/src/operators/join.rs | 9 +- differential-dataflow/src/operators/reduce.rs | 27 ++--- .../src/operators/threshold.rs | 26 ++--- differential-dataflow/src/trace/chunk/mod.rs | 67 +++++++++--- differential-dataflow/src/trace/chunk/vec.rs | 31 +++++- .../src/trace/cursor/cursor_list.rs | 21 +++- differential-dataflow/src/trace/cursor/mod.rs | 48 ++++++++- .../src/trace/implementations/mod.rs | 57 ---------- .../src/trace/implementations/ord_neu.rs | 48 ++++++++- .../src/trace/implementations/spine_fueled.rs | 6 +- differential-dataflow/src/trace/mod.rs | 100 ++++++++---------- .../src/trace/wrappers/enter.rs | 70 +++++------- .../src/trace/wrappers/enter_at.rs | 80 ++++++-------- .../src/trace/wrappers/frontier.rs | 56 +++++----- dogsdogsdogs/src/operators/count.rs | 11 +- dogsdogsdogs/src/operators/half_join.rs | 39 +++---- dogsdogsdogs/src/operators/half_join2.rs | 30 +++--- dogsdogsdogs/src/operators/lookup_map.rs | 21 ++-- dogsdogsdogs/src/operators/propose.rs | 37 ++++--- dogsdogsdogs/src/operators/validate.rs | 18 ++-- 30 files changed, 568 insertions(+), 446 deletions(-) diff --git a/differential-dataflow/src/algorithms/graphs/bfs.rs b/differential-dataflow/src/algorithms/graphs/bfs.rs index 8a9428993..05c3454ac 100644 --- a/differential-dataflow/src/algorithms/graphs/bfs.rs +++ b/differential-dataflow/src/algorithms/graphs/bfs.rs @@ -18,14 +18,17 @@ where bfs_arranged(edges, roots) } -use crate::trace::TraceReader; +use crate::trace::{BatchCursor, Navigable, TraceReader}; +use crate::trace::Cursor; use crate::operators::arrange::Arranged; /// Returns pairs (node, dist) indicating distance of each node from a root. pub fn bfs_arranged<'scope, N, Tr>(edges: Arranged<'scope, Tr>, roots: VecCollection<'scope, Tr::Time, N>) -> VecCollection<'scope, Tr::Time, (N, u32)> where N: ExchangeData+Hash, - Tr: for<'a> TraceReader=&'a N, Val<'a>=&'a N, Diff=isize>+Clone+'static, + Tr: TraceReader+Clone+'static, + Tr::Batch: Navigable, + for<'a> BatchCursor: Cursor=&'a N, Val<'a>=&'a N, Diff=isize>, { // initialize roots as reaching themselves at distance 0 let nodes = roots.map(|x| (x, 0)); diff --git a/differential-dataflow/src/algorithms/graphs/bijkstra.rs b/differential-dataflow/src/algorithms/graphs/bijkstra.rs index fce691af5..71c7d210d 100644 --- a/differential-dataflow/src/algorithms/graphs/bijkstra.rs +++ b/differential-dataflow/src/algorithms/graphs/bijkstra.rs @@ -29,7 +29,8 @@ where bidijkstra_arranged(forward, reverse, goals) } -use crate::trace::TraceReader; +use crate::trace::{BatchCursor, Navigable, TraceReader}; +use crate::trace::Cursor; use crate::operators::arrange::Arranged; /// Bi-directional Dijkstra search using arranged forward and reverse edge collections. @@ -40,7 +41,9 @@ pub fn bidijkstra_arranged<'scope, N, Tr>( ) -> VecCollection<'scope, Tr::Time, ((N,N), u32)> where N: ExchangeData+Hash, - Tr: for<'a> TraceReader=&'a N, Val<'a>=&'a N, Diff=isize>+Clone+'static, + Tr: TraceReader+Clone+'static, + Tr::Batch: Navigable, + for<'a> BatchCursor: Cursor=&'a N, Val<'a>=&'a N, Diff=isize>, { let outer = forward.stream.scope(); outer.iterative::(|inner| { diff --git a/differential-dataflow/src/algorithms/graphs/propagate.rs b/differential-dataflow/src/algorithms/graphs/propagate.rs index a1afe57ff..f6555384a 100644 --- a/differential-dataflow/src/algorithms/graphs/propagate.rs +++ b/differential-dataflow/src/algorithms/graphs/propagate.rs @@ -43,7 +43,8 @@ where propagate_core(edges.arrange_by_key(), nodes, logic) } -use crate::trace::TraceReader; +use crate::trace::{BatchCursor, Navigable, TraceReader}; +use crate::trace::Cursor; use crate::operators::arrange::arrangement::Arranged; /// Propagates labels forward, retaining the minimum label. @@ -58,7 +59,9 @@ where R: Multiply, R: From, L: ExchangeData, - Tr: for<'a> TraceReader=&'a N, Val<'a>=&'a N, Diff=R, Time: Hash>+Clone+'static, + Tr: TraceReader+Clone+'static, + Tr::Batch: Navigable, + for<'a> BatchCursor: Cursor=&'a N, Val<'a>=&'a N, Diff=R>, F: Fn(&L)->u64+Clone+'static, { // Morally the code performs the following iterative computation. However, in the interest of a simplified diff --git a/differential-dataflow/src/collection.rs b/differential-dataflow/src/collection.rs index a319f5c0b..631e580bb 100644 --- a/differential-dataflow/src/collection.rs +++ b/differential-dataflow/src/collection.rs @@ -329,6 +329,8 @@ pub mod vec { use crate::difference::{Semigroup, Abelian, Multiply}; use crate::lattice::Lattice; use crate::hashable::Hashable; + use crate::trace::{BatchCursor, Navigable}; + use crate::trace::Cursor; /// An evolving collection of values of type `D`, backed by Rust `Vec` types as containers. /// @@ -781,9 +783,11 @@ pub mod vec { /// ``` pub fn reduce_abelian(self, name: &str, mut logic: L) -> Arranged<'scope, TraceAgent> where - T2: for<'a> Trace= &'a K, ValOwn = V, Time=T, Diff: Abelian>+'static, - Bu: Builder, Output = T2::Batch>, - L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>)+'static, + T2: Trace+'static, + T2::Batch: Navigable, + for<'a> BatchCursor: Cursor= &'a K, ValOwn = V, Diff: Abelian>, + Bu: Builder as Cursor>::Diff)>, Output = T2::Batch>, + L: FnMut(&K, &[(&V, R)], &mut Vec<(V, as Cursor>::Diff)>)+'static, { self.reduce_core::<_,Bu,T2>(name, move |key, input, output, change| { if !input.is_empty() { logic(key, input, change); } @@ -800,9 +804,11 @@ pub mod vec { pub fn reduce_core(self, name: &str, logic: L) -> Arranged<'scope, TraceAgent> where V: Clone+'static, - T2: for<'a> Trace=&'a K, ValOwn = V, Time=T>+'static, - Bu: Builder, Output = T2::Batch>, - L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, + T2: Trace+'static, + T2::Batch: Navigable, + for<'a> BatchCursor: Cursor=&'a K, ValOwn = V>, + Bu: Builder as Cursor>::Diff)>, Output = T2::Batch>, + L: FnMut(&K, &[(&V, R)], &mut Vec<(V, as Cursor>::Diff)>, &mut Vec<(V, as Cursor>::Diff)>)+'static, { self.arrange_by_key_named(&format!("Arrange: {}", name)) .reduce_core::<_,Bu,_,_>( @@ -962,9 +968,11 @@ pub mod vec { pub fn consolidate_named(self, name: &str, reify: F) -> Self where Ba: crate::trace::Batcher, Time=T> + 'static, - Tr: for<'a> crate::trace::Trace+'static, + Tr: crate::trace::Trace+'static, + Tr::Batch: Navigable, + for<'a> BatchCursor: Cursor, Bu: crate::trace::Builder, Output=Tr::Batch>, - F: Fn(Tr::Key<'_>, Tr::Val<'_>) -> D + 'static, + F: Fn( as Cursor>::Key<'_>, as Cursor>::Val<'_>) -> D + 'static, { use crate::operators::arrange::arrangement::Arrange; self.map(|k| (k, ())) @@ -1237,12 +1245,17 @@ pub mod vec { /// .assert_eq(z); /// }); /// ``` - pub fn join_core (self, stream2: Arranged<'scope, Tr2>, result: L) -> Collection<'scope, T,I::Item,>::Output> + pub fn join_core (self, stream2: Arranged<'scope, Tr2>, result: L) -> Collection<'scope, T,I::Item,>::Output> where - Tr2: for<'a> crate::trace::TraceReader=&'a K, Time=T>+Clone+'static, - R: Multiply, + Tr2: crate::trace::TraceReader+Clone+'static, + Tr2::Batch: Navigable, + for<'a> BatchCursor: Cursor=&'a K>, + // Pin the cursor diff to a named param `R2`: a `Multiply` bound on a projection does not + // connect to its use-site (the solver normalizes the use but not the bound's subject). + BatchCursor: Cursor, + R: Multiply, I: IntoIterator, - L: FnMut(&K,&V,Tr2::Val<'_>)->I+'static, + L: FnMut(&K,&V, as Cursor>::Val<'_>)->I+'static, { self.arrange_by_key() .join_core(stream2, result) diff --git a/differential-dataflow/src/columnar/collection/operators.rs b/differential-dataflow/src/columnar/collection/operators.rs index f3208b104..c5f00d540 100644 --- a/differential-dataflow/src/columnar/collection/operators.rs +++ b/differential-dataflow/src/columnar/collection/operators.rs @@ -160,7 +160,7 @@ where { use timely::dataflow::operators::generic::Operator; use timely::dataflow::channels::pact::Pipeline; - use crate::trace::{BatchReader, Cursor}; + use crate::trace::{Navigable, Cursor}; use crate::AsCollection; arranged.stream diff --git a/differential-dataflow/src/columnar/trace/chunk.rs b/differential-dataflow/src/columnar/trace/chunk.rs index 75c363bd3..6f371d7a0 100644 --- a/differential-dataflow/src/columnar/trace/chunk.rs +++ b/differential-dataflow/src/columnar/trace/chunk.rs @@ -41,7 +41,7 @@ use timely::progress::frontier::AntichainRef; use crate::consolidation::Consolidate; use crate::lattice::Lattice; use crate::trace::cursor::Cursor; -use crate::trace::implementations::{BatchContainer, WithLayout}; +use crate::trace::implementations::{BatchContainer, Layout, WithLayout}; use crate::columnar::layout::{ColumnarLayout, ColumnarUpdate, Coltainer}; use crate::columnar::updates::{child_range, UpdatesBuilder, UpdatesTyped}; @@ -205,6 +205,22 @@ impl WithLayout for ColChunkCursor { impl Cursor for ColChunkCursor { type Storage = ColChunk; + type KeyContainer = as Layout>::KeyContainer; + type Key<'a> = < as Layout>::KeyContainer as BatchContainer>::ReadItem<'a>; + type ValContainer = as Layout>::ValContainer; + type Val<'a> = < as Layout>::ValContainer as BatchContainer>::ReadItem<'a>; + type ValOwn = < as Layout>::ValContainer as BatchContainer>::Owned; + type TimeContainer = as Layout>::TimeContainer; + type TimeGat<'a> = < as Layout>::TimeContainer as BatchContainer>::ReadItem<'a>; + type Time = < as Layout>::TimeContainer as BatchContainer>::Owned; + type DiffContainer = as Layout>::DiffContainer; + type DiffGat<'a> = < as Layout>::DiffContainer as BatchContainer>::ReadItem<'a>; + type Diff = < as Layout>::DiffContainer as BatchContainer>::Owned; + #[inline(always)] fn owned_val(val: Self::Val<'_>) -> Self::ValOwn { < as Layout>::ValContainer as BatchContainer>::into_owned(val) } + #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { < as Layout>::TimeContainer as BatchContainer>::into_owned(time) } + #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { < as Layout>::DiffContainer as BatchContainer>::into_owned(diff) } + #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { < as Layout>::TimeContainer as BatchContainer>::clone_onto(time, onto) } + fn key_valid(&self, s: &Self::Storage) -> bool { self.key_cursor < s.trie().view().keys.values.len() } fn val_valid(&self, s: &Self::Storage) -> bool { let view = s.trie().view(); @@ -283,6 +299,18 @@ impl Chunk for ColChunk where U::Time: 'static { type Cursor = ColChunkCursor; + type KeyContainer = as Layout>::KeyContainer; + type Key<'a> = < as Layout>::KeyContainer as BatchContainer>::ReadItem<'a>; + type ValContainer = as Layout>::ValContainer; + type Val<'a> = < as Layout>::ValContainer as BatchContainer>::ReadItem<'a>; + type ValOwn = < as Layout>::ValContainer as BatchContainer>::Owned; + type TimeContainer = as Layout>::TimeContainer; + type TimeGat<'a> = < as Layout>::TimeContainer as BatchContainer>::ReadItem<'a>; + type Time = < as Layout>::TimeContainer as BatchContainer>::Owned; + type DiffContainer = as Layout>::DiffContainer; + type DiffGat<'a> = < as Layout>::DiffContainer as BatchContainer>::ReadItem<'a>; + type Diff = < as Layout>::DiffContainer as BatchContainer>::Owned; + const TARGET: usize = TARGET; fn cursor(&self) -> Self::Cursor { @@ -530,6 +558,7 @@ mod test { use super::{ColChunk, Chunk}; use crate::columnar::updates::UpdatesTyped; use crate::trace::chunk::merge_chains; + use crate::trace::Navigable; type Upd = (u64, u64, u64, i64); diff --git a/differential-dataflow/src/operators/arrange/agent.rs b/differential-dataflow/src/operators/arrange/agent.rs index 41f964072..47fafaf62 100644 --- a/differential-dataflow/src/operators/arrange/agent.rs +++ b/differential-dataflow/src/operators/arrange/agent.rs @@ -35,13 +35,9 @@ pub struct TraceAgent { logging: Option, } -use crate::trace::implementations::WithLayout; -impl WithLayout for TraceAgent { - type Layout = Tr::Layout; -} - impl TraceReader for TraceAgent { + type Time = Tr::Time; type Batch = Tr::Batch; fn set_logical_compaction(&mut self, frontier: AntichainRef) { diff --git a/differential-dataflow/src/operators/arrange/arrangement.rs b/differential-dataflow/src/operators/arrange/arrangement.rs index 06ba1f005..a9027cd06 100644 --- a/differential-dataflow/src/operators/arrange/arrangement.rs +++ b/differential-dataflow/src/operators/arrange/arrangement.rs @@ -30,7 +30,7 @@ use timely::dataflow::operators::Capability; use crate::{Data, VecCollection, AsCollection}; use crate::difference::Semigroup; use crate::lattice::Lattice; -use crate::trace::{self, Trace, TraceReader, BatchReader, Batcher, Builder, Cursor}; +use crate::trace::{self, Trace, TraceReader, Navigable, Batcher, Builder, Cursor, BatchCursor}; use trace::wrappers::enter::{TraceEnter, BatchEnter,}; use trace::wrappers::enter_at::TraceEnter as TraceEnterAt; @@ -99,8 +99,9 @@ impl<'scope, Tr: TraceReader> Arranged<'scope, Tr> { /// not vary with the new timestamp coordinate. pub fn enter_at<'inner, TInner, F, P>(self, child: Scope<'inner, TInner>, logic: F, prior: P) -> Arranged<'inner, TraceEnterAt> where + Tr::Batch: Navigable, TInner: Refines+Lattice+'static, - F: FnMut(Tr::Key<'_>, Tr::Val<'_>, Tr::TimeGat<'_>)->TInner+Clone+'static, + F: FnMut( as Cursor>::Key<'_>, as Cursor>::Val<'_>, as Cursor>::TimeGat<'_>)->TInner+Clone+'static, P: FnMut(&TInner)->Tr::Time+Clone+'static, { let logic1 = logic.clone(); @@ -138,9 +139,10 @@ impl<'scope, Tr: TraceReader> Arranged<'scope, Tr> { /// The underlying `Stream>>` is a much more efficient way to access the data, /// and this method should only be used when the data need to be transformed or exchanged, rather than /// supplied as arguments to an operator using the same key-value structure. - pub fn as_collection(self, mut logic: L) -> VecCollection<'scope, Tr::Time, D, Tr::Diff> + pub fn as_collection(self, mut logic: L) -> VecCollection<'scope, Tr::Time, D, as Cursor>::Diff> where - L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> D+'static, + Tr::Batch: Navigable, + L: FnMut( as Cursor>::Key<'_>, as Cursor>::Val<'_>) -> D+'static, { self.flat_map_ref(move |key, val| Some(logic(key,val))) } @@ -154,11 +156,12 @@ impl<'scope, Tr: TraceReader> Arranged<'scope, Tr> { /// The method takes `K` and `V` as generic arguments, in order to constrain the reference types to support /// cloning into owned types. If this bound does not work, the `as_collection` method allows arbitrary logic /// on the reference types. - pub fn as_vecs(self) -> VecCollection<'scope, Tr::Time, (K, V), Tr::Diff> + pub fn as_vecs(self) -> VecCollection<'scope, Tr::Time, (K, V), as Cursor>::Diff> where K: crate::ExchangeData, V: crate::ExchangeData, - Tr: for<'a> TraceReader = &'a K, Val<'a> = &'a V>, + Tr::Batch: Navigable, + for<'a> BatchCursor: Cursor = &'a K, Val<'a> = &'a V>, { self.flat_map_ref(move |key, val| [(key.clone(), val.clone())]) } @@ -167,10 +170,11 @@ impl<'scope, Tr: TraceReader> Arranged<'scope, Tr> { /// /// The supplied logic may produce an iterator over output values, allowing either /// filtering or flat mapping as part of the extraction. - pub fn flat_map_ref(self, logic: L) -> VecCollection<'scope, Tr::Time, I::Item, Tr::Diff> + pub fn flat_map_ref(self, logic: L) -> VecCollection<'scope, Tr::Time, I::Item, as Cursor>::Diff> where + Tr::Batch: Navigable, I: IntoIterator, - L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static, + L: FnMut( as Cursor>::Key<'_>, as Cursor>::Val<'_>) -> I+'static, { Self::flat_map_batches(self.stream, logic) } @@ -182,10 +186,11 @@ impl<'scope, Tr: TraceReader> Arranged<'scope, Tr> { /// /// This method exists for streams of batches without the corresponding arrangement. /// If you have the arrangement, its `flat_map_ref` method is equivalent to this. - pub fn flat_map_batches(stream: Stream<'scope, Tr::Time, Vec>, mut logic: L) -> VecCollection<'scope, Tr::Time, I::Item, Tr::Diff> + pub fn flat_map_batches(stream: Stream<'scope, Tr::Time, Vec>, mut logic: L) -> VecCollection<'scope, Tr::Time, I::Item, as Cursor>::Diff> where + Tr::Batch: Navigable, I: IntoIterator, - L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static, + L: FnMut( as Cursor>::Key<'_>, as Cursor>::Val<'_>) -> I+'static, { stream.unary(Pipeline, "AsCollection", move |_,_| move |input, output| { input.for_each(|time, data| { @@ -197,7 +202,7 @@ impl<'scope, Tr: TraceReader> Arranged<'scope, Tr> { while let Some(val) = cursor.get_val(batch) { for datum in logic(key, val) { cursor.map_times(batch, |time, diff| { - session.give((datum.clone(), Tr::owned_time(time), Tr::owned_diff(diff))); + session.give((datum.clone(), as Cursor>::owned_time(time), as Cursor>::owned_diff(diff))); }); } cursor.step_val(batch); @@ -218,14 +223,22 @@ impl<'scope, Tr1: TraceReader+'static> Arranged<'scope, Tr1> { /// A convenience method to join and produce `VecCollection` output. /// /// Avoid this method, as it is likely to evolve into one without the `VecCollection` opinion. - pub fn join_core(self, other: Arranged<'scope, Tr2>, mut result: L) -> VecCollection<'scope, Tr1::Time,I::Item,>::Output> + pub fn join_core(self, other: Arranged<'scope, Tr2>, mut result: L) -> VecCollection<'scope, Tr1::Time,I::Item,>::Output> where - Tr2: for<'a> TraceReader=Tr1::Key<'a>,Time=Tr1::Time>+Clone+'static, - Tr1::Diff: Multiply, + Tr1::Batch: Navigable, + Tr2: TraceReader+Clone+'static, + Tr2::Batch: Navigable, + // Pin the cursor diffs to named params `R1`/`R2`: a `Multiply` bound on a projection + // does not connect to its use-site (the solver normalizes the use but not the bound's + // subject), so we constrain plain params instead. + BatchCursor: Cursor, + BatchCursor: Cursor, + for<'a> BatchCursor: Cursor = as Cursor>::Key<'a>>, + R1: Multiply + Clone, I: IntoIterator, - L: FnMut(Tr1::Key<'_>,Tr1::Val<'_>,Tr2::Val<'_>)->I+'static + L: FnMut( as Cursor>::Key<'_>, as Cursor>::Val<'_>, as Cursor>::Val<'_>)->I+'static { - let mut result = move |k: Tr1::Key<'_>, v1: Tr1::Val<'_>, v2: Tr2::Val<'_>, t: Tr1::Time, r1: &Tr1::Diff, r2: &Tr2::Diff| { + let mut result = move |k: as Cursor>::Key<'_>, v1: as Cursor>::Val<'_>, v2: as Cursor>::Val<'_>, t: Tr1::Time, r1: &R1, r2: &R2| { let r = (r1.clone()).multiply(r2); result(k, v1, v2).into_iter().map(move |d| (d, t.clone(), r.clone())) }; @@ -250,15 +263,13 @@ impl<'scope, Tr1: TraceReader+'static> Arranged<'scope, Tr1> { /// A direct implementation of `ReduceCore::reduce_abelian`. pub fn reduce_abelian(self, name: &str, mut logic: L, push: P) -> Arranged<'scope, TraceAgent> where - Tr2: for<'a> Trace< - Key<'a>= Tr1::Key<'a>, - ValOwn: Data, - Time=Tr1::Time, - Diff: Abelian, - >+'static, + Tr1::Batch: Navigable, + Tr2: Trace+'static, + Tr2::Batch: Navigable, + for<'a> BatchCursor: Cursor = as Cursor>::Key<'a>, ValOwn: Data, Diff: Abelian>, Bu: Builder, - L: FnMut(Tr1::Key<'_>, &[(Tr1::Val<'_>, Tr1::Diff)], &mut Vec<(Tr2::ValOwn, Tr2::Diff)>)+'static, - P: FnMut(&mut Bu::Input, Tr1::Key<'_>, &mut Vec<(Tr2::ValOwn, Tr2::Time, Tr2::Diff)>) + 'static, + L: FnMut( as Cursor>::Key<'_>, &[( as Cursor>::Val<'_>, as Cursor>::Diff)], &mut Vec<( as Cursor>::ValOwn, as Cursor>::Diff)>)+'static, + P: FnMut(&mut Bu::Input, as Cursor>::Key<'_>, &mut Vec<( as Cursor>::ValOwn, Tr2::Time, as Cursor>::Diff)>) + 'static, { self.reduce_core::<_,Bu,Tr2,_>(name, move |key, input, output, change| { if !input.is_empty() { @@ -272,14 +283,13 @@ impl<'scope, Tr1: TraceReader+'static> Arranged<'scope, Tr1> { /// A direct implementation of `ReduceCore::reduce_core`. pub fn reduce_core(self, name: &str, logic: L, push: P) -> Arranged<'scope, TraceAgent> where - Tr2: for<'a> Trace< - Key<'a>=Tr1::Key<'a>, - ValOwn: Data, - Time=Tr1::Time, - >+'static, + Tr1::Batch: Navigable, + Tr2: Trace+'static, + Tr2::Batch: Navigable, + for<'a> BatchCursor: Cursor = as Cursor>::Key<'a>, ValOwn: Data>, Bu: Builder, - L: FnMut(Tr1::Key<'_>, &[(Tr1::Val<'_>, Tr1::Diff)], &mut Vec<(Tr2::ValOwn, Tr2::Diff)>, &mut Vec<(Tr2::ValOwn, Tr2::Diff)>)+'static, - P: FnMut(&mut Bu::Input, Tr1::Key<'_>, &mut Vec<(Tr2::ValOwn, Tr2::Time, Tr2::Diff)>) + 'static, + L: FnMut( as Cursor>::Key<'_>, &[( as Cursor>::Val<'_>, as Cursor>::Diff)], &mut Vec<( as Cursor>::ValOwn, as Cursor>::Diff)>, &mut Vec<( as Cursor>::ValOwn, as Cursor>::Diff)>)+'static, + P: FnMut(&mut Bu::Input, as Cursor>::Key<'_>, &mut Vec<( as Cursor>::ValOwn, Tr2::Time, as Cursor>::Diff)>) + 'static, { use crate::operators::reduce::reduce_trace; reduce_trace::<_,Bu,_,_,_>(self, name, logic, push) diff --git a/differential-dataflow/src/operators/arrange/upsert.rs b/differential-dataflow/src/operators/arrange/upsert.rs index 60fb51f26..c43b98958 100644 --- a/differential-dataflow/src/operators/arrange/upsert.rs +++ b/differential-dataflow/src/operators/arrange/upsert.rs @@ -109,7 +109,7 @@ use timely::dataflow::operators::Capability; use crate::operators::arrange::arrangement::Arranged; use crate::trace::{Builder, Description}; -use crate::trace::{self, Trace, TraceReader, Cursor}; +use crate::trace::{self, Trace, TraceReader, Cursor, Navigable, BatchCursor}; use crate::{ExchangeData, Hashable}; use crate::trace::implementations::containers::BatchContainer; @@ -133,13 +133,14 @@ pub fn arrange_from_upsert<'scope, Bu, Tr, K, V>( where K: ExchangeData+Hashable+std::hash::Hash, V: ExchangeData, - Tr: for<'a> Trace< + Tr: Trace+'static, + Tr::Batch: Navigable, + for<'a> BatchCursor: Cursor< Key<'a> = &'a K, Val<'a> = &'a V, - Time: TotalOrder+ExchangeData, Diff=isize, - >+'static, - Bu: Builder, Output = Tr::Batch>, + >, + Bu: Builder as Cursor>::Diff)>, Output = Tr::Batch>, { let mut reader: Option> = None; @@ -234,7 +235,7 @@ where // new stuff that we add. let (mut trace_cursor, trace_storage) = reader_local.cursor(); let mut builder = Bu::new(); - let mut key_con = Tr::KeyContainer::with_capacity(1); + let mut key_con = as Cursor>::KeyContainer::with_capacity(1); for (key, mut list) in to_process { key_con.clear(); key_con.push_ref(&key); @@ -248,7 +249,7 @@ where // Determine the prior value associated with the key. while let Some(val) = trace_cursor.get_val(&trace_storage) { let mut count = 0; - trace_cursor.map_times(&trace_storage, |_time, diff| count += Tr::owned_diff(diff)); + trace_cursor.map_times(&trace_storage, |_time, diff| count += as Cursor>::owned_diff(diff)); assert!(count == 0 || count == 1); if count == 1 { assert!(prev_value.is_none()); diff --git a/differential-dataflow/src/operators/count.rs b/differential-dataflow/src/operators/count.rs index 90cfe43d9..ab695041f 100644 --- a/differential-dataflow/src/operators/count.rs +++ b/differential-dataflow/src/operators/count.rs @@ -11,7 +11,7 @@ use crate::difference::{IsZero, Semigroup}; use crate::hashable::Hashable; use crate::collection::AsCollection; use crate::operators::arrange::Arranged; -use crate::trace::{BatchReader, Cursor, TraceReader}; +use crate::trace::{BatchCursor, BatchReader, Cursor, Navigable, TraceReader}; /// Extension trait for the `count` differential dataflow method. pub trait CountTotal<'scope, T: Timestamp + TotalOrder + Lattice, K: ExchangeData, R: Semigroup> : Sized { @@ -52,17 +52,19 @@ where } } -impl<'scope, K, Tr> CountTotal<'scope, Tr::Time, K, Tr::Diff> for Arranged<'scope, Tr> +impl<'scope, K, Tr> CountTotal<'scope, Tr::Time, K, as Cursor>::Diff> for Arranged<'scope, Tr> where - Tr: for<'a> TraceReader< + Tr: TraceReader + Clone + 'static, + Tr::Batch: Navigable, + for<'a> BatchCursor: Cursor< Key<'a> = &'a K, - Val<'a>=&'a (), - Time: TotalOrder, - Diff: ExchangeData+Semigroup> - >+Clone+'static, + Val<'a> = &'a (), + Time = Tr::Time, + Diff: ExchangeData + Semigroup< as Cursor>::DiffGat<'a>>, + >, K: ExchangeData, { - fn count_total_core + 'static>(self) -> VecCollection<'scope, Tr::Time, (K, Tr::Diff), R2> { + fn count_total_core + 'static>(self) -> VecCollection<'scope, Tr::Time, (K, as Cursor>::Diff), R2> { let mut trace = self.trace.clone(); @@ -99,13 +101,13 @@ where let (mut trace_cursor, trace_storage) = trace.cursor_through(lower_limit.borrow()).unwrap(); while let Some(key) = batch_cursor.get_key(&batch_storage) { - let mut count: Option = None; + let mut count: Option< as Cursor>::Diff> = None; trace_cursor.seek_key(&trace_storage, key); if trace_cursor.get_key(&trace_storage) == Some(key) { trace_cursor.map_times(&trace_storage, |_, diff| { count.as_mut().map(|c| c.plus_equals(&diff)); - if count.is_none() { count = Some(Tr::owned_diff(diff)); } + if count.is_none() { count = Some( as Cursor>::owned_diff(diff)); } }); } @@ -113,14 +115,14 @@ where if let Some(count) = count.as_ref() { if !count.is_zero() { - session.give(((key.clone(), count.clone()), Tr::owned_time(time), R2::from(-1i8))); + session.give(((key.clone(), count.clone()), as Cursor>::owned_time(time), R2::from(-1i8))); } } count.as_mut().map(|c| c.plus_equals(&diff)); - if count.is_none() { count = Some(Tr::owned_diff(diff)); } + if count.is_none() { count = Some( as Cursor>::owned_diff(diff)); } if let Some(count) = count.as_ref() { if !count.is_zero() { - session.give(((key.clone(), count.clone()), Tr::owned_time(time), R2::from(1i8))); + session.give(((key.clone(), count.clone()), as Cursor>::owned_time(time), R2::from(1i8))); } } }); diff --git a/differential-dataflow/src/operators/join.rs b/differential-dataflow/src/operators/join.rs index 15829b605..4bd5368e7 100644 --- a/differential-dataflow/src/operators/join.rs +++ b/differential-dataflow/src/operators/join.rs @@ -16,7 +16,7 @@ use timely::dataflow::operators::Capability; use crate::lattice::Lattice; use crate::operators::arrange::Arranged; -use crate::trace::{BatchReader, Cursor}; +use crate::trace::{BatchCursor, BatchReader, Navigable, Cursor}; use crate::operators::ValueHistory; use crate::trace::TraceReader; @@ -69,8 +69,11 @@ impl, D> PushInto for EffortBuilder { pub fn join_traces<'scope, Tr1, Tr2, L, CB>(arranged1: Arranged<'scope, Tr1>, arranged2: Arranged<'scope, Tr2>, mut result: L) -> Stream<'scope, Tr1::Time, CB::Container> where Tr1: TraceReader+'static, - Tr2: for<'a> TraceReader=Tr1::Key<'a>, Time = Tr1::Time>+'static, - L: FnMut(Tr1::Key<'_>,Tr1::Val<'_>,Tr2::Val<'_>,Tr1::Time,&Tr1::Diff,&Tr2::Diff,&mut JoinSession>)+'static, + Tr1::Batch: Navigable, + Tr2: TraceReader