Skip to content

Commit 76ac30b

Browse files
authored
Merge pull request #104 from awslabs/client
refactor: consolidate Kubernetes client management into unified `client.Set`
2 parents 7dec222 + 178a769 commit 76ac30b

File tree

10 files changed

+374
-376
lines changed

10 files changed

+374
-376
lines changed

cmd/controller/main.go

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,10 @@ import (
3131
"sigs.k8s.io/controller-runtime/pkg/predicate"
3232

3333
xv1alpha1 "github.com/awslabs/kro/api/v1alpha1"
34+
kroclient "github.com/awslabs/kro/internal/client"
3435
resourcegroupctrl "github.com/awslabs/kro/internal/controller/resourcegroup"
3536
"github.com/awslabs/kro/internal/dynamiccontroller"
3637
"github.com/awslabs/kro/internal/graph"
37-
"github.com/awslabs/kro/internal/kubernetes"
3838
//+kubebuilder:scaffold:imports
3939
)
4040

@@ -90,7 +90,14 @@ func main() {
9090

9191
ctrl.SetLogger(rootLogger)
9292

93-
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
93+
set, err := kroclient.NewSet(kroclient.Config{})
94+
if err != nil {
95+
setupLog.Error(err, "unable to create client set")
96+
os.Exit(1)
97+
}
98+
restConfig := set.RESTConfig()
99+
100+
mgr, err := ctrl.NewManager(restConfig, ctrl.Options{
94101
Scheme: scheme,
95102
Metrics: metricsserver.Options{
96103
BindAddress: metricsAddr,
@@ -115,23 +122,16 @@ func main() {
115122
os.Exit(1)
116123
}
117124

118-
kConfig, _, dynamicClient, crdClient, err := kubernetes.NewClients()
119-
if err != nil {
120-
setupLog.Error(err, "unable to create clients")
121-
os.Exit(1)
122-
}
123-
crdManager := kubernetes.NewCRDClient(crdClient, rootLogger)
124-
125125
dc := dynamiccontroller.NewDynamicController(rootLogger, dynamiccontroller.Config{
126126
Workers: dynamicControllerConcurrentReconciles,
127127
// TODO(a-hilaly): expose these as flags
128128
ShutdownTimeout: 60 * time.Second,
129129
ResyncPeriod: 10 * time.Hour,
130130
QueueMaxRetries: 20,
131-
}, dynamicClient)
131+
}, set.Dynamic())
132132

133133
resourceGroupGraphBuilder, err := graph.NewBuilder(
134-
kConfig,
134+
restConfig,
135135
)
136136
if err != nil {
137137
setupLog.Error(err, "unable to create resource group graph builder")
@@ -141,9 +141,8 @@ func main() {
141141
reconciler := resourcegroupctrl.NewResourceGroupReconciler(
142142
rootLogger,
143143
mgr.GetClient(),
144-
dynamicClient,
144+
set,
145145
allowCRDDeletion,
146-
crdManager,
147146
dc,
148147
resourceGroupGraphBuilder,
149148
)

internal/client/crd.go

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"). You may
4+
// not use this file except in compliance with the License. A copy of the
5+
// License is located at
6+
//
7+
// http://aws.amazon.com/apache2.0/
8+
//
9+
// or in the "license" file accompanying this file. This file is distributed
10+
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
11+
// express or implied. See the License for the specific language governing
12+
// permissions and limitations under the License.
13+
package client
14+
15+
import (
16+
"context"
17+
"encoding/json"
18+
"fmt"
19+
"time"
20+
21+
"github.com/go-logr/logr"
22+
v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
23+
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1"
24+
apierrors "k8s.io/apimachinery/pkg/api/errors"
25+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26+
"k8s.io/apimachinery/pkg/types"
27+
"k8s.io/apimachinery/pkg/util/wait"
28+
)
29+
30+
const (
31+
// DefaultPollInterval is the default interval for polling CRD status
32+
defaultPollInterval = 150 * time.Millisecond
33+
// DefaultTimeout is the default timeout for waiting for CRD status
34+
defaultTimeout = 2 * time.Minute
35+
)
36+
37+
var _ CRDClient = &CRDWrapper{}
38+
39+
// CRDClient represents operations for managing CustomResourceDefinitions
40+
type CRDClient interface {
41+
// EnsureCreated ensures a CRD exists and is ready
42+
Ensure(ctx context.Context, crd v1.CustomResourceDefinition) error
43+
44+
// Delete removes a CRD if it exists
45+
Delete(ctx context.Context, name string) error
46+
47+
// Get retrieves a CRD by name
48+
Get(ctx context.Context, name string) (*v1.CustomResourceDefinition, error)
49+
}
50+
51+
// CRDWrapper provides a simplified interface for CRD operations
52+
type CRDWrapper struct {
53+
client apiextensionsv1.CustomResourceDefinitionInterface
54+
log logr.Logger
55+
pollInterval time.Duration
56+
timeout time.Duration
57+
}
58+
59+
// CRDWrapperConfig contains configuration for the CRD wrapper
60+
type CRDWrapperConfig struct {
61+
Client *apiextensionsv1.ApiextensionsV1Client
62+
Log logr.Logger
63+
PollInterval time.Duration
64+
Timeout time.Duration
65+
}
66+
67+
// DefaultConfig returns a CRDWrapperConfig with default values
68+
func DefaultCRDWrapperConfig() CRDWrapperConfig {
69+
return CRDWrapperConfig{
70+
PollInterval: defaultPollInterval,
71+
Timeout: defaultTimeout,
72+
}
73+
}
74+
75+
// newCRDWrapper creates a new CRD wrapper
76+
func newCRDWrapper(cfg CRDWrapperConfig) *CRDWrapper {
77+
if cfg.PollInterval == 0 {
78+
cfg.PollInterval = defaultPollInterval
79+
}
80+
if cfg.Timeout == 0 {
81+
cfg.Timeout = defaultTimeout
82+
}
83+
84+
return &CRDWrapper{
85+
client: cfg.Client.CustomResourceDefinitions(),
86+
log: cfg.Log.WithName("crd-wrapper"),
87+
pollInterval: cfg.PollInterval,
88+
timeout: cfg.Timeout,
89+
}
90+
}
91+
92+
// Ensure ensures a CRD exists, up-to-date, and is ready. This can be
93+
// a dangerous operation as it will update the CRD if it already exists.
94+
//
95+
// The caller is responsible for ensuring the CRD, isn't introducing
96+
// breaking changes.
97+
func (w *CRDWrapper) Ensure(ctx context.Context, crd v1.CustomResourceDefinition) error {
98+
_, err := w.Get(ctx, crd.Name)
99+
if err != nil {
100+
if !apierrors.IsNotFound(err) {
101+
return fmt.Errorf("failed to check for existing CRD: %w", err)
102+
}
103+
104+
w.log.Info("Creating CRD", "name", crd.Name)
105+
if err := w.create(ctx, crd); err != nil {
106+
return fmt.Errorf("failed to create CRD: %w", err)
107+
}
108+
} else {
109+
w.log.Info("Updating existing CRD", "name", crd.Name)
110+
if err := w.patch(ctx, crd); err != nil {
111+
return fmt.Errorf("failed to patch CRD: %w", err)
112+
}
113+
}
114+
115+
return w.waitForReady(ctx, crd.Name)
116+
}
117+
118+
// Get retrieves a CRD by name
119+
func (w *CRDWrapper) Get(ctx context.Context, name string) (*v1.CustomResourceDefinition, error) {
120+
return w.client.Get(ctx, name, metav1.GetOptions{})
121+
}
122+
123+
func (w *CRDWrapper) create(ctx context.Context, crd v1.CustomResourceDefinition) error {
124+
_, err := w.client.Create(ctx, &crd, metav1.CreateOptions{})
125+
return err
126+
}
127+
128+
func (w *CRDWrapper) patch(ctx context.Context, newCRD v1.CustomResourceDefinition) error {
129+
patchBytes, err := json.Marshal(newCRD)
130+
if err != nil {
131+
return fmt.Errorf("failed to marshal CRD for patch: %w", err)
132+
}
133+
134+
_, err = w.client.Patch(
135+
ctx,
136+
newCRD.Name,
137+
types.MergePatchType,
138+
patchBytes,
139+
metav1.PatchOptions{},
140+
)
141+
return err
142+
}
143+
144+
// Delete removes a CRD if it exists
145+
func (w *CRDWrapper) Delete(ctx context.Context, name string) error {
146+
w.log.Info("Deleting CRD", "name", name)
147+
148+
err := w.client.Delete(ctx, name, metav1.DeleteOptions{})
149+
if err != nil && !apierrors.IsNotFound(err) {
150+
return fmt.Errorf("failed to delete CRD: %w", err)
151+
}
152+
return nil
153+
}
154+
155+
// waitForReady waits for a CRD to become ready
156+
func (w *CRDWrapper) waitForReady(ctx context.Context, name string) error {
157+
w.log.Info("Waiting for CRD to become ready", "name", name)
158+
159+
return wait.PollUntilContextTimeout(ctx, w.pollInterval, w.timeout, true,
160+
func(ctx context.Context) (bool, error) {
161+
crd, err := w.Get(ctx, name)
162+
if err != nil {
163+
if apierrors.IsNotFound(err) {
164+
return false, nil
165+
}
166+
return false, err
167+
}
168+
169+
for _, cond := range crd.Status.Conditions {
170+
if cond.Type == v1.Established && cond.Status == v1.ConditionTrue {
171+
return true, nil
172+
}
173+
}
174+
175+
return false, nil
176+
})
177+
}

internal/client/set.go

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"). You may
4+
// not use this file except in compliance with the License. A copy of the
5+
// License is located at
6+
//
7+
// http://aws.amazon.com/apache2.0/
8+
//
9+
// or in the "license" file accompanying this file. This file is distributed
10+
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
11+
// express or implied. See the License for the specific language governing
12+
// permissions and limitations under the License.
13+
package client
14+
15+
import (
16+
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1"
17+
"k8s.io/client-go/dynamic"
18+
"k8s.io/client-go/kubernetes"
19+
"k8s.io/client-go/rest"
20+
ctrlrtconfig "sigs.k8s.io/controller-runtime/pkg/client/config"
21+
)
22+
23+
const (
24+
// defaultQPS is the default QPS for the client
25+
defaultQPS = 100
26+
// defaultBurst is the default burst for the client
27+
defaultBurst = 150
28+
)
29+
30+
// Set provides a unified interface for different Kubernetes clients
31+
type Set struct {
32+
config *rest.Config
33+
kubernetes *kubernetes.Clientset
34+
dynamic *dynamic.DynamicClient
35+
apiExtensionsV1 *apiextensionsv1.ApiextensionsV1Client
36+
}
37+
38+
// Config holds configuration for client creation
39+
type Config struct {
40+
RestConfig *rest.Config
41+
ImpersonateUser string
42+
}
43+
44+
// NewSet creates a new client Set with the given config
45+
func NewSet(cfg Config) (*Set, error) {
46+
var err error
47+
config := cfg.RestConfig
48+
49+
if config == nil {
50+
config, err = ctrlrtconfig.GetConfig()
51+
if err != nil {
52+
return nil, err
53+
}
54+
}
55+
56+
if cfg.ImpersonateUser != "" {
57+
config = rest.CopyConfig(config)
58+
config.Impersonate = rest.ImpersonationConfig{
59+
UserName: cfg.ImpersonateUser,
60+
}
61+
}
62+
63+
// Set default QPS and burst
64+
if config.QPS == 0 {
65+
config.QPS = defaultQPS
66+
}
67+
if config.Burst == 0 {
68+
config.Burst = defaultBurst
69+
}
70+
71+
c := &Set{config: config}
72+
if err := c.init(); err != nil {
73+
return nil, err
74+
}
75+
76+
return c, nil
77+
}
78+
79+
func (c *Set) init() error {
80+
var err error
81+
82+
c.kubernetes, err = kubernetes.NewForConfig(c.config)
83+
if err != nil {
84+
return err
85+
}
86+
87+
c.dynamic, err = dynamic.NewForConfig(c.config)
88+
if err != nil {
89+
return err
90+
}
91+
92+
c.apiExtensionsV1, err = apiextensionsv1.NewForConfig(c.config)
93+
if err != nil {
94+
return err
95+
}
96+
97+
return nil
98+
}
99+
100+
// Kubernetes returns the standard Kubernetes clientset
101+
func (c *Set) Kubernetes() *kubernetes.Clientset {
102+
return c.kubernetes
103+
}
104+
105+
// Dynamic returns the dynamic client
106+
func (c *Set) Dynamic() *dynamic.DynamicClient {
107+
return c.dynamic
108+
}
109+
110+
// APIExtensionsV1 returns the API extensions client
111+
func (c *Set) APIExtensionsV1() *apiextensionsv1.ApiextensionsV1Client {
112+
return c.apiExtensionsV1
113+
}
114+
115+
// RESTConfig returns a copy of the underlying REST config
116+
func (c *Set) RESTConfig() *rest.Config {
117+
return rest.CopyConfig(c.config)
118+
}
119+
120+
// CRD returns a new CRDWrapper instance
121+
func (s *Set) CRD(cfg CRDWrapperConfig) *CRDWrapper {
122+
if cfg.Client == nil {
123+
cfg.Client = s.apiExtensionsV1
124+
}
125+
126+
return newCRDWrapper(cfg)
127+
}
128+
129+
// WithImpersonation returns a new client that impersonates the given user
130+
func (c *Set) WithImpersonation(user string) (*Set, error) {
131+
return NewSet(Config{
132+
RestConfig: c.config,
133+
ImpersonateUser: user,
134+
})
135+
}

0 commit comments

Comments
 (0)