Skip to content

Commit 0ffee62

Browse files
committed
(chore) optimize classifier deployment
1 parent bf32f7a commit 0ffee62

2 files changed

Lines changed: 144 additions & 59 deletions

File tree

controllers/classifier_deployer.go

Lines changed: 140 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package controllers
1919
import (
2020
"context"
2121
"crypto/sha256"
22+
"encoding/base64"
2223
"errors"
2324
"fmt"
2425
"reflect"
@@ -54,6 +55,7 @@ import (
5455
logs "github.com/projectsveltos/libsveltos/lib/logsettings"
5556
"github.com/projectsveltos/libsveltos/lib/patcher"
5657
"github.com/projectsveltos/libsveltos/lib/pullmode"
58+
"github.com/projectsveltos/libsveltos/lib/sharding"
5759
)
5860

5961
type getCurrentHash func(classifier *libsveltosv1beta1.Classifier) []byte
@@ -75,6 +77,22 @@ const (
7577
sveltosAgentClusterTypeLabel = "cluster-type"
7678
)
7779

80+
func (r *ClassifierReconciler) isClusterAShardMatch(ctx context.Context,
81+
clusterInfo *libsveltosv1beta1.ClusterInfo) (bool, error) {
82+
83+
clusterType := clusterproxy.GetClusterType(&clusterInfo.Cluster)
84+
cluster, err := clusterproxy.GetCluster(ctx, r.Client, clusterInfo.Cluster.Namespace,
85+
clusterInfo.Cluster.Name, clusterType)
86+
if err != nil {
87+
if apierrors.IsNotFound(err) {
88+
return true, nil
89+
}
90+
return false, err
91+
}
92+
93+
return sharding.IsShardAMatch(r.ShardKey, cluster), nil
94+
}
95+
7896
const (
7997
// This optional annotation enables **per-cluster configuration overrides** for Sveltos sveltos-agent,
8098
// addressing the limitation of the global configuration.
@@ -120,30 +138,68 @@ func (r *ClassifierReconciler) deployClassifier(ctx context.Context, classifierS
120138
logger.V(logs.LogDebug).Info("request to deploy")
121139

122140
var errorSeen error
123-
allDeployed := true
124-
clusterInfo := make([]libsveltosv1beta1.ClusterInfo, 0)
141+
allProcessed := true
142+
125143
for i := range classifier.Status.ClusterInfo {
126-
c := classifier.Status.ClusterInfo[i]
127-
cInfo, err := r.processClassifier(ctx, classifierScope, r.ControlPlaneEndpoint, &c.Cluster, f, logger)
144+
c := &classifier.Status.ClusterInfo[i]
145+
146+
l := logger.WithValues("cluster", fmt.Sprintf("%s:%s/%s",
147+
c.Cluster.Kind, c.Cluster.Namespace, c.Cluster.Name))
148+
149+
shardMatch, err := r.isClusterAShardMatch(ctx, c)
128150
if err != nil {
129-
errorSeen = err
151+
return err
130152
}
131-
if cInfo != nil {
132-
clusterInfo = append(clusterInfo, *cInfo)
133-
if cInfo.Status != libsveltosv1beta1.SveltosStatusProvisioned {
134-
allDeployed = false
153+
154+
var clusterInfo *libsveltosv1beta1.ClusterInfo
155+
if !shardMatch {
156+
l.V(logs.LogDebug).Info("cluster is not a shard match")
157+
// Since cluster is not a shard match, another deployment will deploy and update
158+
// this specific clusterInfo status. Here we simply return current status.
159+
clusterInfo = c
160+
if clusterInfo.Status != libsveltosv1beta1.SveltosStatusProvisioned {
161+
allProcessed = false
162+
}
163+
// This is a required parameter. It is set by the deployment matching the
164+
// cluster shard. If not set yet, set it to empty.
165+
if clusterInfo.Hash == nil {
166+
str := base64.StdEncoding.EncodeToString([]byte("empty"))
167+
clusterInfo.Hash = []byte(str)
168+
}
169+
} else {
170+
clusterInfo, err = r.processClassifier(ctx, classifierScope, r.ControlPlaneEndpoint, &c.Cluster, f, l)
171+
if err != nil {
172+
errorSeen = err
173+
}
174+
if clusterInfo != nil {
175+
classifier.Status.ClusterInfo[i] = *clusterInfo
176+
if clusterInfo.Status != libsveltosv1beta1.SveltosStatusProvisioned {
177+
allProcessed = false
178+
}
135179
}
136180
}
137181
}
138182

183+
// Filter out entries with Status libsveltosv1beta1.SveltosStatusRemoved
184+
n := 0
185+
for i := range classifier.Status.ClusterInfo {
186+
if classifier.Status.ClusterInfo[i].Status != libsveltosv1beta1.SveltosStatusRemoved {
187+
classifier.Status.ClusterInfo[n] = classifier.Status.ClusterInfo[i]
188+
n++
189+
}
190+
}
191+
// Truncate the slice to the new length
192+
classifier.Status.ClusterInfo = classifier.Status.ClusterInfo[:n]
193+
139194
// Update Classifier Status
140-
classifierScope.SetClusterInfo(clusterInfo)
195+
logger.V(logs.LogDebug).Info("set clusterInfo")
196+
classifierScope.SetClusterInfo(classifier.Status.ClusterInfo)
141197

142198
if errorSeen != nil {
143199
return errorSeen
144200
}
145201

146-
if !allDeployed {
202+
if !allProcessed {
147203
return fmt.Errorf("request to deploy Classifier is still queued in one or more clusters")
148204
}
149205

@@ -948,23 +1004,35 @@ func (r *ClassifierReconciler) processClassifier(ctx context.Context, classifier
9481004
cpEndpoint string, cluster *corev1.ObjectReference, f feature, logger logr.Logger,
9491005
) (*libsveltosv1beta1.ClusterInfo, error) {
9501006

951-
logger = logger.WithValues("cluster", fmt.Sprintf("%s:%s/%s", cluster.Kind, cluster.Namespace, cluster.Name))
952-
9531007
// Get Classifier Spec hash (at this very precise moment)
9541008
currentHash, err := r.getCurrentHash(ctx, classifierScope, cpEndpoint, cluster, f, logger)
9551009
if err != nil {
9561010
return nil, err
9571011
}
9581012
classifier := classifierScope.Classifier
9591013

960-
var proceed bool
961-
proceed, err = r.canProceed(ctx, classifierScope, cluster, logger)
1014+
clusterInfo := &libsveltosv1beta1.ClusterInfo{
1015+
Cluster: *cluster,
1016+
Hash: currentHash,
1017+
FailureMessage: nil,
1018+
Status: libsveltosv1beta1.SveltosStatusProvisioning,
1019+
}
1020+
1021+
proceed, err := r.canProceed(ctx, classifierScope, cluster, logger)
9621022
if err != nil {
963-
return nil, err
1023+
failureMessage := err.Error()
1024+
clusterInfo.FailureMessage = &failureMessage
1025+
return clusterInfo, err
9641026
} else if !proceed {
965-
return nil, nil
1027+
failureMessage := "cannot proceed deploying. Either cluster is paused or not ready."
1028+
clusterInfo.FailureMessage = &failureMessage
1029+
return clusterInfo, nil
9661030
}
9671031

1032+
// Remove any queued entry to cleanup
1033+
r.Deployer.CleanupEntries(cluster.Namespace, cluster.Name, classifier.Name, f.id,
1034+
clusterproxy.GetClusterType(cluster), true)
1035+
9681036
// If undeploying feature is in progress, wait for it to complete.
9691037
// Otherwise, if we redeploy feature while same feature is still being cleaned up, if two workers process
9701038
// those request in parallel some resources might end up missing.
@@ -981,14 +1049,17 @@ func (r *ClassifierReconciler) processClassifier(ctx context.Context, classifier
9811049
if !isConfigSame {
9821050
logger.V(logs.LogDebug).Info(fmt.Sprintf("Classifier has changed. Current hash %x. Previous hash %x",
9831051
currentHash, hash))
1052+
} else {
1053+
logger.V(logs.LogDebug).Info("Classifier has not changed")
9841054
}
9851055

9861056
isPullMode, err := clusterproxy.IsClusterInPullMode(ctx, r.Client, cluster.Namespace,
9871057
cluster.Name, clusterproxy.GetClusterType(cluster), logger)
9881058
if err != nil {
9891059
msg := fmt.Sprintf("failed to verify if Cluster is in pull mode: %v", err)
9901060
logger.V(logs.LogDebug).Info(msg)
991-
return nil, err
1061+
clusterInfo.FailureMessage = &msg
1062+
return clusterInfo, err
9921063
}
9931064

9941065
return r.proceedProcessingClassifier(ctx, classifier, cluster, isPullMode, isConfigSame, currentHash, f, logger)
@@ -998,6 +1069,13 @@ func (r *ClassifierReconciler) proceedProcessingClassifier(ctx context.Context,
9981069
cluster *corev1.ObjectReference, isPullMode, isConfigSame bool, currentHash []byte, f feature, logger logr.Logger,
9991070
) (*libsveltosv1beta1.ClusterInfo, error) {
10001071

1072+
clusterInfo := &libsveltosv1beta1.ClusterInfo{
1073+
Cluster: *cluster,
1074+
Hash: currentHash,
1075+
FailureMessage: stringPtr(""),
1076+
Status: libsveltosv1beta1.SveltosStatusProvisioning,
1077+
}
1078+
10011079
_, currentStatus := r.getClassifierInClusterHashAndStatus(classifier, cluster)
10021080

10031081
var deployerStatus *libsveltosv1beta1.SveltosFeatureStatus
@@ -1011,16 +1089,13 @@ func (r *ClassifierReconciler) proceedProcessingClassifier(ctx context.Context,
10111089
}
10121090

10131091
if deployerStatus != nil {
1014-
logger.V(logs.LogDebug).Info("result is available. updating status.")
1015-
var errorMessage string
1092+
logger.V(logs.LogDebug).Info(fmt.Sprintf("result is available %q. updating status.", *deployerStatus))
1093+
1094+
clusterInfo.Status = *deployerStatus
1095+
10161096
if result.Err != nil {
1017-
errorMessage = result.Err.Error()
1018-
}
1019-
clusterInfo := &libsveltosv1beta1.ClusterInfo{
1020-
Cluster: *cluster,
1021-
Status: *deployerStatus,
1022-
Hash: currentHash,
1023-
FailureMessage: &errorMessage,
1097+
errorMessage := result.Err.Error()
1098+
clusterInfo.FailureMessage = &errorMessage
10241099
}
10251100

10261101
if *deployerStatus == libsveltosv1beta1.SveltosStatusProvisioned {
@@ -1037,12 +1112,11 @@ func (r *ClassifierReconciler) proceedProcessingClassifier(ctx context.Context,
10371112
}
10381113
} else if isConfigSame && currentStatus != nil && *currentStatus == libsveltosv1beta1.SveltosStatusProvisioned {
10391114
logger.V(logs.LogDebug).Info("already deployed")
1040-
s := libsveltosv1beta1.SveltosStatusProvisioned
1041-
deployerStatus = &s
1115+
clusterInfo.Status = libsveltosv1beta1.SveltosStatusProvisioned
1116+
clusterInfo.FailureMessage = nil
10421117
} else {
10431118
logger.V(logs.LogDebug).Info("no result is available. queue job and mark status as provisioning")
1044-
s := libsveltosv1beta1.SveltosStatusProvisioning
1045-
deployerStatus = &s
1119+
clusterInfo.Status = libsveltosv1beta1.SveltosStatusProvisioning
10461120

10471121
options := deployer.Options{HandlerOptions: make(map[string]any)}
10481122
options.HandlerOptions[configurationHash] = currentHash
@@ -1060,17 +1134,12 @@ func (r *ClassifierReconciler) proceedProcessingClassifier(ctx context.Context,
10601134
if err := r.Deployer.Deploy(ctx, cluster.Namespace, cluster.Name,
10611135
classifier.Name, f.id, clusterproxy.GetClusterType(cluster), false, handler,
10621136
programDuration, options); err != nil {
1063-
return nil, err
1137+
failureMessage := err.Error()
1138+
clusterInfo.FailureMessage = &failureMessage
1139+
return clusterInfo, err
10641140
}
10651141
}
10661142

1067-
clusterInfo := &libsveltosv1beta1.ClusterInfo{
1068-
Cluster: *cluster,
1069-
Status: *deployerStatus,
1070-
Hash: currentHash,
1071-
FailureMessage: nil,
1072-
}
1073-
10741143
if clusterInfo.Hash == nil {
10751144
panic(1)
10761145
}
@@ -1084,14 +1153,24 @@ func (r *ClassifierReconciler) proceedDeployingClassifierInPullMode(ctx context.
10841153

10851154
var pullmodeStatus *libsveltosv1beta1.FeatureStatus
10861155

1156+
clusterInfo := &libsveltosv1beta1.ClusterInfo{
1157+
Cluster: *cluster,
1158+
Hash: currentHash,
1159+
FailureMessage: nil,
1160+
Status: libsveltosv1beta1.SveltosStatusProvisioning,
1161+
}
1162+
10871163
if isConfigSame {
10881164
pullmodeHash, err := pullmode.GetRequestorHash(ctx, getManagementClusterClient(),
10891165
cluster.Namespace, cluster.Name, libsveltosv1beta1.ClassifierKind, classifier.Name, f.id, logger)
10901166
if err != nil {
10911167
if !apierrors.IsNotFound(err) {
10921168
msg := fmt.Sprintf("failed to get pull mode hash: %v", err)
10931169
logger.V(logs.LogDebug).Info(msg)
1094-
return nil, err
1170+
clusterInfo.FailureMessage = &msg
1171+
return clusterInfo, err
1172+
} else {
1173+
isConfigSame = false
10951174
}
10961175
} else {
10971176
isConfigSame = reflect.DeepEqual(pullmodeHash, currentHash)
@@ -1104,46 +1183,40 @@ func (r *ClassifierReconciler) proceedDeployingClassifierInPullMode(ctx context.
11041183
var err error
11051184
pullmodeStatus, err = r.proceesAgentDeploymentStatus(ctx, classifier, cluster, f, logger)
11061185
if err != nil {
1107-
return nil, err
1186+
failureMessage := err.Error()
1187+
clusterInfo.FailureMessage = &failureMessage
1188+
return clusterInfo, err
11081189
}
11091190
}
11101191

1111-
clusterInfo := &libsveltosv1beta1.ClusterInfo{
1112-
Cluster: *cluster,
1113-
Hash: currentHash,
1114-
FailureMessage: nil,
1115-
}
1116-
11171192
if pullmodeStatus != nil {
11181193
logger.V(logs.LogDebug).Info(fmt.Sprintf("agent result is available. updating status: %v", *pullmodeStatus))
11191194

11201195
switch *pullmodeStatus {
11211196
case libsveltosv1beta1.FeatureStatusProvisioned:
11221197
if err := pullmode.TerminateDeploymentTracking(ctx, r.Client, cluster.Namespace,
11231198
cluster.Name, libsveltosv1beta1.ClassifierKind, classifier.Name, f.id, logger); err != nil {
1124-
logger.V(logs.LogDebug).Info(fmt.Sprintf("failed to terminate tracking: %v", err))
1125-
return nil, err
1199+
failureMessage := fmt.Sprintf("failed to terminate tracking: %v", err)
1200+
logger.V(logs.LogDebug).Info(failureMessage)
1201+
clusterInfo.FailureMessage = &failureMessage
1202+
return clusterInfo, err
11261203
}
1127-
provisioned := libsveltosv1beta1.SveltosStatusProvisioned
1128-
clusterInfo.Status = provisioned
1204+
clusterInfo.Status = libsveltosv1beta1.SveltosStatusProvisioned
11291205
return clusterInfo, nil
11301206
case libsveltosv1beta1.FeatureStatusProvisioning:
11311207
msg := "agent is provisioning the content"
11321208
logger.V(logs.LogDebug).Info(msg)
1133-
provisioning := libsveltosv1beta1.SveltosStatusProvisioning
1134-
clusterInfo.Status = provisioning
1209+
clusterInfo.Status = libsveltosv1beta1.SveltosStatusProvisioning
11351210
return clusterInfo, nil
11361211
case libsveltosv1beta1.FeatureStatusFailed:
11371212
logger.V(logs.LogDebug).Info("agent failed provisioning the content")
1138-
failed := libsveltosv1beta1.SveltosStatusFailed
1139-
clusterInfo.Status = failed
1213+
clusterInfo.Status = libsveltosv1beta1.SveltosStatusFailed
11401214
case libsveltosv1beta1.FeatureStatusFailedNonRetriable, libsveltosv1beta1.FeatureStatusRemoving,
11411215
libsveltosv1beta1.FeatureStatusAgentRemoving, libsveltosv1beta1.FeatureStatusRemoved:
11421216
logger.V(logs.LogDebug).Info("proceed deploying")
11431217
}
11441218
} else {
1145-
provisioning := libsveltosv1beta1.SveltosStatusProvisioning
1146-
clusterInfo.Status = provisioning
1219+
clusterInfo.Status = libsveltosv1beta1.SveltosStatusProvisioning
11471220
}
11481221

11491222
// Getting here means either agent failed to deploy feature or configuration has changed.
@@ -1155,7 +1228,9 @@ func (r *ClassifierReconciler) proceedDeployingClassifierInPullMode(ctx context.
11551228
if err := r.Deployer.Deploy(ctx, cluster.Namespace, cluster.Name,
11561229
classifier.Name, f.id, clusterproxy.GetClusterType(cluster), false,
11571230
deployClassifierInCluster, programDuration, options); err != nil {
1158-
return nil, err
1231+
failureMessage := err.Error()
1232+
clusterInfo.FailureMessage = &failureMessage
1233+
return clusterInfo, err
11591234
}
11601235

11611236
return clusterInfo, fmt.Errorf("request to deploy queued")
@@ -1175,7 +1250,11 @@ func (r *ClassifierReconciler) proceesAgentDeploymentStatus(ctx context.Context,
11751250
if pullmode.IsProcessingMismatch(err) {
11761251
provisioning := libsveltosv1beta1.FeatureStatusProvisioning
11771252
return &provisioning, nil
1253+
} else if pullmode.IsActionNotSetToDeploy(err) {
1254+
_ = pullmode.TerminateDeploymentTracking(ctx, r.Client, cluster.Namespace,
1255+
cluster.Name, libsveltosv1beta1.ClassifierKind, classifier.Name, f.id, logger)
11781256
}
1257+
return nil, err
11791258
}
11801259

11811260
return status.DeploymentStatus, err
@@ -2147,10 +2226,12 @@ func getPerClusterPatches(ctx context.Context, c client.Client,
21472226

21482227
patches, err := getSveltosApplierPatchesNew(ctx, c, configMapNamespace, configMapName, logger)
21492228
if err != nil {
2150-
logger.Error(err, "failed to get ConfigMap with drift-detection patches",
2229+
logger.Error(err, "failed to get ConfigMap with patches",
21512230
"configMapNamespace", configMapNamespace, "configMapName", configMapName)
21522231
return nil, err
21532232
}
21542233

2234+
logger.V(logs.LogDebug).Info(fmt.Sprintf("got patches from ConfigMap %s/%s",
2235+
configMapNamespace, configMapName))
21552236
return patches, nil
21562237
}

controllers/utils.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,3 +216,7 @@ func deplAssociatedClusterExist(ctx context.Context, c client.Client, depl *apps
216216

217217
return true, "", "", ""
218218
}
219+
220+
func stringPtr(s string) *string {
221+
return &s
222+
}

0 commit comments

Comments
 (0)