Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions differential-dataflow/src/operators/arrange/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@
impl<Tr: TraceReader> TraceReader for TraceAgent<Tr> {

type Batch = Tr::Batch;
type Storage = Tr::Storage;
type Cursor = Tr::Cursor;

fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
// This method does not enforce that `frontier` is greater or equal to `self.logical_compaction`.
Expand All @@ -68,8 +66,8 @@
fn get_physical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> {
self.physical_compaction.borrow()
}
fn cursor_through(&mut self, frontier: AntichainRef<'_, Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> {
self.trace.borrow_mut().trace.cursor_through(frontier)
fn cursor_storage(&mut self, frontier: AntichainRef<'_, Tr::Time>) -> Option<Vec<Self::Batch>> {
self.trace.borrow_mut().trace.cursor_storage(frontier)
}
fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F) { self.trace.borrow().trace.map_batches(f) }
}
Expand Down Expand Up @@ -289,7 +287,7 @@
let activator = scope.activator_for(Rc::clone(&info.address));
let queue = self.new_listener(activator);

let activator = scope.activator_for(info.address);

Check warning on line 290 in differential-dataflow/src/operators/arrange/agent.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

`activator` shadows a previous, unrelated binding
*shutdown_button_ref = Some(ShutdownButton::new(Rc::clone(&capabilities), activator));

capabilities.borrow_mut().as_mut().unwrap().insert(capability);
Expand Down Expand Up @@ -422,7 +420,7 @@
let activator = scope.activator_for(Rc::clone(&info.address));
let queue = self.new_listener(activator);

let activator = scope.activator_for(info.address);

Check warning on line 423 in differential-dataflow/src/operators/arrange/agent.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

`activator` shadows a previous, unrelated binding
*shutdown_button_ref = Some(ShutdownButton::new(Rc::clone(&capabilities), activator));

capabilities.borrow_mut().as_mut().unwrap().insert(capability);
Expand Down
4 changes: 2 additions & 2 deletions differential-dataflow/src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@
if capabilities.iter().any(|c| !upper_limit.less_equal(c.time())) {

// cursors for navigating input and output traces.
let (mut source_cursor, ref source_storage): (Tr1::Cursor, _) = source_trace.cursor_through(lower_limit.borrow()).expect("failed to acquire source cursor");
let (mut output_cursor, ref output_storage): (Tr2::Cursor, _) = output_reader.cursor_through(lower_limit.borrow()).expect("failed to acquire output cursor");
let (mut source_cursor, ref source_storage) = source_trace.cursor_through(lower_limit.borrow()).expect("failed to acquire source cursor");
let (mut output_cursor, ref output_storage) = output_reader.cursor_through(lower_limit.borrow()).expect("failed to acquire output cursor");
let (mut batch_cursor, ref batch_storage) = (CursorList::new(batch_cursors, &batch_storage), batch_storage);

// Prepare an output buffer and builder for each capability.
Expand Down Expand Up @@ -269,7 +269,7 @@
pending_time.clear(); std::mem::swap(&mut next_pending_time, &mut pending_time);

// Update `capabilities` to reflect pending times.
let mut frontier = Antichain::<Tr1::Time>::new();

Check warning on line 272 in differential-dataflow/src/operators/reduce.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

`mut frontier` shadows a previous, unrelated binding
let mut owned_time = Tr1::Time::minimum();
for pos in 0 .. pending_time.len() {
Tr1::clone_time_onto(pending_time.index(pos), &mut owned_time);
Expand Down Expand Up @@ -360,7 +360,7 @@
}
}
#[inline(never)]
pub fn compute<'a, K, C1, C2, C3, L>(

Check warning on line 363 in differential-dataflow/src/operators/reduce.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

this function has too many arguments (10/7)
&mut self,
key: K,
(source_cursor, source_storage): (&mut C1, &'a C1::Storage),
Expand Down
19 changes: 4 additions & 15 deletions differential-dataflow/src/trace/implementations/spine_fueled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@


use crate::logging::Logger;
use crate::trace::{Batch, BatchReader, Trace, TraceReader, ExertionLogic};
use crate::trace::cursor::CursorList;
use crate::trace::{Batch, Trace, TraceReader, ExertionLogic};
use crate::trace::Merger;

use ::timely::dataflow::operators::generic::OperatorInfo;
Expand Down Expand Up @@ -107,18 +106,14 @@ impl<B: Batch> WithLayout for Spine<B> {
impl<B: Batch+Clone+'static> TraceReader for Spine<B> {

type Batch = B;
type Storage = Vec<B>;
type Cursor = CursorList<<B as BatchReader>::Cursor>;

fn cursor_through(&mut self, upper: AntichainRef<Self::Time>) -> Option<(Self::Cursor, Self::Storage)> {
fn cursor_storage(&mut self, upper: AntichainRef<Self::Time>) -> Option<Vec<Self::Batch>> {

// If `upper` is the minimum frontier, we can return an empty cursor.
// This can happen with operators that are written to expect the ability to acquire cursors
// for their prior frontiers, and which start at `[T::minimum()]`, such as `Reduce`, sadly.
if upper.less_equal(&<Self::Time as timely::progress::Timestamp>::minimum()) {
let cursors = Vec::new();
let storage = Vec::new();
return Some((CursorList::new(cursors, &storage), storage));
return Some(Vec::new());
}

// The supplied `upper` should have the property that for each of our
Expand All @@ -139,7 +134,6 @@ impl<B: Batch+Clone+'static> TraceReader for Spine<B> {
// assert!(upper.iter().all(|t1| self.physical_frontier.iter().any(|t2| t2.less_equal(t1))));
assert!(PartialOrder::less_equal(&self.physical_frontier.borrow(), &upper));

let mut cursors = Vec::new();
let mut storage = Vec::new();

for merge_state in self.merging.iter().rev() {
Expand All @@ -148,17 +142,14 @@ impl<B: Batch+Clone+'static> TraceReader for Spine<B> {
match variant {
MergeVariant::InProgress(batch1, batch2, _) => {
if !batch1.is_empty() {
cursors.push(batch1.cursor());
storage.push(batch1.clone());
}
if !batch2.is_empty() {
cursors.push(batch2.cursor());
storage.push(batch2.clone());
}
},
MergeVariant::Complete(Some((batch, _))) => {
if !batch.is_empty() {
cursors.push(batch.cursor());
storage.push(batch.clone());
}
}
Expand All @@ -167,7 +158,6 @@ impl<B: Batch+Clone+'static> TraceReader for Spine<B> {
},
MergeState::Single(Some(batch)) => {
if !batch.is_empty() {
cursors.push(batch.cursor());
storage.push(batch.clone());
}
},
Expand Down Expand Up @@ -197,13 +187,12 @@ impl<B: Batch+Clone+'static> TraceReader for Spine<B> {

// include pending batches
if include_upper {
cursors.push(batch.cursor());
storage.push(batch.clone());
}
}
}

Some((CursorList::new(cursors, &storage), storage))
Some(storage)
}
#[inline]
fn set_logical_compaction(&mut self, frontier: AntichainRef<B::Time>) {
Expand Down
49 changes: 22 additions & 27 deletions differential-dataflow/src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use timely::progress::Timestamp;

use crate::logging::Logger;
pub use self::cursor::Cursor;
use self::cursor::CursorList;
pub use self::description::Description;

use crate::trace::implementations::LayoutExt;
Expand Down Expand Up @@ -69,30 +70,21 @@ pub trait TraceReader : LayoutExt {
>;


/// Storage type for `Self::Cursor`. Likely related to `Self::Batch`.
type Storage;

/// The type used to enumerate the collections contents.
type Cursor:
Cursor<Storage=Self::Storage> +
WithLayout<Layout = Self::Layout> +
for<'a> LayoutExt<
Key<'a> = Self::Key<'a>,
Val<'a> = Self::Val<'a>,
ValOwn = Self::ValOwn,
Time = Self::Time,
TimeGat<'a> = Self::TimeGat<'a>,
Diff = Self::Diff,
DiffGat<'a> = Self::DiffGat<'a>,
KeyContainer = Self::KeyContainer,
ValContainer = Self::ValContainer,
TimeContainer = Self::TimeContainer,
DiffContainer = Self::DiffContainer,
>;

/// Acquires the non-empty sequence of batches a cursor would draw from, restricted to updates
/// at times not greater or equal to an element of `upper`.
///
/// This is the sole primitive each `TraceReader` must implement to expose its contents: the
/// `cursor` and `cursor_through` methods assemble a [`CursorList`] over these batches' cursors.
/// The returned `Vec` plays the role of the cursor's storage (the cursor borrows from it).
///
/// This method is expected to work if called with an `upper` that (i) was an observed bound in batches from
/// the trace, and (ii) the trace has not been advanced beyond `upper`. Practically, the implementation should
/// be expected to look for a "clean cut" using `upper`, and if it finds such a cut can return the batches. This
/// should allow `upper` such as `&[]` as used by `self.cursor()`, though it is difficult to imagine other uses.
fn cursor_storage(&mut self, upper: AntichainRef<Self::Time>) -> Option<Vec<Self::Batch>>;

/// Provides a cursor over updates contained in the trace.
fn cursor(&mut self) -> (Self::Cursor, Self::Storage) {
fn cursor(&mut self) -> (CursorList<<Self::Batch as BatchReader>::Cursor>, Vec<Self::Batch>) {
if let Some(cursor) = self.cursor_through(Antichain::new().borrow()) {
cursor
}
Expand All @@ -104,11 +96,14 @@ pub trait TraceReader : LayoutExt {
/// Acquires a cursor to the restriction of the collection's contents to updates at times not greater or
/// equal to an element of `upper`.
///
/// This method is expected to work if called with an `upper` that (i) was an observed bound in batches from
/// the trace, and (ii) the trace has not been advanced beyond `upper`. Practically, the implementation should
/// be expected to look for a "clean cut" using `upper`, and if it finds such a cut can return a cursor. This
/// should allow `upper` such as `&[]` as used by `self.cursor()`, though it is difficult to imagine other uses.
fn cursor_through(&mut self, upper: AntichainRef<Self::Time>) -> Option<(Self::Cursor, Self::Storage)>;
/// The cursor is a [`CursorList`] that merges the cursors of the batches returned by
/// [`cursor_storage`](TraceReader::cursor_storage); see that method for the contract on `upper`.
fn cursor_through(&mut self, upper: AntichainRef<Self::Time>) -> Option<(CursorList<<Self::Batch as BatchReader>::Cursor>, Vec<Self::Batch>)> {
let storage = self.cursor_storage(upper)?;
let cursors = storage.iter().map(|batch| batch.cursor()).collect::<Vec<_>>();
let cursor = CursorList::new(cursors, &storage);
Some((cursor, storage))
}

/// Advances the frontier that constrains logical compaction.
///
Expand Down
70 changes: 3 additions & 67 deletions differential-dataflow/src/trace/wrappers/enter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ where
TInner: Refines<Tr::Time>+Lattice,
{
type Batch = BatchEnter<Tr::Batch, TInner>;
type Storage = Tr::Storage;
type Cursor = CursorEnter<Tr::Cursor, TInner>;

fn map_batches<F: FnMut(&Self::Batch)>(&self, mut f: F) {
self.trace.map_batches(|batch| {
Expand Down Expand Up @@ -84,12 +82,13 @@ where
self.stash2.borrow()
}

fn cursor_through(&mut self, upper: AntichainRef<TInner>) -> Option<(Self::Cursor, Self::Storage)> {
fn cursor_storage(&mut self, upper: AntichainRef<TInner>) -> Option<Vec<Self::Batch>> {
self.stash1.clear();
for time in upper.iter() {
self.stash1.insert(time.clone().to_outer());
}
self.trace.cursor_through(self.stash1.borrow()).map(|(x,y)| (CursorEnter::new(x), y))
let storage = self.trace.cursor_storage(self.stash1.borrow())?;
Some(storage.into_iter().map(|batch| BatchEnter::make_from(batch)).collect())
}
}

Expand Down Expand Up @@ -162,70 +161,7 @@ where
}
}

/// Wrapper to provide cursor to nested scope.
pub struct CursorEnter<C, TInner> {
phantom: ::std::marker::PhantomData<TInner>,
cursor: C,
}

use crate::trace::implementations::{Layout, WithLayout};
impl<C, TInner> WithLayout for CursorEnter<C, TInner>
where
C: Cursor,
TInner: Refines<C::Time>+Lattice,
{
type Layout = (
<C::Layout as Layout>::KeyContainer,
<C::Layout as Layout>::ValContainer,
Vec<TInner>,
<C::Layout as Layout>::DiffContainer,
<C::Layout as Layout>::OffsetContainer,
);
}

impl<C, TInner> CursorEnter<C, TInner> {
fn new(cursor: C) -> Self {
CursorEnter {
phantom: ::std::marker::PhantomData,
cursor,
}
}
}

impl<C, TInner> Cursor for CursorEnter<C, TInner>
where
C: Cursor,
TInner: Refines<C::Time>+Lattice,
{
type Storage = C::Storage;

#[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) }
#[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) }

#[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) }
#[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }

#[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { self.cursor.get_key(storage) }
#[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { self.cursor.get_val(storage) }

#[inline]
fn map_times<L: FnMut(&TInner, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
self.cursor.map_times(storage, |time, diff| {
logic(&TInner::to_inner(C::owned_time(time)), diff)
})
}

#[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) }
#[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(storage, key) }

#[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) }
#[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(storage, val) }

#[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) }
#[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) }
}



/// Wrapper to provide cursor to nested scope.
pub struct BatchCursorEnter<C, TInner> {
Expand Down
77 changes: 4 additions & 73 deletions differential-dataflow/src/trace/wrappers/enter_at.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ where
G: FnMut(&TInner)->Tr::Time+Clone+'static,
{
type Batch = BatchEnter<Tr::Batch, TInner,F>;
type Storage = Tr::Storage;
type Cursor = CursorEnter<Tr::Cursor, TInner,F>;

fn map_batches<F2: FnMut(&Self::Batch)>(&self, mut f: F2) {
let logic = self.logic.clone();
Expand Down Expand Up @@ -105,12 +103,14 @@ where
self.stash2.borrow()
}

fn cursor_through(&mut self, upper: AntichainRef<TInner>) -> Option<(Self::Cursor, Self::Storage)> {
fn cursor_storage(&mut self, upper: AntichainRef<TInner>) -> Option<Vec<Self::Batch>> {
self.stash1.clear();
for time in upper.iter() {
self.stash1.insert(time.clone().to_outer());
}
self.trace.cursor_through(self.stash1.borrow()).map(|(x,y)| (CursorEnter::new(x, self.logic.clone()), y))
let logic = self.logic.clone();
let storage = self.trace.cursor_storage(self.stash1.borrow())?;
Some(storage.into_iter().map(|batch| BatchEnter::make_from(batch, logic.clone())).collect())
}
}

Expand Down Expand Up @@ -189,76 +189,7 @@ where
}
}

/// Wrapper to provide cursor to nested scope.
pub struct CursorEnter<C, TInner, F> {
phantom: ::std::marker::PhantomData<TInner>,
cursor: C,
logic: F,
}

use crate::trace::implementations::{Layout, WithLayout};
impl<C, TInner, F> WithLayout for CursorEnter<C, TInner, F>
where
C: Cursor,
TInner: Refines<C::Time>+Lattice,
{
type Layout = (
<C::Layout as Layout>::KeyContainer,
<C::Layout as Layout>::ValContainer,
Vec<TInner>,
<C::Layout as Layout>::DiffContainer,
<C::Layout as Layout>::OffsetContainer,
);
}

impl<C, TInner, F> CursorEnter<C, TInner, F> {
fn new(cursor: C, logic: F) -> Self {
CursorEnter {
phantom: ::std::marker::PhantomData,
cursor,
logic,
}
}
}

impl<C, TInner, F> Cursor for CursorEnter<C, TInner, F>
where
C: Cursor,
TInner: Refines<C::Time>+Lattice,
F: FnMut(C::Key<'_>, C::Val<'_>, C::TimeGat<'_>)->TInner,
{
type Storage = C::Storage;

#[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) }
#[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) }

#[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) }
#[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }

#[inline] fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { self.cursor.get_key(storage) }
#[inline] fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { self.cursor.get_val(storage) }

#[inline]
fn map_times<L: FnMut(&TInner, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) {
let key = self.key(storage);
let val = self.val(storage);
let logic2 = &mut self.logic;
self.cursor.map_times(storage, |time, diff| {
logic(&logic2(key, val, time), diff)
})
}

#[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) }
#[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(storage, key) }

#[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) }
#[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(storage, val) }

#[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) }
#[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) }
}



/// Wrapper to provide cursor to nested scope.
pub struct BatchCursorEnter<C, TInner, F> {
Expand Down
Loading
Loading