Skip to content

Commit 1645953

Browse files
craig[bot]dhartunianyuzefovich
committed
158931: obs, kv, sql: add workloadID to profile tags r=tbg,jasonlmfong a=dhartunian We introduce `pkg/obs/workloadid` which defines a commond workloadID definition, subject to extensive future modification, that we can use to tag CPU profiles and execution trace regions. Currently, the workloadID is either a static value, or a statement fingerprint ID. Where appropriate, we've tagged CPU profiles with a workload identifier. When tracing is enabled, on a request, we will attempt to include the workloadID into the region tag in the execution trace. This path is a bit perf sensitive so we only want to incur the cost of allocating the extra string when we're already paying the cost of tracing. It is expected that the combination of statementb diagnostic bundle capture and execution tracing can then be cross-referenced. Epic: CRDB-55080 Resolves: CRDB-55928 Resolves: CRDB-55924 Release note: None 159113: kvcoord: optimize txn write buffer for read-only txns r=yuzefovich a=yuzefovich In the txn write buffer we need to merge the ScanResponse coming from the server with any overlapping writes that we've buffered. This requires us to allocate `respMerger.batchResponses` which is the "staging" area of the merged response. However, if we don't have any overlapping buffered writes, then we simply copy the server response into that staging area. This commit adds an optimization to avoid this extra copy in a special but common case when the buffer is empty (i.e. we're in a read-only txn). We could extend the optimization further but that is left as a TODO. Epic: None Release note: None Co-authored-by: David Hartunian <[email protected]> Co-authored-by: Yahor Yuzefovich <[email protected]>
3 parents 367d1b8 + 25c153c + 4289d0d commit 1645953

File tree

17 files changed

+129
-8
lines changed

17 files changed

+129
-8
lines changed

pkg/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1681,6 +1681,7 @@ GO_TARGETS = [
16811681
"//pkg/obs/eventagg:eventagg_test",
16821682
"//pkg/obs/logstream:logstream",
16831683
"//pkg/obs/logstream:logstream_test",
1684+
"//pkg/obs/workloadid:workloadid",
16841685
"//pkg/obs:obs",
16851686
"//pkg/raft/confchange:confchange",
16861687
"//pkg/raft/confchange:confchange_test",

pkg/crosscluster/logical/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ go_library(
4646
"//pkg/kv",
4747
"//pkg/kv/bulk",
4848
"//pkg/kv/kvpb",
49+
"//pkg/obs/workloadid",
4950
"//pkg/repstream/streampb",
5051
"//pkg/roachpb",
5152
"//pkg/security/username",

pkg/crosscluster/logical/logical_replication_writer_processor.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
2323
"github.com/cockroachdb/cockroach/pkg/keys"
2424
kvbulk "github.com/cockroachdb/cockroach/pkg/kv/bulk"
25+
"github.com/cockroachdb/cockroach/pkg/obs/workloadid"
2526
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
2627
"github.com/cockroachdb/cockroach/pkg/roachpb"
2728
"github.com/cockroachdb/cockroach/pkg/settings"
@@ -359,7 +360,8 @@ func (lrw *logicalReplicationWriterProcessor) Start(ctx context.Context) {
359360
log.Dev.Infof(lrw.Ctx(), "consumer completed. Error: %s", err)
360361
lrw.sendError(errors.Wrap(err, "consume events"))
361362
}
362-
}, "proc", fmt.Sprintf("%d", lrw.ProcessorID))
363+
}, workloadid.ProfileTag, workloadid.WORKLOAD_NAME_LDR,
364+
"proc", fmt.Sprintf("%d", lrw.ProcessorID))
363365
return nil
364366
})
365367
}

pkg/crosscluster/logical/offline_initial_scan_processor.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient"
1717
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
1818
"github.com/cockroachdb/cockroach/pkg/kv/bulk"
19+
"github.com/cockroachdb/cockroach/pkg/obs/workloadid"
1920
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
2021
"github.com/cockroachdb/cockroach/pkg/roachpb"
2122
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
@@ -235,7 +236,8 @@ func (o *offlineInitialScanProcessor) Start(ctx context.Context) {
235236
if err := o.subscription.Err(); err != nil {
236237
o.sendError(errors.Wrap(err, "subscription"))
237238
}
238-
}, "proc", fmt.Sprintf("%d", o.ProcessorID))
239+
}, workloadid.ProfileTag, workloadid.WORKLOAD_NAME_LDR,
240+
"proc", fmt.Sprintf("%d", o.ProcessorID))
239241
return nil
240242
})
241243
}

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1162,6 +1162,15 @@ func (twb *txnWriteBuffer) mergeWithScanResp(
11621162
"with COL_BATCH_RESPONSE scan format")
11631163
}
11641164

1165+
if twb.buffer.Len() == 0 {
1166+
// If we haven't buffered any writes, then we can just return the server
1167+
// response unchanged.
1168+
// TODO(yuzefovich): we could take the optimization further by examining
1169+
// whether any buffered writes overlap with the Scan request and
1170+
// skipping the merge step if not.
1171+
return resp, nil
1172+
}
1173+
11651174
respIter := newScanRespIter(req, resp)
11661175
// First, calculate the size of the merged response. This then allows us to
11671176
// exactly pre-allocate the response slice when constructing the respMerger.
@@ -1186,6 +1195,15 @@ func (twb *txnWriteBuffer) mergeWithReverseScanResp(
11861195
"ReverseScanRequest with COL_BATCH_RESPONSE scan format")
11871196
}
11881197

1198+
if twb.buffer.Len() == 0 {
1199+
// If we haven't buffered any writes, then we can just return the server
1200+
// response unchanged.
1201+
// TODO(yuzefovich): we could take the optimization further by examining
1202+
// whether any buffered writes overlap with the ReverseScan request and
1203+
// skipping the merge step if not.
1204+
return resp, nil
1205+
}
1206+
11891207
respIter := newReverseScanRespIter(req, resp)
11901208
// First, calculate the size of the merged response. This then allows us to
11911209
// exactly pre-allocate the response slice when constructing the respMerger.

pkg/kv/kvclient/rangefeed/rangefeed.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,9 @@ func (f *RangeFeed) start(
241241
// pprof.Do function does exactly what we do here, but it also results in
242242
// pprof.Do function showing up in the stack traces -- so, just set and reset
243243
// labels manually.
244-
ctx, reset := pprofutil.SetProfilerLabels(ctx, append(f.extraPProfLabels, "rangefeed", f.name)...)
244+
ctx, reset := pprofutil.SetProfilerLabels(
245+
ctx, append(f.extraPProfLabels, "rangefeed", f.name)...,
246+
)
245247
defer reset()
246248

247249
if f.invoker != nil {

pkg/kv/kvserver/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,7 @@ go_library(
191191
"//pkg/multitenant",
192192
"//pkg/multitenant/tenantcapabilities/tenantcapabilitiesauthorizer",
193193
"//pkg/multitenant/tenantcostmodel",
194+
"//pkg/obs/workloadid",
194195
"//pkg/raft",
195196
"//pkg/raft/raftlogger",
196197
"//pkg/raft/raftpb",

pkg/kv/kvserver/raft_transport.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb"
2020
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/node_rac2"
2121
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
22+
"github.com/cockroachdb/cockroach/pkg/obs/workloadid"
2223
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
2324
"github.com/cockroachdb/cockroach/pkg/roachpb"
2425
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
@@ -931,7 +932,7 @@ func (t *RaftTransport) startProcessNewQueue(
931932
}
932933
go func(ctx context.Context) {
933934
defer hdl.Activate(ctx).Release(ctx)
934-
pprofutil.Do(ctx, worker, "remote_node_id", toNodeID.String())
935+
pprofutil.Do(ctx, worker, workloadid.ProfileTag, workloadid.WORKLOAD_NAME_RAFT, "remote_node_id", toNodeID.String())
935936
}(ctx)
936937
return true
937938
}

pkg/kv/kvserver/replica_send.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats"
2323
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
2424
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait"
25+
"github.com/cockroachdb/cockroach/pkg/obs/workloadid"
2526
"github.com/cockroachdb/cockroach/pkg/roachpb"
2627
"github.com/cockroachdb/cockroach/pkg/settings"
2728
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -139,7 +140,21 @@ func (r *Replica) SendWithWriteBytes(
139140
}
140141
defer reset()
141142
}
143+
142144
if trace.IsEnabled() {
145+
foundLabel := ""
146+
for i, l := range ba.ProfileLabels {
147+
if i%2 == 0 && l == workloadid.ProfileTag && i < len(ba.ProfileLabels)-1 {
148+
// This label is set in conn_executor_exec if tracing is active.
149+
foundLabel = ba.ProfileLabels[i+1]
150+
break
151+
}
152+
}
153+
// This construction avoids calling `defer` in a loop which is
154+
// not permitted by our linter.
155+
if foundLabel != "" {
156+
defer trace.StartRegion(ctx, foundLabel).End()
157+
}
143158
defer trace.StartRegion(ctx, r.rangeStr.String() /* cheap */).End()
144159
}
145160
// Add the range log tag.

pkg/kv/kvserver/storeliveness/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ go_library(
2121
"//pkg/base",
2222
"//pkg/keys",
2323
"//pkg/kv/kvserver/storeliveness/storelivenesspb",
24+
"//pkg/obs/workloadid",
2425
"//pkg/roachpb",
2526
"//pkg/rpc/nodedialer",
2627
"//pkg/rpc/rpcbase",

0 commit comments

Comments
 (0)