Skip to content

Commit a9f281d

Browse files
authored
Refactor setup controllers by passing roleTracker directly instead of options (#8166)
1 parent 9d327a1 commit a9f281d

File tree

38 files changed

+105
-198
lines changed

38 files changed

+105
-198
lines changed

cmd/kueue/main.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ func main() {
291291
os.Exit(1)
292292
}
293293

294-
if failedWebhook, err := webhooks.Setup(mgr, webhooks.WithRoleTracker(roleTracker)); err != nil {
294+
if failedWebhook, err := webhooks.Setup(mgr, roleTracker); err != nil {
295295
setupLog.Error(err, "Unable to create webhook", "webhook", failedWebhook)
296296
os.Exit(1)
297297
}
@@ -352,11 +352,11 @@ func setupIndexes(ctx context.Context, mgr ctrl.Manager, cfg *configapi.Configur
352352
}
353353

354354
func setupControllers(ctx context.Context, mgr ctrl.Manager, cCache *schdcache.Cache, queues *qcache.Manager, cfg *configapi.Configuration, serverVersionFetcher *kubeversion.ServerVersionFetcher, roleTracker *roletracker.RoleTracker) error {
355-
if failedCtrl, err := core.SetupControllers(mgr, queues, cCache, cfg, core.WithSetupRoleTracker(roleTracker)); err != nil {
355+
if failedCtrl, err := core.SetupControllers(mgr, queues, cCache, cfg, roleTracker); err != nil {
356356
return fmt.Errorf("unable to create controller %s: %w", failedCtrl, err)
357357
}
358358
if features.Enabled(features.FailureRecoveryPolicy) {
359-
if failedCtrlName, err := failurerecovery.SetupControllers(mgr, cfg, failurerecovery.SetupWithRoleTracker(roleTracker)); err != nil {
359+
if failedCtrlName, err := failurerecovery.SetupControllers(mgr, cfg, roleTracker); err != nil {
360360
return fmt.Errorf("could not setup FailureRecovery controller %s: %w", failedCtrlName, err)
361361
}
362362
}
@@ -407,13 +407,13 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, cCache *schdcache.C
407407
return fmt.Errorf("could not setup MultiKueue controller: %w", err)
408408
}
409409

410-
if failedDispatcher, err := dispatcher.SetupControllers(mgr, cfg, dispatcher.WithRoleTracker(roleTracker)); err != nil {
410+
if failedDispatcher, err := dispatcher.SetupControllers(mgr, cfg, roleTracker); err != nil {
411411
return fmt.Errorf("could not setup Dispatcher controller %q for MultiKueue: %w", failedDispatcher, err)
412412
}
413413
}
414414

415415
if features.Enabled(features.TopologyAwareScheduling) {
416-
if failedCtrl, err := tas.SetupControllers(mgr, queues, cCache, cfg, tas.WithRoleTracker(roleTracker)); err != nil {
416+
if failedCtrl, err := tas.SetupControllers(mgr, queues, cCache, cfg, roleTracker); err != nil {
417417
return fmt.Errorf("could not setup TAS controller %s: %w", failedCtrl, err)
418418
}
419419
}

pkg/controller/core/core.go

Lines changed: 7 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -35,38 +35,20 @@ const (
3535
updateChBuffer = 10
3636
)
3737

38-
// SetupOption configures SetupControllers.
39-
type SetupOption func(*setupOptions)
40-
41-
type setupOptions struct {
42-
roleTracker *roletracker.RoleTracker
43-
}
44-
45-
// WithSetupRoleTracker sets the role tracker for HA setups.
46-
func WithSetupRoleTracker(tracker *roletracker.RoleTracker) SetupOption {
47-
return func(o *setupOptions) {
48-
o.roleTracker = tracker
49-
}
50-
}
51-
5238
// SetupControllers sets up the core controllers. It returns the name of the
5339
// controller that failed to create and an error, if any.
54-
func SetupControllers(mgr ctrl.Manager, qManager *qcache.Manager, cc *schdcache.Cache, cfg *configapi.Configuration, opts ...SetupOption) (string, error) {
55-
options := &setupOptions{}
56-
for _, opt := range opts {
57-
opt(options)
58-
}
59-
rfRec := NewResourceFlavorReconciler(mgr.GetClient(), qManager, cc, options.roleTracker)
40+
func SetupControllers(mgr ctrl.Manager, qManager *qcache.Manager, cc *schdcache.Cache, cfg *configapi.Configuration, roleTracker *roletracker.RoleTracker) (string, error) {
41+
rfRec := NewResourceFlavorReconciler(mgr.GetClient(), qManager, cc, roleTracker)
6042
if err := rfRec.SetupWithManager(mgr, cfg); err != nil {
6143
return "ResourceFlavor", err
6244
}
63-
acRec := NewAdmissionCheckReconciler(mgr.GetClient(), qManager, cc, options.roleTracker)
45+
acRec := NewAdmissionCheckReconciler(mgr.GetClient(), qManager, cc, roleTracker)
6446
if err := acRec.SetupWithManager(mgr, cfg); err != nil {
6547
return "AdmissionCheck", err
6648
}
6749
qRec := NewLocalQueueReconciler(mgr.GetClient(), qManager, cc,
6850
WithAdmissionFairSharingConfig(cfg.AdmissionFairSharing),
69-
WithRoleTracker(options.roleTracker))
51+
WithRoleTracker(roleTracker))
7052
if err := qRec.SetupWithManager(mgr, cfg); err != nil {
7153
return "LocalQueue", err
7254
}
@@ -76,7 +58,7 @@ func SetupControllers(mgr ctrl.Manager, qManager *qcache.Manager, cc *schdcache.
7658
if features.Enabled(features.HierarchicalCohorts) {
7759
cohortRec := NewCohortReconciler(mgr.GetClient(), cc, qManager,
7860
CohortReconcilerWithFairSharing(fairSharingEnabled),
79-
CohortReconcilerWithRoleTracker(options.roleTracker))
61+
CohortReconcilerWithRoleTracker(roleTracker))
8062
if err := cohortRec.SetupWithManager(mgr, cfg); err != nil {
8163
return "Cohort", err
8264
}
@@ -90,7 +72,7 @@ func SetupControllers(mgr ctrl.Manager, qManager *qcache.Manager, cc *schdcache.
9072
WithReportResourceMetrics(cfg.Metrics.EnableClusterQueueResources),
9173
WithFairSharing(fairSharingEnabled),
9274
WithWatchers(watchers...),
93-
WithClusterQueueRoleTracker(options.roleTracker),
75+
WithClusterQueueRoleTracker(roleTracker),
9476
)
9577
rfRec.AddUpdateWatcher(cqRec)
9678
acRec.AddUpdateWatchers(cqRec)
@@ -103,7 +85,7 @@ func SetupControllers(mgr ctrl.Manager, qManager *qcache.Manager, cc *schdcache.
10385
WithWorkloadUpdateWatchers(qRec, cqRec),
10486
WithWaitForPodsReady(waitForPodsReady(cfg.WaitForPodsReady)),
10587
WithWorkloadRetention(workloadRetention(cfg.ObjectRetentionPolicies)),
106-
WithWorkloadRoleTracker(options.roleTracker),
88+
WithWorkloadRoleTracker(roleTracker),
10789
)
10890
if features.Enabled(features.DynamicResourceAllocation) {
10991
qManager.SetDRAReconcileChannel(workloadRec.GetDRAReconcileChannel())

pkg/controller/failurerecovery/failurerecovery.go

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,30 +24,11 @@ import (
2424
"sigs.k8s.io/kueue/pkg/util/roletracker"
2525
)
2626

27-
// SetupOption configures the failurerecovery controllers setup.
28-
type SetupOption func(*setupOptions)
29-
30-
type setupOptions struct {
31-
roleTracker *roletracker.RoleTracker
32-
}
33-
34-
// SetupWithRoleTracker sets the roleTracker for failurerecovery controllers setup.
35-
func SetupWithRoleTracker(tracker *roletracker.RoleTracker) SetupOption {
36-
return func(o *setupOptions) {
37-
o.roleTracker = tracker
38-
}
39-
}
40-
41-
func SetupControllers(mgr manager.Manager, cfg *configapi.Configuration, opts ...SetupOption) (string, error) {
42-
options := &setupOptions{}
43-
for _, opt := range opts {
44-
opt(options)
45-
}
46-
27+
func SetupControllers(mgr manager.Manager, cfg *configapi.Configuration, roleTracker *roletracker.RoleTracker) (string, error) {
4728
tpRec := NewTerminatingPodReconciler(
4829
mgr.GetClient(),
4930
mgr.GetEventRecorderFor(constants.PodTerminationControllerName),
50-
WithRoleTracker(options.roleTracker),
31+
WithRoleTracker(roleTracker),
5132
)
5233
return tpRec.SetupWithManager(mgr, cfg)
5334
}

pkg/controller/tas/controllers.go

Lines changed: 5 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -26,40 +26,22 @@ import (
2626
"sigs.k8s.io/kueue/pkg/util/roletracker"
2727
)
2828

29-
// SetupOption configures the TAS controllers setup.
30-
type SetupOption func(*setupOptions)
31-
32-
type setupOptions struct {
33-
roleTracker *roletracker.RoleTracker
34-
}
35-
36-
// WithRoleTracker sets the roleTracker for TAS controllers.
37-
func WithRoleTracker(tracker *roletracker.RoleTracker) SetupOption {
38-
return func(o *setupOptions) {
39-
o.roleTracker = tracker
40-
}
41-
}
42-
43-
func SetupControllers(mgr ctrl.Manager, queues *qcache.Manager, cache *schdcache.Cache, cfg *configapi.Configuration, opts ...SetupOption) (string, error) {
44-
options := &setupOptions{}
45-
for _, opt := range opts {
46-
opt(options)
47-
}
29+
func SetupControllers(mgr ctrl.Manager, queues *qcache.Manager, cache *schdcache.Cache, cfg *configapi.Configuration, roleTracker *roletracker.RoleTracker) (string, error) {
4830
recorder := mgr.GetEventRecorderFor(TASResourceFlavorController)
49-
topologyRec := newTopologyReconciler(mgr.GetClient(), queues, cache, options.roleTracker)
31+
topologyRec := newTopologyReconciler(mgr.GetClient(), queues, cache, roleTracker)
5032
if ctrlName, err := topologyRec.setupWithManager(mgr, cfg); err != nil {
5133
return ctrlName, err
5234
}
53-
rfRec := newRfReconciler(mgr.GetClient(), queues, cache, recorder, options.roleTracker)
35+
rfRec := newRfReconciler(mgr.GetClient(), queues, cache, recorder, roleTracker)
5436
if ctrlName, err := rfRec.setupWithManager(mgr, cache, cfg); err != nil {
5537
return ctrlName, err
5638
}
57-
topologyUngater := newTopologyUngater(mgr.GetClient(), options.roleTracker)
39+
topologyUngater := newTopologyUngater(mgr.GetClient(), roleTracker)
5840
if ctrlName, err := topologyUngater.setupWithManager(mgr, cfg); err != nil {
5941
return ctrlName, err
6042
}
6143
if features.Enabled(features.TASFailedNodeReplacement) {
62-
nodeFailureReconciler := newNodeFailureReconciler(mgr.GetClient(), recorder, options.roleTracker)
44+
nodeFailureReconciler := newNodeFailureReconciler(mgr.GetClient(), recorder, roleTracker)
6345
if ctrlName, err := nodeFailureReconciler.SetupWithManager(mgr, cfg); err != nil {
6446
return ctrlName, err
6547
}

pkg/controller/workloaddispatcher/controllers.go

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,26 +24,7 @@ import (
2424
"sigs.k8s.io/kueue/pkg/util/roletracker"
2525
)
2626

27-
// SetupOption configures the dispatcher controllers setup.
28-
type SetupOption func(*setupOptions)
29-
30-
type setupOptions struct {
31-
roleTracker *roletracker.RoleTracker
32-
}
33-
34-
// WithRoleTracker sets the roleTracker for dispatcher controllers.
35-
func WithRoleTracker(tracker *roletracker.RoleTracker) SetupOption {
36-
return func(o *setupOptions) {
37-
o.roleTracker = tracker
38-
}
39-
}
40-
41-
func SetupControllers(mgr ctrl.Manager, cfg *configapi.Configuration, opts ...SetupOption) (string, error) {
42-
options := &setupOptions{}
43-
for _, opt := range opts {
44-
opt(options)
45-
}
46-
27+
func SetupControllers(mgr ctrl.Manager, cfg *configapi.Configuration, roleTracker *roletracker.RoleTracker) (string, error) {
4728
if *cfg.MultiKueue.DispatcherName != configapi.MultiKueueDispatcherModeIncremental {
4829
return "", nil
4930
}
@@ -53,7 +34,7 @@ func SetupControllers(mgr ctrl.Manager, cfg *configapi.Configuration, opts ...Se
5334
return "", err
5435
}
5536

56-
idRec := NewIncrementalDispatcherReconciler(mgr.GetClient(), helper, options.roleTracker)
37+
idRec := NewIncrementalDispatcherReconciler(mgr.GetClient(), helper, roleTracker)
5738
err = idRec.SetupWithManager(mgr, cfg)
5839
if err != nil {
5940
return "multikueue-incremental-dispatcher", err

pkg/webhooks/webhooks.go

Lines changed: 6 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -23,45 +23,26 @@ import (
2323
"sigs.k8s.io/kueue/pkg/util/roletracker"
2424
)
2525

26-
// SetupOption configures webhook Setup.
27-
type SetupOption func(*setupOptions)
28-
29-
type setupOptions struct {
30-
roleTracker *roletracker.RoleTracker
31-
}
32-
33-
// WithRoleTracker sets the role tracker for HA setups.
34-
func WithRoleTracker(tracker *roletracker.RoleTracker) SetupOption {
35-
return func(o *setupOptions) {
36-
o.roleTracker = tracker
37-
}
38-
}
39-
4026
// Setup sets up the webhooks for core controllers. It returns the name of the
4127
// webhook that failed to create and an error, if any.
42-
func Setup(mgr ctrl.Manager, opts ...SetupOption) (string, error) {
43-
options := &setupOptions{}
44-
for _, opt := range opts {
45-
opt(options)
46-
}
47-
48-
if err := setupWebhookForWorkload(mgr, options.roleTracker); err != nil {
28+
func Setup(mgr ctrl.Manager, roleTracker *roletracker.RoleTracker) (string, error) {
29+
if err := setupWebhookForWorkload(mgr, roleTracker); err != nil {
4930
return "Workload", err
5031
}
5132

52-
if err := setupWebhookForResourceFlavor(mgr, options.roleTracker); err != nil {
33+
if err := setupWebhookForResourceFlavor(mgr, roleTracker); err != nil {
5334
return "ResourceFlavor", err
5435
}
5536

56-
if err := setupWebhookForClusterQueue(mgr, options.roleTracker); err != nil {
37+
if err := setupWebhookForClusterQueue(mgr, roleTracker); err != nil {
5738
return "ClusterQueue", err
5839
}
5940

60-
if err := setupWebhookForCohort(mgr, options.roleTracker); err != nil {
41+
if err := setupWebhookForCohort(mgr, roleTracker); err != nil {
6142
return "Cohort", err
6243
}
6344

64-
if err := setupWebhookForLocalQueue(mgr, options.roleTracker); err != nil {
45+
if err := setupWebhookForLocalQueue(mgr, roleTracker); err != nil {
6546
return "LocalQueue", err
6647
}
6748

test/integration/multikueue/external_job_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,10 @@ var _ = ginkgo.Describe("MultiKueue", ginkgo.Label("area:multikueue", "feature:m
8484
configuration := &config.Configuration{}
8585
mgr.GetScheme().Default(configuration)
8686

87-
failedCtrl, err := core.SetupControllers(mgr, queues, cCache, configuration)
87+
failedCtrl, err := core.SetupControllers(mgr, queues, cCache, configuration, nil)
8888
gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl)
8989

90-
failedWebhook, err := webhooks.Setup(mgr)
90+
failedWebhook, err := webhooks.Setup(mgr, nil)
9191
gomega.Expect(err).ToNot(gomega.HaveOccurred(), "webhook", failedWebhook)
9292

9393
// Set up RayJob webhook (but not MultiKueue integration)
@@ -135,7 +135,7 @@ var _ = ginkgo.Describe("MultiKueue", ginkgo.Label("area:multikueue", "feature:m
135135
)
136136
gomega.Expect(err).NotTo(gomega.HaveOccurred())
137137

138-
_, err = dispatcher.SetupControllers(mgr, configuration)
138+
_, err = dispatcher.SetupControllers(mgr, configuration, nil)
139139
gomega.Expect(err).NotTo(gomega.HaveOccurred())
140140
})
141141
})

test/integration/multikueue/suite_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -137,10 +137,10 @@ func managerSetup(ctx context.Context, mgr manager.Manager) {
137137
configuration := &config.Configuration{}
138138
mgr.GetScheme().Default(configuration)
139139

140-
failedCtrl, err := core.SetupControllers(mgr, queues, cCache, configuration)
140+
failedCtrl, err := core.SetupControllers(mgr, queues, cCache, configuration, nil)
141141
gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl)
142142

143-
failedWebhook, err := webhooks.Setup(mgr)
143+
failedWebhook, err := webhooks.Setup(mgr, nil)
144144
gomega.Expect(err).ToNot(gomega.HaveOccurred(), "webhook", failedWebhook)
145145

146146
err = workloadjob.SetupIndexes(ctx, mgr.GetFieldIndexer())
@@ -377,7 +377,7 @@ func managerAndMultiKueueSetup(
377377
},
378378
}
379379
mgr.GetScheme().Default(configuration)
380-
_, err = dispatcher.SetupControllers(mgr, configuration)
380+
_, err = dispatcher.SetupControllers(mgr, configuration, nil)
381381
gomega.Expect(err).NotTo(gomega.HaveOccurred())
382382
}
383383

test/integration/multikueue/tas/suite_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,16 +118,16 @@ func managerSetup(ctx context.Context, mgr manager.Manager) {
118118
configuration := &config.Configuration{}
119119
mgr.GetScheme().Default(configuration)
120120

121-
failedCtrl, err := core.SetupControllers(mgr, queues, cCache, configuration)
121+
failedCtrl, err := core.SetupControllers(mgr, queues, cCache, configuration, nil)
122122
gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl)
123123

124-
failedCtrl, err = tas.SetupControllers(mgr, queues, cCache, configuration)
124+
failedCtrl, err = tas.SetupControllers(mgr, queues, cCache, configuration, nil)
125125
gomega.Expect(err).ToNot(gomega.HaveOccurred(), "TAS controller", failedCtrl)
126126

127127
err = tasindexer.SetupIndexes(ctx, mgr.GetFieldIndexer())
128128
gomega.Expect(err).NotTo(gomega.HaveOccurred())
129129

130-
failedWebhook, err := webhooks.Setup(mgr)
130+
failedWebhook, err := webhooks.Setup(mgr, nil)
131131
gomega.Expect(err).ToNot(gomega.HaveOccurred(), "webhook", failedWebhook)
132132

133133
err = workloadjob.SetupIndexes(ctx, mgr.GetFieldIndexer())

test/integration/singlecluster/controller/admissionchecks/provisioning/suite_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ func managerSetup(options ...managerSetupOption) framework.ManagerSetup {
9898
err := indexer.Setup(ctx, mgr.GetFieldIndexer())
9999
gomega.Expect(err).NotTo(gomega.HaveOccurred())
100100

101-
failedWebhook, err := webhooks.Setup(mgr)
101+
failedWebhook, err := webhooks.Setup(mgr, nil)
102102
gomega.Expect(err).ToNot(gomega.HaveOccurred(), "webhook", failedWebhook)
103103

104104
controllersCfg := &config.Configuration{}
@@ -126,7 +126,7 @@ func managerSetup(options ...managerSetupOption) framework.ManagerSetup {
126126
jobframework.EnableIntegration(job.FrameworkName)
127127
}
128128

129-
failedCtrl, err := core.SetupControllers(mgr, queues, cCache, controllersCfg)
129+
failedCtrl, err := core.SetupControllers(mgr, queues, cCache, controllersCfg, nil)
130130
gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl)
131131

132132
err = provisioning.SetupIndexer(ctx, mgr.GetFieldIndexer())

0 commit comments

Comments
 (0)