-
-
Notifications
You must be signed in to change notification settings - Fork 4.2k
Closed
Description
go-zero/core/collection/timingwheel.go
Lines 165 to 180 in 0a72444
func (tw *TimingWheel) drainAll(fn func(key, value any)) {
	runner := threading.NewTaskRunner(drainWorkers)
	for _, slot := range tw.slots {
		for e := slot.Front(); e != nil; {
			task := e.Value.(*timingEntry)
			next := e.Next()
			slot.Remove(e)
			e = next
			if !task.removed {
				runner.Schedule(func() {
					fn(task.key, task.value)
				})
			}
		}
	}
}
// 1. Fix for drainAll.
//
// drainAll empties every slot of the wheel. Each entry is unlinked from its
// slot; entries that are not flagged as removed have fn(key, value) scheduled
// on a task runner created with drainWorkers. The entry's key is also deleted
// from tw.timers (the suggestion made by this fix), and the function finishes
// by calling Wait on the runner.
func (tw *TimingWheel) drainAll(fn func(key, value any)) {
	workers := threading.NewTaskRunner(drainWorkers)

	for _, bucket := range tw.slots {
		cur := bucket.Front()
		for cur != nil {
			entry := cur.Value.(*timingEntry)
			// Fetch the successor before unlinking cur, then advance.
			following := cur.Next()
			bucket.Remove(cur)
			cur = following

			if !entry.removed {
				// Hand the callback plain copies of the key and value rather
				// than the entry itself.
				key, value := entry.key, entry.value
				workers.Schedule(func() {
					fn(key, value)
				})
			}

			// Suggested addition: drop the matching record from timers as well.
			tw.timers.Del(entry.key)
		}
	}

	// Presumably blocks until every scheduled callback has finished —
	// confirm against threading.TaskRunner.
	workers.Wait()
}
go-zero/core/collection/timingwheel.go
Lines 266 to 278 in 0a72444
func (tw *TimingWheel) runTasks(tasks []timingTask) {
	if len(tasks) == 0 {
		return
	}
	go func() {
		for i := range tasks {
			threading.RunSafe(func() {
				tw.execute(tasks[i].key, tasks[i].value)
			})
		}
	}()
}
// 2. Fix for runTasks.
//
// runTasks does nothing when tasks is empty. Otherwise it starts a single
// goroutine that walks tasks in order and, for each one, calls tw.execute
// with that task's key and value through threading.RunSafe.
func (tw *TimingWheel) runTasks(tasks []timingTask) {
	if len(tasks) > 0 {
		go func() {
			for _, item := range tasks {
				// Copy the fields so the closure does not refer back to the
				// slice element or the loop variable.
				key, value := item.key, item.value
				// NOTE(review): RunSafe presumably guards against panics in
				// the callback — confirm against the threading package.
				threading.RunSafe(func() {
					tw.execute(key, value)
				})
			}
		}()
	}
}
Metadata
Metadata
Assignees
Labels
No labels