Skip to content

Commit afd0bd3

Browse files
authored
Merge pull request #55 from dgrisonnet/instrument-informers
Instrument informers
2 parents a1ea13b + ce2cd7b commit afd0bd3

File tree

286 files changed

+393
-20771
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

286 files changed

+393
-20771
lines changed

cmd/kube-events-exporter/main.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,18 +60,18 @@ func main() {
6060
klog.Fatalf("failed to create cluster config: %v", err)
6161
}
6262

63+
exporterRegistry := prometheus.NewRegistry()
64+
exporter.RegisterExporterCollectors(exporterRegistry)
65+
6366
eventRegistry := prometheus.NewRegistry()
64-
eventCollector := collector.NewEventCollector(kubeClient, opts)
67+
eventCollector := collector.NewEventCollector(kubeClient, exporterRegistry, opts)
6568

6669
stopCh := make(chan struct{})
6770
defer close(stopCh)
6871

6972
eventCollector.Run(stopCh)
7073
eventRegistry.MustRegister(eventCollector)
7174

72-
exporterRegistry := prometheus.NewRegistry()
73-
exporter.RegisterExporterCollectors(exporterRegistry)
74-
7575
eventMux := http.NewServeMux()
7676
exporterhttp.RegisterEventsMuxHandlers(eventMux, eventRegistry, exporterRegistry)
7777
exporterMux := http.NewServeMux()

go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,6 @@ github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+
8585
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
8686
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
8787
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
88-
github.com/imdario/mergo v0.3.5 h1:JboBksRwiiAJWvIYJVo46AfV+IAIKZpfrSzVKj42R4Q=
8988
github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
9089
github.com/imdario/mergo v0.3.9 h1:UauaLniWCFHWd+Jp9oCEkTBj8VO/9DKg3PV3VCNMDIg=
9190
github.com/imdario/mergo v0.3.9/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
@@ -189,7 +188,6 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ
189188
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
190189
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
191190
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
192-
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY=
193191
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
194192
golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
195193
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

internal/collector/event.go

Lines changed: 42 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -22,119 +22,103 @@ import (
2222

2323
"github.com/prometheus/client_golang/prometheus"
2424
"github.com/rhobs/kube-events-exporter/internal/options"
25+
"github.com/rhobs/kube-events-exporter/pkg/informer"
2526

2627
v1 "k8s.io/api/core/v1"
2728
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28-
"k8s.io/client-go/informers"
2929
"k8s.io/client-go/kubernetes"
3030
"k8s.io/client-go/tools/cache"
3131
)
3232

3333
// EventCollector is a prometeus.Collector that bundles all the metrics related
3434
// to Kubernetes Events.
3535
type EventCollector struct {
36-
eventsTotal *prometheus.CounterVec
37-
38-
lock sync.Mutex
39-
informerFactories []informers.SharedInformerFactory
40-
filter eventFilter
36+
kclient kubernetes.Interface
37+
metrics *exporterMetrics
38+
lock sync.Mutex
39+
filter eventFilter
40+
informers []cache.SharedIndexInformer
4141
}
4242

4343
// NewEventCollector returns a prometheus.Collector collecting metrics about
4444
// Kubernetes Events.
45-
func NewEventCollector(kubeClient kubernetes.Interface, opts *options.Options) *EventCollector {
46-
var factories []informers.SharedInformerFactory
47-
for _, ns := range opts.InvolvedObjectNamespaces {
48-
for _, eventType := range opts.EventTypes {
49-
factories = append(factories, newFilteredInformerFactory(kubeClient, ns, eventType))
50-
}
51-
}
52-
45+
func NewEventCollector(kclient kubernetes.Interface, exporterRegistry *prometheus.Registry, opts *options.Options) *EventCollector {
5346
collector := &EventCollector{
54-
eventsTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
55-
Name: "kube_events_total",
56-
Help: "Count of all Kubernetes Events",
57-
}, []string{"type", "involved_object_namespace", "involved_object_kind", "reason"}),
58-
59-
lock: sync.Mutex{},
60-
informerFactories: factories,
47+
kclient: kclient,
48+
lock: sync.Mutex{},
6149
filter: eventFilter{
6250
creationTimestamp: time.Now(),
6351
apiGroups: opts.InvolvedObjectAPIGroups,
6452
controllers: opts.ReportingControllers,
6553
},
54+
metrics: newExporterMetrics(exporterRegistry),
6655
}
6756

68-
collector.initInformers()
69-
57+
for _, ns := range opts.InvolvedObjectNamespaces {
58+
for _, evType := range opts.EventTypes {
59+
inf := collector.newEventInformer(ns, evType)
60+
inf.AddEventHandler(collector.eventHandler())
61+
collector.informers = append(collector.informers, inf)
62+
}
63+
}
7064
return collector
7165
}
7266

73-
func newFilteredInformerFactory(kubeClient kubernetes.Interface, ns, eventType string) informers.SharedInformerFactory {
74-
return informers.NewFilteredSharedInformerFactory(
75-
kubeClient,
76-
0,
77-
metav1.NamespaceAll,
78-
func(list *metav1.ListOptions) {
79-
filterInvolvedObjectNs(list, ns)
80-
filterEventType(list, eventType)
81-
},
82-
)
83-
}
84-
8567
// Describe implements the prometheus.Collector interface.
8668
func (collector *EventCollector) Describe(ch chan<- *prometheus.Desc) {
87-
collector.eventsTotal.Describe(ch)
69+
collector.metrics.eventsTotal.Describe(ch)
8870
}
8971

9072
// Collect implements the prometheus.Collector interface.
9173
func (collector *EventCollector) Collect(ch chan<- prometheus.Metric) {
92-
collector.eventsTotal.Collect(ch)
74+
collector.metrics.eventsTotal.Collect(ch)
9375
}
9476

9577
// Run starts updating EventCollector metrics.
9678
func (collector *EventCollector) Run(stopCh <-chan struct{}) {
97-
for _, factory := range collector.informerFactories {
98-
go factory.Start(stopCh)
79+
for _, informer := range collector.informers {
80+
go informer.Run(stopCh)
9981
}
10082
}
10183

102-
func (collector *EventCollector) initInformers() {
103-
for _, factory := range collector.informerFactories {
104-
eventsTotalInformer := factory.Core().V1().Events().Informer()
105-
eventsTotalInformer.AddEventHandler(collector.eventsTotalHandler())
106-
}
84+
func (collector *EventCollector) newEventInformer(ns, evType string) cache.SharedIndexInformer {
85+
return informer.NewInstrumentedEventInformer(
86+
collector.kclient,
87+
metav1.NamespaceAll,
88+
collector.metrics.listWatchMetrics,
89+
0,
90+
cache.Indexers{},
91+
func(list *metav1.ListOptions) {
92+
filterInvolvedObjectNs(list, ns)
93+
filterEventType(list, evType)
94+
},
95+
)
10796
}
10897

109-
func (collector *EventCollector) eventsTotalHandler() cache.ResourceEventHandler {
98+
func (collector *EventCollector) eventHandler() cache.ResourceEventHandler {
11099
return cache.FilteringResourceEventHandler{
111100
FilterFunc: collector.filter.filter,
112101
Handler: &cache.ResourceEventHandlerFuncs{
113102
AddFunc: func(obj interface{}) {
103+
collector.lock.Lock()
104+
defer collector.lock.Unlock()
105+
114106
ev := obj.(*v1.Event)
115-
collector.increaseEventsTotal(ev, 1)
107+
collector.metrics.increaseEventsTotal(ev, 1)
116108
},
117109
UpdateFunc: func(oldObj, newObj interface{}) {
110+
collector.lock.Lock()
111+
defer collector.lock.Unlock()
112+
118113
oldEv := oldObj.(*v1.Event)
119114
newEv := newObj.(*v1.Event)
120115
nbNew := updatedEventNb(oldEv, newEv)
121-
collector.increaseEventsTotal(newEv, float64(nbNew))
116+
collector.metrics.increaseEventsTotal(newEv, float64(nbNew))
122117
},
123118
},
124119
}
125120
}
126121

127-
func (collector *EventCollector) increaseEventsTotal(event *v1.Event, nbNew float64) {
128-
collector.lock.Lock()
129-
collector.eventsTotal.With(prometheus.Labels{
130-
"type": event.Type,
131-
"involved_object_namespace": event.InvolvedObject.Namespace,
132-
"involved_object_kind": event.InvolvedObject.Kind,
133-
"reason": event.Reason,
134-
}).Add(nbNew)
135-
collector.lock.Unlock()
136-
}
137-
138122
func updatedEventNb(oldEv, newEv *v1.Event) int32 {
139123
if newEv.Series != nil {
140124
if oldEv.Series != nil {

internal/collector/metrics.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
Copyright 2020 Red Hat, Inc. All rights reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package collector
18+
19+
import (
20+
"github.com/prometheus/client_golang/prometheus"
21+
"github.com/rhobs/kube-events-exporter/pkg/informer"
22+
23+
v1 "k8s.io/api/core/v1"
24+
)
25+
26+
type exporterMetrics struct {
27+
eventsTotal *prometheus.CounterVec
28+
listWatchMetrics *informer.ListWatchMetrics
29+
}
30+
31+
func newExporterMetrics(exporterRegistry *prometheus.Registry) *exporterMetrics {
32+
return &exporterMetrics{
33+
eventsTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
34+
Name: "kube_events_total",
35+
Help: "Count of all Kubernetes Events",
36+
}, []string{"type", "involved_object_namespace", "involved_object_kind", "reason"}),
37+
listWatchMetrics: informer.NewListWatchMetrics(exporterRegistry),
38+
}
39+
}
40+
41+
func (m *exporterMetrics) increaseEventsTotal(event *v1.Event, nbNew float64) {
42+
m.eventsTotal.With(prometheus.Labels{
43+
"type": event.Type,
44+
"involved_object_namespace": event.InvolvedObject.Namespace,
45+
"involved_object_kind": event.InvolvedObject.Kind,
46+
"reason": event.Reason,
47+
}).Add(nbNew)
48+
}

pkg/informer/event.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
Copyright 2020 Red Hat, Inc. All rights reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package informer
18+
19+
import (
20+
"context"
21+
"time"
22+
23+
v1 "k8s.io/api/core/v1"
24+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25+
"k8s.io/apimachinery/pkg/runtime"
26+
"k8s.io/apimachinery/pkg/watch"
27+
"k8s.io/client-go/informers/internalinterfaces"
28+
"k8s.io/client-go/kubernetes"
29+
"k8s.io/client-go/tools/cache"
30+
)
31+
32+
// NewInstrumentedEventInformer constructs a new informer for Event type with
33+
// instrumented list watch.
34+
func NewInstrumentedEventInformer(client kubernetes.Interface, namespace string, metrics *ListWatchMetrics, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
35+
return cache.NewSharedIndexInformer(
36+
NewInstrumentedListerWatcher(
37+
NewEventListerWatcher(client, namespace, tweakListOptions),
38+
metrics,
39+
),
40+
&v1.Event{},
41+
resyncPeriod,
42+
indexers,
43+
)
44+
}
45+
46+
// NewEventListerWatcher constructs a new lister watcher for Event type.
47+
func NewEventListerWatcher(client kubernetes.Interface, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.ListerWatcher {
48+
return &cache.ListWatch{
49+
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
50+
if tweakListOptions != nil {
51+
tweakListOptions(&options)
52+
}
53+
return client.CoreV1().Events(namespace).List(context.TODO(), options)
54+
},
55+
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
56+
if tweakListOptions != nil {
57+
tweakListOptions(&options)
58+
}
59+
return client.CoreV1().Events(namespace).Watch(context.TODO(), options)
60+
},
61+
}
62+
}

0 commit comments

Comments
 (0)