Skip to content

Commit c9f37c8

Browse files
craig[bot]dt
andcommitted
Merge #158506
158506: sql/fingerprint: add new job fingerprint spans job r=dt a=dt Currently unused outside of tests. This new job computes the kv fingerprint of the configured spans using the KV ExportRequest fingerprint mechanism. Release note: none. Epic: none. Co-authored-by: David Taylor <[email protected]>
2 parents 52d5d8e + 8988b46 commit c9f37c8

File tree

12 files changed

+1429
-1
lines changed

12 files changed

+1429
-1
lines changed

docs/generated/metrics/metrics.yaml

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4430,6 +4430,105 @@ layers:
44304430
unit: COUNT
44314431
aggregation: AVG
44324432
derivative: NON_NEGATIVE_DERIVATIVE
4433+
- name: jobs.fingerprint.currently_idle
4434+
exported_name: jobs_fingerprint_currently_idle
4435+
labeled_name: 'jobs{type: fingerprint, status: currently_idle}'
4436+
description: Number of fingerprint jobs currently considered Idle and can be freely shut down
4437+
y_axis_label: jobs
4438+
type: GAUGE
4439+
unit: COUNT
4440+
aggregation: AVG
4441+
derivative: NONE
4442+
- name: jobs.fingerprint.currently_paused
4443+
exported_name: jobs_fingerprint_currently_paused
4444+
labeled_name: 'jobs{name: fingerprint, status: currently_paused}'
4445+
description: Number of fingerprint jobs currently considered Paused
4446+
y_axis_label: jobs
4447+
type: GAUGE
4448+
unit: COUNT
4449+
aggregation: AVG
4450+
derivative: NONE
4451+
- name: jobs.fingerprint.currently_running
4452+
exported_name: jobs_fingerprint_currently_running
4453+
labeled_name: 'jobs{type: fingerprint, status: currently_running}'
4454+
description: Number of fingerprint jobs currently running in Resume or OnFailOrCancel state
4455+
y_axis_label: jobs
4456+
type: GAUGE
4457+
unit: COUNT
4458+
aggregation: AVG
4459+
derivative: NONE
4460+
- name: jobs.fingerprint.expired_pts_records
4461+
exported_name: jobs_fingerprint_expired_pts_records
4462+
labeled_name: 'jobs.expired_pts_records{type: fingerprint}'
4463+
description: Number of expired protected timestamp records owned by fingerprint jobs
4464+
y_axis_label: records
4465+
type: COUNTER
4466+
unit: COUNT
4467+
aggregation: AVG
4468+
derivative: NON_NEGATIVE_DERIVATIVE
4469+
- name: jobs.fingerprint.fail_or_cancel_completed
4470+
exported_name: jobs_fingerprint_fail_or_cancel_completed
4471+
labeled_name: 'jobs.fail_or_cancel{name: fingerprint, status: completed}'
4472+
description: Number of fingerprint jobs which successfully completed their failure or cancelation process
4473+
y_axis_label: jobs
4474+
type: COUNTER
4475+
unit: COUNT
4476+
aggregation: AVG
4477+
derivative: NON_NEGATIVE_DERIVATIVE
4478+
- name: jobs.fingerprint.fail_or_cancel_retry_error
4479+
exported_name: jobs_fingerprint_fail_or_cancel_retry_error
4480+
labeled_name: 'jobs.fail_or_cancel{name: fingerprint, status: retry_error}'
4481+
description: Number of fingerprint jobs which failed with a retriable error on their failure or cancelation process
4482+
y_axis_label: jobs
4483+
type: COUNTER
4484+
unit: COUNT
4485+
aggregation: AVG
4486+
derivative: NON_NEGATIVE_DERIVATIVE
4487+
- name: jobs.fingerprint.protected_age_sec
4488+
exported_name: jobs_fingerprint_protected_age_sec
4489+
labeled_name: 'jobs.protected_age_sec{type: fingerprint}'
4490+
description: The age of the oldest PTS record protected by fingerprint jobs
4491+
y_axis_label: seconds
4492+
type: GAUGE
4493+
unit: SECONDS
4494+
aggregation: AVG
4495+
derivative: NONE
4496+
- name: jobs.fingerprint.protected_record_count
4497+
exported_name: jobs_fingerprint_protected_record_count
4498+
labeled_name: 'jobs.protected_record_count{type: fingerprint}'
4499+
description: Number of protected timestamp records held by fingerprint jobs
4500+
y_axis_label: records
4501+
type: GAUGE
4502+
unit: COUNT
4503+
aggregation: AVG
4504+
derivative: NONE
4505+
- name: jobs.fingerprint.resume_completed
4506+
exported_name: jobs_fingerprint_resume_completed
4507+
labeled_name: 'jobs.resume{name: fingerprint, status: completed}'
4508+
description: Number of fingerprint jobs which successfully resumed to completion
4509+
y_axis_label: jobs
4510+
type: COUNTER
4511+
unit: COUNT
4512+
aggregation: AVG
4513+
derivative: NON_NEGATIVE_DERIVATIVE
4514+
- name: jobs.fingerprint.resume_failed
4515+
exported_name: jobs_fingerprint_resume_failed
4516+
labeled_name: 'jobs.resume{name: fingerprint, status: failed}'
4517+
description: Number of fingerprint jobs which failed with a non-retriable error
4518+
y_axis_label: jobs
4519+
type: COUNTER
4520+
unit: COUNT
4521+
aggregation: AVG
4522+
derivative: NON_NEGATIVE_DERIVATIVE
4523+
- name: jobs.fingerprint.resume_retry_error
4524+
exported_name: jobs_fingerprint_resume_retry_error
4525+
labeled_name: 'jobs.resume{name: fingerprint, status: retry_error}'
4526+
description: Number of fingerprint jobs which failed with a retriable error
4527+
y_axis_label: jobs
4528+
type: COUNTER
4529+
unit: COUNT
4530+
aggregation: AVG
4531+
derivative: NON_NEGATIVE_DERIVATIVE
44334532
- name: jobs.history_retention.currently_idle
44344533
exported_name: jobs_history_retention_currently_idle
44354534
labeled_name: 'jobs{type: history_retention, status: currently_idle}'

pkg/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -500,6 +500,7 @@ ALL_TESTS = [
500500
"//pkg/sql/execstats:execstats_test",
501501
"//pkg/sql/export:export_test",
502502
"//pkg/sql/exprutil:exprutil_test",
503+
"//pkg/sql/fingerprint:fingerprint_test",
503504
"//pkg/sql/flowinfra:flowinfra_disallowed_imports_test",
504505
"//pkg/sql/flowinfra:flowinfra_test",
505506
"//pkg/sql/gcjob/gcjobnotifier:gcjobnotifier_test",
@@ -2064,6 +2065,8 @@ GO_TARGETS = [
20642065
"//pkg/sql/exprutil:exprutil",
20652066
"//pkg/sql/exprutil:exprutil_test",
20662067
"//pkg/sql/faketreeeval:faketreeeval",
2068+
"//pkg/sql/fingerprint:fingerprint",
2069+
"//pkg/sql/fingerprint:fingerprint_test",
20672070
"//pkg/sql/flowinfra:flowinfra",
20682071
"//pkg/sql/flowinfra:flowinfra_test",
20692072
"//pkg/sql/gcjob/gcjobnotifier:gcjobnotifier",

pkg/jobs/job_info_utils.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@ package jobs
88
import (
99
"bytes"
1010
"context"
11+
"encoding/binary"
1112
"fmt"
1213
"io"
1314
"strings"
1415

1516
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
1617
"github.com/cockroachdb/cockroach/pkg/sql/isql"
18+
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
1719
"github.com/cockroachdb/errors"
1820
"github.com/klauspost/compress/gzip"
1921
)
@@ -108,3 +110,47 @@ func ReadChunkedFileToJobInfo(
108110

109111
return buf.Bytes(), nil
110112
}
113+
114+
// WriteUint64 writes a uint64 value to the info storage.
115+
func (i InfoStorage) WriteUint64(ctx context.Context, infoKey string, value uint64) error {
116+
buf := make([]byte, 8)
117+
binary.LittleEndian.PutUint64(buf, value)
118+
return i.Write(ctx, infoKey, buf)
119+
}
120+
121+
// GetUint64 retrieves a uint64 value from the info storage.
122+
// Returns (value, found, error).
123+
func (i InfoStorage) GetUint64(ctx context.Context, infoKey string) (uint64, bool, error) {
124+
data, found, err := i.Get(ctx, "get-uint64", infoKey)
125+
if err != nil || !found {
126+
return 0, found, err
127+
}
128+
if len(data) != 8 {
129+
return 0, false, errors.Newf("invalid uint64 data length: %d", len(data))
130+
}
131+
return binary.LittleEndian.Uint64(data), true, nil
132+
}
133+
134+
// WriteProto writes a protobuf message to the info storage.
135+
func (i InfoStorage) WriteProto(ctx context.Context, infoKey string, msg protoutil.Message) error {
136+
data, err := protoutil.Marshal(msg)
137+
if err != nil {
138+
return errors.Wrap(err, "failed to marshal proto")
139+
}
140+
return i.Write(ctx, infoKey, data)
141+
}
142+
143+
// GetProto retrieves a protobuf message from the info storage.
144+
// Returns (found, error). If found, the message is unmarshaled into msg.
145+
func (i InfoStorage) GetProto(
146+
ctx context.Context, infoKey string, msg protoutil.Message,
147+
) (bool, error) {
148+
data, found, err := i.Get(ctx, "get-proto", infoKey)
149+
if err != nil || !found {
150+
return found, err
151+
}
152+
if err := protoutil.Unmarshal(data, msg); err != nil {
153+
return false, errors.Wrap(err, "failed to unmarshal proto")
154+
}
155+
return true, nil
156+
}

pkg/jobs/jobspb/jobs.proto

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1534,6 +1534,18 @@ message InspectDetails {
15341534
];
15351535
}
15361536

1537+
message FingerprintDetails {
1538+
repeated roachpb.Span spans = 1 [(gogoproto.nullable) = false];
1539+
util.hlc.Timestamp as_of = 2 [(gogoproto.nullable) = false];
1540+
util.hlc.Timestamp start = 3 [(gogoproto.nullable) = false];
1541+
bool stripped = 4;
1542+
bytes pts = 5 [(gogoproto.customname) = "PTS", (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID", (gogoproto.nullable) = false];
1543+
}
1544+
1545+
message FingerprintProgress {
1546+
// Not used: progress is stored in its own info key(s) and frontier.
1547+
}
1548+
15371549
message UpdateTableMetadataCacheDetails {}
15381550
message UpdateTableMetadataCacheProgress {
15391551
enum Status {
@@ -1659,6 +1671,7 @@ message Payload {
16591671
SqlActivityFlushDetails sql_activity_flush_details = 51;
16601672
HotRangesLoggerDetails hot_ranges_logger_details = 52;
16611673
InspectDetails inspect_details = 53;
1674+
FingerprintDetails fingerprint_details = 54;
16621675
}
16631676
reserved 26;
16641677
// PauseReason is used to describe the reason that the job is currently paused
@@ -1743,6 +1756,7 @@ message Progress {
17431756
SqlActivityFlushProgress sql_activity_flush = 39;
17441757
HotRangesLoggerProgress hot_ranges_logger = 40;
17451758
InspectProgress inspect = 41;
1759+
FingerprintProgress fingerprint = 42;
17461760
}
17471761

17481762
uint64 trace_id = 21 [(gogoproto.nullable) = false, (gogoproto.customname) = "TraceID", (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb.TraceID"];
@@ -1790,6 +1804,7 @@ enum Type {
17901804
SQL_ACTIVITY_FLUSH = 31 [(gogoproto.enumvalue_customname) = "TypeSQLActivityFlush"];
17911805
HOT_RANGES_LOGGER = 32 [(gogoproto.enumvalue_customname) = "TypeHotRangesLogger"];
17921806
INSPECT = 33 [(gogoproto.enumvalue_customname) = "TypeInspect"];
1807+
FINGERPRINT = 34 [(gogoproto.enumvalue_customname) = "TypeFingerprint"];
17931808
}
17941809

17951810
message Job {

pkg/jobs/jobspb/wrap.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ var (
4949
_ Details = SqlActivityFlushDetails{}
5050
_ Details = HotRangesLoggerDetails{}
5151
_ Details = InspectDetails{}
52+
_ Details = FingerprintDetails{}
5253
)
5354

5455
// ProgressDetails is a marker interface for job progress details proto structs.
@@ -82,6 +83,7 @@ var (
8283
_ ProgressDetails = SqlActivityFlushProgress{}
8384
_ ProgressDetails = HotRangesLoggerProgress{}
8485
_ ProgressDetails = InspectProgress{}
86+
_ ProgressDetails = FingerprintProgress{}
8587
)
8688

8789
// Type returns the payload's job type and panics if the type is invalid.
@@ -249,6 +251,8 @@ func DetailsType(d isPayload_Details) (Type, error) {
249251
return TypeHotRangesLogger, nil
250252
case *Payload_InspectDetails:
251253
return TypeInspect, nil
254+
case *Payload_FingerprintDetails:
255+
return TypeFingerprint, nil
252256
default:
253257
return TypeUnspecified, errors.Newf("Payload.Type called on a payload with an unknown details type: %T", d)
254258
}
@@ -305,6 +309,7 @@ var JobDetailsForEveryJobType = map[Type]Details{
305309
TypeSQLActivityFlush: SqlActivityFlushDetails{},
306310
TypeHotRangesLogger: HotRangesLoggerDetails{},
307311
TypeInspect: InspectDetails{},
312+
TypeFingerprint: FingerprintDetails{},
308313
}
309314

310315
// WrapProgressDetails wraps a ProgressDetails object in the protobuf wrapper
@@ -378,6 +383,8 @@ func WrapProgressDetails(details ProgressDetails) interface {
378383
return &Progress_HotRangesLogger{HotRangesLogger: &d}
379384
case InspectProgress:
380385
return &Progress_Inspect{Inspect: &d}
386+
case FingerprintProgress:
387+
return &Progress_Fingerprint{Fingerprint: &d}
381388
default:
382389
panic(errors.AssertionFailedf("WrapProgressDetails: unknown progress type %T", d))
383390
}
@@ -449,6 +456,8 @@ func (p *Payload) UnwrapDetails() Details {
449456
return *d.HotRangesLoggerDetails
450457
case *Payload_InspectDetails:
451458
return *d.InspectDetails
459+
case *Payload_FingerprintDetails:
460+
return *d.FingerprintDetails
452461
default:
453462
return nil
454463
}
@@ -520,6 +529,8 @@ func (p *Progress) UnwrapDetails() ProgressDetails {
520529
return *d.HotRangesLogger
521530
case *Progress_Inspect:
522531
return d.Inspect
532+
case *Progress_Fingerprint:
533+
return d.Fingerprint
523534
default:
524535
return nil
525536
}
@@ -615,6 +626,8 @@ func WrapPayloadDetails(details Details) interface {
615626
return &Payload_HotRangesLoggerDetails{HotRangesLoggerDetails: &d}
616627
case InspectDetails:
617628
return &Payload_InspectDetails{InspectDetails: &d}
629+
case FingerprintDetails:
630+
return &Payload_FingerprintDetails{FingerprintDetails: &d}
618631
default:
619632
panic(errors.AssertionFailedf("jobs.WrapPayloadDetails: unknown details type %T", d))
620633
}
@@ -650,7 +663,7 @@ const (
650663
func (Type) SafeValue() {}
651664

652665
// NumJobTypes is the number of jobs types.
653-
const NumJobTypes = 34
666+
const NumJobTypes = 35
654667

655668
// ChangefeedDetailsMarshaler allows for dependency injection of
656669
// cloud.SanitizeExternalStorageURI to avoid the dependency from this

pkg/roachprod/agents/opentelemetry/files/cockroachdb_metrics.yaml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -842,6 +842,17 @@ jobs_create_stats_fail_or_cancel_retry_error: jobs.create_stats.fail_or_cancel_r
842842
jobs_create_stats_resume_completed: jobs.create_stats.resume_completed
843843
jobs_create_stats_resume_failed: jobs.create_stats.resume_failed
844844
jobs_create_stats_resume_retry_error: jobs.create_stats.resume_retry_error
845+
jobs_fingerprint_currently_idle: jobs.fingerprint.currently_idle
846+
jobs_fingerprint_currently_paused: jobs.fingerprint.currently_paused
847+
jobs_fingerprint_currently_running: jobs.fingerprint.currently_running
848+
jobs_fingerprint_expired_pts_records: jobs.fingerprint.expired_pts_records
849+
jobs_fingerprint_fail_or_cancel_completed: jobs.fingerprint.fail_or_cancel_completed
850+
jobs_fingerprint_fail_or_cancel_retry_error: jobs.fingerprint.fail_or_cancel_retry_error
851+
jobs_fingerprint_protected_age_sec: jobs.fingerprint.protected_age_sec
852+
jobs_fingerprint_protected_record_count: jobs.fingerprint.protected_record_count
853+
jobs_fingerprint_resume_completed: jobs.fingerprint.resume_completed
854+
jobs_fingerprint_resume_failed: jobs.fingerprint.resume_failed
855+
jobs_fingerprint_resume_retry_error: jobs.fingerprint.resume_retry_error
845856
jobs_history_retention_currently_idle: jobs.history_retention.currently_idle
846857
jobs_history_retention_currently_paused: jobs.history_retention.currently_paused
847858
jobs_history_retention_currently_running: jobs.history_retention.currently_running

pkg/sql/fingerprint/BUILD.bazel

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
2+
3+
go_library(
4+
name = "fingerprint",
5+
srcs = [
6+
"fingerprint_job.go",
7+
"gatherer.go",
8+
],
9+
importpath = "github.com/cockroachdb/cockroach/pkg/sql/fingerprint",
10+
visibility = ["//visibility:public"],
11+
deps = [
12+
"//pkg/ccl/kvccl/kvfollowerreadsccl",
13+
"//pkg/jobs",
14+
"//pkg/jobs/joberror",
15+
"//pkg/jobs/jobfrontier",
16+
"//pkg/jobs/jobspb",
17+
"//pkg/kv",
18+
"//pkg/kv/kvpb",
19+
"//pkg/kv/kvserver/protectedts",
20+
"//pkg/roachpb",
21+
"//pkg/settings/cluster",
22+
"//pkg/sql",
23+
"//pkg/sql/isql",
24+
"//pkg/util/admission/admissionpb",
25+
"//pkg/util/ctxgroup",
26+
"//pkg/util/hlc",
27+
"//pkg/util/log",
28+
"//pkg/util/retry",
29+
"//pkg/util/span",
30+
"//pkg/util/syncutil",
31+
"//pkg/util/timeutil",
32+
"//pkg/util/tracing",
33+
"//pkg/util/tracing/tracingpb",
34+
"//pkg/util/uuid",
35+
"@com_github_cockroachdb_errors//:errors",
36+
"@com_github_cockroachdb_redact//:redact",
37+
],
38+
)
39+
40+
go_test(
41+
name = "fingerprint_test",
42+
srcs = [
43+
"fingerprint_job_test.go",
44+
"gatherer_test.go",
45+
"main_test.go",
46+
],
47+
embed = [":fingerprint"],
48+
deps = [
49+
"//pkg/base",
50+
"//pkg/jobs",
51+
"//pkg/jobs/jobspb",
52+
"//pkg/roachpb",
53+
"//pkg/security/securityassets",
54+
"//pkg/security/securitytest",
55+
"//pkg/security/username",
56+
"//pkg/server",
57+
"//pkg/sql",
58+
"//pkg/testutils/jobutils",
59+
"//pkg/testutils/serverutils",
60+
"//pkg/testutils/sqlutils",
61+
"//pkg/testutils/testcluster",
62+
"//pkg/util/hlc",
63+
"//pkg/util/leaktest",
64+
"//pkg/util/log",
65+
"//pkg/util/span",
66+
"@com_github_stretchr_testify//require",
67+
],
68+
)

0 commit comments

Comments
 (0)