Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 160 additions & 2 deletions crates/catalog/rest/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@

use std::collections::HashMap;

use iceberg::spec::{Schema, SortOrder, TableMetadata, UnboundPartitionSpec};
use iceberg::spec::{
Schema, SortOrder, TableMetadata, Transform, UnboundPartitionField, UnboundPartitionSpec,
};
use iceberg::{
Error, ErrorKind, Namespace, NamespaceIdent, TableIdent, TableRequirement, TableUpdate,
};
use serde::{Serialize as _, Serializer};
use serde_derive::{Deserialize, Serialize};

#[derive(Clone, Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -256,7 +259,10 @@ pub struct CreateTableRequest {
/// Table schema
pub schema: Schema,
/// Optional partition specification. If not provided, the table will be unpartitioned.
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(
skip_serializing_if = "Option::is_none",
serialize_with = "serialize_partition_spec_omit_none_ids"
)]
pub partition_spec: Option<UnboundPartitionSpec>,
/// Optional sort order for the table
#[serde(skip_serializing_if = "Option::is_none")]
Expand All @@ -269,6 +275,73 @@ pub struct CreateTableRequest {
pub properties: HashMap<String, String>,
}

/// Serialize a `CreateTableRequest::partition_spec` value, omitting the
/// inner `spec-id` / `field-id` keys when they are `None`.
///
/// The core `UnboundPartitionSpec` / `UnboundPartitionField` types always
/// emit those keys (as `null` when unset) because `TableUpdate::AddSpec`
/// requires `spec-id` to be present on the wire. `CreateTableRequest` is
/// a separate REST surface where the server assigns these ids, so the
/// client should omit them entirely when not set rather than send
/// explicit `null`s. This serializer localizes that REST-only behavior
/// without coupling the core spec type to it.
///
/// Paired with `skip_serializing_if = "Option::is_none"` on the field, so
/// this function is only invoked when the outer `Option` is `Some`. The
/// `None` arm is defensive.
fn serialize_partition_spec_omit_none_ids<S>(
partition_spec: &Option<UnboundPartitionSpec>,
serializer: S,
) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
// Shadow types that mirror the wire shape of `UnboundPartitionSpec` /
// `UnboundPartitionField` but skip the optional id fields when `None`.
#[derive(Serialize)]
#[serde(rename_all = "kebab-case")]
struct CreateTableUnboundPartitionField<'a> {
source_id: i32,
#[serde(skip_serializing_if = "Option::is_none")]
field_id: Option<i32>,
name: &'a str,
transform: &'a Transform,
}

#[derive(Serialize)]
#[serde(rename_all = "kebab-case")]
struct CreateTableUnboundPartitionSpec<'a> {
#[serde(skip_serializing_if = "Option::is_none")]
spec_id: Option<i32>,
fields: Vec<CreateTableUnboundPartitionField<'a>>,
}

impl<'a> From<&'a UnboundPartitionField> for CreateTableUnboundPartitionField<'a> {
fn from(field: &'a UnboundPartitionField) -> Self {
Self {
source_id: field.source_id,
field_id: field.field_id,
name: &field.name,
transform: &field.transform,
}
}
}

impl<'a> From<&'a UnboundPartitionSpec> for CreateTableUnboundPartitionSpec<'a> {
fn from(spec: &'a UnboundPartitionSpec) -> Self {
Self {
spec_id: spec.spec_id(),
fields: spec.fields().iter().map(Into::into).collect(),
}
}
}

match partition_spec {
Some(spec) => CreateTableUnboundPartitionSpec::from(spec).serialize(serializer),
None => serializer.serialize_none(),
}
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
/// Request to commit updates to a table.
///
Expand Down Expand Up @@ -472,4 +545,89 @@ mod tests {
assert_eq!(request.stage_create, None);
assert!(request.properties.is_empty());
}

#[test]
fn test_create_table_request_partition_spec_skips_inner_none_ids() {
// When `partition_spec` is `Some` but the inner spec / fields have
// no `spec_id` / `field_id` set, those inner keys must be omitted
// from the serialized REST payload (the server assigns them).
// The core `UnboundPartitionSpec` serializes those keys as `null`
// by design; this localization lives only on the REST layer.
use iceberg::spec::{Transform, UnboundPartitionField, UnboundPartitionSpec};

let request = CreateTableRequest {
name: "tbl1".to_string(),
location: None,
schema: test_create_table_request_schema(),
partition_spec: Some(
UnboundPartitionSpec::builder()
.add_partition_field(2, "bar_id".to_string(), Transform::Identity)
.expect("builder add_partition_field failed")
.build(),
),
write_order: None,
stage_create: None,
properties: HashMap::new(),
};

let serialized = serde_json::to_value(&request).expect("Serialization failed");
let spec = serialized
.get("partition-spec")
.and_then(|s| s.as_object())
.expect("partition-spec must be present and an object");
assert!(
!spec.contains_key("spec-id"),
"spec-id must be omitted when None on REST payload; got {serialized}",
);
let fields = spec
.get("fields")
.and_then(|f| f.as_array())
.expect("fields must be a JSON array");
let field = fields[0]
.as_object()
.expect("each field must be a JSON object");
assert!(
!field.contains_key("field-id"),
"field-id must be omitted when None on REST payload; got {serialized}",
);

// When the ids are set on the inner spec, they appear on the wire.
let request = CreateTableRequest {
name: "tbl1".to_string(),
location: None,
schema: test_create_table_request_schema(),
partition_spec: Some(
UnboundPartitionSpec::builder()
.with_spec_id(1)
.add_partition_fields(vec![
UnboundPartitionField::builder()
.source_id(2)
.field_id(1000)
.name("bar_id".to_string())
.transform(Transform::Identity)
.build(),
])
.expect("builder add_partition_fields failed")
.build(),
),
write_order: None,
stage_create: None,
properties: HashMap::new(),
};
let serialized = serde_json::to_value(&request).expect("Serialization failed");
assert_eq!(
Some(&serde_json::json!(1)),
serialized
.get("partition-spec")
.and_then(|s| s.get("spec-id"))
);
assert_eq!(
Some(&serde_json::json!(1000)),
serialized
.get("partition-spec")
.and_then(|s| s.get("fields"))
.and_then(|f| f.get(0))
.and_then(|f| f.get("field-id"))
);
}
}
42 changes: 42 additions & 0 deletions crates/iceberg/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1647,6 +1647,48 @@ mod tests {
);
}

#[test]
fn test_add_spec_emits_spec_id_key() {
// Regression guard: a Java REST server's `PartitionSpecParser.fromJson`
// throws `IllegalArgumentException: Cannot parse missing int: spec-id`
// when the `spec-id` key is missing on the wire. The inner
// `UnboundPartitionSpec` therefore must always emit the `spec-id`
// key on `TableUpdate::AddSpec`, even before `add_partition_spec`
// has populated it. Re-introducing
// `#[serde(skip_serializing_if = "Option::is_none")]` on
// `UnboundPartitionSpec::spec_id` would silently break this.
let update = TableUpdate::AddSpec {
spec: UnboundPartitionSpec::builder()
.add_partition_field(4, "ts_day".to_string(), Transform::Day)
.unwrap()
.build(),
};

let value = serde_json::to_value(&update).unwrap();
let spec = value
.get("spec")
.and_then(|s| s.as_object())
.expect("AddSpec must serialize `spec` as a JSON object");
assert!(
spec.contains_key("spec-id"),
"AddSpec must emit `spec-id` on the wire even when None; got {value}",
);

// And when set, the key holds the value.
let update = TableUpdate::AddSpec {
spec: UnboundPartitionSpec::builder()
.with_spec_id(7)
.add_partition_field(4, "ts_day".to_string(), Transform::Day)
.unwrap()
.build(),
};
let value = serde_json::to_value(&update).unwrap();
assert_eq!(
Some(&serde_json::json!(7)),
value.get("spec").and_then(|s| s.get("spec-id"))
);
}

#[test]
fn test_set_default_spec() {
test_serde_json(
Expand Down
40 changes: 33 additions & 7 deletions crates/iceberg/src/spec/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,6 @@ pub struct UnboundPartitionField {
/// A partition field id that is used to identify a partition field and is unique within a partition spec.
/// In v2 table metadata, it is unique across all partition specs.
#[builder(default, setter(strip_option(fallback = field_id_opt)))]
#[serde(skip_serializing_if = "Option::is_none")]
pub field_id: Option<i32>,
/// A partition name.
pub name: String,
Expand All @@ -261,7 +260,6 @@ pub struct UnboundPartitionField {
#[serde(rename_all = "kebab-case")]
pub struct UnboundPartitionSpec {
/// Identifier for PartitionSpec
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) spec_id: Option<i32>,
/// Details of the partition spec
pub(crate) fields: Vec<UnboundPartitionField>,
Expand Down Expand Up @@ -915,24 +913,30 @@ mod tests {
}

#[test]
fn test_unbound_partition_spec_serialization_skips_none_fields() {
fn test_unbound_partition_spec_serialization_emits_none_as_null() {
// The core `UnboundPartitionSpec` / `UnboundPartitionField` types must
// always emit `spec-id` and `field-id` keys on the wire, even when
// they are `None`. `TableUpdate::AddSpec` relies on this contract:
// a Java REST server's `PartitionSpecParser.fromJson` requires
// `spec-id` to be present on the JSON object. Per-request types that
// need the keys omitted (e.g. REST `CreateTableRequest`) localize
// that behavior via a custom serializer.
let spec = UnboundPartitionSpec::builder()
.add_partition_field(4, "ts_day".to_string(), Transform::Day)
.unwrap()
.build();

let value = serde_json::to_value(&spec).unwrap();
let object = value.as_object().unwrap();
assert!(!object.contains_key("spec-id"));
assert_eq!(Some(&serde_json::Value::Null), object.get("spec-id"));
let field = object["fields"][0].as_object().unwrap();
assert!(!field.contains_key("field-id"));
assert_eq!(Some(&serde_json::Value::Null), field.get("field-id"));

let value = serde_json::to_value(spec.with_spec_id(1)).unwrap();
let object = value.as_object().unwrap();
assert_eq!(Some(&serde_json::json!(1)), object.get("spec-id"));

// Explicit nulls must still deserialize to `None` for backwards
// compatibility.
// Explicit nulls must still deserialize to `None`.
let spec: UnboundPartitionSpec = serde_json::from_str(
r#"{
"spec-id": null,
Expand All @@ -946,6 +950,28 @@ mod tests {
assert_eq!(None, spec.fields[0].field_id);
}

#[test]
fn test_unbound_partition_field_field_id_set_is_serialized() {
// Positive case for the `field-id` round-trip: when `field_id` is
// `Some`, the value must appear under the kebab-case `field-id` key.
// Mirrors viirya's review ask on #2610.
let field = UnboundPartitionField::builder()
.source_id(4)
.field_id(1000)
.name("ts_day".to_string())
.transform(Transform::Day)
.build();

let value = serde_json::to_value(&field).unwrap();
let object = value.as_object().unwrap();
assert_eq!(Some(&serde_json::json!(1000)), object.get("field-id"));

// Round-trip back to confirm the key is the canonical kebab-case
// form and not `field_id`.
let parsed: UnboundPartitionField = serde_json::from_value(value).unwrap();
assert_eq!(Some(1000), parsed.field_id);
}

#[test]
fn test_new_unpartition() {
let schema = Schema::builder()
Expand Down
Loading