diff --git a/docs/reference/api.md b/docs/reference/api.md index fb6a624645e..cc953cbca50 100644 --- a/docs/reference/api.md +++ b/docs/reference/api.md @@ -108,6 +108,9 @@ _Appears in:_ DeletionCondition specifies the trigger conditions for a deletion action. +Exactly one of JobStatus or JobDeploymentStatus must be specified: + - JobStatus (application-level): Match the Ray job execution status. + - JobDeploymentStatus (infrastructure-level): Match the RayJob deployment lifecycle status. This is particularly useful for cleaning up resources when Ray jobs fail to be submitted. @@ -116,7 +119,7 @@ _Appears in:_ | Field | Description | Default | Validation | | --- | --- | --- | --- | -| `ttlSeconds` _integer_ | TTLSeconds is the time in seconds from when the JobStatus
reaches the specified terminal state to when this deletion action should be triggered.
The value must be a non-negative integer. | 0 | Minimum: 0
| +| `ttlSeconds` _integer_ | TTLSeconds is the time in seconds from when the JobStatus or JobDeploymentStatus
reaches the specified terminal state to when this deletion action should be triggered.
The value must be a non-negative integer. | 0 | Minimum: 0
| #### DeletionPolicy diff --git a/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml b/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml index 1f4c8432168..6268ab28fc6 100644 --- a/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml +++ b/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml @@ -65,6 +65,10 @@ spec: properties: condition: properties: + jobDeploymentStatus: + enum: + - Failed + type: string jobStatus: enum: - SUCCEEDED @@ -75,9 +79,14 @@ spec: format: int32 minimum: 0 type: integer - required: - - jobStatus type: object + x-kubernetes-validations: + - message: JobStatus and JobDeploymentStatus cannot be used + together within the same deletion condition. + rule: '!(has(self.jobStatus) && has(self.jobDeploymentStatus))' + - message: the deletion condition requires either the JobStatus + or the JobDeploymentStatus field. + rule: has(self.jobStatus) || has(self.jobDeploymentStatus) policy: enum: - DeleteCluster diff --git a/ray-operator/apis/ray/v1/rayjob_types.go b/ray-operator/apis/ray/v1/rayjob_types.go index 705d50dfd40..298da3de91d 100644 --- a/ray-operator/apis/ray/v1/rayjob_types.go +++ b/ray-operator/apis/ray/v1/rayjob_types.go @@ -140,13 +140,26 @@ type DeletionRule struct { } // DeletionCondition specifies the trigger conditions for a deletion action. +// Exactly one of JobStatus or JobDeploymentStatus must be specified: +// - JobStatus (application-level): Match the Ray job execution status. +// - JobDeploymentStatus (infrastructure-level): Match the RayJob deployment lifecycle status. This is particularly useful for cleaning up resources when Ray jobs fail to be submitted. +// +// +kubebuilder:validation:XValidation:rule="!(has(self.jobStatus) && has(self.jobDeploymentStatus))",message="JobStatus and JobDeploymentStatus cannot be used together within the same deletion condition." +// +kubebuilder:validation:XValidation:rule="has(self.jobStatus) || has(self.jobDeploymentStatus)",message="the deletion condition requires either the JobStatus or the JobDeploymentStatus field." type DeletionCondition struct { - // JobStatus is the terminal status of the RayJob that triggers this condition. This field is required. + // JobStatus is the terminal status of the RayJob that triggers this condition. // For the initial implementation, only "SUCCEEDED" and "FAILED" are supported. // +kubebuilder:validation:Enum=SUCCEEDED;FAILED - JobStatus JobStatus `json:"jobStatus"` + // +optional + JobStatus *JobStatus `json:"jobStatus,omitempty"` + + // JobDeploymentStatus is the terminal status of the RayJob deployment that triggers this condition. + // For the initial implementation, only "Failed" is supported. + // +kubebuilder:validation:Enum=Failed + // +optional + JobDeploymentStatus *JobDeploymentStatus `json:"jobDeploymentStatus,omitempty"` - // TTLSeconds is the time in seconds from when the JobStatus + // TTLSeconds is the time in seconds from when the JobStatus or JobDeploymentStatus // reaches the specified terminal state to when this deletion action should be triggered. // The value must be a non-negative integer. 
// +kubebuilder:default=0 diff --git a/ray-operator/apis/ray/v1/zz_generated.deepcopy.go b/ray-operator/apis/ray/v1/zz_generated.deepcopy.go index cd710592d98..84fc05a86ab 100644 --- a/ray-operator/apis/ray/v1/zz_generated.deepcopy.go +++ b/ray-operator/apis/ray/v1/zz_generated.deepcopy.go @@ -151,6 +151,16 @@ func (in *ClusterUpgradeOptions) DeepCopy() *ClusterUpgradeOptions { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *DeletionCondition) DeepCopyInto(out *DeletionCondition) { *out = *in + if in.JobStatus != nil { + in, out := &in.JobStatus, &out.JobStatus + *out = new(JobStatus) + **out = **in + } + if in.JobDeploymentStatus != nil { + in, out := &in.JobDeploymentStatus, &out.JobDeploymentStatus + *out = new(JobDeploymentStatus) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DeletionCondition. @@ -186,7 +196,7 @@ func (in *DeletionPolicy) DeepCopy() *DeletionPolicy { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *DeletionRule) DeepCopyInto(out *DeletionRule) { *out = *in - out.Condition = in.Condition + in.Condition.DeepCopyInto(&out.Condition) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DeletionRule. @@ -215,7 +225,9 @@ func (in *DeletionStrategy) DeepCopyInto(out *DeletionStrategy) { if in.DeletionRules != nil { in, out := &in.DeletionRules, &out.DeletionRules *out = make([]DeletionRule, len(*in)) - copy(*out, *in) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } } } diff --git a/ray-operator/config/crd/bases/ray.io_rayjobs.yaml b/ray-operator/config/crd/bases/ray.io_rayjobs.yaml index 1f4c8432168..6268ab28fc6 100644 --- a/ray-operator/config/crd/bases/ray.io_rayjobs.yaml +++ b/ray-operator/config/crd/bases/ray.io_rayjobs.yaml @@ -65,6 +65,10 @@ spec: properties: condition: properties: + jobDeploymentStatus: + enum: + - Failed + type: string jobStatus: enum: - SUCCEEDED @@ -75,9 +79,14 @@ spec: format: int32 minimum: 0 type: integer - required: - - jobStatus type: object + x-kubernetes-validations: + - message: JobStatus and JobDeploymentStatus cannot be used + together within the same deletion condition. + rule: '!(has(self.jobStatus) && has(self.jobDeploymentStatus))' + - message: the deletion condition requires either the JobStatus + or the JobDeploymentStatus field. + rule: has(self.jobStatus) || has(self.jobDeploymentStatus) policy: enum: - DeleteCluster diff --git a/ray-operator/config/samples/ray-job.deletion-rules.yaml b/ray-operator/config/samples/ray-job.deletion-rules.yaml index 9bc8ed75a61..f67a2a491ea 100644 --- a/ray-operator/config/samples/ray-job.deletion-rules.yaml +++ b/ray-operator/config/samples/ray-job.deletion-rules.yaml @@ -12,6 +12,12 @@ spec: # DeletionStrategy defines the deletion policies for a RayJob. # It allows for fine-grained control over resource cleanup after a job finishes. # DeletionRules is a list of deletion rules, processed based on their trigger conditions. + # Currently, both JobStatus and JobDeploymentStatus are supported as deletion conditions: + # - JobStatus (application-level): Match the Ray job execution status. + # - Currently, only "SUCCEEDED" and "FAILED" are supported. + # - JobDeploymentStatus (infrastructure-level): Match the RayJob deployment lifecycle status. This is particularly useful for cleaning up resources when Ray jobs fail to be submitted. 
+ # - Currently, only "Failed" is supported. + # For each deletion rule, exactly one of JobStatus and JobDeploymentStatus must be specified. # While the rules can be used to define a sequence, if multiple rules are overdue (e.g., due to controller downtime), # the most impactful rule (e.g., DeleteCluster) will be executed first to prioritize resource cleanup and cost savings. deletionStrategy: diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go index e09810022f0..3d3db5a2b4b 100644 --- a/ray-operator/controllers/ray/rayjob_controller.go +++ b/ray-operator/controllers/ray/rayjob_controller.go @@ -1216,8 +1216,8 @@ func (r *RayJobReconciler) handleDeletionRules(ctx context.Context, rayJob *rayv // Categorize all applicable and incomplete rules into "overdue" or "pending". for _, rule := range rayJob.Spec.DeletionStrategy.DeletionRules { - // Skip rules that don't match the current job status. - if rule.Condition.JobStatus != rayJob.Status.JobStatus { + // Skip rules that don't match the current job status or job deployment status. + if !isDeletionRuleMatched(rule, rayJob) { continue } @@ -1382,6 +1382,15 @@ func (r *RayJobReconciler) executeDeletionPolicy(ctx context.Context, rayJob *ra return ctrl.Result{}, nil } +// isDeletionRuleMatched checks if the deletion rule matches the current job status or job deployment status. +func isDeletionRuleMatched(rule rayv1.DeletionRule, rayJob *rayv1.RayJob) bool { + // It's guaranteed that exactly one of JobStatus and JobDeploymentStatus is specified. + if rule.Condition.JobStatus != nil { + return *rule.Condition.JobStatus == rayJob.Status.JobStatus + } + return *rule.Condition.JobDeploymentStatus == rayJob.Status.JobDeploymentStatus +} + // isDeletionActionCompleted checks if the state corresponding to a deletion policy is already achieved. // This is crucial for making the reconciliation loop idempotent by checking the actual cluster state. 
func (r *RayJobReconciler) isDeletionActionCompleted(ctx context.Context, rayJob *rayv1.RayJob, policy rayv1.DeletionPolicyType) (bool, error) { diff --git a/ray-operator/controllers/ray/rayjob_controller_test.go b/ray-operator/controllers/ray/rayjob_controller_test.go index e8ac2be20e3..b72f39e6bee 100644 --- a/ray-operator/controllers/ray/rayjob_controller_test.go +++ b/ray-operator/controllers/ray/rayjob_controller_test.go @@ -2071,7 +2071,7 @@ var _ = Context("RayJob with different submission modes", func() { { Policy: rayv1.DeleteWorkers, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusSucceeded, + JobStatus: ptr.To(rayv1.JobStatusSucceeded), }, }, }, @@ -2084,7 +2084,7 @@ var _ = Context("RayJob with different submission modes", func() { { Policy: rayv1.DeleteWorkers, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusSucceeded, + JobStatus: ptr.To(rayv1.JobStatusSucceeded), }, }, }, @@ -2228,7 +2228,7 @@ var _ = Context("RayJob with different submission modes", func() { { Policy: rayv1.DeleteWorkers, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusFailed, + JobStatus: ptr.To(rayv1.JobStatusFailed), }, }, }, @@ -2241,7 +2241,7 @@ var _ = Context("RayJob with different submission modes", func() { { Policy: rayv1.DeleteWorkers, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusFailed, + JobStatus: ptr.To(rayv1.JobStatusFailed), }, }, }, @@ -2385,7 +2385,7 @@ var _ = Context("RayJob with different submission modes", func() { { Policy: rayv1.DeleteCluster, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusSucceeded, + JobStatus: ptr.To(rayv1.JobStatusSucceeded), }, }, }, @@ -2398,7 +2398,7 @@ var _ = Context("RayJob with different submission modes", func() { { Policy: rayv1.DeleteCluster, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusSucceeded, + JobStatus: ptr.To(rayv1.JobStatusSucceeded), }, }, }, @@ -2525,7 +2525,7 @@ var _ = Context("RayJob with different submission modes", func() { { Policy: rayv1.DeleteCluster, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusFailed, + JobStatus: ptr.To(rayv1.JobStatusFailed), }, }, }, @@ -2538,7 +2538,7 @@ var _ = Context("RayJob with different submission modes", func() { { Policy: rayv1.DeleteCluster, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusFailed, + JobStatus: ptr.To(rayv1.JobStatusFailed), }, }, }, @@ -2665,7 +2665,7 @@ var _ = Context("RayJob with different submission modes", func() { { Policy: rayv1.DeleteSelf, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusSucceeded, + JobStatus: ptr.To(rayv1.JobStatusSucceeded), }, }, }, @@ -2780,7 +2780,7 @@ var _ = Context("RayJob with different submission modes", func() { { Policy: rayv1.DeleteSelf, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusFailed, + JobStatus: ptr.To(rayv1.JobStatusFailed), }, }, }, @@ -2895,7 +2895,7 @@ var _ = Context("RayJob with different submission modes", func() { { Policy: rayv1.DeleteNone, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusSucceeded, + JobStatus: ptr.To(rayv1.JobStatusSucceeded), }, }, }, @@ -2908,7 +2908,7 @@ var _ = Context("RayJob with different submission modes", func() { { Policy: rayv1.DeleteNone, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusSucceeded, + JobStatus: ptr.To(rayv1.JobStatusSucceeded), }, }, }, @@ -3057,7 +3057,7 @@ var _ = Context("RayJob with different submission modes", func() { { Policy: rayv1.DeleteNone, Condition: rayv1.DeletionCondition{ - 
JobStatus: rayv1.JobStatusFailed, + JobStatus: ptr.To(rayv1.JobStatusFailed), }, }, }, @@ -3070,7 +3070,7 @@ var _ = Context("RayJob with different submission modes", func() { { Policy: rayv1.DeleteNone, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusFailed, + JobStatus: ptr.To(rayv1.JobStatusFailed), }, }, }, @@ -3220,21 +3220,21 @@ var _ = Context("RayJob with different submission modes", func() { { Policy: rayv1.DeleteWorkers, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusSucceeded, + JobStatus: ptr.To(rayv1.JobStatusSucceeded), TTLSeconds: 0, }, }, { Policy: rayv1.DeleteCluster, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusSucceeded, + JobStatus: ptr.To(rayv1.JobStatusSucceeded), TTLSeconds: 0, }, }, { Policy: rayv1.DeleteSelf, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusSucceeded, + JobStatus: ptr.To(rayv1.JobStatusSucceeded), TTLSeconds: 0, }, }, @@ -3248,21 +3248,21 @@ var _ = Context("RayJob with different submission modes", func() { { Policy: rayv1.DeleteWorkers, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusSucceeded, + JobStatus: ptr.To(rayv1.JobStatusSucceeded), TTLSeconds: 0, }, }, { Policy: rayv1.DeleteCluster, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusSucceeded, + JobStatus: ptr.To(rayv1.JobStatusSucceeded), TTLSeconds: 0, }, }, { Policy: rayv1.DeleteSelf, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusSucceeded, + JobStatus: ptr.To(rayv1.JobStatusSucceeded), TTLSeconds: 0, }, }, @@ -3379,21 +3379,21 @@ var _ = Context("RayJob with different submission modes", func() { { Policy: rayv1.DeleteWorkers, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusFailed, + JobStatus: ptr.To(rayv1.JobStatusFailed), TTLSeconds: 0, }, }, { Policy: rayv1.DeleteCluster, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusFailed, + JobStatus: ptr.To(rayv1.JobStatusFailed), TTLSeconds: 0, }, }, { Policy: rayv1.DeleteSelf, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusFailed, + JobStatus: ptr.To(rayv1.JobStatusFailed), TTLSeconds: 0, }, }, @@ -3407,21 +3407,21 @@ var _ = Context("RayJob with different submission modes", func() { { Policy: rayv1.DeleteWorkers, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusFailed, + JobStatus: ptr.To(rayv1.JobStatusFailed), TTLSeconds: 0, }, }, { Policy: rayv1.DeleteCluster, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusFailed, + JobStatus: ptr.To(rayv1.JobStatusFailed), TTLSeconds: 0, }, }, { Policy: rayv1.DeleteSelf, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusFailed, + JobStatus: ptr.To(rayv1.JobStatusFailed), TTLSeconds: 0, }, }, @@ -3538,21 +3538,21 @@ var _ = Context("RayJob with different submission modes", func() { { Policy: rayv1.DeleteWorkers, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusSucceeded, + JobStatus: ptr.To(rayv1.JobStatusSucceeded), TTLSeconds: 0, // Stage 1: Delete workers after 0 seconds }, }, { Policy: rayv1.DeleteCluster, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusSucceeded, + JobStatus: ptr.To(rayv1.JobStatusSucceeded), TTLSeconds: 5, // Stage 2: Delete cluster after 5 seconds }, }, { Policy: rayv1.DeleteSelf, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusSucceeded, + JobStatus: ptr.To(rayv1.JobStatusSucceeded), TTLSeconds: 10, // Stage 3: Delete self after 10 seconds }, }, @@ -3566,21 +3566,21 @@ var _ = Context("RayJob with different submission modes", 
func() { { Policy: rayv1.DeleteWorkers, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusSucceeded, + JobStatus: ptr.To(rayv1.JobStatusSucceeded), TTLSeconds: 0, }, }, { Policy: rayv1.DeleteCluster, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusSucceeded, + JobStatus: ptr.To(rayv1.JobStatusSucceeded), TTLSeconds: 5, }, }, { Policy: rayv1.DeleteSelf, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusSucceeded, + JobStatus: ptr.To(rayv1.JobStatusSucceeded), TTLSeconds: 10, }, }, @@ -3746,21 +3746,21 @@ var _ = Context("RayJob with different submission modes", func() { { Policy: rayv1.DeleteWorkers, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusFailed, + JobStatus: ptr.To(rayv1.JobStatusFailed), TTLSeconds: 0, // Stage 1: Delete workers after 0 seconds }, }, { Policy: rayv1.DeleteCluster, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusFailed, + JobStatus: ptr.To(rayv1.JobStatusFailed), TTLSeconds: 5, // Stage 2: Delete cluster after 5 seconds }, }, { Policy: rayv1.DeleteSelf, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusFailed, + JobStatus: ptr.To(rayv1.JobStatusFailed), TTLSeconds: 10, // Stage 3: Delete self after 10 seconds }, }, @@ -3774,21 +3774,21 @@ var _ = Context("RayJob with different submission modes", func() { { Policy: rayv1.DeleteWorkers, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusFailed, + JobStatus: ptr.To(rayv1.JobStatusFailed), TTLSeconds: 0, }, }, { Policy: rayv1.DeleteCluster, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusFailed, + JobStatus: ptr.To(rayv1.JobStatusFailed), TTLSeconds: 5, }, }, { Policy: rayv1.DeleteSelf, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusFailed, + JobStatus: ptr.To(rayv1.JobStatusFailed), TTLSeconds: 10, }, }, diff --git a/ray-operator/controllers/ray/utils/validation.go b/ray-operator/controllers/ray/utils/validation.go index 1bcda9af903..c936564a61a 100644 --- a/ray-operator/controllers/ray/utils/validation.go +++ b/ray-operator/controllers/ray/utils/validation.go @@ -470,15 +470,16 @@ func validateDeletionRules(rayJob *rayv1.RayJob) error { rules := rayJob.Spec.DeletionStrategy.DeletionRules isClusterSelectorMode := len(rayJob.Spec.ClusterSelector) != 0 - // Group TTLs by JobStatus for cross-rule validation and uniqueness checking. - rulesByStatus := make(map[rayv1.JobStatus]map[rayv1.DeletionPolicyType]int32) + // Group TTLs by condition type for cross-rule validation and uniqueness checking. + // We separate JobStatus and JobDeploymentStatus to avoid confusion. + rulesByJobStatus := make(map[rayv1.JobStatus]map[rayv1.DeletionPolicyType]int32) + rulesByJobDeploymentStatus := make(map[rayv1.JobDeploymentStatus]map[rayv1.DeletionPolicyType]int32) var errs []error // Single pass: Validate each rule individually and group for later consistency checks. for i, rule := range rules { - // Validate TTL is non-negative. - if rule.Condition.TTLSeconds < 0 { - errs = append(errs, fmt.Errorf("deletionRules[%d]: TTLSeconds must be non-negative", i)) + if err := validateDeletionCondition(&rule.Condition); err != nil { + errs = append(errs, fmt.Errorf("deletionRules[%d]: %w", i, err)) continue } @@ -494,24 +495,43 @@ func validateDeletionRules(rayJob *rayv1.RayJob) error { } // Group valid rule for consistency check. 
- policyTTLs, ok := rulesByStatus[rule.Condition.JobStatus] - if !ok { - policyTTLs = make(map[rayv1.DeletionPolicyType]int32) - rulesByStatus[rule.Condition.JobStatus] = policyTTLs - } + if rule.Condition.JobStatus != nil { + if _, exists := rulesByJobStatus[*rule.Condition.JobStatus]; !exists { + rulesByJobStatus[*rule.Condition.JobStatus] = make(map[rayv1.DeletionPolicyType]int32) + } - // Check for uniqueness of (JobStatus, DeletionPolicyType) pair. - if _, exists := policyTTLs[rule.Policy]; exists { - errs = append(errs, fmt.Errorf("deletionRules[%d]: duplicate rule for DeletionPolicyType '%s' and JobStatus '%s'", i, rule.Policy, rule.Condition.JobStatus)) - continue - } + // Check for uniqueness of the current deletion rule, which can be identified by the (JobStatus, DeletionPolicyType) pair. + if _, exists := rulesByJobStatus[*rule.Condition.JobStatus][rule.Policy]; exists { + errs = append(errs, fmt.Errorf("deletionRules[%d]: duplicate rule for DeletionPolicyType '%s' and JobStatus '%s'", i, rule.Policy, *rule.Condition.JobStatus)) + continue + } - policyTTLs[rule.Policy] = rule.Condition.TTLSeconds + rulesByJobStatus[*rule.Condition.JobStatus][rule.Policy] = rule.Condition.TTLSeconds + } else { + if _, exists := rulesByJobDeploymentStatus[*rule.Condition.JobDeploymentStatus]; !exists { + rulesByJobDeploymentStatus[*rule.Condition.JobDeploymentStatus] = make(map[rayv1.DeletionPolicyType]int32) + } + + // Check for uniqueness of the current deletion rule, which can be identified by the (JobDeploymentStatus, DeletionPolicyType) pair. + if _, exists := rulesByJobDeploymentStatus[*rule.Condition.JobDeploymentStatus][rule.Policy]; exists { + errs = append(errs, fmt.Errorf("deletionRules[%d]: duplicate rule for DeletionPolicyType '%s' and JobDeploymentStatus '%s'", i, rule.Policy, *rule.Condition.JobDeploymentStatus)) + continue + } + + rulesByJobDeploymentStatus[*rule.Condition.JobDeploymentStatus][rule.Policy] = rule.Condition.TTLSeconds + } } // Second pass: Validate TTL consistency per JobStatus. - for status, policyTTLs := range rulesByStatus { - if err := validateTTLConsistency(policyTTLs, status); err != nil { + for jobStatus, policyTTLs := range rulesByJobStatus { + if err := validateTTLConsistency(policyTTLs, "JobStatus", string(jobStatus)); err != nil { + errs = append(errs, err) + } + } + + // Second pass: Validate TTL consistency per JobDeploymentStatus. + for jobDeploymentStatus, policyTTLs := range rulesByJobDeploymentStatus { + if err := validateTTLConsistency(policyTTLs, "JobDeploymentStatus", string(jobDeploymentStatus)); err != nil { errs = append(errs, err) } } @@ -519,9 +539,29 @@ func validateDeletionRules(rayJob *rayv1.RayJob) error { return errstd.Join(errs...) } +// validateDeletionCondition ensures exactly one of JobStatus and JobDeploymentStatus is specified and TTLSeconds is non-negative. +func validateDeletionCondition(deletionCondition *rayv1.DeletionCondition) error { + // Validate that exactly one of JobStatus and JobDeploymentStatus is specified. + hasJobStatus := deletionCondition.JobStatus != nil + hasJobDeploymentStatus := deletionCondition.JobDeploymentStatus != nil + if hasJobStatus && hasJobDeploymentStatus { + return fmt.Errorf("cannot set both JobStatus and JobDeploymentStatus at the same time") + } + if !hasJobStatus && !hasJobDeploymentStatus { + return fmt.Errorf("exactly one of JobStatus and JobDeploymentStatus must be set") + } + + // Validate TTL is non-negative. 
+ if deletionCondition.TTLSeconds < 0 { + return fmt.Errorf("TTLSeconds must be non-negative") + } + + return nil +} + // validateTTLConsistency ensures TTLs follow the deletion hierarchy: Workers <= Cluster <= Self. // (Lower TTL means deletes earlier.) -func validateTTLConsistency(policyTTLs map[rayv1.DeletionPolicyType]int32, status rayv1.JobStatus) error { +func validateTTLConsistency(policyTTLs map[rayv1.DeletionPolicyType]int32, conditionType string, conditionValue string) error { // Define the required deletion order. TTLs must be non-decreasing along this sequence. deletionOrder := []rayv1.DeletionPolicyType{ rayv1.DeleteWorkers, @@ -543,8 +583,8 @@ func validateTTLConsistency(policyTTLs map[rayv1.DeletionPolicyType]int32, statu if hasPrev && ttl < prevTTL { errs = append(errs, fmt.Errorf( - "for JobStatus '%s': %s TTL (%d) must be >= %s TTL (%d)", - status, policy, ttl, prevPolicy, prevTTL, + "for %s '%s': %s TTL (%d) must be >= %s TTL (%d)", + conditionType, conditionValue, policy, ttl, prevPolicy, prevTTL, )) } diff --git a/ray-operator/controllers/ray/utils/validation_test.go b/ray-operator/controllers/ray/utils/validation_test.go index 014d0917d68..c97ba6eb32b 100644 --- a/ray-operator/controllers/ray/utils/validation_test.go +++ b/ray-operator/controllers/ray/utils/validation_test.go @@ -1335,7 +1335,7 @@ func TestValidateRayJobSpecWithFeatureGate(t *testing.T) { { Policy: rayv1.DeleteSelf, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusSucceeded, + JobStatus: ptr.To(rayv1.JobStatusSucceeded), TTLSeconds: 10, }, }, @@ -1354,7 +1354,7 @@ func TestValidateRayJobSpecWithFeatureGate(t *testing.T) { { Policy: rayv1.DeleteSelf, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusSucceeded, + JobStatus: ptr.To(rayv1.JobStatusSucceeded), TTLSeconds: 10, }, }, @@ -1375,7 +1375,7 @@ func TestValidateRayJobSpecWithFeatureGate(t *testing.T) { { Policy: rayv1.DeleteSelf, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusSucceeded, + JobStatus: ptr.To(rayv1.JobStatusSucceeded), TTLSeconds: 10, }, }, @@ -1411,14 +1411,14 @@ func TestValidateRayJobSpecWithFeatureGate(t *testing.T) { { Policy: rayv1.DeleteSelf, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusSucceeded, + JobStatus: ptr.To(rayv1.JobStatusSucceeded), TTLSeconds: 10, }, }, { Policy: rayv1.DeleteSelf, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusSucceeded, + JobStatus: ptr.To(rayv1.JobStatusSucceeded), TTLSeconds: 20, }, }, @@ -1436,7 +1436,7 @@ func TestValidateRayJobSpecWithFeatureGate(t *testing.T) { { Policy: rayv1.DeleteSelf, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusSucceeded, + JobStatus: ptr.To(rayv1.JobStatusSucceeded), TTLSeconds: -10, }, }, @@ -1455,7 +1455,7 @@ func TestValidateRayJobSpecWithFeatureGate(t *testing.T) { { Policy: rayv1.DeleteWorkers, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusSucceeded, + JobStatus: ptr.To(rayv1.JobStatusSucceeded), TTLSeconds: 10, }, }, @@ -1473,7 +1473,7 @@ func TestValidateRayJobSpecWithFeatureGate(t *testing.T) { { Policy: rayv1.DeleteCluster, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusSucceeded, + JobStatus: ptr.To(rayv1.JobStatusSucceeded), TTLSeconds: 10, }, }, @@ -1490,7 +1490,7 @@ func TestValidateRayJobSpecWithFeatureGate(t *testing.T) { { Policy: rayv1.DeleteWorkers, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusSucceeded, + JobStatus: ptr.To(rayv1.JobStatusSucceeded), TTLSeconds: 10, }, }, @@ -1511,14 +1511,14 @@ 
func TestValidateRayJobSpecWithFeatureGate(t *testing.T) { { Policy: rayv1.DeleteWorkers, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusSucceeded, + JobStatus: ptr.To(rayv1.JobStatusSucceeded), TTLSeconds: 20, }, }, { Policy: rayv1.DeleteCluster, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusSucceeded, + JobStatus: ptr.To(rayv1.JobStatusSucceeded), TTLSeconds: 10, }, }, @@ -1536,14 +1536,14 @@ func TestValidateRayJobSpecWithFeatureGate(t *testing.T) { { Policy: rayv1.DeleteCluster, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusSucceeded, + JobStatus: ptr.To(rayv1.JobStatusSucceeded), TTLSeconds: 20, }, }, { Policy: rayv1.DeleteSelf, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusSucceeded, + JobStatus: ptr.To(rayv1.JobStatusSucceeded), TTLSeconds: 10, }, }, @@ -1561,28 +1561,28 @@ func TestValidateRayJobSpecWithFeatureGate(t *testing.T) { { Policy: rayv1.DeleteWorkers, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusSucceeded, + JobStatus: ptr.To(rayv1.JobStatusSucceeded), TTLSeconds: 10, }, }, { Policy: rayv1.DeleteCluster, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusSucceeded, + JobStatus: ptr.To(rayv1.JobStatusSucceeded), TTLSeconds: 20, }, }, { Policy: rayv1.DeleteSelf, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusSucceeded, + JobStatus: ptr.To(rayv1.JobStatusSucceeded), TTLSeconds: 30, }, }, { Policy: rayv1.DeleteSelf, Condition: rayv1.DeletionCondition{ - JobStatus: rayv1.JobStatusFailed, + JobStatus: ptr.To(rayv1.JobStatusFailed), TTLSeconds: 0, }, }, @@ -1592,6 +1592,142 @@ func TestValidateRayJobSpecWithFeatureGate(t *testing.T) { }, expectError: false, }, + { + name: "valid deletionRules with JobDeploymentStatus=Failed", + spec: rayv1.RayJobSpec{ + DeletionStrategy: &rayv1.DeletionStrategy{ + DeletionRules: []rayv1.DeletionRule{ + { + Policy: rayv1.DeleteCluster, + Condition: rayv1.DeletionCondition{ + JobDeploymentStatus: ptr.To(rayv1.JobDeploymentStatusFailed), + TTLSeconds: 10, + }, + }, + { + Policy: rayv1.DeleteSelf, + Condition: rayv1.DeletionCondition{ + JobDeploymentStatus: ptr.To(rayv1.JobDeploymentStatusFailed), + TTLSeconds: 20, + }, + }, + }, + }, + RayClusterSpec: createBasicRayClusterSpec(), + }, + expectError: false, + }, + { + name: "invalid: both JobStatus and JobDeploymentStatus set", + spec: rayv1.RayJobSpec{ + DeletionStrategy: &rayv1.DeletionStrategy{ + DeletionRules: []rayv1.DeletionRule{ + { + Policy: rayv1.DeleteCluster, + Condition: rayv1.DeletionCondition{ + JobStatus: ptr.To(rayv1.JobStatusFailed), + JobDeploymentStatus: ptr.To(rayv1.JobDeploymentStatusFailed), + TTLSeconds: 10, + }, + }, + }, + }, + RayClusterSpec: createBasicRayClusterSpec(), + }, + expectError: true, + }, + { + name: "invalid: neither JobStatus nor JobDeploymentStatus set", + spec: rayv1.RayJobSpec{ + DeletionStrategy: &rayv1.DeletionStrategy{ + DeletionRules: []rayv1.DeletionRule{ + { + Policy: rayv1.DeleteCluster, + Condition: rayv1.DeletionCondition{ + TTLSeconds: 10, + }, + }, + }, + }, + RayClusterSpec: createBasicRayClusterSpec(), + }, + expectError: true, + }, + { + name: "duplicate rule with JobDeploymentStatus", + spec: rayv1.RayJobSpec{ + DeletionStrategy: &rayv1.DeletionStrategy{ + DeletionRules: []rayv1.DeletionRule{ + { + Policy: rayv1.DeleteCluster, + Condition: rayv1.DeletionCondition{ + JobDeploymentStatus: ptr.To(rayv1.JobDeploymentStatusFailed), + TTLSeconds: 10, + }, + }, + { + Policy: rayv1.DeleteCluster, + Condition: 
rayv1.DeletionCondition{ + JobDeploymentStatus: ptr.To(rayv1.JobDeploymentStatusFailed), + TTLSeconds: 20, + }, + }, + }, + }, + RayClusterSpec: createBasicRayClusterSpec(), + }, + expectError: true, + }, + { + name: "valid: mixed JobStatus and JobDeploymentStatus rules", + spec: rayv1.RayJobSpec{ + DeletionStrategy: &rayv1.DeletionStrategy{ + DeletionRules: []rayv1.DeletionRule{ + { + Policy: rayv1.DeleteWorkers, + Condition: rayv1.DeletionCondition{ + JobStatus: ptr.To(rayv1.JobStatusFailed), + TTLSeconds: 10, + }, + }, + { + Policy: rayv1.DeleteCluster, + Condition: rayv1.DeletionCondition{ + JobDeploymentStatus: ptr.To(rayv1.JobDeploymentStatusFailed), + TTLSeconds: 20, + }, + }, + }, + }, + RayClusterSpec: createBasicRayClusterSpec(), + }, + expectError: false, + }, + { + name: "inconsistent TTLs with JobDeploymentStatus (DeleteCluster < DeleteWorkers)", + spec: rayv1.RayJobSpec{ + DeletionStrategy: &rayv1.DeletionStrategy{ + DeletionRules: []rayv1.DeletionRule{ + { + Policy: rayv1.DeleteWorkers, + Condition: rayv1.DeletionCondition{ + JobDeploymentStatus: ptr.To(rayv1.JobDeploymentStatusFailed), + TTLSeconds: 20, + }, + }, + { + Policy: rayv1.DeleteCluster, + Condition: rayv1.DeletionCondition{ + JobDeploymentStatus: ptr.To(rayv1.JobDeploymentStatusFailed), + TTLSeconds: 10, + }, + }, + }, + }, + RayClusterSpec: createBasicRayClusterSpec(), + }, + expectError: true, + }, } features.SetFeatureGateDuringTest(t, features.RayJobDeletionPolicy, true) diff --git a/ray-operator/pkg/client/applyconfiguration/ray/v1/deletioncondition.go b/ray-operator/pkg/client/applyconfiguration/ray/v1/deletioncondition.go index 36b8c006209..ee816ecf5bf 100644 --- a/ray-operator/pkg/client/applyconfiguration/ray/v1/deletioncondition.go +++ b/ray-operator/pkg/client/applyconfiguration/ray/v1/deletioncondition.go @@ -9,8 +9,9 @@ import ( // DeletionConditionApplyConfiguration represents a declarative configuration of the DeletionCondition type for use // with apply. type DeletionConditionApplyConfiguration struct { - JobStatus *rayv1.JobStatus `json:"jobStatus,omitempty"` - TTLSeconds *int32 `json:"ttlSeconds,omitempty"` + JobStatus *rayv1.JobStatus `json:"jobStatus,omitempty"` + JobDeploymentStatus *rayv1.JobDeploymentStatus `json:"jobDeploymentStatus,omitempty"` + TTLSeconds *int32 `json:"ttlSeconds,omitempty"` } // DeletionConditionApplyConfiguration constructs a declarative configuration of the DeletionCondition type for use with @@ -27,6 +28,14 @@ func (b *DeletionConditionApplyConfiguration) WithJobStatus(value rayv1.JobStatu return b } +// WithJobDeploymentStatus sets the JobDeploymentStatus field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the JobDeploymentStatus field is set to the value of the last call. +func (b *DeletionConditionApplyConfiguration) WithJobDeploymentStatus(value rayv1.JobDeploymentStatus) *DeletionConditionApplyConfiguration { + b.JobDeploymentStatus = &value + return b +} + // WithTTLSeconds sets the TTLSeconds field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. // If called multiple times, the TTLSeconds field is set to the value of the last call. 
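The e2e tests that follow build their RayJob specs with the generated apply-configuration builders (for example `rayv1ac.DeletionCondition().WithJobDeploymentStatus(...)`). For orientation, here is a sketch of the equivalent user-facing YAML fragment those builders produce — it is illustrative only and not part of this patch; field names and enum values are taken from the CRD changes above, and the TTL mirrors the first test below:

```yaml
# Illustrative fragment only; not part of the patch.
spec:
  shutdownAfterJobFinishes: false   # required when a deletionStrategy is set
  deletionStrategy:
    deletionRules:
      - policy: DeleteWorkers
        condition:
          jobDeploymentStatus: Failed   # infrastructure-level trigger; "Failed" is the only value in the enum for now
          ttlSeconds: 10
```

A condition that sets both `jobStatus` and `jobDeploymentStatus`, or neither, is rejected at admission by the new x-kubernetes-validations rules, and again by the controller-side `validateDeletionCondition`.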
diff --git a/ray-operator/test/e2erayjob/rayjob_deletion_strategy_test.go b/ray-operator/test/e2erayjob/rayjob_deletion_strategy_test.go index 49718d3544b..533e77665fc 100644 --- a/ray-operator/test/e2erayjob/rayjob_deletion_strategy_test.go +++ b/ray-operator/test/e2erayjob/rayjob_deletion_strategy_test.go @@ -22,7 +22,7 @@ func TestDeletionStrategy(t *testing.T) { // Job scripts - using existing counter.py for successful jobs and fail.py for failed jobs // Note: This test suite requires the RayJobDeletionPolicy feature gate to be enabled - jobsAC := NewConfigMap(namespace.Name, Files(test, "counter.py", "fail.py")) + jobsAC := NewConfigMap(namespace.Name, Files(test, "counter.py", "fail.py", "long_running.py")) jobs, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), jobsAC, TestApplyOptions) g.Expect(err).NotTo(HaveOccurred()) LogWithTimestamp(test.T(), "Created ConfigMap %s/%s successfully", jobs.Namespace, jobs.Name) @@ -501,4 +501,295 @@ env_vars: g.Eventually(func() error { _, err := GetRayJob(test, job.Namespace, job.Name); return err }, TestTimeoutMedium).Should(WithTransform(k8serrors.IsNotFound, BeTrue())) LogWithTimestamp(test.T(), "Cleanup after legacy success scenario complete") }) + + test.T().Run("DeletionRules with JobDeploymentStatus Failed and DeleteWorkers policy should delete only worker pods", func(_ *testing.T) { + // Create a RayJob with DeleteWorkers policy, short activeDeadlineSeconds, and short TTL for faster testing. + rayJobAC := rayv1ac.RayJob("delete-workers-after-jobdeploymentstatus-failed", namespace.Name). + WithSpec(rayv1ac.RayJobSpec(). + WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))). + WithEntrypoint("python /home/ray/jobs/long_running.py"). + WithActiveDeadlineSeconds(45). // Short deadline for failing the JobDeploymentStatus, but making sure the cluster is running + WithShutdownAfterJobFinishes(false). // Required when using DeletionStrategy + WithDeletionStrategy(rayv1ac.DeletionStrategy(). + WithDeletionRules( + rayv1ac.DeletionRule(). + WithPolicy(rayv1.DeleteWorkers). + WithCondition(rayv1ac.DeletionCondition(). + WithJobDeploymentStatus(rayv1.JobDeploymentStatusFailed). + WithTTLSeconds(10)), // 10 second TTL for testing + )). + WithSubmitterPodTemplate(JobSubmitterPodTemplateApplyConfiguration())) + + rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) + g.Expect(err).NotTo(HaveOccurred()) + LogWithTimestamp(test.T(), "Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name) + + // Wait for JobDeploymentStatus to become Failed due to activeDeadlineSeconds timeout. + g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). + Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusFailed))) + LogWithTimestamp(test.T(), "RayJob %s/%s failed due to activeDeadlineSeconds timeout", rayJob.Namespace, rayJob.Name) + + // Verify the JobStatus remains RUNNING and the reason is DeadlineExceeded. + rayJob, err = GetRayJob(test, rayJob.Namespace, rayJob.Name) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(rayJob.Status.JobStatus).To(Equal(rayv1.JobStatusRunning)) + g.Expect(rayJob.Status.Reason).To(Equal(rayv1.DeadlineExceeded)) + + // Get the associated RayCluster name. We assert it's non-empty explicitly so that + // test failures surface here (clear message) rather than later when using an empty name. 
+ rayClusterName := rayJob.Status.RayClusterName + g.Expect(rayClusterName).NotTo(BeEmpty()) + + // Verify cluster and workers exist initially. + g.Eventually(RayCluster(test, namespace.Name, rayClusterName), TestTimeoutShort). + Should(WithTransform(RayClusterState, Equal(rayv1.Ready))) + + // Count initial worker pods. + cluster, err := GetRayCluster(test, namespace.Name, rayClusterName) + g.Expect(err).NotTo(HaveOccurred()) + initialWorkerPods, err := GetWorkerPods(test, cluster) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(initialWorkerPods).ToNot(BeEmpty()) + LogWithTimestamp(test.T(), "Found %d worker pods initially", len(initialWorkerPods)) + + // Verify resources persist during TTL wait period (first 8 seconds of 10s TTL). + LogWithTimestamp(test.T(), "Verifying resources persist during TTL wait period...") + g.Consistently(func(gg Gomega) { + cluster, err := GetRayCluster(test, namespace.Name, rayClusterName) + gg.Expect(err).NotTo(HaveOccurred()) + gg.Expect(cluster).NotTo(BeNil()) + workerPods, err := GetWorkerPods(test, cluster) + gg.Expect(err).NotTo(HaveOccurred()) + gg.Expect(workerPods).ToNot(BeEmpty()) + headPod, err := GetHeadPod(test, cluster) + gg.Expect(err).NotTo(HaveOccurred()) + gg.Expect(headPod).NotTo(BeNil()) + jobObj, err := GetRayJob(test, rayJob.Namespace, rayJob.Name) + gg.Expect(err).NotTo(HaveOccurred()) + gg.Expect(jobObj).NotTo(BeNil()) + }, 8*time.Second, 2*time.Second).Should(Succeed()) // Check every 2s for 8s + LogWithTimestamp(test.T(), "Resources confirmed stable during TTL wait period") + + // Wait for TTL to expire and workers to be deleted. + LogWithTimestamp(test.T(), "Waiting for TTL to expire and workers to be deleted...") + g.Eventually(func(gg Gomega) { + cluster, err := GetRayCluster(test, namespace.Name, rayClusterName) + gg.Expect(err).NotTo(HaveOccurred()) + gg.Expect(cluster).NotTo(BeNil()) + workerPods, err := GetWorkerPods(test, cluster) + gg.Expect(err).NotTo(HaveOccurred()) + gg.Expect(workerPods).To(BeEmpty()) + }, TestTimeoutMedium).Should(Succeed()) + LogWithTimestamp(test.T(), "Worker pods deleted successfully") + + // Verify cluster still exists (head pod should remain). + g.Consistently(RayCluster(test, namespace.Name, rayClusterName), 10*time.Second). + Should(WithTransform(RayClusterState, Equal(rayv1.Ready))) + + // Verify head pod still exists. + cluster, err = GetRayCluster(test, namespace.Name, rayClusterName) + g.Expect(err).NotTo(HaveOccurred()) + headPod, err := GetHeadPod(test, cluster) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(headPod).NotTo(BeNil()) + LogWithTimestamp(test.T(), "Head pod preserved as expected") + + // Verify RayJob still exists. + jobObj, err := GetRayJob(test, rayJob.Namespace, rayJob.Name) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(jobObj).NotTo(BeNil()) + LogWithTimestamp(test.T(), "RayJob preserved as expected") + + // Cleanup: delete RayJob to free resources (cluster should be GC'd eventually if owned). + LogWithTimestamp(test.T(), "Cleaning up RayJob %s/%s after DeleteWorkers scenario", jobObj.Namespace, jobObj.Name) + err = test.Client().Ray().RayV1().RayJobs(jobObj.Namespace).Delete(test.Ctx(), jobObj.Name, metav1.DeleteOptions{}) + g.Expect(err).NotTo(HaveOccurred()) + g.Eventually(func() error { _, err := GetRayJob(test, jobObj.Namespace, jobObj.Name); return err }, TestTimeoutMedium).Should(WithTransform(k8serrors.IsNotFound, BeTrue())) + // Cluster may take a moment to be garbage collected; tolerate already-deleted state. 
+ g.Eventually(func() error { + _, err := GetRayCluster(test, namespace.Name, rayClusterName) + return err + }, TestTimeoutMedium).Should(WithTransform(k8serrors.IsNotFound, BeTrue())) + LogWithTimestamp(test.T(), "Cleanup after DeleteWorkers scenario complete") + }) + + test.T().Run("DeletionRules with JobDeploymentStatus Failed and DeleteCluster policy should delete entire cluster", func(_ *testing.T) { + // Create a RayJob with DeleteCluster policy, short activeDeadlineSeconds, and short TTL for faster testing. + rayJobAC := rayv1ac.RayJob("delete-cluster-after-jobdeploymentstatus-failed", namespace.Name). + WithSpec(rayv1ac.RayJobSpec(). + WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))). + WithEntrypoint("python /home/ray/jobs/long_running.py"). + WithActiveDeadlineSeconds(45). // Short deadline for failing the JobDeploymentStatus, but making sure the cluster is running + WithShutdownAfterJobFinishes(false). // Required when using DeletionStrategy + WithDeletionStrategy(rayv1ac.DeletionStrategy(). + WithDeletionRules( + rayv1ac.DeletionRule(). + WithPolicy(rayv1.DeleteCluster). + WithCondition(rayv1ac.DeletionCondition(). + WithJobDeploymentStatus(rayv1.JobDeploymentStatusFailed). + WithTTLSeconds(10)), // 10 second TTL for testing + )). + WithSubmitterPodTemplate(JobSubmitterPodTemplateApplyConfiguration())) + + rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) + g.Expect(err).NotTo(HaveOccurred()) + LogWithTimestamp(test.T(), "Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name) + + // Wait for JobDeploymentStatus to become Failed due to activeDeadlineSeconds timeout. + g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). + Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusFailed))) + LogWithTimestamp(test.T(), "RayJob %s/%s failed due to activeDeadlineSeconds timeout", rayJob.Namespace, rayJob.Name) + + // Verify the JobStatus remains RUNNING and the reason is DeadlineExceeded. + rayJob, err = GetRayJob(test, rayJob.Namespace, rayJob.Name) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(rayJob.Status.JobStatus).To(Equal(rayv1.JobStatusRunning)) + g.Expect(rayJob.Status.Reason).To(Equal(rayv1.DeadlineExceeded)) + + // Get the associated RayCluster name (early assertion for clearer diagnostics). + rayClusterName := rayJob.Status.RayClusterName + g.Expect(rayClusterName).NotTo(BeEmpty()) + + // Verify cluster exists initially. + g.Eventually(RayCluster(test, namespace.Name, rayClusterName), TestTimeoutShort). + Should(WithTransform(RayClusterState, Equal(rayv1.Ready))) + + // Wait for TTL to expire and cluster to be deleted. + LogWithTimestamp(test.T(), "Waiting for TTL to expire and cluster to be deleted...") + g.Eventually(func() error { + _, err := GetRayCluster(test, namespace.Name, rayClusterName) + return err + }, TestTimeoutMedium).Should(WithTransform(k8serrors.IsNotFound, BeTrue())) + LogWithTimestamp(test.T(), "RayCluster deleted successfully") + + // Verify RayJob still exists. + jobObj, err := GetRayJob(test, rayJob.Namespace, rayJob.Name) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(jobObj).NotTo(BeNil()) + LogWithTimestamp(test.T(), "RayJob preserved as expected") + + // Cleanup: delete RayJob (cluster already deleted by policy). 
+ LogWithTimestamp(test.T(), "Cleaning up RayJob %s/%s after DeleteCluster scenario", jobObj.Namespace, jobObj.Name) + err = test.Client().Ray().RayV1().RayJobs(jobObj.Namespace).Delete(test.Ctx(), jobObj.Name, metav1.DeleteOptions{}) + g.Expect(err).NotTo(HaveOccurred()) + g.Eventually(func() error { _, err := GetRayJob(test, jobObj.Namespace, jobObj.Name); return err }, TestTimeoutMedium).Should(WithTransform(k8serrors.IsNotFound, BeTrue())) + LogWithTimestamp(test.T(), "Cleanup after DeleteCluster scenario complete") + }) + + test.T().Run("DeletionRules with JobDeploymentStatus Failed and DeleteSelf policy should delete RayJob and cluster", func(_ *testing.T) { + // Create a RayJob with DeleteSelf policy, short activeDeadlineSeconds, and short TTL for faster testing. + rayJobAC := rayv1ac.RayJob("delete-self-after-jobdeploymentstatus-failed", namespace.Name). + WithSpec(rayv1ac.RayJobSpec(). + WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))). + WithEntrypoint("python /home/ray/jobs/long_running.py"). + WithActiveDeadlineSeconds(45). // Short deadline for failing the JobDeploymentStatus, but making sure the cluster is running + WithShutdownAfterJobFinishes(false). // Required when using DeletionStrategy + WithDeletionStrategy(rayv1ac.DeletionStrategy(). + WithDeletionRules( + rayv1ac.DeletionRule(). + WithPolicy(rayv1.DeleteSelf). + WithCondition(rayv1ac.DeletionCondition(). + WithJobDeploymentStatus(rayv1.JobDeploymentStatusFailed). + WithTTLSeconds(10)), // 10 second TTL for testing + )). + WithSubmitterPodTemplate(JobSubmitterPodTemplateApplyConfiguration())) + + rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) + g.Expect(err).NotTo(HaveOccurred()) + LogWithTimestamp(test.T(), "Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name) + + // Wait for JobDeploymentStatus to become Failed due to activeDeadlineSeconds timeout. + g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). + Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusFailed))) + LogWithTimestamp(test.T(), "RayJob %s/%s failed due to activeDeadlineSeconds timeout", rayJob.Namespace, rayJob.Name) + + // Verify the JobStatus remains RUNNING and the reason is DeadlineExceeded. + rayJob, err = GetRayJob(test, rayJob.Namespace, rayJob.Name) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(rayJob.Status.JobStatus).To(Equal(rayv1.JobStatusRunning)) + g.Expect(rayJob.Status.Reason).To(Equal(rayv1.DeadlineExceeded)) + + // Get the associated RayCluster name (early assertion for clearer diagnostics). + rayClusterName := rayJob.Status.RayClusterName + g.Expect(rayClusterName).NotTo(BeEmpty()) + + // Wait for TTL to expire and RayJob (and cluster) to be deleted. + LogWithTimestamp(test.T(), "Waiting for TTL to expire and RayJob to be deleted...") + g.Eventually(func() error { + _, err := GetRayJob(test, rayJob.Namespace, rayJob.Name) + return err + }, TestTimeoutMedium).Should(WithTransform(k8serrors.IsNotFound, BeTrue())) + LogWithTimestamp(test.T(), "RayJob deleted successfully") + + // Verify associated cluster is also deleted. 
+ g.Eventually(func() error { + _, err := GetRayCluster(test, namespace.Name, rayClusterName) + return err + }, TestTimeoutMedium).Should(WithTransform(k8serrors.IsNotFound, BeTrue())) + LogWithTimestamp(test.T(), "Associated RayCluster deleted successfully") + }) + + test.T().Run("DeletionRules with JobDeploymentStatus Failed and DeleteNone policy should preserve all resources", func(_ *testing.T) { + // Create a RayJob with DeleteNone policy, short activeDeadlineSeconds, and short TTL for faster testing. + rayJobAC := rayv1ac.RayJob("delete-none-after-jobdeploymentstatus-failed", namespace.Name). + WithSpec(rayv1ac.RayJobSpec(). + WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))). + WithEntrypoint("python /home/ray/jobs/long_running.py"). + WithActiveDeadlineSeconds(45). // Short deadline for failing the JobDeploymentStatus, but making sure the cluster is running + WithShutdownAfterJobFinishes(false). // Required when using DeletionStrategy + WithDeletionStrategy(rayv1ac.DeletionStrategy(). + WithDeletionRules( + rayv1ac.DeletionRule(). + WithPolicy(rayv1.DeleteNone). + WithCondition(rayv1ac.DeletionCondition(). + WithJobDeploymentStatus(rayv1.JobDeploymentStatusFailed). + WithTTLSeconds(10)), // 10 second TTL for testing + )). + WithSubmitterPodTemplate(JobSubmitterPodTemplateApplyConfiguration())) + + rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) + g.Expect(err).NotTo(HaveOccurred()) + LogWithTimestamp(test.T(), "Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name) + + // Wait for JobDeploymentStatus to become Failed due to activeDeadlineSeconds timeout. + g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). + Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusFailed))) + LogWithTimestamp(test.T(), "RayJob %s/%s failed due to activeDeadlineSeconds timeout", rayJob.Namespace, rayJob.Name) + + // Verify the JobStatus remains RUNNING and the reason is DeadlineExceeded. + rayJob, err = GetRayJob(test, rayJob.Namespace, rayJob.Name) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(rayJob.Status.JobStatus).To(Equal(rayv1.JobStatusRunning)) + g.Expect(rayJob.Status.Reason).To(Equal(rayv1.DeadlineExceeded)) + + // Get the associated RayCluster name (early assertion for clearer diagnostics). + rayClusterName := rayJob.Status.RayClusterName + g.Expect(rayClusterName).NotTo(BeEmpty()) + + // Wait well past the TTL and verify everything is preserved. + LogWithTimestamp(test.T(), "Waiting past TTL to verify resources are preserved...") + g.Consistently(func(gg Gomega) { + jobObj, err := GetRayJob(test, rayJob.Namespace, rayJob.Name) + gg.Expect(err).NotTo(HaveOccurred()) + gg.Expect(jobObj).NotTo(BeNil()) + cluster, err := GetRayCluster(test, namespace.Name, rayClusterName) + gg.Expect(err).NotTo(HaveOccurred()) + gg.Expect(cluster).NotTo(BeNil()) + workerPods, err := GetWorkerPods(test, cluster) + gg.Expect(err).NotTo(HaveOccurred()) + gg.Expect(workerPods).ToNot(BeEmpty()) + }, 10*time.Second, 2*time.Second).Should(Succeed()) + LogWithTimestamp(test.T(), "All resources preserved as expected with DeleteNone policy") + + // Cleanup: delete RayJob to release cluster and pods. 
+ LogWithTimestamp(test.T(), "Cleaning up RayJob %s/%s after DeleteNone scenario", rayJob.Namespace, rayJob.Name) + err = test.Client().Ray().RayV1().RayJobs(rayJob.Namespace).Delete(test.Ctx(), rayJob.Name, metav1.DeleteOptions{}) + g.Expect(err).NotTo(HaveOccurred()) + g.Eventually(func() error { _, err := GetRayJob(test, rayJob.Namespace, rayJob.Name); return err }, TestTimeoutMedium).Should(WithTransform(k8serrors.IsNotFound, BeTrue())) + g.Eventually(func() error { + _, err := GetRayCluster(test, namespace.Name, rayClusterName) + return err + }, TestTimeoutMedium).Should(WithTransform(k8serrors.IsNotFound, BeTrue())) + LogWithTimestamp(test.T(), "Cleanup after DeleteNone scenario complete") + }) }
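Taken together, these changes let application-level and infrastructure-level triggers be mixed within a single RayJob. Below is a hedged end-to-end sketch, not taken from the patch: the name, entrypoint, and rayClusterSpec are placeholders, the field names follow the JSON tags visible in the diff, and the RayJobDeletionPolicy feature gate must be enabled (as noted in the e2e suite):

```yaml
apiVersion: ray.io/v1
kind: RayJob
metadata:
  name: rayjob-mixed-deletion-rules        # hypothetical name
spec:
  entrypoint: python /home/ray/jobs/sample.py   # placeholder entrypoint
  shutdownAfterJobFinishes: false               # required when deletionStrategy is set
  deletionStrategy:
    deletionRules:
      # Application-level: the submitted Ray job itself finished as FAILED.
      - policy: DeleteWorkers
        condition:
          jobStatus: FAILED
          ttlSeconds: 0
      # Infrastructure-level: the RayJob deployment failed (for example, the job
      # was never submitted), so reclaim the whole cluster after 60 seconds.
      - policy: DeleteCluster
        condition:
          jobDeploymentStatus: Failed
          ttlSeconds: 60
  rayClusterSpec:
    # headGroupSpec and workerGroupSpecs elided for brevity
```

Because uniqueness and TTL ordering are checked per condition type, the two rules above are validated independently; within a single condition type, the Workers <= Cluster <= Self TTL ordering enforced by validateTTLConsistency still applies.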