Skip to content

Commit e5a1af7

Browse files
craig[bot], pav-kv, sravotto
committed
155953: kvserver: simplify rewriteRaftState r=arulajmani a=pav-kv

Keys in the RangeID-local unreplicated space hold both state machine and raft state. With separated engines, these will reside in different engines, and they are also interleaved in an unfortunate way. So we can't use one `ClearRawRange` to cover them all, or even one per engine.

This PR clears all unreplicated keys in `rewriteRaftState` manually, so that we can control which key goes to which engine/`Writer`. Additionally, this uncovers that half of these keys don't need to be cleared at all because they don't exist, or don't need to be changed.

The end effect of the PR is that `rewriteRaftState` now only mutates the raft engine state, which is very convenient. Incidentally, this function finally lives up to its name.

Part of #152845
Epic: CRDB-55220

159067: util/ioctx: add RandomAccessReader for cloud storage r=sravotto a=sravotto

Add a general-purpose wrapper that provides ReadAt/Seek support for any cloud storage reader using an opener factory function pattern. This enables random access (required by formats like Parquet) without requiring the cloud storage implementation itself to natively support ReadAt/Seek. Instead, it opens new readers at specific offsets for each ReadAt call.

Key features:
- Thread-safe ReadAt: Each call opens a new connection, allowing concurrent reads from different offsets
- Seek support: Tracks position logically without seeking underlying readers
- Generic: Works with any ReadCloserCtx via OpenerAtFunc factory

Release note: None
Epic: CRDB-23802

Co-authored-by: Pavel Kalinnikov <[email protected]>
Co-authored-by: Silvano Ravotto <[email protected]>
3 parents c9f37c8 + 481eef3 + 746f171 commit e5a1af7

File tree

8 files changed

+523
-45
lines changed

8 files changed

+523
-45
lines changed

pkg/kv/kvserver/client_raft_test.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3259,15 +3259,18 @@ func TestRaftRemoveRace(t *testing.T) {
32593259
}
32603260
tc.AddVotersOrFatal(t, key, targets...)
32613261

3262+
sl := kvstorage.MakeStateLoader(desc.RangeID)
3263+
s2 := tc.GetFirstStoreFromServer(t, 2)
32623264
for i := 0; i < 10; i++ {
32633265
tc.RemoveVotersOrFatal(t, key, tc.Target(2))
32643266
tc.AddVotersOrFatal(t, key, tc.Target(2))
32653267

3266-
// Verify the tombstone key does not exist. See #12130.
3267-
ts, err := kvstorage.MakeStateLoader(desc.RangeID).LoadRangeTombstone(
3268-
ctx, tc.GetFirstStoreFromServer(t, 2).StateEngine())
3268+
replID, err := sl.LoadRaftReplicaID(ctx, s2.StateEngine())
32693269
require.NoError(t, err)
3270-
require.Equal(t, kvserverpb.RangeTombstone{}, ts)
3270+
ts, err := sl.LoadRangeTombstone(ctx, s2.StateEngine())
3271+
require.NoError(t, err)
3272+
// ReplicaID leads the RangeTombstone, which means a replica exists.
3273+
require.GreaterOrEqual(t, replID.ReplicaID, ts.NextReplicaID)
32713274
}
32723275
}
32733276

pkg/kv/kvserver/kvstorage/destroy.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/cockroachdb/cockroach/pkg/keys"
1313
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
1414
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
15+
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
1516
"github.com/cockroachdb/cockroach/pkg/roachpb"
1617
"github.com/cockroachdb/cockroach/pkg/storage"
1718
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
@@ -239,6 +240,37 @@ func RemoveStaleRHSFromSplit(
239240
return nil
240241
}
241242

243+
// RewriteRaftState rewrites the raft state of the given replica with the
244+
// provided state. Specifically, it rewrites HardState and RaftTruncatedState,
245+
// and clears the raft log. All writes are generated in the engine keys order.
246+
//
247+
// TODO(pav-kv): get rid of the returned cleared spans.
248+
func RewriteRaftState(
249+
ctx context.Context,
250+
raftWO RaftWO,
251+
sl StateLoader,
252+
hs raftpb.HardState,
253+
ts kvserverpb.RaftTruncatedState,
254+
) (cleared roachpb.Span, _ error) {
255+
// Update HardState.
256+
if err := sl.SetHardState(ctx, raftWO, hs); err != nil {
257+
return roachpb.Span{}, errors.Wrapf(err, "unable to write HardState")
258+
}
259+
// Clear the raft log. Note that there are no Pebble range keys in this span.
260+
logPrefix := sl.RaftLogPrefix().Clone()
261+
raftLog := roachpb.Span{Key: logPrefix, EndKey: logPrefix.PrefixEnd()}
262+
if err := raftWO.ClearRawRange(
263+
raftLog.Key, raftLog.EndKey, true /* pointKeys */, false, /* rangeKeys */
264+
); err != nil {
265+
return roachpb.Span{}, errors.Wrapf(err, "unable to clear the raft log")
266+
}
267+
// Update the log truncation state.
268+
if err := sl.SetRaftTruncatedState(ctx, raftWO, &ts); err != nil {
269+
return roachpb.Span{}, errors.Wrapf(err, "unable to write RaftTruncatedState")
270+
}
271+
return raftLog, nil
272+
}
273+
242274
// TestingForceClearRange changes the value of ClearRangeThresholdPointKeys to
243275
// 1, which effectively forces ClearRawRange in the replica destruction path,
244276
// instead of point deletions. This can be used for making the storage

pkg/kv/kvserver/snapshot_apply_prepare.go

Lines changed: 14 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,19 @@ package kvserver
88
import (
99
"context"
1010

11-
"github.com/cockroachdb/cockroach/pkg/keys"
1211
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
1312
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage"
1413
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
1514
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
1615
"github.com/cockroachdb/cockroach/pkg/roachpb"
1716
"github.com/cockroachdb/cockroach/pkg/storage"
1817
"github.com/cockroachdb/cockroach/pkg/util/log"
19-
"github.com/cockroachdb/errors"
2018
)
2119

2220
// snapWriteBuilder contains the data needed to prepare the on-disk state for a
2321
// snapshot.
22+
//
23+
// TODO(pav-kv): move this struct to kvstorage package.
2424
type snapWriteBuilder struct {
2525
id roachpb.FullReplicaID
2626

@@ -42,7 +42,11 @@ type snapWriteBuilder struct {
4242

4343
// prepareSnapApply writes the unreplicated SST for the snapshot and clears disk data for subsumed replicas.
4444
func (s *snapWriteBuilder) prepareSnapApply(ctx context.Context) error {
45+
// TODO(pav-kv): assert that our replica already exists in storage. Note that
46+
// it can be either uninitialized or initialized.
4547
_ = applySnapshotTODO // 1.1 + 1.3 + 2.4 + 3.1
48+
// TODO(sep-raft-log): rewriteRaftState now only touches raft engine keys, so
49+
// it will be convenient to redirect it to a raft engine batch.
4650
if err := s.writeSST(ctx, s.rewriteRaftState); err != nil {
4751
return err
4852
}
@@ -55,42 +59,16 @@ func (s *snapWriteBuilder) prepareSnapApply(ctx context.Context) error {
5559
return s.clearResidualDataOnNarrowSnapshot(ctx)
5660
}
5761

58-
// rewriteRaftState clears and rewrites the unreplicated rangeID-local key space
59-
// of the given replica with the provided raft state. Note that it also clears
60-
// the raft log contents.
61-
//
62-
// The caller must make sure the log does not have entries newer than the
63-
// snapshot entry ID, and that clearing the log is applied atomically with the
64-
// snapshot write, or after the latter is synced.
62+
// rewriteRaftState rewrites the raft state of the given replica with the
63+
// provided state. Specifically, it rewrites HardState and RaftTruncatedState,
64+
// and clears the raft log. All writes are generated in the engine keys order.
6565
func (s *snapWriteBuilder) rewriteRaftState(ctx context.Context, w storage.Writer) error {
66-
// Clearing the unreplicated state.
67-
//
68-
// NB: We do not expect to see range keys in the unreplicated state, so
69-
// we don't drop a range tombstone across the range key space.
70-
unreplicatedPrefixKey := keys.MakeRangeIDUnreplicatedPrefix(s.id.RangeID)
71-
unreplicatedStart := unreplicatedPrefixKey
72-
unreplicatedEnd := unreplicatedPrefixKey.PrefixEnd()
73-
if err := w.ClearRawRange(
74-
unreplicatedStart, unreplicatedEnd, true /* pointKeys */, false, /* rangeKeys */
75-
); err != nil {
76-
return errors.Wrapf(err, "error clearing the unreplicated space")
77-
}
78-
79-
// Update HardState.
80-
if err := s.sl.SetHardState(ctx, w, s.hardState); err != nil {
81-
return errors.Wrapf(err, "unable to write HardState")
82-
}
83-
// We've cleared all the raft state above, so we are forced to write the
84-
// RaftReplicaID again here.
85-
if err := s.sl.SetRaftReplicaID(ctx, w, s.id.ReplicaID); err != nil {
86-
return errors.Wrapf(err, "unable to write RaftReplicaID")
87-
}
88-
// Update the log truncation state.
89-
if err := s.sl.SetRaftTruncatedState(ctx, w, &s.truncState); err != nil {
90-
return errors.Wrapf(err, "unable to write RaftTruncatedState")
66+
cleared, err := kvstorage.RewriteRaftState(
67+
ctx, kvstorage.RaftWO(w), s.sl, s.hardState, s.truncState)
68+
if err != nil {
69+
return err
9170
}
92-
93-
s.cleared = append(s.cleared, roachpb.Span{Key: unreplicatedStart, EndKey: unreplicatedEnd})
71+
s.cleared = append(s.cleared, cleared)
9472
return nil
9573
}
9674

pkg/kv/kvserver/snapshot_apply_prepare_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,11 @@ import (
1212
"testing"
1313

1414
"github.com/cockroachdb/cockroach/pkg/keys"
15+
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
1516
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
1617
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
1718
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage"
19+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
1820
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/print"
1921
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
2022
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
@@ -130,4 +132,24 @@ func createRangeData(t *testing.T, eng storage.Engine, desc roachpb.RangeDescrip
130132
}.ToEngineKey(nil)
131133
require.NoError(t, eng.PutEngineKey(ek, nil))
132134
}
135+
136+
// Add some raft state: HardState, TruncatedState and log entries.
137+
const truncIndex, numEntries = 10, 3
138+
ctx := context.Background()
139+
140+
sl := logstore.NewStateLoader(desc.RangeID)
141+
require.NoError(t, sl.SetRaftTruncatedState(
142+
ctx, eng, &kvserverpb.RaftTruncatedState{Index: truncIndex, Term: 5},
143+
))
144+
require.NoError(t, sl.SetHardState(
145+
ctx, eng, raftpb.HardState{Term: 6, Commit: truncIndex + numEntries},
146+
))
147+
for i := truncIndex + 1; i <= truncIndex+numEntries; i++ {
148+
require.NoError(t, storage.MVCCBlindPutProto(
149+
ctx, eng,
150+
sl.RaftLogKey(kvpb.RaftIndex(i)), hlc.Timestamp{},
151+
&raftpb.Entry{Index: uint64(i), Term: 6},
152+
storage.MVCCWriteOptions{},
153+
))
154+
}
133155
}

pkg/kv/kvserver/testdata/TestPrepareSnapApply.txt

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,25 @@
11
echo
22
----
33
>> sst:
4-
Delete Range: [0,0 /Local/RangeID/123/u"" (0x0169f67b7500): , 0,0 /Local/RangeID/123/v"" (0x0169f67b7600): )
54
Put: 0,0 /Local/RangeID/123/u/RaftHardState (0x0169f67b757266746800): term:20 vote:0 commit:100 lead:0 lead_epoch:0
6-
Put: 0,0 /Local/RangeID/123/u/RaftReplicaID (0x0169f67b757266747200): replica_id:4
5+
Delete Range: [0,0 /Local/RangeID/123/u/RaftLog (0x0169f67b757266746c00): , 0,0 /Local/RangeID/123/u"rftm" (0x0169f67b757266746d00): )
76
Put: 0,0 /Local/RangeID/123/u/RaftTruncatedState (0x0169f67b757266747400): index:100 term:20
87
>> sst:
98
Put: 0,0 /Local/RangeID/101/u/RangeTombstone (0x0169ed757266746200): next_replica_id:2147483647
9+
Delete (Sized at 38): 0,0 /Local/RangeID/101/u/RaftHardState (0x0169ed757266746800):
10+
Delete (Sized at 42): 0,0 /Local/RangeID/101/u/RaftLog/logIndex:11 (0x0169ed757266746c000000000000000b00):
11+
Delete (Sized at 42): 0,0 /Local/RangeID/101/u/RaftLog/logIndex:12 (0x0169ed757266746c000000000000000c00):
12+
Delete (Sized at 42): 0,0 /Local/RangeID/101/u/RaftLog/logIndex:13 (0x0169ed757266746c000000000000000d00):
1013
Delete: 0,0 /Local/RangeID/101/u/RaftReplicaID (0x0169ed757266747200):
14+
Delete (Sized at 32): 0,0 /Local/RangeID/101/u/RaftTruncatedState (0x0169ed757266747400):
1115
>> sst:
1216
Put: 0,0 /Local/RangeID/102/u/RangeTombstone (0x0169ee757266746200): next_replica_id:2147483647
17+
Delete (Sized at 38): 0,0 /Local/RangeID/102/u/RaftHardState (0x0169ee757266746800):
18+
Delete (Sized at 42): 0,0 /Local/RangeID/102/u/RaftLog/logIndex:11 (0x0169ee757266746c000000000000000b00):
19+
Delete (Sized at 42): 0,0 /Local/RangeID/102/u/RaftLog/logIndex:12 (0x0169ee757266746c000000000000000c00):
20+
Delete (Sized at 42): 0,0 /Local/RangeID/102/u/RaftLog/logIndex:13 (0x0169ee757266746c000000000000000d00):
1321
Delete: 0,0 /Local/RangeID/102/u/RaftReplicaID (0x0169ee757266747200):
22+
Delete (Sized at 32): 0,0 /Local/RangeID/102/u/RaftTruncatedState (0x0169ee757266747400):
1423
>> sst:
1524
Delete (Sized at 27): /Local/Lock"y\xff" 0300000000000000000000000000000000 (0x017a6b1279ff000100030000000000000000000000000000000012):
1625
>> sst:
@@ -20,7 +29,7 @@ Delete (Sized at 12): 0.000000001,0 "y\xff" (0x79ff00000000000000000109):
2029
>> repl: /Local/Lock/Local/Range"{a"-k"}
2130
>> repl: /Local/Lock"{a"-k"}
2231
>> repl: {a-k}
23-
>> cleared: /Local/RangeID/123/{u""-v""}
32+
>> cleared: /Local/RangeID/123/u{/RaftLog-"rftm"}
2433
>> cleared: /Local/RangeID/101/{r""-s""}
2534
>> cleared: /Local/RangeID/101/{u""-v""}
2635
>> cleared: /Local/RangeID/102/{r""-s""}

pkg/util/ioctx/BUILD.bazel

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
22

33
go_library(
44
name = "ioctx",
5-
srcs = ["reader.go"],
5+
srcs = [
6+
"random_access_reader.go",
7+
"reader.go",
8+
],
69
importpath = "github.com/cockroachdb/cockroach/pkg/util/ioctx",
710
visibility = ["//visibility:public"],
811
deps = ["@com_github_cockroachdb_errors//:errors"],
@@ -11,7 +14,10 @@ go_library(
1114
go_test(
1215
name = "ioctx_test",
1316
size = "small",
14-
srcs = ["reader_test.go"],
17+
srcs = [
18+
"random_access_reader_test.go",
19+
"reader_test.go",
20+
],
1521
embed = [":ioctx"],
1622
deps = [
1723
"@com_github_cockroachdb_errors//:errors",
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package ioctx
7+
8+
import (
9+
"context"
10+
"io"
11+
12+
"github.com/cockroachdb/errors"
13+
)
14+
15+
// OpenerAtFunc is a factory function that opens a reader at a specific offset.
16+
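// endHint is the offset at which the caller expects to stop reading (callers in
// this file pass either the total size or offset+len(buffer)). An opener may
// use it to bound the underlying request when the end position is known.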
17+
type OpenerAtFunc func(ctx context.Context, offset int64, endHint int64) (ReadCloserCtx, error)
18+
19+
// randomAccessReader wraps any cloud storage reader and adds ReadAt/Seek support
20+
// using a factory function that can open readers at specific offsets.
21+
//
22+
// This provides random access capabilities without requiring the cloud storage
23+
// implementation itself to support seeking. Each ReadAt call opens a new reader
24+
// at the requested offset.
25+
//
26+
// Thread Safety:
27+
// - ReadAt: Safe for concurrent calls. Each call opens its own reader.
28+
// - Read/Seek: NOT safe for concurrent calls. Should only be used from a single goroutine.
29+
type randomAccessReader struct {
30+
ctx context.Context
31+
size int64
32+
openAt OpenerAtFunc
33+
34+
// Current sequential reader (for Read operations)
35+
current ReadCloserCtx
36+
pos int64
37+
}
38+
39+
var _ io.ReaderAt = &randomAccessReader{}
40+
var _ io.ReadSeekCloser = &randomAccessReader{}
41+
42+
// NewRandomAccessReader creates a reader that supports ReadAt and Seek
43+
// by using the provided opener function to create readers at specific offsets.
44+
//
45+
// Parameters:
46+
// - ctx: Context for operations
47+
// - size: Total size of the file
48+
// - openAt: Factory function that opens a reader at a given offset
49+
func NewRandomAccessReader(
50+
ctx context.Context, size int64, openAt OpenerAtFunc,
51+
) *randomAccessReader {
52+
return &randomAccessReader{
53+
ctx: ctx,
54+
size: size,
55+
openAt: openAt,
56+
}
57+
}
58+
59+
// Read implements io.Reader using sequential access
60+
func (r *randomAccessReader) Read(p []byte) (int, error) {
61+
if r.current == nil {
62+
// Open reader at current position
63+
opened, err := r.openAt(r.ctx, r.pos, r.size)
64+
if err != nil {
65+
return 0, err
66+
}
67+
r.current = opened
68+
}
69+
70+
n, err := r.current.Read(r.ctx, p)
71+
r.pos += int64(n)
72+
return n, err
73+
}
74+
75+
// ReadAt implements io.ReaderAt - safe for concurrent calls.
76+
// Each call opens a new reader at the requested offset, so multiple
77+
// goroutines can call ReadAt simultaneously.
78+
//
79+
// Note: This opens a new connection (HTTP request) for every ReadAt call.
80+
// For cloud storage, this means each random access read results in a new
81+
// range request to the storage service.
82+
func (r *randomAccessReader) ReadAt(p []byte, off int64) (int, error) {
83+
if off >= r.size {
84+
return 0, io.EOF
85+
}
86+
87+
// Open a new reader at the requested offset
88+
reader, err := r.openAt(r.ctx, off, off+int64(len(p)))
89+
if err != nil {
90+
return 0, err
91+
}
92+
defer reader.Close(r.ctx)
93+
94+
// Read exactly len(p) bytes or until EOF
95+
return io.ReadFull(ReaderCtxAdapter(r.ctx, reader), p)
96+
}
97+
98+
// Seek implements io.Seeker
99+
func (r *randomAccessReader) Seek(offset int64, whence int) (int64, error) {
100+
var newPos int64
101+
switch whence {
102+
case io.SeekStart:
103+
newPos = offset
104+
case io.SeekCurrent:
105+
newPos = r.pos + offset
106+
case io.SeekEnd:
107+
newPos = r.size + offset
108+
default:
109+
return 0, errors.Newf("invalid whence: %d", whence)
110+
}
111+
112+
if newPos < 0 {
113+
return 0, errors.New("negative position")
114+
}
115+
116+
// Close current reader if position changed
117+
if r.current != nil && newPos != r.pos {
118+
r.current.Close(r.ctx)
119+
r.current = nil
120+
}
121+
122+
r.pos = newPos
123+
return newPos, nil
124+
}
125+
126+
// Close closes any open reader
127+
func (r *randomAccessReader) Close() error {
128+
if r.current != nil {
129+
return r.current.Close(r.ctx)
130+
}
131+
return nil
132+
}

0 commit comments

Comments
 (0)