Skip to content

Commit f274ec1

Browse files
committed
Use WorkloadRequestUseMergePatch in UpdateReclaimablePods().
1 parent 1f509f4 commit f274ec1

File tree

6 files changed

+129
-38
lines changed

6 files changed

+129
-38
lines changed

pkg/workload/workload.go

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1104,9 +1104,28 @@ func HasQuotaReservation(w *kueue.Workload) bool {
11041104

11051105
// UpdateReclaimablePods updates the ReclaimablePods list for the workload with SSA.
11061106
func UpdateReclaimablePods(ctx context.Context, c client.Client, w *kueue.Workload, reclaimablePods []kueue.ReclaimablePod) error {
1107-
patch := BaseSSAWorkload(w, false)
1108-
patch.Status.ReclaimablePods = reclaimablePods
1109-
return c.Status().Patch(ctx, patch, client.Apply, client.FieldOwner(constants.ReclaimablePodsMgr))
1107+
var (
1108+
wlCopy *kueue.Workload
1109+
err error
1110+
)
1111+
1112+
if features.Enabled(features.WorkloadRequestUseMergePatch) {
1113+
wlCopy = w.DeepCopy()
1114+
err = clientutil.PatchStatus(ctx, c, wlCopy, func() (bool, error) {
1115+
wlCopy.Status.ReclaimablePods = reclaimablePods
1116+
return true, nil
1117+
})
1118+
} else {
1119+
wlCopy = BaseSSAWorkload(w, true)
1120+
wlCopy.Status.ReclaimablePods = reclaimablePods
1121+
err = c.Status().Patch(ctx, wlCopy, client.Apply, client.FieldOwner(constants.ReclaimablePodsMgr))
1122+
}
1123+
if err != nil {
1124+
return err
1125+
}
1126+
1127+
wlCopy.DeepCopyInto(w)
1128+
return nil
11101129
}
11111130

11121131
// ReclaimablePodsAreEqual checks if two Reclaimable pods are semantically equal

pkg/workload/workload_test.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -566,6 +566,77 @@ func TestSetConditionAndUpdate(t *testing.T) {
566566
}
567567
}
568568

569+
func TestUpdateReclaimablePods(t *testing.T) {
570+
cases := map[string]struct {
571+
oldStatus kueue.WorkloadStatus
572+
reclaimablePods []kueue.ReclaimablePod
573+
err error
574+
wantStatus kueue.WorkloadStatus
575+
wantErr error
576+
}{
577+
"set reclaimable pods": {
578+
reclaimablePods: []kueue.ReclaimablePod{{Name: "ps1", Count: 1}},
579+
wantStatus: kueue.WorkloadStatus{
580+
ReclaimablePods: []kueue.ReclaimablePod{{Name: "ps1", Count: 1}},
581+
},
582+
},
583+
"set reclaimable pods with error": {
584+
err: errTest,
585+
reclaimablePods: []kueue.ReclaimablePod{{Name: "ps1", Count: 1}},
586+
wantStatus: kueue.WorkloadStatus{},
587+
wantErr: errTest,
588+
},
589+
"update reclaimable pods": {
590+
oldStatus: kueue.WorkloadStatus{
591+
ReclaimablePods: []kueue.ReclaimablePod{{Name: "ps1", Count: 1}},
592+
},
593+
reclaimablePods: []kueue.ReclaimablePod{{Name: "ps1", Count: 2}},
594+
wantStatus: kueue.WorkloadStatus{
595+
ReclaimablePods: []kueue.ReclaimablePod{{Name: "ps1", Count: 2}},
596+
},
597+
},
598+
}
599+
for name, tc := range cases {
600+
for _, useMergePatch := range []bool{false, true} {
601+
t.Run(fmt.Sprintf("%s with WorkloadRequestUseMergePatch enabled %t", name, useMergePatch), func(t *testing.T) {
602+
features.SetFeatureGateDuringTest(t, features.WorkloadRequestUseMergePatch, useMergePatch)
603+
604+
ctx, _ := utiltesting.ContextWithLog(t)
605+
606+
workload := utiltestingapi.MakeWorkload("foo", metav1.NamespaceDefault).Obj()
607+
workload.Status = tc.oldStatus
608+
609+
cl := utiltesting.NewClientBuilder().
610+
WithObjects(workload).
611+
WithStatusSubresource(&kueue.Workload{}).
612+
WithInterceptorFuncs(interceptor.Funcs{
613+
SubResourcePatch: func(ctx context.Context, c client.Client, subResourceName string, obj client.Object, patch client.Patch, opts ...client.SubResourcePatchOption) error {
614+
if tc.err != nil {
615+
return tc.err
616+
}
617+
return utiltesting.TreatSSAAsStrategicMerge(ctx, c, subResourceName, obj, patch, opts...)
618+
},
619+
}).
620+
Build()
621+
622+
err := UpdateReclaimablePods(ctx, cl, workload, tc.reclaimablePods)
623+
if diff := cmp.Diff(tc.wantErr, err, cmpopts.EquateErrors()); diff != "" {
624+
t.Errorf("Unexpected error (-want,+got):\n%s", diff)
625+
}
626+
627+
var updatedWl kueue.Workload
628+
if err := cl.Get(ctx, client.ObjectKeyFromObject(workload), &updatedWl); err != nil {
629+
t.Fatalf("Failed obtaining updated object: %v", err)
630+
}
631+
632+
if diff := cmp.Diff(tc.wantStatus, updatedWl.Status); diff != "" {
633+
t.Errorf("Unexpected status after updating (-want,+got):\n%s", diff)
634+
}
635+
})
636+
}
637+
}
638+
}
639+
569640
func TestGetQueueOrderTimestamp(t *testing.T) {
570641
var (
571642
evictionOrdering = Ordering{PodsReadyRequeuingTimestamp: config.EvictionTimestamp}

test/integration/singlecluster/controller/core/clusterqueue_controller_test.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030
"sigs.k8s.io/kueue/pkg/metrics"
3131
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
3232
utiltestingapi "sigs.k8s.io/kueue/pkg/util/testing/v1beta2"
33-
"sigs.k8s.io/kueue/pkg/workload"
3433
"sigs.k8s.io/kueue/test/integration/framework"
3534
"sigs.k8s.io/kueue/test/util"
3635
)
@@ -513,8 +512,7 @@ var _ = ginkgo.Describe("ClusterQueue controller", ginkgo.Ordered, ginkgo.Contin
513512
}, util.Timeout, util.Interval).Should(gomega.Succeed())
514513

515514
ginkgo.By("Mark two workers as reclaimable", func() {
516-
gomega.Expect(workload.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{{Name: "workers", Count: 2}})).To(gomega.Succeed())
517-
515+
util.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{{Name: "workers", Count: 2}})
518516
util.ExpectReservingActiveWorkloadsMetric(clusterQueue, 1)
519517
util.ExpectLQReservingActiveWorkloadsMetric(localQueue, 1)
520518
gomega.Eventually(func(g gomega.Gomega) {
@@ -552,8 +550,8 @@ var _ = ginkgo.Describe("ClusterQueue controller", ginkgo.Ordered, ginkgo.Contin
552550
})
553551

554552
ginkgo.By("Mark all workers and a driver as reclaimable", func() {
555-
gomega.Expect(workload.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{{Name: "workers", Count: 5}, {Name: "driver", Count: 1}})).To(gomega.Succeed())
556-
553+
reclaimablePods := []kueue.ReclaimablePod{{Name: "workers", Count: 5}, {Name: "driver", Count: 1}}
554+
util.UpdateReclaimablePods(ctx, k8sClient, wl, reclaimablePods)
557555
util.ExpectReservingActiveWorkloadsMetric(clusterQueue, 1)
558556
util.ExpectLQReservingActiveWorkloadsMetric(localQueue, 1)
559557
gomega.Eventually(func(g gomega.Gomega) {
@@ -940,7 +938,7 @@ var _ = ginkgo.Describe("ClusterQueue controller", ginkgo.Ordered, ginkgo.Contin
940938
})
941939

942940
ginkgo.By("Marking two workers as reclaimable", func() {
943-
gomega.Expect(workload.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{{Name: "workers", Count: 2}})).To(gomega.Succeed())
941+
util.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{{Name: "workers", Count: 2}})
944942
})
945943

946944
ginkgo.By("Validating CQ status hasn't changed", func() {

test/integration/singlecluster/scheduler/scheduler_test.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -470,8 +470,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
470470
})
471471

472472
ginkgo.By("Reclaim one pod from the first workload", func() {
473-
gomega.Expect(workload.UpdateReclaimablePods(ctx, k8sClient, firstWl, []kueue.ReclaimablePod{{Name: "third", Count: 1}})).To(gomega.Succeed())
474-
473+
util.UpdateReclaimablePods(ctx, k8sClient, firstWl, []kueue.ReclaimablePod{{Name: "third", Count: 1}})
475474
util.ExpectPendingWorkloadsMetric(preemptionClusterQ, 0, 0)
476475
util.ExpectAdmittedWorkloadsTotalMetric(preemptionClusterQ, "", 1)
477476
})
@@ -494,8 +493,8 @@ var _ = ginkgo.Describe("Scheduler", func() {
494493
})
495494

496495
ginkgo.By("Reclaim two pods from the second workload so that the first workload is resumed", func() {
497-
gomega.Expect(workload.UpdateReclaimablePods(ctx, k8sClient, secondWl, []kueue.ReclaimablePod{{Name: "first", Count: 1}, {Name: "second", Count: 1}})).To(gomega.Succeed())
498-
496+
reclaimablePods := []kueue.ReclaimablePod{{Name: "first", Count: 1}, {Name: "second", Count: 1}}
497+
util.UpdateReclaimablePods(ctx, k8sClient, secondWl, reclaimablePods)
499498
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, firstWl, secondWl)
500499
util.ExpectPendingWorkloadsMetric(preemptionClusterQ, 0, 0)
501500
util.ExpectReservingActiveWorkloadsMetric(preemptionClusterQ, 2)
@@ -531,7 +530,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
531530
})
532531

533532
ginkgo.By("Reclaim one pod from the first workload and admitting the second one", func() {
534-
gomega.Expect(workload.UpdateReclaimablePods(ctx, k8sClient, firstWl, []kueue.ReclaimablePod{{Name: "third", Count: 1}})).To(gomega.Succeed())
533+
util.UpdateReclaimablePods(ctx, k8sClient, firstWl, []kueue.ReclaimablePod{{Name: "third", Count: 1}})
535534
util.ExpectPendingWorkloadsMetric(preemptionClusterQ, 0, 0)
536535
util.ExpectAdmittedWorkloadsTotalMetric(preemptionClusterQ, "", 2)
537536
})
@@ -567,7 +566,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
567566
})
568567

569568
ginkgo.By("Reclaim one pod from the first workload and admitting the second one", func() {
570-
gomega.Expect(workload.UpdateReclaimablePods(ctx, k8sClient, firstWl, []kueue.ReclaimablePod{{Name: "third", Count: 1}})).To(gomega.Succeed())
569+
util.UpdateReclaimablePods(ctx, k8sClient, firstWl, []kueue.ReclaimablePod{{Name: "third", Count: 1}})
571570
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, firstWl)
572571
util.ExpectPendingWorkloadsMetric(preemptionClusterQ, 0, 1)
573572
util.ExpectAdmittedWorkloadsTotalMetric(preemptionClusterQ, "", 1)
@@ -734,8 +733,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
734733
util.ExpectAdmittedWorkloadsTotalMetric(prodClusterQ, "", 0)
735734
})
736735
ginkgo.By("Mark one pod as reclaimable", func() {
737-
gomega.Expect(workload.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{{Name: kueue.DefaultPodSetName, Count: 1}})).To(gomega.Succeed())
738-
736+
util.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{{Name: kueue.DefaultPodSetName, Count: 1}})
739737
util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, prodClusterQ.Name, wl)
740738
util.ExpectPendingWorkloadsMetric(prodClusterQ, 0, 0)
741739
util.ExpectAdmittedWorkloadsTotalMetric(prodClusterQ, "", 1)

test/integration/singlecluster/webhook/core/workload_test.go

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -528,11 +528,8 @@ var _ = ginkgo.Describe("Workload validating webhook", ginkgo.Ordered, func() {
528528
Obj()
529529
util.MustCreate(ctx, k8sClient, wl)
530530

531-
err := workload.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{
532-
{Name: "ps1", Count: 4},
533-
{Name: "ps2", Count: 1},
534-
})
535-
gomega.Expect(err).Should(utiltesting.BeForbiddenError())
531+
reclaimablePods := []kueue.ReclaimablePod{{Name: "ps1", Count: 4}, {Name: "ps2", Count: 1}}
532+
expectUpdateReclaimablePodsBeForbiddenError(client.ObjectKeyFromObject(wl), reclaimablePods)
536533
})
537534
})
538535

@@ -1139,9 +1136,7 @@ var _ = ginkgo.Describe("Workload validating webhook", ginkgo.Ordered, func() {
11391136
).
11401137
Obj()
11411138
util.MustCreate(ctx, k8sClient, wl)
1142-
gomega.Expect(workload.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{
1143-
{Name: "ps1", Count: 1},
1144-
})).Should(gomega.Succeed())
1139+
util.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{{Name: "ps1", Count: 1}})
11451140

11461141
util.SetQuotaReservation(ctx, k8sClient, client.ObjectKeyFromObject(wl),
11471142
utiltestingapi.MakeAdmission("cluster-queue").
@@ -1151,11 +1146,8 @@ var _ = ginkgo.Describe("Workload validating webhook", ginkgo.Ordered, func() {
11511146
Obj())
11521147

11531148
ginkgo.By("Updating reclaimable pods")
1154-
err := workload.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{
1155-
{Name: "ps1", Count: 2},
1156-
{Name: "ps2", Count: 1},
1157-
})
1158-
gomega.Expect(err).Should(gomega.Succeed())
1149+
reclaimablePods := []kueue.ReclaimablePod{{Name: "ps1", Count: 2}, {Name: "ps2", Count: 1}}
1150+
util.UpdateReclaimablePods(ctx, k8sClient, wl, reclaimablePods)
11591151
})
11601152

11611153
ginkgo.It("reclaimable pod count cannot change down", func() {
@@ -1167,10 +1159,8 @@ var _ = ginkgo.Describe("Workload validating webhook", ginkgo.Ordered, func() {
11671159
).
11681160
Obj()
11691161
util.MustCreate(ctx, k8sClient, wl)
1170-
gomega.Expect(workload.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{
1171-
{Name: "ps1", Count: 2},
1172-
{Name: "ps2", Count: 1},
1173-
})).Should(gomega.Succeed())
1162+
reclaimablePods := []kueue.ReclaimablePod{{Name: "ps1", Count: 2}, {Name: "ps2", Count: 1}}
1163+
util.UpdateReclaimablePods(ctx, k8sClient, wl, reclaimablePods)
11741164

11751165
util.SetQuotaReservation(ctx, k8sClient, client.ObjectKeyFromObject(wl),
11761166
utiltestingapi.MakeAdmission("cluster-queue").
@@ -1180,10 +1170,7 @@ var _ = ginkgo.Describe("Workload validating webhook", ginkgo.Ordered, func() {
11801170
Obj())
11811171

11821172
ginkgo.By("Updating reclaimable pods")
1183-
err := workload.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{
1184-
{Name: "ps1", Count: 1},
1185-
})
1186-
gomega.Expect(err).Should(utiltesting.BeForbiddenError())
1173+
expectUpdateReclaimablePodsBeForbiddenError(client.ObjectKeyFromObject(wl), []kueue.ReclaimablePod{{Name: "ps1", Count: 1}})
11871174
})
11881175

11891176
ginkgo.It("podSetUpdates should be immutable when state is ready", func() {
@@ -1724,3 +1711,12 @@ func validSliceFor(levels []string, suffix int) kueue.TopologyAssignmentSlice {
17241711
}
17251712
return res
17261713
}
1714+
1715+
func expectUpdateReclaimablePodsBeForbiddenError(wlKey client.ObjectKey, reclaimablePods []kueue.ReclaimablePod) {
1716+
ginkgo.GinkgoHelper()
1717+
wl := &kueue.Workload{}
1718+
gomega.Eventually(func(g gomega.Gomega) {
1719+
g.Expect(k8sClient.Get(ctx, wlKey, wl))
1720+
g.Expect(workload.UpdateReclaimablePods(ctx, k8sClient, wl, reclaimablePods)).Should(utiltesting.BeForbiddenError())
1721+
}, util.Timeout, util.Interval).Should(gomega.Succeed())
1722+
}

test/util/util.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1385,3 +1385,12 @@ func ExpectJobToBeCompleted(ctx context.Context, c client.Client, job *batchv1.J
13851385
cmpopts.IgnoreFields(batchv1.JobCondition{}, "LastTransitionTime", "LastProbeTime", "Reason", "Message"))))
13861386
}, LongTimeout, Interval).Should(gomega.Succeed())
13871387
}
1388+
1389+
func UpdateReclaimablePods(ctx context.Context, c client.Client, wl *kueue.Workload, reclaimablePods []kueue.ReclaimablePod) {
1390+
ginkgo.GinkgoHelper()
1391+
createdWl := &kueue.Workload{}
1392+
gomega.Eventually(func(g gomega.Gomega) {
1393+
g.Expect(c.Get(ctx, client.ObjectKeyFromObject(wl), createdWl)).To(gomega.Succeed())
1394+
g.Expect(workload.UpdateReclaimablePods(ctx, c, createdWl, reclaimablePods)).To(gomega.Succeed())
1395+
}, Timeout, Interval).Should(gomega.Succeed())
1396+
}

0 commit comments

Comments
 (0)