
Commit 1acb16b

admission: compute cpu time token refill rates
This commit introduces a linear model that computes cpu time token refill rates. cpuTimeTokenFiller uses this model to determine how many tokens to add per second to the buckets in cpuTimeTokenGranter.

Fixes: #154471

Release note: None.
1 parent 02c7429 commit 1acb16b
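As a quick illustration of the arithmetic the commit message describes and the new TestCPUTimeTokenLinearModel below asserts: a bucket's refill rate is its target utilization times total CPU capacity (vCPU count × 1s, in nanoseconds), deflated by the model's token-to-CPU-time multiplier. This is a minimal standalone sketch; refillRate and its parameters are hypothetical names for illustration, not the commit's API.

package main

import (
    "fmt"
    "time"
)

// refillRate sketches the per-second token refill for one bucket:
// target utilization * total CPU capacity, divided by the
// token-to-CPU-time multiplier. Tokens are CPU-nanoseconds.
func refillRate(targetUtil, vCPUs, multiplier float64) int64 {
    capacityNanos := vCPUs * float64(time.Second.Nanoseconds())
    return int64(targetUtil * capacityNanos / multiplier)
}

func main() {
    fmt.Println(refillRate(0.8, 10, 1))   // 8000000000: 8s of tokens per second
    fmt.Println(refillRate(0.8, 10, 3.6)) // 2222222222: ~2.22s per second
}

These two calls reproduce the 80%-utilization assertions in the test below, first with the multiplier at its initial value of one and then at 3.6.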

File tree

7 files changed: +1000, -84 lines


pkg/util/admission/cpu_time_token_filler.go

Lines changed: 447 additions & 47 deletions
Large diffs are not rendered by default.
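Since that diff is collapsed, here is a rough sketch of the multiplier update the model appears to perform, reconstructed only from the assertions in the test file below, not from the commit's code; the smoothing factor alpha is an assumed placeholder, while the caps of 1 and 20 are pinned down by the 1x and 20x test cases.

package sketch

import "math"

// smoothedMultiplier sketches one fit step: measure the ratio of actual
// CPU time to token CPU time over the interval, exponentially smooth it,
// and clamp the result to [1, 20]. Intervals with under 25% CPU
// utilization are treated specially; see the walk-down example at the
// end of this page.
func smoothedMultiplier(prev, tokenCPU, actualCPU, alpha float64) float64 {
    raw := actualCPU / tokenCPU
    next := alpha*raw + (1-alpha)*prev
    return math.Min(math.Max(next, 1), 20)
}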

pkg/util/admission/cpu_time_token_filler_test.go

Lines changed: 288 additions & 16 deletions
@@ -6,16 +6,20 @@
 package admission

 import (
+    "context"
     "fmt"
+    "math"
     "strings"
     "testing"
     "time"

+    "github.com/cockroachdb/cockroach/pkg/settings/cluster"
     "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
     "github.com/cockroachdb/cockroach/pkg/util/leaktest"
     "github.com/cockroachdb/cockroach/pkg/util/log"
     "github.com/cockroachdb/cockroach/pkg/util/timeutil"
     "github.com/cockroachdb/datadriven"
+    "github.com/stretchr/testify/require"
 )

 func TestCPUTimeTokenFiller(t *testing.T) {
@@ -68,6 +72,8 @@ type testTokenAllocator struct {
     buf *strings.Builder
 }

+func (m *testTokenAllocator) init() {}
+
 func (a *testTokenAllocator) resetInterval() {
     fmt.Fprintf(a.buf, "resetInterval()\n")
 }
@@ -76,6 +82,31 @@ func (a *testTokenAllocator) allocateTokens(remainingTicks int64) {
     fmt.Fprintf(a.buf, "allocateTokens(%d)\n", remainingTicks)
 }

+type testModel struct {
+    buf   *strings.Builder
+    rates rates
+}
+
+func (m *testModel) init() {}
+
+func (m *testModel) fit(targets targetUtilizations) rates {
+    // targets uses float64, which when written to golden file can lead to
+    // test reproducibility issues. Here, we multiply by 100 & then round to
+    // the nearest integer.
+    round := func(x float64) int {
+        scaled := x * 100
+        return int(math.Round(scaled))
+    }
+    fmt.Fprint(m.buf, "fit(\n")
+    for tier := int(numResourceTiers - 1); tier >= 0; tier-- {
+        for qual := int(numBurstQualifications - 1); qual >= 0; qual-- {
+            fmt.Fprintf(m.buf, "\ttier%d %s -> %v%%\n", tier, burstQualification(qual).String(), round(targets[tier][qual]))
+        }
+    }
+    fmt.Fprint(m.buf, ")\n")
+    return m.rates
+}
+
 func TestCPUTimeTokenAllocator(t *testing.T) {
     defer leaktest.AfterTest(t)()
     defer log.Scope(t).Close(t)
@@ -101,43 +132,284 @@ func TestCPUTimeTokenAllocator(t *testing.T) {
     granter.requester[testTier0] = requesters[testTier0]
     granter.requester[testTier1] = requesters[testTier1]

-    allocator := cpuTimeTokenAllocator{
-        granter: granter,
-    }
-    allocator.rates[testTier0][canBurst] = 5
-    allocator.rates[testTier0][noBurst] = 4
-    allocator.rates[testTier1][canBurst] = 3
-    allocator.rates[testTier1][noBurst] = 2
-    allocator.bucketCapacity = allocator.rates
-
     var buf strings.Builder
-    flushAndReset := func(printGranter bool) string {
-        if printGranter {
-            fmt.Fprint(&buf, granter.String())
-        }
+    flushAndReset := func() string {
+        fmt.Fprint(&buf, granter.String())
         str := buf.String()
         buf.Reset()
         return str
     }

+    model := &testModel{buf: &buf}
+    model.rates[testTier0][canBurst] = 5
+    model.rates[testTier0][noBurst] = 4
+    model.rates[testTier1][canBurst] = 3
+    model.rates[testTier1][noBurst] = 2
+    allocator := cpuTimeTokenAllocator{
+        granter:  granter,
+        settings: cluster.MakeClusterSettings(),
+        model:    model,
+    }
+
     datadriven.RunTest(t, datapathutils.TestDataPath(t, "cpu_time_token_allocator"), func(t *testing.T, d *datadriven.TestData) string {
         switch d.Cmd {
         case "resetInterval":
+            var increaseRatesBy int64
+            d.MaybeScanArgs(t, "increase_rates_by", &increaseRatesBy)
+            if increaseRatesBy != 0 {
+                model.rates[testTier0][canBurst] += increaseRatesBy
+                model.rates[testTier0][noBurst] += increaseRatesBy
+                model.rates[testTier1][canBurst] += increaseRatesBy
+                model.rates[testTier1][noBurst] += increaseRatesBy
+            }
             allocator.resetInterval()
-            return flushAndReset(false /* printGranter */)
+            return flushAndReset()
         case "allocate":
             var remainingTicks int64
             d.ScanArgs(t, "remaining", &remainingTicks)
             allocator.allocateTokens(remainingTicks)
-            return flushAndReset(true /* printGranter */)
+            return flushAndReset()
         case "clear":
             granter.mu.buckets[testTier0][canBurst].tokens = 0
             granter.mu.buckets[testTier0][noBurst].tokens = 0
             granter.mu.buckets[testTier1][canBurst].tokens = 0
             granter.mu.buckets[testTier1][noBurst].tokens = 0
-            return flushAndReset(true /* printGranter */)
+            return flushAndReset()
+        case "setClusterSettings":
+            ctx := context.Background()
+            var override float64
+            if d.MaybeScanArgs(t, "app", &override) {
+                fmt.Fprintf(&buf, "SET CLUSTER SETTING admission.cpu_time_tokens.target_util.app_tenant = %v\n", override)
+                KVCPUTimeAppUtilGoal.Override(ctx, &allocator.settings.SV, override)
+            }
+            if d.MaybeScanArgs(t, "system", &override) {
+                fmt.Fprintf(&buf, "SET CLUSTER SETTING admission.cpu_time_tokens.target_util.system_tenant = %v\n", override)
+                KVCPUTimeSystemUtilGoal.Override(ctx, &allocator.settings.SV, override)
+            }
+            if d.MaybeScanArgs(t, "burst", &override) {
+                fmt.Fprintf(&buf, "SET CLUSTER SETTING admission.cpu_time_tokens.target_util.burst_delta = %v\n", override)
+                KVCPUTimeUtilBurstDelta.Override(ctx, &allocator.settings.SV, override)
+            }
+            return flushAndReset()
         default:
             return fmt.Sprintf("unknown command: %s", d.Cmd)
         }
     })
 }
+
+func TestCPUTimeTokenLinearModel(t *testing.T) {
+    defer leaktest.AfterTest(t)()
+    defer log.Scope(t).Close(t)
+
+    unixNanos := int64(1758938600000000000) // 2025-09-24T14:30:00Z
+    testTime := timeutil.NewManualTime(time.Unix(0, unixNanos).UTC())
+    model := cpuTimeTokenLinearModel{
+        timeSource:               testTime,
+        lastFitTime:              testTime.Now(),
+        totalCPUTimeMillis:       0,
+        tokenToCPUTimeMultiplier: 1,
+    }
+    tokenCPUTime := &testTokenUsageTracker{}
+    model.granter = tokenCPUTime
+    actualCPUTime := &testCPUMetricProvider{
+        capacity: 10,
+    }
+    model.cpuMetricProvider = actualCPUTime
+
+    dur := 5 * time.Second
+    actualCPUTime.append(dur.Nanoseconds(), 1) // appended value ignored by init
+
+    var targets targetUtilizations
+    targets[testTier1][noBurst] = 0.8
+    targets[testTier1][canBurst] = 0.85
+    targets[testTier0][noBurst] = 0.9
+    targets[testTier0][canBurst] = 0.95
+
+    // The first call to fit inits the model by setting tokenToCPUTimeMultiplier
+    // to one, since in prod on the first call to fit, there will be no CPU
+    // usage data to use to determine tokenToCPUTimeMultiplier.
+    refillRates := model.fit(targets)
+    require.Equal(t, float64(1), model.tokenToCPUTimeMultiplier)
+    // Given that tokenToCPUTimeMultiplier equals one, refillRates is equal
+    // to target utilization for the bucket * the vCPU count (10 vCPUs in this
+    // test). The unit of refillRates is nanoseconds.
+    //
+    // 80% util -> 10 vCPUs * .8 * 1s = 8s
+    require.Equal(t, int64(8000000000), refillRates[testTier1][noBurst])
+    // 85% util -> 10 vCPUs * .85 * 1s = 8.5s
+    require.Equal(t, int64(8500000000), refillRates[testTier1][canBurst])
+    // 90% util -> 10 vCPUs * .9 * 1s = 9s
+    require.Equal(t, int64(9000000000), refillRates[testTier0][noBurst])
+    // 95% util -> 10 vCPUs * .95 * 1s = 9.5s
+    require.Equal(t, int64(9500000000), refillRates[testTier0][canBurst])
+
+    // Below tests are of the computation of tokenToCPUTimeMultiplier only. The
+    // computation of tokenToCPUTimeMultiplier involves state stored on the model,
+    // since the model does exponential smoothing. The computation of refillRates
+    // (given a fixed tokenToCPUTimeMultiplier) is simpler: It is a pure function,
+    // described up above in the test case of the first call to fit. So here we
+    // focus on tokenToCPUTimeMultiplier.
+    //
+    // 2x
+    // Token time is half of actual time, so tokenToCPUTimeMultiplier is two.
+    // 100 data points are appended, to give the filter time to converge on two.
+    tokenCPUTime.append(dur.Nanoseconds()/2, 100)
+    actualCPUTime.append(dur.Milliseconds(), 100)
+    for i := 0; i < 100; i++ {
+        testTime.Advance(time.Second)
+        _ = model.fit(targets)
+    }
+    tolerance := 0.01
+    require.InDelta(t, 2, model.tokenToCPUTimeMultiplier, tolerance)
+
+    // 4x
+    // Token time is one fourth of actual time, so tokenToCPUTimeMultiplier is
+    // four.
+    tokenCPUTime.append(dur.Nanoseconds()/2, 100)
+    actualCPUTime.append(dur.Milliseconds()*2, 100)
+    for i := 0; i < 100; i++ {
+        testTime.Advance(time.Second)
+        _ = model.fit(targets)
+    }
+    require.InDelta(t, 4, model.tokenToCPUTimeMultiplier, tolerance)
+
+    // 1x
+    // Token time is equal to actual time, so tokenToCPUTimeMultiplier is one.
+    tokenCPUTime.append(dur.Nanoseconds()*2, 100)
+    actualCPUTime.append(dur.Milliseconds()*2, 100)
+    for i := 0; i < 100; i++ {
+        testTime.Advance(time.Second)
+        _ = model.fit(targets)
+    }
+    require.InDelta(t, 1, model.tokenToCPUTimeMultiplier, tolerance)
+
+    // 20x
+    // tokenToCPUTimeMultiplier should be 40, based on the data, but the model
+    // caps tokenToCPUTimeMultiplier at 20.
+    tokenCPUTime.append(dur.Nanoseconds(), 100)
+    actualCPUTime.append(dur.Milliseconds()*40, 100)
+    for i := 0; i < 100; i++ {
+        testTime.Advance(time.Second)
+        _ = model.fit(targets)
+    }
+    require.InDelta(t, 20, model.tokenToCPUTimeMultiplier, tolerance)
+
+    // 1x
+    // tokenToCPUTimeMultiplier should be 0.5, based on the data, but the model
+    // caps tokenToCPUTimeMultiplier at 1.
+    tokenCPUTime.append(dur.Nanoseconds()*2, 100)
+    actualCPUTime.append(dur.Milliseconds(), 100)
+    for i := 0; i < 100; i++ {
+        testTime.Advance(time.Second)
+        _ = model.fit(targets)
+    }
+    require.InDelta(t, 1, model.tokenToCPUTimeMultiplier, tolerance)
+
+    // 2x
+    // Token time is half of actual time, so tokenToCPUTimeMultiplier is two.
+    tokenCPUTime.append(dur.Nanoseconds(), 100)
+    actualCPUTime.append(dur.Milliseconds()*2, 100)
+    for i := 0; i < 100; i++ {
+        testTime.Advance(time.Second)
+        _ = model.fit(targets)
+    }
+    require.InDelta(t, 2, model.tokenToCPUTimeMultiplier, tolerance)
+
+    // Below tests are of the low CPU logic. See the comments in fit for a full
+    // explanation of the logic & especially the rationale for the logic. TLDR:
+    // if CPU is less than 25%, and if tokenToCPUTimeMultiplier is less than 3.6,
+    // tokenToCPUTimeMultiplier is left alone. If tokenToCPUTimeMultiplier is
+    // greater than 3.6, tokenToCPUTimeMultiplier is divided by 1.5 until it is
+    // <= 3.6.
+    //
+    // vCPU count is 10. dur / 5 = 1s. 1s / 10s = 0.1 < 0.25. So low CPU mode
+    // should be activated.
+    //
+    // Leave existing tokenToCPUTimeMultiplier as is, since 2 <= 3.6.
+    tokenCPUTime.append(dur.Nanoseconds()/5, 100)
+    actualCPUTime.append(dur.Milliseconds()/5, 100)
+    for i := 0; i < 100; i++ {
+        testTime.Advance(time.Second)
+        _ = model.fit(targets)
+    }
+    require.InDelta(t, 2, model.tokenToCPUTimeMultiplier, tolerance)
+
+    // Leave low CPU mode, in order to set tokenToCPUTimeMultiplier equal to 20,
+    // which sets up the next test case.
+    tokenCPUTime.append(dur.Nanoseconds(), 100)
+    actualCPUTime.append(dur.Milliseconds()*100, 100)
+    for i := 0; i < 100; i++ {
+        testTime.Advance(time.Second)
+        _ = model.fit(targets)
+    }
+    require.InDelta(t, 20, model.tokenToCPUTimeMultiplier, tolerance)
+
+    // Iteratively reduce to 3.6x, since we are in low CPU mode and
+    // tokenToCPUTimeMultiplier = 20 > 3.6.
+    tokenCPUTime.append(dur.Nanoseconds()/5, 100)
+    actualCPUTime.append(dur.Milliseconds()/5, 100)
+    {
+        lastMult := model.tokenToCPUTimeMultiplier
+        for i := 0; ; i++ {
+            require.Less(t, i, 100)
+            testTime.Advance(time.Second)
+            refillRates = model.fit(targets)
+            mult := model.tokenToCPUTimeMultiplier
+            if mult == lastMult {
+                break
+            }
+            require.Less(t, mult, lastMult)
+            lastMult = mult
+        }
+    }
+    require.InDelta(t, 3.6, model.tokenToCPUTimeMultiplier, tolerance)
+
+    // Check refillRates again, this time with tokenToCPUTimeMultiplier
+    // equal to 3.6 instead of one.
+
+    // 80% -> 10 vCPUs * .8 * 1s = 8s -> 8s / 3.6 ~= 2.22222222s
+    require.Equal(t, int64(2222222222), refillRates[testTier1][noBurst])
+    // 85% -> 10 vCPUs * .85 * 1s = 8.5s -> 8.5s / 3.6 ~= 2.36111111s
+    require.Equal(t, int64(2361111111), refillRates[testTier1][canBurst])
+    // 90% -> 10 vCPUs * .9 * 1s = 9s -> 9s / 3.6 ~= 2.5s
+    require.Equal(t, int64(2500000000), refillRates[testTier0][noBurst])
+    // 95% -> 10 vCPUs * .95 * 1s = 9.5s -> 9.5s / 3.6 ~= 2.63888889s
+    require.Equal(t, int64(2638888888), refillRates[testTier0][canBurst])
+}
+
+type testTokenUsageTracker struct {
+    i          int
+    tokensUsed []int64
+}
+
+func (t *testTokenUsageTracker) append(tokens int64, count int) {
+    for i := 0; i < count; i++ {
+        t.tokensUsed = append(t.tokensUsed, tokens)
+    }
+}
+
+func (t *testTokenUsageTracker) resetTokensUsedInInterval() int64 {
+    ret := t.tokensUsed[t.i]
+    t.i++
+    return ret
+}
+
+type testCPUMetricProvider struct {
+    i        int
+    cum      int64
+    millis   []int64
+    capacity float64
+}
+
+func (m *testCPUMetricProvider) GetCPUInfo() (int64, float64) {
+    cycle := m.millis[m.i]
+    m.i++
+    m.cum += cycle
+    return m.cum, m.capacity
+}
+
+func (t *testCPUMetricProvider) append(millis int64, count int) {
+    for i := 0; i < count; i++ {
+        t.millis = append(t.millis, millis)
+    }
+}
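For concreteness, the low CPU walk-down asserted near the end of TestCPUTimeTokenLinearModel plays out as below, assuming the divide-by-1.5, floor-at-3.6 behavior the test comments describe; this is a worked example, not the commit's code.

package main

import (
    "fmt"
    "math"
)

func main() {
    // Start at the 20x cap; each fit call in low CPU mode divides the
    // multiplier by 1.5 until it reaches the 3.6 floor.
    mult := 20.0
    for mult > 3.6 {
        mult = math.Max(mult/1.5, 3.6)
        fmt.Printf("%.3f\n", mult) // 13.333, 8.889, 5.926, 3.951, 3.600
    }
}

This matches the strict-decrease loop in the test, which requires the multiplier to fall on every fit call until it converges, well within the 100-call budget.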
