diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs
index 36fde117ab..07b3363f94 100644
--- a/crates/iceberg/src/transaction/append.rs
+++ b/crates/iceberg/src/transaction/append.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 
 use async_trait::async_trait;
@@ -79,6 +79,18 @@ impl FastAppendAction {
         self.snapshot_properties = snapshot_properties;
         self
     }
+
+    /// Collapse files sharing a path to their first occurrence, so a single
+    /// manifest never references the same file twice. Always runs (unlike the
+    /// `check_duplicate`-gated cross-snapshot check) since it is in-memory only.
+    fn dedupe_added_files(&self) -> Vec<DataFile> {
+        let mut seen = HashSet::with_capacity(self.added_data_files.len());
+        self.added_data_files
+            .iter()
+            .filter(|data_file| seen.insert(data_file.file_path.as_str()))
+            .cloned()
+            .collect()
+    }
 }
 
 #[async_trait]
@@ -89,7 +101,7 @@ impl TransactionAction for FastAppendAction {
             self.commit_uuid.unwrap_or_else(Uuid::now_v7),
             self.key_metadata.clone(),
             self.snapshot_properties.clone(),
-            self.added_data_files.clone(),
+            self.dedupe_added_files(),
         );
 
         // validate added files
@@ -160,9 +172,9 @@ mod tests {
 
     use crate::io::FileIO;
     use crate::spec::{
-        DataContentType, DataFileBuilder, DataFileFormat, Literal, MAIN_BRANCH, ManifestEntry,
-        ManifestListWriter, ManifestStatus, ManifestWriterBuilder, SnapshotRef, Struct,
-        TableMetadata,
+        DataContentType, DataFile, DataFileBuilder, DataFileFormat, Literal, MAIN_BRANCH,
+        ManifestEntry, ManifestListWriter, ManifestStatus, ManifestWriterBuilder, SnapshotRef,
+        Struct, TableMetadata,
     };
     use crate::table::Table;
     use crate::test_utils::test_runtime;
@@ -363,6 +375,27 @@ mod tests {
         );
     }
 
+    /// Load the data files written by a single-manifest fast-append commit.
+    async fn committed_data_files(table: &Table, updates: &[TableUpdate]) -> Vec<DataFile> {
+        let TableUpdate::AddSnapshot { snapshot } = &updates[0] else {
+            unreachable!("first update is always AddSnapshot")
+        };
+        let manifest_list = table
+            .manifest_list_reader(&SnapshotRef::new(snapshot.clone()))
+            .load()
+            .await
+            .unwrap();
+        assert_eq!(1, manifest_list.entries().len());
+        manifest_list.entries()[0]
+            .load_manifest(table.file_io())
+            .await
+            .unwrap()
+            .entries()
+            .iter()
+            .map(|entry| entry.data_file().clone())
+            .collect()
+    }
+
     #[tokio::test]
     async fn test_empty_data_append_action() {
         let table = make_v2_minimal_table();
@@ -466,6 +499,64 @@ mod tests {
         assert!(Arc::new(action).commit(&table).await.is_err());
     }
 
+    #[tokio::test]
+    async fn test_fast_append_dedupes_intra_batch_duplicate_paths() {
+        let table = make_v2_minimal_table();
+        let tx = Transaction::new(&table);
+
+        let make_file = |size: u64, records: u64| {
+            DataFileBuilder::default()
+                .content(DataContentType::Data)
+                .file_path("test/dup.parquet".to_string())
+                .file_format(DataFileFormat::Parquet)
+                .file_size_in_bytes(size)
+                .record_count(records)
+                .partition_spec_id(table.metadata().default_partition_spec_id())
+                .partition(Struct::from_iter([Some(Literal::long(1))]))
+                .build()
+                .unwrap()
+        };
+
+        // Same path three times: the manifest keeps a single entry, the first one.
+        let action = tx.fast_append().add_data_files(vec![
+            make_file(100, 10),
+            make_file(200, 20),
+            make_file(300, 30),
+        ]);
+        let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
+        let files = committed_data_files(&table, &action_commit.take_updates()).await;
+        assert_eq!(1, files.len());
+        assert_eq!(100, files[0].file_size_in_bytes());
+    }
+
+    #[tokio::test]
+    async fn test_fast_append_dedupes_regardless_of_check_duplicate_flag() {
+        let table = make_v2_minimal_table();
+        let tx = Transaction::new(&table);
+
+        let make_file = || {
+            DataFileBuilder::default()
+                .content(DataContentType::Data)
+                .file_path("test/dup.parquet".to_string())
+                .file_format(DataFileFormat::Parquet)
+                .file_size_in_bytes(100)
+                .record_count(10)
+                .partition_spec_id(table.metadata().default_partition_spec_id())
+                .partition(Struct::from_iter([Some(Literal::long(1))]))
+                .build()
+                .unwrap()
+        };
+
+        // `check_duplicate` only gates the cross-snapshot check; intra-batch dedupe runs regardless.
+        let action = tx
+            .fast_append()
+            .with_check_duplicate(false)
+            .add_data_files(vec![make_file(), make_file()]);
+        let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
+        let files = committed_data_files(&table, &action_commit.take_updates()).await;
+        assert_eq!(1, files.len());
+    }
+
     #[tokio::test]
     async fn test_fast_append() {
         let table = make_v2_minimal_table();