Skip to content

Commit 536ca35

Browse files
authored
[Feature][APIServer v2] Support Compute Template in APIServer v2 (#3959)
* feat: extract compute template from spec Signed-off-by: machichima <[email protected]> * feat: add compute template middleware Signed-off-by: machichima <[email protected]> * fix: extract cluster spec from request correctly Signed-off-by: machichima <[email protected]> * feat: convert request to map then to clusterSpec Signed-off-by: machichima <[email protected]> * feat: parse anno/cpu/mem from compute template to request Signed-off-by: machichima <[email protected]> * fix: get compute template name without converting to ClusterSpec struct Signed-off-by: machichima <[email protected]> * feat: apply gpu/extend/toleration Signed-off-by: machichima <[email protected]> * feat: forward modified request body Signed-off-by: machichima <[email protected]> * feat: deal with case with no head/workerGroupSpec set Signed-off-by: machichima <[email protected]> * feat: also support for rayjob and rayservice Signed-off-by: machichima <[email protected]> * test: structure for test compute template middleware Just a structure, test does not work now Signed-off-by: machichima <[email protected]> * fix: update field and match types Signed-off-by: machichima <[email protected]> * feat: use remote execute for apiserver sdk e2e test Signed-off-by: machichima <[email protected]> * feat: pass through if no spec & convert to json Signed-off-by: machichima <[email protected]> * feat: check request body type for unmarshal Signed-off-by: machichima <[email protected]> * test: enable setitng content type in execCommandWithCurlInPod * Trigger CI Signed-off-by: machichima <[email protected]> * fix: extract client manager for mocking * Trigger CI Signed-off-by: machichima <[email protected]> * fix: add missing methods & clean up code * refactor: clean up print * build: go mod tidy * Trigger CI Signed-off-by: machichima <[email protected]> * Trigger CI Signed-off-by: machichima <[email protected]> * refactor: remove unused args * refactor: interface to any * refactor: remove redundant else block * refactor: status code 500 to 422 * feat: support memory unit * fix: use ProxyRoundTripper is apiserver client & remove remote execute client * fix: base url use kubernetesConfig.Host --------- Signed-off-by: machichima <[email protected]>
1 parent eb66a26 commit 536ca35

File tree

12 files changed

+746
-81
lines changed

12 files changed

+746
-81
lines changed

apiserver/cmd/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,8 +173,8 @@ func startHttpProxy() {
173173
KubernetesConfig: kubernetesConfig,
174174
Middleware: corsHandler, // Always set, even if it's a no-op
175175
}
176-
177-
topMux, err = apiserversdk.NewMux(muxConfig)
176+
clientManager := manager.NewClientManager()
177+
topMux, err = apiserversdk.NewMux(muxConfig, &clientManager)
178178
if err != nil {
179179
klog.Fatalf("Failed to create API server mux: %v", err)
180180
}

apiserver/pkg/http/client.go

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ type KuberayAPIServerClient struct {
2525
// See https://github.com/ray-project/kuberay/pull/3334/files#r2041183495 for details.
2626
//
2727
// Store http request handling function for unit test purpose.
28-
executeHttpRequest func(httpRequest *http.Request, URL string) ([]byte, *rpcStatus.Status, error)
28+
ExecuteHttpRequest func(httpRequest *http.Request, URL string) ([]byte, *rpcStatus.Status, error)
2929
baseURL string
3030
retryCfg apiserversdkutil.RetryConfig
3131
}
@@ -68,13 +68,13 @@ func NewKuberayAPIServerClient(baseURL string, httpClient *http.Client, retryCfg
6868
},
6969
retryCfg: retryCfg,
7070
}
71-
client.executeHttpRequest = client.executeRequest
71+
client.ExecuteHttpRequest = client.executeRequest
7272
return client
7373
}
7474

7575
// Setter function for setting executeHttpRequest method
7676
func (krc *KuberayAPIServerClient) SetExecuteHttpRequest(fn func(httpRequest *http.Request, URL string) ([]byte, *rpcStatus.Status, error)) {
77-
krc.executeHttpRequest = fn
77+
krc.ExecuteHttpRequest = fn
7878
}
7979

8080
// CreateComputeTemplate creates a new compute template.
@@ -94,7 +94,7 @@ func (krc *KuberayAPIServerClient) CreateComputeTemplate(request *api.CreateComp
9494
httpRequest.Header.Add("Accept", "application/json")
9595
httpRequest.Header.Add("Content-Type", "application/json")
9696

97-
bodyBytes, status, err := krc.executeHttpRequest(httpRequest, createURL)
97+
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, createURL)
9898
if err != nil {
9999
return nil, status, err
100100
}
@@ -122,7 +122,7 @@ func (krc *KuberayAPIServerClient) GetComputeTemplate(request *api.GetComputeTem
122122

123123
httpRequest.Header.Add("Accept", "application/json")
124124

125-
bodyBytes, status, err := krc.executeHttpRequest(httpRequest, getURL)
125+
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, getURL)
126126
if err != nil {
127127
return nil, status, err
128128
}
@@ -143,7 +143,7 @@ func (krc *KuberayAPIServerClient) GetAllComputeTemplates() (*api.ListAllCompute
143143

144144
httpRequest.Header.Add("Accept", "application/json")
145145

146-
bodyBytes, status, err := krc.executeHttpRequest(httpRequest, getURL)
146+
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, getURL)
147147
if err != nil {
148148
return nil, status, err
149149
}
@@ -164,7 +164,7 @@ func (krc *KuberayAPIServerClient) GetAllComputeTemplatesInNamespace(request *ap
164164

165165
httpRequest.Header.Add("Accept", "application/json")
166166

167-
bodyBytes, status, err := krc.executeHttpRequest(httpRequest, getURL)
167+
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, getURL)
168168
if err != nil {
169169
return nil, status, err
170170
}
@@ -192,7 +192,7 @@ func (krc *KuberayAPIServerClient) CreateCluster(request *api.CreateClusterReque
192192
httpRequest.Header.Add("Accept", "application/json")
193193
httpRequest.Header.Add("Content-Type", "application/json")
194194

195-
bodyBytes, status, err := krc.executeHttpRequest(httpRequest, createURL)
195+
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, createURL)
196196
if err != nil {
197197
return nil, status, err
198198
}
@@ -219,7 +219,7 @@ func (krc *KuberayAPIServerClient) GetCluster(request *api.GetClusterRequest) (*
219219

220220
httpRequest.Header.Add("Accept", "application/json")
221221

222-
bodyBytes, status, err := krc.executeHttpRequest(httpRequest, getURL)
222+
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, getURL)
223223
if err != nil {
224224
return nil, status, err
225225
}
@@ -245,7 +245,7 @@ func (krc *KuberayAPIServerClient) ListClusters(request *api.ListClustersRequest
245245

246246
httpRequest.Header.Add("Accept", "application/json")
247247

248-
bodyBytes, status, err := krc.executeHttpRequest(httpRequest, getURL)
248+
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, getURL)
249249
if err != nil {
250250
return nil, status, err
251251
}
@@ -271,7 +271,7 @@ func (krc *KuberayAPIServerClient) ListAllClusters(request *api.ListAllClustersR
271271

272272
httpRequest.Header.Add("Accept", "application/json")
273273

274-
bodyBytes, status, err := krc.executeHttpRequest(httpRequest, getURL)
274+
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, getURL)
275275
if err != nil {
276276
return nil, status, err
277277
}
@@ -298,7 +298,7 @@ func (krc *KuberayAPIServerClient) CreateRayJob(request *api.CreateRayJobRequest
298298
httpRequest.Header.Add("Accept", "application/json")
299299
httpRequest.Header.Add("Content-Type", "application/json")
300300

301-
bodyBytes, status, err := krc.executeHttpRequest(httpRequest, createURL)
301+
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, createURL)
302302
if err != nil {
303303
return nil, status, err
304304
}
@@ -319,7 +319,7 @@ func (krc *KuberayAPIServerClient) GetRayJob(request *api.GetRayJobRequest) (*ap
319319

320320
httpRequest.Header.Add("Accept", "application/json")
321321

322-
bodyBytes, status, err := krc.executeHttpRequest(httpRequest, getURL)
322+
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, getURL)
323323
if err != nil {
324324
return nil, status, err
325325
}
@@ -345,7 +345,7 @@ func (krc *KuberayAPIServerClient) ListRayJobs(request *api.ListRayJobsRequest)
345345

346346
httpRequest.Header.Add("Accept", "application/json")
347347

348-
bodyBytes, status, err := krc.executeHttpRequest(httpRequest, getURL)
348+
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, getURL)
349349
if err != nil {
350350
return nil, status, err
351351
}
@@ -370,7 +370,7 @@ func (krc *KuberayAPIServerClient) ListAllRayJobs(request *api.ListAllRayJobsReq
370370
httpRequest.URL.RawQuery = q.Encode()
371371
httpRequest.Header.Add("Accept", "application/json")
372372

373-
bodyBytes, status, err := krc.executeHttpRequest(httpRequest, getURL)
373+
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, getURL)
374374
if err != nil {
375375
return nil, status, err
376376
}
@@ -403,7 +403,7 @@ func (krc *KuberayAPIServerClient) CreateRayService(request *api.CreateRayServic
403403
httpRequest.Header.Add("Accept", "application/json")
404404
httpRequest.Header.Add("Content-Type", "application/json")
405405

406-
bodyBytes, status, err := krc.executeHttpRequest(httpRequest, createURL)
406+
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, createURL)
407407
if err != nil {
408408
return nil, status, err
409409
}
@@ -430,7 +430,7 @@ func (krc *KuberayAPIServerClient) UpdateRayService(request *api.UpdateRayServic
430430
httpRequest.Header.Add("Accept", "application/json")
431431
httpRequest.Header.Add("Content-Type", "application/json")
432432

433-
bodyBytes, status, err := krc.executeHttpRequest(httpRequest, updateURL)
433+
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, updateURL)
434434
if err != nil {
435435
return nil, status, err
436436
}
@@ -451,7 +451,7 @@ func (krc *KuberayAPIServerClient) GetRayService(request *api.GetRayServiceReque
451451

452452
httpRequest.Header.Add("Accept", "application/json")
453453

454-
bodyBytes, status, err := krc.executeHttpRequest(httpRequest, getURL)
454+
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, getURL)
455455
if err != nil {
456456
return nil, status, err
457457
}
@@ -476,7 +476,7 @@ func (krc *KuberayAPIServerClient) ListRayServices(request *api.ListRayServicesR
476476
httpRequest.URL.RawQuery = q.Encode()
477477
httpRequest.Header.Add("Accept", "application/json")
478478

479-
bodyBytes, status, err := krc.executeHttpRequest(httpRequest, getURL)
479+
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, getURL)
480480
if err != nil {
481481
return nil, status, err
482482
}
@@ -502,7 +502,7 @@ func (krc *KuberayAPIServerClient) ListAllRayServices(request *api.ListAllRaySer
502502
httpRequest.URL.RawQuery = q.Encode()
503503
httpRequest.Header.Add("Accept", "application/json")
504504

505-
bodyBytes, status, err := krc.executeHttpRequest(httpRequest, getURL)
505+
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, getURL)
506506
if err != nil {
507507
return nil, status, err
508508
}
@@ -535,7 +535,7 @@ func (krc *KuberayAPIServerClient) SubmitRayJob(request *api.SubmitRayJobRequest
535535
httpRequest.Header.Add("Accept", "application/json")
536536
httpRequest.Header.Add("Content-Type", "application/json")
537537

538-
bodyBytes, status, err := krc.executeHttpRequest(httpRequest, createURL)
538+
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, createURL)
539539
if err != nil {
540540
return nil, status, err
541541
}
@@ -556,7 +556,7 @@ func (krc *KuberayAPIServerClient) GetRayJobDetails(request *api.GetJobDetailsRe
556556

557557
httpRequest.Header.Add("Accept", "application/json")
558558

559-
bodyBytes, status, err := krc.executeHttpRequest(httpRequest, getURL)
559+
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, getURL)
560560
if err != nil {
561561
return nil, status, err
562562
}
@@ -577,7 +577,7 @@ func (krc *KuberayAPIServerClient) GetRayJobLog(request *api.GetJobLogRequest) (
577577

578578
httpRequest.Header.Add("Accept", "application/json")
579579

580-
bodyBytes, status, err := krc.executeHttpRequest(httpRequest, getURL)
580+
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, getURL)
581581
if err != nil {
582582
return nil, status, err
583583
}
@@ -598,7 +598,7 @@ func (krc *KuberayAPIServerClient) ListRayJobsCluster(request *api.ListJobDetail
598598

599599
httpRequest.Header.Add("Accept", "application/json")
600600

601-
bodyBytes, status, err := krc.executeHttpRequest(httpRequest, getURL)
601+
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, getURL)
602602
if err != nil {
603603
return nil, status, err
604604
}
@@ -621,7 +621,7 @@ func (krc *KuberayAPIServerClient) StopRayJob(request *api.StopRayJobSubmissionR
621621
httpRequest.Header.Add("Accept", "application/json")
622622
httpRequest.Header.Add("Content-Type", "application/json")
623623

624-
_, status, err := krc.executeHttpRequest(httpRequest, createURL)
624+
_, status, err := krc.ExecuteHttpRequest(httpRequest, createURL)
625625
if err != nil {
626626
return status, err
627627
}
@@ -640,7 +640,7 @@ func (krc *KuberayAPIServerClient) doDelete(deleteURL string) (*rpcStatus.Status
640640
return nil, fmt.Errorf("failed to create http request for url '%s': %w", deleteURL, err)
641641
}
642642
httpRequest.Header.Add("Accept", "application/json")
643-
_, status, err := krc.executeHttpRequest(httpRequest, deleteURL)
643+
_, status, err := krc.ExecuteHttpRequest(httpRequest, deleteURL)
644644
return status, err
645645
}
646646

apiserver/pkg/http/client_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func TestUnmarshalHttpResponseOK(t *testing.T) {
6262
}
6363

6464
client := NewKuberayAPIServerClient("baseurl", nil /*httpClient*/, retryCfg)
65-
client.executeHttpRequest = func(_ *http.Request, _ string) ([]byte, *rpcStatus.Status, error) {
65+
client.ExecuteHttpRequest = func(_ *http.Request, _ string) ([]byte, *rpcStatus.Status, error) {
6666
resp := &api.ListClustersResponse{
6767
Clusters: []*api.Cluster{
6868
{
@@ -98,7 +98,7 @@ func TestUnmarshalHttpResponseFails(t *testing.T) {
9898
}
9999

100100
client := NewKuberayAPIServerClient("baseurl", nil /*httpClient*/, retryCfg)
101-
client.executeHttpRequest = func(_ *http.Request, _ string) ([]byte, *rpcStatus.Status, error) {
101+
client.ExecuteHttpRequest = func(_ *http.Request, _ string) ([]byte, *rpcStatus.Status, error) {
102102
// Intentionall returning a bad response.
103103
return []byte("helloworld"), nil, nil
104104
}

apiserver/pkg/manager/resource_manager.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func (r *ResourceManager) getKubernetesNamespaceClient() clientv1.NamespaceInter
5858
// clusters
5959
func (r *ResourceManager) CreateCluster(ctx context.Context, apiCluster *api.Cluster) (*rayv1api.RayCluster, error) {
6060
// populate cluster map
61-
computeTemplateDict, err := r.populateComputeTemplate(ctx, apiCluster.ClusterSpec, apiCluster.Namespace)
61+
computeTemplateDict, err := r.PopulateComputeTemplate(ctx, apiCluster.ClusterSpec, apiCluster.Namespace)
6262
if err != nil {
6363
return nil, util.NewInternalServerError(err, "Failed to populate compute template for (%s/%s)", apiCluster.Namespace, apiCluster.Name)
6464
}
@@ -82,13 +82,13 @@ func (r *ResourceManager) CreateCluster(ctx context.Context, apiCluster *api.Clu
8282
}
8383

8484
// Compute template
85-
func (r *ResourceManager) populateComputeTemplate(ctx context.Context, clusterSpec *api.ClusterSpec, nameSpace string) (map[string]*api.ComputeTemplate, error) {
85+
func (r *ResourceManager) PopulateComputeTemplate(ctx context.Context, clusterSpec *api.ClusterSpec, nameSpace string) (map[string]*api.ComputeTemplate, error) {
8686
dict := map[string]*api.ComputeTemplate{}
8787
// populate head compute template
8888
name := clusterSpec.HeadGroupSpec.ComputeTemplate
8989
configMap, err := r.GetComputeTemplate(ctx, name, nameSpace)
9090
if err != nil {
91-
return nil, err
91+
return nil, fmt.Errorf("Cannot get compute template for name '%s' in namespace '%s', error: %w", name, nameSpace, err)
9292
}
9393
computeTemplate := model.FromKubeToAPIComputeTemplate(configMap)
9494
dict[name] = computeTemplate
@@ -99,7 +99,7 @@ func (r *ResourceManager) populateComputeTemplate(ctx context.Context, clusterSp
9999
if _, exist := dict[name]; !exist {
100100
configMap, err := r.GetComputeTemplate(ctx, name, nameSpace)
101101
if err != nil {
102-
return nil, err
102+
return nil, fmt.Errorf("Cannot get compute template for name '%s' in namespace '%s', error: %w", name, nameSpace, err)
103103
}
104104
computeTemplate := model.FromKubeToAPIComputeTemplate(configMap)
105105
dict[name] = computeTemplate
@@ -160,7 +160,7 @@ func (r *ResourceManager) CreateJob(ctx context.Context, apiJob *api.RayJob) (*r
160160

161161
// populate cluster map
162162
if apiJob.ClusterSpec != nil {
163-
computeTemplateMap, err = r.populateComputeTemplate(ctx, apiJob.ClusterSpec, apiJob.Namespace)
163+
computeTemplateMap, err = r.PopulateComputeTemplate(ctx, apiJob.ClusterSpec, apiJob.Namespace)
164164
if err != nil {
165165
return nil, util.NewInternalServerError(err, "Failed to populate compute template for (%s/%s)", apiJob.Namespace, apiJob.JobId)
166166
}
@@ -227,7 +227,7 @@ func (r *ResourceManager) DeleteJob(ctx context.Context, jobName string, namespa
227227

228228
func (r *ResourceManager) CreateService(ctx context.Context, apiService *api.RayService) (*rayv1api.RayService, error) {
229229
// populate cluster map
230-
computeTemplateDict, err := r.populateComputeTemplate(ctx, apiService.ClusterSpec, apiService.Namespace)
230+
computeTemplateDict, err := r.PopulateComputeTemplate(ctx, apiService.ClusterSpec, apiService.Namespace)
231231
if err != nil {
232232
return nil, util.NewInternalServerError(err, "Failed to populate compute template for (%s/%s)", apiService.Namespace, apiService.Name)
233233
}
@@ -254,7 +254,7 @@ func (r *ResourceManager) UpdateRayService(ctx context.Context, apiService *api.
254254
return nil, util.Wrap(err, fmt.Sprintf("Update service fail, no service named: %s ", name))
255255
}
256256
// populate cluster map
257-
computeTemplateDict, err := r.populateComputeTemplate(ctx, apiService.ClusterSpec, apiService.Namespace)
257+
computeTemplateDict, err := r.PopulateComputeTemplate(ctx, apiService.ClusterSpec, apiService.Namespace)
258258
if err != nil {
259259
return nil, util.NewInternalServerError(err, "Failed to populate compute template for (%s/%s)", apiService.Namespace, apiService.Name)
260260
}

apiserver/pkg/manager/resource_manager_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func TestPopulateComputeTemplate(t *testing.T) {
7272

7373
// Run
7474
resourceManager := NewResourceManager(mockClientManager)
75-
computeTemplates, err := resourceManager.populateComputeTemplate(ctx, clusterSpec, namespace)
75+
computeTemplates, err := resourceManager.PopulateComputeTemplate(ctx, clusterSpec, namespace)
7676

7777
// Assert
7878
require.NoError(t, err)

0 commit comments

Comments
 (0)