Skip to content

Commit 7d7fdff

Browse files
committed
Fix recovery writing extra delete record
1 parent 92f0ddb commit 7d7fdff

9 files changed

Lines changed: 99 additions & 97 deletions

File tree

compaction.go

Lines changed: 5 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -86,11 +86,8 @@ func (db *DB) compact(f *segment) (CompactionResult, error) {
8686
return cr, err
8787
}
8888

89-
func (db *DB) pickForCompaction() ([]*segment, error) {
90-
segments, err := db.datalog.segmentsBySequenceID()
91-
if err != nil {
92-
return nil, err
93-
}
89+
func (db *DB) pickForCompaction() []*segment {
90+
segments := db.datalog.segmentsBySequenceID()
9491
var picked []*segment
9592
for i := len(segments) - 1; i >= 0; i-- {
9693
seg := segments[i]
@@ -107,12 +104,12 @@ func (db *DB) pickForCompaction() ([]*segment, error) {
107104
if seg.meta.DeleteRecords > 0 {
108105
// Delete records can be discarded only when older files contain no put records for the corresponding keys.
109106
// All files older than the file eligible for compaction have to be compacted.
110-
return append(segments[:i+1], picked...), nil
107+
return append(segments[:i+1], picked...)
111108
}
112109

113110
picked = append([]*segment{seg}, picked...)
114111
}
115-
return picked, nil
112+
return picked
116113
}
117114

118115
// Compact compacts the DB. Deleted and overwritten items are discarded.
@@ -128,11 +125,8 @@ func (db *DB) Compact() (CompactionResult, error) {
128125
}()
129126

130127
db.mu.RLock()
131-
segments, err := db.pickForCompaction()
128+
segments := db.pickForCompaction()
132129
db.mu.RUnlock()
133-
if err != nil {
134-
return cr, err
135-
}
136130

137131
for _, f := range segments {
138132
fcr, err := db.compact(f)

datalog.go

Lines changed: 37 additions & 38 deletions
Original file line number | Diff line number | Diff line change
@@ -39,13 +39,18 @@ func openDatalog(opts *Options) (*datalog, error) {
3939
if ext != segmentExt {
4040
continue
4141
}
42-
seg, err := dl.openSegment(name)
42+
id, seqID, err := parseSegmentName(name)
43+
if err != nil {
44+
return nil, err
45+
}
46+
seg, err := dl.openSegment(name, id, seqID)
4347
if err != nil {
4448
return nil, err
4549
}
4650
if seg.sequenceID > dl.maxSequenceID {
4751
dl.maxSequenceID = seg.sequenceID
4852
}
53+
dl.segments[seg.id] = seg
4954
}
5055

5156
if err := dl.swapSegment(); err != nil {
@@ -71,46 +76,33 @@ func parseSegmentName(name string) (uint16, uint64, error) {
7176
return uint16(id), seqID, nil
7277
}
7378

74-
func (dl *datalog) openSegment(name string) (*segment, error) {
75-
id, seqID, err := parseSegmentName(name)
76-
if err != nil {
77-
return nil, err
78-
}
79+
func (dl *datalog) openSegment(name string, id uint16, seqID uint64) (*segment, error) {
7980
f, err := openFile(dl.opts.FileSystem, filepath.Join(dl.opts.path, name), false)
8081
if err != nil {
8182
return nil, err
8283
}
84+
8385
meta := &segmentMeta{}
8486
if !f.empty() {
8587
metaPath := filepath.Join(dl.opts.path, name+metaExt)
8688
if err := readGobFile(dl.opts.FileSystem, metaPath, &meta); err != nil {
8789
logger.Printf("error reading segment meta %d: %v", id, err)
8890
// TODO: rebuild meta?
8991
}
90-
} else {
91-
dl.maxSequenceID++
92-
seqID = dl.maxSequenceID
9392
}
9493

95-
df := &segment{
94+
seg := &segment{
9695
file: f,
9796
id: id,
9897
sequenceID: seqID,
9998
name: name,
10099
meta: meta,
101100
}
102-
dl.segments[id] = df
103-
return df, nil
101+
102+
return seg, nil
104103
}
105104

106105
func (dl *datalog) nextWritableSegmentID() (uint16, uint64, error) {
107-
for id, seg := range dl.segments {
108-
// Pick unfilled segment.
109-
if seg != nil && !seg.meta.Full {
110-
dl.maxSequenceID++
111-
return uint16(id), dl.maxSequenceID, nil
112-
}
113-
}
114106
for id, seg := range dl.segments {
115107
// Pick empty segment.
116108
if seg == nil {
@@ -122,21 +114,29 @@ func (dl *datalog) nextWritableSegmentID() (uint16, uint64, error) {
122114
}
123115

124116
func (dl *datalog) swapSegment() error {
117+
// Pick unfilled segment.
118+
for _, seg := range dl.segments {
119+
if seg != nil && !seg.meta.Full {
120+
dl.curSeg = seg
121+
return nil
122+
}
123+
}
124+
125+
// Create new segment.
125126
id, seqID, err := dl.nextWritableSegmentID()
126127
if err != nil {
127128
return err
128129
}
129-
var seg *segment
130-
if dl.segments[id] != nil {
131-
seg = dl.segments[id]
132-
} else {
133-
name := segmentName(id, seqID)
134-
seg, err = dl.openSegment(name)
135-
if err != nil {
136-
return err
137-
}
130+
131+
name := segmentName(id, seqID)
132+
seg, err := dl.openSegment(name, id, seqID)
133+
if err != nil {
134+
return err
138135
}
136+
137+
dl.segments[id] = seg
139138
dl.curSeg = seg
139+
140140
return nil
141141
}
142142

@@ -189,14 +189,13 @@ func (dl *datalog) readKey(sl slot) ([]byte, error) {
189189
return key, nil*/
190190
}
191191

192-
func (dl *datalog) trackOverwrite(sl slot) {
192+
func (dl *datalog) trackDel(sl slot) {
193193
meta := dl.segments[sl.segmentID].meta
194194
meta.DeletedKeys++
195195
meta.DeletedBytes += encodedRecordSize(sl.kvSize())
196196
}
197197

198-
func (dl *datalog) del(key []byte, sl slot) error {
199-
dl.trackOverwrite(sl)
198+
func (dl *datalog) del(key []byte) error {
200199
delRecord := encodeDeleteRecord(key)
201200
_, _, err := dl.writeRecord(delRecord, recordTypeDelete)
202201
if err != nil {
@@ -226,7 +225,7 @@ func (dl *datalog) writeRecord(data []byte, rt recordType) (uint16, uint32, erro
226225
return dl.curSeg.id, uint32(off), nil
227226
}
228227

229-
func (dl *datalog) writeKeyValue(key []byte, value []byte) (uint16, uint32, error) {
228+
func (dl *datalog) put(key []byte, value []byte) (uint16, uint32, error) {
230229
return dl.writeRecord(encodePutRecord(key, value), recordTypePut)
231230
}
232231

@@ -250,20 +249,20 @@ func (dl *datalog) close() error {
250249
return nil
251250
}
252251

253-
func (dl *datalog) segmentsBySequenceID() ([]*segment, error) {
254-
// Sort segments in ascending order by sequence ID.
252+
// segmentsBySequenceID returns segments ordered from oldest to newest.
253+
func (dl *datalog) segmentsBySequenceID() []*segment {
255254
var segments []*segment
256255

257-
for _, f := range dl.segments {
258-
if f == nil {
256+
for _, seg := range dl.segments {
257+
if seg == nil {
259258
continue
260259
}
261-
segments = append(segments, f)
260+
segments = append(segments, seg)
262261
}
263262

264263
sort.SliceStable(segments, func(i, j int) bool {
265264
return segments[i].sequenceID < segments[j].sequenceID
266265
})
267266

268-
return segments, nil
267+
return segments
269268
}

datalog_test.go

Lines changed: 13 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -6,31 +6,37 @@ import (
66
"github.com/akrylysov/pogreb/internal/assert"
77
)
88

9+
func (dl *datalog) segmentMetas() []segmentMeta {
10+
var metas []segmentMeta
11+
for _, seg := range dl.segmentsBySequenceID() {
12+
metas = append(metas, *seg.meta)
13+
}
14+
return metas
15+
}
16+
917
func TestDatalog(t *testing.T) {
1018
db, err := createTestDB(nil)
1119
assert.Nil(t, err)
1220

13-
_, _, err = db.datalog.writeKeyValue([]byte{'1'}, []byte{'1'})
21+
_, _, err = db.datalog.put([]byte{'1'}, []byte{'1'})
1422
assert.Nil(t, err)
1523
assert.Equal(t, &segmentMeta{PutRecords: 1}, db.datalog.segments[0].meta)
1624
assert.Nil(t, db.datalog.segments[1])
1725

18-
sm, err := db.datalog.segmentsBySequenceID()
19-
assert.Nil(t, err)
26+
sm := db.datalog.segmentsBySequenceID()
2027
assert.Equal(t, []*segment{db.datalog.segments[0]}, sm)
2128

2229
// Writing to a full file swaps it.
2330
db.datalog.segments[0].meta.Full = true
24-
_, _, err = db.datalog.writeKeyValue([]byte{'1'}, []byte{'1'})
31+
_, _, err = db.datalog.put([]byte{'1'}, []byte{'1'})
2532
assert.Nil(t, err)
2633
assert.Equal(t, &segmentMeta{PutRecords: 1, Full: true}, db.datalog.segments[0].meta)
2734
assert.Equal(t, &segmentMeta{PutRecords: 1}, db.datalog.segments[1].meta)
2835

29-
sm, err = db.datalog.segmentsBySequenceID()
30-
assert.Nil(t, err)
36+
sm = db.datalog.segmentsBySequenceID()
3137
assert.Equal(t, []*segment{db.datalog.segments[0], db.datalog.segments[1]}, sm)
3238

33-
_, _, err = db.datalog.writeKeyValue([]byte{'1'}, []byte{'1'})
39+
_, _, err = db.datalog.put([]byte{'1'}, []byte{'1'})
3440
assert.Nil(t, err)
3541
assert.Equal(t, &segmentMeta{PutRecords: 1, Full: true}, db.datalog.segments[0].meta)
3642
assert.Equal(t, &segmentMeta{PutRecords: 2}, db.datalog.segments[1].meta)

db.go

Lines changed: 10 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -251,7 +251,7 @@ func (db *DB) put(sl slot, key []byte) error {
251251
return true, err
252252
}
253253
if bytes.Equal(key, slKey) {
254-
db.datalog.trackOverwrite(cursl) // Overwriting existing key.
254+
db.datalog.trackDel(cursl) // Overwriting existing key.
255255
return true, nil
256256
}
257257
return false, nil
@@ -271,7 +271,7 @@ func (db *DB) Put(key []byte, value []byte) error {
271271
db.mu.Lock()
272272
defer db.mu.Unlock()
273273

274-
fileID, offset, err := db.datalog.writeKeyValue(key, value)
274+
fileID, offset, err := db.datalog.put(key, value)
275275
if err != nil {
276276
return err
277277
}
@@ -294,7 +294,7 @@ func (db *DB) Put(key []byte, value []byte) error {
294294
return nil
295295
}
296296

297-
func (db *DB) del(h uint32, key []byte) error {
297+
func (db *DB) del(h uint32, key []byte, writeWAL bool) error {
298298
err := db.index.delete(h, func(sl slot) (b bool, e error) {
299299
if uint16(len(key)) != sl.keySize {
300300
return false, nil
@@ -304,7 +304,12 @@ func (db *DB) del(h uint32, key []byte) error {
304304
return true, err
305305
}
306306
if bytes.Equal(key, slKey) {
307-
return true, db.datalog.del(key, sl)
307+
db.datalog.trackDel(sl)
308+
var err error
309+
if writeWAL {
310+
err = db.datalog.del(key)
311+
}
312+
return true, err
308313
}
309314
return false, nil
310315
})
@@ -317,7 +322,7 @@ func (db *DB) Delete(key []byte) error {
317322
db.metrics.Dels.Add(1)
318323
db.mu.Lock()
319324
defer db.mu.Unlock()
320-
if err := db.del(h, key); err != nil {
325+
if err := db.del(h, key, true); err != nil {
321326
return err
322327
}
323328
if db.syncWrites {

db_test.go

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -82,7 +82,7 @@ func TestEmpty(t *testing.T) {
8282
assert.Nil(t, db.Close())
8383
}
8484

85-
func TestSimple(t *testing.T) {
85+
func TestFull(t *testing.T) {
8686
db, err := createTestDB(&Options{maxSegmentSize: 1024, BackgroundSyncInterval: -1})
8787
assert.Nil(t, err)
8888
var i byte
@@ -135,6 +135,7 @@ func TestSimple(t *testing.T) {
135135
assert.Nil(t, db.Close())
136136
}
137137

138+
expectedSegMetas := db.datalog.segmentMetas()
138139
verifyKeysAndClose(0)
139140

140141
// Simulate crash.
@@ -147,8 +148,7 @@ func TestSimple(t *testing.T) {
147148
assert.Nil(t, err)
148149
verifyKeysAndClose(0)
149150

150-
assert.Equal(t, segmentMeta{PutRecords: 42, DeleteRecords: 1, DeletedBytes: 11}, *db.datalog.segments[0].meta)
151-
assert.Equal(t, segmentMeta{PutRecords: 42}, *db.datalog.segments[1].meta)
151+
assert.Equal(t, expectedSegMetas, db.datalog.segmentMetas())
152152

153153
// Update all items
154154
db, err = Open("test.db", nil)

internal/assert/assert_test.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -67,7 +67,7 @@ func TestEqual(t *testing.T) {
6767
}
6868

6969
for i, tc := range testCases {
70-
t.Run(fmt.Sprintf("case %d %+v", i, tc), func(t *testing.T) {
70+
t.Run(fmt.Sprintf("%d %+v", i, tc), func(t *testing.T) {
7171
mock := &testing.T{}
7272
wg := &sync.WaitGroup{}
7373
wg.Add(1)
@@ -129,7 +129,7 @@ func TestNil(t *testing.T) {
129129
}
130130

131131
for i, tc := range testCases {
132-
t.Run(fmt.Sprintf("case %d %+v", i, tc.obj), func(t *testing.T) {
132+
t.Run(fmt.Sprintf("%d %+v", i, tc.obj), func(t *testing.T) {
133133
mock := &testing.T{}
134134
wg := &sync.WaitGroup{}
135135
wg.Add(1)

internal/hash/murmurhash32_test.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -44,7 +44,7 @@ func TestSum32WithSeed(t *testing.T) {
4444
},
4545
}
4646
for i, tc := range testCases {
47-
t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) {
47+
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
4848
assert.Equal(t, tc.out, Sum32WithSeed(tc.in, tc.seed))
4949
})
5050
}

0 commit comments

Comments
 (0)