Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions crates/iceberg/src/encryption/handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Pluggable generation of per-file `key_metadata` on the write path.

use std::fmt::Debug;

use aes_gcm::aead::OsRng;
use aes_gcm::aead::rand_core::RngCore;
use async_trait::async_trait;

use super::crypto::{AesKeySize, SecureKey};
use super::key_metadata::StandardKeyMetadata;
use crate::Result;

/// Length, in bytes, of the random AAD prefix generated for each file.
/// Matches Java's `TableProperties.ENCRYPTION_AAD_LENGTH_DEFAULT`.
const AAD_PREFIX_LENGTH: usize = 16;

/// Produces the per-file `key_metadata` that the writer attaches to each
/// emitted [`DataFile`] and uses to encrypt the file.
///
/// The spec defines `key_metadata` (field 131) as implementation-specific. The
/// reference *standard* encryption scheme stores a [`StandardKeyMetadata`]
/// containing a fresh plaintext DEK + AAD prefix per file, generated locally
/// without a KMS round-trip (see [`StandardFileEncryptionHandler`]). Other
/// schemes may need to call out to a KMS to wrap a freshly minted DEK, hence
/// the `async` signature.
///
/// This is the write-side counterpart of `FileKeyResolver` on the read path:
/// readers resolve `key_metadata` bytes back into a [`StandardKeyMetadata`];
/// writers produce one to embed.
///
/// [`DataFile`]: crate::spec::DataFile
#[async_trait]
pub trait FileEncryptionHandler: Debug + Send + Sync {
/// Produce key material for the next file to be written.
///
/// # Errors
///
/// Implementations may return an error when key material cannot be
/// produced, for example when a scheme calls out to a KMS and that call
/// fails.
async fn next_key_metadata(&self) -> Result<StandardKeyMetadata>;
}

/// Default [`FileEncryptionHandler`] for the standard encryption scheme.
///
/// Generates a fresh random DEK and AAD prefix per file with no KMS
/// round-trip; satisfies the async signature trivially.
///
/// The derived `Default` uses `AesKeySize::default()` as the DEK size.
#[derive(Debug, Default, Clone)]
pub struct StandardFileEncryptionHandler {
/// Size of the DEK generated for each file.
key_size: AesKeySize,
}

impl StandardFileEncryptionHandler {
/// Creates a new handler with the given DEK size.
pub fn new(key_size: AesKeySize) -> Self {
Self { key_size }
}
}

#[async_trait]
impl FileEncryptionHandler for StandardFileEncryptionHandler {
    /// Generates fresh key metadata locally, sized to the handler's
    /// configured `key_size`. This implementation always returns `Ok`.
    async fn next_key_metadata(&self) -> Result<StandardKeyMetadata> {
        let key_metadata = generate_standard_key_metadata(self.key_size);
        Ok(key_metadata)
    }
}

/// Build a [`StandardKeyMetadata`] from a newly generated DEK of `key_size`
/// and a newly generated random AAD prefix.
pub(crate) fn generate_standard_key_metadata(key_size: AesKeySize) -> StandardKeyMetadata {
    let data_key = SecureKey::generate(key_size);
    let prefix = generate_aad_prefix();
    let metadata = StandardKeyMetadata::new(data_key.as_bytes());
    metadata.with_aad_prefix(&prefix)
}

/// Allocate an `AAD_PREFIX_LENGTH`-byte buffer and fill it from `OsRng`.
fn generate_aad_prefix() -> Box<[u8]> {
    let mut buffer: Box<[u8]> = vec![0u8; AAD_PREFIX_LENGTH].into_boxed_slice();
    OsRng.fill_bytes(&mut buffer);
    buffer
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Two consecutive calls on the same handler must not reuse the DEK or
    /// the AAD prefix.
    #[tokio::test]
    async fn test_standard_handler_emits_distinct_keys() {
        let handler = StandardFileEncryptionHandler::default();

        let first = handler.next_key_metadata().await.unwrap();
        let second = handler.next_key_metadata().await.unwrap();

        assert_ne!(
            first.encryption_key().as_bytes(),
            second.encryption_key().as_bytes(),
            "each file must get a fresh DEK"
        );
        assert_ne!(
            first.aad_prefix(),
            second.aad_prefix(),
            "each file must get a fresh AAD prefix"
        );
    }
}
32 changes: 15 additions & 17 deletions crates/iceberg/src/encryption/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ use std::fmt;
use std::sync::{Arc, RwLock};
use std::time::Duration;

use aes_gcm::aead::OsRng;
use aes_gcm::aead::rand_core::RngCore;
use async_trait::async_trait;
use chrono::Utc;
use moka::future::Cache;
use uuid::Uuid;

const MILLIS_IN_DAY: i64 = 24 * 60 * 60 * 1000;

use super::crypto::{AesGcmCipher, AesKeySize, SecureKey, SensitiveBytes};
use super::handler::{FileEncryptionHandler, generate_standard_key_metadata};
use super::io::EncryptedOutputFile;
use super::key_metadata::StandardKeyMetadata;
use super::kms::KeyManagementClient;
Expand All @@ -54,10 +54,6 @@ const DEFAULT_KEK_LIFESPAN_DAYS: i64 = 730;
/// Default cache TTL for unwrapped KEKs.
const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(3600);

/// Default AAD prefix length in bytes.
/// Matches Java's `TableProperties.ENCRYPTION_AAD_LENGTH_DEFAULT`.
const AAD_PREFIX_LENGTH: usize = 16;

/// File-level encryption manager using two-layer envelope encryption.
///
/// Uses an async cache for unwrapped KEK bytes to avoid repeated KMS calls.
Expand Down Expand Up @@ -151,10 +147,7 @@ impl EncryptionManager {
/// Returns an [`EncryptedOutputFile`] that transparently encrypts on
/// write, along with key metadata for later decryption.
pub fn encrypt(&self, raw_output: OutputFile) -> EncryptedOutputFile {
let dek = SecureKey::generate(self.key_size);
let aad_prefix = Self::generate_aad_prefix();
let metadata = StandardKeyMetadata::new(dek.as_bytes()).with_aad_prefix(&aad_prefix);
EncryptedOutputFile::new(raw_output, metadata)
EncryptedOutputFile::new(raw_output, generate_standard_key_metadata(self.key_size))
}

/// Wrap a manifest list key metadata with a KEK for storage in table metadata.
Expand Down Expand Up @@ -397,13 +390,6 @@ impl EncryptionManager {
})
}

/// Generate a random AAD prefix of `AAD_PREFIX_LENGTH` bytes for file encryption.
fn generate_aad_prefix() -> Box<[u8]> {
    let mut buffer: Box<[u8]> = vec![0u8; AAD_PREFIX_LENGTH].into_boxed_slice();
    OsRng.fill_bytes(&mut buffer);
    buffer
}

/// Wrap a DEK with a KEK using local AES-GCM.
fn wrap_dek_with_kek(
&self,
Expand All @@ -429,6 +415,18 @@ impl EncryptionManager {
}
}

#[async_trait]
impl FileEncryptionHandler for EncryptionManager {
    /// Produce per-file key metadata for the standard encryption scheme.
    ///
    /// The result holds a fresh plaintext DEK + AAD prefix, with the DEK
    /// sized to the manager's configured [`AesKeySize`]. No KMS round-trip
    /// is made here — the KMS/KEK envelope work happens one tier up, when
    /// the manifest-list key metadata is wrapped.
    async fn next_key_metadata(&self) -> Result<StandardKeyMetadata> {
        let key_metadata = generate_standard_key_metadata(self.key_size);
        Ok(key_metadata)
    }
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
2 changes: 2 additions & 0 deletions crates/iceberg/src/encryption/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
//! for encrypting and decrypting data in Iceberg tables.

mod crypto;
mod handler;
pub(crate) mod io;
pub(crate) mod key_metadata;
pub mod kms;
mod manager;
mod stream;

pub use crypto::{AesGcmCipher, AesKeySize, SecureKey, SensitiveBytes};
pub use handler::{FileEncryptionHandler, StandardFileEncryptionHandler};
pub use io::{EncryptedInputFile, EncryptedOutputFile};
pub use key_metadata::StandardKeyMetadata;
pub use kms::{GeneratedKey, KeyManagementClient};
Expand Down
6 changes: 6 additions & 0 deletions crates/iceberg/src/writer/file_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ pub trait FileWriterBuilder<O = DefaultOutput>: Clone + Send + Sync + 'static {
/// The associated file writer type.
type R: FileWriter<O>;
/// Build a file writer for the given `output_file`.
///
/// Returns a `Send` future that resolves to the associated writer type
/// `Self::R`, or to an error.
///
/// Whether the resulting file is encrypted is determined by the builder's
/// own configuration (e.g. a [`FileEncryptionHandler`] configured upfront),
/// not by the caller picking a different `build` method.
///
/// [`FileEncryptionHandler`]: crate::encryption::FileEncryptionHandler
fn build(&self, output_file: OutputFile) -> impl Future<Output = Result<Self::R>> + Send;
}

Expand Down
Loading
Loading