Skip to content

Commit ec2a260

Browse files
authored
Merge pull request #159315 from spilchen/blathers/backport-release-26.1-158983
release-26.1: sql: use job scoped paths for index backfill SST files
2 parents c4d90fe + 290606f commit ec2a260

File tree

4 files changed

+108
-5
lines changed

4 files changed

+108
-5
lines changed

pkg/sql/backfill.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1051,7 +1051,7 @@ func (sc *SchemaChanger) distIndexBackfill(
10511051
chunkSize := sc.getChunkSize(indexBatchSize)
10521052
spec := initIndexBackfillerSpec(*tableDesc.TableDesc(), writeAsOf, writeAtRequestTimestamp, chunkSize, addedIndexes, 0)
10531053
if useDistributedMerge {
1054-
backfill.EnableDistributedMergeIndexBackfillSink(sc.execCfg.NodeInfo.NodeID.SQLInstanceID(), &spec)
1054+
backfill.EnableDistributedMergeIndexBackfillSink(sc.execCfg.NodeInfo.NodeID.SQLInstanceID(), sc.job.ID(), &spec)
10551055
}
10561056
p, err = sc.distSQLPlanner.createBackfillerPhysicalPlan(ctx, planCtx, spec, todoSpans)
10571057
return err

pkg/sql/backfill/distributed_merge_mode.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,10 +96,10 @@ func shouldEnableDistributedMergeIndexBackfill(
9696
// EnableDistributedMergeIndexBackfillSink updates the backfiller spec to use the
9797
// distributed merge sink and file prefix for the provided SQL instance.
9898
func EnableDistributedMergeIndexBackfillSink(
99-
nodeID base.SQLInstanceID, spec *execinfrapb.BackfillerSpec,
99+
nodeID base.SQLInstanceID, jobID jobspb.JobID, spec *execinfrapb.BackfillerSpec,
100100
) {
101101
spec.UseDistributedMergeSink = true
102-
spec.DistributedMergeFilePrefix = fmt.Sprintf("nodelocal://%d/index-backfill", nodeID)
102+
spec.DistributedMergeFilePrefix = fmt.Sprintf("nodelocal://%d/job/%d/map", nodeID, jobID)
103103
}
104104

105105
// DetermineDistributedMergeMode evaluates the cluster setting to decide

pkg/sql/index_backfiller.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ func (ib *IndexBackfillPlanner) BackfillIndexes(
150150
useDistributedMerge := mode == jobspb.IndexBackfillDistributedMergeMode_Enabled
151151
run, retErr := ib.plan(
152152
ctx,
153+
job.ID(),
153154
descriptor,
154155
now,
155156
progress.MinimumWriteTimestamp,
@@ -204,6 +205,7 @@ var _ scexec.Backfiller = (*IndexBackfillPlanner)(nil)
204205

205206
func (ib *IndexBackfillPlanner) plan(
206207
ctx context.Context,
208+
jobID jobspb.JobID,
207209
tableDesc catalog.TableDescriptor,
208210
nowTimestamp, writeAsOf, readAsOf hlc.Timestamp,
209211
sourceSpans []roachpb.Span,
@@ -234,7 +236,7 @@ func (ib *IndexBackfillPlanner) plan(
234236
indexesToBackfill, sourceIndexID,
235237
)
236238
if useDistributedMerge {
237-
backfill.EnableDistributedMergeIndexBackfillSink(ib.execCfg.NodeInfo.NodeID.SQLInstanceID(), &spec)
239+
backfill.EnableDistributedMergeIndexBackfillSink(ib.execCfg.NodeInfo.NodeID.SQLInstanceID(), jobID, &spec)
238240
}
239241
var err error
240242
p, err = ib.execCfg.DistSQLPlanner.createBackfillerPhysicalPlan(ctx, planCtx, spec, sourceSpans)

pkg/sql/rowexec/indexbackfiller_test.go

Lines changed: 102 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package rowexec
77

88
import (
99
"context"
10+
"fmt"
1011
"path/filepath"
1112
"strings"
1213
"testing"
@@ -16,11 +17,13 @@ import (
1617
"github.com/cockroachdb/cockroach/pkg/cloud"
1718
"github.com/cockroachdb/cockroach/pkg/cloud/cloudpb"
1819
"github.com/cockroachdb/cockroach/pkg/cloud/nodelocal"
20+
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
1921
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
2022
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
2123
"github.com/cockroachdb/cockroach/pkg/roachpb"
2224
"github.com/cockroachdb/cockroach/pkg/security/username"
2325
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
26+
"github.com/cockroachdb/cockroach/pkg/sql/backfill"
2427
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
2528
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
2629
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
@@ -63,7 +66,7 @@ func TestIndexBackfillSinkSelection(t *testing.T) {
6366

6467
tempDir := t.TempDir()
6568
ib.spec.UseDistributedMergeSink = true
66-
ib.spec.DistributedMergeFilePrefix = "nodelocal://0/index-backfill/test"
69+
ib.spec.DistributedMergeFilePrefix = "nodelocal://0/job/123/map"
6770
ib.flowCtx.Cfg.DB = srv.SystemLayer().InternalDB().(descs.DB)
6871
ib.flowCtx.Cfg.ExternalStorageFromURI = func(
6972
ctx context.Context, uri string, _ username.SQLUsername, opts ...cloud.ExternalStorageOption,
@@ -87,6 +90,104 @@ func TestIndexBackfillSinkSelection(t *testing.T) {
8790
})
8891
}
8992

93+
func TestSSTFileNamingConvention(t *testing.T) {
94+
defer leaktest.AfterTest(t)()
95+
96+
ctx := context.Background()
97+
98+
settings := cluster.MakeTestingClusterSettings()
99+
srv := serverutils.StartServerOnly(t, base.TestServerArgs{})
100+
defer srv.Stopper().Stop(ctx)
101+
102+
tempDir := t.TempDir()
103+
104+
const jobID jobspb.JobID = 12345
105+
const processorID = 7
106+
const nodeID = 1
107+
108+
flowCtx := &execinfra.FlowCtx{
109+
Cfg: &execinfra.ServerConfig{
110+
Settings: settings,
111+
DB: srv.SystemLayer().InternalDB().(descs.DB),
112+
ExternalStorageFromURI: func(
113+
ctx context.Context, uri string, _ username.SQLUsername, opts ...cloud.ExternalStorageOption,
114+
) (cloud.ExternalStorage, error) {
115+
if !strings.HasPrefix(uri, "nodelocal://") {
116+
return nil, errors.New("unexpected URI")
117+
}
118+
es := nodelocal.TestingMakeNodelocalStorage(
119+
tempDir,
120+
settings,
121+
cloudpb.ExternalStorage{},
122+
)
123+
t.Cleanup(func() { es.Close() })
124+
return es, nil
125+
},
126+
},
127+
}
128+
129+
// Use the same function that production code uses to set the prefix.
130+
spec := execinfrapb.BackfillerSpec{}
131+
backfill.EnableDistributedMergeIndexBackfillSink(nodeID, jobID, &spec)
132+
133+
writeTS := hlc.Timestamp{WallTime: 1000000000}
134+
135+
sink, err := newSSTIndexBackfillSink(ctx, flowCtx, spec.DistributedMergeFilePrefix, writeTS, processorID)
136+
require.NoError(t, err)
137+
defer sink.Close(ctx)
138+
139+
sstSink := sink.(*sstIndexBackfillSink)
140+
141+
// Add some data to trigger SST file creation.
142+
key := roachpb.Key("test-key-1")
143+
value := []byte("test-value-1")
144+
require.NoError(t, sstSink.Add(ctx, key, value))
145+
146+
// Set up flush callback and flush.
147+
var flushed bool
148+
sstSink.SetOnFlush(func(summary kvpb.BulkOpSummary) {
149+
flushed = true
150+
})
151+
require.NoError(t, sstSink.Flush(ctx))
152+
require.True(t, flushed)
153+
154+
// Get the manifests and verify the file naming convention.
155+
manifests := sstSink.ConsumeFlushManifests()
156+
require.NotEmpty(t, manifests, "expected at least one SST file to be created")
157+
158+
for _, manifest := range manifests {
159+
uri := manifest.URI
160+
t.Logf("SST file URI: %s", uri)
161+
162+
// Verify the URI follows the naming convention:
163+
// nodelocal://<nodeID>/job/<jobID>/map/proc-<procID>/<hlc-walltime>-<hlc-logical>.sst
164+
expectedPrefix := fmt.Sprintf("nodelocal://%d/job/%d/map/proc-%d/", nodeID, jobID, processorID)
165+
require.True(t, strings.HasPrefix(uri, expectedPrefix),
166+
"URI %q does not have expected prefix %q", uri, expectedPrefix)
167+
168+
// Extract the filename part after the prefix.
169+
filename := strings.TrimPrefix(uri, expectedPrefix)
170+
171+
// Verify it ends with .sst.
172+
require.True(t, strings.HasSuffix(filename, ".sst"),
173+
"filename %q does not end with .sst", filename)
174+
175+
// Verify the format is <walltime>-<logical>.sst.
176+
filenameWithoutExt := strings.TrimSuffix(filename, ".sst")
177+
parts := strings.Split(filenameWithoutExt, "-")
178+
require.Equal(t, 2, len(parts),
179+
"filename %q should have format <walltime>-<logical>.sst", filename)
180+
181+
// Verify both parts are numeric (HLC timestamp components).
182+
for i, part := range parts {
183+
for _, ch := range part {
184+
require.True(t, ch >= '0' && ch <= '9',
185+
"part %d of filename %q contains non-numeric character: %c", i, filename, ch)
186+
}
187+
}
188+
}
189+
}
190+
90191
func TestRetryOfIndexEntryBatch(t *testing.T) {
91192
defer leaktest.AfterTest(t)()
92193
defer log.Scope(t).Close(t)

0 commit comments

Comments
 (0)