Skip to content

Commit 6a96138

Browse files
committed
fix after review
1 parent 0c8806e commit 6a96138

File tree

6 files changed

+76
-43
lines changed

6 files changed

+76
-43
lines changed

e2e/start_work_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/ozontech/file.d/fd"
2525
_ "github.com/ozontech/file.d/plugin/action/add_file_name"
2626
_ "github.com/ozontech/file.d/plugin/action/add_host"
27+
_ "github.com/ozontech/file.d/plugin/action/cardinality"
2728
_ "github.com/ozontech/file.d/plugin/action/convert_date"
2829
_ "github.com/ozontech/file.d/plugin/action/convert_log_level"
2930
_ "github.com/ozontech/file.d/plugin/action/convert_utf8_bytes"

plugin/action/cardinality/README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,12 @@ Leave empty for default metric naming.
5050

5151
<br>
5252

53+
**`metric_label_value`** *`string`* *`default=unknown`*
54+
55+
Value assigned to the metric label when cardinality limit is exceeded.
56+
57+
<br>
58+
5359
**`limit`** *`int`* *`default=10000`*
5460

5561
Maximum allowed number of unique values for monitored fields.

plugin/action/cardinality/cache.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package discard
1+
package cardinality
22

33
import (
44
"sync"
@@ -14,12 +14,12 @@ type Cache struct {
1414
ttl int64
1515
}
1616

17-
func NewCache(ttl time.Duration) (*Cache, error) {
17+
func NewCache(ttl time.Duration) *Cache {
1818
return &Cache{
1919
tree: radix.New(),
2020
ttl: ttl.Nanoseconds(),
2121
mu: &sync.RWMutex{},
22-
}, nil
22+
}
2323
}
2424

2525
func (c *Cache) Set(key string) bool {
@@ -39,7 +39,7 @@ func (c *Cache) IsExists(key string) bool {
3939
now := xtime.GetInaccurateUnixNano()
4040
isExpire := c.isExpire(now, timeValue.(int64))
4141
if isExpire {
42-
c.delete(key)
42+
c.delete(key, true)
4343
return false
4444
}
4545
}
@@ -51,17 +51,19 @@ func (c *Cache) isExpire(now, value int64) bool {
5151
return diff > c.ttl
5252
}
5353

54-
func (c *Cache) delete(key string) {
55-
c.mu.Lock()
56-
defer c.mu.Unlock()
54+
func (c *Cache) delete(key string, lock bool) {
55+
if lock {
56+
c.mu.Lock()
57+
defer c.mu.Unlock()
58+
}
5759

5860
c.tree.Delete(key)
5961
}
6062

6163
func (c *Cache) CountPrefix(prefix string) (count int) {
6264
var keysToDelete []string
63-
c.mu.RLock()
6465
now := xtime.GetInaccurateUnixNano()
66+
c.mu.RLock()
6567
c.tree.WalkPrefix(prefix, func(s string, v any) bool {
6668
timeValue := v.(int64)
6769
if c.isExpire(now, timeValue) {
@@ -74,9 +76,11 @@ func (c *Cache) CountPrefix(prefix string) (count int) {
7476
c.mu.RUnlock()
7577

7678
if len(keysToDelete) > 0 {
79+
c.mu.Lock()
7780
for _, key := range keysToDelete {
78-
c.delete(key)
81+
c.delete(key, false)
7982
}
83+
c.mu.Unlock()
8084
}
8185
return
8286
}

plugin/action/cardinality/cache_test.go

Lines changed: 39 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package discard
1+
package cardinality
22

33
import (
44
"sync"
@@ -9,13 +9,12 @@ import (
99
)
1010

1111
func TestNewCache(t *testing.T) {
12-
cache, err := NewCache(time.Minute)
13-
assert.NoError(t, err)
12+
cache := NewCache(time.Minute)
1413
assert.NotNil(t, cache)
1514
}
1615

1716
func TestSetAndExists(t *testing.T) {
18-
cache, _ := NewCache(time.Minute)
17+
cache := NewCache(time.Minute)
1918

2019
t.Run("basic set and get", func(t *testing.T) {
2120
key := "test-key"
@@ -32,25 +31,47 @@ func TestSetAndExists(t *testing.T) {
3231
}
3332

3433
func TestDelete(t *testing.T) {
35-
cache, _ := NewCache(time.Minute)
34+
cache := NewCache(time.Minute)
3635

37-
key := "to-delete"
38-
cache.Set(key)
36+
t.Run("delete existing key with lock=true", func(t *testing.T) {
37+
key := "to-delete-1"
38+
cache.Set(key)
39+
40+
cache.delete(key, true)
3941

40-
t.Run("delete existing key", func(t *testing.T) {
41-
cache.delete(key)
4242
found := cache.IsExists(key)
43-
assert.False(t, found)
43+
assert.False(t, found, "Key should be deleted")
44+
})
45+
46+
t.Run("delete existing key with lock=false", func(t *testing.T) {
47+
key := "to-delete-2"
48+
cache.Set(key)
49+
50+
// Manually acquire lock since we're using lock=false
51+
cache.mu.Lock()
52+
cache.delete(key, false)
53+
cache.mu.Unlock()
54+
55+
found := cache.IsExists(key)
56+
assert.False(t, found, "Key should be deleted when lock=false")
4457
})
4558

4659
t.Run("delete non-existent key", func(t *testing.T) {
47-
// Should not panic
48-
cache.delete("never-existed")
60+
// Should not panic or cause issues
61+
assert.NotPanics(t, func() {
62+
cache.delete("never-existed-1", true)
63+
})
64+
65+
// Verify cache is still functional
66+
key := "test-after-non-existent"
67+
cache.Set(key)
68+
found := cache.IsExists(key)
69+
assert.True(t, found, "Cache should still work after deleting non-existent key")
4970
})
5071
}
5172

5273
func TestCountPrefix(t *testing.T) {
53-
cache, _ := NewCache(time.Minute)
74+
cache := NewCache(time.Minute)
5475

5576
keys := []string{
5677
"key1_subkey1",
@@ -80,13 +101,13 @@ func TestCountPrefix(t *testing.T) {
80101
}
81102

82103
t.Run("count after delete", func(t *testing.T) {
83-
cache.delete("key1_subkey1")
104+
cache.delete("key1_subkey1", true)
84105
assert.Equal(t, 1, cache.CountPrefix("key1"))
85106
})
86107
}
87108

88109
func TestConcurrentOperations(t *testing.T) {
89-
cache, _ := NewCache(time.Minute)
110+
cache := NewCache(time.Minute)
90111

91112
var wg sync.WaitGroup
92113
keys := []string{"key1", "key2", "key3"}
@@ -128,7 +149,7 @@ func TestConcurrentOperations(t *testing.T) {
128149
go func(k string) {
129150
defer wg.Done()
130151
for i := 0; i < 100; i++ {
131-
cache.delete(k)
152+
cache.delete(k, true)
132153
}
133154
}(key)
134155
}
@@ -147,14 +168,14 @@ func TestConcurrentOperations(t *testing.T) {
147168
for i := 0; i < 100; i++ {
148169
cache.Set("key-x")
149170
cache.Set("key-y")
150-
cache.delete("key-x")
171+
cache.delete("key-x", true)
151172
}
152173
}()
153174
wg.Wait()
154175
}
155176

156177
func TestTTL(t *testing.T) {
157-
cache, _ := NewCache(100 * time.Millisecond)
178+
cache := NewCache(100 * time.Millisecond)
158179

159180
key := "ttl-key"
160181
cache.Set(key)

plugin/action/cardinality/cardinality.go

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package discard
1+
package cardinality
22

33
import (
44
"fmt"
@@ -41,7 +41,7 @@ type Plugin struct {
4141
fields []string
4242
logger *zap.Logger
4343

44-
cardinalityDiscardCounter *prometheus.CounterVec
44+
cardinalityApplyCounter *prometheus.CounterVec
4545
}
4646

4747
const (
@@ -82,6 +82,11 @@ type Config struct {
8282
// > Leave empty for default metric naming.
8383
MetricPrefix string `json:"metric_prefix" default:""` // *
8484

85+
// > @3@4@5@6
86+
// >
87+
// > Value assigned to the metric label when cardinality limit is exceeded.
88+
MetricLabelValue string `json:"metric_label_value" default:"unknown"` // *
89+
8590
// > @3@4@5@6
8691
// >
8792
// > Maximum allowed number of unique values for monitored fields.
@@ -134,13 +139,13 @@ func (p *Plugin) makeMetric(ctl *metric.Ctl, name, help string, labels ...string
134139
func (p *Plugin) registerMetrics(ctl *metric.Ctl, prefix string) {
135140
var metricName string
136141
if prefix == "" {
137-
metricName = "cardinality_discard_total"
142+
metricName = "cardinality_applied_total"
138143
} else {
139-
metricName = fmt.Sprintf(`cardinality_discard_%s_total`, prefix)
144+
metricName = fmt.Sprintf(`cardinality_applied_%s_total`, prefix)
140145
}
141-
p.cardinalityDiscardCounter = p.makeMetric(ctl,
146+
p.cardinalityApplyCounter = p.makeMetric(ctl,
142147
metricName,
143-
"Total number of events discarded due to cardinality limits",
148+
"Total number of events applied due to cardinality limits",
144149
p.keys...,
145150
)
146151
}
@@ -149,11 +154,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginP
149154
p.config = config.(*Config)
150155
p.logger = params.Logger.Desugar()
151156

152-
var err error
153-
p.cache, err = NewCache(p.config.TTL_)
154-
if err != nil {
155-
panic(err)
156-
}
157+
p.cache = NewCache(p.config.TTL_)
157158

158159
p.keys = make([]string, 0, len(p.config.KeyFields))
159160
for _, fs := range p.config.KeyFields {
@@ -198,16 +199,16 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult {
198199
value := mapToStringSorted(cacheKey, cacheValue)
199200
keysCount := p.cache.CountPrefix(key)
200201

201-
if p.config.Limit > 0 && keysCount >= p.config.Limit {
202+
if p.config.Limit >= 0 && keysCount >= p.config.Limit {
202203
labelsValues := make([]string, 0, len(p.keys))
203204
for _, key := range p.keys {
204205
if val, exists := cacheKey[key]; exists {
205206
labelsValues = append(labelsValues, val)
206207
} else {
207-
labelsValues = append(labelsValues, "unknown")
208+
labelsValues = append(labelsValues, p.config.MetricLabelValue)
208209
}
209210
}
210-
p.cardinalityDiscardCounter.WithLabelValues(labelsValues...).Inc()
211+
p.cardinalityApplyCounter.WithLabelValues(labelsValues...).Inc()
211212
switch p.config.Action {
212213
case actionDiscard:
213214
return pipeline.ActionDiscard

plugin/action/cardinality/cardinality_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package discard
1+
package cardinality
22

33
import (
44
"fmt"
@@ -183,7 +183,7 @@ func TestCardinalityLimitDiscardIfNoSetKeyFields(t *testing.T) {
183183
}
184184

185185
func TestSetAndCountPrefix(t *testing.T) {
186-
cache, _ := NewCache(time.Minute)
186+
cache := NewCache(time.Minute)
187187

188188
cacheKey := map[string]string{
189189
"host": "localhost",

0 commit comments

Comments
 (0)