Skip to content

Commit b07ffd8

Browse files
authored
Use WorkloadRequestUseMergePatch on UpdateStatus(). (#8029)
* Use WorkloadRequestUseMergePatch on UpdateStatus(). * Add unit tests.
1 parent ee02f0f commit b07ffd8

File tree

9 files changed

+108
-41
lines changed

9 files changed

+108
-41
lines changed

pkg/controller/jobframework/reconciler.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -516,8 +516,7 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
516516
condition := generatePodsReadyCondition(ctx, job, wl, r.clock)
517517
if !workload.HasConditionWithTypeAndReason(wl, &condition) {
518518
log.V(3).Info("Updating the PodsReady condition", "reason", condition.Reason, "status", condition.Status)
519-
apimeta.SetStatusCondition(&wl.Status.Conditions, condition)
520-
err := workload.UpdateStatus(ctx, r.client, wl, condition.Type, condition.Status, condition.Reason, condition.Message, constants.JobControllerName, r.clock)
519+
err := workload.SetConditionAndUpdate(ctx, r.client, wl, condition.Type, condition.Status, condition.Reason, condition.Message, constants.JobControllerName, r.clock)
521520
if err != nil {
522521
log.Error(err, "Updating workload status")
523522
return ctrl.Result{}, client.IgnoreNotFound(err)

pkg/workload/workload.go

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -561,29 +561,60 @@ func totalRequestsFromAdmission(wl *kueue.Workload) []PodSetResources {
561561
return res
562562
}
563563

564-
// UpdateStatus updates the condition of a workload with ssa,
565-
// fieldManager being set to managerPrefix + "-" + conditionType
566-
func UpdateStatus(ctx context.Context,
564+
// SetConditionAndUpdate sets (or replaces) a single condition in a Workload's status.
565+
//
566+
// Behaviour depends on the feature gate WorkloadRequestUseMergePatch:
567+
//
568+
// - Enabled → uses merge-patch via PatchStatus (preserves other conditions,
569+
// safe for concurrent controllers).
570+
//
571+
// - Disabled → uses server-side apply with field manager
572+
// "<managerPrefix>-<conditionType>" (legacy path; only the written condition
573+
// is managed by this controller).
574+
//
575+
// The condition gets:
576+
// - ObservedGeneration = wl.Generation
577+
// - LastTransitionTime = clock.Now()
578+
// - Message truncated to the allowed length
579+
func SetConditionAndUpdate(ctx context.Context,
567580
c client.Client,
568581
wl *kueue.Workload,
569582
conditionType string,
570583
conditionStatus metav1.ConditionStatus,
571584
reason, message string,
572585
managerPrefix string,
573-
clock clock.Clock) error {
574-
now := metav1.NewTime(clock.Now())
586+
clock clock.Clock,
587+
) error {
575588
condition := metav1.Condition{
576589
Type: conditionType,
577590
Status: conditionStatus,
578-
LastTransitionTime: now,
591+
ObservedGeneration: wl.Generation,
592+
LastTransitionTime: metav1.NewTime(clock.Now()),
579593
Reason: reason,
580594
Message: api.TruncateConditionMessage(message),
581-
ObservedGeneration: wl.Generation,
582595
}
583596

584-
newWl := BaseSSAWorkload(wl, false)
585-
newWl.Status.Conditions = []metav1.Condition{condition}
586-
return c.Status().Patch(ctx, newWl, client.Apply, client.FieldOwner(managerPrefix+"-"+condition.Type))
597+
var (
598+
wlCopy *kueue.Workload
599+
err error
600+
)
601+
602+
if features.Enabled(features.WorkloadRequestUseMergePatch) {
603+
wlCopy = wl.DeepCopy()
604+
err = clientutil.PatchStatus(ctx, c, wlCopy, func() (bool, error) {
605+
return apimeta.SetStatusCondition(&wlCopy.Status.Conditions, condition), nil
606+
})
607+
} else {
608+
wlCopy = BaseSSAWorkload(wl, true)
609+
wlCopy.Status.Conditions = []metav1.Condition{condition}
610+
err = c.Status().Patch(ctx, wlCopy, client.Apply, client.FieldOwner(managerPrefix+"-"+condition.Type))
611+
}
612+
if err != nil {
613+
return err
614+
}
615+
616+
wlCopy.DeepCopyInto(wl)
617+
return nil
587618
}
588619

589620
// UnsetQuotaReservationWithCondition sets the QuotaReserved condition to false, clears
@@ -1223,7 +1254,7 @@ func CreatePodsReadyCondition(status metav1.ConditionStatus, reason, message str
12231254
Reason: reason,
12241255
Message: message,
12251256
LastTransitionTime: metav1.NewTime(clock.Now()),
1226-
// ObservedGeneration is added via workload.UpdateStatus
1257+
// ObservedGeneration is added via workload.SetConditionAndUpdate
12271258
}
12281259
}
12291260

pkg/workload/workload_test.go

Lines changed: 59 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package workload
1818

1919
import (
20+
"context"
2021
"errors"
2122
"fmt"
2223
"testing"
@@ -32,6 +33,7 @@ import (
3233
testingclock "k8s.io/utils/clock/testing"
3334
"k8s.io/utils/ptr"
3435
"sigs.k8s.io/controller-runtime/pkg/client"
36+
"sigs.k8s.io/controller-runtime/pkg/client/interceptor"
3537

3638
config "sigs.k8s.io/kueue/apis/config/v1beta2"
3739
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta2"
@@ -44,6 +46,10 @@ import (
4446
utiltestingapi "sigs.k8s.io/kueue/pkg/util/testing/v1beta2"
4547
)
4648

49+
var (
50+
errTest = errors.New("test error")
51+
)
52+
4753
func TestNewInfo(t *testing.T) {
4854
cases := map[string]struct {
4955
workload kueue.Workload
@@ -450,7 +456,7 @@ func TestNewInfo(t *testing.T) {
450456
}
451457
}
452458

453-
func TestUpdateWorkloadStatus(t *testing.T) {
459+
func TestSetConditionAndUpdate(t *testing.T) {
454460
now := time.Now()
455461
fakeClock := testingclock.NewFakeClock(now)
456462
cases := map[string]struct {
@@ -459,7 +465,9 @@ func TestUpdateWorkloadStatus(t *testing.T) {
459465
condStatus metav1.ConditionStatus
460466
reason string
461467
message string
468+
err error
462469
wantStatus kueue.WorkloadStatus
470+
wantErr error
463471
}{
464472
"initial empty": {
465473
condType: kueue.WorkloadQuotaReserved,
@@ -478,6 +486,15 @@ func TestUpdateWorkloadStatus(t *testing.T) {
478486
},
479487
},
480488
},
489+
"initial empty with error": {
490+
condType: kueue.WorkloadQuotaReserved,
491+
condStatus: metav1.ConditionFalse,
492+
reason: "Pending",
493+
message: "didn't fit",
494+
err: errTest,
495+
wantStatus: kueue.WorkloadStatus{},
496+
wantErr: errTest,
497+
},
481498
"same condition type": {
482499
oldStatus: kueue.WorkloadStatus{
483500
Conditions: []metav1.Condition{
@@ -505,27 +522,47 @@ func TestUpdateWorkloadStatus(t *testing.T) {
505522
},
506523
}
507524
for name, tc := range cases {
508-
t.Run(name, func(t *testing.T) {
509-
ctx, _ := utiltesting.ContextWithLog(t)
510-
workload := utiltestingapi.MakeWorkload("foo", "bar").Generation(1).Obj()
511-
workload.Status = tc.oldStatus
512-
cl := utiltesting.NewFakeClientSSAAsSM(workload)
513-
err := UpdateStatus(ctx, cl, workload, tc.condType, tc.condStatus, tc.reason, tc.message, "manager-prefix", fakeClock)
514-
if err != nil {
515-
t.Fatalf("Failed updating status: %v", err)
516-
}
517-
var updatedWl kueue.Workload
518-
if err := cl.Get(ctx, client.ObjectKeyFromObject(workload), &updatedWl); err != nil {
519-
t.Fatalf("Failed obtaining updated object: %v", err)
520-
}
521-
if diff := cmp.Diff(
522-
tc.wantStatus,
523-
updatedWl.Status,
524-
cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime"),
525-
); diff != "" {
526-
t.Errorf("Unexpected status after updating (-want,+got):\n%s", diff)
527-
}
528-
})
525+
for _, useMergePatch := range []bool{false, true} {
526+
t.Run(name, func(t *testing.T) {
527+
features.SetFeatureGateDuringTest(t, features.WorkloadRequestUseMergePatch, useMergePatch)
528+
529+
ctx, _ := utiltesting.ContextWithLog(t)
530+
531+
workload := utiltestingapi.MakeWorkload("foo", "bar").Generation(1).Obj()
532+
workload.Status = tc.oldStatus
533+
534+
cl := utiltesting.NewClientBuilder().
535+
WithObjects(workload).
536+
WithStatusSubresource(&kueue.Workload{}).
537+
WithInterceptorFuncs(interceptor.Funcs{
538+
SubResourcePatch: func(ctx context.Context, c client.Client, subResourceName string, obj client.Object, patch client.Patch, opts ...client.SubResourcePatchOption) error {
539+
if tc.err != nil {
540+
return tc.err
541+
}
542+
return utiltesting.TreatSSAAsStrategicMerge(ctx, c, subResourceName, obj, patch, opts...)
543+
},
544+
}).
545+
Build()
546+
547+
err := SetConditionAndUpdate(ctx, cl, workload, tc.condType, tc.condStatus, tc.reason, tc.message, "manager-prefix", fakeClock)
548+
if diff := cmp.Diff(tc.wantErr, err, cmpopts.EquateErrors()); diff != "" {
549+
t.Errorf("Unexpected error (-want,+got):\n%s", diff)
550+
}
551+
552+
var updatedWl kueue.Workload
553+
if err := cl.Get(ctx, client.ObjectKeyFromObject(workload), &updatedWl); err != nil {
554+
t.Fatalf("Failed obtaining updated object: %v", err)
555+
}
556+
557+
if diff := cmp.Diff(
558+
tc.wantStatus,
559+
updatedWl.Status,
560+
cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime"),
561+
); diff != "" {
562+
t.Errorf("Unexpected status after updating (-want,+got):\n%s", diff)
563+
}
564+
})
565+
}
529566
}
530567
}
531568

test/integration/singlecluster/controller/jobs/appwrapper/appwrapper_controller_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ var _ = ginkgo.Describe("AppWrapper controller", ginkgo.Ordered, ginkgo.Continue
283283
ginkgo.By("preempt the workload", func() {
284284
gomega.Eventually(func(g gomega.Gomega) {
285285
g.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
286-
g.Expect(workload.UpdateStatus(ctx, k8sClient, createdWorkload, kueue.WorkloadEvicted, metav1.ConditionTrue, kueue.WorkloadEvictedByPreemption, "By test", "evict", util.RealClock)).To(gomega.Succeed())
286+
g.Expect(workload.SetConditionAndUpdate(ctx, k8sClient, createdWorkload, kueue.WorkloadEvicted, metav1.ConditionTrue, kueue.WorkloadEvictedByPreemption, "By test", "evict", util.RealClock)).To(gomega.Succeed())
287287
}, util.Timeout, util.Interval).Should(gomega.Succeed())
288288
})
289289

test/integration/singlecluster/controller/jobs/job/job_controller_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -630,7 +630,7 @@ var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailu
630630
ginkgo.By("preempt the workload", func() {
631631
gomega.Eventually(func(g gomega.Gomega) {
632632
g.Expect(k8sClient.Get(ctx, wlLookupKey, wl)).To(gomega.Succeed())
633-
g.Expect(workload.UpdateStatus(
633+
g.Expect(workload.SetConditionAndUpdate(
634634
ctx,
635635
k8sClient,
636636
wl,

test/integration/singlecluster/controller/jobs/jobset/jobset_controller_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,7 @@ var _ = ginkgo.Describe("JobSet controller", ginkgo.Ordered, ginkgo.ContinueOnFa
352352
ginkgo.By("preempt the workload", func() {
353353
gomega.Eventually(func(g gomega.Gomega) {
354354
g.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
355-
g.Expect(workload.UpdateStatus(ctx, k8sClient, createdWorkload, kueue.WorkloadEvicted, metav1.ConditionTrue, kueue.WorkloadEvictedByPreemption, "By test", "evict", util.RealClock)).To(gomega.Succeed())
355+
g.Expect(workload.SetConditionAndUpdate(ctx, k8sClient, createdWorkload, kueue.WorkloadEvicted, metav1.ConditionTrue, kueue.WorkloadEvictedByPreemption, "By test", "evict", util.RealClock)).To(gomega.Succeed())
356356
}, util.Timeout, util.Interval).Should(gomega.Succeed())
357357
})
358358

test/integration/singlecluster/controller/jobs/pod/pod_controller_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,7 @@ var _ = ginkgo.Describe("Pod controller", ginkgo.Ordered, ginkgo.ContinueOnFailu
329329
ginkgo.By("checking that pod is stopped when workload is evicted")
330330

331331
gomega.Expect(
332-
workload.UpdateStatus(ctx, k8sClient, createdWorkload, kueue.WorkloadEvicted, metav1.ConditionTrue,
332+
workload.SetConditionAndUpdate(ctx, k8sClient, createdWorkload, kueue.WorkloadEvicted, metav1.ConditionTrue,
333333
kueue.WorkloadEvictedByPreemption, "By test", "evict", util.RealClock),
334334
).Should(gomega.Succeed())
335335
util.FinishEvictionForWorkloads(ctx, k8sClient, createdWorkload)

test/integration/singlecluster/controller/jobs/trainjob/trainjob_controller_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ var _ = ginkgo.Describe("Trainjob controller", ginkgo.Ordered, ginkgo.ContinueOn
259259
ginkgo.By("preempt the workload", func() {
260260
gomega.Eventually(func(g gomega.Gomega) {
261261
g.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
262-
g.Expect(workload.UpdateStatus(ctx, k8sClient, createdWorkload, kueue.WorkloadEvicted, metav1.ConditionTrue, kueue.WorkloadEvictedByPreemption, "By test", "evict", util.RealClock)).To(gomega.Succeed())
262+
g.Expect(workload.SetConditionAndUpdate(ctx, k8sClient, createdWorkload, kueue.WorkloadEvicted, metav1.ConditionTrue, kueue.WorkloadEvictedByPreemption, "By test", "evict", util.RealClock)).To(gomega.Succeed())
263263
}, util.Timeout, util.Interval).Should(gomega.Succeed())
264264
})
265265

test/performance/scheduler/runner/controller/controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ func (r *reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reco
143143
if remaining > 0 {
144144
return reconcile.Result{RequeueAfter: remaining}, nil
145145
} else {
146-
err := workload.UpdateStatus(ctx, r.client, &wl, kueue.WorkloadFinished, metav1.ConditionTrue, "ByTest", "By test runner", constants.JobControllerName, r.clock)
146+
err := workload.SetConditionAndUpdate(ctx, r.client, &wl, kueue.WorkloadFinished, metav1.ConditionTrue, "ByTest", "By test runner", constants.JobControllerName, r.clock)
147147
if err == nil {
148148
log.V(5).Info("Finish Workload")
149149
}

0 commit comments

Comments
 (0)