Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions differential-dataflow/src/algorithms/graphs/bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ where
bfs_arranged(edges, roots)
}

use crate::trace::TraceReader;
use crate::trace::{BatchCursor, Cursor, Navigable, TraceReader};
use crate::operators::arrange::Arranged;

/// Returns pairs (node, dist) indicating distance of each node from a root.
pub fn bfs_arranged<'scope, N, Tr>(edges: Arranged<'scope, Tr>, roots: VecCollection<'scope, Tr::Time, N>) -> VecCollection<'scope, Tr::Time, (N, u32)>
where
N: ExchangeData+Hash,
Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Diff=isize>+Clone+'static,
Tr: TraceReader<Batch: Navigable>+Clone+'static,
for<'a> BatchCursor<Tr>: Cursor<Key<'a>=&'a N, Val<'a>=&'a N, Time=Tr::Time, Diff=isize>,
{
// initialize roots as reaching themselves at distance 0
let nodes = roots.map(|x| (x, 0));
Expand Down
5 changes: 3 additions & 2 deletions differential-dataflow/src/algorithms/graphs/bijkstra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ where
bidijkstra_arranged(forward, reverse, goals)
}

use crate::trace::TraceReader;
use crate::trace::{BatchCursor, Cursor, Navigable, TraceReader};
use crate::operators::arrange::Arranged;

/// Bi-directional Dijkstra search using arranged forward and reverse edge collections.
Expand All @@ -40,7 +40,8 @@ pub fn bidijkstra_arranged<'scope, N, Tr>(
) -> VecCollection<'scope, Tr::Time, ((N,N), u32)>
where
N: ExchangeData+Hash,
Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Diff=isize>+Clone+'static,
Tr: TraceReader<Batch: Navigable>+Clone+'static,
for<'a> BatchCursor<Tr>: Cursor<Key<'a>=&'a N, Val<'a>=&'a N, Time=Tr::Time, Diff=isize>,
{
let outer = forward.stream.scope();
outer.iterative::<u64,_,_>(|inner| {
Expand Down
5 changes: 3 additions & 2 deletions differential-dataflow/src/algorithms/graphs/propagate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ where
propagate_core(edges.arrange_by_key(), nodes, logic)
}

use crate::trace::TraceReader;
use crate::trace::{BatchCursor, Cursor, Navigable, TraceReader};
use crate::operators::arrange::arrangement::Arranged;

/// Propagates labels forward, retaining the minimum label.
Expand All @@ -58,7 +58,8 @@ where
R: Multiply<R, Output=R>,
R: From<i8>,
L: ExchangeData,
Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Diff=R, Time: Hash>+Clone+'static,
Tr: TraceReader<Batch: Navigable, Time: Hash>+Clone+'static,
for<'a> BatchCursor<Tr>: Cursor<Key<'a>=&'a N, Val<'a>=&'a N, Time=Tr::Time, Diff=R>,
F: Fn(&L)->u64+Clone+'static,
{
// Morally the code performs the following iterative computation. However, in the interest of a simplified
Expand Down
33 changes: 21 additions & 12 deletions differential-dataflow/src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,8 @@ pub mod vec {
use crate::difference::{Semigroup, Abelian, Multiply};
use crate::lattice::Lattice;
use crate::hashable::Hashable;
use crate::trace::{BatchCursor, BatchDiff, BatchKey, BatchVal, Navigable};
use crate::trace::Cursor;

/// An evolving collection of values of type `D`, backed by Rust `Vec` types as containers.
///
Expand Down Expand Up @@ -781,9 +783,10 @@ pub mod vec {
/// ```
pub fn reduce_abelian<L, Bu, T2>(self, name: &str, mut logic: L) -> Arranged<'scope, TraceAgent<T2>>
where
T2: for<'a> Trace<Key<'a>= &'a K, ValOwn = V, Time=T, Diff: Abelian>+'static,
Bu: Builder<Time=T2::Time, Input = Vec<((K, V), T2::Time, T2::Diff)>, Output = T2::Batch>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>)+'static,
T2: Trace<Batch: Navigable, Time=T>+'static,
for<'a> BatchCursor<T2>: Cursor<Key<'a>= &'a K, ValOwn = V, Time = T2::Time, Diff: Abelian>,
Bu: Builder<Time=T2::Time, Input = Vec<((K, V), T2::Time, BatchDiff<T2>)>, Output = T2::Batch>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(V, BatchDiff<T2>)>)+'static,
{
self.reduce_core::<_,Bu,T2>(name, move |key, input, output, change| {
if !input.is_empty() { logic(key, input, change); }
Expand All @@ -800,9 +803,10 @@ pub mod vec {
pub fn reduce_core<L, Bu, T2>(self, name: &str, logic: L) -> Arranged<'scope, TraceAgent<T2>>
where
V: Clone+'static,
T2: for<'a> Trace<Key<'a>=&'a K, ValOwn = V, Time=T>+'static,
Bu: Builder<Time=T2::Time, Input = Vec<((K, V), T2::Time, T2::Diff)>, Output = T2::Batch>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
T2: Trace<Batch: Navigable, Time=T>+'static,
for<'a> BatchCursor<T2>: Cursor<Key<'a>=&'a K, ValOwn = V, Time = T2::Time>,
Bu: Builder<Time=T2::Time, Input = Vec<((K, V), T2::Time, BatchDiff<T2>)>, Output = T2::Batch>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(V,BatchDiff<T2>)>, &mut Vec<(V, BatchDiff<T2>)>)+'static,
{
self.arrange_by_key_named(&format!("Arrange: {}", name))
.reduce_core::<_,Bu,_,_>(
Expand Down Expand Up @@ -962,9 +966,10 @@ pub mod vec {
pub fn consolidate_named<Ba, Bu, Tr, F>(self, name: &str, reify: F) -> Self
where
Ba: crate::trace::Batcher<Output=Vec<((D, ()), T, R)>, Time=T> + 'static,
Tr: for<'a> crate::trace::Trace<Time=T,Diff=R>+'static,
Tr: crate::trace::Trace<Batch: Navigable, Time=T>+'static,
for<'a> BatchCursor<Tr>: Cursor<Time=Tr::Time, Diff=R>,
Bu: crate::trace::Builder<Time=Tr::Time, Input=Vec<((D, ()), T, R)>, Output=Tr::Batch>,
F: Fn(Tr::Key<'_>, Tr::Val<'_>) -> D + 'static,
F: Fn(BatchKey<'_, Tr>, BatchVal<'_, Tr>) -> D + 'static,
{
use crate::operators::arrange::arrangement::Arrange;
self.map(|k| (k, ()))
Expand Down Expand Up @@ -1237,12 +1242,16 @@ pub mod vec {
/// .assert_eq(z);
/// });
/// ```
pub fn join_core<Tr2,I,L> (self, stream2: Arranged<'scope, Tr2>, result: L) -> Collection<'scope, T,I::Item,<R as Multiply<Tr2::Diff>>::Output>
pub fn join_core<Tr2,I,L,R2> (self, stream2: Arranged<'scope, Tr2>, result: L) -> Collection<'scope, T,I::Item,<R as Multiply<R2>>::Output>
where
Tr2: for<'a> crate::trace::TraceReader<Key<'a>=&'a K, Time=T>+Clone+'static,
R: Multiply<Tr2::Diff, Output: Semigroup+'static>,
Tr2: crate::trace::TraceReader<Batch: Navigable, Time=T>+Clone+'static,
for<'a> BatchCursor<Tr2>: Cursor<Key<'a>=&'a K>,
// Pin the cursor diff to a named param `R2`: a `Multiply` bound on a projection does not
// connect to its use-site (the solver normalizes the use but not the bound's subject).
BatchCursor<Tr2>: Cursor<Diff = R2, Time = T>,
R: Multiply<R2, Output: Semigroup+'static>,
I: IntoIterator<Item: crate::Data>,
L: FnMut(&K,&V,Tr2::Val<'_>)->I+'static,
L: FnMut(&K,&V,BatchVal<'_, Tr2>)->I+'static,
{
self.arrange_by_key()
.join_core(stream2, result)
Expand Down
2 changes: 1 addition & 1 deletion differential-dataflow/src/columnar/collection/operators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ where
{
use timely::dataflow::operators::generic::Operator;
use timely::dataflow::channels::pact::Pipeline;
use crate::trace::{BatchReader, Cursor};
use crate::trace::{Navigable, Cursor};
use crate::AsCollection;

arranged.stream
Expand Down
35 changes: 27 additions & 8 deletions differential-dataflow/src/columnar/trace/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ use timely::progress::frontier::AntichainRef;

use crate::consolidation::Consolidate;
use crate::lattice::Lattice;
use crate::trace::Navigable;
use crate::trace::cursor::Cursor;
use crate::trace::implementations::{BatchContainer, WithLayout};
use crate::trace::implementations::{BatchContainer, Layout, WithLayout};

use crate::columnar::layout::{ColumnarLayout, ColumnarUpdate, Coltainer};
use crate::columnar::updates::{child_range, UpdatesBuilder, UpdatesTyped};
Expand Down Expand Up @@ -205,6 +206,18 @@ impl<U: ColumnarUpdate> WithLayout for ColChunkCursor<U> {
impl<U: ColumnarUpdate> Cursor for ColChunkCursor<U> {
type Storage = ColChunk<U>;

type KeyContainer = <ColumnarLayout<U> as Layout>::KeyContainer;
type Key<'a> = <<ColumnarLayout<U> as Layout>::KeyContainer as BatchContainer>::ReadItem<'a>;
type ValContainer = <ColumnarLayout<U> as Layout>::ValContainer;
type Val<'a> = <<ColumnarLayout<U> as Layout>::ValContainer as BatchContainer>::ReadItem<'a>;
type ValOwn = <<ColumnarLayout<U> as Layout>::ValContainer as BatchContainer>::Owned;
type TimeContainer = <ColumnarLayout<U> as Layout>::TimeContainer;
type TimeGat<'a> = <<ColumnarLayout<U> as Layout>::TimeContainer as BatchContainer>::ReadItem<'a>;
type Time = <<ColumnarLayout<U> as Layout>::TimeContainer as BatchContainer>::Owned;
type DiffContainer = <ColumnarLayout<U> as Layout>::DiffContainer;
type DiffGat<'a> = <<ColumnarLayout<U> as Layout>::DiffContainer as BatchContainer>::ReadItem<'a>;
type Diff = <<ColumnarLayout<U> as Layout>::DiffContainer as BatchContainer>::Owned;

fn key_valid(&self, s: &Self::Storage) -> bool { self.key_cursor < s.trie().view().keys.values.len() }
fn val_valid(&self, s: &Self::Storage) -> bool {
let view = s.trie().view();
Expand Down Expand Up @@ -279,19 +292,24 @@ fn consolidated<U: ColumnarUpdate>(merged: UpdatesTyped<U>) -> UpdatesTyped<U> {
if merged.diffs.values.len() == merged.times.values.len() { merged } else { merged.filter_zero() }
}

impl<U: ColumnarUpdate> Chunk for ColChunk<U>
impl<U: ColumnarUpdate> Navigable for ColChunk<U>
where U::Time: 'static {
type Cursor = ColChunkCursor<U>;

const TARGET: usize = TARGET;

fn cursor(&self) -> Self::Cursor {
ColChunkCursor { key_cursor: 0, val_cursor: 0, phantom: PhantomData }
}
}

impl<U: ColumnarUpdate> Chunk for ColChunk<U>
where U::Time: 'static {
type Time = <<ColumnarLayout<U> as Layout>::TimeContainer as BatchContainer>::Owned;

const TARGET: usize = TARGET;

fn bounds(&self) -> (
(Self::Key<'_>, Self::Val<'_>, Self::TimeGat<'_>),
(Self::Key<'_>, Self::Val<'_>, Self::TimeGat<'_>),
(<Self::Cursor as Cursor>::Key<'_>, <Self::Cursor as Cursor>::Val<'_>, <Self::Cursor as Cursor>::TimeGat<'_>),
(<Self::Cursor as Cursor>::Key<'_>, <Self::Cursor as Cursor>::Val<'_>, <Self::Cursor as Cursor>::TimeGat<'_>),
) {
match self {
ColChunk::Resident(rc) => {
Expand Down Expand Up @@ -530,6 +548,7 @@ mod test {
use super::{ColChunk, Chunk};
use crate::columnar::updates::UpdatesTyped;
use crate::trace::chunk::merge_chains;
use crate::trace::Navigable;

type Upd = (u64, u64, u64, i64);

Expand Down Expand Up @@ -625,7 +644,7 @@ mod test {
#[test]
fn cursor_handles_straddle() {
use crate::trace::cursor::Cursor;
use crate::trace::{BatchReader, Description};
use crate::trace::Description;
use crate::trace::chunk::ChunkBatch;
use timely::progress::Antichain;

Expand Down Expand Up @@ -656,7 +675,7 @@ mod test {
// resumable merge -> advance -> settle pipeline end to end.
#[test]
fn batch_merger_resumable_matches_reference() {
use crate::trace::{BatchReader, Description, Merger};
use crate::trace::{Description, Merger};
use crate::trace::chunk::{ChunkBatch, ChunkBatchMerger, is_graded};
use crate::trace::cursor::Cursor;
use crate::consolidation::consolidate_updates;
Expand Down
6 changes: 1 addition & 5 deletions differential-dataflow/src/operators/arrange/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,9 @@
logging: Option<crate::logging::Logger>,
}

use crate::trace::implementations::WithLayout;
impl<Tr: TraceReader> WithLayout for TraceAgent<Tr> {
type Layout = Tr::Layout;
}

impl<Tr: TraceReader> TraceReader for TraceAgent<Tr> {

type Time = Tr::Time;
type Batch = Tr::Batch;

fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
Expand Down Expand Up @@ -287,7 +283,7 @@
let activator = scope.activator_for(Rc::clone(&info.address));
let queue = self.new_listener(activator);

let activator = scope.activator_for(info.address);

Check warning on line 286 in differential-dataflow/src/operators/arrange/agent.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

`activator` shadows a previous, unrelated binding
*shutdown_button_ref = Some(ShutdownButton::new(Rc::clone(&capabilities), activator));

capabilities.borrow_mut().as_mut().unwrap().insert(capability);
Expand Down Expand Up @@ -420,7 +416,7 @@
let activator = scope.activator_for(Rc::clone(&info.address));
let queue = self.new_listener(activator);

let activator = scope.activator_for(info.address);

Check warning on line 419 in differential-dataflow/src/operators/arrange/agent.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

`activator` shadows a previous, unrelated binding
*shutdown_button_ref = Some(ShutdownButton::new(Rc::clone(&capabilities), activator));

capabilities.borrow_mut().as_mut().unwrap().insert(capability);
Expand Down
Loading
Loading