Skip to content

Commit 6148c28

Browse files
craig[bot]matt
andcommitted
Merge #159208
159208: bulkmerge: implement SST merging in bulk merge processor r=mw5h a=mw5h Previously, the bulkMergeProcessor was a stub that returned placeholder output URIs without performing any actual SST merging. This made the distributed merge infrastructure non-functional for real workloads. This commit implements the core merging logic in the processor by: 1. Adding `mergeSSTs` method that identifies overlapping SSTs for each task's key range, creates an iterator over the external SST files, and writes merged output while respecting configurable size limits. 2. Introducing a new `Merge` function that creates and executes the DistSQL merge flow, collects results from all processors, and sorts the output SSTs by their start key for ingestion. 3. Adding comprehensive tests for both single-node and multi-node distributed merge scenarios, including test infrastructure for creating and verifying SST merge results. The implementation now produces correctly merged, non-overlapping SST files that can be ingested into ranges, making the distributed bulk merge functionality operational. Fixes: #156658 Release note: None Co-authored-by: matt <[email protected]>
2 parents 2f2800e + caa32ef commit 6148c28

File tree

9 files changed

+534
-149
lines changed

9 files changed

+534
-149
lines changed

pkg/sql/bulkmerge/BUILD.bazel

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
88
go_library(
99
name = "bulkmerge",
1010
srcs = [
11+
"merge.go",
1112
"merge_coordinator.go",
1213
"merge_loopback.go",
1314
"merge_planning.go",
@@ -17,14 +18,21 @@ go_library(
1718
visibility = ["//visibility:public"],
1819
deps = [
1920
"//pkg/base",
21+
"//pkg/ccl/storageccl",
22+
"//pkg/roachpb",
23+
"//pkg/settings",
2024
"//pkg/sql",
25+
"//pkg/sql/bulksst",
26+
"//pkg/sql/bulkutil",
2127
"//pkg/sql/execinfra",
2228
"//pkg/sql/execinfrapb",
2329
"//pkg/sql/physicalplan",
2430
"//pkg/sql/rowenc",
2531
"//pkg/sql/rowexec",
2632
"//pkg/sql/sem/tree",
2733
"//pkg/sql/types",
34+
"//pkg/storage",
35+
"//pkg/util/log",
2836
"//pkg/util/protoutil",
2937
"//pkg/util/syncutil",
3038
"//pkg/util/taskset",
@@ -36,27 +44,30 @@ go_test(
3644
name = "bulkmerge_test",
3745
srcs = [
3846
"main_test.go",
39-
"merge_processor_test.go",
4047
"merge_test.go",
4148
],
4249
embed = [":bulkmerge"],
4350
deps = [
4451
"//pkg/base",
52+
"//pkg/ccl/storageccl",
53+
"//pkg/cloud",
54+
"//pkg/cloud/externalconn/providers",
4555
"//pkg/kv/kvclient/kvtenant",
56+
"//pkg/roachpb",
4657
"//pkg/security/securityassets",
4758
"//pkg/security/securitytest",
4859
"//pkg/security/username",
4960
"//pkg/server",
50-
"//pkg/settings/cluster",
5161
"//pkg/sql",
52-
"//pkg/sql/execinfra",
62+
"//pkg/sql/bulksst",
63+
"//pkg/sql/bulkutil",
5364
"//pkg/sql/execinfrapb",
54-
"//pkg/sql/sem/eval",
5565
"//pkg/sql/sem/tree",
66+
"//pkg/storage",
67+
"//pkg/testutils",
5668
"//pkg/testutils/serverutils",
57-
"//pkg/testutils/skip",
58-
"//pkg/testutils/sqlutils",
5969
"//pkg/testutils/testcluster",
70+
"//pkg/util/hlc",
6071
"//pkg/util/leaktest",
6172
"//pkg/util/log",
6273
"//pkg/util/protoutil",

pkg/sql/bulkmerge/main_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@
33
// Use of this software is governed by the CockroachDB Software License
44
// included in the /LICENSE file.
55

6-
package bulkmerge
6+
package bulkmerge_test
77

88
import (
99
"os"
1010
"testing"
1111

12+
_ "github.com/cockroachdb/cockroach/pkg/cloud/externalconn/providers"
1213
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant"
1314
"github.com/cockroachdb/cockroach/pkg/security/securityassets"
1415
"github.com/cockroachdb/cockroach/pkg/security/securitytest"

pkg/sql/bulkmerge/merge.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package bulkmerge
7+
8+
import (
9+
"bytes"
10+
"context"
11+
"slices"
12+
13+
"github.com/cockroachdb/cockroach/pkg/base"
14+
"github.com/cockroachdb/cockroach/pkg/roachpb"
15+
"github.com/cockroachdb/cockroach/pkg/sql"
16+
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
17+
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
18+
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
19+
)
20+
21+
// Merge creates and waits on a DistSQL flow that merges the provided SSTs into
22+
// the ranges defined by the input spans.
23+
func Merge(
24+
ctx context.Context,
25+
execCtx sql.JobExecContext,
26+
ssts []execinfrapb.BulkMergeSpec_SST,
27+
spans []roachpb.Span,
28+
genOutputURI func(sqlInstance base.SQLInstanceID) string,
29+
) ([]execinfrapb.BulkMergeSpec_SST, error) {
30+
execCfg := execCtx.ExecCfg()
31+
32+
plan, planCtx, err := newBulkMergePlan(ctx, execCtx, ssts, spans, genOutputURI)
33+
if err != nil {
34+
return nil, err
35+
}
36+
37+
var result execinfrapb.BulkMergeSpec_Output
38+
rowWriter := sql.NewCallbackResultWriter(func(ctx context.Context, row tree.Datums) error {
39+
return protoutil.Unmarshal([]byte(*row[0].(*tree.DBytes)), &result)
40+
})
41+
42+
sqlReciever := sql.MakeDistSQLReceiver(
43+
ctx,
44+
rowWriter,
45+
tree.Rows,
46+
execCfg.RangeDescriptorCache,
47+
nil,
48+
nil,
49+
execCtx.ExtendedEvalContext().Tracing)
50+
defer sqlReciever.Release()
51+
52+
execCtx.DistSQLPlanner().Run(
53+
ctx,
54+
planCtx,
55+
nil,
56+
plan,
57+
sqlReciever,
58+
&execCtx.ExtendedEvalContext().Context,
59+
nil,
60+
)
61+
62+
if err := rowWriter.Err(); err != nil {
63+
return nil, err
64+
}
65+
66+
// Sort the SSTs by their range start key. Ingest requires that SSTs are
67+
// sorted and non-overlapping. The output of merge is not sorted because SSTs
68+
// are emitted as their task is completed.
69+
slices.SortFunc(result.SSTs, func(i, j execinfrapb.BulkMergeSpec_SST) int {
70+
return bytes.Compare(i.StartKey, j.StartKey)
71+
})
72+
73+
return result.SSTs, nil
74+
}

pkg/sql/bulkmerge/merge_coordinator.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,9 @@ func (m *mergeCoordinator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMeta
112112
case meta.Err != nil:
113113
m.MoveToDraining(meta.Err)
114114
default:
115-
m.MoveToDraining(errors.Newf("unexpected meta: %v", meta))
115+
// If there is non-nil meta, we pass it up the processor chain. It might
116+
// be something like a trace.
117+
return nil, meta
116118
}
117119
}
118120
return nil, m.DrainHelper()

pkg/sql/bulkmerge/merge_planning.go

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,23 @@ import (
99
"context"
1010

1111
"github.com/cockroachdb/cockroach/pkg/base"
12+
"github.com/cockroachdb/cockroach/pkg/roachpb"
1213
"github.com/cockroachdb/cockroach/pkg/sql"
1314
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
1415
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
1516
"github.com/cockroachdb/errors"
1617
)
1718

1819
func newBulkMergePlan(
19-
ctx context.Context, execCtx sql.JobExecContext, taskCount int,
20+
ctx context.Context,
21+
execCtx sql.JobExecContext,
22+
ssts []execinfrapb.BulkMergeSpec_SST,
23+
spans []roachpb.Span,
24+
genOutputURI func(sqlInstance base.SQLInstanceID) string,
2025
) (*sql.PhysicalPlan, *sql.PlanningCtx, error) {
26+
if len(spans) == 0 {
27+
return nil, nil, errors.Newf("no spans specified")
28+
}
2129
// NOTE: This implementation is inspired by the physical plan created by
2230
// restore in `pkg/backup/restore_processor_planning.go`
2331
// TODO(mw5h): We need to be careful about mixed version clusters, so consider
@@ -60,6 +68,14 @@ func newBulkMergePlan(
6068

6169
mergeStage := plan.NewStageOnNodes(sqlInstanceIDs)
6270
for streamID, sqlInstanceID := range sqlInstanceIDs {
71+
outputStorage, err := execCtx.ExecCfg().DistSQLSrv.ExternalStorageFromURI(
72+
ctx,
73+
genOutputURI(sqlInstanceID),
74+
execCtx.User(),
75+
)
76+
if err != nil {
77+
return nil, nil, err
78+
}
6379
pIdx := plan.AddProcessor(physicalplan.Processor{
6480
SQLInstanceID: sqlInstanceID,
6581
Spec: execinfrapb.ProcessorSpec{
@@ -68,7 +84,9 @@ func newBulkMergePlan(
6884
}},
6985
Core: execinfrapb.ProcessorCoreUnion{
7086
BulkMerge: &execinfrapb.BulkMergeSpec{
71-
// TODO(jeffswenson): fill in the rest of the spec
87+
SSTs: ssts,
88+
Spans: spans,
89+
OutputStorage: outputStorage.Conf(),
7290
},
7391
},
7492
Post: execinfrapb.PostProcessSpec{},
@@ -86,11 +104,12 @@ func newBulkMergePlan(
86104
DestInput: 0,
87105
})
88106
plan.ResultRouters = append(plan.ResultRouters, pIdx)
107+
outputStorage.Close()
89108
}
90109

91110
plan.AddSingleGroupStage(ctx, coordinatorID, execinfrapb.ProcessorCoreUnion{
92111
MergeCoordinator: &execinfrapb.MergeCoordinatorSpec{
93-
TaskCount: int64(taskCount),
112+
TaskCount: int64(len(spans)),
94113
WorkerSqlInstanceIds: keys,
95114
},
96115
}, execinfrapb.PostProcessSpec{}, mergeCoordinatorOutputTypes, nil /* finalizeLastStageCb */)

0 commit comments

Comments
 (0)