Fix queue status conflicts in multi-scheduler by adding per-scheduler allocation tracking #5135

rishisulakhe wants to merge 8 commits into volcano-sh:master
Conversation
[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by: (none). The full list of commands accepted by this bot can be found here. Needs approval from an approver in each of these files. Approvers can indicate their approval by writing `/approve` in a comment.
Add SchedulerAllocation type and SchedulerAllocations field to QueueStatus Signed-off-by: Rishi Prasad Sulakhe <rishiprasadsulakhe@gmail.com>
Force-pushed from 7737dcc to 1dcee55.
Pull request overview
This PR addresses multi-scheduler queue status update conflicts in Volcano by introducing per-scheduler allocation reporting in QueueStatus, switching schedulers to Server-Side Apply (SSA) status updates, and having the queue controller aggregate those per-scheduler values back into the existing status.allocated field for backward compatibility.
Changes:
- Add `QueueStatus.SchedulerAllocations` (map keyed by scheduler identity) plus the `SchedulerAllocation` type in the API.
- Update scheduler session close logic to patch only the current scheduler's allocation entry via SSA with a per-scheduler `FieldManager`.
- Update queue controller to aggregate active scheduler entries into `status.allocated` and garbage-collect stale entries; add tests for aggregation logic.
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| staging/src/volcano.sh/apis/pkg/client/applyconfiguration/utils.go | Registers apply-configuration kind lookup for SchedulerAllocation. |
| staging/src/volcano.sh/apis/pkg/client/applyconfiguration/scheduling/v1beta1/schedulerallocation.go | New generated apply-configuration type for SSA updates of scheduler allocation entries. |
| staging/src/volcano.sh/apis/pkg/client/applyconfiguration/scheduling/v1beta1/queuestatus.go | Extends queue status apply-config to include schedulerAllocations and builder method. |
| staging/src/volcano.sh/apis/pkg/apis/scheduling/zz_generated.deepcopy.go | Adds deep-copy support for SchedulerAllocations and SchedulerAllocation (internal API). |
| staging/src/volcano.sh/apis/pkg/apis/scheduling/v1beta1/zz_generated.deepcopy.go | Adds deep-copy support for SchedulerAllocations and SchedulerAllocation (v1beta1 API). |
| staging/src/volcano.sh/apis/pkg/apis/scheduling/v1beta1/zz_generated.conversion.go | Adds conversions for SchedulerAllocation and maps between v1beta1 and internal types. |
| staging/src/volcano.sh/apis/pkg/apis/scheduling/v1beta1/types.go | Introduces QueueStatus.SchedulerAllocations and SchedulerAllocation in v1beta1 API. |
| staging/src/volcano.sh/apis/pkg/apis/scheduling/types.go | Introduces QueueStatus.SchedulerAllocations and SchedulerAllocation in internal API. |
| pkg/scheduler/framework/session.go | Switches queue status update on session close to SSA patch of per-scheduler allocation entry. |
| pkg/scheduler/cache/interface.go | Adds cache API for scheduler identity and patching scheduler allocation via SSA. |
| pkg/scheduler/cache/cache.go | Implements scheduler identity and SSA-based PatchSchedulerAllocation. |
| pkg/scheduler/cache/cache_test.go | Adds unit test coverage for SchedulerIdentity. |
| pkg/controllers/queue/queue_controller_action.go | Aggregates SchedulerAllocations into status.allocated and garbage-collects stale entries. |
| pkg/controllers/queue/queue_controller_test.go | Adds tests for aggregation behavior and stale entry exclusion. |
```go
if sc.c != nil && sc.schedulerPodName != "" {
	return sc.schedulerPodName
}
return sc.schedulerNames[0]
```
SchedulerIdentity can panic if sc.schedulerNames is empty (sc.schedulerNames[0]). Even if current callers always pass at least one name, this is an exported Cache method and is used in multiple contexts (including tests/mocks). Please add a defensive check (e.g., return sc.schedulerPodName if set, otherwise return "" or a well-defined default / error) to avoid potential panics.
Suggested change:
```diff
-return sc.schedulerNames[0]
+if len(sc.schedulerNames) > 0 {
+	return sc.schedulerNames[0]
+}
+return ""
```
```go
	assert.True(t, ok, "expected resource %s in Allocated", resName)
	assert.True(t, expected.Equal(actual),
		"resource %s: expected %s, got %s", resName, expected.String(), actual.String())
}
```
This test doesn't assert anything when ExpectAllocated is empty (the loop over tc.ExpectAllocated is a no-op), so it won't catch regressions where item.Status.Allocated incorrectly retains old values. Please add an assertion on the overall map length/content (e.g., assert.Empty(t, item.Status.Allocated) or assert.Equal(t, tc.ExpectAllocated, item.Status.Allocated)).
Suggested change:
```diff
 	}
 }
+assert.Equal(t, tc.ExpectAllocated, item.Status.Allocated)
```
```go
if stateChanged || allocatedChanged {
	queueStatusApply := v1beta1apply.QueueStatus().WithState(queueStatus.State)
	if len(aggregatedAllocated) > 0 {
```
When the aggregated allocation is empty (e.g., no active SchedulerAllocations or all entries stale), this code skips calling WithAllocated, so a previously-populated status.allocated will never be cleared. That can leave stale Allocated values indefinitely and also makes allocatedChanged true without actually updating the field. Consider always applying the Allocated field (including empty/nil) when allocatedChanged is true so the controller can clear it correctly.
Suggested change:
```diff
-	if len(aggregatedAllocated) > 0 {
+	if allocatedChanged {
```
```go
var staleKeys []string
staleTTL := 5 * time.Minute
now := time.Now()
```
staleTTL := 5 * time.Minute is a hard-coded policy value embedded in the controller loop. Please consider promoting this to a named constant (or controller config/flag) so it's easier to discover, document, and adjust without editing logic.
```go
patchBytes, err := json.Marshal(patch)
if err == nil {
	_, patchErr := c.vcClient.SchedulingV1beta1().Queues().Patch(
		context.TODO(), queue.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status")
	if patchErr != nil {
		klog.Errorf("Failed to clean up stale scheduler allocations for queue %s: %v", queue.Name, patchErr)
	}
}
```
The merge-patch cleanup path ignores json.Marshal errors (it just silently skips cleanup). Please handle the marshal error explicitly (at least log it) so failures to GC stale schedulerAllocations entries are visible during operations.
Code Review
This pull request introduces a per-scheduler allocation tracking system within the Queue status to support multi-scheduler and hash-ring deployments. It implements logic to aggregate individual scheduler allocations into a total Allocated field, adds a garbage collection mechanism for stale entries using a 5-minute TTL, and migrates status updates to use Server-Side Apply (SSA). The review feedback identifies a bug where the total allocation is not cleared when it becomes empty and suggests improvements for error handling during JSON marshaling, context propagation instead of using context.TODO(), and more rigorous unit testing to verify resource list sizes and clearing behavior.
```go
queueStatusApply := v1beta1apply.QueueStatus().WithState(queueStatus.State)
if len(aggregatedAllocated) > 0 {
	queueStatusApply = queueStatusApply.WithAllocated(aggregatedAllocated)
}
```
The logic for updating status.allocated is flawed. The check if len(aggregatedAllocated) > 0 prevents clearing status.allocated when it becomes empty (e.g., when all scheduler allocations expire). This can lead to an inaccurate status.allocated field that retains stale values. The WithAllocated method should be called whenever allocatedChanged is true.
Suggested change:
```go
queueStatusApply := v1beta1apply.QueueStatus().WithState(queueStatus.State).WithAllocated(aggregatedAllocated)
```

```go
// Aggregate per-scheduler allocations into total Allocated and garbage-collect stale entries
aggregatedAllocated := v1.ResourceList{}
var staleKeys []string
staleTTL := 5 * time.Minute
```
```go
patchBytes, err := json.Marshal(patch)
if err == nil {
	_, patchErr := c.vcClient.SchedulingV1beta1().Queues().Patch(
		context.TODO(), queue.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status")
	if patchErr != nil {
		klog.Errorf("Failed to clean up stale scheduler allocations for queue %s: %v", queue.Name, patchErr)
	}
}
```
The error from json.Marshal is silently ignored. If marshaling fails, the patch for cleaning up stale scheduler allocations will not be applied, and no error will be logged. This can lead to stale entries never being removed. The error should be handled by logging it.
Suggested change:
```go
patchBytes, err := json.Marshal(patch)
if err != nil {
	klog.Errorf("Failed to marshal patch for stale scheduler allocations in queue %s: %v", queue.Name, err)
} else {
	_, patchErr := c.vcClient.SchedulingV1beta1().Queues().Patch(
		context.TODO(), queue.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status")
	if patchErr != nil {
		klog.Errorf("Failed to clean up stale scheduler allocations for queue %s: %v", queue.Name, patchErr)
	}
}
```

```go
Name: "Empty scheduler allocations produces empty Allocated",
queue: &schedulingv1beta1.Queue{
	ObjectMeta: metav1.ObjectMeta{
		Name: "root",
	},
	Status: schedulingv1beta1.QueueStatus{
		State: schedulingv1beta1.QueueStateOpen,
	},
},
ExpectAllocated: v1.ResourceList{},
},
```
The test case "Empty scheduler allocations produces empty Allocated" is not fully testing the scenario it's named after. It would pass even with a bug where an empty aggregatedAllocated fails to clear a pre-existing Status.Allocated. To make this test more robust, the initial queue.Status.Allocated should be non-empty, and the test should assert that it becomes empty.
Suggested change:
```go
Name: "Empty scheduler allocations produces empty Allocated",
queue: &schedulingv1beta1.Queue{
	ObjectMeta: metav1.ObjectMeta{
		Name: "root",
	},
	Status: schedulingv1beta1.QueueStatus{
		State: schedulingv1beta1.QueueStateOpen,
		Allocated: v1.ResourceList{
			v1.ResourceCPU: resource.MustParse("1"),
		},
	},
},
ExpectAllocated: v1.ResourceList{},
```

```go
for resName, expected := range tc.ExpectAllocated {
	actual, ok := item.Status.Allocated[resName]
	assert.True(t, ok, "expected resource %s in Allocated", resName)
	assert.True(t, expected.Equal(actual),
		"resource %s: expected %s, got %s", resName, expected.String(), actual.String())
}
```
The current assertion only checks for the presence and value of expected resources. It does not verify if there are unexpected extra resources in item.Status.Allocated. To make the test more robust, you should also assert that the number of resource types in the actual and expected Allocated lists is the same.
Suggested change:
```go
assert.Len(t, item.Status.Allocated, len(tc.ExpectAllocated), "mismatched number of resource types in Allocated")
for resName, expected := range tc.ExpectAllocated {
	actual, ok := item.Status.Allocated[resName]
	assert.True(t, ok, "expected resource %s in Allocated", resName)
	assert.True(t, expected.Equal(actual),
		"resource %s: expected %s, got %s", resName, expected.String(), actual.String())
}
```
```go
fieldManager := "volcano-scheduler-" + identity
_, err := sc.vcClient.SchedulingV1beta1().Queues().ApplyStatus(
	context.TODO(), queueApply, metav1.ApplyOptions{FieldManager: fieldManager})
```
Using context.TODO() is generally discouraged as it provides an empty, non-cancellable context which can lead to issues with request tracing, cancellation, and deadline propagation. It would be better to pass a context.Context from the caller. Please consider adding a context.Context parameter to PatchSchedulerAllocation and passing it to the ApplyStatus call. This would require updating the Cache interface and the call site in pkg/scheduler/framework/session.go.
```go
if err := ssn.cache.PatchSchedulerAllocation(
	string(queueID), identity, queueAllocated); err != nil {
	klog.Errorf("failed to patch scheduler allocation for queue <%s>: %s",
		ssn.Queues[queueID].Name, err.Error())
}
```
Using context.TODO() is generally discouraged. It would be better to pass a context.Context down from the updateQueueStatus function's caller. Since PatchSchedulerAllocation is being modified to accept a context, this call site should be updated to pass a meaningful context instead of context.TODO().
Suggested change:
```diff
 if err := ssn.cache.PatchSchedulerAllocation(
-	string(queueID), identity, queueAllocated); err != nil {
+	context.TODO(), string(queueID), identity, queueAllocated); err != nil {
 	klog.Errorf("failed to patch scheduler allocation for queue <%s>: %s",
 		ssn.Queues[queueID].Name, err.Error())
 }
```
Hi @hajnalmt @JesseStutler, could you please review this?
What type of PR is this?
/kind bug
Which issue(s) this PR fixes:
Fixes #5084 (Solution 2)
What this PR does / why we need it:
When running multiple schedulers in Volcano v1.14.0+, each scheduler computes `Queue.Status.Allocated` from only its own local view, then does a full `UpdateStatus()` call. This causes two issues:
This happens both in the Agent Scheduler + Volcano Scheduler setup and in hash-ring based multi-scheduler deployments.
What changed and why
New API field: `QueueStatus.SchedulerAllocations`

Added a `map[string]SchedulerAllocation` field where each key is a scheduler's identity (the scheduler name, or the pod name for hash-ring deployments). Each entry holds that scheduler's allocated resources and a `LastUpdateTime` timestamp. This lets multiple schedulers write to the same queue without stepping on each other.

Scheduler side

Instead of calling `UpdateStatus()` (which replaces the entire status), each scheduler now calls `ApplyStatus()` (Server-Side Apply) with a unique `FieldManager` per scheduler instance. This means scheduler-A writing its entry never conflicts with scheduler-B writing its entry; Kubernetes SSA handles this natively.

Queue controller side

The queue controller's `syncQueue()` now sums all active `SchedulerAllocations` entries into the existing `Status.Allocated` field. This keeps backward compatibility: any tool reading `Status.Allocated` still sees the correct total. Entries older than 5 minutes are considered stale and are excluded from the sum, then garbage-collected via merge patch.

Backward compatibility

The existing `Status.Allocated` field is preserved and works exactly as before. Single-scheduler deployments see no behavior change: there is just one entry in the map, and the total equals that entry.