Skip to content

Commit 7a743ce

Browse files
authored
perf(api): async S3 uploads in StoreMessage (#471)
* perf(api): async S3 uploads in StoreMessage to reduce latency. S3 upload latency was the bottleneck in StoreMessage (p75 ~1.8s). Pre-compute asset metadata (SHA256, S3 key) in memory, write DB + Redis cache synchronously, then upload to S3 in a background goroutine. This also fixes "context canceled" errors when clients disconnect early.
* test(api): add unit tests for PrepareJSONAsset and PrepareFormFileAsset.
1 parent fa46480 commit 7a743ce

3 files changed

Lines changed: 259 additions & 18 deletions

File tree

src/server/api/go/internal/infra/blob/s3.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,98 @@ func (u *S3Deps) UploadJSON(ctx context.Context, keyPrefix string, data interfac
353353
)
354354
}
355355

356+
// PreparedUpload holds pre-computed asset metadata and content for deferred S3 upload.
357+
type PreparedUpload struct {
358+
Asset model.Asset // Pre-computed asset metadata (S3Key, SHA256, MIME, SizeB)
359+
Content []byte // Serialized content to upload
360+
Metadata map[string]string // S3 object metadata
361+
}
362+
363+
// PrepareJSONAsset pre-computes asset metadata for JSON data without making any S3 calls.
364+
// Returns a PreparedUpload that can later be uploaded via UploadPrepared.
365+
func (u *S3Deps) PrepareJSONAsset(keyPrefix string, data interface{}) (*PreparedUpload, error) {
366+
jsonData, err := sonic.Marshal(data)
367+
if err != nil {
368+
return nil, fmt.Errorf("marshal json: %w", err)
369+
}
370+
371+
h := sha256.New()
372+
h.Write(jsonData)
373+
sumHex := hex.EncodeToString(h.Sum(nil))
374+
375+
datePrefix := time.Now().UTC().Format("2006/01/02")
376+
key := fmt.Sprintf("%s/%s/%s.json", keyPrefix, datePrefix, sumHex)
377+
378+
return &PreparedUpload{
379+
Asset: model.Asset{
380+
Bucket: u.Bucket,
381+
S3Key: key,
382+
SHA256: sumHex,
383+
MIME: "application/json",
384+
SizeB: int64(len(jsonData)),
385+
},
386+
Content: jsonData,
387+
Metadata: map[string]string{"sha256": sumHex},
388+
}, nil
389+
}
390+
391+
// PrepareFormFileAsset pre-computes asset metadata for a multipart file without making any S3 calls.
392+
// Returns a PreparedUpload that can later be uploaded via UploadPrepared.
393+
func (u *S3Deps) PrepareFormFileAsset(keyPrefix string, fh *multipart.FileHeader) (*PreparedUpload, error) {
394+
file, err := fh.Open()
395+
if err != nil {
396+
return nil, err
397+
}
398+
defer file.Close()
399+
400+
var buf bytes.Buffer
401+
if _, err := io.Copy(&buf, file); err != nil {
402+
return nil, err
403+
}
404+
fileContent := buf.Bytes()
405+
406+
h := sha256.New()
407+
h.Write(fileContent)
408+
sumHex := hex.EncodeToString(h.Sum(nil))
409+
410+
ext := strings.ToLower(filepath.Ext(fh.Filename))
411+
contentType := mime.DetectMimeType(fileContent, fh.Filename)
412+
413+
datePrefix := time.Now().UTC().Format("2006/01/02")
414+
key := fmt.Sprintf("%s/%s/%s%s", keyPrefix, datePrefix, sumHex, ext)
415+
416+
return &PreparedUpload{
417+
Asset: model.Asset{
418+
Bucket: u.Bucket,
419+
S3Key: key,
420+
SHA256: sumHex,
421+
MIME: contentType,
422+
SizeB: int64(len(fileContent)),
423+
},
424+
Content: fileContent,
425+
Metadata: map[string]string{"sha256": sumHex, "name": fh.Filename},
426+
}, nil
427+
}
428+
429+
// UploadPrepared executes a deferred S3 upload for a previously prepared asset.
430+
// It uploads directly using the pre-computed S3 key. Since the key contains the
431+
// content SHA256, re-uploading identical content is naturally idempotent.
432+
func (u *S3Deps) UploadPrepared(ctx context.Context, p *PreparedUpload) error {
433+
input := &s3.PutObjectInput{
434+
Bucket: aws.String(u.Bucket),
435+
Key: aws.String(p.Asset.S3Key),
436+
Body: bytes.NewReader(p.Content),
437+
ContentType: aws.String(p.Asset.MIME),
438+
Metadata: p.Metadata,
439+
}
440+
if u.SSE != nil {
441+
input.ServerSideEncryption = *u.SSE
442+
}
443+
444+
_, err := u.Uploader.Upload(ctx, input)
445+
return err
446+
}
447+
356448
// UploadFileDirect uploads a file directly to S3 at the specified key (no deduplication)
357449
// This is used when you need to preserve the exact file structure
358450
func (u *S3Deps) UploadFileDirect(ctx context.Context, key string, content []byte, contentType string) (*model.Asset, error) {
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
package blob
2+
3+
import (
4+
"bytes"
5+
"crypto/sha256"
6+
"encoding/hex"
7+
"mime/multipart"
8+
"net/textproto"
9+
"strings"
10+
"testing"
11+
12+
"github.com/bytedance/sonic"
13+
"github.com/stretchr/testify/assert"
14+
"github.com/stretchr/testify/require"
15+
)
16+
17+
func TestPrepareJSONAsset(t *testing.T) {
18+
s3 := &S3Deps{Bucket: "test-bucket"}
19+
20+
data := map[string]string{"hello": "world"}
21+
prepared, err := s3.PrepareJSONAsset("parts/project-123", data)
22+
require.NoError(t, err)
23+
24+
// Verify SHA256 matches manual computation
25+
jsonBytes, _ := sonic.Marshal(data)
26+
h := sha256.New()
27+
h.Write(jsonBytes)
28+
expectedSHA := hex.EncodeToString(h.Sum(nil))
29+
30+
assert.Equal(t, expectedSHA, prepared.Asset.SHA256)
31+
assert.Equal(t, "test-bucket", prepared.Asset.Bucket)
32+
assert.Equal(t, "application/json", prepared.Asset.MIME)
33+
assert.Equal(t, int64(len(jsonBytes)), prepared.Asset.SizeB)
34+
assert.Equal(t, jsonBytes, prepared.Content)
35+
36+
// Verify S3 key format: {prefix}/{date}/{sha256}.json
37+
assert.True(t, strings.HasPrefix(prepared.Asset.S3Key, "parts/project-123/"))
38+
assert.True(t, strings.HasSuffix(prepared.Asset.S3Key, expectedSHA+".json"))
39+
40+
// Verify metadata
41+
assert.Equal(t, expectedSHA, prepared.Metadata["sha256"])
42+
}
43+
44+
func TestPrepareJSONAsset_Deterministic(t *testing.T) {
45+
s3 := &S3Deps{Bucket: "test-bucket"}
46+
47+
data := []int{1, 2, 3}
48+
p1, err := s3.PrepareJSONAsset("prefix", data)
49+
require.NoError(t, err)
50+
51+
p2, err := s3.PrepareJSONAsset("prefix", data)
52+
require.NoError(t, err)
53+
54+
// Same input produces same SHA256 and key
55+
assert.Equal(t, p1.Asset.SHA256, p2.Asset.SHA256)
56+
assert.Equal(t, p1.Asset.S3Key, p2.Asset.S3Key)
57+
assert.Equal(t, p1.Content, p2.Content)
58+
}
59+
60+
func TestPrepareJSONAsset_DifferentData(t *testing.T) {
61+
s3 := &S3Deps{Bucket: "test-bucket"}
62+
63+
p1, err := s3.PrepareJSONAsset("prefix", "aaa")
64+
require.NoError(t, err)
65+
66+
p2, err := s3.PrepareJSONAsset("prefix", "bbb")
67+
require.NoError(t, err)
68+
69+
assert.NotEqual(t, p1.Asset.SHA256, p2.Asset.SHA256)
70+
}
71+
72+
// newTestFileHeader creates a multipart.FileHeader for testing by encoding a
// single-file form (field name "file", Content-Type application/octet-stream)
// in memory and parsing it back with a 1 MiB memory limit.
//
// Backslashes and double quotes in filename are escaped before being placed in
// the Content-Disposition header, so a name containing '"' still yields a
// parsable header. Any failure while building or parsing the form panics with
// a descriptive message rather than surfacing later as a nil dereference or an
// index-out-of-range on the parsed form.
func newTestFileHeader(filename string, content []byte) *multipart.FileHeader {
	var b bytes.Buffer
	w := multipart.NewWriter(&b)

	// Quote the filename as a MIME quoted-string so it cannot break the header.
	quoted := strings.NewReplacer(`\`, `\\`, `"`, `\"`).Replace(filename)
	part, err := w.CreatePart(textproto.MIMEHeader{
		"Content-Disposition": {`form-data; name="file"; filename="` + quoted + `"`},
		"Content-Type":        {"application/octet-stream"},
	})
	if err != nil {
		panic("newTestFileHeader: create part: " + err.Error())
	}
	if _, err := part.Write(content); err != nil {
		panic("newTestFileHeader: write content: " + err.Error())
	}
	if err := w.Close(); err != nil {
		panic("newTestFileHeader: close writer: " + err.Error())
	}

	r := multipart.NewReader(&b, w.Boundary())
	form, err := r.ReadForm(1 << 20)
	if err != nil {
		panic("newTestFileHeader: read form: " + err.Error())
	}
	files := form.File["file"]
	if len(files) == 0 {
		panic("newTestFileHeader: parsed form contains no file part")
	}
	return files[0]
}
87+
88+
func TestPrepareFormFileAsset(t *testing.T) {
89+
s3 := &S3Deps{Bucket: "test-bucket"}
90+
91+
content := []byte("test file content")
92+
fh := newTestFileHeader("document.txt", content)
93+
94+
prepared, err := s3.PrepareFormFileAsset("assets/project-456", fh)
95+
require.NoError(t, err)
96+
97+
// Verify SHA256
98+
h := sha256.New()
99+
h.Write(content)
100+
expectedSHA := hex.EncodeToString(h.Sum(nil))
101+
102+
assert.Equal(t, expectedSHA, prepared.Asset.SHA256)
103+
assert.Equal(t, "test-bucket", prepared.Asset.Bucket)
104+
assert.Equal(t, int64(len(content)), prepared.Asset.SizeB)
105+
assert.Equal(t, content, prepared.Content)
106+
107+
// Verify S3 key has correct extension
108+
assert.True(t, strings.HasSuffix(prepared.Asset.S3Key, expectedSHA+".txt"))
109+
assert.True(t, strings.HasPrefix(prepared.Asset.S3Key, "assets/project-456/"))
110+
111+
// Verify metadata includes filename
112+
assert.Equal(t, expectedSHA, prepared.Metadata["sha256"])
113+
assert.Equal(t, "document.txt", prepared.Metadata["name"])
114+
}
115+
116+
func TestPrepareFormFileAsset_Deterministic(t *testing.T) {
117+
s3 := &S3Deps{Bucket: "test-bucket"}
118+
119+
content := []byte("same content")
120+
fh1 := newTestFileHeader("file.bin", content)
121+
fh2 := newTestFileHeader("file.bin", content)
122+
123+
p1, err := s3.PrepareFormFileAsset("prefix", fh1)
124+
require.NoError(t, err)
125+
126+
p2, err := s3.PrepareFormFileAsset("prefix", fh2)
127+
require.NoError(t, err)
128+
129+
assert.Equal(t, p1.Asset.SHA256, p2.Asset.SHA256)
130+
assert.Equal(t, p1.Asset.S3Key, p2.Asset.S3Key)
131+
}

src/server/api/go/internal/modules/service/session.go

Lines changed: 36 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,7 @@ func (s *sessionService) StoreMessage(ctx context.Context, in StoreMessageInput)
293293

294294
parts := make([]model.Part, 0, len(in.Parts))
295295
var uploadedAssets []model.Asset
296+
var pendingUploads []*blob.PreparedUpload
296297

297298
for idx := range in.Parts {
298299
partIn := &in.Parts[idx] // Use pointer to avoid repeated indexing and allow modifications
@@ -316,14 +317,15 @@ func (s *sessionService) StoreMessage(ctx context.Context, in StoreMessageInput)
316317
return nil, fmt.Errorf("parts[%d]: missing uploaded file %s", idx, partIn.FileField)
317318
}
318319

319-
// upload asset to S3
320-
asset, err := s.s3.UploadFormFile(ctx, "assets/"+in.ProjectID.String(), fh)
320+
// Pre-compute asset metadata without S3 calls
321+
prepared, err := s.s3.PrepareFormFileAsset("assets/"+in.ProjectID.String(), fh)
321322
if err != nil {
322-
return nil, fmt.Errorf("upload %s failed: %w", partIn.FileField, err)
323+
return nil, fmt.Errorf("prepare %s failed: %w", partIn.FileField, err)
323324
}
324325

325-
uploadedAssets = append(uploadedAssets, *asset)
326-
part.Asset = asset
326+
pendingUploads = append(pendingUploads, prepared)
327+
uploadedAssets = append(uploadedAssets, prepared.Asset)
328+
part.Asset = &prepared.Asset
327329
part.Filename = fh.Filename
328330
}
329331

@@ -334,13 +336,37 @@ func (s *sessionService) StoreMessage(ctx context.Context, in StoreMessageInput)
334336
parts = append(parts, part)
335337
}
336338

337-
// upload parts to S3 as JSON file
338-
asset, err := s.s3.UploadJSON(ctx, "parts/"+in.ProjectID.String(), parts)
339+
// Pre-compute parts JSON asset metadata without S3 calls
340+
partsAssetPrepared, err := s.s3.PrepareJSONAsset("parts/"+in.ProjectID.String(), parts)
339341
if err != nil {
340-
return nil, fmt.Errorf("upload parts to S3 failed: %w", err)
342+
return nil, fmt.Errorf("prepare parts asset failed: %w", err)
341343
}
342344

343-
uploadedAssets = append(uploadedAssets, *asset)
345+
pendingUploads = append(pendingUploads, partsAssetPrepared)
346+
partsAsset := partsAssetPrepared.Asset
347+
uploadedAssets = append(uploadedAssets, partsAsset)
348+
349+
// Cache parts data in Redis before responding (uses pre-computed SHA256)
350+
if s.redis != nil {
351+
if err := s.cachePartsInRedis(ctx, partsAsset.SHA256, parts); err != nil {
352+
s.log.Warn("failed to cache parts in Redis", zap.String("sha256", partsAsset.SHA256), zap.Error(err))
353+
}
354+
}
355+
356+
// Upload all assets to S3 asynchronously — not on the request critical path.
357+
// Since S3 keys are content-addressed (SHA256), uploads are idempotent.
358+
go func() {
359+
bgCtx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
360+
defer cancel()
361+
for _, p := range pendingUploads {
362+
if err := s.s3.UploadPrepared(bgCtx, p); err != nil {
363+
s.log.Error("async S3 upload failed",
364+
zap.String("s3_key", p.Asset.S3Key),
365+
zap.String("sha256", p.Asset.SHA256),
366+
zap.Error(err))
367+
}
368+
}
369+
}()
344370

345371
// Increment asset reference counts asynchronously to avoid blocking the response.
346372
go func() {
@@ -352,14 +378,6 @@ func (s *sessionService) StoreMessage(ctx context.Context, in StoreMessageInput)
352378
}
353379
}()
354380

355-
// Cache parts data in Redis after successful S3 upload
356-
if s.redis != nil {
357-
if err := s.cachePartsInRedis(ctx, asset.SHA256, parts); err != nil {
358-
// Log error but don't fail the request if Redis caching fails
359-
s.log.Warn("failed to cache parts in Redis", zap.String("sha256", asset.SHA256), zap.Error(err))
360-
}
361-
}
362-
363381
// Prepare message metadata
364382
messageMeta := in.MessageMeta
365383
if messageMeta == nil {
@@ -370,7 +388,7 @@ func (s *sessionService) StoreMessage(ctx context.Context, in StoreMessageInput)
370388
SessionID: in.SessionID,
371389
Role: in.Role,
372390
Meta: datatypes.NewJSONType(messageMeta), // Store message-level metadata
373-
PartsAssetMeta: datatypes.NewJSONType(*asset),
391+
PartsAssetMeta: datatypes.NewJSONType(partsAsset),
374392
Parts: parts,
375393
}
376394

0 commit comments

Comments
 (0)