-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsingleflight.go
More file actions
106 lines (90 loc) · 3.33 KB
/
singleflight.go
File metadata and controls
106 lines (90 loc) · 3.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
// Package singleflight provides generic helpers around golang.org/x/sync/singleflight
package singleflight
import (
"golang.org/x/sync/singleflight"
)
// Singleflighter is anything that implements singleflight.Group.
type Singleflighter[T ~string, V any] interface {
Do(key T, fn func() (V, error)) (V, error, bool)
DoChan(key T, fn func() (V, error)) <-chan Result[V]
Forget(key T)
}
// Group wraps singleflight.Group with generics.
//
// T must be a string-like type (constraint ~string) to ensure keys can be
// passed through to the underlying singleflight. V is the result type
// returned by the work function.
type Group[T ~string, V any] struct {
group singleflight.Group
}
// Result is the typed output sent on channels returned by Group.DoChan and
// ShardedGroup.DoChan.
//
// Val is the value produced by the underlying function. Err is any error
// returned by that function. Shared reports whether this caller received a
// duplicate-suppressed (shared) result, as opposed to being the caller that
// actually executed the function.
type Result[V any] struct {
Val V
Err error
Shared bool
}
// Do executes and deduplicates the provided function for the given key.
//
// If multiple goroutines call Do with the same key at the same time, the
// function fn will be invoked exactly once; the other callers will wait for
// that single invocation to complete and will receive the same results.
//
// It returns the function's value V, its error (if any), and a boolean
// shared indicating whether this caller received a shared result.
func (g *Group[T, V]) Do(key T, fn func() (V, error)) (v V, err error, shared bool) {
result, err, shared := g.group.Do(string(key), func() (any, error) {
return fn()
})
if result != nil {
v, _ = result.(V) //nolint:errcheck
}
return v, err, shared
}
// DoChan is the channel-based variant of Do.
//
// It schedules fn to run once for the given key (deduplicating concurrent
// calls with the same key) and returns a channel that will receive exactly
// one Result[V]. The channel is buffered with capacity 1 so a receiver is
// not strictly required to be ready at completion time.
//
// As with Do, callers that join an in-flight execution receive the same
// result and Err, and the Shared field indicates whether this caller
// received a shared result.
func (g *Group[T, V]) DoChan(key T, fn func() (V, error)) <-chan Result[V] {
ch := make(chan Result[V], 1)
upstreamCh := g.group.DoChan(string(key), func() (any, error) {
return fn()
})
go g.toResult(upstreamCh, ch)
return ch
}
// Forget tells the group to forget about an in-flight or completed entry for key.
//
// If there is a call in flight for key, subsequent Do/DoChan calls with the
// same key will not join that call after Forget has been invoked; instead,
// they will start a new, independent execution. If there is a cached
// result (from a recently completed call), it is also cleared.
func (g *Group[T, V]) Forget(key T) {
g.group.Forget(string(key))
}
// toResult adapts singleflight.Result into a typed Result[V].
func (g *Group[T, V]) toResult(
sourceCh <-chan singleflight.Result,
destCh chan<- Result[V],
) {
sourceResult := <-sourceCh
result := Result[V]{
Err: sourceResult.Err,
Shared: sourceResult.Shared,
}
if sourceResult.Val != nil {
result.Val, _ = sourceResult.Val.(V) //nolint:errcheck
}
destCh <- result
}