diff --git a/crates/iceberg/src/encryption/handler.rs b/crates/iceberg/src/encryption/handler.rs new file mode 100644 index 0000000000..1aea759df0 --- /dev/null +++ b/crates/iceberg/src/encryption/handler.rs @@ -0,0 +1,111 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Pluggable generation of per-file `key_metadata` on the write path. + +use std::fmt::Debug; + +use aes_gcm::aead::OsRng; +use aes_gcm::aead::rand_core::RngCore; +use async_trait::async_trait; + +use super::crypto::{AesKeySize, SecureKey}; +use super::key_metadata::StandardKeyMetadata; +use crate::Result; + +/// AAD prefix length in bytes. +/// Matches Java's `TableProperties.ENCRYPTION_AAD_LENGTH_DEFAULT`. +const AAD_PREFIX_LENGTH: usize = 16; + +/// Produces the per-file `key_metadata` that the writer attaches to each +/// emitted [`DataFile`] and uses to encrypt the file. +/// +/// The spec defines `key_metadata` (field 131) as implementation-specific. The +/// reference *standard* encryption scheme stores a [`StandardKeyMetadata`] +/// containing a fresh plaintext DEK + AAD prefix per file, generated locally +/// without a KMS round-trip (see [`StandardFileEncryptionHandler`]). Other +/// schemes may need to call out to a KMS to wrap a freshly minted DEK, hence +/// the `async` signature. +/// +/// This is the write-side counterpart of `FileKeyResolver` on the read path: +/// readers resolve `key_metadata` bytes back into a [`StandardKeyMetadata`]; +/// writers produce one to embed. +/// +/// [`DataFile`]: crate::spec::DataFile +#[async_trait] +pub trait FileEncryptionHandler: Debug + Send + Sync { + /// Produce key material for the next file to be written. + async fn next_key_metadata(&self) -> Result; +} + +/// Default [`FileEncryptionHandler`] for the standard encryption scheme. +/// +/// Generates a fresh random DEK and AAD prefix per file with no KMS +/// round-trip; satisfies the async signature trivially. +#[derive(Debug, Default, Clone)] +pub struct StandardFileEncryptionHandler { + key_size: AesKeySize, +} + +impl StandardFileEncryptionHandler { + /// Creates a new handler with the given DEK size. + pub fn new(key_size: AesKeySize) -> Self { + Self { key_size } + } +} + +#[async_trait] +impl FileEncryptionHandler for StandardFileEncryptionHandler { + async fn next_key_metadata(&self) -> Result { + Ok(generate_standard_key_metadata(self.key_size)) + } +} + +/// Generate a [`StandardKeyMetadata`] with a fresh random DEK and AAD prefix. +pub(crate) fn generate_standard_key_metadata(key_size: AesKeySize) -> StandardKeyMetadata { + let dek = SecureKey::generate(key_size); + let aad_prefix = generate_aad_prefix(); + StandardKeyMetadata::new(dek.as_bytes()).with_aad_prefix(&aad_prefix) +} + +fn generate_aad_prefix() -> Box<[u8]> { + let mut prefix = vec![0u8; AAD_PREFIX_LENGTH]; + OsRng.fill_bytes(&mut prefix); + prefix.into_boxed_slice() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_standard_handler_emits_distinct_keys() { + let handler = StandardFileEncryptionHandler::default(); + let a = handler.next_key_metadata().await.unwrap(); + let b = handler.next_key_metadata().await.unwrap(); + assert_ne!( + a.encryption_key().as_bytes(), + b.encryption_key().as_bytes(), + "each file must get a fresh DEK" + ); + assert_ne!( + a.aad_prefix(), + b.aad_prefix(), + "each file must get a fresh AAD prefix" + ); + } +} diff --git a/crates/iceberg/src/encryption/manager.rs b/crates/iceberg/src/encryption/manager.rs index d88c4cb8f7..25f5417109 100644 --- a/crates/iceberg/src/encryption/manager.rs +++ b/crates/iceberg/src/encryption/manager.rs @@ -28,8 +28,7 @@ use std::fmt; use std::sync::{Arc, RwLock}; use std::time::Duration; -use aes_gcm::aead::OsRng; -use aes_gcm::aead::rand_core::RngCore; +use async_trait::async_trait; use chrono::Utc; use moka::future::Cache; use uuid::Uuid; @@ -37,6 +36,7 @@ use uuid::Uuid; const MILLIS_IN_DAY: i64 = 24 * 60 * 60 * 1000; use super::crypto::{AesGcmCipher, AesKeySize, SecureKey, SensitiveBytes}; +use super::handler::{FileEncryptionHandler, generate_standard_key_metadata}; use super::io::EncryptedOutputFile; use super::key_metadata::StandardKeyMetadata; use super::kms::KeyManagementClient; @@ -54,10 +54,6 @@ const DEFAULT_KEK_LIFESPAN_DAYS: i64 = 730; /// Default cache TTL for unwrapped KEKs. const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(3600); -/// Default AAD prefix length in bytes. -/// Matches Java's `TableProperties.ENCRYPTION_AAD_LENGTH_DEFAULT`. -const AAD_PREFIX_LENGTH: usize = 16; - /// File-level encryption manager using two-layer envelope encryption. /// /// Uses an async cache for unwrapped KEK bytes to avoid repeated KMS calls. @@ -151,10 +147,7 @@ impl EncryptionManager { /// Returns an [`EncryptedOutputFile`] that transparently encrypts on /// write, along with key metadata for later decryption. pub fn encrypt(&self, raw_output: OutputFile) -> EncryptedOutputFile { - let dek = SecureKey::generate(self.key_size); - let aad_prefix = Self::generate_aad_prefix(); - let metadata = StandardKeyMetadata::new(dek.as_bytes()).with_aad_prefix(&aad_prefix); - EncryptedOutputFile::new(raw_output, metadata) + EncryptedOutputFile::new(raw_output, generate_standard_key_metadata(self.key_size)) } /// Wrap a manifest list key metadata with a KEK for storage in table metadata. @@ -397,13 +390,6 @@ impl EncryptionManager { }) } - /// Generate a random AAD prefix for file encryption. - fn generate_aad_prefix() -> Box<[u8]> { - let mut prefix = vec![0u8; AAD_PREFIX_LENGTH]; - OsRng.fill_bytes(&mut prefix); - prefix.into_boxed_slice() - } - /// Wrap a DEK with a KEK using local AES-GCM. fn wrap_dek_with_kek( &self, @@ -429,6 +415,18 @@ impl EncryptionManager { } } +#[async_trait] +impl FileEncryptionHandler for EncryptionManager { + /// Generate per-file key metadata for the standard encryption scheme. + /// + /// Returns a fresh plaintext DEK + AAD prefix sized to the manager's + /// configured [`AesKeySize`]. No KMS round-trip — the KMS/KEK envelope + /// work happens one tier up when the manifest-list key metadata is wrapped. + async fn next_key_metadata(&self) -> Result { + Ok(generate_standard_key_metadata(self.key_size)) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/iceberg/src/encryption/mod.rs b/crates/iceberg/src/encryption/mod.rs index 12ee76e5e0..161b88cff3 100644 --- a/crates/iceberg/src/encryption/mod.rs +++ b/crates/iceberg/src/encryption/mod.rs @@ -21,6 +21,7 @@ //! for encrypting and decrypting data in Iceberg tables. mod crypto; +mod handler; pub(crate) mod io; pub(crate) mod key_metadata; pub mod kms; @@ -28,6 +29,7 @@ mod manager; mod stream; pub use crypto::{AesGcmCipher, AesKeySize, SecureKey, SensitiveBytes}; +pub use handler::{FileEncryptionHandler, StandardFileEncryptionHandler}; pub use io::{EncryptedInputFile, EncryptedOutputFile}; pub use key_metadata::StandardKeyMetadata; pub use kms::{GeneratedKey, KeyManagementClient}; diff --git a/crates/iceberg/src/writer/file_writer/mod.rs b/crates/iceberg/src/writer/file_writer/mod.rs index 101919f5b3..e77d4c0af1 100644 --- a/crates/iceberg/src/writer/file_writer/mod.rs +++ b/crates/iceberg/src/writer/file_writer/mod.rs @@ -40,6 +40,12 @@ pub trait FileWriterBuilder: Clone + Send + Sync + 'static { /// The associated file writer type. type R: FileWriter; /// Build file writer. + /// + /// Whether the resulting file is encrypted is determined by the builder's + /// own configuration (e.g. a [`FileEncryptionHandler`] configured upfront), + /// not by the caller picking a different `build` method. + /// + /// [`FileEncryptionHandler`]: crate::encryption::FileEncryptionHandler fn build(&self, output_file: OutputFile) -> impl Future> + Send; } diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 840d1a5f16..5ec8b28dd6 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -27,6 +27,7 @@ use itertools::Itertools; use parquet::arrow::AsyncArrowWriter; use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter; +use parquet::encryption::encrypt::FileEncryptionProperties; use parquet::file::metadata::ParquetMetaData; use parquet::file::properties::WriterProperties; use parquet::file::statistics::Statistics; @@ -36,6 +37,7 @@ use crate::arrow::{ ArrowFileReader, DEFAULT_MAP_FIELD_NAME, FieldMatchMode, NanValueCountVisitor, get_parquet_stat_max_as_datum, get_parquet_stat_min_as_datum, }; +use crate::encryption::{FileEncryptionHandler, StandardKeyMetadata}; use crate::io::{FileIO, FileWrite, OutputFile}; use crate::spec::{ DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType, Literal, MapType, @@ -46,12 +48,34 @@ use crate::transform::create_transform_function; use crate::writer::{CurrentFileStatus, DataFile}; use crate::{Error, ErrorKind, Result}; +/// Build Parquet Modular Encryption properties from per-file key material. +/// +/// PME uses the DEK as the footer key (uniform encryption: same key for footer +/// and all column chunks) and the AAD prefix as the file-level AAD. +fn build_file_encryption_properties( + key_metadata: &StandardKeyMetadata, +) -> Result> { + let mut builder = + FileEncryptionProperties::builder(key_metadata.encryption_key().as_bytes().to_vec()); + if let Some(aad) = key_metadata.aad_prefix() { + builder = builder.with_aad_prefix(aad.to_vec()); + } + builder.build().map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "Failed to build parquet file encryption properties", + ) + .with_source(e) + }) +} + /// ParquetWriterBuilder is used to builder a [`ParquetWriter`] #[derive(Clone, Debug)] pub struct ParquetWriterBuilder { props: WriterProperties, schema: SchemaRef, match_mode: FieldMatchMode, + file_encryption_handler: Option>, } impl ParquetWriterBuilder { @@ -71,8 +95,23 @@ impl ParquetWriterBuilder { props, schema, match_mode, + file_encryption_handler: None, } } + + /// Configure per-file encryption via a [`FileEncryptionHandler`]. + /// + /// When set, every file produced by this builder is wrapped in AES-GCM + /// stream encryption keyed from the handler's `next_key_metadata`, and the + /// resulting [`DataFile::key_metadata`] carries the encoded + /// [`StandardKeyMetadata`] so readers can decrypt the file. + pub fn with_file_encryption_handler( + mut self, + handler: Arc, + ) -> Self { + self.file_encryption_handler = Some(handler); + self + } } impl FileWriterBuilder for ParquetWriterBuilder { @@ -85,6 +124,8 @@ impl FileWriterBuilder for ParquetWriterBuilder { writer_properties: self.props.clone(), current_row_num: 0, output_file, + file_encryption_handler: self.file_encryption_handler.clone(), + resolved_key_metadata: None, nan_value_count_visitor: NanValueCountVisitor::new_with_match_mode(self.match_mode), }) } @@ -212,6 +253,12 @@ impl SchemaVisitor for IndexByParquetPathName { pub struct ParquetWriter { schema: SchemaRef, output_file: OutputFile, + /// When set, the writer wraps its raw output with AES-GCM stream + /// encryption using key material from this handler. + file_encryption_handler: Option>, + /// Set on lazy init when encryption is enabled — captured here so + /// `close()` can emit it on the resulting `DataFile::key_metadata`. + resolved_key_metadata: Option, inner_writer: Option>, writer_properties: WriterProperties, current_row_num: usize, @@ -307,7 +354,7 @@ impl MinMaxColAggregator { } impl ParquetWriter { - /// Converts parquet files to data files + /// Converts already-written parquet files into [`DataFile`]s. #[allow(dead_code)] pub(crate) async fn parquet_files_to_data_files( file_io: &FileIO, @@ -337,6 +384,7 @@ impl ParquetWriter { file_path, // TODO: Implement nan_value_counts here HashMap::new(), + None, )?; builder.partition_spec_id(table_metadata.default_partition_spec_id()); let data_file = builder.build().unwrap(); @@ -353,6 +401,7 @@ impl ParquetWriter { written_size: usize, file_path: String, nan_value_counts: HashMap, + key_metadata: Option>, ) -> Result { let index_by_parquet_path = { let mut visitor = IndexByParquetPathName::new(); @@ -412,6 +461,7 @@ impl ParquetWriter { // - We can ignore implementing distinct_counts due to this: https://lists.apache.org/thread/j52tsojv0x4bopxyzsp7m7bqt23n5fnd .lower_bounds(lower_bounds) .upper_bounds(upper_bounds) + .key_metadata(key_metadata) .split_offsets(Some( metadata .row_groups() @@ -490,12 +540,23 @@ impl FileWriter for ParquetWriter { writer } else { let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?); - let inner_writer = self.output_file.writer().await?; - let async_writer = AsyncFileWriter::new(inner_writer); + let writer_properties = if let Some(handler) = &self.file_encryption_handler { + let key_metadata = handler.next_key_metadata().await?; + let file_encryption_properties = build_file_encryption_properties(&key_metadata)?; + self.resolved_key_metadata = Some(key_metadata); + self.writer_properties + .clone() + .into_builder() + .with_file_encryption_properties(file_encryption_properties) + .build() + } else { + self.writer_properties.clone() + }; + let async_writer = AsyncFileWriter::new(self.output_file.writer().await?); let writer = AsyncArrowWriter::try_new( async_writer, arrow_schema.clone(), - Some(self.writer_properties.clone()), + Some(writer_properties), ) .map_err(|err| { Error::new(ErrorKind::Unexpected, "Failed to build parquet writer.") @@ -540,12 +601,18 @@ impl FileWriter for ParquetWriter { } else { let parquet_metadata = Arc::new(metadata); + let key_metadata = self + .resolved_key_metadata + .as_ref() + .map(|km| km.encode().map(|b| b.into_vec())) + .transpose()?; Ok(vec![Self::parquet_to_data_file_builder( self.schema, parquet_metadata, written_size, self.output_file.location().to_string(), self.nan_value_count_visitor.nan_value_counts, + key_metadata, )?]) } } @@ -627,6 +694,8 @@ mod tests { use super::*; use crate::arrow::schema_to_arrow_schema; + use crate::encryption::kms::{KeyManagementClient, MemoryKeyManagementClient}; + use crate::encryption::EncryptionManager; use crate::io::FileIO; use crate::spec::decimal_utils::{decimal_mantissa, decimal_new, decimal_scale}; use crate::spec::{PrimitiveLiteral, Struct, *}; @@ -861,6 +930,85 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_parquet_writer_encrypted_round_trip() -> Result<()> { + use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder}; + use parquet::encryption::decrypt::FileDecryptionProperties; + + let file_io = FileIO::new_with_memory(); + let path = "memory:///encrypted_parquet.parquet"; + + let kms = MemoryKeyManagementClient::new(); + kms.add_master_key("master-1").unwrap(); + let manager = Arc::new( + EncryptionManager::builder() + .kms_client(Arc::new(kms) as Arc) + .table_key_id("master-1") + .build(), + ); + + let arrow_schema = Arc::new(arrow_schema::Schema::new(vec![ + Field::new("col", DataType::Int64, true).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "0".to_string(), + )])), + ])); + let batch = RecordBatch::try_new( + arrow_schema.clone(), + vec![Arc::new(Int64Array::from_iter_values(0..16)) as ArrayRef], + ) + .unwrap(); + + let mut pw = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + Arc::new(arrow_schema.as_ref().try_into().unwrap()), + ) + .with_file_encryption_handler(manager as Arc) + .build(file_io.new_output(path)?) + .await?; + pw.write(&batch).await?; + let mut builders = pw.close().await?; + + let data_file = builders + .pop() + .unwrap() + .content(DataContentType::Data) + .partition(Struct::empty()) + .partition_spec_id(0) + .build() + .unwrap(); + + let decoded_km = StandardKeyMetadata::decode( + data_file + .key_metadata() + .expect("encrypted data file must carry key_metadata"), + )?; + + // Read back via PME: build FileDecryptionProperties from the per-file + // key material and hand them to the parquet reader. The file on disk + // is a valid parquet file — no stream decryption involved. + let mut decryption_builder = FileDecryptionProperties::builder( + decoded_km.encryption_key().as_bytes().to_vec(), + ); + if let Some(aad) = decoded_km.aad_prefix() { + decryption_builder = decryption_builder.with_aad_prefix(aad.to_vec()); + } + let decryption_properties = decryption_builder.build().unwrap(); + + let ciphertext = file_io.new_input(path)?.read().await?; + let reader = ParquetRecordBatchReaderBuilder::try_new_with_options( + ciphertext, + ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties), + ) + .unwrap() + .build() + .unwrap(); + let batches: Vec = reader.map(|b| b.unwrap()).collect(); + assert_eq!(batch, concat_batches(&arrow_schema, &batches).unwrap()); + + Ok(()) + } + #[tokio::test] async fn test_parquet_writer_with_complex_schema() -> Result<()> { let temp_dir = TempDir::new().unwrap();