Skip to content

Commit 91ebaa9

Browse files
committed
merge: resolve conflicts with dev (integrate AssetRefBuffer with encryption branch)
2 parents bd38ed2 + c6a3ef9 commit 91ebaa9

10 files changed

Lines changed: 679 additions & 20 deletions

File tree

src/server/api/go/cmd/admin/main.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/memodb-io/Acontext/internal/infra/cache"
2828
dbpkg "github.com/memodb-io/Acontext/internal/infra/db"
2929
"github.com/memodb-io/Acontext/internal/modules/handler"
30+
"github.com/memodb-io/Acontext/internal/modules/repo"
3031
"github.com/memodb-io/Acontext/internal/pkg/tokenizer"
3132
"github.com/memodb-io/Acontext/internal/router"
3233
"github.com/memodb-io/Acontext/internal/telemetry"
@@ -129,6 +130,10 @@ func main() {
129130
IdleTimeout: 120 * time.Second,
130131
}
131132

133+
// Start the Redis-buffered asset reference writer.
134+
assetRefBuffer := do.MustInvoke[repo.AssetRefBuffer](inj)
135+
assetRefBuffer.Start()
136+
132137
go func() {
133138
log.Sugar().Infow("starting admin http server", "addr", addr)
134139
log.Sugar().Infow("swagger url", "url", addr+"/swagger/index.html")
@@ -142,6 +147,9 @@ func main() {
142147
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
143148
<-quit
144149

150+
// Stop the asset reference buffer first (final flush to DB).
151+
assetRefBuffer.Stop()
152+
145153
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
146154
defer cancel()
147155
if err := srv.Shutdown(ctx); err != nil {

src/server/api/go/cmd/server/main.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/memodb-io/Acontext/internal/infra/cache"
2828
dbpkg "github.com/memodb-io/Acontext/internal/infra/db"
2929
"github.com/memodb-io/Acontext/internal/modules/handler"
30+
"github.com/memodb-io/Acontext/internal/modules/repo"
3031
"github.com/memodb-io/Acontext/internal/pkg/tokenizer"
3132
"github.com/memodb-io/Acontext/internal/router"
3233
"github.com/memodb-io/Acontext/internal/telemetry"
@@ -121,6 +122,10 @@ func main() {
121122
IdleTimeout: 120 * time.Second,
122123
}
123124

125+
// Start the Redis-buffered asset reference writer.
126+
assetRefBuffer := do.MustInvoke[repo.AssetRefBuffer](inj)
127+
assetRefBuffer.Start()
128+
124129
go func() {
125130
log.Sugar().Infow("starting http server", "addr", addr)
126131
log.Sugar().Infow("swagger url", "url", addr+"/swagger/index.html")
@@ -135,6 +140,9 @@ func main() {
135140
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
136141
<-quit
137142

143+
// Stop the asset reference buffer first (final flush to DB).
144+
assetRefBuffer.Stop()
145+
138146
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
139147
defer cancel()
140148
if err := srv.Shutdown(ctx); err != nil {

src/server/api/go/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ require (
5353
github.com/ClickHouse/ch-go v0.70.0 // indirect
5454
github.com/ClickHouse/clickhouse-go/v2 v2.42.0 // indirect
5555
github.com/KyleBanks/depth v1.2.1 // indirect
56+
github.com/alicebob/miniredis/v2 v2.37.0 // indirect
5657
github.com/andybalholm/brotli v1.2.0 // indirect
5758
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.7 // indirect
5859
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.20 // indirect

src/server/api/go/internal/bootstrap/container.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,12 +267,22 @@ func BuildContainer() *do.Injector {
267267
), nil
268268
})
269269

270+
// Asset reference buffer (Redis-backed, flushed to DB periodically)
271+
do.Provide(inj, func(i *do.Injector) (repo.AssetRefBuffer, error) {
272+
return repo.NewAssetRefBuffer(
273+
do.MustInvoke[*redis.Client](i),
274+
do.MustInvoke[repo.AssetReferenceRepo](i),
275+
do.MustInvoke[*zap.Logger](i),
276+
), nil
277+
})
278+
270279
// Service
271280
do.Provide(inj, func(i *do.Injector) (service.SessionService, error) {
272281
return service.NewSessionService(
273282
do.MustInvoke[repo.SessionRepo](i),
274283
do.MustInvoke[repo.SessionEventRepo](i),
275284
do.MustInvoke[repo.AssetReferenceRepo](i),
285+
do.MustInvoke[repo.AssetRefBuffer](i),
276286
do.MustInvoke[*zap.Logger](i),
277287
do.MustInvoke[*blob.S3Deps](i),
278288
do.MustInvoke[*mq.Publisher](i),
Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
package repo
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"sync/atomic"
8+
"time"
9+
10+
"github.com/google/uuid"
11+
"github.com/memodb-io/Acontext/internal/modules/model"
12+
"github.com/redis/go-redis/v9"
13+
"go.uber.org/zap"
14+
)
15+
16+
const (
17+
// Redis key prefixes for the asset reference buffer.
18+
assetRefBufPrefix = "assetref:buf:" // Hash: sha256 → delta count
19+
assetRefMetaPrefix = "assetref:meta:" // String: JSON of model.Asset
20+
assetRefProjectsKey = "assetref:projects" // Set: project IDs with pending deltas
21+
assetRefFlushLockKey = "assetref:flush:lock" // Distributed flush lock
22+
assetRefMetaTTL = time.Hour // Metadata key TTL
23+
assetRefFlushLockTTL = 3 * time.Second // Flush lock TTL
24+
assetRefFlushInterval = time.Second // Flush ticker interval
25+
)
26+
27+
// AssetRefBuffer buffers asset reference increments in Redis and flushes
28+
// them to the database in coalesced batches. This avoids per-request
29+
// INSERT ... ON CONFLICT contention under high concurrency.
30+
type AssetRefBuffer interface {
31+
Enqueue(ctx context.Context, projectID uuid.UUID, assets []model.Asset) error
32+
Start()
33+
Stop()
34+
}
35+
36+
type assetRefBuffer struct {
37+
redis *redis.Client
38+
repo AssetReferenceRepo
39+
log *zap.Logger
40+
stop chan struct{}
41+
done chan struct{}
42+
stopped atomic.Bool
43+
}
44+
45+
// drainScript atomically reads all fields from a hash and deletes it.
46+
var drainScript = redis.NewScript(`
47+
local data = redis.call('HGETALL', KEYS[1])
48+
if #data > 0 then
49+
redis.call('DEL', KEYS[1])
50+
end
51+
return data
52+
`)
53+
54+
func NewAssetRefBuffer(rdb *redis.Client, repo AssetReferenceRepo, log *zap.Logger) AssetRefBuffer {
55+
return &assetRefBuffer{
56+
redis: rdb,
57+
repo: repo,
58+
log: log,
59+
stop: make(chan struct{}),
60+
done: make(chan struct{}),
61+
}
62+
}
63+
64+
// Enqueue buffers asset reference increments in Redis via pipeline.
65+
// Each asset gets HINCRBY +1 on the project's buffer hash, metadata stored (NX),
66+
// and the project ID added to the pending set.
67+
func (b *assetRefBuffer) Enqueue(ctx context.Context, projectID uuid.UUID, assets []model.Asset) error {
68+
if len(assets) == 0 {
69+
return nil
70+
}
71+
if b.stopped.Load() {
72+
b.log.Warn("AssetRefBuffer.Enqueue called after Stop, dropping",
73+
zap.String("project_id", projectID.String()),
74+
zap.Int("assets", len(assets)))
75+
return nil
76+
}
77+
78+
pid := projectID.String()
79+
bufKey := assetRefBufPrefix + pid
80+
81+
pipe := b.redis.Pipeline()
82+
for _, a := range assets {
83+
if a.SHA256 == "" {
84+
continue
85+
}
86+
pipe.HIncrBy(ctx, bufKey, a.SHA256, 1)
87+
88+
metaJSON, err := json.Marshal(a)
89+
if err != nil {
90+
b.log.Warn("failed to marshal asset meta", zap.String("sha256", a.SHA256), zap.Error(err))
91+
continue
92+
}
93+
metaKey := assetRefMetaPrefix + pid + ":" + a.SHA256
94+
pipe.SetNX(ctx, metaKey, metaJSON, assetRefMetaTTL)
95+
}
96+
pipe.SAdd(ctx, assetRefProjectsKey, pid)
97+
98+
_, err := pipe.Exec(ctx)
99+
if err != nil {
100+
return fmt.Errorf("AssetRefBuffer.Enqueue pipeline: %w", err)
101+
}
102+
return nil
103+
}
104+
105+
// Start begins the background flusher goroutine.
106+
func (b *assetRefBuffer) Start() {
107+
go b.run()
108+
}
109+
110+
// Stop signals the flusher to exit, waits for the final flush to complete.
111+
func (b *assetRefBuffer) Stop() {
112+
b.stopped.Store(true)
113+
close(b.stop)
114+
<-b.done
115+
}
116+
117+
func (b *assetRefBuffer) run() {
118+
defer close(b.done)
119+
120+
ticker := time.NewTicker(assetRefFlushInterval)
121+
defer ticker.Stop()
122+
123+
for {
124+
select {
125+
case <-ticker.C:
126+
b.flushAll()
127+
case <-b.stop:
128+
// Final flush before exit.
129+
b.flushAll()
130+
return
131+
}
132+
}
133+
}
134+
135+
// flushAll acquires a distributed lock and flushes all pending projects.
136+
func (b *assetRefBuffer) flushAll() {
137+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
138+
defer cancel()
139+
140+
// Distributed lock so only one pod flushes at a time.
141+
ok, err := b.redis.SetNX(ctx, assetRefFlushLockKey, "1", assetRefFlushLockTTL).Result()
142+
if err != nil {
143+
b.log.Error("AssetRefBuffer: failed to acquire flush lock", zap.Error(err))
144+
return
145+
}
146+
if !ok {
147+
return // Another pod is flushing.
148+
}
149+
defer b.redis.Del(ctx, assetRefFlushLockKey)
150+
151+
// Get all project IDs with pending deltas.
152+
pids, err := b.redis.SMembers(ctx, assetRefProjectsKey).Result()
153+
if err != nil {
154+
b.log.Error("AssetRefBuffer: SMEMBERS failed", zap.Error(err))
155+
return
156+
}
157+
158+
for _, pid := range pids {
159+
b.flushProject(ctx, pid)
160+
}
161+
}
162+
163+
// flushProject atomically drains the buffer hash for one project and upserts to the DB.
164+
func (b *assetRefBuffer) flushProject(ctx context.Context, pid string) {
165+
bufKey := assetRefBufPrefix + pid
166+
167+
// Atomically HGETALL + DEL via Lua.
168+
result, err := drainScript.Run(ctx, b.redis, []string{bufKey}).StringSlice()
169+
if err != nil {
170+
if err == redis.Nil {
171+
// Empty hash, remove from set and return.
172+
b.redis.SRem(ctx, assetRefProjectsKey, pid)
173+
return
174+
}
175+
b.log.Error("AssetRefBuffer: drain script failed", zap.String("project_id", pid), zap.Error(err))
176+
return
177+
}
178+
if len(result) == 0 {
179+
b.redis.SRem(ctx, assetRefProjectsKey, pid)
180+
return
181+
}
182+
183+
projectID, err := uuid.Parse(pid)
184+
if err != nil {
185+
b.log.Error("AssetRefBuffer: invalid project_id", zap.String("project_id", pid), zap.Error(err))
186+
b.redis.SRem(ctx, assetRefProjectsKey, pid)
187+
return
188+
}
189+
190+
// result is [field1, value1, field2, value2, ...]
191+
increments := make([]AssetRefIncrement, 0, len(result)/2)
192+
metaKeysToDelete := make([]string, 0, len(result)/2)
193+
194+
for i := 0; i < len(result)-1; i += 2 {
195+
sha256 := result[i]
196+
var count int
197+
if _, err := fmt.Sscanf(result[i+1], "%d", &count); err != nil || count <= 0 {
198+
continue
199+
}
200+
201+
// Fetch asset metadata.
202+
metaKey := assetRefMetaPrefix + pid + ":" + sha256
203+
metaJSON, err := b.redis.Get(ctx, metaKey).Result()
204+
if err != nil {
205+
b.log.Warn("AssetRefBuffer: missing meta for asset, using minimal",
206+
zap.String("sha256", sha256), zap.Error(err))
207+
// Fallback: create minimal asset with just SHA256.
208+
increments = append(increments, AssetRefIncrement{
209+
Asset: model.Asset{SHA256: sha256},
210+
Count: count,
211+
})
212+
continue
213+
}
214+
metaKeysToDelete = append(metaKeysToDelete, metaKey)
215+
216+
var asset model.Asset
217+
if err := json.Unmarshal([]byte(metaJSON), &asset); err != nil {
218+
b.log.Warn("AssetRefBuffer: failed to unmarshal meta",
219+
zap.String("sha256", sha256), zap.Error(err))
220+
increments = append(increments, AssetRefIncrement{
221+
Asset: model.Asset{SHA256: sha256},
222+
Count: count,
223+
})
224+
continue
225+
}
226+
increments = append(increments, AssetRefIncrement{
227+
Asset: asset,
228+
Count: count,
229+
})
230+
}
231+
232+
if len(increments) > 0 {
233+
if err := b.repo.BatchIncrementAssetRefsWithCounts(ctx, projectID, increments); err != nil {
234+
b.log.Error("AssetRefBuffer: DB flush failed",
235+
zap.String("project_id", pid),
236+
zap.Int("increments", len(increments)),
237+
zap.Error(err))
238+
// Don't remove from project set — will retry on next tick.
239+
return
240+
}
241+
}
242+
243+
// Clean up: remove project from set and delete meta keys.
244+
b.redis.SRem(ctx, assetRefProjectsKey, pid)
245+
if len(metaKeysToDelete) > 0 {
246+
b.redis.Del(ctx, metaKeysToDelete...)
247+
}
248+
}

0 commit comments

Comments
 (0)