Skip to content
This repository was archived by the owner on Apr 2, 2026. It is now read-only.

Commit ea8f2e7

Browse files
authored
Merge pull request #8 from reddit/v2.6.6-patched-251123-trace
Additional trace -time enqueued/len
2 parents d48bf0a + 4395a8f commit ea8f2e7

3 files changed

Lines changed: 71 additions & 2 deletions

File tree

internal/querynodev2/services.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"time"
2525

2626
"github.com/samber/lo"
27+
"go.opentelemetry.io/otel/attribute"
2728
"go.opentelemetry.io/otel/trace"
2829
"go.uber.org/zap"
2930
"golang.org/x/sync/errgroup"
@@ -745,6 +746,8 @@ func (node *QueryNode) SearchSegments(ctx context.Context, req *querypb.SearchRe
745746
zap.String("channel", channel),
746747
zap.String("scope", req.GetScope().String()),
747748
)
749+
span := trace.SpanFromContext(ctx)
750+
hasSpan := span.SpanContext().IsValid()
748751
channelsMvcc := make(map[string]uint64)
749752
for _, ch := range req.GetDmlChannels() {
750753
channelsMvcc[ch] = req.GetReq().GetMvccTimestamp()
@@ -771,7 +774,7 @@ func (node *QueryNode) SearchSegments(ctx context.Context, req *querypb.SearchRe
771774
searchCtx, cancel := context.WithCancel(ctx)
772775
defer cancel()
773776

774-
tr := timerecord.NewTimeRecorder("searchSegments")
777+
tr := timerecord.NewTimeRecorderWithTrace(ctx, "searchSegments")
775778
log.Debug("search segments...")
776779

777780
if !node.manager.Collection.Ref(req.Req.GetCollectionID(), 1) {
@@ -792,18 +795,36 @@ func (node *QueryNode) SearchSegments(ctx context.Context, req *querypb.SearchRe
792795
task = tasks.NewSearchTask(searchCtx, collection, node.manager, req, node.serverID)
793796
}
794797

798+
if hasSpan {
799+
span.AddEvent("searchSegments.schedule.enqueue", trace.WithAttributes(
800+
attribute.Int("segment_count", len(req.GetSegmentIDs())),
801+
attribute.String("channel", channel),
802+
))
803+
}
804+
enqueueStart := time.Now()
795805
if err := node.scheduler.Add(task); err != nil {
796806
log.Warn("failed to search channel", zap.Error(err))
797807
resp.Status = merr.Status(err)
798808
return resp, nil
799809
}
810+
if hasSpan {
811+
span.AddEvent("searchSegments.schedule.enqueued", trace.WithAttributes(
812+
attribute.Int64("enqueue_ms", time.Since(enqueueStart).Milliseconds()),
813+
))
814+
}
800815

816+
waitStart := time.Now()
801817
err := task.Wait()
802818
if err != nil {
803819
log.Warn("failed to search segments", zap.Error(err))
804820
resp.Status = merr.Status(err)
805821
return resp, nil
806822
}
823+
if hasSpan {
824+
span.AddEvent("searchSegments.schedule.wait_done", trace.WithAttributes(
825+
attribute.Int64("wait_ms", time.Since(waitStart).Milliseconds()),
826+
))
827+
}
807828

808829
tr.CtxElapse(ctx, fmt.Sprintf("search segments done, channel = %s, segmentIDs = %v",
809830
channel,

internal/querynodev2/tasks/search_task.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@ import (
99
"context"
1010
"fmt"
1111
"strconv"
12+
"time"
1213

1314
"github.com/samber/lo"
1415
"go.opentelemetry.io/otel"
16+
"go.opentelemetry.io/otel/attribute"
1517
"go.opentelemetry.io/otel/trace"
1618
"go.uber.org/zap"
1719
"google.golang.org/protobuf/proto"
@@ -55,6 +57,7 @@ type SearchTask struct {
5557

5658
tr *timerecord.TimeRecorder
5759
scheduleSpan trace.Span
60+
readyEnqueue time.Time
5861
}
5962

6063
func NewSearchTask(ctx context.Context,
@@ -97,6 +100,25 @@ func (t *SearchTask) IsGpuIndex() bool {
97100
return t.collection.IsGpuIndex()
98101
}
99102

103+
// MarkReadyEnqueue records when the task enters the ready queue.
104+
func (t *SearchTask) MarkReadyEnqueue(attrs ...attribute.KeyValue) {
105+
if t.scheduleSpan == nil {
106+
return
107+
}
108+
t.readyEnqueue = time.Now()
109+
t.scheduleSpan.AddEvent("schedule.ready_enqueue", trace.WithAttributes(attrs...))
110+
}
111+
112+
// MarkReadyDequeue records when the task leaves the ready queue and how long it waited there.
113+
func (t *SearchTask) MarkReadyDequeue(attrs ...attribute.KeyValue) {
114+
if t.scheduleSpan == nil || t.readyEnqueue.IsZero() {
115+
return
116+
}
117+
waitMs := time.Since(t.readyEnqueue).Milliseconds()
118+
attrs = append(attrs, attribute.Int64("ready_wait_ms", waitMs))
119+
t.scheduleSpan.AddEvent("schedule.ready_dequeue", trace.WithAttributes(attrs...))
120+
}
121+
100122
func (t *SearchTask) PreExecute() error {
101123
// Update task wait time metric before execute
102124
nodeID := strconv.FormatInt(t.GetNodeID(), 10)
@@ -228,13 +250,14 @@ func (t *SearchTask) Execute() error {
228250
log.Warn("failed to reduce search results", zap.Error(err))
229251
return err
230252
}
253+
reduceDuration := tr.CtxRecord(t.ctx, "search reduce")
231254
defer segcore.DeleteSearchResultDataBlobs(blobs)
232255
metrics.QueryNodeReduceLatency.WithLabelValues(
233256
fmt.Sprint(t.GetNodeID()),
234257
metrics.SearchLabel,
235258
metrics.ReduceSegments,
236259
metrics.BatchReduce).
237-
Observe(float64(tr.RecordSpan().Milliseconds()))
260+
Observe(float64(reduceDuration.Milliseconds()))
238261
for i := range t.originNqs {
239262
blob, cost, err := segcore.GetSearchResultDataBlob(t.ctx, blobs, i)
240263
if err != nil {

internal/util/searchutil/scheduler/concurrent_safe_scheduler.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"fmt"
55
"sync"
66

7+
"go.opentelemetry.io/otel/attribute"
78
"go.uber.org/atomic"
89
"go.uber.org/zap"
910

@@ -41,6 +42,12 @@ type addTaskReq struct {
4142
err chan<- error
4243
}
4344

45+
// readyQueueTracer optionally lets tasks receive instrumentation callbacks for ready-queue timing.
46+
type readyQueueTracer interface {
47+
MarkReadyEnqueue(attrs ...attribute.KeyValue)
48+
MarkReadyDequeue(attrs ...attribute.KeyValue)
49+
}
50+
4451
// scheduler is a general concurrent safe scheduler implementation by wrapping a schedule policy.
4552
type scheduler struct {
4653
policy schedulePolicy
@@ -140,6 +147,12 @@ func (s *scheduler) schedule() {
140147
// And consume recv chan as much as possible.
141148
s.consumeRecvChan(req, maxReceiveChanBatchConsumeNum)
142149
case execChan <- task:
150+
if tracer, ok := task.(readyQueueTracer); ok {
151+
tracer.MarkReadyDequeue(
152+
attribute.Int("ready_len", s.policy.Len()),
153+
attribute.Int64("waiting_nq", s.GetWaitingTaskTotalNQ()),
154+
)
155+
}
143156
// Task sent, drop the ownership of sent task.
144157
// Update waiting task counter.
145158
s.updateWaitingTaskCounter(-1, -nq)
@@ -185,6 +198,12 @@ func (s *scheduler) handleAddTaskRequest(req addTaskReq, maxWaitTaskNum int64) b
185198
newTaskAdded, err := s.policy.Push(req.task)
186199
if err == nil {
187200
s.updateWaitingTaskCounter(int64(newTaskAdded), nq)
201+
if tracer, ok := req.task.(readyQueueTracer); ok {
202+
tracer.MarkReadyEnqueue(
203+
attribute.Int("ready_len", s.policy.Len()),
204+
attribute.Int64("waiting_nq", s.GetWaitingTaskTotalNQ()),
205+
)
206+
}
188207
}
189208
req.err <- err
190209
}
@@ -203,6 +222,12 @@ func (s *scheduler) produceExecChan() Task {
203222

204223
select {
205224
case execChan <- task:
225+
if tracer, ok := task.(readyQueueTracer); ok {
226+
tracer.MarkReadyDequeue(
227+
attribute.Int("ready_len", s.policy.Len()),
228+
attribute.Int64("waiting_nq", s.GetWaitingTaskTotalNQ()),
229+
)
230+
}
206231
// Update waiting task counter.
207232
s.updateWaitingTaskCounter(-1, -nq)
208233
// Task sent, drop the ownership of sent task.

0 commit comments

Comments
 (0)