Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 90 additions & 4 deletions crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! This module provides `EqualityDeleteWriter`.

use std::collections::HashSet;
use std::sync::Arc;

use arrow_array::RecordBatch;
Expand Down Expand Up @@ -68,17 +69,48 @@ pub struct EqualityDeleteWriterConfig {
projector: RecordBatchProjector,
}

fn validate_equality_ids(equality_ids: &[i32], original_schema: &SchemaRef) -> Result<()> {
if equality_ids.is_empty() {
return Err(Error::new(
ErrorKind::DataInvalid,
"Equality delete field ids must not be empty.",
));
}

let mut seen = HashSet::with_capacity(equality_ids.len());
for id in equality_ids {
if !seen.insert(*id) {
return Err(Error::new(
ErrorKind::DataInvalid,
format!("Duplicate equality delete field id: {id}"),
));
}

if original_schema.field_by_id(*id).is_none() {
return Err(Error::new(
ErrorKind::DataInvalid,
format!("Invalid equality delete field id: {id}"),
)
.with_context("field_id", id.to_string()));
}
}

Ok(())
}

impl EqualityDeleteWriterConfig {
/// Create a new `EqualityDeleteWriterConfig` with equality ids.
pub fn new(equality_ids: Vec<i32>, original_schema: SchemaRef) -> Result<Self> {
validate_equality_ids(&equality_ids, &original_schema)?;

let original_arrow_schema = Arc::new(schema_to_arrow_schema(&original_schema)?);
let projector = RecordBatchProjector::new(
original_arrow_schema,
&equality_ids,
// The following rule comes from https://iceberg.apache.org/spec/#identifier-field-ids
// and https://iceberg.apache.org/spec/#equality-delete-files
// - The identifier field ids must be used for primitive types.
// - The identifier field ids must not be used for floating point types or nullable fields.
// Equality delete fields follow identifier-field type restrictions,
// except optional columns and columns nested under optional structs are allowed.
// Project only primitive, non-floating fields; RecordBatchProjector's traversal
// keeps fields under maps and lists unreachable.
|field| {
// Only primitive type is allowed to be used for identifier field ids
if field.data_type().is_nested()
Expand Down Expand Up @@ -212,6 +244,7 @@ mod test {
use tempfile::TempDir;
use uuid::Uuid;

use crate::ErrorKind;
use crate::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
use crate::io::FileIO;
use crate::spec::{
Expand Down Expand Up @@ -458,6 +491,59 @@ mod test {
Ok(())
}

/// Builds the schema shared by the equality-id validation tests: a required `Int`
/// field `id` (field id 1) and an optional `String` field `name` (field id 2).
fn equality_id_validation_schema() -> Arc<Schema> {
    let builder = Schema::builder().with_schema_id(1).with_fields(vec![
        NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
        NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
    ]);

    let schema = builder.build().unwrap();
    Arc::new(schema)
}

/// An empty equality id list must be rejected with a `DataInvalid` error.
#[test]
fn test_equality_delete_rejects_empty_equality_ids() {
    let schema = equality_id_validation_schema();
    let result = EqualityDeleteWriterConfig::new(vec![], schema);
    let err = result.unwrap_err();

    assert_eq!(err.kind(), ErrorKind::DataInvalid);

    let rendered = err.to_string();
    assert!(
        rendered.contains("Equality delete field ids must not be empty."),
        "{err}"
    );
}

/// Listing the same equality id twice must be rejected with a `DataInvalid` error
/// that names the repeated id.
#[test]
fn test_equality_delete_rejects_duplicate_equality_ids() {
    let schema = equality_id_validation_schema();
    let result = EqualityDeleteWriterConfig::new(vec![1, 1], schema);
    let err = result.unwrap_err();

    assert_eq!(err.kind(), ErrorKind::DataInvalid);

    let rendered = err.to_string();
    assert!(
        rendered.contains("Duplicate equality delete field id: 1"),
        "{err}"
    );
}

/// An equality id absent from the schema must be rejected with a `DataInvalid`
/// error whose rendering includes both the message and the `field_id` context.
#[test]
fn test_equality_delete_rejects_missing_equality_id() {
    let schema = equality_id_validation_schema();
    let result = EqualityDeleteWriterConfig::new(vec![99], schema);
    let err = result.unwrap_err();

    assert_eq!(err.kind(), ErrorKind::DataInvalid);

    let rendered = err.to_string();
    assert!(
        rendered.contains("Invalid equality delete field id: 99"),
        "{err}"
    );
    assert!(rendered.contains("field_id: 99"), "{err}");
}

#[tokio::test]
async fn test_equality_delete_unreachable_column() -> Result<(), anyhow::Error> {
let schema = Arc::new(
Expand Down
Loading