Skip to content

Commit f5ff484

Browse files
committed
sql/fingerprint: add new job fingerprint spans job
This just adds the job itself; it is currently unused outside of its direct tests included here (i.e. there is no code path to create these jobs wired up yet). This new job computes the kv fingerprint of the configured spans using the KV ExportRequest fingerprint mechanism, and stores progress and the result in the job's job_info rows. This KV fingerprint currently only fingerprints as of a given timestamp; it does not fingerprint all revisions prior to that timestamp. This means it does not need to support representing range deletions in its fingerprint, as those are not directly observed: you either observe the keys they deleted if you read prior to them, or don't observe them if you read after the deletion and reflect those keys or their absense in the fingerprint accordingly, but do not need to repesent the deletion itself in the fingerprint. This substantially simplifies the collection of the fingerprint from separate ExportRequests, as we do not need to accumulate and defragment partial range deletions across requests. The KV mode added here is useful for comparing the result of any sort of byte-for-byte copying of data, such as done by a RESTOTRE or PCR. A future "SQL" fingerprint implementation could be an alternative to the KV fingerprinter used here currently, which would use SQL-level scans and schema-aware datum decoding to compute fingerprints over decoded, logical values. While more expensive, this can be useful for cases where logically eqiuivalent data may have different on-disk representations (e.g. composite types), making such a mode ideal for comparing data copied by something like LDR. Release note: none. Epic: none.
1 parent 92af8a2 commit f5ff484

File tree

8 files changed

+778
-2
lines changed

8 files changed

+778
-2
lines changed

docs/generated/metrics/metrics.yaml

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4430,6 +4430,105 @@ layers:
44304430
unit: COUNT
44314431
aggregation: AVG
44324432
derivative: NON_NEGATIVE_DERIVATIVE
4433+
- name: jobs.fingerprint.currently_idle
4434+
exported_name: jobs_fingerprint_currently_idle
4435+
labeled_name: 'jobs{type: fingerprint, status: currently_idle}'
4436+
description: Number of fingerprint jobs currently considered Idle and can be freely shut down
4437+
y_axis_label: jobs
4438+
type: GAUGE
4439+
unit: COUNT
4440+
aggregation: AVG
4441+
derivative: NONE
4442+
- name: jobs.fingerprint.currently_paused
4443+
exported_name: jobs_fingerprint_currently_paused
4444+
labeled_name: 'jobs{name: fingerprint, status: currently_paused}'
4445+
description: Number of fingerprint jobs currently considered Paused
4446+
y_axis_label: jobs
4447+
type: GAUGE
4448+
unit: COUNT
4449+
aggregation: AVG
4450+
derivative: NONE
4451+
- name: jobs.fingerprint.currently_running
4452+
exported_name: jobs_fingerprint_currently_running
4453+
labeled_name: 'jobs{type: fingerprint, status: currently_running}'
4454+
description: Number of fingerprint jobs currently running in Resume or OnFailOrCancel state
4455+
y_axis_label: jobs
4456+
type: GAUGE
4457+
unit: COUNT
4458+
aggregation: AVG
4459+
derivative: NONE
4460+
- name: jobs.fingerprint.expired_pts_records
4461+
exported_name: jobs_fingerprint_expired_pts_records
4462+
labeled_name: 'jobs.expired_pts_records{type: fingerprint}'
4463+
description: Number of expired protected timestamp records owned by fingerprint jobs
4464+
y_axis_label: records
4465+
type: COUNTER
4466+
unit: COUNT
4467+
aggregation: AVG
4468+
derivative: NON_NEGATIVE_DERIVATIVE
4469+
- name: jobs.fingerprint.fail_or_cancel_completed
4470+
exported_name: jobs_fingerprint_fail_or_cancel_completed
4471+
labeled_name: 'jobs.fail_or_cancel{name: fingerprint, status: completed}'
4472+
description: Number of fingerprint jobs which successfully completed their failure or cancelation process
4473+
y_axis_label: jobs
4474+
type: COUNTER
4475+
unit: COUNT
4476+
aggregation: AVG
4477+
derivative: NON_NEGATIVE_DERIVATIVE
4478+
- name: jobs.fingerprint.fail_or_cancel_retry_error
4479+
exported_name: jobs_fingerprint_fail_or_cancel_retry_error
4480+
labeled_name: 'jobs.fail_or_cancel{name: fingerprint, status: retry_error}'
4481+
description: Number of fingerprint jobs which failed with a retriable error on their failure or cancelation process
4482+
y_axis_label: jobs
4483+
type: COUNTER
4484+
unit: COUNT
4485+
aggregation: AVG
4486+
derivative: NON_NEGATIVE_DERIVATIVE
4487+
- name: jobs.fingerprint.protected_age_sec
4488+
exported_name: jobs_fingerprint_protected_age_sec
4489+
labeled_name: 'jobs.protected_age_sec{type: fingerprint}'
4490+
description: The age of the oldest PTS record protected by fingerprint jobs
4491+
y_axis_label: seconds
4492+
type: GAUGE
4493+
unit: SECONDS
4494+
aggregation: AVG
4495+
derivative: NONE
4496+
- name: jobs.fingerprint.protected_record_count
4497+
exported_name: jobs_fingerprint_protected_record_count
4498+
labeled_name: 'jobs.protected_record_count{type: fingerprint}'
4499+
description: Number of protected timestamp records held by fingerprint jobs
4500+
y_axis_label: records
4501+
type: GAUGE
4502+
unit: COUNT
4503+
aggregation: AVG
4504+
derivative: NONE
4505+
- name: jobs.fingerprint.resume_completed
4506+
exported_name: jobs_fingerprint_resume_completed
4507+
labeled_name: 'jobs.resume{name: fingerprint, status: completed}'
4508+
description: Number of fingerprint jobs which successfully resumed to completion
4509+
y_axis_label: jobs
4510+
type: COUNTER
4511+
unit: COUNT
4512+
aggregation: AVG
4513+
derivative: NON_NEGATIVE_DERIVATIVE
4514+
- name: jobs.fingerprint.resume_failed
4515+
exported_name: jobs_fingerprint_resume_failed
4516+
labeled_name: 'jobs.resume{name: fingerprint, status: failed}'
4517+
description: Number of fingerprint jobs which failed with a non-retriable error
4518+
y_axis_label: jobs
4519+
type: COUNTER
4520+
unit: COUNT
4521+
aggregation: AVG
4522+
derivative: NON_NEGATIVE_DERIVATIVE
4523+
- name: jobs.fingerprint.resume_retry_error
4524+
exported_name: jobs_fingerprint_resume_retry_error
4525+
labeled_name: 'jobs.resume{name: fingerprint, status: retry_error}'
4526+
description: Number of fingerprint jobs which failed with a retriable error
4527+
y_axis_label: jobs
4528+
type: COUNTER
4529+
unit: COUNT
4530+
aggregation: AVG
4531+
derivative: NON_NEGATIVE_DERIVATIVE
44334532
- name: jobs.history_retention.currently_idle
44344533
exported_name: jobs_history_retention_currently_idle
44354534
labeled_name: 'jobs{type: history_retention, status: currently_idle}'

pkg/jobs/job_info_utils.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@ package jobs
88
import (
99
"bytes"
1010
"context"
11+
"encoding/binary"
1112
"fmt"
1213
"io"
1314
"strings"
1415

1516
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
1617
"github.com/cockroachdb/cockroach/pkg/sql/isql"
18+
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
1719
"github.com/cockroachdb/errors"
1820
"github.com/klauspost/compress/gzip"
1921
)
@@ -108,3 +110,47 @@ func ReadChunkedFileToJobInfo(
108110

109111
return buf.Bytes(), nil
110112
}
113+
114+
// WriteUint64 writes a uint64 value to the info storage.
115+
func (i InfoStorage) WriteUint64(ctx context.Context, infoKey string, value uint64) error {
116+
buf := make([]byte, 8)
117+
binary.LittleEndian.PutUint64(buf, value)
118+
return i.Write(ctx, infoKey, buf)
119+
}
120+
121+
// GetUint64 retrieves a uint64 value from the info storage.
122+
// Returns (value, found, error).
123+
func (i InfoStorage) GetUint64(ctx context.Context, infoKey string) (uint64, bool, error) {
124+
data, found, err := i.Get(ctx, "get-uint64", infoKey)
125+
if err != nil || !found {
126+
return 0, found, err
127+
}
128+
if len(data) != 8 {
129+
return 0, false, errors.Newf("invalid uint64 data length: %d", len(data))
130+
}
131+
return binary.LittleEndian.Uint64(data), true, nil
132+
}
133+
134+
// WriteProto writes a protobuf message to the info storage.
135+
func (i InfoStorage) WriteProto(ctx context.Context, infoKey string, msg protoutil.Message) error {
136+
data, err := protoutil.Marshal(msg)
137+
if err != nil {
138+
return errors.Wrap(err, "failed to marshal proto")
139+
}
140+
return i.Write(ctx, infoKey, data)
141+
}
142+
143+
// GetProto retrieves a protobuf message from the info storage.
144+
// Returns (found, error). If found, the message is unmarshaled into msg.
145+
func (i InfoStorage) GetProto(
146+
ctx context.Context, infoKey string, msg protoutil.Message,
147+
) (bool, error) {
148+
data, found, err := i.Get(ctx, "get-proto", infoKey)
149+
if err != nil || !found {
150+
return found, err
151+
}
152+
if err := protoutil.Unmarshal(data, msg); err != nil {
153+
return false, errors.Wrap(err, "failed to unmarshal proto")
154+
}
155+
return true, nil
156+
}

pkg/jobs/jobspb/jobs.proto

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1534,6 +1534,18 @@ message InspectDetails {
15341534
];
15351535
}
15361536

1537+
message FingerprintDetails {
1538+
repeated roachpb.Span spans = 1 [(gogoproto.nullable) = false];
1539+
util.hlc.Timestamp as_of = 2 [(gogoproto.nullable) = false];
1540+
util.hlc.Timestamp start = 3 [(gogoproto.nullable) = false];
1541+
bool stripped = 4;
1542+
bytes pts = 5 [(gogoproto.customname) = "PTS", (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID", (gogoproto.nullable) = false];
1543+
}
1544+
1545+
message FingerprintProgress {
1546+
// Not used: progress is stored in its own info key(s) and frontier.
1547+
}
1548+
15371549
message UpdateTableMetadataCacheDetails {}
15381550
message UpdateTableMetadataCacheProgress {
15391551
enum Status {
@@ -1659,6 +1671,7 @@ message Payload {
16591671
SqlActivityFlushDetails sql_activity_flush_details = 51;
16601672
HotRangesLoggerDetails hot_ranges_logger_details = 52;
16611673
InspectDetails inspect_details = 53;
1674+
FingerprintDetails fingerprint_details = 54;
16621675
}
16631676
reserved 26;
16641677
// PauseReason is used to describe the reason that the job is currently paused
@@ -1743,6 +1756,7 @@ message Progress {
17431756
SqlActivityFlushProgress sql_activity_flush = 39;
17441757
HotRangesLoggerProgress hot_ranges_logger = 40;
17451758
InspectProgress inspect = 41;
1759+
FingerprintProgress fingerprint = 42;
17461760
}
17471761

17481762
uint64 trace_id = 21 [(gogoproto.nullable) = false, (gogoproto.customname) = "TraceID", (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb.TraceID"];
@@ -1790,6 +1804,7 @@ enum Type {
17901804
SQL_ACTIVITY_FLUSH = 31 [(gogoproto.enumvalue_customname) = "TypeSQLActivityFlush"];
17911805
HOT_RANGES_LOGGER = 32 [(gogoproto.enumvalue_customname) = "TypeHotRangesLogger"];
17921806
INSPECT = 33 [(gogoproto.enumvalue_customname) = "TypeInspect"];
1807+
FINGERPRINT = 34 [(gogoproto.enumvalue_customname) = "TypeFingerprint"];
17931808
}
17941809

17951810
message Job {

pkg/jobs/jobspb/wrap.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ var (
4949
_ Details = SqlActivityFlushDetails{}
5050
_ Details = HotRangesLoggerDetails{}
5151
_ Details = InspectDetails{}
52+
_ Details = FingerprintDetails{}
5253
)
5354

5455
// ProgressDetails is a marker interface for job progress details proto structs.
@@ -82,6 +83,7 @@ var (
8283
_ ProgressDetails = SqlActivityFlushProgress{}
8384
_ ProgressDetails = HotRangesLoggerProgress{}
8485
_ ProgressDetails = InspectProgress{}
86+
_ ProgressDetails = FingerprintProgress{}
8587
)
8688

8789
// Type returns the payload's job type and panics if the type is invalid.
@@ -249,6 +251,8 @@ func DetailsType(d isPayload_Details) (Type, error) {
249251
return TypeHotRangesLogger, nil
250252
case *Payload_InspectDetails:
251253
return TypeInspect, nil
254+
case *Payload_FingerprintDetails:
255+
return TypeFingerprint, nil
252256
default:
253257
return TypeUnspecified, errors.Newf("Payload.Type called on a payload with an unknown details type: %T", d)
254258
}
@@ -305,6 +309,7 @@ var JobDetailsForEveryJobType = map[Type]Details{
305309
TypeSQLActivityFlush: SqlActivityFlushDetails{},
306310
TypeHotRangesLogger: HotRangesLoggerDetails{},
307311
TypeInspect: InspectDetails{},
312+
TypeFingerprint: FingerprintDetails{},
308313
}
309314

310315
// WrapProgressDetails wraps a ProgressDetails object in the protobuf wrapper
@@ -378,6 +383,8 @@ func WrapProgressDetails(details ProgressDetails) interface {
378383
return &Progress_HotRangesLogger{HotRangesLogger: &d}
379384
case InspectProgress:
380385
return &Progress_Inspect{Inspect: &d}
386+
case FingerprintProgress:
387+
return &Progress_Fingerprint{Fingerprint: &d}
381388
default:
382389
panic(errors.AssertionFailedf("WrapProgressDetails: unknown progress type %T", d))
383390
}
@@ -449,6 +456,8 @@ func (p *Payload) UnwrapDetails() Details {
449456
return *d.HotRangesLoggerDetails
450457
case *Payload_InspectDetails:
451458
return *d.InspectDetails
459+
case *Payload_FingerprintDetails:
460+
return *d.FingerprintDetails
452461
default:
453462
return nil
454463
}
@@ -520,6 +529,8 @@ func (p *Progress) UnwrapDetails() ProgressDetails {
520529
return *d.HotRangesLogger
521530
case *Progress_Inspect:
522531
return d.Inspect
532+
case *Progress_Fingerprint:
533+
return d.Fingerprint
523534
default:
524535
return nil
525536
}
@@ -615,6 +626,8 @@ func WrapPayloadDetails(details Details) interface {
615626
return &Payload_HotRangesLoggerDetails{HotRangesLoggerDetails: &d}
616627
case InspectDetails:
617628
return &Payload_InspectDetails{InspectDetails: &d}
629+
case FingerprintDetails:
630+
return &Payload_FingerprintDetails{FingerprintDetails: &d}
618631
default:
619632
panic(errors.AssertionFailedf("jobs.WrapPayloadDetails: unknown details type %T", d))
620633
}
@@ -650,7 +663,7 @@ const (
650663
func (Type) SafeValue() {}
651664

652665
// NumJobTypes is the number of jobs types.
653-
const NumJobTypes = 34
666+
const NumJobTypes = 35
654667

655668
// ChangefeedDetailsMarshaler allows for dependency injection of
656669
// cloud.SanitizeExternalStorageURI to avoid the dependency from this

pkg/roachprod/agents/opentelemetry/files/cockroachdb_metrics.yaml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -842,6 +842,17 @@ jobs_create_stats_fail_or_cancel_retry_error: jobs.create_stats.fail_or_cancel_r
842842
jobs_create_stats_resume_completed: jobs.create_stats.resume_completed
843843
jobs_create_stats_resume_failed: jobs.create_stats.resume_failed
844844
jobs_create_stats_resume_retry_error: jobs.create_stats.resume_retry_error
845+
jobs_fingerprint_currently_idle: jobs.fingerprint.currently_idle
846+
jobs_fingerprint_currently_paused: jobs.fingerprint.currently_paused
847+
jobs_fingerprint_currently_running: jobs.fingerprint.currently_running
848+
jobs_fingerprint_expired_pts_records: jobs.fingerprint.expired_pts_records
849+
jobs_fingerprint_fail_or_cancel_completed: jobs.fingerprint.fail_or_cancel_completed
850+
jobs_fingerprint_fail_or_cancel_retry_error: jobs.fingerprint.fail_or_cancel_retry_error
851+
jobs_fingerprint_protected_age_sec: jobs.fingerprint.protected_age_sec
852+
jobs_fingerprint_protected_record_count: jobs.fingerprint.protected_record_count
853+
jobs_fingerprint_resume_completed: jobs.fingerprint.resume_completed
854+
jobs_fingerprint_resume_failed: jobs.fingerprint.resume_failed
855+
jobs_fingerprint_resume_retry_error: jobs.fingerprint.resume_retry_error
845856
jobs_history_retention_currently_idle: jobs.history_retention.currently_idle
846857
jobs_history_retention_currently_paused: jobs.history_retention.currently_paused
847858
jobs_history_retention_currently_running: jobs.history_retention.currently_running

pkg/sql/fingerprint/BUILD.bazel

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,39 +2,68 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
22

33
go_library(
44
name = "fingerprint",
5-
srcs = ["gatherer.go"],
5+
srcs = [
6+
"fingerprint_job.go",
7+
"gatherer.go",
8+
],
69
importpath = "github.com/cockroachdb/cockroach/pkg/sql/fingerprint",
710
visibility = ["//visibility:public"],
811
deps = [
12+
"//pkg/ccl/kvccl/kvfollowerreadsccl",
13+
"//pkg/jobs",
14+
"//pkg/jobs/joberror",
15+
"//pkg/jobs/jobfrontier",
16+
"//pkg/jobs/jobspb",
17+
"//pkg/kv",
18+
"//pkg/kv/kvpb",
19+
"//pkg/kv/kvserver/protectedts",
920
"//pkg/roachpb",
21+
"//pkg/settings/cluster",
1022
"//pkg/sql",
23+
"//pkg/sql/isql",
24+
"//pkg/util/admission/admissionpb",
1125
"//pkg/util/ctxgroup",
1226
"//pkg/util/hlc",
27+
"//pkg/util/log",
28+
"//pkg/util/retry",
1329
"//pkg/util/span",
1430
"//pkg/util/syncutil",
31+
"//pkg/util/timeutil",
32+
"//pkg/util/tracing",
33+
"//pkg/util/tracing/tracingpb",
34+
"//pkg/util/uuid",
1535
"@com_github_cockroachdb_errors//:errors",
36+
"@com_github_cockroachdb_redact//:redact",
1637
],
1738
)
1839

1940
go_test(
2041
name = "fingerprint_test",
2142
srcs = [
43+
"fingerprint_job_test.go",
2244
"gatherer_test.go",
2345
"main_test.go",
2446
],
2547
embed = [":fingerprint"],
2648
deps = [
2749
"//pkg/base",
50+
"//pkg/jobs",
51+
"//pkg/jobs/jobspb",
2852
"//pkg/roachpb",
2953
"//pkg/security/securityassets",
3054
"//pkg/security/securitytest",
55+
"//pkg/security/username",
3156
"//pkg/server",
3257
"//pkg/sql",
58+
"//pkg/testutils",
3359
"//pkg/testutils/serverutils",
60+
"//pkg/testutils/sqlutils",
3461
"//pkg/testutils/testcluster",
3562
"//pkg/util/hlc",
3663
"//pkg/util/leaktest",
64+
"//pkg/util/log",
3765
"//pkg/util/span",
66+
"@com_github_cockroachdb_errors//:errors",
3867
"@com_github_stretchr_testify//require",
3968
],
4069
)

0 commit comments

Comments
 (0)