Skip to content

Commit 509bf8a

Browse files
committed
Use WorkloadRequestUseMergePatch in UpdateReclaimablePods().
1 parent b07ffd8 commit 509bf8a

File tree

6 files changed

+122
-38
lines changed

6 files changed

+122
-38
lines changed

pkg/workload/workload.go

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1109,9 +1109,28 @@ func HasQuotaReservation(w *kueue.Workload) bool {
11091109

11101110
// UpdateReclaimablePods updates the ReclaimablePods list for the workload with SSA.
11111111
func UpdateReclaimablePods(ctx context.Context, c client.Client, w *kueue.Workload, reclaimablePods []kueue.ReclaimablePod) error {
1112-
patch := BaseSSAWorkload(w, false)
1113-
patch.Status.ReclaimablePods = reclaimablePods
1114-
return c.Status().Patch(ctx, patch, client.Apply, client.FieldOwner(constants.ReclaimablePodsMgr))
1112+
var (
1113+
wlCopy *kueue.Workload
1114+
err error
1115+
)
1116+
1117+
if features.Enabled(features.WorkloadRequestUseMergePatch) {
1118+
wlCopy = w.DeepCopy()
1119+
err = clientutil.PatchStatus(ctx, c, wlCopy, func() (bool, error) {
1120+
wlCopy.Status.ReclaimablePods = reclaimablePods
1121+
return true, nil
1122+
})
1123+
} else {
1124+
wlCopy = BaseSSAWorkload(w, true)
1125+
wlCopy.Status.ReclaimablePods = reclaimablePods
1126+
err = c.Status().Patch(ctx, wlCopy, client.Apply, client.FieldOwner(constants.ReclaimablePodsMgr))
1127+
}
1128+
if err != nil {
1129+
return err
1130+
}
1131+
1132+
wlCopy.DeepCopyInto(w)
1133+
return nil
11151134
}
11161135

11171136
// ReclaimablePodsAreEqual checks if two Reclaimable pods are semantically equal

pkg/workload/workload_test.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -566,6 +566,77 @@ func TestSetConditionAndUpdate(t *testing.T) {
566566
}
567567
}
568568

569+
func TestUpdateReclaimablePods(t *testing.T) {
570+
cases := map[string]struct {
571+
oldStatus kueue.WorkloadStatus
572+
reclaimablePods []kueue.ReclaimablePod
573+
err error
574+
wantStatus kueue.WorkloadStatus
575+
wantErr error
576+
}{
577+
"set reclaimable pods": {
578+
reclaimablePods: []kueue.ReclaimablePod{{Name: "ps1", Count: 1}},
579+
wantStatus: kueue.WorkloadStatus{
580+
ReclaimablePods: []kueue.ReclaimablePod{{Name: "ps1", Count: 1}},
581+
},
582+
},
583+
"set reclaimable pods with error": {
584+
err: errTest,
585+
reclaimablePods: []kueue.ReclaimablePod{{Name: "ps1", Count: 1}},
586+
wantStatus: kueue.WorkloadStatus{},
587+
wantErr: errTest,
588+
},
589+
"update reclaimable pods": {
590+
oldStatus: kueue.WorkloadStatus{
591+
ReclaimablePods: []kueue.ReclaimablePod{{Name: "ps1", Count: 1}},
592+
},
593+
reclaimablePods: []kueue.ReclaimablePod{{Name: "ps1", Count: 2}},
594+
wantStatus: kueue.WorkloadStatus{
595+
ReclaimablePods: []kueue.ReclaimablePod{{Name: "ps1", Count: 2}},
596+
},
597+
},
598+
}
599+
for name, tc := range cases {
600+
for _, useMergePatch := range []bool{false, true} {
601+
t.Run(fmt.Sprintf("%s with WorkloadRequestUseMergePatch enabled %t", name, useMergePatch), func(t *testing.T) {
602+
features.SetFeatureGateDuringTest(t, features.WorkloadRequestUseMergePatch, useMergePatch)
603+
604+
ctx, _ := utiltesting.ContextWithLog(t)
605+
606+
workload := utiltestingapi.MakeWorkload("foo", metav1.NamespaceDefault).Obj()
607+
workload.Status = tc.oldStatus
608+
609+
cl := utiltesting.NewClientBuilder().
610+
WithObjects(workload).
611+
WithStatusSubresource(&kueue.Workload{}).
612+
WithInterceptorFuncs(interceptor.Funcs{
613+
SubResourcePatch: func(ctx context.Context, c client.Client, subResourceName string, obj client.Object, patch client.Patch, opts ...client.SubResourcePatchOption) error {
614+
if tc.err != nil {
615+
return tc.err
616+
}
617+
return utiltesting.TreatSSAAsStrategicMerge(ctx, c, subResourceName, obj, patch, opts...)
618+
},
619+
}).
620+
Build()
621+
622+
err := UpdateReclaimablePods(ctx, cl, workload, tc.reclaimablePods)
623+
if diff := cmp.Diff(tc.wantErr, err, cmpopts.EquateErrors()); diff != "" {
624+
t.Errorf("Unexpected error (-want,+got):\n%s", diff)
625+
}
626+
627+
var updatedWl kueue.Workload
628+
if err := cl.Get(ctx, client.ObjectKeyFromObject(workload), &updatedWl); err != nil {
629+
t.Fatalf("Failed obtaining updated object: %v", err)
630+
}
631+
632+
if diff := cmp.Diff(tc.wantStatus, updatedWl.Status); diff != "" {
633+
t.Errorf("Unexpected status after updating (-want,+got):\n%s", diff)
634+
}
635+
})
636+
}
637+
}
638+
}
639+
569640
func TestGetQueueOrderTimestamp(t *testing.T) {
570641
var (
571642
evictionOrdering = Ordering{PodsReadyRequeuingTimestamp: config.EvictionTimestamp}

test/integration/singlecluster/controller/core/clusterqueue_controller_test.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030
"sigs.k8s.io/kueue/pkg/metrics"
3131
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
3232
utiltestingapi "sigs.k8s.io/kueue/pkg/util/testing/v1beta2"
33-
"sigs.k8s.io/kueue/pkg/workload"
3433
"sigs.k8s.io/kueue/test/integration/framework"
3534
"sigs.k8s.io/kueue/test/util"
3635
)
@@ -513,8 +512,7 @@ var _ = ginkgo.Describe("ClusterQueue controller", ginkgo.Ordered, ginkgo.Contin
513512
}, util.Timeout, util.Interval).Should(gomega.Succeed())
514513

515514
ginkgo.By("Mark two workers as reclaimable", func() {
516-
gomega.Expect(workload.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{{Name: "workers", Count: 2}})).To(gomega.Succeed())
517-
515+
util.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{{Name: "workers", Count: 2}})
518516
util.ExpectReservingActiveWorkloadsMetric(clusterQueue, 1)
519517
util.ExpectLQReservingActiveWorkloadsMetric(localQueue, 1)
520518
gomega.Eventually(func(g gomega.Gomega) {
@@ -552,8 +550,8 @@ var _ = ginkgo.Describe("ClusterQueue controller", ginkgo.Ordered, ginkgo.Contin
552550
})
553551

554552
ginkgo.By("Mark all workers and a driver as reclaimable", func() {
555-
gomega.Expect(workload.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{{Name: "workers", Count: 5}, {Name: "driver", Count: 1}})).To(gomega.Succeed())
556-
553+
reclaimablePods := []kueue.ReclaimablePod{{Name: "workers", Count: 5}, {Name: "driver", Count: 1}}
554+
util.UpdateReclaimablePods(ctx, k8sClient, wl, reclaimablePods)
557555
util.ExpectReservingActiveWorkloadsMetric(clusterQueue, 1)
558556
util.ExpectLQReservingActiveWorkloadsMetric(localQueue, 1)
559557
gomega.Eventually(func(g gomega.Gomega) {
@@ -940,7 +938,7 @@ var _ = ginkgo.Describe("ClusterQueue controller", ginkgo.Ordered, ginkgo.Contin
940938
})
941939

942940
ginkgo.By("Marking two workers as reclaimable", func() {
943-
gomega.Expect(workload.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{{Name: "workers", Count: 2}})).To(gomega.Succeed())
941+
util.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{{Name: "workers", Count: 2}})
944942
})
945943

946944
ginkgo.By("Validating CQ status hasn't changed", func() {

test/integration/singlecluster/scheduler/scheduler_test.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -470,8 +470,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
470470
})
471471

472472
ginkgo.By("Reclaim one pod from the first workload", func() {
473-
gomega.Expect(workload.UpdateReclaimablePods(ctx, k8sClient, firstWl, []kueue.ReclaimablePod{{Name: "third", Count: 1}})).To(gomega.Succeed())
474-
473+
util.UpdateReclaimablePods(ctx, k8sClient, firstWl, []kueue.ReclaimablePod{{Name: "third", Count: 1}})
475474
util.ExpectPendingWorkloadsMetric(preemptionClusterQ, 0, 0)
476475
util.ExpectAdmittedWorkloadsTotalMetric(preemptionClusterQ, "", 1)
477476
})
@@ -494,8 +493,8 @@ var _ = ginkgo.Describe("Scheduler", func() {
494493
})
495494

496495
ginkgo.By("Reclaim two pods from the second workload so that the first workload is resumed", func() {
497-
gomega.Expect(workload.UpdateReclaimablePods(ctx, k8sClient, secondWl, []kueue.ReclaimablePod{{Name: "first", Count: 1}, {Name: "second", Count: 1}})).To(gomega.Succeed())
498-
496+
reclaimablePods := []kueue.ReclaimablePod{{Name: "first", Count: 1}, {Name: "second", Count: 1}}
497+
util.UpdateReclaimablePods(ctx, k8sClient, secondWl, reclaimablePods)
499498
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, firstWl, secondWl)
500499
util.ExpectPendingWorkloadsMetric(preemptionClusterQ, 0, 0)
501500
util.ExpectReservingActiveWorkloadsMetric(preemptionClusterQ, 2)
@@ -531,7 +530,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
531530
})
532531

533532
ginkgo.By("Reclaim one pod from the first workload and admitting the second one", func() {
534-
gomega.Expect(workload.UpdateReclaimablePods(ctx, k8sClient, firstWl, []kueue.ReclaimablePod{{Name: "third", Count: 1}})).To(gomega.Succeed())
533+
util.UpdateReclaimablePods(ctx, k8sClient, firstWl, []kueue.ReclaimablePod{{Name: "third", Count: 1}})
535534
util.ExpectPendingWorkloadsMetric(preemptionClusterQ, 0, 0)
536535
util.ExpectAdmittedWorkloadsTotalMetric(preemptionClusterQ, "", 2)
537536
})
@@ -567,7 +566,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
567566
})
568567

569568
ginkgo.By("Reclaim one pod from the first workload and admitting the second one", func() {
570-
gomega.Expect(workload.UpdateReclaimablePods(ctx, k8sClient, firstWl, []kueue.ReclaimablePod{{Name: "third", Count: 1}})).To(gomega.Succeed())
569+
util.UpdateReclaimablePods(ctx, k8sClient, firstWl, []kueue.ReclaimablePod{{Name: "third", Count: 1}})
571570
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, firstWl)
572571
util.ExpectPendingWorkloadsMetric(preemptionClusterQ, 0, 1)
573572
util.ExpectAdmittedWorkloadsTotalMetric(preemptionClusterQ, "", 1)
@@ -734,8 +733,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
734733
util.ExpectAdmittedWorkloadsTotalMetric(prodClusterQ, "", 0)
735734
})
736735
ginkgo.By("Mark one pod as reclaimable", func() {
737-
gomega.Expect(workload.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{{Name: kueue.DefaultPodSetName, Count: 1}})).To(gomega.Succeed())
738-
736+
util.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{{Name: kueue.DefaultPodSetName, Count: 1}})
739737
util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, prodClusterQ.Name, wl)
740738
util.ExpectPendingWorkloadsMetric(prodClusterQ, 0, 0)
741739
util.ExpectAdmittedWorkloadsTotalMetric(prodClusterQ, "", 1)

test/integration/singlecluster/webhook/core/workload_test.go

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -528,11 +528,10 @@ var _ = ginkgo.Describe("Workload validating webhook", ginkgo.Ordered, func() {
528528
Obj()
529529
util.MustCreate(ctx, k8sClient, wl)
530530

531-
err := workload.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{
532-
{Name: "ps1", Count: 4},
533-
{Name: "ps2", Count: 1},
534-
})
535-
gomega.Expect(err).Should(utiltesting.BeForbiddenError())
531+
reclaimablePods := []kueue.ReclaimablePod{{Name: "ps1", Count: 4}, {Name: "ps2", Count: 1}}
532+
gomega.Eventually(func(g gomega.Gomega) {
533+
g.Expect(workload.UpdateReclaimablePods(ctx, k8sClient, wl, reclaimablePods)).Should(utiltesting.BeForbiddenError())
534+
}, util.Timeout, util.Interval).Should(gomega.Succeed())
536535
})
537536
})
538537

@@ -1139,9 +1138,7 @@ var _ = ginkgo.Describe("Workload validating webhook", ginkgo.Ordered, func() {
11391138
).
11401139
Obj()
11411140
util.MustCreate(ctx, k8sClient, wl)
1142-
gomega.Expect(workload.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{
1143-
{Name: "ps1", Count: 1},
1144-
})).Should(gomega.Succeed())
1141+
util.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{{Name: "ps1", Count: 1}})
11451142

11461143
util.SetQuotaReservation(ctx, k8sClient, client.ObjectKeyFromObject(wl),
11471144
utiltestingapi.MakeAdmission("cluster-queue").
@@ -1151,11 +1148,8 @@ var _ = ginkgo.Describe("Workload validating webhook", ginkgo.Ordered, func() {
11511148
Obj())
11521149

11531150
ginkgo.By("Updating reclaimable pods")
1154-
err := workload.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{
1155-
{Name: "ps1", Count: 2},
1156-
{Name: "ps2", Count: 1},
1157-
})
1158-
gomega.Expect(err).Should(gomega.Succeed())
1151+
reclaimablePods := []kueue.ReclaimablePod{{Name: "ps1", Count: 2}, {Name: "ps2", Count: 1}}
1152+
util.UpdateReclaimablePods(ctx, k8sClient, wl, reclaimablePods)
11591153
})
11601154

11611155
ginkgo.It("reclaimable pod count cannot change down", func() {
@@ -1167,10 +1161,8 @@ var _ = ginkgo.Describe("Workload validating webhook", ginkgo.Ordered, func() {
11671161
).
11681162
Obj()
11691163
util.MustCreate(ctx, k8sClient, wl)
1170-
gomega.Expect(workload.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{
1171-
{Name: "ps1", Count: 2},
1172-
{Name: "ps2", Count: 1},
1173-
})).Should(gomega.Succeed())
1164+
reclaimablePods := []kueue.ReclaimablePod{{Name: "ps1", Count: 2}, {Name: "ps2", Count: 1}}
1165+
util.UpdateReclaimablePods(ctx, k8sClient, wl, reclaimablePods)
11741166

11751167
util.SetQuotaReservation(ctx, k8sClient, client.ObjectKeyFromObject(wl),
11761168
utiltestingapi.MakeAdmission("cluster-queue").
@@ -1180,10 +1172,7 @@ var _ = ginkgo.Describe("Workload validating webhook", ginkgo.Ordered, func() {
11801172
Obj())
11811173

11821174
ginkgo.By("Updating reclaimable pods")
1183-
err := workload.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{
1184-
{Name: "ps1", Count: 1},
1185-
})
1186-
gomega.Expect(err).Should(utiltesting.BeForbiddenError())
1175+
util.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{{Name: "ps1", Count: 1}})
11871176
})
11881177

11891178
ginkgo.It("podSetUpdates should be immutable when state is ready", func() {

test/util/util.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1385,3 +1385,12 @@ func ExpectJobToBeCompleted(ctx context.Context, c client.Client, job *batchv1.J
13851385
cmpopts.IgnoreFields(batchv1.JobCondition{}, "LastTransitionTime", "LastProbeTime", "Reason", "Message"))))
13861386
}, LongTimeout, Interval).Should(gomega.Succeed())
13871387
}
1388+
1389+
func UpdateReclaimablePods(ctx context.Context, c client.Client, wl *kueue.Workload, reclaimablePods []kueue.ReclaimablePod) {
1390+
ginkgo.GinkgoHelper()
1391+
createdWl := &kueue.Workload{}
1392+
gomega.Eventually(func(g gomega.Gomega) {
1393+
g.Expect(c.Get(ctx, client.ObjectKeyFromObject(wl), createdWl)).To(gomega.Succeed())
1394+
g.Expect(workload.UpdateReclaimablePods(ctx, c, createdWl, reclaimablePods)).To(gomega.Succeed())
1395+
}, Timeout, Interval).Should(gomega.Succeed())
1396+
}

0 commit comments

Comments
 (0)