Skip to content

Commit b236729

Browse files
committed
sql/backfill: persist distributed merge mode in job details
Previously, the choice of merge strategy during index backfills was controlled by a global cluster setting. This boolean value was evaluated at execution time, meaning the behavior could change mid-job if the setting changed. This commit addresses that by persisting the distributed merge mode into the job details. The setting is now read only once (when the job is created) and saved in the job payload. This change applies to both legacy and declarative schema changes. Additionally, the cluster setting has been converted from a boolean to a multi-value enum. This allows us to enable this feature for only legacy or only declarative schema changes. Informs: #158378 Epic: CRDB-48845 Release note: none
1 parent 6806f49 commit b236729

File tree

16 files changed

+242
-51
lines changed

16 files changed

+242
-51
lines changed

pkg/jobs/jobspb/jobs.proto

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -788,6 +788,13 @@ message TypeSchemaChangeProgress {
788788

789789
}
790790

791+
// IndexBackfillDistributedMergeMode enumerates the per-job distributed merge
792+
// decision for index backfills.
793+
enum IndexBackfillDistributedMergeMode {
794+
INDEX_BACKFILL_DISTRIBUTED_MERGE_MODE_DISABLED = 0 [(gogoproto.enumvalue_customname) = "Disabled"];
795+
INDEX_BACKFILL_DISTRIBUTED_MERGE_MODE_ENABLED = 1 [(gogoproto.enumvalue_customname) = "Enabled"];
796+
}
797+
791798
// NewSchemaChangeDetails is the job detail information for the new schema change job.
792799
message NewSchemaChangeDetails {
793800

@@ -806,6 +813,11 @@ message NewSchemaChangeDetails {
806813
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID"
807814
];
808815

816+
// DistributedMergeMode captures the per-job distributed merge decision for
817+
// index backfills managed by this job. This is ignored if the job doesn't
818+
// perform an index backfill.
819+
IndexBackfillDistributedMergeMode distributed_merge_mode = 8;
820+
809821
reserved 1, 2, 3, 5;
810822
}
811823

@@ -1039,7 +1051,12 @@ message SchemaChangeDetails {
10391051

10401052
sessiondatapb.SessionData session_data = 13;
10411053

1042-
// Next id 14.
1054+
// DistributedMergeMode captures the per-job distributed merge decision for
1055+
// index backfills managed by this job. This is ignored if the job doesn't
1056+
// perform an index backfill.
1057+
IndexBackfillDistributedMergeMode distributed_merge_mode = 14;
1058+
1059+
// Next id 15.
10431060
}
10441061

10451062
message SchemaChangeProgress {

pkg/jobs/registry.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,11 @@ func (r *Registry) ID() base.SQLInstanceID {
286286
return r.nodeID.SQLInstanceID()
287287
}
288288

289+
// ClusterSettings returns the registry's cluster settings handle.
290+
func (r *Registry) ClusterSettings() *cluster.Settings {
291+
return r.settings
292+
}
293+
289294
// makeCtx returns a new context from r's ambient context and an associated
290295
// cancel func.
291296
func (r *Registry) makeCtx() (context.Context, func()) {

pkg/sql/backfill.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1044,8 +1044,9 @@ func (sc *SchemaChanger) distIndexBackfill(
10441044
indexBatchSize := indexBackfillBatchSize.Get(&sc.execCfg.Settings.SV)
10451045
chunkSize := sc.getChunkSize(indexBatchSize)
10461046
spec := initIndexBackfillerSpec(*tableDesc.TableDesc(), writeAsOf, writeAtRequestTimestamp, chunkSize, addedIndexes, 0)
1047-
if err := maybeEnableDistributedMergeIndexBackfill(ctx, sc.execCfg.Settings, sc.execCfg.NodeInfo.NodeID.SQLInstanceID(), &spec); err != nil {
1048-
return err
1047+
if details, ok := sc.job.Details().(jobspb.SchemaChangeDetails); ok &&
1048+
details.DistributedMergeMode == jobspb.IndexBackfillDistributedMergeMode_Enabled {
1049+
backfill.EnableDistributedMergeIndexBackfillSink(sc.execCfg.NodeInfo.NodeID.SQLInstanceID(), &spec)
10491050
}
10501051
p, err = sc.distSQLPlanner.createBackfillerPhysicalPlan(ctx, planCtx, spec, todoSpans)
10511052
return err

pkg/sql/backfill/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go_library(
44
name = "backfill",
55
srcs = [
66
"backfill.go",
7+
"distributed_merge_mode.go",
78
"index_backfiller_cols.go",
89
"mvcc_index_merger.go",
910
],
@@ -12,11 +13,13 @@ go_library(
1213
deps = [
1314
"//pkg/base",
1415
"//pkg/clusterversion",
16+
"//pkg/jobs/jobspb",
1517
"//pkg/keys",
1618
"//pkg/kv",
1719
"//pkg/kv/kvpb",
1820
"//pkg/roachpb",
1921
"//pkg/settings",
22+
"//pkg/settings/cluster",
2023
"//pkg/sql/catalog",
2124
"//pkg/sql/catalog/catenumpb",
2225
"//pkg/sql/catalog/descpb",
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package backfill
7+
8+
import (
9+
"context"
10+
"fmt"
11+
12+
"github.com/cockroachdb/cockroach/pkg/base"
13+
"github.com/cockroachdb/cockroach/pkg/clusterversion"
14+
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
15+
"github.com/cockroachdb/cockroach/pkg/settings"
16+
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
17+
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
18+
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
19+
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
20+
"github.com/cockroachdb/errors"
21+
)
22+
23+
// DistributedMergeConsumer identifies which backfill pipeline is attempting to
24+
// opt into the distributed merge infrastructure.
25+
type DistributedMergeConsumer int
26+
27+
const (
28+
// DistributedMergeConsumerLegacy corresponds to the legacy schema changer.
29+
DistributedMergeConsumerLegacy DistributedMergeConsumer = iota
30+
// DistributedMergeConsumerDeclarative corresponds to the declarative schema
31+
// changer (new schema change).
32+
DistributedMergeConsumerDeclarative
33+
)
34+
35+
type distributedMergeIndexBackfillMode int64
36+
37+
const (
38+
distributedMergeModeDisabled distributedMergeIndexBackfillMode = iota
39+
distributedMergeModeEnabled
40+
distributedMergeModeLegacy
41+
distributedMergeModeDeclarative
42+
// aliases for synonyms.
43+
distributedMergeModeAliasFalse
44+
distributedMergeModeAliasTrue
45+
distributedMergeModeAliasOff
46+
distributedMergeModeAliasOn
47+
)
48+
49+
// DistributedMergeIndexBackfillMode exposes the cluster setting used to control
50+
// when index backfills run through the distributed merge pipeline.
51+
var DistributedMergeIndexBackfillMode = settings.RegisterEnumSetting(
52+
settings.ApplicationLevel,
53+
"bulkio.index_backfill.distributed_merge.mode",
54+
"controls when the distributed merge pipeline powers index backfills: disabled/off/false, legacy, declarative, or enabled/on/true",
55+
"disabled",
56+
map[distributedMergeIndexBackfillMode]string{
57+
distributedMergeModeDisabled: "disabled",
58+
distributedMergeModeEnabled: "enabled",
59+
distributedMergeModeLegacy: "legacy",
60+
distributedMergeModeDeclarative: "declarative",
61+
distributedMergeModeAliasFalse: "false",
62+
distributedMergeModeAliasTrue: "true",
63+
distributedMergeModeAliasOff: "off",
64+
distributedMergeModeAliasOn: "on",
65+
},
66+
settings.WithRetiredName("bulkio.index_backfill.distributed_merge.enabled"),
67+
)
68+
69+
// shouldEnableDistributedMergeIndexBackfill determines whether the specified
70+
// backfill consumer should opt into the distributed merge pipeline based on the
71+
// current cluster setting and version state.
72+
func shouldEnableDistributedMergeIndexBackfill(
73+
ctx context.Context, st *cluster.Settings, consumer DistributedMergeConsumer,
74+
) (bool, error) {
75+
mode := DistributedMergeIndexBackfillMode.Get(&st.SV)
76+
var enable bool
77+
switch mode {
78+
case distributedMergeModeDisabled, distributedMergeModeAliasFalse, distributedMergeModeAliasOff:
79+
enable = false
80+
case distributedMergeModeLegacy:
81+
enable = consumer == DistributedMergeConsumerLegacy
82+
case distributedMergeModeDeclarative:
83+
enable = consumer == DistributedMergeConsumerDeclarative
84+
case distributedMergeModeEnabled, distributedMergeModeAliasTrue, distributedMergeModeAliasOn:
85+
enable = true
86+
default:
87+
return false, errors.AssertionFailedf("unrecognized distributed merge index backfill mode %d", mode)
88+
}
89+
if enable && !st.Version.IsActive(ctx, clusterversion.V26_1) {
90+
return false, pgerror.New(pgcode.FeatureNotSupported, "distributed merge requires cluster version 26.1")
91+
}
92+
return enable, nil
93+
}
94+
95+
// EnableDistributedMergeIndexBackfillSink updates the backfiller spec to use the
96+
// distributed merge sink and file prefix for the provided SQL instance.
97+
func EnableDistributedMergeIndexBackfillSink(
98+
nodeID base.SQLInstanceID, spec *execinfrapb.BackfillerSpec,
99+
) {
100+
spec.UseDistributedMergeSink = true
101+
spec.DistributedMergeFilePrefix = fmt.Sprintf("nodelocal://%d/index-backfill", nodeID)
102+
}
103+
104+
// DetermineDistributedMergeMode evaluates the cluster setting to decide
105+
// whether backfills for the specified consumer should opt into the distributed
106+
// merge pipeline.
107+
func DetermineDistributedMergeMode(
108+
ctx context.Context, st *cluster.Settings, consumer DistributedMergeConsumer,
109+
) (jobspb.IndexBackfillDistributedMergeMode, error) {
110+
if st == nil {
111+
return jobspb.IndexBackfillDistributedMergeMode_Disabled, nil
112+
}
113+
useDistributedMerge, err := shouldEnableDistributedMergeIndexBackfill(ctx, st, consumer)
114+
if err != nil {
115+
return jobspb.IndexBackfillDistributedMergeMode_Disabled, err
116+
}
117+
if useDistributedMerge {
118+
return jobspb.IndexBackfillDistributedMergeMode_Enabled, nil
119+
}
120+
return jobspb.IndexBackfillDistributedMergeMode_Disabled, nil
121+
}

pkg/sql/distsql_plan_backfill.go

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,14 @@ package sql
77

88
import (
99
"context"
10-
"fmt"
1110
"time"
1211
"unsafe"
1312

14-
"github.com/cockroachdb/cockroach/pkg/base"
15-
"github.com/cockroachdb/cockroach/pkg/clusterversion"
1613
"github.com/cockroachdb/cockroach/pkg/roachpb"
1714
"github.com/cockroachdb/cockroach/pkg/settings"
18-
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
1915
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
2016
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
2117
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
22-
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
23-
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
2418
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
2519
"github.com/cockroachdb/cockroach/pkg/sql/types"
2620
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -86,30 +80,6 @@ var initialSplitsPerProcessor = settings.RegisterIntSetting(
8680
settings.NonNegativeInt,
8781
)
8882

89-
var distributedMergeIndexBackfillEnabled = settings.RegisterBoolSetting(
90-
settings.ApplicationLevel,
91-
"bulkio.index_backfill.distributed_merge.enabled",
92-
"enable the distributed merge pipeline for index backfills",
93-
false,
94-
)
95-
96-
func maybeEnableDistributedMergeIndexBackfill(
97-
ctx context.Context,
98-
st *cluster.Settings,
99-
nodeID base.SQLInstanceID,
100-
spec *execinfrapb.BackfillerSpec,
101-
) error {
102-
if !distributedMergeIndexBackfillEnabled.Get(&st.SV) {
103-
return nil
104-
}
105-
if !st.Version.IsActive(ctx, clusterversion.V26_1) {
106-
return pgerror.New(pgcode.FeatureNotSupported, "distributed merge requires cluster version 26.1")
107-
}
108-
spec.UseDistributedMergeSink = true
109-
spec.DistributedMergeFilePrefix = fmt.Sprintf("nodelocal://%d/index-backfill", nodeID)
110-
return nil
111-
}
112-
11383
// createBackfiller generates a plan consisting of index/column backfiller
11484
// processors, one for each node that has spans that we are reading. The plan is
11585
// finalized.

pkg/sql/index_backfiller.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/cockroachdb/cockroach/pkg/kv"
1414
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
1515
"github.com/cockroachdb/cockroach/pkg/roachpb"
16+
"github.com/cockroachdb/cockroach/pkg/sql/backfill"
1617
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
1718
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
1819
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
@@ -121,6 +122,10 @@ func (ib *IndexBackfillPlanner) BackfillIndexes(
121122
updateSSTManifests := func(newManifests []jobspb.IndexBackfillSSTManifest) {
122123
progress.SSTManifests = sstManifestBuf.Append(newManifests)
123124
}
125+
mode, err := getIndexBackfillDistributedMergeMode(job)
126+
if err != nil {
127+
return err
128+
}
124129
updateFunc := func(
125130
ctx context.Context, meta *execinfrapb.ProducerMetadata,
126131
) error {
@@ -175,6 +180,7 @@ func (ib *IndexBackfillPlanner) BackfillIndexes(
175180
// timestamp because other writing transactions have been writing at the
176181
// appropriate timestamps in-between.
177182
readAsOf := now
183+
useDistributedMerge := mode == jobspb.IndexBackfillDistributedMergeMode_Enabled
178184
run, retErr := ib.plan(
179185
ctx,
180186
descriptor,
@@ -184,6 +190,7 @@ func (ib *IndexBackfillPlanner) BackfillIndexes(
184190
spansToDo,
185191
progress.DestIndexIDs,
186192
progress.SourceIndexID,
193+
useDistributedMerge,
187194
updateFunc,
188195
)
189196
if retErr != nil {
@@ -235,6 +242,7 @@ func (ib *IndexBackfillPlanner) plan(
235242
sourceSpans []roachpb.Span,
236243
indexesToBackfill []descpb.IndexID,
237244
sourceIndexID descpb.IndexID,
245+
useDistributedMerge bool,
238246
callback func(_ context.Context, meta *execinfrapb.ProducerMetadata) error,
239247
) (runFunc func(context.Context) error, _ error) {
240248

@@ -258,8 +266,8 @@ func (ib *IndexBackfillPlanner) plan(
258266
*td.TableDesc(), writeAsOf, writeAtRequestTimestamp, chunkSize,
259267
indexesToBackfill, sourceIndexID,
260268
)
261-
if err := maybeEnableDistributedMergeIndexBackfill(ctx, ib.execCfg.Settings, ib.execCfg.NodeInfo.NodeID.SQLInstanceID(), &spec); err != nil {
262-
return err
269+
if useDistributedMerge {
270+
backfill.EnableDistributedMergeIndexBackfillSink(ib.execCfg.NodeInfo.NodeID.SQLInstanceID(), &spec)
263271
}
264272
var err error
265273
p, err = ib.execCfg.DistSQLPlanner.createBackfillerPhysicalPlan(ctx, planCtx, spec, sourceSpans)
@@ -286,3 +294,15 @@ func (ib *IndexBackfillPlanner) plan(
286294
return cbw.Err()
287295
}, nil
288296
}
297+
298+
func getIndexBackfillDistributedMergeMode(
299+
job *jobs.Job,
300+
) (jobspb.IndexBackfillDistributedMergeMode, error) {
301+
payload := job.Payload()
302+
details := payload.GetNewSchemaChange()
303+
if details == nil {
304+
return jobspb.IndexBackfillDistributedMergeMode_Disabled,
305+
errors.AssertionFailedf("expected new schema change details on job %d", job.ID())
306+
}
307+
return details.DistributedMergeMode, nil
308+
}

pkg/sql/logictest/testdata/logic_test/create_index

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -712,13 +712,13 @@ subtest end
712712
subtest distributed_merge_index_backfill_placeholder
713713

714714
statement ok
715-
CREATE TABLE dist_merge_idx (a INT PRIMARY KEY, b INT)
715+
CREATE TABLE dist_merge_idx (a INT NOT NULL PRIMARY KEY, b INT NOT NULL, c INT NOT NULL UNIQUE)
716716

717717
statement ok
718-
INSERT INTO dist_merge_idx VALUES (1,1), (2,2), (3,3)
718+
INSERT INTO dist_merge_idx VALUES (1,1,1), (2,2,2), (3,3,3)
719719

720720
statement ok
721-
SET CLUSTER SETTING bulkio.index_backfill.distributed_merge.enabled = true
721+
SET CLUSTER SETTING bulkio.index_backfill.distributed_merge.mode = enabled
722722

723723
# TODO(158378): The end-to-end flow for create index using distributed
724724
# merge isn't implemented yet, so we get to the validation step and notice
@@ -732,8 +732,21 @@ onlyif config local-mixed-25.3 local-mixed-25.4
732732
statement error pq: .*distributed merge requires cluster version 26.*
733733
CREATE INDEX dist_merge_idx_idx ON dist_merge_idx (b)
734734

735+
# Try an operation that is implemented in the declarative schema changer.
736+
#
737+
# TODO(158378): Declarative schema change don't yet implement the full flow for
738+
# distributed merge. So, we end up with a similar situation as above where we
739+
# get to validation and notice no rows were ingested.
740+
skipif config local-mixed-25.3 local-mixed-25.4
741+
statement error pgcode 23505 pq: .*duplicate key value violates unique constraint.*
742+
ALTER TABLE dist_merge_idx ALTER PRIMARY KEY USING COLUMNS (b)
743+
744+
onlyif config local-mixed-25.3 local-mixed-25.4
745+
statement error pq: .*distributed merge requires cluster version 26.*
746+
ALTER TABLE dist_merge_idx ALTER PRIMARY KEY USING COLUMNS (b)
747+
735748
statement ok
736-
SET CLUSTER SETTING bulkio.index_backfill.distributed_merge.enabled = false
749+
SET CLUSTER SETTING bulkio.index_backfill.distributed_merge.mode = 'disabled'
737750

738751
statement ok
739752
DROP TABLE dist_merge_idx

0 commit comments

Comments
 (0)