diff --git a/differential-dataflow/src/algorithms/graphs/bfs.rs b/differential-dataflow/src/algorithms/graphs/bfs.rs index 8a9428993..1a419b8a5 100644 --- a/differential-dataflow/src/algorithms/graphs/bfs.rs +++ b/differential-dataflow/src/algorithms/graphs/bfs.rs @@ -18,14 +18,15 @@ where bfs_arranged(edges, roots) } -use crate::trace::TraceReader; +use crate::trace::{BatchCursor, Cursor, Navigable, TraceReader}; use crate::operators::arrange::Arranged; /// Returns pairs (node, dist) indicating distance of each node from a root. pub fn bfs_arranged<'scope, N, Tr>(edges: Arranged<'scope, Tr>, roots: VecCollection<'scope, Tr::Time, N>) -> VecCollection<'scope, Tr::Time, (N, u32)> where N: ExchangeData+Hash, - Tr: for<'a> TraceReader=&'a N, Val<'a>=&'a N, Diff=isize>+Clone+'static, + Tr: TraceReader+Clone+'static, + for<'a> BatchCursor: Cursor=&'a N, Val<'a>=&'a N, Time=Tr::Time, Diff=isize>, { // initialize roots as reaching themselves at distance 0 let nodes = roots.map(|x| (x, 0)); diff --git a/differential-dataflow/src/algorithms/graphs/bijkstra.rs b/differential-dataflow/src/algorithms/graphs/bijkstra.rs index fce691af5..4d2034382 100644 --- a/differential-dataflow/src/algorithms/graphs/bijkstra.rs +++ b/differential-dataflow/src/algorithms/graphs/bijkstra.rs @@ -29,7 +29,7 @@ where bidijkstra_arranged(forward, reverse, goals) } -use crate::trace::TraceReader; +use crate::trace::{BatchCursor, Cursor, Navigable, TraceReader}; use crate::operators::arrange::Arranged; /// Bi-directional Dijkstra search using arranged forward and reverse edge collections. @@ -40,7 +40,8 @@ pub fn bidijkstra_arranged<'scope, N, Tr>( ) -> VecCollection<'scope, Tr::Time, ((N,N), u32)> where N: ExchangeData+Hash, - Tr: for<'a> TraceReader=&'a N, Val<'a>=&'a N, Diff=isize>+Clone+'static, + Tr: TraceReader+Clone+'static, + for<'a> BatchCursor: Cursor=&'a N, Val<'a>=&'a N, Time=Tr::Time, Diff=isize>, { let outer = forward.stream.scope(); outer.iterative::(|inner| { diff --git a/differential-dataflow/src/algorithms/graphs/propagate.rs b/differential-dataflow/src/algorithms/graphs/propagate.rs index a1afe57ff..a691b3279 100644 --- a/differential-dataflow/src/algorithms/graphs/propagate.rs +++ b/differential-dataflow/src/algorithms/graphs/propagate.rs @@ -43,7 +43,7 @@ where propagate_core(edges.arrange_by_key(), nodes, logic) } -use crate::trace::TraceReader; +use crate::trace::{BatchCursor, Cursor, Navigable, TraceReader}; use crate::operators::arrange::arrangement::Arranged; /// Propagates labels forward, retaining the minimum label. @@ -58,7 +58,8 @@ where R: Multiply, R: From, L: ExchangeData, - Tr: for<'a> TraceReader=&'a N, Val<'a>=&'a N, Diff=R, Time: Hash>+Clone+'static, + Tr: TraceReader+Clone+'static, + for<'a> BatchCursor: Cursor=&'a N, Val<'a>=&'a N, Time=Tr::Time, Diff=R>, F: Fn(&L)->u64+Clone+'static, { // Morally the code performs the following iterative computation. However, in the interest of a simplified diff --git a/differential-dataflow/src/collection.rs b/differential-dataflow/src/collection.rs index a319f5c0b..5defe78c2 100644 --- a/differential-dataflow/src/collection.rs +++ b/differential-dataflow/src/collection.rs @@ -329,6 +329,8 @@ pub mod vec { use crate::difference::{Semigroup, Abelian, Multiply}; use crate::lattice::Lattice; use crate::hashable::Hashable; + use crate::trace::{BatchCursor, BatchDiff, BatchKey, BatchVal, Navigable}; + use crate::trace::Cursor; /// An evolving collection of values of type `D`, backed by Rust `Vec` types as containers. /// @@ -781,9 +783,10 @@ pub mod vec { /// ``` pub fn reduce_abelian(self, name: &str, mut logic: L) -> Arranged<'scope, TraceAgent> where - T2: for<'a> Trace= &'a K, ValOwn = V, Time=T, Diff: Abelian>+'static, - Bu: Builder, Output = T2::Batch>, - L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>)+'static, + T2: Trace+'static, + for<'a> BatchCursor: Cursor= &'a K, ValOwn = V, Time = T2::Time, Diff: Abelian>, + Bu: Builder)>, Output = T2::Batch>, + L: FnMut(&K, &[(&V, R)], &mut Vec<(V, BatchDiff)>)+'static, { self.reduce_core::<_,Bu,T2>(name, move |key, input, output, change| { if !input.is_empty() { logic(key, input, change); } @@ -800,9 +803,10 @@ pub mod vec { pub fn reduce_core(self, name: &str, logic: L) -> Arranged<'scope, TraceAgent> where V: Clone+'static, - T2: for<'a> Trace=&'a K, ValOwn = V, Time=T>+'static, - Bu: Builder, Output = T2::Batch>, - L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, + T2: Trace+'static, + for<'a> BatchCursor: Cursor=&'a K, ValOwn = V, Time = T2::Time>, + Bu: Builder)>, Output = T2::Batch>, + L: FnMut(&K, &[(&V, R)], &mut Vec<(V,BatchDiff)>, &mut Vec<(V, BatchDiff)>)+'static, { self.arrange_by_key_named(&format!("Arrange: {}", name)) .reduce_core::<_,Bu,_,_>( @@ -962,9 +966,10 @@ pub mod vec { pub fn consolidate_named(self, name: &str, reify: F) -> Self where Ba: crate::trace::Batcher, Time=T> + 'static, - Tr: for<'a> crate::trace::Trace+'static, + Tr: crate::trace::Trace+'static, + for<'a> BatchCursor: Cursor, Bu: crate::trace::Builder, Output=Tr::Batch>, - F: Fn(Tr::Key<'_>, Tr::Val<'_>) -> D + 'static, + F: Fn(BatchKey<'_, Tr>, BatchVal<'_, Tr>) -> D + 'static, { use crate::operators::arrange::arrangement::Arrange; self.map(|k| (k, ())) @@ -1237,12 +1242,16 @@ pub mod vec { /// .assert_eq(z); /// }); /// ``` - pub fn join_core (self, stream2: Arranged<'scope, Tr2>, result: L) -> Collection<'scope, T,I::Item,>::Output> + pub fn join_core (self, stream2: Arranged<'scope, Tr2>, result: L) -> Collection<'scope, T,I::Item,>::Output> where - Tr2: for<'a> crate::trace::TraceReader=&'a K, Time=T>+Clone+'static, - R: Multiply, + Tr2: crate::trace::TraceReader+Clone+'static, + for<'a> BatchCursor: Cursor=&'a K>, + // Pin the cursor diff to a named param `R2`: a `Multiply` bound on a projection does not + // connect to its use-site (the solver normalizes the use but not the bound's subject). + BatchCursor: Cursor, + R: Multiply, I: IntoIterator, - L: FnMut(&K,&V,Tr2::Val<'_>)->I+'static, + L: FnMut(&K,&V,BatchVal<'_, Tr2>)->I+'static, { self.arrange_by_key() .join_core(stream2, result) diff --git a/differential-dataflow/src/columnar/collection/operators.rs b/differential-dataflow/src/columnar/collection/operators.rs index f3208b104..c5f00d540 100644 --- a/differential-dataflow/src/columnar/collection/operators.rs +++ b/differential-dataflow/src/columnar/collection/operators.rs @@ -160,7 +160,7 @@ where { use timely::dataflow::operators::generic::Operator; use timely::dataflow::channels::pact::Pipeline; - use crate::trace::{BatchReader, Cursor}; + use crate::trace::{Navigable, Cursor}; use crate::AsCollection; arranged.stream diff --git a/differential-dataflow/src/columnar/trace/chunk.rs b/differential-dataflow/src/columnar/trace/chunk.rs index 75c363bd3..f4cb6718f 100644 --- a/differential-dataflow/src/columnar/trace/chunk.rs +++ b/differential-dataflow/src/columnar/trace/chunk.rs @@ -40,8 +40,9 @@ use timely::progress::frontier::AntichainRef; use crate::consolidation::Consolidate; use crate::lattice::Lattice; +use crate::trace::Navigable; use crate::trace::cursor::Cursor; -use crate::trace::implementations::{BatchContainer, WithLayout}; +use crate::trace::implementations::{BatchContainer, Layout, WithLayout}; use crate::columnar::layout::{ColumnarLayout, ColumnarUpdate, Coltainer}; use crate::columnar::updates::{child_range, UpdatesBuilder, UpdatesTyped}; @@ -205,6 +206,18 @@ impl WithLayout for ColChunkCursor { impl Cursor for ColChunkCursor { type Storage = ColChunk; + type KeyContainer = as Layout>::KeyContainer; + type Key<'a> = < as Layout>::KeyContainer as BatchContainer>::ReadItem<'a>; + type ValContainer = as Layout>::ValContainer; + type Val<'a> = < as Layout>::ValContainer as BatchContainer>::ReadItem<'a>; + type ValOwn = < as Layout>::ValContainer as BatchContainer>::Owned; + type TimeContainer = as Layout>::TimeContainer; + type TimeGat<'a> = < as Layout>::TimeContainer as BatchContainer>::ReadItem<'a>; + type Time = < as Layout>::TimeContainer as BatchContainer>::Owned; + type DiffContainer = as Layout>::DiffContainer; + type DiffGat<'a> = < as Layout>::DiffContainer as BatchContainer>::ReadItem<'a>; + type Diff = < as Layout>::DiffContainer as BatchContainer>::Owned; + fn key_valid(&self, s: &Self::Storage) -> bool { self.key_cursor < s.trie().view().keys.values.len() } fn val_valid(&self, s: &Self::Storage) -> bool { let view = s.trie().view(); @@ -279,19 +292,24 @@ fn consolidated(merged: UpdatesTyped) -> UpdatesTyped { if merged.diffs.values.len() == merged.times.values.len() { merged } else { merged.filter_zero() } } -impl Chunk for ColChunk +impl Navigable for ColChunk where U::Time: 'static { type Cursor = ColChunkCursor; - const TARGET: usize = TARGET; - fn cursor(&self) -> Self::Cursor { ColChunkCursor { key_cursor: 0, val_cursor: 0, phantom: PhantomData } } +} + +impl Chunk for ColChunk +where U::Time: 'static { + type Time = < as Layout>::TimeContainer as BatchContainer>::Owned; + + const TARGET: usize = TARGET; fn bounds(&self) -> ( - (Self::Key<'_>, Self::Val<'_>, Self::TimeGat<'_>), - (Self::Key<'_>, Self::Val<'_>, Self::TimeGat<'_>), + (::Key<'_>, ::Val<'_>, ::TimeGat<'_>), + (::Key<'_>, ::Val<'_>, ::TimeGat<'_>), ) { match self { ColChunk::Resident(rc) => { @@ -530,6 +548,7 @@ mod test { use super::{ColChunk, Chunk}; use crate::columnar::updates::UpdatesTyped; use crate::trace::chunk::merge_chains; + use crate::trace::Navigable; type Upd = (u64, u64, u64, i64); @@ -625,7 +644,7 @@ mod test { #[test] fn cursor_handles_straddle() { use crate::trace::cursor::Cursor; - use crate::trace::{BatchReader, Description}; + use crate::trace::Description; use crate::trace::chunk::ChunkBatch; use timely::progress::Antichain; @@ -656,7 +675,7 @@ mod test { // resumable merge -> advance -> settle pipeline end to end. #[test] fn batch_merger_resumable_matches_reference() { - use crate::trace::{BatchReader, Description, Merger}; + use crate::trace::{Description, Merger}; use crate::trace::chunk::{ChunkBatch, ChunkBatchMerger, is_graded}; use crate::trace::cursor::Cursor; use crate::consolidation::consolidate_updates; diff --git a/differential-dataflow/src/operators/arrange/agent.rs b/differential-dataflow/src/operators/arrange/agent.rs index 41f964072..47fafaf62 100644 --- a/differential-dataflow/src/operators/arrange/agent.rs +++ b/differential-dataflow/src/operators/arrange/agent.rs @@ -35,13 +35,9 @@ pub struct TraceAgent { logging: Option, } -use crate::trace::implementations::WithLayout; -impl WithLayout for TraceAgent { - type Layout = Tr::Layout; -} - impl TraceReader for TraceAgent { + type Time = Tr::Time; type Batch = Tr::Batch; fn set_logical_compaction(&mut self, frontier: AntichainRef) { diff --git a/differential-dataflow/src/operators/arrange/arrangement.rs b/differential-dataflow/src/operators/arrange/arrangement.rs index 06ba1f005..0e908cf6d 100644 --- a/differential-dataflow/src/operators/arrange/arrangement.rs +++ b/differential-dataflow/src/operators/arrange/arrangement.rs @@ -30,7 +30,7 @@ use timely::dataflow::operators::Capability; use crate::{Data, VecCollection, AsCollection}; use crate::difference::Semigroup; use crate::lattice::Lattice; -use crate::trace::{self, Trace, TraceReader, BatchReader, Batcher, Builder, Cursor}; +use crate::trace::{self, Trace, TraceReader, Navigable, Batcher, Builder, Cursor, BatchCursor, BatchDiff, BatchKey, BatchTimeGat, BatchVal, BatchValOwn}; use trace::wrappers::enter::{TraceEnter, BatchEnter,}; use trace::wrappers::enter_at::TraceEnter as TraceEnterAt; @@ -99,8 +99,9 @@ impl<'scope, Tr: TraceReader> Arranged<'scope, Tr> { /// not vary with the new timestamp coordinate. pub fn enter_at<'inner, TInner, F, P>(self, child: Scope<'inner, TInner>, logic: F, prior: P) -> Arranged<'inner, TraceEnterAt> where + Tr::Batch: Navigable, TInner: Refines+Lattice+'static, - F: FnMut(Tr::Key<'_>, Tr::Val<'_>, Tr::TimeGat<'_>)->TInner+Clone+'static, + F: FnMut(BatchKey<'_, Tr>, BatchVal<'_, Tr>, BatchTimeGat<'_, Tr>)->TInner+Clone+'static, P: FnMut(&TInner)->Tr::Time+Clone+'static, { let logic1 = logic.clone(); @@ -138,9 +139,11 @@ impl<'scope, Tr: TraceReader> Arranged<'scope, Tr> { /// The underlying `Stream>>` is a much more efficient way to access the data, /// and this method should only be used when the data need to be transformed or exchanged, rather than /// supplied as arguments to an operator using the same key-value structure. - pub fn as_collection(self, mut logic: L) -> VecCollection<'scope, Tr::Time, D, Tr::Diff> + pub fn as_collection(self, mut logic: L) -> VecCollection<'scope, Tr::Time, D, BatchDiff> where - L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> D+'static, + Tr::Batch: Navigable, + BatchCursor: Cursor