Skip to content

Commit da00462

Browse files
craig[bot]stevendannatbg
committed
157955: taskpacer,kvclient: remove more uses of timeutil.Now r=tbg a=stevendanna This removes a few more uses of timeutil.Now. The biggest of the changes is to the taskpacer which is used in multiple places. Overall, my goal in this and other PRs is to slowly reduce the number of places using timeutil.Now() since ideally anything taking a time reading for anything other than measuring elapsed time would use an hlc.Clock. Epic: none Release note: none 159063: gen-cockroach-metrics: tweak a regexp r=tbg a=tbg This still filters out the large gossip histograms, but keeps the processed callbacks metric. Epic: none Co-authored-by: Steven Danna <[email protected]> Co-authored-by: Tobias Grieger <[email protected]>
3 parents ea92c6b + ece5d78 + 74d350a commit da00462

File tree

15 files changed

+57
-41
lines changed

15 files changed

+57
-41
lines changed

build/tools/gen-cockroachdb-metrics/main.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ var (
9696
skipPatterns = []*regexp.Regexp{
9797
regexp.MustCompile(`^auth_`),
9898
regexp.MustCompile(`^distsender_rpc_err_errordetailtype_`),
99-
regexp.MustCompile(`^gossip_callbacks_`),
99+
regexp.MustCompile(`^gossip_callbacks_[^_]+_`),
100100
regexp.MustCompile(`^jobs_auto_config_env_runner_`),
101101
regexp.MustCompile(`^jobs_update_table_`),
102102
regexp.MustCompile(`^logical_replication_`),
@@ -304,7 +304,9 @@ func loadBaseMappings(path string) (*BaseMappingsYAML, error) {
304304

305305
// collectAllCRDBMetrics collects all CRDB metrics from the YAML output and combines them
306306
// with any runtime conditional metrics that aren't documented in metrics.yaml
307-
func collectAllCRDBMetrics(yamlOutput *YAMLOutput, runtimeConditionalMetrics []MetricInfo) []MetricInfo {
307+
func collectAllCRDBMetrics(
308+
yamlOutput *YAMLOutput, runtimeConditionalMetrics []MetricInfo,
309+
) []MetricInfo {
308310
allMetrics := make([]MetricInfo, 0)
309311

310312
for _, layer := range yamlOutput.Layers {
@@ -328,7 +330,9 @@ func shouldSkipMetric(promName string) bool {
328330
}
329331

330332
// mapMetricsToDatadog processes the CRDB metrics and maps them to Datadog names
331-
func mapMetricsToDatadog(metrics []MetricInfo, datadogMappings map[string]string) map[string]string {
333+
func mapMetricsToDatadog(
334+
metrics []MetricInfo, datadogMappings map[string]string,
335+
) map[string]string {
332336
result := make(map[string]string)
333337

334338
for _, metric := range metrics {
@@ -465,7 +469,7 @@ func main() {
465469
flag.Parse()
466470

467471
args := flag.Args()
468-
472+
469473
// Check if running in datadog-only mode
470474
if *datadogOnly {
471475
if len(flag.Args()) < 2 {

pkg/kv/kvclient/kvcoord/transport_race.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import (
1919
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
2020
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
2121
"github.com/cockroachdb/cockroach/pkg/util/log"
22-
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
22+
"github.com/cockroachdb/crlib/crtime"
2323
)
2424

2525
var running int32 // atomically updated
@@ -107,7 +107,7 @@ func GRPCTransportFactory(nodeDialer *nodedialer.Dialer) TransportFactory {
107107
encoder := json.NewEncoder(io.Discard)
108108
for {
109109
iters++
110-
start := timeutil.Now()
110+
start := crtime.NowMono()
111111
for _, ba := range bas {
112112
if ba != nil {
113113
if err := encoder.Encode(ba); err != nil {
@@ -119,7 +119,7 @@ func GRPCTransportFactory(nodeDialer *nodedialer.Dialer) TransportFactory {
119119
// times skyrocket. Sleep on average for as long as we worked
120120
// on the last iteration so we spend no more than half our CPU
121121
// time on this task.
122-
jittered := time.After(jitter(timeutil.Since(start)))
122+
jittered := time.After(jitter(start.Elapsed()))
123123
// Collect incoming requests until the jittered timer fires,
124124
// then access everything we have.
125125
for {

pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -564,7 +564,7 @@ func (h *txnHeartbeater) abortTxnAsyncLocked(ctx context.Context) {
564564
// bypass AC.
565565
ba.AdmissionHeader = kvpb.AdmissionHeader{
566566
Priority: txn.AdmissionPriority,
567-
CreateTime: timeutil.Now().UnixNano(),
567+
CreateTime: h.clock.PhysicalTime().UnixNano(),
568568
Source: kvpb.AdmissionHeader_OTHER,
569569
}
570570

pkg/kv/kvclient/rangefeed/rangefeedcache/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ go_library(
1717
"//pkg/util/retry",
1818
"//pkg/util/stop",
1919
"//pkg/util/syncutil",
20-
"//pkg/util/timeutil",
20+
"@com_github_cockroachdb_crlib//crtime",
2121
"@com_github_cockroachdb_errors//:errors",
2222
"@com_github_cockroachdb_redact//:redact",
2323
],

pkg/kv/kvclient/rangefeed/rangefeedcache/watcher.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import (
2323
"github.com/cockroachdb/cockroach/pkg/util/retry"
2424
"github.com/cockroachdb/cockroach/pkg/util/stop"
2525
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
26-
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
26+
"github.com/cockroachdb/crlib/crtime"
2727
"github.com/cockroachdb/errors"
2828
"github.com/cockroachdb/redact"
2929
)
@@ -198,7 +198,7 @@ func Start[E rangefeedbuffer.Event](
198198

199199
const aWhile = 5 * time.Minute // arbitrary but much longer than a retry
200200
for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); {
201-
started := timeutil.Now()
201+
started := crtime.NowMono()
202202
if err := c.Run(ctx); err != nil {
203203
if errors.Is(err, context.Canceled) {
204204
return // we're done here
@@ -207,7 +207,7 @@ func Start[E rangefeedbuffer.Event](
207207
onError(err)
208208
}
209209

210-
if timeutil.Since(started) > aWhile {
210+
if started.Elapsed() > aWhile {
211211
r.Reset()
212212
}
213213

pkg/kv/kvserver/closedts/sidetransport/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ go_library(
3030
"//pkg/util/syncutil",
3131
"//pkg/util/taskpacer",
3232
"//pkg/util/timeutil",
33+
"@com_github_cockroachdb_crlib//crtime",
3334
"@com_github_cockroachdb_errors//:errors",
3435
],
3536
)

pkg/kv/kvserver/closedts/sidetransport/sender.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
3636
"github.com/cockroachdb/cockroach/pkg/util/taskpacer"
3737
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
38+
"github.com/cockroachdb/crlib/crtime"
3839
"github.com/cockroachdb/errors"
3940
)
4041

@@ -684,11 +685,11 @@ func (b *updatesBuf) PaceBroadcastUpdate(ctx context.Context, condVar *sync.Cond
684685
pacer := b.mu.pacer
685686
b.mu.Unlock()
686687

687-
pacer.StartTask(timeutil.Now())
688+
pacer.StartTask(crtime.NowMono())
688689

689690
workLeft := originalNumWaiters
690691
for workLeft > 0 {
691-
todo, by := pacer.Pace(timeutil.Now(), workLeft)
692+
todo, by := pacer.Pace(crtime.NowMono(), workLeft)
692693

693694
b.mu.Lock()
694695
for i := 0; i < todo && workLeft > 0; i++ {
@@ -698,7 +699,7 @@ func (b *updatesBuf) PaceBroadcastUpdate(ctx context.Context, condVar *sync.Cond
698699
b.mu.Unlock()
699700

700701
if workLeft > 0 {
701-
if wait := timeutil.Until(by); wait > 0 {
702+
if wait := by.Sub(crtime.NowMono()); wait > 0 {
702703
time.Sleep(wait)
703704
}
704705
}

pkg/kv/kvserver/store.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ import (
105105
"github.com/cockroachdb/cockroach/pkg/util/tracing"
106106
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
107107
"github.com/cockroachdb/cockroach/pkg/util/uuid"
108+
"github.com/cockroachdb/crlib/crtime"
108109
"github.com/cockroachdb/errors"
109110
"github.com/cockroachdb/logtags"
110111
"github.com/cockroachdb/redact"
@@ -2548,12 +2549,12 @@ func (s *Store) startRangefeedUpdater(ctx context.Context) {
25482549
var timer timeutil.Timer
25492550
defer timer.Stop()
25502551
errInterrupted := errors.New("waiting interrupted")
2551-
wait := func(ctx context.Context, until time.Time, interrupt <-chan struct{}) error {
2552-
now := timeutil.Now()
2553-
if !now.Before(until) {
2552+
wait := func(ctx context.Context, until crtime.Mono, interrupt <-chan struct{}) error {
2553+
wait := until.Sub(crtime.NowMono())
2554+
if wait <= 0 {
25542555
return nil
25552556
}
2556-
timer.Reset(until.Sub(now))
2557+
timer.Reset(wait)
25572558
select {
25582559
case <-timer.C:
25592560
return nil
@@ -2597,7 +2598,7 @@ func (s *Store) startRangefeedUpdater(ctx context.Context) {
25972598
return // context canceled
25982599
}
25992600
// Aim to complete this run in exactly refresh interval.
2600-
now := timeutil.Now()
2601+
now := crtime.NowMono()
26012602
pacer.StartTask(now)
26022603

26032604
// We're about to perform one work cycle, where we go through all replicas
@@ -2615,7 +2616,7 @@ func (s *Store) startRangefeedUpdater(ctx context.Context) {
26152616
break
26162617
}
26172618

2618-
todo, by := pacer.Pace(timeutil.Now(), len(work))
2619+
todo, by := pacer.Pace(crtime.NowMono(), len(work))
26192620
for _, id := range work[:todo] {
26202621
if r := s.GetReplicaIfExists(id); r != nil {
26212622
cts := r.GetCurrentClosedTimestamp(ctx)

pkg/kv/kvserver/store_raft.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/cockroachdb/cockroach/pkg/util/taskpacer"
2929
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
3030
"github.com/cockroachdb/cockroach/pkg/util/tracing"
31+
"github.com/cockroachdb/crlib/crtime"
3132
"github.com/cockroachdb/errors"
3233
)
3334

@@ -920,12 +921,12 @@ func (s *Store) raftTickLoop(ctx context.Context) {
920921
defer timer.Stop()
921922
// waitUntil is used to wait between different tick batches to pace the
922923
// ticking process over the entire tick interval.
923-
waitUntil := func(until time.Time) {
924-
now := timeutil.Now()
925-
if !now.Before(until) {
924+
waitUntil := func(until crtime.Mono) {
925+
wait := until.Sub(crtime.NowMono())
926+
if wait <= 0 {
926927
return
927928
}
928-
timer.Reset(until.Sub(now))
929+
timer.Reset(wait)
929930
<-timer.C
930931
}
931932

@@ -937,7 +938,7 @@ func (s *Store) raftTickLoop(ctx context.Context) {
937938
for {
938939
select {
939940
case <-ticker.C:
940-
now := timeutil.Now()
941+
now := crtime.NowMono()
941942
pacer.StartTask(now)
942943
// Update the liveness map.
943944
if s.cfg.NodeLiveness != nil {
@@ -968,7 +969,7 @@ func (s *Store) raftTickLoop(ctx context.Context) {
968969
// are ticked, which can lead to increased goroutine scheduling latency.
969970
for startAt := now; len(rangeIDs) != 0; {
970971
waitUntil(startAt)
971-
todo, by := pacer.Pace(timeutil.Now(), len(rangeIDs))
972+
todo, by := pacer.Pace(crtime.NowMono(), len(rangeIDs))
972973
batch := s.scheduler.NewEnqueueBatch()
973974
for _, id := range rangeIDs[:todo] {
974975
batch.Add(id)

pkg/kv/kvserver/storeliveness/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ go_library(
3939
"//pkg/util/syncutil",
4040
"//pkg/util/taskpacer",
4141
"//pkg/util/timeutil",
42+
"@com_github_cockroachdb_crlib//crtime",
4243
"@com_github_cockroachdb_errors//:errors",
4344
"@com_github_cockroachdb_redact//:redact",
4445
"@io_storj_drpc//:drpc",

0 commit comments

Comments
 (0)