Skip to content

Commit 86ca2bd

Browse files
authored
Merge pull request #159190 from jasonlmfong/blathers/backport-release-26.1-158716
release-26.1: pkg/cli: allow tsdump to also dump the labeled child metrics
2 parents d60e6ad + c20999f commit 86ca2bd

File tree

6 files changed

+204
-23
lines changed

6 files changed

+204
-23
lines changed

pkg/cli/tsdump_upload.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,12 @@ func (d *datadogWriter) dump(kv *roachpb.KeyValue) (*datadogV2.MetricSeries, err
321321
if err := kv.Value.GetProto(&idata); err != nil {
322322
return nil, err
323323
}
324+
braceIdx := strings.IndexByte(name, '{')
325+
if braceIdx != -1 {
326+
// has child labels
327+
// TODO(jasonlmfong): support uploading child labels
328+
return nil, errors.Errorf("timeseries key with child labels: %s", name)
329+
}
324330

325331
series := &datadogV2.MetricSeries{
326332
Metric: name,

pkg/server/status/recorder.go

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -813,25 +813,6 @@ type registryRecorder struct {
813813
childMetricNameCache *syncutil.Map[uint64, cacheEntry]
814814
}
815815

816-
// allowedChangefeedMetrics is the list of changefeed metrics that should have
817-
// child metrics collected and recorded to TSDB. This is a curated list to prevent
818-
// unbounded cardinality while still capturing the most important per-changefeed metrics.
819-
var allowedChangefeedMetrics = map[string]struct{}{
820-
"changefeed.max_behind_nanos": {},
821-
"changefeed.error_retries": {},
822-
"changefeed.internal_retry_message_count": {},
823-
"changefeed.stage.downstream_client_send.latency": {},
824-
"changefeed.emitted_messages": {},
825-
"changefeed.sink_backpressure_nanos": {},
826-
"changefeed.backfill_pending_ranges": {},
827-
"changefeed.sink_io_inflight": {},
828-
"changefeed.lagging_ranges": {},
829-
"changefeed.aggregator_progress": {},
830-
"changefeed.checkpoint_progress": {},
831-
"changefeed.emitted_batch_sizes": {},
832-
"changefeed.total_ranges": {},
833-
}
834-
835816
// extractValue extracts the metric value(s) for the given metric and passes it, along with the metric name, to the
836817
// provided callback function.
837818
func extractValue(name string, mtr interface{}, fn func(string, float64)) error {
@@ -1023,7 +1004,7 @@ func (rr registryRecorder) recordChangefeedChildMetrics(dest *[]tspb.TimeSeriesD
10231004

10241005
labels := rr.registry.GetLabels()
10251006
rr.registry.Each(func(name string, v interface{}) {
1026-
if _, allowed := allowedChangefeedMetrics[name]; !allowed {
1007+
if _, allowed := tsutil.AllowedChildMetrics[name]; !allowed {
10271008
return
10281009
}
10291010
// Check if the metric has child collection enabled in its metadata

pkg/server/status/recorder_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1283,8 +1283,8 @@ func BenchmarkRecordChangefeedChildMetrics(b *testing.B) {
12831283
enableChildCollection := true
12841284

12851285
// Get metrics from the allowed list and convert to slice for indexing
1286-
allowedMetricsList := make([]string, 0, len(allowedChangefeedMetrics))
1287-
for metricName := range allowedChangefeedMetrics {
1286+
allowedMetricsList := make([]string, 0, len(tsutil.AllowedChildMetrics))
1287+
for metricName := range tsutil.AllowedChildMetrics {
12881288
allowedMetricsList = append(allowedMetricsList, metricName)
12891289
}
12901290

pkg/ts/server.go

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
1616
"github.com/cockroachdb/cockroach/pkg/roachpb"
1717
"github.com/cockroachdb/cockroach/pkg/ts/tspb"
18+
"github.com/cockroachdb/cockroach/pkg/ts/tsutil"
1819
"github.com/cockroachdb/cockroach/pkg/util/log"
1920
"github.com/cockroachdb/cockroach/pkg/util/metric"
2021
"github.com/cockroachdb/cockroach/pkg/util/mon"
@@ -481,12 +482,32 @@ func dumpImpl(
481482
ResolutionFromProto(res),
482483
req.StartNanos,
483484
req.EndNanos,
485+
false,
484486
d,
485487
); err != nil {
486488
return err
487489
}
488490
}
489491
}
492+
493+
// Dump child metrics only for allowed metrics at 1M resolution
494+
for _, seriesName := range req.Names {
495+
if !tsutil.IsAllowedChildMetric(seriesName) {
496+
continue
497+
}
498+
if err := dumpTimeseriesAllSources(
499+
ctx,
500+
db,
501+
seriesName,
502+
Resolution1m,
503+
req.StartNanos,
504+
req.EndNanos,
505+
true,
506+
d,
507+
); err != nil {
508+
return err
509+
}
510+
}
490511
return nil
491512
}
492513

@@ -536,6 +557,7 @@ func dumpTimeseriesAllSources(
536557
seriesName string,
537558
diskResolution Resolution,
538559
startNanos, endNanos int64,
560+
includeChildMetrics bool,
539561
dump func(*roachpb.KeyValue) error,
540562
) error {
541563
if endNanos == 0 {
@@ -548,12 +570,20 @@ func dumpTimeseriesAllSources(
548570
endNanos += delta
549571
}
550572

573+
var endKeyName string
574+
if includeChildMetrics {
575+
// Create a span that covers the metric's children.
576+
endKeyName = seriesName + string(rune(0x7C)) // '|' is the next char after '{'
577+
} else {
578+
endKeyName = seriesName
579+
}
580+
551581
span := &roachpb.Span{
552582
Key: MakeDataKey(
553583
seriesName, "" /* source */, diskResolution, startNanos,
554584
),
555585
EndKey: MakeDataKey(
556-
seriesName, "" /* source */, diskResolution, endNanos,
586+
endKeyName, "" /* source */, diskResolution, endNanos,
557587
),
558588
}
559589

pkg/ts/server_test.go

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"io"
1212
"reflect"
1313
"sort"
14+
"strings"
1415
"testing"
1516
"unsafe"
1617

@@ -877,6 +878,137 @@ func TestServerDump(t *testing.T) {
877878
}
878879
}
879880

881+
func TestServerDumpChildMetrics(t *testing.T) {
882+
defer leaktest.AfterTest(t)()
883+
defer log.Scope(t).Close(t)
884+
885+
ctx := context.Background()
886+
887+
s := serverutils.StartServerOnly(t, base.TestServerArgs{
888+
// For now, direct access to the tsdb is reserved to the storage layer.
889+
DefaultTestTenant: base.TestIsSpecificToStorageLayerAndNeedsASystemTenant,
890+
891+
Knobs: base.TestingKnobs{
892+
Store: &kvserver.StoreTestingKnobs{
893+
DisableTimeSeriesMaintenanceQueue: true,
894+
},
895+
},
896+
})
897+
defer s.Stopper().Stop(ctx)
898+
899+
tsdb := s.TsDB().(*ts.DB)
900+
901+
// Store parent metric (without labels)
902+
parentMetric := "cr.node.changefeed.emitted_messages"
903+
if err := tsdb.StoreData(ctx, ts.Resolution10s, []tspb.TimeSeriesData{
904+
{
905+
Name: parentMetric,
906+
Source: "1",
907+
Datapoints: []tspb.TimeSeriesDatapoint{
908+
{TimestampNanos: 100 * 1e9, Value: 1000.0},
909+
{TimestampNanos: 200 * 1e9, Value: 2000.0},
910+
},
911+
},
912+
}); err != nil {
913+
t.Fatal(err)
914+
}
915+
916+
// Store child metrics (with labels encoded in name)
917+
childMetric1 := fmt.Sprintf(`%s{feed_id="123",scope="default"}`, parentMetric)
918+
childMetric2 := fmt.Sprintf(`%s{feed_id="456",scope="system"}`, parentMetric)
919+
if err := tsdb.StoreData(ctx, ts.Resolution1m, []tspb.TimeSeriesData{
920+
{
921+
Name: childMetric1,
922+
Source: "1",
923+
Datapoints: []tspb.TimeSeriesDatapoint{
924+
{TimestampNanos: 100 * 1e9, Value: 500.0},
925+
{TimestampNanos: 200 * 1e9, Value: 1500.0},
926+
},
927+
},
928+
{
929+
Name: childMetric2,
930+
Source: "1",
931+
Datapoints: []tspb.TimeSeriesDatapoint{
932+
{TimestampNanos: 100 * 1e9, Value: 300.0},
933+
{TimestampNanos: 200 * 1e9, Value: 800.0},
934+
},
935+
},
936+
}); err != nil {
937+
t.Fatal(err)
938+
}
939+
940+
conn := s.RPCClientConn(t, username.RootUserName())
941+
client := conn.NewTimeSeriesClient()
942+
943+
t.Run("includes child metrics", func(t *testing.T) {
944+
dumpClient, err := client.Dump(ctx, &tspb.DumpRequest{
945+
Names: []string{parentMetric},
946+
StartNanos: 100 * 1e9,
947+
EndNanos: 300 * 1e9,
948+
})
949+
require.NoError(t, err)
950+
951+
resultMap := make(map[string][]tspb.TimeSeriesDatapoint)
952+
for {
953+
msg, err := dumpClient.Recv()
954+
if err == io.EOF {
955+
break
956+
}
957+
require.NoError(t, err)
958+
resultMap[msg.Name] = append(resultMap[msg.Name], msg.Datapoints...)
959+
}
960+
961+
// Should have parent metric AND both child metrics
962+
require.Contains(t, resultMap, parentMetric, "parent metric should be included")
963+
require.Contains(t, resultMap, childMetric1, "child metric 1 should be included")
964+
require.Contains(t, resultMap, childMetric2, "child metric 2 should be included")
965+
require.Equal(t, 3, len(resultMap), "should have parent and both child metrics")
966+
967+
// Verify data correctness for parent metric
968+
require.Len(t, resultMap[parentMetric], 2, "parent metric should have 2 datapoints")
969+
require.Equal(t, 1000.0, resultMap[parentMetric][0].Value)
970+
require.Equal(t, 2000.0, resultMap[parentMetric][1].Value)
971+
972+
// Verify data correctness for child metrics
973+
require.Len(t, resultMap[childMetric1], 2, "child metric 1 should have 2 datapoints")
974+
require.Equal(t, 500.0, resultMap[childMetric1][0].Value)
975+
require.Equal(t, 1500.0, resultMap[childMetric1][1].Value)
976+
977+
require.Len(t, resultMap[childMetric2], 2, "child metric 2 should have 2 datapoints")
978+
require.Equal(t, 300.0, resultMap[childMetric2][0].Value)
979+
require.Equal(t, 800.0, resultMap[childMetric2][1].Value)
980+
})
981+
982+
t.Run("DumpRaw sees child metrics", func(t *testing.T) {
983+
dumpRawClient, err := client.DumpRaw(ctx, &tspb.DumpRequest{
984+
Names: []string{parentMetric},
985+
StartNanos: 100 * 1e9,
986+
EndNanos: 300 * 1e9,
987+
})
988+
require.NoError(t, err)
989+
990+
var kvCount int
991+
seenMetrics := make(map[string]bool)
992+
for {
993+
kv, err := dumpRawClient.Recv()
994+
if err == io.EOF {
995+
break
996+
}
997+
require.NoError(t, err)
998+
kvCount++
999+
// Decode the key to verify it contains child metrics
1000+
// The key contains the encoded metric name
1001+
keyStr := string(kv.Key)
1002+
if strings.Contains(keyStr, "{") {
1003+
seenMetrics["child"] = true
1004+
}
1005+
}
1006+
1007+
require.Greater(t, kvCount, 0, "should have raw KVs")
1008+
require.True(t, seenMetrics["child"], "should have seen child metrics in raw dump")
1009+
})
1010+
}
1011+
8801012
func BenchmarkServerQuery(b *testing.B) {
8811013
defer log.Scope(b).Close(b)
8821014

pkg/ts/tsutil/util.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,38 @@ import (
1414
"github.com/cockroachdb/cockroach/pkg/ts/tspb"
1515
)
1616

17+
// AllowedChildMetrics is the list of metrics that should have child metrics
18+
// collected and recorded to TSDB. This is a curated list to prevent unbounded
19+
// cardinality while still capturing the most important per-changefeed metrics.
20+
var AllowedChildMetrics = map[string]struct{}{
21+
"changefeed.max_behind_nanos": {},
22+
"changefeed.error_retries": {},
23+
"changefeed.internal_retry_message_count": {},
24+
"changefeed.stage.downstream_client_send.latency": {},
25+
"changefeed.emitted_messages": {},
26+
"changefeed.sink_backpressure_nanos": {},
27+
"changefeed.backfill_pending_ranges": {},
28+
"changefeed.sink_io_inflight": {},
29+
"changefeed.lagging_ranges": {},
30+
"changefeed.aggregator_progress": {},
31+
"changefeed.checkpoint_progress": {},
32+
"changefeed.emitted_batch_sizes": {},
33+
"changefeed.total_ranges": {},
34+
}
35+
36+
// IsAllowedChildMetric checks if a metric name matches one of the allowed child metrics.
37+
func IsAllowedChildMetric(name string) bool {
38+
metricName := name
39+
for _, prefix := range []string{"cr.node.", "cr.store."} {
40+
if strings.HasPrefix(metricName, prefix) {
41+
metricName = strings.TrimPrefix(metricName, prefix)
42+
break
43+
}
44+
}
45+
_, ok := AllowedChildMetrics[metricName]
46+
return ok
47+
}
48+
1749
// DumpRawTo is a helper that gob-encodes all messages received from the
1850
// source stream to the given WriteCloser.
1951
func DumpRawTo(src tspb.RPCTimeSeries_DumpRawClient, out io.Writer) error {

0 commit comments

Comments
 (0)