-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathmemtable.go
More file actions
179 lines (145 loc) · 4.93 KB
/
memtable.go
File metadata and controls
179 lines (145 loc) · 4.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package wildcat
import (
"fmt"
"os"
"sync/atomic"
"time"
"github.com/wildcatdb/wildcat/v2/blockmanager"
"github.com/wildcatdb/wildcat/v2/bloomfilter"
"github.com/wildcatdb/wildcat/v2/skiplist"
)
// A memtable pairs a skiplist with a write-ahead log (WAL) that provides durability.
// WAL identifies the write-ahead log file paired with a memtable.
// It holds only the file path; replay opens that path through the block manager.
type WAL struct {
path string // The WAL path, i.e. <db dir><id>.wal
}
// Memtable is an in-memory table: a skiplist holding the entries plus the WAL
// that replay reads to rebuild them.
type Memtable struct {
id int64 // Memtable ID, taken from the WAL ID
skiplist *skiplist.SkipList // The skip list holding the memtable's entries; described by the author as atomic and concurrent safe
wal *WAL // The write-ahead log paired with this memtable for durability
size int64 // Running total of key and value lengths; updated with atomic operations (see replay)
db *DB // The database instance
}
// replay rebuilds this memtable's contents from its write-ahead log.
//
// The WAL block manager is taken from the database LRU cache when present;
// otherwise the file is opened (created if missing) and cached with an eviction
// callback that closes it. Every WAL record is deserialized as a Txn, and
// records sharing a transaction ID are merged into a single transaction in
// which later records take precedence over earlier ones. Committed
// transactions are then applied to the skiplist at their commit timestamp,
// writes first and deletes second, and memtable.size is adjusted accordingly.
//
// If activeTxns is non-nil, every uncommitted transaction that has at least
// one write, delete or read is appended to *activeTxns.
//
// An error is returned only when the WAL cannot be opened or when the cached
// entry for the WAL path is not a block manager. Records that fail to
// deserialize are logged and skipped.
func (memtable *Memtable) replay(activeTxns *[]*Txn) error {
	walPath := memtable.wal.path
	memtable.db.log(fmt.Sprintf("Replaying WAL for memtable: %s", walPath))

	var walBm *blockmanager.BlockManager
	if cached, ok := memtable.db.lru.Get(walPath); ok {
		memtable.db.log(fmt.Sprintf("Found WAL file in LRU cache: %s", walPath))

		// Checked assertion: a foreign value stored under this key becomes an
		// error for the caller instead of a panic during recovery.
		bm, isBm := cached.(*blockmanager.BlockManager)
		if !isBm {
			return fmt.Errorf("unexpected LRU cache entry type %T for WAL %s", cached, walPath)
		}
		walBm = bm
	} else {
		memtable.db.log(fmt.Sprintf("WAL file not in LRU cache, opening: %s", walPath))

		bm, err := blockmanager.Open(walPath, os.O_RDWR|os.O_CREATE, memtable.db.opts.Permission, blockmanager.SyncOption(memtable.db.opts.SyncOption))
		if err != nil {
			return fmt.Errorf("failed to open WAL block manager: %w", err)
		}

		memtable.db.lru.Put(walPath, bm, func(key string, value interface{}) {
			if evicted, ok := value.(*blockmanager.BlockManager); ok {
				// Best effort: nothing useful can be done with a close error on eviction.
				_ = evicted.Close()
			}
		})
		walBm = bm
	}

	iter := walBm.Iterator()

	// Track the latest state of each transaction by ID
	txnMap := make(map[int64]*Txn)
	var txnCount, committedCount int

	for {
		data, _, err := iter.Next()
		if err != nil {
			// Treated as end of WAL.
			// NOTE(review): a genuine read error also ends replay here without
			// being reported - confirm which error the iterator uses for end of data.
			break
		}
		txnCount++

		// Allocate per record and keep the pointer, so the Txn is never copied
		// by value (a copy would also duplicate any lock the struct may hold).
		txn := &Txn{}
		if err := txn.deserializeTransaction(data); err != nil {
			memtable.db.log(fmt.Sprintf("Warning: failed to deserialize transaction: %v - skipping", err))
			continue
		}
		txn.db = memtable.db

		existing, seen := txnMap[txn.Id]
		if !seen {
			// First record for this transaction ID
			txnMap[txn.Id] = txn
			continue
		}

		// Merge this record into the one already tracked. When a tracked set is
		// nil it adopts the incoming map, so the assignments below can never
		// hit a nil map; the incoming Txn is discarded afterwards, so sharing
		// the map is harmless.
		//
		// Later records win per key: a write cancels an earlier delete of the
		// same key and a delete cancels an earlier write. Without this, a key
		// that was deleted and then re-written inside one transaction would be
		// replayed as deleted, because deletes are applied after writes below.
		if existing.WriteSet == nil {
			existing.WriteSet = txn.WriteSet
		}
		for key, value := range txn.WriteSet {
			existing.WriteSet[key] = value
			delete(existing.DeleteSet, key)
		}

		if existing.DeleteSet == nil {
			existing.DeleteSet = txn.DeleteSet
		}
		for key := range txn.DeleteSet {
			existing.DeleteSet[key] = true
			delete(existing.WriteSet, key)
		}

		if existing.ReadSet == nil {
			existing.ReadSet = txn.ReadSet
		}
		for key, timestamp := range txn.ReadSet {
			existing.ReadSet[key] = timestamp
		}

		// Update commit status - a transaction is committed if any entry says it is
		if txn.Committed {
			existing.Committed = true
			existing.Timestamp = txn.Timestamp // Use the timestamp from the commit entry
		}
	}

	// After processing all entries, apply the committed transactions
	for _, txn := range txnMap {
		if !txn.Committed {
			continue
		}
		committedCount++

		// Apply writes to the memtable
		for key, value := range txn.WriteSet {
			memtable.skiplist.Put([]byte(key), value, txn.Timestamp)
			atomic.AddInt64(&memtable.size, int64(len(key)+len(value)))
		}

		// Apply deletes
		for key := range txn.DeleteSet {
			memtable.skiplist.Delete([]byte(key), txn.Timestamp)
			atomic.AddInt64(&memtable.size, -int64(len(key)))
		}
	}

	// Collect active (uncommitted, non-empty) transactions if requested.
	// txnMap is local to this call, so its pointers can be handed out directly.
	if activeTxns != nil {
		for _, txn := range txnMap {
			if txn.Committed {
				continue
			}
			if len(txn.WriteSet) > 0 || len(txn.DeleteSet) > 0 || len(txn.ReadSet) > 0 {
				*activeTxns = append(*activeTxns, txn)
			}
		}
	}

	memtable.db.log(fmt.Sprintf("Replay summary for %s: %d total entries, %d unique transactions, %d committed",
		walPath, txnCount, len(txnMap), committedCount))

	return nil
}
// createBloomFilter builds a Bloom filter from the keys currently stored in the
// memtable's skiplist.
//
// entries is the capacity the filter is sized for; the false-positive rate
// comes from the database options. Keys whose value is nil (deletion markers)
// are left out. A key that cannot be added is logged and skipped rather than
// failing the whole build.
//
// An error is returned if the skiplist iterator or the filter itself cannot be
// created.
func (memtable *Memtable) createBloomFilter(entries int64) (*bloomfilter.BloomFilter, error) {
	// Read at a timestamp beyond "now" - presumably so that no entry is hidden
	// from the iterator by its timestamp.
	readTs := time.Now().UnixNano() + FarFutureOffsetNs

	it, err := memtable.skiplist.NewIterator(nil, readTs)
	if err != nil {
		return nil, err
	}

	memtable.db.log(fmt.Sprintf("Creating Bloom filter for memtable with %d entries", entries))

	filter, err := bloomfilter.New(uint(entries), memtable.db.opts.BloomFilterFPR)
	if err != nil {
		return nil, err
	}

	for key, val, _, more := it.Next(); more; key, val, _, more = it.Next() {
		if val == nil {
			// Deletion marker (tombstone): not a live key
			continue
		}

		if addErr := filter.Add(key); addErr != nil {
			// Only warn; one bad key should not abort the filter
			memtable.db.log(fmt.Sprintf("Warning: failed to add key to Bloom filter: %v - skipping", addErr))
		}
	}

	memtable.db.log(fmt.Sprintf("Bloom filter created for memtable with %d entries", entries))

	return filter, nil
}