Skip to content

Commit 88ca8b6

Browse files
authored
feat(encryption) [3/N] Support encryption: KMS (#2339)
## Which issue does this PR close? Part of #2034 ## What changes are included in this PR? Adds the `KeyManagementClient` trait and an in-memory implementation for testing. - `KeyManagementClient` trait mirrors Java's `KeyManagementClient` [interface](https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/encryption/KeyManagementClient.java). - `InMemoryKeyManagementClient` for testing-only KMS that stores master keys in memory to wrap/unwrap. Supports configurable key sizes and explicit key bytes for cross-language interop tests. ## Are these changes tested? Tests covered for `InMemoryKeyManagementClient`
1 parent 1ad4bfd commit 88ca8b6

6 files changed

Lines changed: 436 additions & 8 deletions

File tree

crates/iceberg/src/encryption/crypto.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ use crate::{Error, ErrorKind, Result};
4343
/// containing `SensitiveBytes` can safely derive or implement `Debug`
4444
/// without risk of leaking key material.
4545
#[derive(Clone, PartialEq, Eq)]
46-
struct SensitiveBytes(Zeroizing<Box<[u8]>>);
46+
pub struct SensitiveBytes(Zeroizing<Box<[u8]>>);
4747

4848
impl SensitiveBytes {
4949
/// Wraps the given bytes as sensitive material.
@@ -57,13 +57,11 @@ impl SensitiveBytes {
5757
}
5858

5959
/// Returns the number of bytes.
60-
#[allow(dead_code)] // Encryption work is ongoing so currently unused
6160
pub fn len(&self) -> usize {
6261
self.0.len()
6362
}
6463

6564
/// Returns `true` if the byte slice is empty.
66-
#[allow(dead_code)] // Encryption work is ongoing so currently unused
6765
pub fn is_empty(&self) -> bool {
6866
self.0.is_empty()
6967
}
@@ -85,9 +83,10 @@ impl fmt::Display for SensitiveBytes {
8583
///
8684
/// The Iceberg spec supports 128, 192, and 256-bit keys for AES-GCM.
8785
/// See: <https://iceberg.apache.org/gcm-stream-spec/#goals>
88-
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
86+
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
8987
pub enum AesKeySize {
90-
/// 128-bit AES key (16 bytes)
88+
/// 128-bit AES key (16 bytes). Default per the Iceberg spec.
89+
#[default]
9190
Bits128 = 128,
9291
/// 192-bit AES key (24 bytes)
9392
Bits192 = 192,
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Key management client trait for encryption key operations.
19+
//!
20+
//! Mirrors the Java `KeyManagementClient` interface from the Apache Iceberg spec.
21+
22+
use async_trait::async_trait;
23+
24+
use crate::Result;
25+
use crate::encryption::SensitiveBytes;
26+
27+
/// Result of a server-side key generation operation.
28+
///
29+
/// Returned by [`KeyManagementClient::generate_key`] when the KMS supports
30+
/// atomic key generation and wrapping.
31+
pub struct GeneratedKey {
32+
key: SensitiveBytes,
33+
wrapped_key: Vec<u8>,
34+
}
35+
36+
impl GeneratedKey {
37+
/// Creates a new `GeneratedKey` from plaintext key bytes and wrapped key bytes.
38+
pub fn new(key: SensitiveBytes, wrapped_key: Vec<u8>) -> Self {
39+
Self { key, wrapped_key }
40+
}
41+
42+
/// Returns the plaintext key bytes. Zeroized on drop, redacted in Debug.
43+
pub fn key(&self) -> &SensitiveBytes {
44+
&self.key
45+
}
46+
47+
/// Returns the wrapped (encrypted) key bytes.
48+
pub fn wrapped_key(&self) -> &[u8] {
49+
&self.wrapped_key
50+
}
51+
}
52+
53+
/// Pluggable interface for key management systems (AWS KMS, Azure Key Vault, etc.).
54+
#[async_trait]
55+
pub trait KeyManagementClient: Send + Sync + std::fmt::Debug {
56+
/// Wrap (encrypt) a key using a wrapping key managed by the KMS.
57+
async fn wrap_key(&self, key: &[u8], wrapping_key_id: &str) -> Result<Vec<u8>>;
58+
59+
/// Unwrap (decrypt) a previously wrapped key.
60+
async fn unwrap_key(&self, wrapped_key: &[u8], wrapping_key_id: &str)
61+
-> Result<SensitiveBytes>;
62+
63+
/// Whether this KMS supports server-side key generation.
64+
///
65+
/// If `true`, callers can use [`generate_key`](Self::generate_key) for atomic
66+
/// key generation and wrapping, which is more secure than generating a key
67+
/// locally and then wrapping it.
68+
fn supports_key_generation(&self) -> bool;
69+
70+
/// Generate a new key and wrap it atomically on the server side.
71+
///
72+
/// This is only supported when [`supports_key_generation`](Self::supports_key_generation)
73+
/// returns `true`.
74+
async fn generate_key(&self, wrapping_key_id: &str) -> Result<GeneratedKey>;
75+
}
76+
77+
#[async_trait]
78+
impl<T: AsRef<dyn KeyManagementClient> + Send + Sync + std::fmt::Debug> KeyManagementClient for T {
79+
async fn wrap_key(&self, key: &[u8], wrapping_key_id: &str) -> Result<Vec<u8>> {
80+
self.as_ref().wrap_key(key, wrapping_key_id).await
81+
}
82+
83+
async fn unwrap_key(
84+
&self,
85+
wrapped_key: &[u8],
86+
wrapping_key_id: &str,
87+
) -> Result<SensitiveBytes> {
88+
self.as_ref().unwrap_key(wrapped_key, wrapping_key_id).await
89+
}
90+
91+
fn supports_key_generation(&self) -> bool {
92+
self.as_ref().supports_key_generation()
93+
}
94+
95+
async fn generate_key(&self, wrapping_key_id: &str) -> Result<GeneratedKey> {
96+
self.as_ref().generate_key(wrapping_key_id).await
97+
}
98+
}

0 commit comments

Comments
 (0)