Skip to content

Commit 057214e

Browse files
committed
Use WorkloadRequestUseMergePatch in UpdateReclaimablePods().
1 parent 50d89e0 commit 057214e

File tree

6 files changed

+112
-39
lines changed

6 files changed

+112
-39
lines changed

pkg/workload/workload.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1111,10 +1111,11 @@ func HasQuotaReservation(w *kueue.Workload) bool {
11111111
}
11121112

11131113
// UpdateReclaimablePods updates the ReclaimablePods list for the workload with SSA.
1114-
func UpdateReclaimablePods(ctx context.Context, c client.Client, w *kueue.Workload, reclaimablePods []kueue.ReclaimablePod) error {
1115-
patch := BaseSSAWorkload(w, false)
1116-
patch.Status.ReclaimablePods = reclaimablePods
1117-
return c.Status().Patch(ctx, patch, client.Apply, client.FieldOwner(constants.ReclaimablePodsMgr))
1114+
func UpdateReclaimablePods(ctx context.Context, c client.Client, wl *kueue.Workload, reclaimablePods []kueue.ReclaimablePod) error {
1115+
return PatchStatus(ctx, c, wl, constants.ReclaimablePodsMgr, func(wl *kueue.Workload) (bool, error) {
1116+
wl.Status.ReclaimablePods = reclaimablePods
1117+
return true, nil
1118+
})
11181119
}
11191120

11201121
// ReclaimablePodsAreEqual checks if two Reclaimable pods are semantically equal

pkg/workload/workload_test.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -578,6 +578,77 @@ func TestSetConditionAndUpdate(t *testing.T) {
578578
}
579579
}
580580

581+
func TestUpdateReclaimablePods(t *testing.T) {
582+
cases := map[string]struct {
583+
oldStatus kueue.WorkloadStatus
584+
reclaimablePods []kueue.ReclaimablePod
585+
err error
586+
wantStatus kueue.WorkloadStatus
587+
wantErr error
588+
}{
589+
"set reclaimable pods": {
590+
reclaimablePods: []kueue.ReclaimablePod{{Name: "ps1", Count: 1}},
591+
wantStatus: kueue.WorkloadStatus{
592+
ReclaimablePods: []kueue.ReclaimablePod{{Name: "ps1", Count: 1}},
593+
},
594+
},
595+
"set reclaimable pods with error": {
596+
err: errTest,
597+
reclaimablePods: []kueue.ReclaimablePod{{Name: "ps1", Count: 1}},
598+
wantStatus: kueue.WorkloadStatus{},
599+
wantErr: errTest,
600+
},
601+
"update reclaimable pods": {
602+
oldStatus: kueue.WorkloadStatus{
603+
ReclaimablePods: []kueue.ReclaimablePod{{Name: "ps1", Count: 1}},
604+
},
605+
reclaimablePods: []kueue.ReclaimablePod{{Name: "ps1", Count: 2}},
606+
wantStatus: kueue.WorkloadStatus{
607+
ReclaimablePods: []kueue.ReclaimablePod{{Name: "ps1", Count: 2}},
608+
},
609+
},
610+
}
611+
for name, tc := range cases {
612+
for _, useMergePatch := range []bool{false, true} {
613+
t.Run(fmt.Sprintf("%s with WorkloadRequestUseMergePatch enabled %t", name, useMergePatch), func(t *testing.T) {
614+
features.SetFeatureGateDuringTest(t, features.WorkloadRequestUseMergePatch, useMergePatch)
615+
616+
ctx, _ := utiltesting.ContextWithLog(t)
617+
618+
workload := utiltestingapi.MakeWorkload("foo", metav1.NamespaceDefault).Obj()
619+
workload.Status = tc.oldStatus
620+
621+
cl := utiltesting.NewClientBuilder().
622+
WithObjects(workload).
623+
WithStatusSubresource(&kueue.Workload{}).
624+
WithInterceptorFuncs(interceptor.Funcs{
625+
SubResourcePatch: func(ctx context.Context, c client.Client, subResourceName string, obj client.Object, patch client.Patch, opts ...client.SubResourcePatchOption) error {
626+
if tc.err != nil {
627+
return tc.err
628+
}
629+
return utiltesting.TreatSSAAsStrategicMerge(ctx, c, subResourceName, obj, patch, opts...)
630+
},
631+
}).
632+
Build()
633+
634+
err := UpdateReclaimablePods(ctx, cl, workload, tc.reclaimablePods)
635+
if diff := cmp.Diff(tc.wantErr, err, cmpopts.EquateErrors()); diff != "" {
636+
t.Errorf("Unexpected error (-want,+got):\n%s", diff)
637+
}
638+
639+
var updatedWl kueue.Workload
640+
if err := cl.Get(ctx, client.ObjectKeyFromObject(workload), &updatedWl); err != nil {
641+
t.Fatalf("Failed obtaining updated object: %v", err)
642+
}
643+
644+
if diff := cmp.Diff(tc.wantStatus, updatedWl.Status); diff != "" {
645+
t.Errorf("Unexpected status after updating (-want,+got):\n%s", diff)
646+
}
647+
})
648+
}
649+
}
650+
}
651+
581652
func TestGetQueueOrderTimestamp(t *testing.T) {
582653
var (
583654
evictionOrdering = Ordering{PodsReadyRequeuingTimestamp: config.EvictionTimestamp}

test/integration/singlecluster/controller/core/clusterqueue_controller_test.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030
"sigs.k8s.io/kueue/pkg/metrics"
3131
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
3232
utiltestingapi "sigs.k8s.io/kueue/pkg/util/testing/v1beta2"
33-
"sigs.k8s.io/kueue/pkg/workload"
3433
"sigs.k8s.io/kueue/test/integration/framework"
3534
"sigs.k8s.io/kueue/test/util"
3635
)
@@ -513,8 +512,7 @@ var _ = ginkgo.Describe("ClusterQueue controller", ginkgo.Label("controller:clus
513512
}, util.Timeout, util.Interval).Should(gomega.Succeed())
514513

515514
ginkgo.By("Mark two workers as reclaimable", func() {
516-
gomega.Expect(workload.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{{Name: "workers", Count: 2}})).To(gomega.Succeed())
517-
515+
util.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{{Name: "workers", Count: 2}})
518516
util.ExpectReservingActiveWorkloadsMetric(clusterQueue, 1)
519517
util.ExpectLQReservingActiveWorkloadsMetric(localQueue, 1)
520518
gomega.Eventually(func(g gomega.Gomega) {
@@ -552,8 +550,8 @@ var _ = ginkgo.Describe("ClusterQueue controller", ginkgo.Label("controller:clus
552550
})
553551

554552
ginkgo.By("Mark all workers and a driver as reclaimable", func() {
555-
gomega.Expect(workload.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{{Name: "workers", Count: 5}, {Name: "driver", Count: 1}})).To(gomega.Succeed())
556-
553+
reclaimablePods := []kueue.ReclaimablePod{{Name: "workers", Count: 5}, {Name: "driver", Count: 1}}
554+
util.UpdateReclaimablePods(ctx, k8sClient, wl, reclaimablePods)
557555
util.ExpectReservingActiveWorkloadsMetric(clusterQueue, 1)
558556
util.ExpectLQReservingActiveWorkloadsMetric(localQueue, 1)
559557
gomega.Eventually(func(g gomega.Gomega) {
@@ -940,7 +938,7 @@ var _ = ginkgo.Describe("ClusterQueue controller", ginkgo.Label("controller:clus
940938
})
941939

942940
ginkgo.By("Marking two workers as reclaimable", func() {
943-
gomega.Expect(workload.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{{Name: "workers", Count: 2}})).To(gomega.Succeed())
941+
util.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{{Name: "workers", Count: 2}})
944942
})
945943

946944
ginkgo.By("Validating CQ status hasn't changed", func() {

test/integration/singlecluster/scheduler/scheduler_test.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -470,8 +470,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
470470
})
471471

472472
ginkgo.By("Reclaim one pod from the first workload", func() {
473-
gomega.Expect(workload.UpdateReclaimablePods(ctx, k8sClient, firstWl, []kueue.ReclaimablePod{{Name: "third", Count: 1}})).To(gomega.Succeed())
474-
473+
util.UpdateReclaimablePods(ctx, k8sClient, firstWl, []kueue.ReclaimablePod{{Name: "third", Count: 1}})
475474
util.ExpectPendingWorkloadsMetric(preemptionClusterQ, 0, 0)
476475
util.ExpectAdmittedWorkloadsTotalMetric(preemptionClusterQ, "", 1)
477476
})
@@ -494,8 +493,8 @@ var _ = ginkgo.Describe("Scheduler", func() {
494493
})
495494

496495
ginkgo.By("Reclaim two pods from the second workload so that the first workload is resumed", func() {
497-
gomega.Expect(workload.UpdateReclaimablePods(ctx, k8sClient, secondWl, []kueue.ReclaimablePod{{Name: "first", Count: 1}, {Name: "second", Count: 1}})).To(gomega.Succeed())
498-
496+
reclaimablePods := []kueue.ReclaimablePod{{Name: "first", Count: 1}, {Name: "second", Count: 1}}
497+
util.UpdateReclaimablePods(ctx, k8sClient, secondWl, reclaimablePods)
499498
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, firstWl, secondWl)
500499
util.ExpectPendingWorkloadsMetric(preemptionClusterQ, 0, 0)
501500
util.ExpectReservingActiveWorkloadsMetric(preemptionClusterQ, 2)
@@ -531,7 +530,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
531530
})
532531

533532
ginkgo.By("Reclaim one pod from the first workload and admitting the second one", func() {
534-
gomega.Expect(workload.UpdateReclaimablePods(ctx, k8sClient, firstWl, []kueue.ReclaimablePod{{Name: "third", Count: 1}})).To(gomega.Succeed())
533+
util.UpdateReclaimablePods(ctx, k8sClient, firstWl, []kueue.ReclaimablePod{{Name: "third", Count: 1}})
535534
util.ExpectPendingWorkloadsMetric(preemptionClusterQ, 0, 0)
536535
util.ExpectAdmittedWorkloadsTotalMetric(preemptionClusterQ, "", 2)
537536
})
@@ -567,7 +566,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
567566
})
568567

569568
ginkgo.By("Reclaim one pod from the first workload and admitting the second one", func() {
570-
gomega.Expect(workload.UpdateReclaimablePods(ctx, k8sClient, firstWl, []kueue.ReclaimablePod{{Name: "third", Count: 1}})).To(gomega.Succeed())
569+
util.UpdateReclaimablePods(ctx, k8sClient, firstWl, []kueue.ReclaimablePod{{Name: "third", Count: 1}})
571570
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, firstWl)
572571
util.ExpectPendingWorkloadsMetric(preemptionClusterQ, 0, 1)
573572
util.ExpectAdmittedWorkloadsTotalMetric(preemptionClusterQ, "", 1)
@@ -734,8 +733,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
734733
util.ExpectAdmittedWorkloadsTotalMetric(prodClusterQ, "", 0)
735734
})
736735
ginkgo.By("Mark one pod as reclaimable", func() {
737-
gomega.Expect(workload.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{{Name: kueue.DefaultPodSetName, Count: 1}})).To(gomega.Succeed())
738-
736+
util.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{{Name: kueue.DefaultPodSetName, Count: 1}})
739737
util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, prodClusterQ.Name, wl)
740738
util.ExpectPendingWorkloadsMetric(prodClusterQ, 0, 0)
741739
util.ExpectAdmittedWorkloadsTotalMetric(prodClusterQ, "", 1)

test/integration/singlecluster/webhook/core/workload_test.go

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -528,11 +528,8 @@ var _ = ginkgo.Describe("Workload validating webhook", ginkgo.Ordered, func() {
528528
Obj()
529529
util.MustCreate(ctx, k8sClient, wl)
530530

531-
err := workload.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{
532-
{Name: "ps1", Count: 4},
533-
{Name: "ps2", Count: 1},
534-
})
535-
gomega.Expect(err).Should(utiltesting.BeForbiddenError())
531+
reclaimablePods := []kueue.ReclaimablePod{{Name: "ps1", Count: 4}, {Name: "ps2", Count: 1}}
532+
expectUpdateReclaimablePodsBeForbiddenError(client.ObjectKeyFromObject(wl), reclaimablePods)
536533
})
537534
})
538535

@@ -1139,9 +1136,7 @@ var _ = ginkgo.Describe("Workload validating webhook", ginkgo.Ordered, func() {
11391136
).
11401137
Obj()
11411138
util.MustCreate(ctx, k8sClient, wl)
1142-
gomega.Expect(workload.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{
1143-
{Name: "ps1", Count: 1},
1144-
})).Should(gomega.Succeed())
1139+
util.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{{Name: "ps1", Count: 1}})
11451140

11461141
util.SetQuotaReservation(ctx, k8sClient, client.ObjectKeyFromObject(wl),
11471142
utiltestingapi.MakeAdmission("cluster-queue").
@@ -1151,11 +1146,8 @@ var _ = ginkgo.Describe("Workload validating webhook", ginkgo.Ordered, func() {
11511146
Obj())
11521147

11531148
ginkgo.By("Updating reclaimable pods")
1154-
err := workload.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{
1155-
{Name: "ps1", Count: 2},
1156-
{Name: "ps2", Count: 1},
1157-
})
1158-
gomega.Expect(err).Should(gomega.Succeed())
1149+
reclaimablePods := []kueue.ReclaimablePod{{Name: "ps1", Count: 2}, {Name: "ps2", Count: 1}}
1150+
util.UpdateReclaimablePods(ctx, k8sClient, wl, reclaimablePods)
11591151
})
11601152

11611153
ginkgo.It("reclaimable pod count cannot change down", func() {
@@ -1167,10 +1159,8 @@ var _ = ginkgo.Describe("Workload validating webhook", ginkgo.Ordered, func() {
11671159
).
11681160
Obj()
11691161
util.MustCreate(ctx, k8sClient, wl)
1170-
gomega.Expect(workload.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{
1171-
{Name: "ps1", Count: 2},
1172-
{Name: "ps2", Count: 1},
1173-
})).Should(gomega.Succeed())
1162+
reclaimablePods := []kueue.ReclaimablePod{{Name: "ps1", Count: 2}, {Name: "ps2", Count: 1}}
1163+
util.UpdateReclaimablePods(ctx, k8sClient, wl, reclaimablePods)
11741164

11751165
util.SetQuotaReservation(ctx, k8sClient, client.ObjectKeyFromObject(wl),
11761166
utiltestingapi.MakeAdmission("cluster-queue").
@@ -1180,10 +1170,7 @@ var _ = ginkgo.Describe("Workload validating webhook", ginkgo.Ordered, func() {
11801170
Obj())
11811171

11821172
ginkgo.By("Updating reclaimable pods")
1183-
err := workload.UpdateReclaimablePods(ctx, k8sClient, wl, []kueue.ReclaimablePod{
1184-
{Name: "ps1", Count: 1},
1185-
})
1186-
gomega.Expect(err).Should(utiltesting.BeForbiddenError())
1173+
expectUpdateReclaimablePodsBeForbiddenError(client.ObjectKeyFromObject(wl), []kueue.ReclaimablePod{{Name: "ps1", Count: 1}})
11871174
})
11881175

11891176
ginkgo.It("podSetUpdates should be immutable when state is ready", func() {
@@ -1724,3 +1711,12 @@ func validSliceFor(levels []string, suffix int) kueue.TopologyAssignmentSlice {
17241711
}
17251712
return res
17261713
}
1714+
1715+
func expectUpdateReclaimablePodsBeForbiddenError(wlKey client.ObjectKey, reclaimablePods []kueue.ReclaimablePod) {
1716+
ginkgo.GinkgoHelper()
1717+
wl := &kueue.Workload{}
1718+
gomega.Eventually(func(g gomega.Gomega) {
1719+
g.Expect(k8sClient.Get(ctx, wlKey, wl)).To(gomega.Succeed())
1720+
g.Expect(workload.UpdateReclaimablePods(ctx, k8sClient, wl, reclaimablePods)).Should(utiltesting.BeForbiddenError())
1721+
}, util.Timeout, util.Interval).Should(gomega.Succeed())
1722+
}

test/util/util.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1395,3 +1395,12 @@ func ExpectJobToBeCompleted(ctx context.Context, c client.Client, job *batchv1.J
13951395
cmpopts.IgnoreFields(batchv1.JobCondition{}, "LastTransitionTime", "LastProbeTime", "Reason", "Message"))))
13961396
}, LongTimeout, Interval).Should(gomega.Succeed())
13971397
}
1398+
1399+
func UpdateReclaimablePods(ctx context.Context, c client.Client, wl *kueue.Workload, reclaimablePods []kueue.ReclaimablePod) {
1400+
ginkgo.GinkgoHelper()
1401+
createdWl := &kueue.Workload{}
1402+
gomega.Eventually(func(g gomega.Gomega) {
1403+
g.Expect(c.Get(ctx, client.ObjectKeyFromObject(wl), createdWl)).To(gomega.Succeed())
1404+
g.Expect(workload.UpdateReclaimablePods(ctx, c, createdWl, reclaimablePods)).To(gomega.Succeed())
1405+
}, Timeout, Interval).Should(gomega.Succeed())
1406+
}

0 commit comments

Comments
 (0)