Skip to content

Commit f22a75a

Browse files
mtian29 and Mingyuan Tian authored
[Feature] Support Volcano Network Topology Aware Scheduling for kuberay (#4105)
* Support Volcano Network Topology Aware Scheduling for kuberay
* Address comments for tests
* Fix precommit lint errors.

---------

Co-authored-by: Mingyuan Tian <[email protected]>
1 parent 9e68367 commit f22a75a

File tree

2 files changed

+101
-8
lines changed

2 files changed

+101
-8
lines changed

ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go

Lines changed: 29 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,7 @@ package volcano
33
import (
44
"context"
55
"fmt"
6+
"strconv"
67

78
corev1 "k8s.io/api/core/v1"
89
"k8s.io/apimachinery/pkg/api/errors"
@@ -25,9 +26,11 @@ import (
2526
)
2627

2728
const (
28-
PodGroupName = "podgroups.scheduling.volcano.sh"
29-
pluginName = "volcano"
30-
QueueNameLabelKey = "volcano.sh/queue-name"
29+
PodGroupName = "podgroups.scheduling.volcano.sh"
30+
pluginName = "volcano"
31+
QueueNameLabelKey = "volcano.sh/queue-name"
32+
NetworkTopologyModeLabelKey = "volcano.sh/network-topology-mode"
33+
NetworkTopologyHighestTierAllowedLabelKey = "volcano.sh/network-topology-highest-tier-allowed"
3134
)
3235

3336
type VolcanoBatchScheduler struct {
@@ -155,7 +158,11 @@ func (v *VolcanoBatchScheduler) syncPodGroup(ctx context.Context, owner metav1.O
155158
return err
156159
}
157160

158-
podGroup := createPodGroup(owner, podGroupName, size, totalResource)
161+
podGroup, err := createPodGroup(owner, podGroupName, size, totalResource)
162+
if err != nil {
163+
logger.Error(err, "Failed to create pod group specification", "PodGroup.Error", err)
164+
return err
165+
}
159166
if err := v.cli.Create(ctx, &podGroup); err != nil {
160167
if errors.IsAlreadyExists(err) {
161168
logger.Info("podGroup already exists, no need to create", "name", podGroupName)
@@ -187,7 +194,7 @@ func (v *VolcanoBatchScheduler) calculatePodGroupParams(ctx context.Context, ray
187194
return utils.CalculateMinReplicas(rayCluster) + 1, utils.CalculateMinResources(rayCluster)
188195
}
189196

190-
func createPodGroup(owner metav1.Object, podGroupName string, size int32, totalResource corev1.ResourceList) volcanoschedulingv1beta1.PodGroup {
197+
func createPodGroup(owner metav1.Object, podGroupName string, size int32, totalResource corev1.ResourceList) (volcanoschedulingv1beta1.PodGroup, error) {
191198
var ownerRef metav1.OwnerReference
192199
switch obj := owner.(type) {
193200
case *rayv1.RayCluster:
@@ -211,14 +218,30 @@ func createPodGroup(owner metav1.Object, podGroupName string, size int32, totalR
211218
},
212219
}
213220

221+
// Handle network topology configuration
222+
mode, modeOk := owner.GetLabels()[NetworkTopologyModeLabelKey]
223+
if modeOk {
224+
podGroup.Spec.NetworkTopology = &volcanoschedulingv1beta1.NetworkTopologySpec{
225+
Mode: volcanoschedulingv1beta1.NetworkTopologyMode(mode),
226+
}
227+
highestTier, tierOk := owner.GetLabels()[NetworkTopologyHighestTierAllowedLabelKey]
228+
if tierOk {
229+
highestTierInt, err := strconv.Atoi(highestTier)
230+
if err != nil {
231+
return podGroup, fmt.Errorf("failed to convert %s label to int: %w for podgroup %s in namespace %s", NetworkTopologyHighestTierAllowedLabelKey, err, podGroupName, owner.GetNamespace())
232+
}
233+
podGroup.Spec.NetworkTopology.HighestTierAllowed = &highestTierInt
234+
}
235+
}
236+
214237
if queue, ok := owner.GetLabels()[QueueNameLabelKey]; ok {
215238
podGroup.Spec.Queue = queue
216239
}
217240
if priorityClassName, ok := owner.GetLabels()[utils.RayPriorityClassName]; ok {
218241
podGroup.Spec.PriorityClassName = priorityClassName
219242
}
220243

221-
return podGroup
244+
return podGroup, nil
222245
}
223246

224247
func (v *VolcanoBatchScheduler) AddMetadataToChildResource(_ context.Context, parent metav1.Object, child metav1.Object, groupName string) {

ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler_test.go

Lines changed: 72 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,8 @@ func TestCreatePodGroupForRayCluster(t *testing.T) {
161161

162162
minMember := utils.CalculateDesiredReplicas(context.Background(), &cluster) + 1
163163
totalResource := utils.CalculateDesiredResources(&cluster)
164-
pg := createPodGroup(&cluster, getAppPodGroupName(&cluster), minMember, totalResource)
164+
pg, err := createPodGroup(&cluster, getAppPodGroupName(&cluster), minMember, totalResource)
165+
require.NoError(t, err)
165166

166167
a.Equal(cluster.Namespace, pg.Namespace)
167168

@@ -185,7 +186,8 @@ func TestCreatePodGroupForRayCluster_NumOfHosts2(t *testing.T) {
185186

186187
minMember := utils.CalculateDesiredReplicas(context.Background(), &cluster) + 1
187188
totalResource := utils.CalculateDesiredResources(&cluster)
188-
pg := createPodGroup(&cluster, getAppPodGroupName(&cluster), minMember, totalResource)
189+
pg, err := createPodGroup(&cluster, getAppPodGroupName(&cluster), minMember, totalResource)
190+
require.NoError(t, err)
189191

190192
a.Equal(cluster.Namespace, pg.Namespace)
191193

@@ -206,6 +208,74 @@ func TestCreatePodGroupForRayCluster_NumOfHosts2(t *testing.T) {
206208
a.Equal("4", pg.Spec.MinResources.Name("nvidia.com/gpu", resource.BinarySI).String())
207209
}
208210

211+
func createTestRayClusterWithLabels(labels map[string]string) rayv1.RayCluster {
212+
cluster := createTestRayCluster(1)
213+
if cluster.ObjectMeta.Labels == nil {
214+
cluster.ObjectMeta.Labels = make(map[string]string)
215+
}
216+
for k, v := range labels {
217+
cluster.ObjectMeta.Labels[k] = v
218+
}
219+
return cluster
220+
}
221+
222+
func TestCreatePodGroup_NetworkTopologyBothLabels(t *testing.T) {
223+
a := assert.New(t)
224+
225+
// Test with both network topology mode and highest tier allowed
226+
cluster := createTestRayClusterWithLabels(map[string]string{
227+
NetworkTopologyModeLabelKey: "soft",
228+
NetworkTopologyHighestTierAllowedLabelKey: "3",
229+
})
230+
231+
minMember := utils.CalculateDesiredReplicas(context.Background(), &cluster) + 1
232+
totalResource := utils.CalculateDesiredResources(&cluster)
233+
pg, err := createPodGroup(&cluster, getAppPodGroupName(&cluster), minMember, totalResource)
234+
require.NoError(t, err)
235+
236+
a.Equal(cluster.Namespace, pg.Namespace)
237+
a.Equal(volcanoschedulingv1beta1.NetworkTopologyMode("soft"), pg.Spec.NetworkTopology.Mode)
238+
a.NotNil(pg.Spec.NetworkTopology.HighestTierAllowed)
239+
a.Equal(3, *pg.Spec.NetworkTopology.HighestTierAllowed)
240+
}
241+
242+
func TestCreatePodGroup_NetworkTopologyOnlyModeLabel(t *testing.T) {
243+
a := assert.New(t)
244+
245+
// Test with only network topology mode set
246+
cluster := createTestRayClusterWithLabels(map[string]string{
247+
NetworkTopologyModeLabelKey: "hard",
248+
})
249+
250+
minMember := utils.CalculateDesiredReplicas(context.Background(), &cluster) + 1
251+
totalResource := utils.CalculateDesiredResources(&cluster)
252+
pg, err := createPodGroup(&cluster, getAppPodGroupName(&cluster), minMember, totalResource)
253+
require.NoError(t, err)
254+
255+
a.Equal(cluster.Namespace, pg.Namespace)
256+
a.NotNil(pg.Spec.NetworkTopology)
257+
a.Equal(volcanoschedulingv1beta1.NetworkTopologyMode("hard"), pg.Spec.NetworkTopology.Mode)
258+
a.Nil(pg.Spec.NetworkTopology.HighestTierAllowed)
259+
}
260+
261+
func TestCreatePodGroup_NetworkTopologyHighestTierAllowedNotInt(t *testing.T) {
262+
a := assert.New(t)
263+
264+
// Test with network topology mode set and highest tier allowed is not an int
265+
cluster := createTestRayClusterWithLabels(map[string]string{
266+
NetworkTopologyModeLabelKey: "soft",
267+
NetworkTopologyHighestTierAllowedLabelKey: "not-an-int",
268+
})
269+
270+
minMember := utils.CalculateDesiredReplicas(context.Background(), &cluster) + 1
271+
totalResource := utils.CalculateDesiredResources(&cluster)
272+
pg, err := createPodGroup(&cluster, getAppPodGroupName(&cluster), minMember, totalResource)
273+
274+
require.Error(t, err)
275+
a.Contains(err.Error(), "failed to convert "+NetworkTopologyHighestTierAllowedLabelKey+" label to int")
276+
a.Equal(cluster.Namespace, pg.Namespace)
277+
}
278+
209279
func TestCreatePodGroupForRayJob(t *testing.T) {
210280
a := assert.New(t)
211281
ctx := context.Background()

0 commit comments

Comments
 (0)