Skip to content

Commit 433195f

Browse files
craig[bot] and sumeerbhola committed
Merge #159099
159099: mma: plumbing changes for existing MMA metrics r=wenyihu6 a=sumeerbhola

- Metrics are now per local store. This is similar to SMA metrics. For multi-store nodes, it is preferable to have counters reflecting what each local store was doing with respect to changing leases and replicas.
- The metrics were not hooked up to a MetricRegistry -- this is fixed.
- MMAMetrics is moved to a different file and renamed counterMetrics. These are directly modified by the MMA code as it takes actions.

Many of the existing counter metrics are unused, and there will be cleanup in subsequent PRs to remove/add metrics and hook them up.

Epic: CRDB-55052
Release note: None

Co-authored-by: sumeerbhola <[email protected]>
2 parents 31614dd + 0f317cd commit 433195f

File tree

15 files changed

+219
-160
lines changed

15 files changed

+219
-160
lines changed

pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ import (
5555
func MakeAllocatorSync(
5656
sp *storepool.StorePool, st *cluster.Settings,
5757
) *mmaintegration.AllocatorSync {
58-
mmAllocator := mmaprototype.NewAllocatorState(timeutil.DefaultTimeSource{},
58+
mmAllocator := mmaprototype.NewAllocatorState(timeutil.DefaultTimeSource{}, nil,
5959
rand.New(rand.NewSource(timeutil.Now().UnixNano())))
6060
return mmaintegration.NewAllocatorSync(sp, mmAllocator, st, nil)
6161
}

pkg/kv/kvserver/allocator/allocatorimpl/test_helpers.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ func CreateTestAllocatorWithKnobs(
4747
liveness.TestTimeUntilNodeDeadOff, deterministic,
4848
func() int { return numNodes },
4949
livenesspb.NodeLivenessStatus_LIVE)
50-
mmAllocator := mmaprototype.NewAllocatorState(timeutil.DefaultTimeSource{}, rand.New(rand.NewSource(timeutil.Now().UnixNano())))
50+
mmAllocator := mmaprototype.NewAllocatorState(
51+
timeutil.DefaultTimeSource{}, nil, rand.New(rand.NewSource(timeutil.Now().UnixNano())))
5152
as := mmaintegration.NewAllocatorSync(storePool, mmAllocator, st, allocSyncKnobs)
5253
a := MakeAllocator(st, as, deterministic, func(id roachpb.NodeID) (time.Duration, bool) {
5354
return 0, true

pkg/kv/kvserver/allocator/mmaprototype/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ go_library(
1313
"load.go",
1414
"memo_helper.go",
1515
"messages.go",
16+
"mma_metrics.go",
1617
"range_change.go",
1718
"rebalance_advisor.go",
1819
"store_load_summary.go",

pkg/kv/kvserver/allocator/mmaprototype/allocator.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ type ChangeOptions struct {
3030
// be less different than integration with the old allocator.
3131
type Allocator interface {
3232
LoadSummaryForAllStores(context.Context) string
33-
Metrics() *MMAMetrics
33+
3434
// Methods to update the state of the external world. The allocator starts
3535
// with no knowledge.
3636

@@ -59,13 +59,14 @@ type Allocator interface {
5959
AdjustPendingChangeDisposition(change ExternalRangeChange, success bool)
6060

6161
// RegisterExternalChange informs this allocator about yet to complete
62-
// changes to the cluster which were not initiated by this allocator. The
63-
// ownership of all state inside change is handed off to the callee. If ok
64-
// is true, the change was registered, and the caller is returned an
65-
// ExternalRangeChange that it should subsequently use in a call to
66-
// AdjustPendingChangeDisposition when the changes are completed, either
67-
// successfully or not. If ok is false, the change was not registered.
68-
RegisterExternalChange(change PendingRangeChange) (_ ExternalRangeChange, ok bool)
62+
// changes to the cluster (on behalf of localStoreID) which were not
63+
// initiated by this allocator. The ownership of all state inside change is
64+
// handed off to the callee. If ok is true, the change was registered, and
65+
// the caller is returned an ExternalRangeChange that it should subsequently
66+
// use in a call to AdjustPendingChangeDisposition when the changes are
67+
// completed, either successfully or not. If ok is false, the change was not
68+
// registered.
69+
RegisterExternalChange(localStoreID roachpb.StoreID, change PendingRangeChange) (_ ExternalRangeChange, ok bool)
6970

7071
// ComputeChanges is called periodically and frequently, say every 10s.
7172
//

pkg/kv/kvserver/allocator/mmaprototype/allocator_state.go

Lines changed: 46 additions & 132 deletions
Original file line numberDiff line numberDiff line change
@@ -26,128 +26,6 @@ import (
2626
"github.com/cockroachdb/redact/interfaces"
2727
)
2828

29-
type MMAMetrics struct {
30-
DroppedDueToStateInconsistency *metric.Counter
31-
ExternalFailedToRegister *metric.Counter
32-
ExternaRegisterSuccess *metric.Counter
33-
ExternalReplicaRebalanceSuccess *metric.Counter
34-
ExternalReplicaRebalanceFailure *metric.Counter
35-
ExternalLeaseTransferSuccess *metric.Counter
36-
ExternalLeaseTransferFailure *metric.Counter
37-
MMAReplicaRebalanceSuccess *metric.Counter
38-
MMAReplicaRebalanceFailure *metric.Counter
39-
MMALeaseTransferSuccess *metric.Counter
40-
MMALeaseTransferFailure *metric.Counter
41-
MMARegisterLeaseSuccess *metric.Counter
42-
MMARegisterRebalanceSuccess *metric.Counter
43-
}
44-
45-
func makeMMAMetrics() *MMAMetrics {
46-
return &MMAMetrics{
47-
DroppedDueToStateInconsistency: metric.NewCounter(metaDroppedDueToStateInconsistency),
48-
ExternalFailedToRegister: metric.NewCounter(metaExternalFailedToRegister),
49-
ExternaRegisterSuccess: metric.NewCounter(metaExternaRegisterSuccess),
50-
MMARegisterLeaseSuccess: metric.NewCounter(metaMMARegisterLeaseSuccess),
51-
MMARegisterRebalanceSuccess: metric.NewCounter(metaMMARegisterRebalanceSuccess),
52-
ExternalReplicaRebalanceSuccess: metric.NewCounter(metaExternalReplicaRebalanceSuccess),
53-
ExternalReplicaRebalanceFailure: metric.NewCounter(metaExternalReplicaRebalanceFailure),
54-
ExternalLeaseTransferSuccess: metric.NewCounter(metaExternalLeaseTransferSuccess),
55-
ExternalLeaseTransferFailure: metric.NewCounter(metaExternalLeaseTransferFailure),
56-
MMAReplicaRebalanceSuccess: metric.NewCounter(metaMMAReplicaRebalanceSuccess),
57-
MMAReplicaRebalanceFailure: metric.NewCounter(metaMMAReplicaRebalanceFailure),
58-
MMALeaseTransferSuccess: metric.NewCounter(metaMMALeaseTransferSuccess),
59-
MMALeaseTransferFailure: metric.NewCounter(metaMMALeaseTransferFailure),
60-
}
61-
}
62-
63-
var (
64-
metaDroppedDueToStateInconsistency = metric.Metadata{
65-
Name: "mma.dropped",
66-
Help: "Number of operations dropped due to MMA state inconsistency",
67-
Measurement: "Range Rebalances",
68-
Unit: metric.Unit_COUNT,
69-
}
70-
metaExternalFailedToRegister = metric.Metadata{
71-
Name: "mma.external.dropped",
72-
Help: "Number of external operations that failed to register with MMA",
73-
Measurement: "Range Rebalances",
74-
Unit: metric.Unit_COUNT,
75-
}
76-
metaExternaRegisterSuccess = metric.Metadata{
77-
Name: "mma.external.success",
78-
Help: "Number of external operations successfully registered with MMA",
79-
Measurement: "Range Rebalances",
80-
Unit: metric.Unit_COUNT,
81-
}
82-
metaMMARegisterLeaseSuccess = metric.Metadata{
83-
Name: "mma.lease.register.success",
84-
Help: "Number of lease transfers successfully registered with MMA",
85-
Measurement: "Range Rebalances",
86-
Unit: metric.Unit_COUNT,
87-
}
88-
metaMMARegisterRebalanceSuccess = metric.Metadata{
89-
Name: "mma.rebalance.register.success",
90-
Help: "Number of rebalance operations successfully registered with MMA",
91-
Measurement: "Range Rebalances",
92-
Unit: metric.Unit_COUNT,
93-
}
94-
metaExternalReplicaRebalanceSuccess = metric.Metadata{
95-
Name: "mma.rebalances.external.success",
96-
Help: "Number of successful external replica rebalance operations",
97-
Measurement: "Range Rebalances",
98-
Unit: metric.Unit_COUNT,
99-
}
100-
101-
metaExternalLeaseTransferSuccess = metric.Metadata{
102-
Name: "mma.lease.external.success",
103-
Help: "Number of successful external lease transfer operations",
104-
Measurement: "Lease Transfers",
105-
Unit: metric.Unit_COUNT,
106-
}
107-
108-
metaExternalReplicaRebalanceFailure = metric.Metadata{
109-
Name: "mma.rebalances.external.failure",
110-
Help: "Number of failed external replica rebalance operations",
111-
Measurement: "Range Rebalances",
112-
Unit: metric.Unit_COUNT,
113-
}
114-
115-
metaExternalLeaseTransferFailure = metric.Metadata{
116-
Name: "mma.lease.external.failure",
117-
Help: "Number of failed external lease transfer operations",
118-
Measurement: "Lease Transfers",
119-
Unit: metric.Unit_COUNT,
120-
}
121-
122-
metaMMAReplicaRebalanceSuccess = metric.Metadata{
123-
Name: "mma.rebalance.success",
124-
Help: "Number of successful MMA-initiated replica rebalance operations",
125-
Measurement: "Range Rebalances",
126-
Unit: metric.Unit_COUNT,
127-
}
128-
129-
metaMMAReplicaRebalanceFailure = metric.Metadata{
130-
Name: "mma.rebalance.failure",
131-
Help: "Number of failed MMA-initiated replica rebalance operations",
132-
Measurement: "Range Rebalances",
133-
Unit: metric.Unit_COUNT,
134-
}
135-
136-
metaMMALeaseTransferSuccess = metric.Metadata{
137-
Name: "mma.lease.success",
138-
Help: "Number of successful MMA-initiated lease transfer operations",
139-
Measurement: "Lease Transfers",
140-
Unit: metric.Unit_COUNT,
141-
}
142-
143-
metaMMALeaseTransferFailure = metric.Metadata{
144-
Name: "mma.lease.failure",
145-
Help: "Number of failed MMA-initiated lease transfer operations",
146-
Measurement: "Lease Transfers",
147-
Unit: metric.Unit_COUNT,
148-
}
149-
)
150-
15129
type allocatorState struct {
15230
// Locking.
15331
//
@@ -195,9 +73,14 @@ type allocatorState struct {
19573
// try-write-lock that could quickly return with failure then we could avoid
19674
// this. We could of course build our own queueing mechanism instead of
19775
// relying on the queueing in mutex.
198-
mmaMetrics *MMAMetrics
76+
77+
// mrProvider can be nil in tests.
78+
mrProvider MetricRegistryForStoreProvider
19979
mu syncutil.Mutex
200-
cs *clusterState
80+
// TODO(sumeer): localStoreMetrics is also protected by mu. Nest in struct
81+
// with mu, when locking story is cleaned up.
82+
localStoreMetrics map[roachpb.StoreID]*counterMetrics
83+
cs *clusterState
20184

20285
// Ranges that are under-replicated, over-replicated, don't satisfy
20386
// constraints, have low diversity etc. Avoids iterating through all ranges.
@@ -212,15 +95,29 @@ type allocatorState struct {
21295

21396
var _ Allocator = &allocatorState{}
21497

215-
func NewAllocatorState(ts timeutil.TimeSource, rand *rand.Rand) *allocatorState {
98+
type MetricRegistryForStoreProvider interface {
99+
// GetStoreMetricRegistry returns the registry for the store, if it is
100+
// known, else nil.
101+
GetStoreMetricRegistry(storeID roachpb.StoreID) *metric.Registry
102+
}
103+
104+
// NewAllocatorState constructs a new implementation of Allocator.
105+
//
106+
// The metricRegistryProvider allows the allocator to lazily initialize
107+
// per-local-store metrics once the StoreID is known. It can be nil in tests,
108+
// in which case no metrics will be collected.
109+
func NewAllocatorState(
110+
ts timeutil.TimeSource, metricRegistryProvider MetricRegistryForStoreProvider, rand *rand.Rand,
111+
) *allocatorState {
216112
interner := newStringInterner()
217113
cs := newClusterState(ts, interner)
218114
return &allocatorState{
115+
mrProvider: metricRegistryProvider,
116+
localStoreMetrics: map[roachpb.StoreID]*counterMetrics{},
219117
cs: cs,
220118
rangesNeedingAttention: map[roachpb.RangeID]struct{}{},
221119
diversityScoringMemo: newDiversityScoringMemo(),
222120
rand: rand,
223-
mmaMetrics: makeMMAMetrics(),
224121
}
225122
}
226123

@@ -231,8 +128,23 @@ func NewAllocatorState(ts timeutil.TimeSource, rand *rand.Rand) *allocatorState
231128
const remoteStoreLeaseSheddingGraceDuration = 2 * time.Minute
232129
const overloadGracePeriod = time.Minute
233130

234-
func (a *allocatorState) Metrics() *MMAMetrics {
235-
return a.mmaMetrics
131+
func (a *allocatorState) ensureMetricsForLocalStoreLocked(
132+
localStoreID roachpb.StoreID,
133+
) *counterMetrics {
134+
m, ok := a.localStoreMetrics[localStoreID]
135+
if ok {
136+
return m
137+
}
138+
m = makeCounterMetrics()
139+
if a.mrProvider != nil {
140+
mr := a.mrProvider.GetStoreMetricRegistry(localStoreID)
141+
if mr == nil {
142+
panic(errors.AssertionFailedf("no MetricRegistry for store s%v", localStoreID))
143+
}
144+
mr.AddMetricStruct(*m)
145+
}
146+
a.localStoreMetrics[localStoreID] = m
147+
return m
236148
}
237149

238150
func (a *allocatorState) LoadSummaryForAllStores(ctx context.Context) string {
@@ -300,17 +212,18 @@ func (a *allocatorState) AdjustPendingChangeDisposition(change ExternalRangeChan
300212

301213
// RegisterExternalChange implements the Allocator interface.
302214
func (a *allocatorState) RegisterExternalChange(
303-
change PendingRangeChange,
215+
localStoreID roachpb.StoreID, change PendingRangeChange,
304216
) (_ ExternalRangeChange, ok bool) {
305217
a.mu.Lock()
306218
defer a.mu.Unlock()
219+
counterMetrics := a.ensureMetricsForLocalStoreLocked(localStoreID)
307220
if err := a.cs.preCheckOnApplyReplicaChanges(change); err != nil {
308-
a.mmaMetrics.ExternalFailedToRegister.Inc(1)
221+
counterMetrics.ExternalFailedToRegister.Inc(1)
309222
log.KvDistribution.Infof(context.Background(),
310223
"did not register external changes: due to %v", err)
311224
return ExternalRangeChange{}, false
312225
} else {
313-
a.mmaMetrics.ExternaRegisterSuccess.Inc(1)
226+
counterMetrics.ExternaRegisterSuccess.Inc(1)
314227
}
315228
a.cs.addPendingRangeChange(change)
316229
return MakeExternalRangeChange(change), true
@@ -325,7 +238,8 @@ func (a *allocatorState) ComputeChanges(
325238
if msg.StoreID != opts.LocalStoreID {
326239
panic(fmt.Sprintf("ComputeChanges: expected StoreID %d, got %d", opts.LocalStoreID, msg.StoreID))
327240
}
328-
a.cs.processStoreLeaseholderMsg(ctx, msg, a.mmaMetrics)
241+
counterMetrics := a.ensureMetricsForLocalStoreLocked(opts.LocalStoreID)
242+
a.cs.processStoreLeaseholderMsg(ctx, msg, counterMetrics)
329243
re := newRebalanceEnv(a.cs, a.rand, a.diversityScoringMemo, a.cs.ts.Now())
330244
return re.rebalanceStores(ctx, opts.LocalStoreID)
331245
}

pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1348,13 +1348,13 @@ func (cs *clusterState) processStoreLoadMsg(ctx context.Context, storeMsg *Store
13481348
}
13491349

13501350
func (cs *clusterState) processStoreLeaseholderMsg(
1351-
ctx context.Context, msg *StoreLeaseholderMsg, metrics *MMAMetrics,
1351+
ctx context.Context, msg *StoreLeaseholderMsg, metrics *counterMetrics,
13521352
) {
13531353
cs.processStoreLeaseholderMsgInternal(ctx, msg, numTopKReplicas, metrics)
13541354
}
13551355

13561356
func (cs *clusterState) processStoreLeaseholderMsgInternal(
1357-
ctx context.Context, msg *StoreLeaseholderMsg, numTopKReplicas int, metrics *MMAMetrics,
1357+
ctx context.Context, msg *StoreLeaseholderMsg, numTopKReplicas int, metrics *counterMetrics,
13581358
) {
13591359
now := cs.ts.Now()
13601360
cs.gcPendingChanges(now)

0 commit comments

Comments
 (0)