Skip to content

Commit 92cd5cb

Browse files
craig[bot]joshimhoffarulajmani
committed
157029: admission: compute cpu time token refill rates r=sumeerbhola a=joshimhoff **admission: compute cpu time token refill rates** This commit introduces a linear model that computes cpu time token refill rates. cpuTimeTokenFiller uses this model to determine how many tokens to add per second to the buckets in cpuTimeTokenGranter. Fixes: #154471 Release note: None. 158537: storeliveness: remember when support was withdrawn from a store r=iskettaneh a=arulajmani See individual commits. Co-authored-by: Josh Imhoff <[email protected]> Co-authored-by: Arul Ajmani <[email protected]>
3 parents 81777ee + 1acb16b + 99aa4df commit 92cd5cb

22 files changed

+1309
-189
lines changed

pkg/inspectz/inspectz.go

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,11 @@ func NewServer(
5959
mux.Handle("/inspectz/v2/kvflowcontroller", server.makeKVFlowControllerHandler(server.KVFlowControllerV2))
6060
mux.Handle(
6161
"/inspectz/storeliveness/supportFrom",
62-
server.makeStoreLivenessHandler(server.StoreLivenessSupportFrom),
62+
server.makeStoreLivenessSupportFromHandler(server.StoreLivenessSupportFrom),
6363
)
6464
mux.Handle(
6565
"/inspectz/storeliveness/supportFor",
66-
server.makeStoreLivenessHandler(server.StoreLivenessSupportFor),
66+
server.makeStoreLivenessSupportForHandler(server.StoreLivenessSupportFor),
6767
)
6868

6969
return server
@@ -112,9 +112,27 @@ func (s *Server) makeKVFlowControllerHandler(
112112
}
113113
}
114114

115-
func (s *Server) makeStoreLivenessHandler(
115+
func (s *Server) makeStoreLivenessSupportFromHandler(
116116
impl func(ctx context.Context, request *slpb.InspectStoreLivenessRequest) (
117-
*slpb.InspectStoreLivenessResponse, error,
117+
*slpb.InspectSupportFromResponse, error,
118+
),
119+
) http.HandlerFunc {
120+
return func(w http.ResponseWriter, r *http.Request) {
121+
ctx := s.AnnotateCtx(context.Background())
122+
req := &slpb.InspectStoreLivenessRequest{}
123+
resp, err := impl(ctx, req)
124+
if err != nil {
125+
log.Dev.ErrorfDepth(ctx, 1, "%s", err)
126+
http.Error(w, "internal error: check logs for details", http.StatusInternalServerError)
127+
return
128+
}
129+
respond(ctx, w, http.StatusOK, resp)
130+
}
131+
}
132+
133+
func (s *Server) makeStoreLivenessSupportForHandler(
134+
impl func(ctx context.Context, request *slpb.InspectStoreLivenessRequest) (
135+
*slpb.InspectSupportForResponse, error,
118136
),
119137
) http.HandlerFunc {
120138
return func(w http.ResponseWriter, r *http.Request) {
@@ -147,20 +165,20 @@ func (s *Server) KVFlowHandlesV2(
147165
// StoreLivenessSupportFrom implements the InspectzServer interface.
148166
func (s *Server) StoreLivenessSupportFrom(
149167
_ context.Context, _ *slpb.InspectStoreLivenessRequest,
150-
) (*slpb.InspectStoreLivenessResponse, error) {
151-
resp := &slpb.InspectStoreLivenessResponse{}
168+
) (*slpb.InspectSupportFromResponse, error) {
169+
resp := &slpb.InspectSupportFromResponse{}
152170
support, err := s.storeLiveness.InspectAllSupportFrom()
153-
resp.SupportStatesPerStore = support
171+
resp.SupportFromStatesPerStore = support
154172
return resp, err
155173
}
156174

157175
// StoreLivenessSupportFor implements the InspectzServer interface.
158176
func (s *Server) StoreLivenessSupportFor(
159177
_ context.Context, _ *slpb.InspectStoreLivenessRequest,
160-
) (*slpb.InspectStoreLivenessResponse, error) {
161-
resp := &slpb.InspectStoreLivenessResponse{}
178+
) (*slpb.InspectSupportForResponse, error) {
179+
resp := &slpb.InspectSupportForResponse{}
162180
support, err := s.storeLiveness.InspectAllSupportFor()
163-
resp.SupportStatesPerStore = support
181+
resp.SupportForStatesPerStore = support
164182
return resp, err
165183
}
166184

pkg/inspectz/inspectzpb/inspectz.proto

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,13 @@ service Inspectz {
3232
// storeliveness.SupportManagers' views of support provided from other stores.
3333
// It's housed under /inspectz/storeliveness/supportFrom.
3434
rpc StoreLivenessSupportFrom(kv.kvserver.storeliveness.storelivenesspb.InspectStoreLivenessRequest)
35-
returns (kv.kvserver.storeliveness.storelivenesspb.InspectStoreLivenessResponse) {}
35+
returns (kv.kvserver.storeliveness.storelivenesspb.InspectSupportFromResponse) {}
3636

3737
// StoreLivenessSupportFor exposes the in-memory state of all stores'
3838
// storeliveness.SupportManagers' views of support provided for other stores.
3939
// It's housed under /inspectz/storeliveness/supportFor.
4040
rpc StoreLivenessSupportFor(kv.kvserver.storeliveness.storelivenesspb.InspectStoreLivenessRequest)
41-
returns (kv.kvserver.storeliveness.storelivenesspb.InspectStoreLivenessResponse) {}
41+
returns (kv.kvserver.storeliveness.storelivenesspb.InspectSupportForResponse) {}
4242

4343
}
4444

pkg/inspectz/unsupported.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,13 @@ func (u Unsupported) KVFlowHandlesV2(
5151
// StoreLivenessSupportFrom is part of the inspectzpb.InspectzServer interface.
5252
func (u Unsupported) StoreLivenessSupportFrom(
5353
_ context.Context, _ *slpb.InspectStoreLivenessRequest,
54-
) (*slpb.InspectStoreLivenessResponse, error) {
54+
) (*slpb.InspectSupportFromResponse, error) {
5555
return nil, errorutil.UnsupportedUnderClusterVirtualization(errorutil.FeatureNotAvailableToNonSystemTenantsIssue)
5656
}
5757

5858
// StoreLivenessSupportFor is part of the inspectzpb.InspectzServer interface.
5959
func (u Unsupported) StoreLivenessSupportFor(
6060
_ context.Context, _ *slpb.InspectStoreLivenessRequest,
61-
) (*slpb.InspectStoreLivenessResponse, error) {
61+
) (*slpb.InspectSupportForResponse, error) {
6262
return nil, errorutil.UnsupportedUnderClusterVirtualization(errorutil.FeatureNotAvailableToNonSystemTenantsIssue)
6363
}

pkg/kv/kvserver/storeliveness/fabric.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,6 @@ type Fabric interface {
7373
// InspectFabric is an interface that exposes all in-memory support state for a
7474
// given store. It is used to power the Store Liveness /inspectz functionality.
7575
type InspectFabric interface {
76-
InspectSupportFrom() slpb.SupportStatesPerStore
77-
InspectSupportFor() slpb.SupportStatesPerStore
76+
InspectSupportFrom() slpb.InspectSupportFromStatesPerStore
77+
InspectSupportFor() slpb.InspectSupportForStatesPerStore
7878
}

pkg/kv/kvserver/storeliveness/store_liveness_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,11 @@ func TestStoreLiveness(t *testing.T) {
120120
case "debug-supporter-state":
121121
var sortedSupportMap []string
122122
for _, support := range sm.supporterStateHandler.supporterState.supportFor {
123-
sortedSupportMap = append(sortedSupportMap, fmt.Sprintf("%+v", support))
123+
sortedSupportMap = append(sortedSupportMap, fmt.Sprintf(
124+
"%+v lastSupportWithdrawnTime:%s",
125+
support.SupportState,
126+
support.lastSupportWithdrawnTime,
127+
))
124128
}
125129
slices.Sort(sortedSupportMap)
126130
return fmt.Sprintf(

pkg/kv/kvserver/storeliveness/storelivenesspb/service.proto

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -155,19 +155,42 @@ message SupportState {
155155
util.hlc.Timestamp expiration = 3 [(gogoproto.nullable) = false];
156156
}
157157

158+
// InspectSupportForState is used for observability via inspectz. It wraps
159+
// SupportState and includes additional non-persisted metadata like
160+
// LastSupportWithdrawnTime.
161+
message InspectSupportForState {
162+
SupportState support_state = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
163+
// LastSupportWithdrawnTime is the timestamp at which support was last
164+
// withdrawn from the remote store. This field is not persisted to disk.
165+
util.hlc.Timestamp last_support_withdrawn_time = 2 [(gogoproto.nullable) = false];
166+
}
167+
158168
// InspectStoreLivenessRequest is used to power the Store Liveness /inspectz
159169
// functionality. The request doesn't take any parameters.
160170
message InspectStoreLivenessRequest {}
161171

162-
// InspectStoreLivenessRequest is used to power the Store Liveness /inspectz
163-
// functionality. The response is a list of SupportStatesPerStore.
164-
message InspectStoreLivenessResponse {
165-
repeated SupportStatesPerStore support_states_per_store = 1 [(gogoproto.nullable) = false];
172+
// InspectSupportFromResponse is used to power the Store Liveness /inspectz
173+
// supportFrom functionality.
174+
message InspectSupportFromResponse {
175+
repeated InspectSupportFromStatesPerStore support_from_states_per_store = 1 [(gogoproto.nullable) = false];
176+
}
177+
178+
// InspectSupportForResponse is used to power the Store Liveness /inspectz
179+
// supportFor functionality.
180+
message InspectSupportForResponse {
181+
repeated InspectSupportForStatesPerStore support_for_states_per_store = 1 [(gogoproto.nullable) = false];
182+
}
183+
184+
// InspectSupportFromStatesPerStore includes all SupportStates for a given
185+
// store; they correspond to the support-from map of a given store.
186+
message InspectSupportFromStatesPerStore {
187+
StoreIdent store_id = 1 [(gogoproto.nullable) = false, (gogoproto.customname) = "StoreID"];
188+
repeated SupportState support_from_states = 2 [(gogoproto.nullable) = false];
166189
}
167190

168-
// SupportStatesPerStore includes all SupportStates for a given store; they
169-
// correspond to either the support-from or support-for map of a given store.
170-
message SupportStatesPerStore {
191+
// InspectSupportForStatesPerStore includes all InspectSupportForStates for a
192+
// given store; they correspond to the support-for map of a given store.
193+
message InspectSupportForStatesPerStore {
171194
StoreIdent store_id = 1 [(gogoproto.nullable) = false, (gogoproto.customname) = "StoreID"];
172-
repeated SupportState support_states = 2 [(gogoproto.nullable) = false];
195+
repeated InspectSupportForState support_for_states = 2 [(gogoproto.nullable) = false];
173196
}

pkg/kv/kvserver/storeliveness/support_manager.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -121,15 +121,15 @@ func (sm *SupportManager) SupportFor(id slpb.StoreIdent) (slpb.Epoch, bool) {
121121
}
122122

123123
// InspectSupportFrom implements the InspectFabric interface.
124-
func (sm *SupportManager) InspectSupportFrom() slpb.SupportStatesPerStore {
125-
supportStates := sm.requesterStateHandler.exportAllSupportFrom()
126-
return slpb.SupportStatesPerStore{StoreID: sm.storeID, SupportStates: supportStates}
124+
func (sm *SupportManager) InspectSupportFrom() slpb.InspectSupportFromStatesPerStore {
125+
supportFromStates := sm.requesterStateHandler.exportAllSupportFrom()
126+
return slpb.InspectSupportFromStatesPerStore{StoreID: sm.storeID, SupportFromStates: supportFromStates}
127127
}
128128

129129
// InspectSupportFor implements the InspectFabric interface.
130-
func (sm *SupportManager) InspectSupportFor() slpb.SupportStatesPerStore {
131-
supportStates := sm.supporterStateHandler.exportAllSupportFor()
132-
return slpb.SupportStatesPerStore{StoreID: sm.storeID, SupportStates: supportStates}
130+
func (sm *SupportManager) InspectSupportFor() slpb.InspectSupportForStatesPerStore {
131+
supportForStates := sm.supporterStateHandler.exportAllSupportFor()
132+
return slpb.InspectSupportForStatesPerStore{StoreID: sm.storeID, SupportForStates: supportForStates}
133133
}
134134

135135
// SupportFrom implements the Fabric interface. It delegates the response to the

pkg/kv/kvserver/storeliveness/supporter_state.go

Lines changed: 47 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,25 @@ import (
1717
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
1818
)
1919

20+
// supportState contains state pertaining to support provided to a single remote
21+
// store.
22+
type supportState struct {
23+
// SupportState contains all fields that must be persisted to disk.
24+
slpb.SupportState
25+
26+
// lastSupportWithdrawnTime is the timestamp at which support was last
27+
// withdrawn for the remote store.
28+
lastSupportWithdrawnTime hlc.Timestamp
29+
}
30+
2031
// supporterState stores the core data structures for providing support.
2132
type supporterState struct {
2233
// meta stores the SupporterMeta, including the max timestamp at which this
2334
// store has withdrawn support.
2435
meta slpb.SupporterMeta
25-
// supportFor stores the SupportState for each remote store for which this
36+
// supportFor stores the supportState for each remote store for which this
2637
// store has provided support.
27-
supportFor map[slpb.StoreIdent]slpb.SupportState
38+
supportFor map[slpb.StoreIdent]supportState
2839
}
2940

3041
// supporterStateHandler is the main interface for handling support for other
@@ -64,15 +75,15 @@ func newSupporterStateHandler() *supporterStateHandler {
6475
ssh := &supporterStateHandler{
6576
supporterState: supporterState{
6677
meta: slpb.SupporterMeta{},
67-
supportFor: make(map[slpb.StoreIdent]slpb.SupportState),
78+
supportFor: make(map[slpb.StoreIdent]supportState),
6879
},
6980
}
7081
ssh.update.Store(
7182
&supporterStateForUpdate{
7283
checkedIn: &ssh.supporterState,
7384
inProgress: supporterState{
7485
meta: slpb.SupporterMeta{},
75-
supportFor: make(map[slpb.StoreIdent]slpb.SupportState),
86+
supportFor: make(map[slpb.StoreIdent]supportState),
7687
},
7788
},
7889
)
@@ -101,7 +112,7 @@ type supporterStateForUpdate struct {
101112
func (ssh *supporterStateHandler) getSupportFor(id slpb.StoreIdent) slpb.SupportState {
102113
ssh.mu.RLock()
103114
defer ssh.mu.RUnlock()
104-
return ssh.supporterState.supportFor[id]
115+
return ssh.supporterState.supportFor[id].SupportState
105116
}
106117

107118
// getNumSupportFor returns the size of the supporterState.supportFor map.
@@ -111,16 +122,19 @@ func (ssh *supporterStateHandler) getNumSupportFor() int {
111122
return len(ssh.supporterState.supportFor)
112123
}
113124

114-
// exportAllSupportFor exports a copy of all SupportStates from the
125+
// exportAllSupportFor exports a copy of all InspectSupportForStates from the
115126
// supporterState.supportFor map.
116-
func (ssh *supporterStateHandler) exportAllSupportFor() []slpb.SupportState {
127+
func (ssh *supporterStateHandler) exportAllSupportFor() []slpb.InspectSupportForState {
117128
ssh.mu.RLock()
118129
defer ssh.mu.RUnlock()
119-
supportStates := make([]slpb.SupportState, len(ssh.supporterState.supportFor))
130+
supportForStates := make([]slpb.InspectSupportForState, 0, len(ssh.supporterState.supportFor))
120131
for _, ss := range ssh.supporterState.supportFor {
121-
supportStates = append(supportStates, ss)
132+
supportForStates = append(supportForStates, slpb.InspectSupportForState{
133+
SupportState: ss.SupportState,
134+
LastSupportWithdrawnTime: ss.lastSupportWithdrawnTime,
135+
})
122136
}
123-
return supportStates
137+
return supportForStates
124138
}
125139

126140
// Functions for handling supporterState updates.
@@ -146,13 +160,11 @@ func (ssfu *supporterStateForUpdate) getMeta() slpb.SupporterMeta {
146160
return ssfu.checkedIn.meta
147161
}
148162

149-
// getSupportFor returns the SupportState from the inProgress view; if not
150-
// present, it falls back to the SupportState from the checkedIn view.
163+
// getSupportFor returns the supportState from the inProgress view; if not
164+
// present, it falls back to the supportState from the checkedIn view.
151165
// The returned boolean indicates whether the store is present in the supportFor
152166
// map; it does NOT indicate whether support is provided.
153-
func (ssfu *supporterStateForUpdate) getSupportFor(
154-
storeID slpb.StoreIdent,
155-
) (slpb.SupportState, bool) {
167+
func (ssfu *supporterStateForUpdate) getSupportFor(storeID slpb.StoreIdent) (supportState, bool) {
156168
ss, ok := ssfu.inProgress.supportFor[storeID]
157169
if !ok {
158170
ss, ok = ssfu.checkedIn.supportFor[storeID]
@@ -180,7 +192,8 @@ func (ssfu *supporterStateForUpdate) write(ctx context.Context, b storage.Batch)
180192
}
181193
}
182194
for _, ss := range ssfu.inProgress.supportFor {
183-
if err := writeSupportForState(ctx, b, ss); err != nil {
195+
// Only write the persistent SupportState to disk.
196+
if err := writeSupportForState(ctx, b, ss.SupportState); err != nil {
184197
return err
185198
}
186199
}
@@ -201,9 +214,9 @@ func (ssh *supporterStateHandler) read(ctx context.Context, r storage.Reader) er
201214
ssh.mu.Lock()
202215
defer ssh.mu.Unlock()
203216
ssh.supporterState.meta = meta
204-
ssh.supporterState.supportFor = make(map[slpb.StoreIdent]slpb.SupportState, len(supportFor))
217+
ssh.supporterState.supportFor = make(map[slpb.StoreIdent]supportState, len(supportFor))
205218
for _, s := range supportFor {
206-
ssh.supporterState.supportFor[s.Target] = s
219+
ssh.supporterState.supportFor[s.Target] = supportState{SupportState: s}
207220
}
208221
return nil
209222
}
@@ -256,12 +269,15 @@ func (ssfu *supporterStateForUpdate) handleHeartbeat(
256269
from := msg.From
257270
ss, ok := ssfu.getSupportFor(from)
258271
if !ok {
259-
ss = slpb.SupportState{Target: from}
272+
ss = supportState{SupportState: slpb.SupportState{Target: from}}
260273
}
261274
ssNew := handleHeartbeat(ss, msg)
275+
assert(ss.lastSupportWithdrawnTime == ssNew.lastSupportWithdrawnTime,
276+
"lastSupportWithdrawnTime changed on successful heartbeat",
277+
)
262278
if ss != ssNew {
263279
ssfu.inProgress.supportFor[from] = ssNew
264-
logSupportForChange(ctx, ss, ssNew)
280+
logSupportForChange(ctx, ss.SupportState, ssNew.SupportState)
265281
}
266282
return slpb.Message{
267283
Type: slpb.MsgHeartbeatResp,
@@ -274,7 +290,7 @@ func (ssfu *supporterStateForUpdate) handleHeartbeat(
274290

275291
// handleHeartbeat contains the core logic for updating the epoch and expiration
276292
// of a support requester upon receiving a heartbeat.
277-
func handleHeartbeat(ss slpb.SupportState, msg *slpb.Message) slpb.SupportState {
293+
func handleHeartbeat(ss supportState, msg *slpb.Message) supportState {
278294
assert(!msg.Expiration.IsEmpty(), "requested support with zero expiration")
279295
if ss.Epoch == msg.Epoch {
280296
ss.Expiration.Forward(msg.Expiration)
@@ -315,10 +331,10 @@ func (ssfu *supporterStateForUpdate) withdrawSupport(
315331
)
316332
supportWithdrawnForStoreIDs = make(map[roachpb.StoreID]struct{})
317333
for id, ss := range ssfu.checkedIn.supportFor {
318-
ssNew := maybeWithdrawSupport(ss, now)
319-
if ss != ssNew {
334+
ssNew, withdrawn := maybeWithdrawSupport(ss, now)
335+
if withdrawn {
320336
ssfu.inProgress.supportFor[id] = ssNew
321-
log.KvExec.Infof(ctx, "withdrew support for %s", supportChangeStr(ss, ssNew))
337+
log.KvExec.Infof(ctx, "withdrew support for %s", supportChangeStr(ss.SupportState, ssNew.SupportState))
322338
meta := ssfu.getMeta()
323339
if meta.MaxWithdrawn.Forward(now) {
324340
ssfu.inProgress.meta = meta
@@ -330,11 +346,15 @@ func (ssfu *supporterStateForUpdate) withdrawSupport(
330346
}
331347

332348
// maybeWithdrawSupport contains the core logic for updating the epoch and
333-
// expiration of a support requester when withdrawing support.
334-
func maybeWithdrawSupport(ss slpb.SupportState, now hlc.ClockTimestamp) slpb.SupportState {
349+
// expiration of a support requester when withdrawing support. If support is
350+
// withdrawn, lastSupportWithdrawnTime is updated to the current timestamp and
351+
// a boolean indicating as such is returned.
352+
func maybeWithdrawSupport(ss supportState, now hlc.ClockTimestamp) (supportState, bool) {
335353
if !ss.Expiration.IsEmpty() && ss.Expiration.LessEq(now.ToTimestamp()) {
336354
ss.Epoch++
337355
ss.Expiration = hlc.Timestamp{}
356+
ss.lastSupportWithdrawnTime = now.ToTimestamp()
357+
return ss, true
338358
}
339-
return ss
359+
return ss, false
340360
}

pkg/kv/kvserver/storeliveness/testdata/basic

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ debug-supporter-state
4949
meta:
5050
{MaxWithdrawn:201.000000000,0}
5151
support for:
52-
{Target:{NodeID:2 StoreID:2} Epoch:3 Expiration:0,0}
52+
{Target:{NodeID:2 StoreID:2} Epoch:3 Expiration:0,0} lastSupportWithdrawnTime:201.000000000,0
5353

5454
debug-metrics
5555
----

0 commit comments

Comments
 (0)