Skip to content

Commit 92f0ddb

Browse files
authored
Segment sequence ID (#34)
1 parent cc107cd commit 92f0ddb

8 files changed

Lines changed: 104 additions & 76 deletions

File tree

compaction.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ func (db *DB) compact(f *segment) (CompactionResult, error) {
8787
}
8888

8989
func (db *DB) pickForCompaction() ([]*segment, error) {
90-
segments, err := db.datalog.segmentsByModification()
90+
segments, err := db.datalog.segmentsBySequenceID()
9191
if err != nil {
9292
return nil, err
9393
}

compaction_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@ func TestCompaction(t *testing.T) {
7272
assert.Nil(t, db.datalog.segments[0])
7373
assert.Equal(t, &segmentMeta{PutRecords: 1}, db.datalog.segments[1].meta)
7474
// Compacted file was removed.
75-
assert.Equal(t, false, fileExists(filepath.Join(db.opts.path, segmentName(0))))
76-
assert.Equal(t, false, fileExists(filepath.Join(db.opts.path, segmentMetaName(0))))
75+
assert.Equal(t, false, fileExists(filepath.Join(db.opts.path, segmentName(0, 1))))
76+
assert.Equal(t, false, fileExists(filepath.Join(db.opts.path, segmentMetaName(0, 1))))
7777
})
7878

7979
run("compact entire segment", func(t *testing.T, db *DB) {

datalog.go

Lines changed: 78 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"sort"
1010
"strconv"
1111
"strings"
12-
"time"
1312
)
1413

1514
const (
@@ -18,10 +17,10 @@ const (
1817

1918
// datalog is a write-ahead log.
2019
type datalog struct {
21-
opts *Options
22-
curSeg *segment
23-
segments [maxSegments]*segment
24-
modTime int64
20+
opts *Options
21+
curSeg *segment
22+
segments [maxSegments]*segment
23+
maxSequenceID uint64
2524
}
2625

2726
func openDatalog(opts *Options) (*datalog, error) {
@@ -31,8 +30,7 @@ func openDatalog(opts *Options) (*datalog, error) {
3130
}
3231

3332
dl := &datalog{
34-
opts: opts,
35-
modTime: time.Now().UnixNano(),
33+
opts: opts,
3634
}
3735

3836
for _, file := range files {
@@ -41,13 +39,12 @@ func openDatalog(opts *Options) (*datalog, error) {
4139
if ext != segmentExt {
4240
continue
4341
}
44-
id, err := strconv.ParseInt(strings.TrimSuffix(name, ext), 10, 16)
42+
seg, err := dl.openSegment(name)
4543
if err != nil {
4644
return nil, err
4745
}
48-
_, err = dl.openSegment(filepath.Join(opts.path, name), uint16(id))
49-
if err != nil {
50-
return nil, err
46+
if seg.sequenceID > dl.maxSequenceID {
47+
dl.maxSequenceID = seg.sequenceID
5148
}
5249
}
5350

@@ -58,80 +55,110 @@ func openDatalog(opts *Options) (*datalog, error) {
5855
return dl, nil
5956
}
6057

61-
func (dl *datalog) openSegment(path string, id uint16) (*segment, error) {
62-
f, err := openFile(dl.opts.FileSystem, path, false)
58+
func parseSegmentName(name string) (uint16, uint64, error) {
59+
parts := strings.SplitN(strings.TrimSuffix(name, segmentExt), "-", 2)
60+
id, err := strconv.ParseUint(parts[0], 10, 16)
61+
if err != nil {
62+
return 0, 0, err
63+
}
64+
var seqID uint64
65+
if len(parts) == 2 {
66+
seqID, err = strconv.ParseUint(parts[1], 10, 64)
67+
if err != nil {
68+
return 0, 0, err
69+
}
70+
}
71+
return uint16(id), seqID, nil
72+
}
73+
74+
func (dl *datalog) openSegment(name string) (*segment, error) {
75+
id, seqID, err := parseSegmentName(name)
76+
if err != nil {
77+
return nil, err
78+
}
79+
f, err := openFile(dl.opts.FileSystem, filepath.Join(dl.opts.path, name), false)
6380
if err != nil {
6481
return nil, err
6582
}
66-
var modTime int64
6783
meta := &segmentMeta{}
6884
if !f.empty() {
69-
metaPath := filepath.Join(dl.opts.path, segmentMetaName(id))
85+
metaPath := filepath.Join(dl.opts.path, name+metaExt)
7086
if err := readGobFile(dl.opts.FileSystem, metaPath, &meta); err != nil {
7187
logger.Printf("error reading segment meta %d: %v", id, err)
7288
// TODO: rebuild meta?
7389
}
74-
stat, err := f.MmapFile.Stat()
75-
if err != nil {
76-
return nil, err
77-
}
78-
modTime = stat.ModTime().UnixNano()
7990
} else {
80-
dl.modTime++
81-
modTime = dl.modTime
91+
dl.maxSequenceID++
92+
seqID = dl.maxSequenceID
8293
}
8394

84-
df := &segment{file: f, id: id, meta: meta, modTime: modTime}
95+
df := &segment{
96+
file: f,
97+
id: id,
98+
sequenceID: seqID,
99+
name: name,
100+
meta: meta,
101+
}
85102
dl.segments[id] = df
86103
return df, nil
87104
}
88105

89-
func (dl *datalog) nextWritableSegmentID() (uint16, error) {
90-
for i, file := range dl.segments {
91-
if file == nil || !file.meta.Full {
92-
return uint16(i), nil
106+
func (dl *datalog) nextWritableSegmentID() (uint16, uint64, error) {
107+
for id, seg := range dl.segments {
108+
// Pick unfilled segment.
109+
if seg != nil && !seg.meta.Full {
110+
dl.maxSequenceID++
111+
return uint16(id), dl.maxSequenceID, nil
112+
}
113+
}
114+
for id, seg := range dl.segments {
115+
// Pick empty segment.
116+
if seg == nil {
117+
dl.maxSequenceID++
118+
return uint16(id), dl.maxSequenceID, nil
93119
}
94120
}
95-
return 0, fmt.Errorf("number of segments exceeds %d", maxSegments)
121+
return 0, 0, fmt.Errorf("number of segments exceeds %d", maxSegments)
96122
}
97123

98124
func (dl *datalog) swapSegment() error {
99-
id, err := dl.nextWritableSegmentID()
125+
id, seqID, err := dl.nextWritableSegmentID()
100126
if err != nil {
101127
return err
102128
}
103-
var f *segment
129+
var seg *segment
104130
if dl.segments[id] != nil {
105-
f = dl.segments[id]
131+
seg = dl.segments[id]
106132
} else {
107-
name := segmentName(id)
108-
f, err = dl.openSegment(filepath.Join(dl.opts.path, name), id)
133+
name := segmentName(id, seqID)
134+
seg, err = dl.openSegment(name)
109135
if err != nil {
110136
return err
111137
}
112138
}
113-
dl.curSeg = f
139+
dl.curSeg = seg
114140
return nil
115141
}
116142

117-
func (dl *datalog) removeSegment(f *segment) error {
118-
dl.segments[f.id] = nil
143+
func (dl *datalog) removeSegment(seg *segment) error {
144+
dl.segments[seg.id] = nil
119145

120-
if err := f.Close(); err != nil {
146+
if err := seg.Close(); err != nil {
121147
return err
122148
}
123149

124-
// Remove segment.
125-
filePath := filepath.Join(dl.opts.path, segmentName(f.id))
126-
if err := os.Remove(filePath); err != nil {
150+
// Remove segment meta from FS.
151+
metaPath := filepath.Join(dl.opts.path, seg.name+metaExt) // Meta file is the segment file name plus metaExt (same as segmentMetaName); segmentExt here never matched a real file.
152+
if err := dl.opts.FileSystem.Remove(metaPath); err != nil && !os.IsNotExist(err) {
127153
return err
128154
}
129155

130-
// Remove segment meta.
131-
metaPath := filepath.Join(dl.opts.path, segmentMetaName(f.id))
132-
if err := os.Remove(metaPath); err != nil && !os.IsNotExist(err) {
156+
// Remove segment from FS.
157+
filePath := filepath.Join(dl.opts.path, seg.name)
158+
if err := dl.opts.FileSystem.Remove(filePath); err != nil {
133159
return err
134160
}
161+
135162
return nil
136163
}
137164

@@ -208,23 +235,23 @@ func (dl *datalog) sync() error {
208235
}
209236

210237
func (dl *datalog) close() error {
211-
for id, f := range dl.segments {
212-
if f == nil {
238+
for _, seg := range dl.segments {
239+
if seg == nil {
213240
continue
214241
}
215-
if err := f.Close(); err != nil {
242+
if err := seg.Close(); err != nil {
216243
return err
217244
}
218-
metaPath := filepath.Join(dl.opts.path, segmentMetaName(uint16(id)))
219-
if err := writeGobFile(dl.opts.FileSystem, metaPath, f.meta); err != nil {
245+
metaPath := filepath.Join(dl.opts.path, seg.name+metaExt)
246+
if err := writeGobFile(dl.opts.FileSystem, metaPath, seg.meta); err != nil {
220247
return err
221248
}
222249
}
223250
return nil
224251
}
225252

226-
func (dl *datalog) segmentsByModification() ([]*segment, error) {
227-
// Sort segments in ascending order by last modified time.
253+
func (dl *datalog) segmentsBySequenceID() ([]*segment, error) {
254+
// Sort segments in ascending order by sequence ID.
228255
var segments []*segment
229256

230257
for _, f := range dl.segments {
@@ -235,7 +262,7 @@ func (dl *datalog) segmentsByModification() ([]*segment, error) {
235262
}
236263

237264
sort.SliceStable(segments, func(i, j int) bool {
238-
return segments[i].modTime < segments[j].modTime
265+
return segments[i].sequenceID < segments[j].sequenceID
239266
})
240267

241268
return segments, nil

datalog_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ func TestDatalog(t *testing.T) {
1515
assert.Equal(t, &segmentMeta{PutRecords: 1}, db.datalog.segments[0].meta)
1616
assert.Nil(t, db.datalog.segments[1])
1717

18-
sm, err := db.datalog.segmentsByModification()
18+
sm, err := db.datalog.segmentsBySequenceID()
1919
assert.Nil(t, err)
2020
assert.Equal(t, []*segment{db.datalog.segments[0]}, sm)
2121

@@ -26,7 +26,7 @@ func TestDatalog(t *testing.T) {
2626
assert.Equal(t, &segmentMeta{PutRecords: 1, Full: true}, db.datalog.segments[0].meta)
2727
assert.Equal(t, &segmentMeta{PutRecords: 1}, db.datalog.segments[1].meta)
2828

29-
sm, err = db.datalog.segmentsByModification()
29+
sm, err = db.datalog.segmentsBySequenceID()
3030
assert.Nil(t, err)
3131
assert.Equal(t, []*segment{db.datalog.segments[0], db.datalog.segments[1]}, sm)
3232

db_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ func TestSimple(t *testing.T) {
139139

140140
// Simulate crash.
141141
assert.Nil(t, touchFile(filepath.Join("test.db", lockName)))
142-
assert.Nil(t, os.Remove(filepath.Join("test.db", segmentMetaName(0))))
142+
assert.Nil(t, os.Remove(filepath.Join("test.db", segmentMetaName(0, 1))))
143143
assert.Nil(t, os.Remove(filepath.Join("test.db", indexMetaName)))
144144

145145
// Open and check again

recovery.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ type recoveryIterator struct {
6868
}
6969

7070
func newRecoveryIterator(dl *datalog) (*recoveryIterator, error) {
71-
files, err := dl.segmentsByModification()
71+
files, err := dl.segmentsBySequenceID()
7272
if err != nil {
7373
return nil, err
7474
}

recovery_test.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,60 +9,60 @@ import (
99
)
1010

1111
func TestRecovery(t *testing.T) {
12-
dfPath := filepath.Join("test.db", segmentName(0))
12+
segPath := filepath.Join("test.db", segmentName(0, 1))
1313
testCases := []struct {
1414
name string
1515
fn func() error
1616
}{
1717
{
1818
name: "all zeroes",
1919
fn: func() error {
20-
return appendFile(dfPath, make([]byte, 128))
20+
return appendFile(segPath, make([]byte, 128))
2121
},
2222
},
2323
{
2424
name: "partial kv size",
2525
fn: func() error {
26-
return appendFile(dfPath, []byte{1})
26+
return appendFile(segPath, []byte{1})
2727
},
2828
},
2929
{
3030
name: "only kv size",
3131
fn: func() error {
32-
return appendFile(dfPath, []byte{1, 0, 1, 0, 0, 0})
32+
return appendFile(segPath, []byte{1, 0, 1, 0, 0, 0})
3333
},
3434
},
3535
{
3636
name: "kv size and key",
3737
fn: func() error {
38-
return appendFile(dfPath, []byte{1, 0, 1, 0, 0, 0, 1})
38+
return appendFile(segPath, []byte{1, 0, 1, 0, 0, 0, 1})
3939
},
4040
},
4141
{
4242
name: "kv size, key, value",
4343
fn: func() error {
44-
return appendFile(dfPath, []byte{1, 0, 1, 0, 0, 0, 1, 1})
44+
return appendFile(segPath, []byte{1, 0, 1, 0, 0, 0, 1, 1})
4545
},
4646
},
4747
{
4848
name: "kv size, key, value, partial crc32",
4949
fn: func() error {
50-
return appendFile(dfPath, []byte{1, 0, 1, 0, 0, 0, 1, 1, 40})
50+
return appendFile(segPath, []byte{1, 0, 1, 0, 0, 0, 1, 1, 40})
5151
},
5252
},
5353
{
5454
name: "kv size, key, value, invalid crc32",
5555
fn: func() error {
56-
return appendFile(dfPath, []byte{1, 0, 1, 0, 0, 0, 1, 1, 40, 19, 197, 0})
56+
return appendFile(segPath, []byte{1, 0, 1, 0, 0, 0, 1, 1, 40, 19, 197, 0})
5757
},
5858
},
5959
{
6060
name: "corrupted and not corrupted record",
6161
fn: func() error {
62-
if err := appendFile(dfPath, []byte{1, 0, 1, 0, 0, 0, 1, 1, 40, 19, 197, 0}); err != nil {
62+
if err := appendFile(segPath, []byte{1, 0, 1, 0, 0, 0, 1, 1, 40, 19, 197, 0}); err != nil {
6363
return err
6464
}
65-
return appendFile(dfPath, []byte{1, 0, 1, 0, 0, 0, 1, 1, 133, 13, 200, 12})
65+
return appendFile(segPath, []byte{1, 0, 1, 0, 0, 0, 1, 1, 133, 13, 200, 12})
6666
},
6767
},
6868
}
@@ -71,7 +71,7 @@ func TestRecovery(t *testing.T) {
7171
t.Run(fmt.Sprintf("case %s", testCase.name), func(t *testing.T) {
7272
db, err := createTestDB(nil)
7373
assert.Nil(t, err)
74-
// Fill file 0.
74+
// Fill segment 0.
7575
var i uint8
7676
for i = 0; i < 128; i++ {
7777
assert.Nil(t, db.Put([]byte{i}, []byte{i}))

segment.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,14 @@ const (
2121
// It consists of a sequence of binary-encoded variable length records.
2222
type segment struct {
2323
*file
24-
id uint16
25-
meta *segmentMeta
26-
modTime int64
24+
id uint16 // Physical segment identifier.
25+
sequenceID uint64 // Logical monotonically increasing segment identifier.
26+
name string
27+
meta *segmentMeta
2728
}
2829

29-
func segmentName(id uint16) string {
30-
return fmt.Sprintf("%05d%s", id, segmentExt)
30+
func segmentName(id uint16, sequenceID uint64) string {
31+
return fmt.Sprintf("%05d-%d%s", id, sequenceID, segmentExt)
3132
}
3233

3334
type segmentMeta struct {
@@ -38,8 +39,8 @@ type segmentMeta struct {
3839
DeletedBytes uint32
3940
}
4041

41-
func segmentMetaName(id uint16) string {
42-
return fmt.Sprintf("%05d%s%s", id, segmentExt, metaExt)
42+
func segmentMetaName(id uint16, sequenceID uint64) string {
43+
return segmentName(id, sequenceID) + metaExt
4344
}
4445

4546
// Binary representation of a segment record:

0 commit comments

Comments
 (0)