Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 43 additions & 12 deletions adapter/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,44 @@ const (
s3LeaderHealthPath = "/healthz/leader"
s3HealthMaxRequestBodyBytes = 1024
s3ChunkSize = 1 << 20
s3ChunkBatchOps = 16
s3XMLNamespace = "http://s3.amazonaws.com/doc/2006-03-01/"
s3DefaultRegion = "us-east-1"
s3MaxKeys = 1000
s3ListPageSize = 256
s3ManifestCleanupTimeout = 2 * time.Minute
s3MaxObjectSizeBytes = 5 * 1024 * 1024 * 1024 // 5 GiB, matching AWS S3 single PUT limit.
// s3ChunkBatchOps caps how many s3ChunkSize chunks fit in a single
// coordinator.Dispatch call on the data-write path
// (PutObject / UploadPart). Sized so the resulting Raft entry stays
// strictly under the post-PR-#593 default `MaxSizePerMsg = 4 MiB`
// even after protobuf framing overhead — each pb.Mutation carries
// the Op tag, Key tag + bytes, and Value length prefix; the
// pb.Request envelope wraps them; marshalRaftCommand prepends one
// byte. Empirically the per-Mutation overhead is ~60 B for normal
// keys and grows linearly with the bucket / objectKey length, so
// `4 × 1 MiB = 4 MiB` exactly is *over* MaxSizePerMsg in practice
// and falls into etcd/raft's util.go:limitSize oversized-first-
// entry path, bypassing the documented
// `MaxInflight × MaxSizePerMsg` per-peer memory bound. Capping at
// `3 × 1 MiB ≈ 3 MiB + a few hundred bytes` leaves ~1 MiB of headroom
// even with kilobyte-scale object keys, so the entry rides the
// normal batched-MsgApp path and the bound holds. Per-PUT Raft
// commit count grows ~5× from the pre-PR-#636 baseline (a 5 GiB
// PUT goes from 320 → ~1707 entries) but each fsync is ~5×
// smaller; the WAL group commit landed in PR #600 absorbs the
// higher commit rate. See TestS3ChunkBatchFitsInRaftMaxSize
// for the encoded-size invariant.
s3ChunkBatchOps = 3
// s3MetaBatchOps batches key-only Del / scan ops on cleanup paths
// (cleanupPartBlobsAsync, deleteByPrefix, cleanupManifestBlobs).
// These ops carry no chunk payload, so the MaxSizePerMsg cap
// translates to a pure key-count budget: 64 BlobKey-shaped keys
// × ~100 B each ≈ 6 KiB per batch, three orders of magnitude
// under the 4 MiB limit. Keeping this batch large means a
// 5 GiB-object cleanup commits ~80 batches instead of ~1707, so
// orphaned-blob garbage collection finishes proportionally faster
// and does not amplify Raft load relative to the data-write path.
s3MetaBatchOps = 64
s3XMLNamespace = "http://s3.amazonaws.com/doc/2006-03-01/"
s3DefaultRegion = "us-east-1"
s3MaxKeys = 1000
s3ListPageSize = 256
s3ManifestCleanupTimeout = 2 * time.Minute
s3MaxObjectSizeBytes = 5 * 1024 * 1024 * 1024 // 5 GiB, matching AWS S3 single PUT limit.

s3TxnRetryInitialBackoff = 2 * time.Millisecond
s3TxnRetryMaxBackoff = 32 * time.Millisecond
Expand Down Expand Up @@ -1876,7 +1907,7 @@ func (s *S3Server) cleanupPartBlobsAsync(bucket string, generation uint64, objec
defer func() { <-s.cleanupSem }()
ctx, cancel := context.WithTimeout(context.Background(), s3ManifestCleanupTimeout)
defer cancel()
pending := make([]*kv.Elem[kv.OP], 0, s3ChunkBatchOps)
pending := make([]*kv.Elem[kv.OP], 0, s3MetaBatchOps)
flush := func() {
if len(pending) == 0 {
return
Expand All @@ -1897,7 +1928,7 @@ func (s *S3Server) cleanupPartBlobsAsync(bucket string, generation uint64, objec
Op: kv.Del,
Key: s3keys.VersionedBlobKey(bucket, generation, objectKey, uploadID, partNo, i, partVersion),
})
if len(pending) >= s3ChunkBatchOps {
if len(pending) >= s3MetaBatchOps {
flush()
}
}
Expand Down Expand Up @@ -1930,7 +1961,7 @@ func (s *S3Server) deleteByPrefix(ctx context.Context, prefix []byte, bucket str
for {
readTS := s.readTS()
readPin := s.pinReadTS(readTS)
kvs, err := s.store.ScanAt(ctx, cursor, end, s3ChunkBatchOps, readTS)
kvs, err := s.store.ScanAt(ctx, cursor, end, s3MetaBatchOps, readTS)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Keep blob cleanup scan page size independent of delete batch

deleteByPrefix now uses s3MetaBatchOps for ScanAt, so cleanup of blob prefixes scans 64 full chunk KVs per loop. In this codebase, ScanAt materializes values (e.g. store/mvcc_store.go clones Value in collectScanResults), and blob values are ~1 MiB each, so one cleanup worker can allocate/read ~64 MiB per page and up to ~1 GiB with 16 concurrent cleanup workers. This is a regression from the smaller scan page and can cause significant memory/GC and I/O spikes during abort/manifest cleanup even though the code only needs keys for deletes; consider keeping scan size small (or key-only scan) while still batching delete dispatches at 64.

Useful? React with 👍 / 👎.

readPin.Release()
if err != nil {
slog.ErrorContext(ctx, "deleteByPrefix: scan failed",
Expand Down Expand Up @@ -2189,7 +2220,7 @@ func (s *S3Server) cleanupManifestBlobs(ctx context.Context, bucket string, gene
if s == nil || manifest == nil || manifest.UploadID == "" || s.coordinator == nil {
return
}
pending := make([]*kv.Elem[kv.OP], 0, s3ChunkBatchOps)
pending := make([]*kv.Elem[kv.OP], 0, s3MetaBatchOps)
flush := func() {
if len(pending) == 0 {
return
Expand Down Expand Up @@ -2224,7 +2255,7 @@ func (s *S3Server) appendPartBlobKeys(pending []*kv.Elem[kv.OP], bucket string,
Op: kv.Del,
Key: s3keys.VersionedBlobKey(bucket, generation, objectKey, uploadID, part.PartNo, chunkIndex, part.PartVersion),
})
if len(pending) >= s3ChunkBatchOps {
if len(pending) >= s3MetaBatchOps {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

appendPartBlobKeys 内でのバッチ処理ロジックが正しく動作していません。pending スライスが値渡しされているため、この関数内での append による変更(長さの更新や再割り当て)が、flush クロージャが参照している呼び出し元の pending 変数に伝わりません。その結果、flush 内の len(pending) は常に 0 となり、バッチごとのフラッシュが実行されず、全キーが1つの巨大なバッチとして蓄積されてしまいます。これは、外部リクエストに起因するコレクションの無制限な増大を防ぎ、OOM(メモリ不足)を回避するというリポジトリの原則に抵触します。関数シグネチャをポインタ経由(*[]*kv.Elem[kv.OP])に変更するか、バッチ管理の構造を見直してください。

References
  1. To prevent unbounded memory growth and potential OOM issues, apply a fixed bound to collections that can grow from external requests, such as pending configuration changes. Reject new requests when the bound is reached.

flush()
}
}
Expand Down
139 changes: 139 additions & 0 deletions adapter/s3_chunk_batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package adapter

import (
"strings"
"testing"

"github.com/bootjp/elastickv/internal/s3keys"
pb "github.com/bootjp/elastickv/proto"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
)

// raftMaxSizePerMsgPostPR593 mirrors the default value (4 MiB) of
// etcd/raft's MaxSizePerMsg setting as of PR #593.
//
// The number is deliberately copied here instead of being imported from
// internal/raftengine/etcd: that keeps the engine package out of this
// test's dependencies, and pinning the value is the whole purpose of the
// tests below, which exist to notice when an S3 batch silently outgrows
// this budget.
const raftMaxSizePerMsgPostPR593 = 4 * 1024 * 1024

// TestS3ChunkBatchFitsInRaftMaxSize guards the byte budget of the S3
// data-write path (PutObject / UploadPart). It assembles a request with
// s3ChunkBatchOps PUT mutations, each carrying a full s3ChunkSize value
// under a 1 KiB object key, marshals it, adds the one framing byte that
// marshalRaftCommand prepends, and requires the total to be strictly
// smaller than raftMaxSizePerMsgPostPR593.
//
// The guard matters because raising s3ChunkBatchOps or s3ChunkSize past
// that budget would silently push Raft entries onto etcd/raft's
// oversized-first-entry exception (util.go:limitSize: "as an exception,
// if the size of the first entry exceeds maxSize, a non-empty slice with
// just this entry is returned"), which breaks the documented
// `MaxInflight × MaxSizePerMsg` per-peer memory bound.
func TestS3ChunkBatchFitsInRaftMaxSize(t *testing.T) {
	t.Parallel()

	const generation uint64 = 1
	const partNo uint64 = 1

	// A 1 KiB object key stands in for a deeply nested S3 path: the key
	// bytes are repeated in every mutation, so a long key inflates the
	// per-mutation envelope. Longer keys are unusual and the headroom
	// asserted below is generous.
	bucketName := "test-bucket"
	longKey := strings.Repeat("a", 1024)
	uploadID := "upload-12345678901234567890"

	// Full-size chunk payload with a non-zero fill pattern.
	// NOTE(review): protobuf bytes fields are length-delimited, so the
	// fill byte should not change the encoded size; the pattern mainly
	// keeps the fixture distinct from a zeroed buffer — confirm.
	chunk := make([]byte, s3ChunkSize)
	for idx := 0; idx < len(chunk); idx++ {
		chunk[idx] = 0xAB
	}

	// One PUT per chunk index; every mutation shares the same payload
	// slice, which is fine because only the encoded size is measured.
	mutations := make([]*pb.Mutation, 0, s3ChunkBatchOps)
	var chunkNo uint64
	for len(mutations) < s3ChunkBatchOps {
		mutations = append(mutations, &pb.Mutation{
			Op:    pb.Op_PUT,
			Key:   s3keys.BlobKey(bucketName, generation, longKey, uploadID, partNo, chunkNo),
			Value: chunk,
		})
		chunkNo++
	}

	wire, err := proto.Marshal(&pb.Request{
		IsTxn:     false,
		Phase:     pb.Phase_NONE,
		Ts:        1234567890,
		Mutations: mutations,
	})
	require.NoError(t, err)

	// marshalRaftCommand puts a single framing byte (raftEncodeSingle or
	// raftEncodeBatch) in front of the payload, so count it explicitly.
	const raftFramingPrefix = 1
	entrySize := raftFramingPrefix + len(wire)

	require.Lessf(t,
		entrySize, raftMaxSizePerMsgPostPR593,
		"S3 chunk batch entry must fit strictly under MaxSizePerMsg=%d to avoid the etcd/raft oversized-first-entry path; got %d (s3ChunkBatchOps=%d, s3ChunkSize=%d, objectKey=%dB)",
		raftMaxSizePerMsgPostPR593, entrySize, s3ChunkBatchOps, s3ChunkSize, len(longKey),
	)

	// Beyond merely fitting, demand a real safety margin (more than
	// 64 KiB) so that small future growth in key length or in the
	// Request envelope cannot quietly cross the limit.
	const minHeadroom = 64 << 10
	headroom := raftMaxSizePerMsgPostPR593 - entrySize
	require.Greaterf(t,
		headroom, minHeadroom,
		"S3 chunk batch headroom under MaxSizePerMsg has fallen below %d B (got %d B); reduce s3ChunkBatchOps or s3ChunkSize",
		minHeadroom, headroom,
	)
}

// TestS3MetaBatchFitsInRaftMaxSize applies the same encoded-size check to
// the cleanup paths (cleanupPartBlobsAsync, deleteByPrefix,
// cleanupManifestBlobs). Those batches contain key-only DEL mutations, so
// their size is driven by key bytes alone. The test marshals
// s3MetaBatchOps such mutations under a 1 KiB object key, adds the
// one-byte raft framing prefix, and requires the total to be below
// raftMaxSizePerMsgPostPR593, so an overly large future bump of
// s3MetaBatchOps is caught at PR time.
//
// NOTE(review): the cleanup call sites visible in adapter/s3.go build
// their keys with s3keys.VersionedBlobKey, while this test uses
// s3keys.BlobKey — confirm the size difference is negligible against
// the 4 MiB budget.
func TestS3MetaBatchFitsInRaftMaxSize(t *testing.T) {
	t.Parallel()

	const generation uint64 = 1
	const partNo uint64 = 1

	bucketName := "test-bucket"
	longKey := strings.Repeat("a", 1024)
	uploadID := "upload-12345678901234567890"

	// One key-only DEL per chunk index; no Value is attached.
	mutations := make([]*pb.Mutation, 0, s3MetaBatchOps)
	var chunkNo uint64
	for len(mutations) < s3MetaBatchOps {
		mutations = append(mutations, &pb.Mutation{
			Op:  pb.Op_DEL,
			Key: s3keys.BlobKey(bucketName, generation, longKey, uploadID, partNo, chunkNo),
		})
		chunkNo++
	}

	wire, err := proto.Marshal(&pb.Request{
		IsTxn:     false,
		Phase:     pb.Phase_NONE,
		Ts:        1234567890,
		Mutations: mutations,
	})
	require.NoError(t, err)

	// Count the single framing byte that precedes the marshalled request.
	const raftFramingPrefix = 1
	entrySize := raftFramingPrefix + len(wire)

	require.Lessf(t,
		entrySize, raftMaxSizePerMsgPostPR593,
		"S3 meta batch entry must fit under MaxSizePerMsg=%d; got %d (s3MetaBatchOps=%d, objectKey=%dB)",
		raftMaxSizePerMsgPostPR593, entrySize, s3MetaBatchOps, len(longKey),
	)
}
Loading