Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 34 additions & 31 deletions pkg/agentscheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ const (
)

type Action struct {
fwk *framework.Framework
// configured flag for error cache
enablePredicateErrorCache bool
candidateNodeCount int
Expand Down Expand Up @@ -77,54 +76,58 @@ func (alloc *Action) Execute(fwk *framework.Framework, schedCtx *agentapi.Schedu
// 1. use predicateFn to filter out node that T can not be allocated on.
// 2. use ssn.NodeOrderFn to judge the best node and assign it to T

alloc.fwk = fwk
if err := alloc.allocateTask(schedCtx); err != nil {
alloc.recordTaskFailStatus(schedCtx, err)
alloc.failureHandler(schedCtx)
if err := alloc.allocateTask(fwk, schedCtx); err != nil {
alloc.recordTaskFailStatus(fwk, schedCtx, err)
alloc.failureHandler(fwk, schedCtx)
}
}

func (alloc *Action) allocateTask(schedCtx *agentapi.SchedulingContext) error {
func (alloc *Action) allocateTask(fwk *framework.Framework, schedCtx *agentapi.SchedulingContext) error {
task := schedCtx.Task

nodes := alloc.fwk.VolcanoNodeInfos()
nodes := fwk.VolcanoNodeInfos()

// TODO: check is pod allocatable
klog.V(3).Infof("There are <%d> nodes for task <%v/%v>", len(nodes), task.Namespace, task.Name)

if err := alloc.fwk.PrePredicateFn(task); err != nil {
if err := fwk.PrePredicateFn(task); err != nil {
err = fmt.Errorf("pre-predicate failed for task %s/%s: %w", task.Namespace, task.Name, err)
klog.ErrorS(err, "PrePredicate failed", "task", klog.KRef(task.Namespace, task.Name))
return err
}

predicatedNodes, err := alloc.predicateFeasibleNodes(task, nodes, schedCtx.NodesInShard)
predicatedNodes, err := alloc.predicateFeasibleNodes(fwk, task, nodes, schedCtx.NodesInShard)
if len(predicatedNodes) == 0 {
klog.ErrorS(err, "Predicate failed", "task", klog.KRef(task.Namespace, task.Name))
if err == nil {
return fmt.Errorf(api.AllNodeUnavailableMsg)
}
return err
}
bestNodes := alloc.prioritizeNodes(task, predicatedNodes, schedCtx.NodesInShard)
bestNodes := alloc.prioritizeNodes(fwk, task, predicatedNodes, schedCtx.NodesInShard)
result := &agentapi.PodScheduleResult{
SuggestedNodes: bestNodes,
SchedCtx: schedCtx,
BindContext: alloc.CreateBindContext(schedCtx),
BindContext: alloc.CreateBindContext(fwk, schedCtx),
}
alloc.SendResultToBinder(result)
alloc.SendResultToBinder(fwk, result)

return nil
}

func (alloc *Action) predicateFeasibleNodes(task *api.TaskInfo, allNodes []*api.NodeInfo, nodesInShard sets.Set[string]) ([]*api.NodeInfo, *api.FitErrors) {
func (alloc *Action) predicateFeasibleNodes(fwk *framework.Framework, task *api.TaskInfo, allNodes []*api.NodeInfo, nodesInShard sets.Set[string]) ([]*api.NodeInfo, *api.FitErrors) {
var predicateNodes []*api.NodeInfo
var fitErrors *api.FitErrors
ph := util.NewPredicateHelper()

// Create a predicate closure that captures fwk for this scheduling cycle.
predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) error {
return alloc.predicate(fwk, task, node)
}

// This node is likely the only candidate that will fit the pod, and hence we try it first before iterating over all nodes.
if len(task.Pod.Status.NominatedNodeName) > 0 {
nominatedNodeInfo, err := alloc.fwk.GetVolcanoNodeInfo(task.Pod.Status.NominatedNodeName)
nominatedNodeInfo, err := fwk.GetVolcanoNodeInfo(task.Pod.Status.NominatedNodeName)
if err != nil {
fitErrors = api.NewFitErrors()
fitErrors.SetNodeError(task.Pod.Status.NominatedNodeName, err)
Expand All @@ -133,7 +136,7 @@ func (alloc *Action) predicateFeasibleNodes(task *api.TaskInfo, allNodes []*api.
}

if nominatedNodeInfo != nil {
predicateNodes, fitErrors = ph.PredicateNodes(task, []*api.NodeInfo{nominatedNodeInfo}, alloc.predicate, alloc.enablePredicateErrorCache, nodesInShard)
predicateNodes, fitErrors = ph.PredicateNodes(task, []*api.NodeInfo{nominatedNodeInfo}, predicateFn, alloc.enablePredicateErrorCache, nodesInShard)
if fitErrors != nil {
klog.ErrorS(fitErrors, "Predicate failed on nominated node", "node", task.Pod.Status.NominatedNodeName)
// Continue to find suitable nodes from all nodes.
Expand All @@ -146,7 +149,7 @@ func (alloc *Action) predicateFeasibleNodes(task *api.TaskInfo, allNodes []*api.

// If the nominated node is not found or the nominated node is not suitable for the task, we need to find a suitable node for the task from other nodes.
if len(predicateNodes) == 0 {
predicateNodes, fitErrors = ph.PredicateNodes(task, allNodes, alloc.predicate, alloc.enablePredicateErrorCache, nodesInShard)
predicateNodes, fitErrors = ph.PredicateNodes(task, allNodes, predicateFn, alloc.enablePredicateErrorCache, nodesInShard)
if fitErrors != nil {
return predicateNodes, fitErrors
}
Expand All @@ -156,7 +159,7 @@ func (alloc *Action) predicateFeasibleNodes(task *api.TaskInfo, allNodes []*api.
}

// prioritizeNodes selects the highest score node that idle resource meet task requirement.
func (alloc *Action) prioritizeNodes(task *api.TaskInfo, predicateNodes []*api.NodeInfo, nodesInShard sets.Set[string]) []*api.NodeInfo {
func (alloc *Action) prioritizeNodes(fwk *framework.Framework, task *api.TaskInfo, predicateNodes []*api.NodeInfo, nodesInShard sets.Set[string]) []*api.NodeInfo {
var candidateNodes [2][]*api.NodeInfo
var candidateNodesInShard []*api.NodeInfo
var candidateNodesInOtherShards []*api.NodeInfo
Expand Down Expand Up @@ -198,7 +201,7 @@ func (alloc *Action) prioritizeNodes(task *api.TaskInfo, predicateNodes []*api.N
case len(nodes) == 1: // If only one node after predicate, just use it.
bestNodes = append(bestNodes, nodes[0])
case len(nodes) > 1: // If more than one node after predicate, using "the best" one
nodeScores := util.PrioritizeNodes(task, nodes, alloc.fwk.BatchNodeOrderFn, alloc.fwk.NodeOrderMapFn, alloc.fwk.NodeOrderReduceFn)
nodeScores := util.PrioritizeNodes(task, nodes, fwk.BatchNodeOrderFn, fwk.NodeOrderMapFn, fwk.NodeOrderReduceFn)
bestNodes, _ = util.SelectBestNodesAndScores(nodeScores, alloc.candidateNodeCount)
}
if len(bestNodes) > 0 {
Expand All @@ -208,28 +211,28 @@ func (alloc *Action) prioritizeNodes(task *api.TaskInfo, predicateNodes []*api.N
return bestNodes
}

func (alloc *Action) CreateBindContext(schedCtx *agentapi.SchedulingContext) *agentapi.BindContext {
func (alloc *Action) CreateBindContext(fwk *framework.Framework, schedCtx *agentapi.SchedulingContext) *agentapi.BindContext {
bindContext := &agentapi.BindContext{
SchedCtx: schedCtx,
Extensions: make(map[string]vcache.BindContextExtension),
}

for _, plugin := range alloc.fwk.Plugins {
for _, plugin := range fwk.Plugins {
// If the plugin implements the BindContextHandler interface, call the SetupBindContextExtension method.
if handler, ok := plugin.(framework.BindContextHandler); ok {
state := alloc.fwk.CurrentCycleState
state := fwk.CurrentCycleState
handler.SetupBindContextExtension(state, bindContext)
}
}

return bindContext
}

func (alloc *Action) SendResultToBinder(result *agentapi.PodScheduleResult) {
alloc.fwk.Cache.EnqueueScheduleResult(result)
func (alloc *Action) SendResultToBinder(fwk *framework.Framework, result *agentapi.PodScheduleResult) {
fwk.Cache.EnqueueScheduleResult(result)
}

func (alloc *Action) recordTaskFailStatus(schedCtx *agentapi.SchedulingContext, err error) {
func (alloc *Action) recordTaskFailStatus(fwk *framework.Framework, schedCtx *agentapi.SchedulingContext, err error) {
taskInfo := schedCtx.Task
// The pod of a scheduling gated task is given
// the ScheduleGated condition by the api-server. Do not change it.
Expand All @@ -247,31 +250,31 @@ func (alloc *Action) recordTaskFailStatus(schedCtx *agentapi.SchedulingContext,
if len(msg) == 0 {
msg = api.AllNodeUnavailableMsg
}
if err := alloc.fwk.Cache.TaskUnschedulable(taskInfo, reason, msg); err != nil {
if err := fwk.Cache.TaskUnschedulable(taskInfo, reason, msg); err != nil {
klog.ErrorS(err, "Failed to update unschedulable task status", "task", klog.KRef(taskInfo.Namespace, taskInfo.Name),
"reason", reason, "message", msg)
}
}

func (alloc *Action) failureHandler(schedCtx *agentapi.SchedulingContext) {
schedulingQueue := alloc.fwk.Cache.SchedulingQueue() // schedulingQueue will not be nil since we already checked in generateSchedulingContext before
func (alloc *Action) failureHandler(fwk *framework.Framework, schedCtx *agentapi.SchedulingContext) {
schedulingQueue := fwk.Cache.SchedulingQueue() // schedulingQueue will not be nil since we already checked in generateSchedulingContext before
if err := schedulingQueue.AddUnschedulableIfNotPresent(klog.Background(), schedCtx.QueuedPodInfo, schedulingQueue.SchedulingCycle()); err != nil {
klog.ErrorS(err, "Failed to add pod back to scheduling queue", "pod", klog.KObj(schedCtx.QueuedPodInfo.Pod))
}
}

func (alloc *Action) predicate(task *api.TaskInfo, node *api.NodeInfo) error {
func (alloc *Action) predicate(fwk *framework.Framework, task *api.TaskInfo, node *api.NodeInfo) error {
// Check for Resource Predicate
var statusSets api.StatusSets
if ok, resources := task.InitResreq.LessEqualWithResourcesName(node.Idle, api.Zero); !ok {
statusSets = append(statusSets, &api.Status{Code: api.Unschedulable, Reason: api.WrapInsufficientResourceReason(resources)})
return api.NewFitErrWithStatus(task, node, statusSets...)
}
return alloc.predicateForAllocateAction(task, node)
return alloc.predicateForAllocateAction(fwk, task, node)
}

func (alloc *Action) predicateForAllocateAction(task *api.TaskInfo, node *api.NodeInfo) error {
err := alloc.fwk.PredicateFn(task, node)
func (alloc *Action) predicateForAllocateAction(fwk *framework.Framework, task *api.TaskInfo, node *api.NodeInfo) error {
err := fwk.PredicateFn(task, node)
if err == nil {
return nil
}
Expand Down
66 changes: 7 additions & 59 deletions pkg/agentscheduler/actions/allocate/allocate_shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ import (
scheduleroptions "volcano.sh/volcano/cmd/scheduler/app/options"
agentapi "volcano.sh/volcano/pkg/agentscheduler/api"
"volcano.sh/volcano/pkg/agentscheduler/framework"
"volcano.sh/volcano/pkg/agentscheduler/plugins/nodeorder"
"volcano.sh/volcano/pkg/agentscheduler/plugins/predicates"
agentuthelper "volcano.sh/volcano/pkg/agentscheduler/uthelper"
"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/conf"
Expand All @@ -37,60 +35,10 @@ import (

// TestAllocateWithShard tests agent scheduler allocate action behavior with shard configuration
func TestAllocateWithShard(t *testing.T) {
// Register plugins
framework.RegisterPluginBuilder(predicates.PluginName, predicates.New)
framework.RegisterPluginBuilder(nodeorder.PluginName, nodeorder.New)

// Initialize ServerOpts if nil (for agent scheduler)
if options.ServerOpts == nil {
options.ServerOpts = options.NewServerOption()
}
// Initialize scheduler ServerOpts if nil (for volume binding plugin)
// predicate_helper.go uses scheduler's options, so we need to set it
if scheduleroptions.ServerOpts == nil {
scheduleroptions.ServerOpts = scheduleroptions.NewServerOption()
}
// Save original options to restore after test
originalShardingMode := options.ServerOpts.ShardingMode
originalShardName := options.ServerOpts.ShardName
originalSchedulerShardingMode := scheduleroptions.ServerOpts.ShardingMode
originalSchedulerShardName := scheduleroptions.ServerOpts.ShardName
defer func() {
if options.ServerOpts != nil {
options.ServerOpts.ShardingMode = originalShardingMode
options.ServerOpts.ShardName = originalShardName
}
if scheduleroptions.ServerOpts != nil {
scheduleroptions.ServerOpts.ShardingMode = originalSchedulerShardingMode
scheduleroptions.ServerOpts.ShardName = originalSchedulerShardName
}
}()

scheduleroptions.ServerOpts.PercentageOfNodesToFind = 100

// Common setup shared across all test cases
trueValue := true
tiers := []conf.Tier{
{
Plugins: []conf.PluginOption{
{
Name: predicates.PluginName,
EnabledPredicate: &trueValue,
},
{
Name: nodeorder.PluginName,
EnabledNodeOrder: &trueValue,
Arguments: map[string]interface{}{
"leastrequested.weight": 1,
"mostrequested.weight": 0,
},
},
},
},
}
agentuthelper.InitTestEnv(t)

// Create test framework (shared setup)
testFwk, err := agentuthelper.NewTestFramework("test-scheduler", []framework.Action{New()}, tiers, []conf.Configuration{})
testFwk, err := agentuthelper.NewTestFramework("test-scheduler", 1, []framework.Action{New()}, agentuthelper.DefaultTiers(), []conf.Configuration{})
if err != nil {
t.Fatalf("Failed to create test framework: %v", err)
}
Expand All @@ -111,7 +59,7 @@ func TestAllocateWithShard(t *testing.T) {
testFwk.MockCache.AddOrUpdateNode(n4)

// Update snapshot after adding nodes.
snapshot := testFwk.Framework.GetSnapshot()
snapshot := testFwk.Frameworks[0].GetSnapshot()
if err := testFwk.MockCache.UpdateSnapshot(snapshot); err != nil {
t.Fatalf("Failed to update snapshot: %v", err)
}
Expand Down Expand Up @@ -164,10 +112,10 @@ func TestAllocateWithShard(t *testing.T) {
}

// Execute scheduling
testFwk.Framework.ClearCycleState()
testFwk.Framework.OnCycleStart()
testFwk.Action.Execute(testFwk.Framework, schedCtx)
testFwk.Framework.OnCycleEnd()
testFwk.Frameworks[0].ClearCycleState()
testFwk.Frameworks[0].OnCycleStart()
testFwk.Actions[0].Execute(testFwk.Frameworks[0], schedCtx)
testFwk.Frameworks[0].OnCycleEnd()

// Verify result
agentuthelper.VerifySchedulingResult(t, testFwk.MockCache, tt.expectedNode)
Expand Down
Loading
Loading