Skip to content

Commit 58db2bf

Browse files
better oom detection (#370)
* better oom detection * fix data race on mpaPublisher/oomReconciler
1 parent 7329493 commit 58db2bf

7 files changed

Lines changed: 454 additions & 7 deletions

File tree

internal/collector/container_metrics.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package collector
22

3+
import corev1 "k8s.io/api/core/v1"
4+
35
// ContainerMetricsSnapshot represents a strongly-typed snapshot of container resource metrics
46
type ContainerMetricsSnapshot struct {
57
// Container identification
@@ -75,3 +77,43 @@ type ContainerMetricsSnapshot struct {
7577
GpuTotalMemoryMb interface{} `json:"gpuTotalMemoryMb,omitempty"`
7678
IndividualGPUMetrics string `json:"individualGPUMetrics,omitempty"` // JSON string
7779
}
80+
81+
// BuildOOMSnapshot constructs a ContainerMetricsSnapshot for an OOM event.
82+
// Used by both the PodCollector (informer fast path) and OOMReconciler (sweep path)
83+
// to ensure consistent snapshot construction.
84+
func BuildOOMSnapshot(pod *corev1.Pod, cs corev1.ContainerStatus) *ContainerMetricsSnapshot {
85+
workloadName, workloadKind := getWorkloadInfo(pod)
86+
requestBytes, limitBytes := getContainerResources(pod, cs.Name)
87+
88+
var cpuRequestMillis, cpuLimitMillis int64
89+
for _, c := range pod.Spec.Containers {
90+
if c.Name == cs.Name {
91+
if req := c.Resources.Requests.Cpu(); req != nil {
92+
cpuRequestMillis = req.MilliValue()
93+
}
94+
if lim := c.Resources.Limits.Cpu(); lim != nil {
95+
cpuLimitMillis = lim.MilliValue()
96+
}
97+
break
98+
}
99+
}
100+
101+
return &ContainerMetricsSnapshot{
102+
ContainerName: cs.Name,
103+
PodName: pod.Name,
104+
Namespace: pod.Namespace,
105+
NodeName: pod.Spec.NodeName,
106+
WorkloadName: workloadName,
107+
WorkloadKind: workloadKind,
108+
CpuRequestMillis: cpuRequestMillis,
109+
CpuLimitMillis: cpuLimitMillis,
110+
MemoryUsageBytes: limitBytes, // OOM means usage >= limit
111+
MemoryRequestBytes: requestBytes,
112+
MemoryLimitBytes: limitBytes,
113+
PodLabels: pod.Labels,
114+
ContainerRunning: cs.State.Running != nil,
115+
ContainerRestarts: cs.RestartCount > 0,
116+
RestartCount: int64(cs.RestartCount),
117+
LastTerminationReason: ReasonOOMKilled,
118+
}
119+
}

internal/collector/container_resource_collector.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -585,10 +585,10 @@ func (c *ContainerResourceCollector) processContainerMetrics(
585585
lastTerminationReason = containerStatus.LastTerminationState.Terminated.Reason
586586
// Detect OOM during container init: Kubernetes reports as "StartError"
587587
// with message containing "OOM-killed" when memory limit is too low
588-
if lastTerminationReason == "StartError" {
588+
if lastTerminationReason == ReasonStartError {
589589
msg := containerStatus.LastTerminationState.Terminated.Message
590590
if strings.Contains(strings.ToLower(msg), "oom") {
591-
lastTerminationReason = "OOMKilled"
591+
lastTerminationReason = ReasonOOMKilled
592592
}
593593
}
594594
}

internal/collector/interface.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,3 +391,22 @@ type ResourceCollector interface {
391391
// AddResource manually adds a resource to be processed by the collector
392392
AddResource(resource interface{}) error
393393
}
394+
395+
// Kubernetes container termination reason constants. Using constants instead of
// raw strings prevents typo-induced silent failures across the OOM detection paths.
const (
	// ReasonOOMKilled is the termination reason kubelet sets when the OOM killer
	// terminates a container that exceeded its memory limit.
	ReasonOOMKilled = "OOMKilled"

	// ReasonStartError is the termination reason for containers that fail during
	// init. When the termination message contains "oom" (case-insensitive), the
	// detection paths reclassify it as an OOM during startup.
	ReasonStartError = "StartError"
)
406+
407+
// MpaMetricsPublisher is the interface the collector package uses to publish
// metrics directly to the MPA gRPC stream (bypassing the combinedChannel pipeline).
// Implemented by server.MpaServer.
type MpaMetricsPublisher interface {
	// PublishMetrics sends a single container snapshot to the MPA stream,
	// stamped with the time the metrics were observed.
	PublishMetrics(metrics *ContainerMetricsSnapshot, timestamp time.Time)
}
Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
package collector
2+
3+
import (
4+
"context"
5+
"strings"
6+
"sync"
7+
"time"
8+
9+
"github.com/go-logr/logr"
10+
corev1 "k8s.io/api/core/v1"
11+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
12+
"k8s.io/client-go/kubernetes"
13+
)
14+
15+
const (
	// oomReconcileInterval is how often the reconciler sweeps pods for missed OOM events.
	// 30s balances latency (operator gets missed OOMs within half a minute) against
	// K8s API load (one paged List call per namespace per sweep).
	oomReconcileInterval = 30 * time.Second

	// oomDeduplicationTTL is how long a seen OOM entry is kept in the dedup map.
	// After this, the entry is evicted so a new OOM on the same container (after pod
	// recycling) can be detected. 10 minutes covers several reconciliation cycles
	// and the operator's emergency response cooldown (default 10s).
	oomDeduplicationTTL = 10 * time.Minute
)
27+
28+
// OOMReconcilerMarker is the interface used by PodCollector to mark OOMs as seen,
// preventing the periodic sweep from re-publishing events already sent via the
// real-time informer path.
type OOMReconcilerMarker interface {
	// MarkSeen records that the OOM for the given container, at the given
	// restart count, has already been published.
	MarkSeen(namespace, podName, containerName string, restartCount int32)
}
34+
35+
// oomSeenKey uniquely identifies an OOM event for deduplication.
// The restart count is deliberately NOT part of the key; it lives in the
// oomSeenEntry value so a later restart of the same container reads as a
// new event against the existing entry.
type oomSeenKey struct {
	namespace     string
	podName       string
	containerName string
}
41+
42+
// oomSeenEntry records the container restart count at which an OOM was last
// published and when the entry was recorded (used for TTL-based eviction).
type oomSeenEntry struct {
	restartCount int32     // RestartCount observed when the OOM was published/marked
	seenAt       time.Time // recording time; compared against oomDeduplicationTTL
}
46+
47+
// OOMReconciler periodically sweeps pods for OOM termination states that the
// informer-based PodCollector may have missed (informer coalescing, rapid
// restart-then-recovery, zxporter restart). Detected OOMs are published directly
// to the MPA stream, bypassing the lossy combinedChannel pipeline.
type OOMReconciler struct {
	client       kubernetes.Interface // API client used for the periodic pod List sweeps
	namespaces   []string             // namespaces to sweep; empty (or [""]) means all namespaces
	mpaPublisher MpaMetricsPublisher  // destination for synthetic OOM snapshots; may be nil
	logger       logr.Logger

	// mu guards seen, which is accessed concurrently by the sweep loop and by
	// MarkSeen calls arriving from the PodCollector's informer goroutine.
	mu   sync.Mutex
	seen map[oomSeenKey]oomSeenEntry
}
60+
61+
// NewOOMReconciler creates a new OOM reconciler.
62+
func NewOOMReconciler(
63+
client kubernetes.Interface,
64+
namespaces []string,
65+
mpaPublisher MpaMetricsPublisher,
66+
logger logr.Logger,
67+
) *OOMReconciler {
68+
return &OOMReconciler{
69+
client: client,
70+
namespaces: namespaces,
71+
mpaPublisher: mpaPublisher,
72+
logger: logger.WithName("oom-reconciler"),
73+
seen: make(map[oomSeenKey]oomSeenEntry),
74+
}
75+
}
76+
77+
// MarkSeen records that an OOM event has already been published (by the PodCollector's
78+
// real-time path), so the periodic sweep will skip it.
79+
func (r *OOMReconciler) MarkSeen(namespace, podName, containerName string, restartCount int32) {
80+
r.mu.Lock()
81+
defer r.mu.Unlock()
82+
key := oomSeenKey{namespace: namespace, podName: podName, containerName: containerName}
83+
r.seen[key] = oomSeenEntry{restartCount: restartCount, seenAt: time.Now()}
84+
}
85+
86+
// Start runs the periodic OOM reconciliation loop. Blocks until ctx is cancelled.
87+
func (r *OOMReconciler) Start(ctx context.Context) {
88+
r.logger.Info("Starting OOM reconciler", "interval", oomReconcileInterval, "namespaces", r.namespaces)
89+
ticker := time.NewTicker(oomReconcileInterval)
90+
defer ticker.Stop()
91+
92+
for {
93+
select {
94+
case <-ctx.Done():
95+
r.logger.Info("OOM reconciler stopped")
96+
return
97+
case <-ticker.C:
98+
r.sweep(ctx)
99+
r.evictStaleEntries()
100+
}
101+
}
102+
}
103+
104+
// sweep lists pods in all watched namespaces and checks for OOM termination states.
105+
func (r *OOMReconciler) sweep(ctx context.Context) {
106+
namespaces := r.namespaces
107+
// Empty or single empty string means all namespaces
108+
if len(namespaces) == 0 || (len(namespaces) == 1 && namespaces[0] == "") {
109+
namespaces = []string{""}
110+
}
111+
112+
for _, ns := range namespaces {
113+
if err := r.sweepNamespace(ctx, ns); err != nil {
114+
r.logger.Error(err, "Failed to sweep namespace for OOM events", "namespace", ns)
115+
}
116+
}
117+
}
118+
119+
// sweepNamespace checks all pods in a single namespace for OOM termination.
120+
// Uses chunked listing (500 pods per page) to bound memory usage on large clusters.
121+
func (r *OOMReconciler) sweepNamespace(ctx context.Context, namespace string) error {
122+
continueToken := ""
123+
for {
124+
listOpts := metav1.ListOptions{
125+
Limit: 500,
126+
Continue: continueToken,
127+
}
128+
129+
var podList *corev1.PodList
130+
var err error
131+
if namespace == "" {
132+
podList, err = r.client.CoreV1().Pods("").List(ctx, listOpts)
133+
} else {
134+
podList, err = r.client.CoreV1().Pods(namespace).List(ctx, listOpts)
135+
}
136+
if err != nil {
137+
return err
138+
}
139+
140+
for i := range podList.Items {
141+
r.checkPodForOOM(&podList.Items[i])
142+
}
143+
144+
continueToken = podList.Continue
145+
if continueToken == "" {
146+
break
147+
}
148+
}
149+
return nil
150+
}
151+
152+
// checkPodForOOM inspects container statuses for OOM termination and publishes
153+
// to the MPA stream if the OOM hasn't been seen before.
154+
func (r *OOMReconciler) checkPodForOOM(pod *corev1.Pod) {
155+
for _, cs := range pod.Status.ContainerStatuses {
156+
if !isOOMTermination(cs) {
157+
continue
158+
}
159+
160+
key := oomSeenKey{
161+
namespace: pod.Namespace,
162+
podName: pod.Name,
163+
containerName: cs.Name,
164+
}
165+
166+
r.mu.Lock()
167+
entry, exists := r.seen[key]
168+
alreadySeen := exists && entry.restartCount >= cs.RestartCount
169+
if !alreadySeen {
170+
r.seen[key] = oomSeenEntry{restartCount: cs.RestartCount, seenAt: time.Now()}
171+
}
172+
r.mu.Unlock()
173+
174+
if alreadySeen {
175+
continue
176+
}
177+
178+
r.publishOOM(pod, cs)
179+
}
180+
}
181+
182+
// isOOMTermination returns true if the container's last termination was OOM-related.
183+
func isOOMTermination(cs corev1.ContainerStatus) bool {
184+
terminated := cs.LastTerminationState.Terminated
185+
if terminated == nil {
186+
return false
187+
}
188+
if terminated.Reason == ReasonOOMKilled {
189+
return true
190+
}
191+
// Kubernetes reports init-container OOM as StartError with "oom" in message
192+
if terminated.Reason == ReasonStartError && strings.Contains(strings.ToLower(terminated.Message), "oom") {
193+
return true
194+
}
195+
return false
196+
}
197+
198+
// publishOOM sends a synthetic OOM metric snapshot directly to the MPA stream.
199+
func (r *OOMReconciler) publishOOM(pod *corev1.Pod, cs corev1.ContainerStatus) {
200+
if r.mpaPublisher == nil {
201+
return
202+
}
203+
204+
snapshot := BuildOOMSnapshot(pod, cs)
205+
r.mpaPublisher.PublishMetrics(snapshot, time.Now())
206+
207+
r.logger.Info("OOM reconciler: published missed OOM event to MPA stream",
208+
"namespace", pod.Namespace,
209+
"pod", pod.Name,
210+
"container", cs.Name,
211+
"restartCount", cs.RestartCount,
212+
"memoryLimitBytes", snapshot.MemoryLimitBytes,
213+
"memoryRequestBytes", snapshot.MemoryRequestBytes,
214+
"workloadName", snapshot.WorkloadName,
215+
"workloadKind", snapshot.WorkloadKind)
216+
}
217+
218+
// evictStaleEntries removes dedup entries older than oomDeduplicationTTL.
219+
func (r *OOMReconciler) evictStaleEntries() {
220+
r.mu.Lock()
221+
defer r.mu.Unlock()
222+
cutoff := time.Now().Add(-oomDeduplicationTTL)
223+
for key, entry := range r.seen {
224+
if entry.seenAt.Before(cutoff) {
225+
delete(r.seen, key)
226+
}
227+
}
228+
}

0 commit comments

Comments
 (0)