Message's Context is not preserved when a worker fetches the message in v3 #196
Open
Description
	func (c *Consumer) worker(ctx context.Context, workerID int32) {
		var lock *redislock.Lock
		defer func() {
			if lock != nil {
				_ = lock.Release(ctx)
			}
		}()

		timer := time.NewTimer(time.Minute)
		timer.Stop()

		for {
			if workerID >= atomic.LoadInt32(&c.numWorker) {
				return
			}

			if c.opt.WorkerLimit > 0 {
				lock = c.lockWorker(ctx, lock, workerID)
			}

			msg := c.waitMessage(ctx, timer)
			if msg == nil {
				if atomic.LoadInt32(&c.state) >= stateStoppingWorkers {
					return
				}
				continue
			}

			msg.Ctx = ctx // this place
			_ = c.Process(msg)
		}
	}
If the worker assigns the consumer's context here, the context already attached to the queue message is discarded. That context is defined by the SDK user and may carry their own values, so I think it is better to remove the line "msg.Ctx = ctx" and let users keep their own context.