From 6bf1eeee2786a253ad9bafc3cacdfb9029ea9eaa Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Thu, 25 Jun 2026 12:25:28 -0500 Subject: [PATCH 1/2] fix(merge-insert): apply Delete/Fail on the indexed-scan path An indexed merge-insert delete by a composite primary key whose columns are all indexed silently removed nothing: `merge_insert(when_matched = Delete, use_index = true)` reported `deleted = 0` and the rows stayed live, resurfacing in later reads. `WhenMatched::Delete` is only implemented in the v2 plan (`DeleteOnlyMergeInsertExec`), which never uses a scalar index. When every join column is indexed, `can_use_create_plan` routes the merge to the legacy `Merger` instead, whose matched-row handler only ever distinguished `DoNothing` from "update" -- it folded both `Delete` and `Fail` into the update path. So a fully-indexed delete rewrote the matched rows in place (0 deletes), and a fully-indexed `Fail` silently updated instead of erroring. A single indexed column hits the same path, so this was never composite-specific. Make the legacy `Merger` dispatch every `WhenMatched` variant explicitly, mirroring the v2 classifier (`merge_insert_action`): - `Delete` collects matched row ids as deletions and emits no replacement. - `Fail` aborts on any match with the same message as the v2 path. - A delete-only commit branch drains the merger and applies the deletions (resolving ids to addresses via the row-id index for stable row ids) without writing fragments -- keeping the O(keys) indexed delete rather than falling back to a full table scan. - The partial-schema update commit branch cannot express deletions, so combining `Delete` with inserts from a partial-schema source now returns a descriptive error instead of silently dropping the deletes. Tests cover composite and single-column indexed delete, multi-fragment and stable-row-id variants, an unindexed-remainder fragment, delete combined with insert, and `Fail` on an indexed key. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- rust/lance/src/dataset/write/merge_insert.rs | 608 ++++++++++++++++--- 1 file changed, 528 insertions(+), 80 deletions(-) diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index 3889bdc66d5..af5868e0e7d 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -1785,7 +1785,66 @@ impl MergeInsertJob { .try_flatten(); let stream = RecordBatchStreamAdapter::new(merger_schema, stream); - let (operation, affected_rows) = if !is_full_schema { + // A partial-schema source can patch columns or delete matched rows, but + // not both in one commit: the writer rejects subschema rows up front and + // the delete cannot be folded into the write. Reject that combination. + if !is_full_schema + && matches!(self.params.when_matched, WhenMatched::Delete) + && self.params.insert_not_matched + { + return Err(Error::not_supported_source("Combining when_matched(Delete) with inserts from a partial-schema source is not supported; provide the full target schema in the source".into())); + } + + // The commit strategy follows what the merge does to matched rows. A + // pure delete (no inserts) writes nothing: the merger emits no batches + // and only records the matched row ids. This holds for any source schema + // width, so it is keyed on the operation rather than `is_full_schema`. + let is_delete_only = matches!(self.params.when_matched, WhenMatched::Delete) + && !self.params.insert_not_matched; + + let (operation, affected_rows) = if is_delete_only { + // Consume the stream so the merger records the matched row ids in + // `deleted_rows`; it produces no batches. 
+ let drained: Vec = Box::pin(stream).try_collect().await?; + debug_assert!(drained.is_empty(), "delete-only merge must not emit rows"); + + let removed_row_ids = Arc::into_inner(deleted_rows).unwrap().into_inner().unwrap(); + let removed_row_addr_vec = + if let Some(row_id_index) = get_row_id_index(&self.dataset).await? { + removed_row_ids + .iter() + .filter_map(|id| row_id_index.get(*id).map(|address| address.into())) + .collect::>() + } else { + removed_row_ids + }; + let removed_row_addrs = RoaringTreemap::from_iter(removed_row_addr_vec.into_iter()); + + let (updated_fragments, removed_fragment_ids) = + Self::apply_deletions(&self.dataset, &removed_row_addrs).await?; + + let operation = Operation::Update { + removed_fragment_ids, + updated_fragments, + new_fragments: vec![], + fields_modified: vec![], + merged_generations: self.params.merged_generations.clone(), + fields_for_preserving_frag_bitmap: full_schema + .fields + .iter() + .map(|f| f.id as u32) + .collect(), + update_mode: Some(RewriteRows), + inserted_rows_filter: None, // not implemented for v1 + updated_fragment_offsets: None, + }; + + let affected_rows = Some(RowAddrTreeMap::from(removed_row_addrs)); + (operation, affected_rows) + } else if !is_full_schema { + // Non-delete partial-schema merge: patch the provided columns into + // existing fragments in place. (Delete is handled above; a wider + // full source takes the row-rewrite branch below.) 
if !matches!( self.params.delete_not_matched_by_source, WhenNotMatchedBySource::Keep @@ -2341,98 +2400,122 @@ impl Merger { // borrow checker (the stream needs to be `sync` since it crosses an await point) let mut deleted_row_ids = self.deleted_rows.lock().unwrap(); - if self.params.when_matched != WhenMatched::DoNothing { - let mut matched = arrow::compute::filter_record_batch(&batch, &in_both)?; - - if let Some(match_filter) = self.match_filter_expr { - let unzipped = unzip_batch(&matched, &self.schema); - let filtered = match_filter.evaluate(&unzipped)?; - match filtered { - ColumnarValue::Array(mask) => { - // Some rows matched, filter down and replace those rows - matched = arrow::compute::filter_record_batch(&matched, mask.as_boolean())?; - } - ColumnarValue::Scalar(scalar) => { - if let ScalarValue::Boolean(Some(true)) = scalar { - // All rows matched, go ahead and replace the whole batch - } else { - // Nothing matched, replace nothing - matched = RecordBatch::new_empty(matched.schema()); + // Each `WhenMatched` variant handles the matched rows (`in_both`) + // differently. + let match_filter_expr = self.match_filter_expr; + match &self.params.when_matched { + WhenMatched::DoNothing => {} + WhenMatched::Delete => { + // Matched rows are removed, not rewritten: record their row ids + // for the commit to delete and emit no replacement batch. + let matched_row_ids = arrow::compute::filter(batch.column(row_id_col), &in_both)?; + let row_ids = matched_row_ids.as_primitive::(); + merge_statistics.num_deleted_rows += row_ids.len() as u64; + deleted_row_ids.extend(row_ids.values()); + } + WhenMatched::Fail => { + // Any matched row aborts the whole operation. 
+ if let Some(row_idx) = (0..in_both.len()).find(|&i| in_both.value(i)) { + return Err(DataFusionError::Execution(format!( + "Merge insert failed: found matching row with key values: {}", + format_key_values_on_columns(&batch, row_idx, &self.params.on) + ))); + } + } + WhenMatched::UpdateAll | WhenMatched::UpdateIf(_) | WhenMatched::UpdateIfExpr(_) => { + let mut matched = arrow::compute::filter_record_batch(&batch, &in_both)?; + + if let Some(match_filter) = match_filter_expr { + let unzipped = unzip_batch(&matched, &self.schema); + let filtered = match_filter.evaluate(&unzipped)?; + match filtered { + ColumnarValue::Array(mask) => { + // Some rows matched, filter down and replace those rows + matched = + arrow::compute::filter_record_batch(&matched, mask.as_boolean())?; + } + ColumnarValue::Scalar(scalar) => { + if let ScalarValue::Boolean(Some(true)) = scalar { + // All rows matched, go ahead and replace the whole batch + } else { + // Nothing matched, replace nothing + matched = RecordBatch::new_empty(matched.schema()); + } } } } - } - merge_statistics.num_updated_rows += matched.num_rows() as u64; + merge_statistics.num_updated_rows += matched.num_rows() as u64; - // If the filter eliminated all rows then its important we don't try and write - // the batch at all. Writing an empty batch currently panics - if matched.num_rows() > 0 { - let row_ids = matched.column(row_id_col).as_primitive::(); + // If the filter eliminated all rows then its important we don't try and write + // the batch at all. 
Writing an empty batch currently panics + if matched.num_rows() > 0 { + let row_ids = matched.column(row_id_col).as_primitive::(); - let mut processed_row_ids = self.processed_row_ids.lock().unwrap(); - let mut keep_indices: Vec = Vec::with_capacity(matched.num_rows()); - for (row_idx, &row_id) in row_ids.values().iter().enumerate() { - if processed_row_ids.insert(row_id) { - keep_indices.push(row_idx as u32); - } else { - match self.params.source_dedupe_behavior { - SourceDedupeBehavior::Fail => { - return Err(create_duplicate_row_error( - &matched, - row_idx, - &self.params.on, - )); - } - SourceDedupeBehavior::FirstSeen => { - // Skip this duplicate row (don't add to keep_indices) + let mut processed_row_ids = self.processed_row_ids.lock().unwrap(); + let mut keep_indices: Vec = Vec::with_capacity(matched.num_rows()); + for (row_idx, &row_id) in row_ids.values().iter().enumerate() { + if processed_row_ids.insert(row_id) { + keep_indices.push(row_idx as u32); + } else { + match self.params.source_dedupe_behavior { + SourceDedupeBehavior::Fail => { + return Err(create_duplicate_row_error( + &matched, + row_idx, + &self.params.on, + )); + } + SourceDedupeBehavior::FirstSeen => { + // Skip this duplicate row (don't add to keep_indices) + } } } } - } - drop(processed_row_ids); - - // Filter out duplicate rows if any were skipped - let num_skipped = matched.num_rows() - keep_indices.len(); - if num_skipped > 0 { - merge_statistics.num_skipped_duplicates += num_skipped as u64; - merge_statistics.num_updated_rows -= num_skipped as u64; + drop(processed_row_ids); - let indices = UInt32Array::from(keep_indices); - matched = take_record_batch(&matched, &indices)?; - } + // Filter out duplicate rows if any were skipped + let num_skipped = matched.num_rows() - keep_indices.len(); + if num_skipped > 0 { + merge_statistics.num_skipped_duplicates += num_skipped as u64; + merge_statistics.num_updated_rows -= num_skipped as u64; - // Only process and write if there are remaining 
rows after filtering duplicates - if matched.num_rows() > 0 { - // Get row_ids again after filtering (if any duplicates were removed) - let row_ids = matched.column(row_id_col).as_primitive::(); - deleted_row_ids.extend(row_ids.values()); - if self.enable_stable_row_ids { - self.updating_row_ids - .lock() - .unwrap() - .capture(row_ids.values())?; + let indices = UInt32Array::from(keep_indices); + matched = take_record_batch(&matched, &indices)?; } - let projection = if let Some(row_addr_col) = row_addr_col { - let mut cols = Vec::from_iter(left_cols.iter().cloned()); - cols.push(row_addr_col); - cols - } else { - #[allow(clippy::redundant_clone)] - left_cols.clone() - }; - let matched = matched.project(&projection)?; - // The payload columns of an outer join are always nullable. We need to restore - // non-nullable to columns that were originally non-nullable. This should be safe - // since the not_matched rows should all be valid on the right_cols - // - // Sadly we can't use with_schema because it doesn't let you toggle nullability - let matched = RecordBatch::try_new( - self.output_schema.clone(), - Vec::from_iter(matched.columns().iter().cloned()), - )?; - batches.push(Ok(matched)); + // Only process and write if there are remaining rows after filtering duplicates + if matched.num_rows() > 0 { + // Get row_ids again after filtering (if any duplicates were removed) + let row_ids = matched.column(row_id_col).as_primitive::(); + deleted_row_ids.extend(row_ids.values()); + if self.enable_stable_row_ids { + self.updating_row_ids + .lock() + .unwrap() + .capture(row_ids.values())?; + } + + let projection = if let Some(row_addr_col) = row_addr_col { + let mut cols = Vec::from_iter(left_cols.iter().cloned()); + cols.push(row_addr_col); + cols + } else { + #[allow(clippy::redundant_clone)] + left_cols.clone() + }; + let matched = matched.project(&projection)?; + // The payload columns of an outer join are always nullable. 
We need to restore + // non-nullable to columns that were originally non-nullable. This should be safe + // since the not_matched rows should all be valid on the right_cols + // + // Sadly we can't use with_schema because it doesn't let you toggle nullability + let matched = RecordBatch::try_new( + self.output_schema.clone(), + Vec::from_iter(matched.columns().iter().cloned()), + )?; + batches.push(Ok(matched)); + } } } } @@ -4067,6 +4150,371 @@ mod tests { assert_eq!(updated_ds.count_rows(None).await.unwrap(), 3); } + /// Composite-key delete-only merge_insert (`when_matched(Delete)`, + /// `when_not_matched_by_source(Keep)`) removes the matched rows for every + /// combination of which join columns carry a scalar index, including when + /// every column is indexed. + #[rstest::rstest] + #[case::index_on_both(true, true)] + #[case::index_on_first(true, false)] + #[case::index_on_second(false, true)] + #[case::no_index(false, false)] + #[tokio::test] + async fn test_indexed_merge_insert_composite_key_delete( + #[case] index_on_a: bool, + #[case] index_on_b: bool, + ) { + let initial = record_batch!( + ("a", Int32, [1, 1, 2, 2]), + ("b", Int32, [10, 20, 10, 20]), + ("value", Int32, [100, 200, 300, 400]) + ) + .unwrap(); + let schema = initial.schema(); + + let mut ds = Dataset::write( + RecordBatchIterator::new(vec![Ok(initial.clone())], schema.clone()), + "memory://", + None, + ) + .await + .unwrap(); + + let params = ScalarIndexParams::default(); + if index_on_a { + ds.create_index(&["a"], IndexType::Scalar, None, &params, false) + .await + .unwrap(); + } + if index_on_b { + ds.create_index(&["b"], IndexType::Scalar, None, &params, false) + .await + .unwrap(); + } + + // Delete (1, 10) by composite key. Only key columns in the source. 
+ let source = record_batch!(("a", Int32, [1]), ("b", Int32, [10])).unwrap(); + + let (updated_ds, stats) = + MergeInsertBuilder::try_new(Arc::new(ds), vec!["a".to_string(), "b".to_string()]) + .unwrap() + .when_matched(WhenMatched::Delete) + .when_not_matched(WhenNotMatched::DoNothing) + .try_build() + .unwrap() + .execute_reader(Box::new(RecordBatchIterator::new( + vec![Ok(source.clone())], + source.schema(), + ))) + .await + .unwrap(); + + assert_eq!(stats.num_deleted_rows, 1, "matched row must be deleted"); + assert_eq!(updated_ds.count_rows(None).await.unwrap(), 3); + assert_eq!( + updated_ds + .count_rows(Some("a = 1 AND b = 10".to_string())) + .await + .unwrap(), + 0, + "(1, 10) must be gone after the delete" + ); + // The sibling key (1, 20) must remain untouched. + assert_eq!( + updated_ds + .count_rows(Some("a = 1 AND b = 20 AND value = 200".to_string())) + .await + .unwrap(), + 1, + ); + } + + /// Delete-only merge_insert on a single indexed key removes the matched + /// rows (the indexed path is not exclusive to composite keys). 
+ #[tokio::test] + async fn test_indexed_merge_insert_single_key_delete() { + let initial = record_batch!( + ("id", Int32, [1, 2, 3, 4]), + ("value", Int32, [10, 20, 30, 40]) + ) + .unwrap(); + let schema = initial.schema(); + + let mut ds = Dataset::write( + RecordBatchIterator::new(vec![Ok(initial.clone())], schema.clone()), + "memory://", + None, + ) + .await + .unwrap(); + + ds.create_index( + &["id"], + IndexType::Scalar, + None, + &ScalarIndexParams::default(), + false, + ) + .await + .unwrap(); + + let source = record_batch!(("id", Int32, [2, 4])).unwrap(); + + let (updated_ds, stats) = MergeInsertBuilder::try_new(Arc::new(ds), vec!["id".to_string()]) + .unwrap() + .when_matched(WhenMatched::Delete) + .when_not_matched(WhenNotMatched::DoNothing) + .try_build() + .unwrap() + .execute_reader(Box::new(RecordBatchIterator::new( + vec![Ok(source.clone())], + source.schema(), + ))) + .await + .unwrap(); + + assert_eq!(stats.num_deleted_rows, 2); + assert_eq!(updated_ds.count_rows(None).await.unwrap(), 2); + assert_eq!( + updated_ds + .count_rows(Some("id = 2 OR id = 4".to_string())) + .await + .unwrap(), + 0, + ); + } + + /// `when_matched(Fail)` on an indexed key aborts the operation when a + /// source row matches an existing key, and inserts cleanly when none do. + #[tokio::test] + async fn test_indexed_merge_insert_when_matched_fail() { + let initial = + record_batch!(("id", Int32, [1, 2, 3]), ("value", Int32, [10, 20, 30])).unwrap(); + let schema = initial.schema(); + + let mut ds = Dataset::write( + RecordBatchIterator::new(vec![Ok(initial.clone())], schema.clone()), + "memory://", + None, + ) + .await + .unwrap(); + + ds.create_index( + &["id"], + IndexType::Scalar, + None, + &ScalarIndexParams::default(), + false, + ) + .await + .unwrap(); + let ds = Arc::new(ds); + + // A source row matching an existing key must fail the operation. 
+ let matching = record_batch!(("id", Int32, [2]), ("value", Int32, [999])).unwrap(); + let err = MergeInsertBuilder::try_new(ds.clone(), vec!["id".to_string()]) + .unwrap() + .when_matched(WhenMatched::Fail) + .when_not_matched(WhenNotMatched::InsertAll) + .try_build() + .unwrap() + .execute_reader(Box::new(RecordBatchIterator::new( + vec![Ok(matching.clone())], + matching.schema(), + ))) + .await + .unwrap_err(); + let msg = err.to_string(); + assert!(msg.contains("Merge insert failed"), "got: {msg}"); + assert!(msg.contains("found matching row"), "got: {msg}"); + + // A source with no matching key inserts without failing. + let new_rows = record_batch!(("id", Int32, [4]), ("value", Int32, [40])).unwrap(); + let (updated_ds, stats) = MergeInsertBuilder::try_new(ds.clone(), vec!["id".to_string()]) + .unwrap() + .when_matched(WhenMatched::Fail) + .when_not_matched(WhenNotMatched::InsertAll) + .try_build() + .unwrap() + .execute_reader(Box::new(RecordBatchIterator::new( + vec![Ok(new_rows.clone())], + new_rows.schema(), + ))) + .await + .unwrap(); + assert_eq!(stats.num_inserted_rows, 1); + assert_eq!(stats.num_updated_rows, 0); + assert_eq!(updated_ds.count_rows(None).await.unwrap(), 4); + } + + /// Fully-indexed composite-key `when_matched(Delete)` combined with + /// `when_not_matched(InsertAll)` must both delete matched rows and write + /// the inserted rows. 
+ #[tokio::test] + async fn test_indexed_merge_insert_composite_key_delete_with_insert() { + let initial = record_batch!( + ("a", Int32, [1, 1, 2, 2]), + ("b", Int32, [10, 20, 10, 20]), + ("value", Int32, [100, 200, 300, 400]) + ) + .unwrap(); + let schema = initial.schema(); + + let mut ds = Dataset::write( + RecordBatchIterator::new(vec![Ok(initial.clone())], schema.clone()), + "memory://", + None, + ) + .await + .unwrap(); + + let params = ScalarIndexParams::default(); + ds.create_index(&["a"], IndexType::Scalar, None, &params, false) + .await + .unwrap(); + ds.create_index(&["b"], IndexType::Scalar, None, &params, false) + .await + .unwrap(); + + // Source matches (1, 10) -> delete, and (3, 30) is new -> insert. + let source = record_batch!( + ("a", Int32, [1, 3]), + ("b", Int32, [10, 30]), + ("value", Int32, [999, 333]) + ) + .unwrap(); + + let (updated_ds, stats) = + MergeInsertBuilder::try_new(Arc::new(ds), vec!["a".to_string(), "b".to_string()]) + .unwrap() + .when_matched(WhenMatched::Delete) + .when_not_matched(WhenNotMatched::InsertAll) + .try_build() + .unwrap() + .execute_reader(Box::new(RecordBatchIterator::new( + vec![Ok(source.clone())], + source.schema(), + ))) + .await + .unwrap(); + + assert_eq!(stats.num_deleted_rows, 1); + assert_eq!(stats.num_inserted_rows, 1); + // 4 - 1 deleted + 1 inserted = 4. + assert_eq!(updated_ds.count_rows(None).await.unwrap(), 4); + assert_eq!( + updated_ds + .count_rows(Some("a = 1 AND b = 10".to_string())) + .await + .unwrap(), + 0, + "matched row must be deleted, not updated" + ); + assert_eq!( + updated_ds + .count_rows(Some("a = 3 AND b = 30 AND value = 333".to_string())) + .await + .unwrap(), + 1, + "unmatched source row must be inserted" + ); + } + + /// Fully-indexed composite-key delete across multiple fragments, with + /// stable row ids on/off. 
Exercises the indexed-scan delete commit + /// path: matched row ids are resolved to addresses (via the row-id + /// index when stable) and removed without rewriting any fragments. + /// Also covers an appended fragment that neither index covers, so the + /// delete must reach rows via the unindexed-remainder union too. + #[rstest::rstest] + #[case(true)] + #[case(false)] + #[tokio::test] + async fn test_indexed_merge_insert_composite_key_delete_multi_fragment( + #[case] enable_stable_row_ids: bool, + ) { + let initial = record_batch!( + ("a", Int32, [1, 1, 2, 2]), + ("b", Int32, [10, 20, 10, 20]), + ("value", Int32, [100, 200, 300, 400]) + ) + .unwrap(); + let schema = initial.schema(); + + // One row per fragment so the delete spans multiple fragments. + let mut ds = Dataset::write( + RecordBatchIterator::new(vec![Ok(initial.clone())], schema.clone()), + "memory://", + Some(WriteParams { + max_rows_per_file: 1, + enable_stable_row_ids, + ..Default::default() + }), + ) + .await + .unwrap(); + + let params = ScalarIndexParams::default(); + ds.create_index(&["a"], IndexType::Scalar, None, &params, false) + .await + .unwrap(); + ds.create_index(&["b"], IndexType::Scalar, None, &params, false) + .await + .unwrap(); + + // Append a row AFTER the indices are built so it lives in a fragment + // neither index covers. The delete must still reach it. + let appended = record_batch!( + ("a", Int32, [3]), + ("b", Int32, [30]), + ("value", Int32, [500]) + ) + .unwrap(); + ds.append( + RecordBatchIterator::new(vec![Ok(appended.clone())], appended.schema()), + None, + ) + .await + .unwrap(); + + // Delete an indexed row (1, 10) and the unindexed appended row (3, 30). 
+ let source = record_batch!(("a", Int32, [1, 3]), ("b", Int32, [10, 30])).unwrap(); + + let (updated_ds, stats) = + MergeInsertBuilder::try_new(Arc::new(ds), vec!["a".to_string(), "b".to_string()]) + .unwrap() + .when_matched(WhenMatched::Delete) + .when_not_matched(WhenNotMatched::DoNothing) + .try_build() + .unwrap() + .execute_reader(Box::new(RecordBatchIterator::new( + vec![Ok(source.clone())], + source.schema(), + ))) + .await + .unwrap(); + + assert_eq!(stats.num_deleted_rows, 2); + assert_eq!(updated_ds.count_rows(None).await.unwrap(), 3); + assert_eq!( + updated_ds + .count_rows(Some("(a = 1 AND b = 10) OR (a = 3 AND b = 30)".to_string())) + .await + .unwrap(), + 0, + "both matched rows must be gone" + ); + // Untouched rows survive with their original values. + assert_eq!( + updated_ds + .count_rows(Some("a = 2 AND b = 20 AND value = 400".to_string())) + .await + .unwrap(), + 1, + ); + } + /// Composite-key `MapIndexExec` formats its Display so plans expose /// every probed column, and `with_new_children` round-trips the full /// lookup list rather than collapsing back to a single-column form. From b4ad33e4002741c189b9668b0f1bd479f7757466 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Mon, 29 Jun 2026 08:20:20 -0500 Subject: [PATCH 2/2] fix(merge-insert): apply source_dedupe_behavior to deletes across v1 and v2 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address two review comments on the merge-insert delete paths. A source with duplicate keys matching the same target row was handled inconsistently by deletes: the row id was counted once per joined match while the commit deleted it a single time (over-reported num_deleted_rows), and the default source_dedupe_behavior::Fail — honored for updates — was silently ignored. 
Make every delete engine apply the same policy as updates: * v1 legacy Merger: dedupe matched row ids via processed_row_ids; on a repeat, Fail aborts (naming the ambiguous key) and FirstSeen skips + counts a skipped duplicate. * v2 FullSchemaMergeInsertExec (Delete + InsertAll): mirror the UpdateAll arm. Target-only deletes from delete_not_matched_by_source share the action but never duplicate, so they never trip Fail. * v2 DeleteOnlyMergeInsertExec: thread source_dedupe_behavior + on_columns into collect_deletions, detect duplicates via the treemap insert, and fold the skipped-duplicate metric into its (previously hardcoded-0) stats. Deletes now count each removed row once and reject ambiguous sources by default, matching update semantics. The second fix: a partial-schema source combining Delete with InsertAll was forced onto the indexed-scan path (which can't fold a delete into a partial write) and rejected as NotSupported. Keep that combination off the scalar-index route so it falls through to the v2 plan, which fills omitted nullable columns. Tests cover Fail/FirstSeen source-duplicate deletes on the v1 indexed path and both v2 plans, plus a fully-indexed partial-schema delete+insert. Co-Authored-By: Claude Opus 4.8 (1M context) --- rust/lance/src/dataset/write/merge_insert.rs | 298 +++++++++++++++++- .../dataset/write/merge_insert/exec/delete.rs | 39 ++- .../dataset/write/merge_insert/exec/write.rs | 23 ++ 3 files changed, 350 insertions(+), 10 deletions(-) diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index af5868e0e7d..c43d66fbd1a 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -1691,7 +1691,17 @@ impl MergeInsertJob { } } + // A partial-schema source that both deletes matched rows and inserts + // unmatched rows cannot be expressed by the indexed-scan delete path + // (the delete cannot be folded into a partial write). 
Keep it off the + // scalar-index route so it falls through to the v2 plan, which handles + // delete + insert directly. + let is_partial_delete_with_insert = is_subset_schema + && self.params.insert_not_matched + && matches!(self.params.when_matched, WhenMatched::Delete); + let would_use_scalar_index = if self.params.use_index + && !is_partial_delete_with_insert && matches!( self.params.delete_not_matched_by_source, WhenNotMatchedBySource::Keep @@ -2407,11 +2417,35 @@ impl Merger { WhenMatched::DoNothing => {} WhenMatched::Delete => { // Matched rows are removed, not rewritten: record their row ids - // for the commit to delete and emit no replacement batch. - let matched_row_ids = arrow::compute::filter(batch.column(row_id_col), &in_both)?; - let row_ids = matched_row_ids.as_primitive::(); - merge_statistics.num_deleted_rows += row_ids.len() as u64; - deleted_row_ids.extend(row_ids.values()); + // for the commit to delete and emit no replacement batch. A + // source with duplicate keys matches the same target row more + // than once; apply the same `source_dedupe_behavior` policy as + // updates so a duplicate either aborts (`Fail`) or is skipped + // and counted once (`FirstSeen`) — the commit deletes the row a + // single time regardless. 
+ let matched = arrow::compute::filter_record_batch(&batch, &in_both)?; + let row_ids = matched.column(row_id_col).as_primitive::(); + + let mut processed_row_ids = self.processed_row_ids.lock().unwrap(); + for (row_idx, &row_id) in row_ids.values().iter().enumerate() { + if processed_row_ids.insert(row_id) { + merge_statistics.num_deleted_rows += 1; + deleted_row_ids.push(row_id); + } else { + match self.params.source_dedupe_behavior { + SourceDedupeBehavior::Fail => { + return Err(create_duplicate_row_error( + &matched, + row_idx, + &self.params.on, + )); + } + SourceDedupeBehavior::FirstSeen => { + merge_statistics.num_skipped_duplicates += 1; + } + } + } + } } WhenMatched::Fail => { // Any matched row aborts the whole operation. @@ -4421,6 +4455,260 @@ mod tests { ); } + /// A delete whose source contains duplicate keys matching the same target + /// row applies `source_dedupe_behavior` on the indexed-scan path, exactly + /// like an update: the default `Fail` aborts (naming the ambiguous key), + /// while `FirstSeen` removes and counts the row once and reports the extra + /// match as a skipped duplicate. + #[rstest::rstest] + #[case::fail(SourceDedupeBehavior::Fail)] + #[case::first_seen(SourceDedupeBehavior::FirstSeen)] + #[tokio::test] + async fn test_indexed_merge_insert_delete_source_duplicates( + #[case] behavior: SourceDedupeBehavior, + ) { + let initial = record_batch!( + ("a", Int32, [1, 1, 2, 2]), + ("b", Int32, [10, 20, 10, 20]), + ("value", Int32, [100, 200, 300, 400]) + ) + .unwrap(); + let schema = initial.schema(); + + let mut ds = Dataset::write( + RecordBatchIterator::new(vec![Ok(initial.clone())], schema.clone()), + "memory://", + None, + ) + .await + .unwrap(); + + // Index every join column so the merge takes the indexed-scan delete path. 
+ let params = ScalarIndexParams::default(); + ds.create_index(&["a"], IndexType::Scalar, None, &params, false) + .await + .unwrap(); + ds.create_index(&["b"], IndexType::Scalar, None, &params, false) + .await + .unwrap(); + + // Two source rows collide on the same target key (1, 10). + let source = record_batch!(("a", Int32, [1, 1]), ("b", Int32, [10, 10])).unwrap(); + + let result = + MergeInsertBuilder::try_new(Arc::new(ds), vec!["a".to_string(), "b".to_string()]) + .unwrap() + .when_matched(WhenMatched::Delete) + .when_not_matched(WhenNotMatched::DoNothing) + .source_dedupe_behavior(behavior) + .try_build() + .unwrap() + .execute_reader(Box::new(RecordBatchIterator::new( + vec![Ok(source.clone())], + source.schema(), + ))) + .await; + + if behavior == SourceDedupeBehavior::Fail { + let err = result.unwrap_err().to_string(); + assert!( + err.contains("Ambiguous merge inserts") && err.contains("a = 1"), + "Fail must abort naming the ambiguous key, got: {err}" + ); + return; + } + + let (updated_ds, stats) = result.unwrap(); + assert_eq!(stats.num_deleted_rows, 1); + assert_eq!(stats.num_skipped_duplicates, 1); + assert_eq!(updated_ds.count_rows(None).await.unwrap(), 3); + assert_eq!( + updated_ds + .count_rows(Some("a = 1 AND b = 10".to_string())) + .await + .unwrap(), + 0, + "the matched row must be removed exactly once" + ); + } + + /// The v2 plans apply the same `source_dedupe_behavior` to deletes when the + /// source has duplicate keys matching one target row — covering both + /// `FullSchemaMergeInsertExec` (`Delete + InsertAll`) and + /// `DeleteOnlyMergeInsertExec` (pure delete). No scalar index, so routing + /// stays on the v2 path. 
+ #[rstest::rstest] + #[case::full_schema_fail(true, SourceDedupeBehavior::Fail)] + #[case::full_schema_first_seen(true, SourceDedupeBehavior::FirstSeen)] + #[case::delete_only_fail(false, SourceDedupeBehavior::Fail)] + #[case::delete_only_first_seen(false, SourceDedupeBehavior::FirstSeen)] + #[tokio::test] + async fn test_v2_merge_insert_delete_source_duplicates( + #[case] with_insert: bool, + #[case] behavior: SourceDedupeBehavior, + ) { + let initial = + record_batch!(("a", Int32, [1, 2, 3]), ("value", Int32, [10, 20, 30])).unwrap(); + let schema = initial.schema(); + + let ds = Dataset::write( + RecordBatchIterator::new(vec![Ok(initial.clone())], schema.clone()), + "memory://", + None, + ) + .await + .unwrap(); + + // Two source rows collide on target key a=1. With insert, a=4 is new. + let (source, when_not_matched, expected_inserted, expected_total) = if with_insert { + ( + record_batch!(("a", Int32, [1, 1, 4]), ("value", Int32, [99, 99, 40])).unwrap(), + WhenNotMatched::InsertAll, + 1, + 3, // 3 - 1 deleted + 1 inserted + ) + } else { + ( + record_batch!(("a", Int32, [1, 1])).unwrap(), + WhenNotMatched::DoNothing, + 0, + 2, // 3 - 1 deleted + ) + }; + + let result = MergeInsertBuilder::try_new(Arc::new(ds), vec!["a".to_string()]) + .unwrap() + .when_matched(WhenMatched::Delete) + .when_not_matched(when_not_matched) + .source_dedupe_behavior(behavior) + .try_build() + .unwrap() + .execute_reader(Box::new(RecordBatchIterator::new( + vec![Ok(source.clone())], + source.schema(), + ))) + .await; + + if behavior == SourceDedupeBehavior::Fail { + let err = result.unwrap_err().to_string(); + assert!( + err.contains("Ambiguous merge inserts") && err.contains("a = 1"), + "Fail must abort naming the ambiguous key, got: {err}" + ); + return; + } + + let (updated_ds, stats) = result.unwrap(); + assert_eq!(stats.num_deleted_rows, 1, "the matched row is removed once"); + assert_eq!(stats.num_skipped_duplicates, 1); + assert_eq!(stats.num_inserted_rows, expected_inserted); 
+ assert_eq!(updated_ds.count_rows(None).await.unwrap(), expected_total); + assert_eq!( + updated_ds + .count_rows(Some("a = 1".to_string())) + .await + .unwrap(), + 0, + "the matched row must be removed exactly once" + ); + } + + /// A partial-schema source that combines `when_matched(Delete)` with + /// `when_not_matched(InsertAll)` must succeed even when every join key is + /// indexed. The indexed-scan delete path cannot fold a delete into a + /// partial write, so this case routes to the v2 plan (which fills omitted + /// nullable target columns) instead of being rejected. + #[tokio::test] + async fn test_indexed_merge_insert_partial_schema_delete_with_insert() { + // Target carries two nullable non-key columns; the source omits `note`. + let full_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + Field::new("value", DataType::Int32, true), + Field::new("note", DataType::Utf8, true), + ])); + let full_batch = RecordBatch::try_new( + full_schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1, 1, 2, 2])), + Arc::new(Int32Array::from(vec![10, 20, 10, 20])), + Arc::new(Int32Array::from(vec![100, 200, 300, 400])), + Arc::new(StringArray::from(vec!["w", "x", "y", "z"])), + ], + ) + .unwrap(); + + let mut ds = Dataset::write( + RecordBatchIterator::new(vec![Ok(full_batch)], full_schema.clone()), + "memory://", + None, + ) + .await + .unwrap(); + + let params = ScalarIndexParams::default(); + ds.create_index(&["a"], IndexType::Scalar, None, &params, false) + .await + .unwrap(); + ds.create_index(&["b"], IndexType::Scalar, None, &params, false) + .await + .unwrap(); + + // Source deletes matched (1, 10) and inserts new (3, 30), omitting `note`.
+ let partial_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + Field::new("value", DataType::Int32, true), + ])); + let source = RecordBatch::try_new( + partial_schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1, 3])), + Arc::new(Int32Array::from(vec![10, 30])), + Arc::new(Int32Array::from(vec![999, 333])), + ], + ) + .unwrap(); + + let (updated_ds, stats) = + MergeInsertBuilder::try_new(Arc::new(ds), vec!["a".to_string(), "b".to_string()]) + .unwrap() + .when_matched(WhenMatched::Delete) + .when_not_matched(WhenNotMatched::InsertAll) + .try_build() + .unwrap() + .execute_reader(Box::new(RecordBatchIterator::new( + vec![Ok(source.clone())], + source.schema(), + ))) + .await + .unwrap(); + + assert_eq!(stats.num_deleted_rows, 1); + assert_eq!(stats.num_inserted_rows, 1); + // 4 - 1 deleted + 1 inserted = 4. + assert_eq!(updated_ds.count_rows(None).await.unwrap(), 4); + assert_eq!( + updated_ds + .count_rows(Some("a = 1 AND b = 10".to_string())) + .await + .unwrap(), + 0, + "matched row must be deleted, not updated" + ); + // Inserted row carries the omitted `note` column as NULL. + assert_eq!( + updated_ds + .count_rows(Some( + "a = 3 AND b = 30 AND value = 333 AND note IS NULL".to_string() + )) + .await + .unwrap(), + 1, + "unmatched source row must be inserted with omitted column NULL-filled" + ); + } + /// Fully-indexed composite-key delete across multiple fragments, with /// stable row ids on/off. 
Exercises the indexed-scan delete commit /// path: matched row ids are resolved to addresses (via the row-id diff --git a/rust/lance/src/dataset/write/merge_insert/exec/delete.rs b/rust/lance/src/dataset/write/merge_insert/exec/delete.rs index be7897e2441..0c63b99fe69 100644 --- a/rust/lance/src/dataset/write/merge_insert/exec/delete.rs +++ b/rust/lance/src/dataset/write/merge_insert/exec/delete.rs @@ -22,7 +22,10 @@ use roaring::RoaringTreemap; use crate::Dataset; use crate::dataset::transaction::{Operation, Transaction}; use crate::dataset::write::merge_insert::assign_action::Action; -use crate::dataset::write::merge_insert::{MERGE_ACTION_COLUMN, MergeInsertParams, MergeStats}; +use crate::dataset::write::merge_insert::{ + MERGE_ACTION_COLUMN, MergeInsertParams, MergeStats, SourceDedupeBehavior, + create_duplicate_row_error, +}; use super::{MergeInsertMetrics, apply_deletions}; @@ -101,6 +104,8 @@ impl DeleteOnlyMergeInsertExec { async fn collect_deletions( mut input_stream: SendableRecordBatchStream, metrics: MergeInsertMetrics, + source_dedupe_behavior: SourceDedupeBehavior, + on_columns: &[String], ) -> DFResult<RoaringTreemap> { let schema = input_stream.schema(); @@ -156,8 +161,25 @@ impl DeleteOnlyMergeInsertExec { if action == Action::Delete && !row_addr_array.is_null(row_idx) { let row_addr = row_addr_array.value(row_idx); - delete_row_addrs.insert(row_addr); - metrics.num_deleted_rows.add(1); + // The treemap dedupes addresses, so a repeat insert signals + // a duplicate source row matching the same target; apply the + // same dedupe policy as updates. (Delete-only never carries + // `delete_not_matched_by_source`, so every delete here is a + // source match.)
+ if delete_row_addrs.insert(row_addr) { + metrics.num_deleted_rows.add(1); + } else { + match source_dedupe_behavior { + SourceDedupeBehavior::Fail => { + return Err(create_duplicate_row_error( + &batch, row_idx, on_columns, + )); + } + SourceDedupeBehavior::FirstSeen => { + metrics.num_skipped_duplicates.add(1); + } + } + } } } } @@ -261,9 +283,16 @@ impl ExecutionPlan for DeleteOnlyMergeInsertExec { let transaction_holder = self.transaction.clone(); let affected_rows_holder = self.affected_rows.clone(); let merged_generations = self.params.merged_generations.clone(); + let source_dedupe_behavior = self.params.source_dedupe_behavior; + let on_columns = self.params.on.clone(); let result_stream = futures::stream::once(async move { - let delete_row_addrs = Self::collect_deletions(input_stream, metrics).await?; + // `metrics` is moved into `collect_deletions`; keep a handle on the + // skipped-duplicate counter so it can be folded into the stats below. + let skipped_duplicates = metrics.num_skipped_duplicates.clone(); + let delete_row_addrs = + Self::collect_deletions(input_stream, metrics, source_dedupe_behavior, &on_columns) + .await?; let (updated_fragments, removed_fragment_ids) = apply_deletions(&dataset, &delete_row_addrs) @@ -297,7 +326,7 @@ impl ExecutionPlan for DeleteOnlyMergeInsertExec { bytes_written: 0, num_files_written: 0, num_attempts: 1, - num_skipped_duplicates: 0, + num_skipped_duplicates: skipped_duplicates.value() as u64, }; if let Ok(mut transaction_guard) = transaction_holder.lock() { diff --git a/rust/lance/src/dataset/write/merge_insert/exec/write.rs b/rust/lance/src/dataset/write/merge_insert/exec/write.rs index d80c3b084f6..c3c3c71b6a3 100644 --- a/rust/lance/src/dataset/write/merge_insert/exec/write.rs +++ b/rust/lance/src/dataset/write/merge_insert/exec/write.rs @@ -103,6 +103,29 @@ impl MergeState { // Delete action - only delete, don't write back if !row_addr_array.is_null(row_idx) { let row_addr = row_addr_array.value(row_idx); + let 
row_id = row_id_array.value(row_idx); + + // A source with duplicate keys matches the same target row + // more than once; apply the same dedupe policy as updates. + // (Target-only deletes from `delete_not_matched_by_source` + // also reach here but never duplicate, so they never trip + // `Fail`.) + if !self.processed_row_ids.insert(row_id) { + match self.source_dedupe_behavior { + SourceDedupeBehavior::Fail => { + return Err(create_duplicate_row_error( + batch, + row_idx, + &self.on_columns, + )); + } + SourceDedupeBehavior::FirstSeen => { + self.metrics.num_skipped_duplicates.add(1); + return Ok(None); // Skip this duplicate row + } + } + } + self.delete_row_addrs.insert(row_addr); self.metrics.num_deleted_rows.add(1); }