Skip to content

Commit 92af8a2

Browse files
committed
sql/fingerprint: add span fingerprinting framework
This adds a new high level framework for fingerprinting a span by fingerprinting its subspans using many worker and persisting partial results. The actual computation any single subspan's fingerprint, or actual persistance of a the partial state, are left to the implementation of passed helpers -- this new framework just orchestrates these abstract actions which can in turn be implemented by a real job's job_info rows/scanning KV, or by testing mocks. Release note: none. Epic: none.
1 parent d1a7838 commit 92af8a2

File tree

5 files changed

+564
-0
lines changed

5 files changed

+564
-0
lines changed

pkg/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -500,6 +500,7 @@ ALL_TESTS = [
500500
"//pkg/sql/execstats:execstats_test",
501501
"//pkg/sql/export:export_test",
502502
"//pkg/sql/exprutil:exprutil_test",
503+
"//pkg/sql/fingerprint:fingerprint_test",
503504
"//pkg/sql/flowinfra:flowinfra_disallowed_imports_test",
504505
"//pkg/sql/flowinfra:flowinfra_test",
505506
"//pkg/sql/gcjob/gcjobnotifier:gcjobnotifier_test",
@@ -2064,6 +2065,8 @@ GO_TARGETS = [
20642065
"//pkg/sql/exprutil:exprutil",
20652066
"//pkg/sql/exprutil:exprutil_test",
20662067
"//pkg/sql/faketreeeval:faketreeeval",
2068+
"//pkg/sql/fingerprint:fingerprint",
2069+
"//pkg/sql/fingerprint:fingerprint_test",
20672070
"//pkg/sql/flowinfra:flowinfra",
20682071
"//pkg/sql/flowinfra:flowinfra_test",
20692072
"//pkg/sql/gcjob/gcjobnotifier:gcjobnotifier",

pkg/sql/fingerprint/BUILD.bazel

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
2+
3+
go_library(
4+
name = "fingerprint",
5+
srcs = ["gatherer.go"],
6+
importpath = "github.com/cockroachdb/cockroach/pkg/sql/fingerprint",
7+
visibility = ["//visibility:public"],
8+
deps = [
9+
"//pkg/roachpb",
10+
"//pkg/sql",
11+
"//pkg/util/ctxgroup",
12+
"//pkg/util/hlc",
13+
"//pkg/util/span",
14+
"//pkg/util/syncutil",
15+
"@com_github_cockroachdb_errors//:errors",
16+
],
17+
)
18+
19+
go_test(
20+
name = "fingerprint_test",
21+
srcs = [
22+
"gatherer_test.go",
23+
"main_test.go",
24+
],
25+
embed = [":fingerprint"],
26+
deps = [
27+
"//pkg/base",
28+
"//pkg/roachpb",
29+
"//pkg/security/securityassets",
30+
"//pkg/security/securitytest",
31+
"//pkg/server",
32+
"//pkg/sql",
33+
"//pkg/testutils/serverutils",
34+
"//pkg/testutils/testcluster",
35+
"//pkg/util/hlc",
36+
"//pkg/util/leaktest",
37+
"//pkg/util/span",
38+
"@com_github_stretchr_testify//require",
39+
],
40+
)

pkg/sql/fingerprint/gatherer.go

Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package fingerprint
7+
8+
import (
9+
"context"
10+
"time"
11+
12+
"github.com/cockroachdb/cockroach/pkg/roachpb"
13+
"github.com/cockroachdb/cockroach/pkg/sql"
14+
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
15+
"github.com/cockroachdb/cockroach/pkg/util/hlc"
16+
"github.com/cockroachdb/cockroach/pkg/util/span"
17+
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
18+
"github.com/cockroachdb/errors"
19+
)
20+
21+
type spanFingerprintResult struct {
22+
span roachpb.Span
23+
fingerprint uint64
24+
}
25+
26+
type spanFingerprinter func(ctx context.Context, span roachpb.Span) (spanFingerprintResult, error)
27+
28+
type checkpointState struct {
29+
fingerprint uint64
30+
totalParts int
31+
done span.Frontier
32+
}
33+
34+
type persister interface {
35+
load(ctx context.Context) (checkpointState, bool, error)
36+
store(ctx context.Context, state checkpointState, frac float64) error
37+
}
38+
39+
type partitioner interface {
40+
partition(ctx context.Context, spans []roachpb.Span) ([]sql.SpanPartition, error)
41+
}
42+
43+
// gatherer represents the high-level orchestration involved in fingerprinting
44+
// a collection of spans in terms of abstract components for computing the
45+
// fingerprint for any individual span or for persisting and retrieving progress
46+
// information from durable (e.g. job_info) storage. The gatherer is responsible
47+
// for tying these components together while managing concurrency and
48+
// coordination acoss the workers doing the fingerprinting, aggregating their
49+
// partial results, and periodically the aggregation, as well as resuming this
50+
// process from a previously persisted checkpoint.
51+
type gatherer struct {
52+
spans []roachpb.Span
53+
persister persister
54+
partitioner partitioner
55+
fn spanFingerprinter
56+
chkptFreq func() time.Duration
57+
58+
mu struct {
59+
syncutil.Mutex
60+
checkpointState
61+
prog []struct{ total, remaining int }
62+
}
63+
}
64+
65+
// Run executes the fingerprinting job.
66+
// Spans should be the initial target spans.
67+
// Returns the final fingerprint.
68+
func (g *gatherer) run(ctx context.Context) (uint64, error) {
69+
loaded, ok, err := g.persister.load(ctx)
70+
if err != nil {
71+
return 0, err
72+
}
73+
if ok {
74+
g.mu.checkpointState = loaded
75+
} else {
76+
g.mu.checkpointState.done, err = span.MakeFrontier(g.spans...)
77+
if err != nil {
78+
return 0, err
79+
}
80+
}
81+
82+
todo := g.remaining()
83+
if len(todo) == 0 {
84+
return g.finish(ctx)
85+
}
86+
87+
partitions, err := g.partitioner.partition(ctx, todo)
88+
if err != nil {
89+
return 0, errors.Wrap(err, "failed to partition spans")
90+
}
91+
g.mu.prog = make([]struct{ total, remaining int }, len(partitions))
92+
93+
stopCheckpoint := make(chan struct{})
94+
results := make(chan spanResult, len(partitions))
95+
grp := ctxgroup.WithContext(ctx)
96+
97+
// Gather the individual span fingerprints.
98+
grp.GoCtx(func(ctx context.Context) error {
99+
defer close(results)
100+
return ctxgroup.GroupWorkers(ctx, len(partitions), func(ctx context.Context, i int) error {
101+
return worker(ctx, i, partitions[i].Spans, g.fn, results)
102+
})
103+
})
104+
105+
// Accumulate results from the workers into the gatherer state.
106+
grp.GoCtx(func(ctx context.Context) error {
107+
defer close(stopCheckpoint)
108+
for res := range results {
109+
if err := g.record(res); err != nil {
110+
return err
111+
}
112+
}
113+
return nil
114+
})
115+
116+
// Periodically checkpoint progress to the persister.
117+
grp.GoCtx(func(ctx context.Context) error {
118+
freq := g.chkptFreq()
119+
ticker := time.NewTicker(freq)
120+
for {
121+
select {
122+
case <-ticker.C:
123+
if err := g.checkpoint(ctx); err != nil {
124+
return errors.Wrap(err, "failed to checkpoint progress")
125+
}
126+
if f := g.chkptFreq(); f != freq {
127+
freq = f
128+
ticker.Reset(freq)
129+
}
130+
case <-stopCheckpoint:
131+
return nil
132+
}
133+
}
134+
})
135+
136+
if err := grp.Wait(); err != nil {
137+
return 0, err
138+
}
139+
140+
return g.finish(ctx)
141+
}
142+
143+
func (g *gatherer) remaining() []roachpb.Span {
144+
var remaining []roachpb.Span
145+
for span, ts := range g.mu.done.Entries() {
146+
if ts.Less(hlc.Timestamp{WallTime: 1}) {
147+
remaining = append(remaining, span)
148+
}
149+
}
150+
return remaining
151+
}
152+
153+
func (g *gatherer) record(result spanResult) error {
154+
g.mu.Lock()
155+
defer g.mu.Unlock()
156+
157+
g.mu.fingerprint ^= result.fingerprint
158+
if _, err := g.mu.done.Forward(result.span, hlc.Timestamp{WallTime: 1}); err != nil {
159+
return err
160+
}
161+
g.mu.prog[result.idx].total, g.mu.prog[result.idx].remaining = result.total, result.remaining
162+
163+
return nil
164+
}
165+
166+
func (g *gatherer) checkpoint(ctx context.Context) error {
167+
g.mu.Lock()
168+
defer g.mu.Unlock()
169+
var overallRemaining, overallTotal int
170+
for _, p := range g.mu.prog {
171+
overallRemaining += p.remaining
172+
overallTotal += p.total
173+
}
174+
175+
// Update the totalPart count that gets persisted if needed, and use it as the
176+
// overall total for progress fraction reporting.
177+
g.mu.checkpointState.totalParts = max(overallTotal, g.mu.checkpointState.totalParts)
178+
overallTotal = g.mu.checkpointState.totalParts
179+
180+
// Nothing to save yet.
181+
if overallRemaining == overallTotal {
182+
return nil
183+
}
184+
return g.persister.store(ctx, g.mu.checkpointState, float64(overallTotal-overallRemaining)/float64(overallTotal))
185+
}
186+
187+
func (g *gatherer) finish(ctx context.Context) (uint64, error) {
188+
return g.mu.fingerprint, nil
189+
}
190+
191+
type spanResult struct {
192+
spanFingerprintResult
193+
idx, total, remaining int
194+
}
195+
196+
func worker(
197+
ctx context.Context, idx int, todo []roachpb.Span, fn spanFingerprinter, res chan<- spanResult,
198+
) error {
199+
done := ctx.Done()
200+
i := 0
201+
for _, sp := range todo {
202+
select {
203+
case <-done:
204+
return ctx.Err()
205+
default:
206+
r, err := fn(ctx, sp)
207+
if err != nil {
208+
return err
209+
}
210+
i++
211+
res <- spanResult{r, idx, len(todo), len(todo) - i}
212+
}
213+
}
214+
return nil
215+
}

0 commit comments

Comments
 (0)