Skip to content

Commit cc101ce

Browse files
authored
add crd binding (#344)
1 parent b4b09bf commit cc101ce

File tree

1 file changed

+46
-5
lines changed

1 file changed

+46
-5
lines changed

internal/collector/workload_rule_collector.go

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@ import (
1717
"k8s.io/client-go/tools/cache"
1818
)
1919

20-
// WorkloadRuleCollector watches WorkloadRule CRD status changes and sends OOM events
20+
// WorkloadRuleCollector watches WorkloadRule CRDs and sends spec/status changes
2121
// back to the control plane via SendResourceBatch.
22+
// Handles two-way sync: sends on Add (kubectl apply), Delete (kubectl delete),
23+
// and Update (spec changes or OOM/throttle status changes).
2224
type WorkloadRuleCollector struct {
2325
client dynamic.Interface
2426
informerFactory dynamicinformer.DynamicSharedInformerFactory
@@ -94,6 +96,16 @@ func (c *WorkloadRuleCollector) Start(ctx context.Context) error {
9496
c.wrInformer = c.informerFactory.ForResource(workloadRuleGVR).Informer()
9597

9698
_, err := c.wrInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
99+
AddFunc: func(obj interface{}) {
100+
wr, ok := obj.(*unstructured.Unstructured)
101+
if !ok {
102+
c.logger.Error(nil, "WorkloadRule AddFunc: failed to convert object to unstructured")
103+
return
104+
}
105+
c.logger.Info("WorkloadRule added, sending to control plane",
106+
"namespace", wr.GetNamespace(), "name", wr.GetName())
107+
c.handleWorkloadRuleEvent(wr, EventTypeAdd)
108+
},
97109
UpdateFunc: func(oldObj, newObj interface{}) {
98110
oldWR := oldObj.(*unstructured.Unstructured)
99111
newWR := newObj.(*unstructured.Unstructured)
@@ -106,11 +118,30 @@ func (c *WorkloadRuleCollector) Start(ctx context.Context) error {
106118
return
107119
}
108120

109-
// Only send when OOM events have changed
110-
if c.oomEventsChanged(oldWR, newWR) {
121+
// Send when spec or OOM/throttle status has changed
122+
if c.specChanged(oldWR, newWR) || c.oomEventsChanged(oldWR, newWR) {
111123
c.handleWorkloadRuleEvent(newWR, EventTypeUpdate)
112124
}
113125
},
126+
DeleteFunc: func(obj interface{}) {
127+
wr, ok := obj.(*unstructured.Unstructured)
128+
if !ok {
129+
// Handle tombstone (object already deleted from cache)
130+
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
131+
if wr, ok = tombstone.Obj.(*unstructured.Unstructured); ok {
132+
c.logger.Info("WorkloadRule deleted (tombstone), sending to control plane",
133+
"namespace", wr.GetNamespace(), "name", wr.GetName())
134+
c.handleWorkloadRuleEvent(wr, EventTypeDelete)
135+
return
136+
}
137+
}
138+
c.logger.Error(nil, "WorkloadRule DeleteFunc: failed to convert object")
139+
return
140+
}
141+
c.logger.Info("WorkloadRule deleted, sending to control plane",
142+
"namespace", wr.GetNamespace(), "name", wr.GetName())
143+
c.handleWorkloadRuleEvent(wr, EventTypeDelete)
144+
},
114145
})
115146
if err != nil {
116147
return fmt.Errorf("failed to add event handler: %w", err)
@@ -140,7 +171,7 @@ func (c *WorkloadRuleCollector) Start(ctx context.Context) error {
140171
return nil
141172
}
142173

143-
// handleWorkloadRuleEvent processes a WorkloadRule status update containing OOM events
174+
// handleWorkloadRuleEvent processes a WorkloadRule event (add, update, or delete)
144175
func (c *WorkloadRuleCollector) handleWorkloadRuleEvent(wr *unstructured.Unstructured, eventType EventType) {
145176
c.mu.RLock()
146177
defer c.mu.RUnlock()
@@ -151,9 +182,10 @@ func (c *WorkloadRuleCollector) handleWorkloadRuleEvent(wr *unstructured.Unstruc
151182
namespace := wr.GetNamespace()
152183
name := wr.GetName()
153184

154-
c.logger.Info("Sending WorkloadRule status with OOM events to control plane",
185+
c.logger.Info("Sending WorkloadRule event to control plane",
155186
"namespace", namespace,
156187
"name", name,
188+
"eventType", eventType.String(),
157189
)
158190
c.batchChan <- CollectedResource{
159191
ResourceType: WorkloadRule,
@@ -180,6 +212,15 @@ func (c *WorkloadRuleCollector) oomEventsChanged(oldWR, newWR *unstructured.Unst
180212
return !reflect.DeepEqual(oldEvents, newEvents)
181213
}
182214

215+
// specChanged detects if the WorkloadRule spec has changed between old and new versions.
216+
// Used for two-way sync: when a user edits the CRD via kubectl, the spec changes
217+
// and the CP needs to be notified.
218+
func (c *WorkloadRuleCollector) specChanged(oldWR, newWR *unstructured.Unstructured) bool {
219+
oldSpec, _, _ := unstructured.NestedMap(oldWR.Object, "spec")
220+
newSpec, _, _ := unstructured.NestedMap(newWR.Object, "spec")
221+
return !reflect.DeepEqual(oldSpec, newSpec)
222+
}
223+
183224
// Stop gracefully shuts down the WorkloadRule collector
184225
func (c *WorkloadRuleCollector) Stop() error {
185226
c.logger.Info("Stopping WorkloadRule collector")

0 commit comments

Comments
 (0)