Skip to content

Commit 37f2f0b

Browse files
feat: add concurrency support in applyset operations (#712)
* feat: add concurrency support in applyset operations Introduce configurable concurrency limit for applyset operations using `sync/errgroup`. This improves performance by enabling concurrent processing of apply api calls. Defaults to the number of objects in the applyset if unspecified. Signed-off-by: Jakob Möller <[email protected]> * feat: remove hardcoded prune concurrency limit in favor of concurrency spec Replaced the fixed `PruneGVKParallelizationLimit` constant with the existing configurable `concurrency` field to allow dynamic adjustment of prune operation parallelism. Updated comments for clarity. Signed-off-by: Jakob Möller <[email protected]> --------- Signed-off-by: Jakob Möller <[email protected]>
1 parent a260048 commit 37f2f0b

File tree

2 files changed

+39
-20
lines changed

2 files changed

+39
-20
lines changed

pkg/applyset/applyset.go

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,10 @@ import (
2929
"reflect"
3030
"sort"
3131
"strings"
32+
"sync"
3233

3334
"github.com/go-logr/logr"
35+
"golang.org/x/sync/errgroup"
3436
apierrors "k8s.io/apimachinery/pkg/api/errors"
3537
"k8s.io/apimachinery/pkg/api/meta"
3638
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -116,6 +118,10 @@ type Config struct {
116118

117119
// Log is used to inject the calling reconciler's logger
118120
Log logr.Logger
121+
122+
// Concurrency is the maximum number of concurrent apply and prune operations in a single applyset.
123+
// If not provided, the default value is the number of objects in the applyset.
124+
Concurrency int
119125
}
120126

121127
/*
@@ -186,9 +192,10 @@ func New(
186192
FieldManager: config.FieldManager,
187193
Force: true,
188194
},
189-
//deleteOptions: metav1.DeleteOptions{},
195+
// deleteOptions: metav1.DeleteOptions{},
190196
},
191-
log: config.Log,
197+
log: config.Log,
198+
concurrency: config.Concurrency,
192199
}
193200

194201
gvk := parent.GroupVersionKind()
@@ -259,6 +266,10 @@ type applySet struct {
259266
serverOptions
260267

261268
log logr.Logger
269+
270+
// concurrency is the maximum number of concurrent apply and prune operations in a single applyset.
271+
// If not provided, the default value is the number of objects in the applyset.
272+
concurrency int
262273
}
263274

264275
func (a *applySet) getAndRecordNamespace(obj ApplyableObject, restMapping *meta.RESTMapping) error {
@@ -544,19 +555,35 @@ func (a *applySet) apply(ctx context.Context, dryRun bool) (*ApplyResult, error)
544555
if dryRun {
545556
options.DryRun = []string{"All"}
546557
}
547-
for _, obj := range a.desired.objects {
548558

559+
concurrency := a.concurrency
560+
if concurrency <= 0 {
561+
concurrency = len(a.desired.objects)
562+
}
563+
564+
eg, egctx := errgroup.WithContext(ctx)
565+
eg.SetLimit(concurrency)
566+
567+
// protect concurrent access to write the apply results
568+
var mu sync.Mutex
569+
570+
for _, obj := range a.desired.objects {
549571
dynResource, err := a.resourceClient(obj)
550572
if err != nil {
551573
return results, err
552574
}
553-
lastApplied, err := dynResource.Apply(ctx, obj.GetName(), obj.Unstructured, options)
554-
results.recordApplied(obj, lastApplied, err)
555-
a.log.V(2).Info("applied object", "object", obj.String(), "applied-revision", lastApplied.GetResourceVersion(),
556-
"error", err)
557-
}
558-
559-
return results, nil
575+
eg.Go(func() error {
576+
lastApplied, err := dynResource.Apply(egctx, obj.GetName(), obj.Unstructured, options)
577+
mu.Lock()
578+
defer mu.Unlock()
579+
results.recordApplied(obj, lastApplied, err)
580+
a.log.V(2).Info("applied object", "object", obj.String(), "applied-revision", lastApplied.GetResourceVersion(),
581+
"error", err)
582+
return nil
583+
})
584+
}
585+
586+
return results, eg.Wait()
560587
}
561588

562589
func (a *applySet) prune(ctx context.Context, results *ApplyResult, dryRun bool) (*ApplyResult, error) {

pkg/applyset/prune.go

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,6 @@ import (
3636
"k8s.io/client-go/dynamic"
3737
)
3838

39-
const (
40-
// This is set to an arbitrary number here for now.
41-
// This ensures we are no unbounded when pruning many GVKs.
42-
// Could be parameterized later on
43-
// TODO (barney-s): Possible parameterization target
44-
PruneGVKParallelizationLimit = 1
45-
)
46-
4739
// PruneObject is an apiserver object that should be deleted as part of prune.
4840
type PruneObject struct {
4941
*unstructured.Unstructured
@@ -124,7 +116,7 @@ func (a *applySet) findAllObjectsToPrune(
124116
}
125117
}
126118

127-
if PruneGVKParallelizationLimit <= 1 {
119+
if a.concurrency <= 1 {
128120
for i := range tasks {
129121
task := tasks[i]
130122
results, err := a.findObjectsToPrune(ctx, dynamicClient, visitedUIDs, task.namespace, task.restMapping)
@@ -135,7 +127,7 @@ func (a *applySet) findAllObjectsToPrune(
135127
}
136128
} else {
137129
group, ctx := errgroup.WithContext(ctx)
138-
group.SetLimit(PruneGVKParallelizationLimit)
130+
group.SetLimit(a.concurrency)
139131
for i := range tasks {
140132
task := tasks[i]
141133
group.Go(func() error {

0 commit comments

Comments
 (0)