Skip to content

Commit c1d3ff8

Browse files
GenerQAQclaude
andcommitted
test: add unit tests for idempotent RewrapDEK and E2E tests for key rotation
Unit tests (crypto/envelope_test.go): - TestRewrapDEK_Idempotent: rewrap already-rewrapped object returns "" (skip) - TestRewrapDEK_BothKEKsFail: neither KEK can unwrap → error - TestEncodeDecodeBase64: round-trip for new helper functions E2E tests (test_encryption.py): - test_key_rotation_multiple_artifacts: rotate with 3 artifacts, verify all decryptable with new key, old key rejected - test_key_rotation_clears_rotation_state: verify rotation_started_at and rotation_encrypted_secret are NULL after successful rotation Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent aebb6e0 commit c1d3ff8

2 files changed

Lines changed: 129 additions & 0 deletions

File tree

src/server/api/go/internal/infra/crypto/envelope_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,71 @@ func TestRewrapDEK(t *testing.T) {
195195
}
196196
}
197197

198+
func TestRewrapDEK_Idempotent(t *testing.T) {
199+
// Simulate the crash-safe key rotation: if an object is already rewrapped
200+
// with newKEK, calling RewrapDEK again should return "" (skip) instead of erroring.
201+
oldUserKEK, _ := DeriveUserKEK("old-key-for-idempotent", "pepper")
202+
newUserKEK, _ := DeriveUserKEK("new-key-for-idempotent", "pepper")
203+
204+
plaintext := []byte("data for idempotent rewrap test")
205+
ciphertext, meta, _ := EncryptData(oldUserKEK, plaintext)
206+
207+
// First rewrap: old → new
208+
newWrapped, err := RewrapDEK(meta, oldUserKEK, newUserKEK)
209+
if err != nil {
210+
t.Fatal("first rewrap failed:", err)
211+
}
212+
if newWrapped == "" {
213+
t.Fatal("first rewrap should not skip")
214+
}
215+
meta.UserWrappedDEK = newWrapped
216+
217+
// Second rewrap with same old/new KEKs — should skip (return "")
218+
skipped, err := RewrapDEK(meta, oldUserKEK, newUserKEK)
219+
if err != nil {
220+
t.Fatal("second rewrap should not error:", err)
221+
}
222+
if skipped != "" {
223+
t.Fatal("second rewrap should return empty string (skip already-rewrapped object)")
224+
}
225+
226+
// Verify data is still decryptable with new KEK
227+
decrypted, err := DecryptData(newUserKEK, ciphertext, meta)
228+
if err != nil {
229+
t.Fatal("decrypt with new KEK after idempotent rewrap:", err)
230+
}
231+
if !bytes.Equal(plaintext, decrypted) {
232+
t.Fatal("plaintext should match after idempotent rewrap")
233+
}
234+
}
235+
236+
func TestRewrapDEK_BothKEKsFail(t *testing.T) {
237+
// If the DEK is wrapped with a totally different KEK, RewrapDEK should error.
238+
kek1, _ := DeriveUserKEK("kek-one", "pepper")
239+
kek2, _ := DeriveUserKEK("kek-two", "pepper")
240+
kekWrong, _ := DeriveUserKEK("kek-wrong", "pepper")
241+
242+
_, meta, _ := EncryptData(kekWrong, []byte("data"))
243+
244+
// Neither kek1 nor kek2 can unwrap — should return error
245+
_, err := RewrapDEK(meta, kek1, kek2)
246+
if err == nil {
247+
t.Fatal("RewrapDEK should fail when neither KEK can unwrap")
248+
}
249+
}
250+
251+
func TestEncodeDecodeBase64(t *testing.T) {
252+
original := []byte("hello base64 round trip")
253+
encoded := EncodeBase64(original)
254+
decoded, err := DecodeBase64(encoded)
255+
if err != nil {
256+
t.Fatal(err)
257+
}
258+
if !bytes.Equal(original, decoded) {
259+
t.Fatal("decoded should match original")
260+
}
261+
}
262+
198263
func TestMetadataMapRoundTrip(t *testing.T) {
199264
meta := &EncryptedMeta{
200265
Algo: "AES-256-GCM",

src/server/tests/e2e/test_encryption.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -506,6 +506,70 @@ async def test_key_rotation_encrypted_project(db_conn, encrypted_project):
506506
assert resp.content == plaintext
507507

508508

509+
@pytest.mark.asyncio
510+
async def test_key_rotation_multiple_artifacts(db_conn, encrypted_project):
511+
"""Rotate key with multiple artifacts → ALL artifacts still accessible with new key."""
512+
files = {
513+
"file1.txt": b"first file for rotation test",
514+
"file2.txt": b"second file for rotation test",
515+
"file3.txt": b"third file for rotation test",
516+
}
517+
518+
async with httpx.AsyncClient() as client:
519+
disk_id = await create_disk(client, encrypted_project.headers)
520+
521+
# Upload multiple artifacts
522+
for fname, content in files.items():
523+
await upload_artifact(
524+
client, disk_id, encrypted_project.headers,
525+
filename=fname, content=content,
526+
)
527+
528+
# Rotate the key
529+
resp = await rotate_key_with_rewrap(client, encrypted_project.bearer_token)
530+
assert resp.status_code == 200, f"Key rotation failed: {resp.text}"
531+
new_key = resp.json()["data"]["secret_key"]
532+
new_headers = {"Authorization": f"Bearer {new_key}"}
533+
534+
# Verify ALL artifacts are decryptable with new key
535+
for fname, expected_content in files.items():
536+
resp = await download_artifact(
537+
client, disk_id, f"/{fname}", new_headers,
538+
)
539+
assert resp.status_code == 200, f"Download {fname} failed after rotation"
540+
assert resp.content == expected_content, f"Content mismatch for {fname}"
541+
542+
# Verify old key no longer works
543+
resp = await download_artifact(
544+
client, disk_id, "/file1.txt", encrypted_project.headers,
545+
)
546+
assert resp.status_code in (401, 403, 404, 500), (
547+
f"Old key should be rejected after rotation, got {resp.status_code}"
548+
)
549+
550+
551+
@pytest.mark.asyncio
552+
async def test_key_rotation_clears_rotation_state(db_conn, encrypted_project):
553+
"""After successful rotation, rotation_started_at should be NULL."""
554+
async with httpx.AsyncClient() as client:
555+
disk_id = await create_disk(client, encrypted_project.headers)
556+
await upload_artifact(
557+
client, disk_id, encrypted_project.headers,
558+
filename="state-check.txt", content=b"check rotation state",
559+
)
560+
561+
resp = await rotate_key_with_rewrap(client, encrypted_project.bearer_token)
562+
assert resp.status_code == 200
563+
564+
# Verify rotation state is cleared in DB
565+
row = await db_conn.fetchrow(
566+
"SELECT rotation_started_at, rotation_encrypted_secret FROM projects WHERE id = $1",
567+
encrypted_project.project_id,
568+
)
569+
assert row["rotation_started_at"] is None, "rotation_started_at should be NULL after success"
570+
assert row["rotation_encrypted_secret"] is None, "rotation_encrypted_secret should be NULL after success"
571+
572+
509573
@pytest.mark.asyncio
510574
async def test_key_rotation_plain_project(db_conn, plain_project):
511575
"""Rotate key on plain project → works without re-wrap (no encrypted data)."""

0 commit comments

Comments
 (0)