diff --git a/docs/reference/api.md b/docs/reference/api.md
index fb6a624645e..cc953cbca50 100644
--- a/docs/reference/api.md
+++ b/docs/reference/api.md
@@ -108,6 +108,9 @@ _Appears in:_
DeletionCondition specifies the trigger conditions for a deletion action.
+Exactly one of JobStatus or JobDeploymentStatus must be specified:
+ - JobStatus (application-level): matches the Ray job execution status.
+ - JobDeploymentStatus (infrastructure-level): matches the RayJob deployment lifecycle status. This is particularly useful for cleaning up resources when a Ray job fails to be submitted.
@@ -116,7 +119,7 @@ _Appears in:_
| Field | Description | Default | Validation |
| --- | --- | --- | --- |
-| `ttlSeconds` _integer_ | TTLSeconds is the time in seconds from when the JobStatus
reaches the specified terminal state to when this deletion action should be triggered.
The value must be a non-negative integer. | 0 | Minimum: 0
|
+| `ttlSeconds` _integer_ | TTLSeconds is the time in seconds from when the JobStatus or JobDeploymentStatus
reaches the specified terminal state to when this deletion action should be triggered.
The value must be a non-negative integer. | 0 | Minimum: 0
|
#### DeletionPolicy
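For illustration only (not part of this patch): a minimal Go sketch of the two mutually exclusive condition forms, using the v1 API types changed below. The package name is hypothetical, the import paths are assumed from the KubeRay module layout, and `ptr` is `k8s.io/utils/ptr`, as used in the tests further down.

```go
package example // hypothetical package, for illustration only

import (
	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
	"k8s.io/utils/ptr"
)

// strategy sketches a DeletionStrategy that uses both condition kinds.
// Each DeletionCondition sets exactly one of JobStatus or JobDeploymentStatus.
var strategy = rayv1.DeletionStrategy{
	DeletionRules: []rayv1.DeletionRule{
		{
			// Application-level: the Ray job finished with FAILED.
			Policy: rayv1.DeleteCluster,
			Condition: rayv1.DeletionCondition{
				JobStatus:  ptr.To(rayv1.JobStatusFailed),
				TTLSeconds: 30,
			},
		},
		{
			// Infrastructure-level: the RayJob deployment failed,
			// e.g. the job could never be submitted.
			Policy: rayv1.DeleteSelf,
			Condition: rayv1.DeletionCondition{
				JobDeploymentStatus: ptr.To(rayv1.JobDeploymentStatusFailed),
				TTLSeconds:          60,
			},
		},
	},
}
```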
diff --git a/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml b/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml
index 1f4c8432168..6268ab28fc6 100644
--- a/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml
+++ b/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml
@@ -65,6 +65,10 @@ spec:
properties:
condition:
properties:
+ jobDeploymentStatus:
+ enum:
+ - Failed
+ type: string
jobStatus:
enum:
- SUCCEEDED
@@ -75,9 +79,14 @@ spec:
format: int32
minimum: 0
type: integer
- required:
- - jobStatus
type: object
+ x-kubernetes-validations:
+ - message: JobStatus and JobDeploymentStatus cannot be used
+ together within the same deletion condition.
+ rule: '!(has(self.jobStatus) && has(self.jobDeploymentStatus))'
+ - message: the deletion condition requires either the JobStatus
+ or the JobDeploymentStatus field.
+ rule: has(self.jobStatus) || has(self.jobDeploymentStatus)
policy:
enum:
- DeleteCluster
diff --git a/ray-operator/apis/ray/v1/rayjob_types.go b/ray-operator/apis/ray/v1/rayjob_types.go
index 705d50dfd40..298da3de91d 100644
--- a/ray-operator/apis/ray/v1/rayjob_types.go
+++ b/ray-operator/apis/ray/v1/rayjob_types.go
@@ -140,13 +140,26 @@ type DeletionRule struct {
}
// DeletionCondition specifies the trigger conditions for a deletion action.
+// Exactly one of JobStatus or JobDeploymentStatus must be specified:
+// - JobStatus (application-level): matches the Ray job execution status.
+// - JobDeploymentStatus (infrastructure-level): matches the RayJob deployment lifecycle status. This is particularly useful for cleaning up resources when a Ray job fails to be submitted.
+//
+// +kubebuilder:validation:XValidation:rule="!(has(self.jobStatus) && has(self.jobDeploymentStatus))",message="JobStatus and JobDeploymentStatus cannot be used together within the same deletion condition."
+// +kubebuilder:validation:XValidation:rule="has(self.jobStatus) || has(self.jobDeploymentStatus)",message="the deletion condition requires either the JobStatus or the JobDeploymentStatus field."
type DeletionCondition struct {
- // JobStatus is the terminal status of the RayJob that triggers this condition. This field is required.
+ // JobStatus is the terminal status of the RayJob that triggers this condition.
// For the initial implementation, only "SUCCEEDED" and "FAILED" are supported.
// +kubebuilder:validation:Enum=SUCCEEDED;FAILED
- JobStatus JobStatus `json:"jobStatus"`
+ // +optional
+ JobStatus *JobStatus `json:"jobStatus,omitempty"`
+
+ // JobDeploymentStatus is the terminal status of the RayJob deployment that triggers this condition.
+ // For the initial implementation, only "Failed" is supported.
+ // +kubebuilder:validation:Enum=Failed
+ // +optional
+ JobDeploymentStatus *JobDeploymentStatus `json:"jobDeploymentStatus,omitempty"`
- // TTLSeconds is the time in seconds from when the JobStatus
+ // TTLSeconds is the time in seconds from when the JobStatus or JobDeploymentStatus
// reaches the specified terminal state to when this deletion action should be triggered.
// The value must be a non-negative integer.
// +kubebuilder:default=0
diff --git a/ray-operator/apis/ray/v1/zz_generated.deepcopy.go b/ray-operator/apis/ray/v1/zz_generated.deepcopy.go
index cd710592d98..84fc05a86ab 100644
--- a/ray-operator/apis/ray/v1/zz_generated.deepcopy.go
+++ b/ray-operator/apis/ray/v1/zz_generated.deepcopy.go
@@ -151,6 +151,16 @@ func (in *ClusterUpgradeOptions) DeepCopy() *ClusterUpgradeOptions {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *DeletionCondition) DeepCopyInto(out *DeletionCondition) {
*out = *in
+ if in.JobStatus != nil {
+ in, out := &in.JobStatus, &out.JobStatus
+ *out = new(JobStatus)
+ **out = **in
+ }
+ if in.JobDeploymentStatus != nil {
+ in, out := &in.JobDeploymentStatus, &out.JobDeploymentStatus
+ *out = new(JobDeploymentStatus)
+ **out = **in
+ }
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DeletionCondition.
@@ -186,7 +196,7 @@ func (in *DeletionPolicy) DeepCopy() *DeletionPolicy {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *DeletionRule) DeepCopyInto(out *DeletionRule) {
*out = *in
- out.Condition = in.Condition
+ in.Condition.DeepCopyInto(&out.Condition)
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DeletionRule.
@@ -215,7 +225,9 @@ func (in *DeletionStrategy) DeepCopyInto(out *DeletionStrategy) {
if in.DeletionRules != nil {
in, out := &in.DeletionRules, &out.DeletionRules
*out = make([]DeletionRule, len(*in))
- copy(*out, *in)
+ for i := range *in {
+ (*in)[i].DeepCopyInto(&(*out)[i])
+ }
}
}
diff --git a/ray-operator/config/crd/bases/ray.io_rayjobs.yaml b/ray-operator/config/crd/bases/ray.io_rayjobs.yaml
index 1f4c8432168..6268ab28fc6 100644
--- a/ray-operator/config/crd/bases/ray.io_rayjobs.yaml
+++ b/ray-operator/config/crd/bases/ray.io_rayjobs.yaml
@@ -65,6 +65,10 @@ spec:
properties:
condition:
properties:
+ jobDeploymentStatus:
+ enum:
+ - Failed
+ type: string
jobStatus:
enum:
- SUCCEEDED
@@ -75,9 +79,14 @@ spec:
format: int32
minimum: 0
type: integer
- required:
- - jobStatus
type: object
+ x-kubernetes-validations:
+ - message: JobStatus and JobDeploymentStatus cannot be used
+ together within the same deletion condition.
+ rule: '!(has(self.jobStatus) && has(self.jobDeploymentStatus))'
+ - message: the deletion condition requires either the JobStatus
+ or the JobDeploymentStatus field.
+ rule: has(self.jobStatus) || has(self.jobDeploymentStatus)
policy:
enum:
- DeleteCluster
diff --git a/ray-operator/config/samples/ray-job.deletion-rules.yaml b/ray-operator/config/samples/ray-job.deletion-rules.yaml
index 9bc8ed75a61..f67a2a491ea 100644
--- a/ray-operator/config/samples/ray-job.deletion-rules.yaml
+++ b/ray-operator/config/samples/ray-job.deletion-rules.yaml
@@ -12,6 +12,12 @@ spec:
# DeletionStrategy defines the deletion policies for a RayJob.
# It allows for fine-grained control over resource cleanup after a job finishes.
# DeletionRules is a list of deletion rules, processed based on their trigger conditions.
+ # Currently, both JobStatus and JobDeploymentStatus are supported as deletion conditions:
+ # - JobStatus (application-level): matches the Ray job execution status.
+ # - Currently, only "SUCCEEDED" and "FAILED" are supported.
+ # - JobDeploymentStatus (infrastructure-level): matches the RayJob deployment lifecycle status. This is particularly useful for cleaning up resources when a Ray job fails to be submitted.
+ # - Currently, only "Failed" is supported.
+ # For each deletion rule, exactly one of JobStatus and JobDeploymentStatus must be specified.
# While the rules can be used to define a sequence, if multiple rules are overdue (e.g., due to controller downtime),
# the most impactful rule (e.g., DeleteCluster) will be executed first to prioritize resource cleanup and cost savings.
deletionStrategy:
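The sample hunk above is truncated at `deletionStrategy:`. As a rough sketch of the rules the comments describe, here is the same strategy expressed with the Go apply-configuration builders extended by this patch (package aliases `rayv1` and `rayv1ac` as in the e2e tests below; the TTL values are illustrative):

```go
// Sketch only; mirrors the builder usage in the e2e tests added by this patch.
strategy := rayv1ac.DeletionStrategy().
	WithDeletionRules(
		// Application-level rule: delete the workers once the job status is FAILED.
		rayv1ac.DeletionRule().
			WithPolicy(rayv1.DeleteWorkers).
			WithCondition(rayv1ac.DeletionCondition().
				WithJobStatus(rayv1.JobStatusFailed).
				WithTTLSeconds(0)),
		// Infrastructure-level rule: delete the cluster if the RayJob deployment fails.
		rayv1ac.DeletionRule().
			WithPolicy(rayv1.DeleteCluster).
			WithCondition(rayv1ac.DeletionCondition().
				WithJobDeploymentStatus(rayv1.JobDeploymentStatusFailed).
				WithTTLSeconds(300)),
	)
_ = strategy
```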
diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go
index e09810022f0..3d3db5a2b4b 100644
--- a/ray-operator/controllers/ray/rayjob_controller.go
+++ b/ray-operator/controllers/ray/rayjob_controller.go
@@ -1216,8 +1216,8 @@ func (r *RayJobReconciler) handleDeletionRules(ctx context.Context, rayJob *rayv
// Categorize all applicable and incomplete rules into "overdue" or "pending".
for _, rule := range rayJob.Spec.DeletionStrategy.DeletionRules {
- // Skip rules that don't match the current job status.
- if rule.Condition.JobStatus != rayJob.Status.JobStatus {
+ // Skip rules that don't match the current job status or job deployment status.
+ if !isDeletionRuleMatched(rule, rayJob) {
continue
}
@@ -1382,6 +1382,15 @@ func (r *RayJobReconciler) executeDeletionPolicy(ctx context.Context, rayJob *ra
return ctrl.Result{}, nil
}
+// isDeletionRuleMatched checks if the deletion rule matches the current job status or job deployment status.
+func isDeletionRuleMatched(rule rayv1.DeletionRule, rayJob *rayv1.RayJob) bool {
+ // CRD validation and validateDeletionCondition guarantee that exactly one of JobStatus and JobDeploymentStatus is specified.
+ if rule.Condition.JobStatus != nil {
+ return *rule.Condition.JobStatus == rayJob.Status.JobStatus
+ }
+ return *rule.Condition.JobDeploymentStatus == rayJob.Status.JobDeploymentStatus
+}
+
// isDeletionActionCompleted checks if the state corresponding to a deletion policy is already achieved.
// This is crucial for making the reconciliation loop idempotent by checking the actual cluster state.
func (r *RayJobReconciler) isDeletionActionCompleted(ctx context.Context, rayJob *rayv1.RayJob, policy rayv1.DeletionPolicyType) (bool, error) {
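A hypothetical unit-test sketch (not part of this patch) illustrating the matching semantics above. Since `isDeletionRuleMatched` is unexported, such a test would live in the same controller package; it assumes the usual `testing`, `rayv1`, and `ptr` imports, and the test name is made up.

```go
func TestIsDeletionRuleMatchedSketch(t *testing.T) { // hypothetical test name
	rayJob := &rayv1.RayJob{}
	rayJob.Status.JobStatus = rayv1.JobStatusRunning
	rayJob.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusFailed

	// A JobDeploymentStatus rule matches even while the job-level status is still RUNNING,
	// which is exactly the "deployment failed before/while the job ran" case this patch targets.
	deployRule := rayv1.DeletionRule{
		Policy: rayv1.DeleteCluster,
		Condition: rayv1.DeletionCondition{
			JobDeploymentStatus: ptr.To(rayv1.JobDeploymentStatusFailed),
		},
	}
	if !isDeletionRuleMatched(deployRule, rayJob) {
		t.Fatal("expected JobDeploymentStatus rule to match")
	}

	// A JobStatus rule compares only the job execution status.
	jobRule := rayv1.DeletionRule{
		Policy: rayv1.DeleteCluster,
		Condition: rayv1.DeletionCondition{
			JobStatus: ptr.To(rayv1.JobStatusSucceeded),
		},
	}
	if isDeletionRuleMatched(jobRule, rayJob) {
		t.Fatal("expected JobStatus rule not to match a RUNNING job")
	}
}
```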
diff --git a/ray-operator/controllers/ray/rayjob_controller_test.go b/ray-operator/controllers/ray/rayjob_controller_test.go
index e8ac2be20e3..b72f39e6bee 100644
--- a/ray-operator/controllers/ray/rayjob_controller_test.go
+++ b/ray-operator/controllers/ray/rayjob_controller_test.go
@@ -2071,7 +2071,7 @@ var _ = Context("RayJob with different submission modes", func() {
{
Policy: rayv1.DeleteWorkers,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusSucceeded,
+ JobStatus: ptr.To(rayv1.JobStatusSucceeded),
},
},
},
@@ -2084,7 +2084,7 @@ var _ = Context("RayJob with different submission modes", func() {
{
Policy: rayv1.DeleteWorkers,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusSucceeded,
+ JobStatus: ptr.To(rayv1.JobStatusSucceeded),
},
},
},
@@ -2228,7 +2228,7 @@ var _ = Context("RayJob with different submission modes", func() {
{
Policy: rayv1.DeleteWorkers,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusFailed,
+ JobStatus: ptr.To(rayv1.JobStatusFailed),
},
},
},
@@ -2241,7 +2241,7 @@ var _ = Context("RayJob with different submission modes", func() {
{
Policy: rayv1.DeleteWorkers,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusFailed,
+ JobStatus: ptr.To(rayv1.JobStatusFailed),
},
},
},
@@ -2385,7 +2385,7 @@ var _ = Context("RayJob with different submission modes", func() {
{
Policy: rayv1.DeleteCluster,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusSucceeded,
+ JobStatus: ptr.To(rayv1.JobStatusSucceeded),
},
},
},
@@ -2398,7 +2398,7 @@ var _ = Context("RayJob with different submission modes", func() {
{
Policy: rayv1.DeleteCluster,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusSucceeded,
+ JobStatus: ptr.To(rayv1.JobStatusSucceeded),
},
},
},
@@ -2525,7 +2525,7 @@ var _ = Context("RayJob with different submission modes", func() {
{
Policy: rayv1.DeleteCluster,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusFailed,
+ JobStatus: ptr.To(rayv1.JobStatusFailed),
},
},
},
@@ -2538,7 +2538,7 @@ var _ = Context("RayJob with different submission modes", func() {
{
Policy: rayv1.DeleteCluster,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusFailed,
+ JobStatus: ptr.To(rayv1.JobStatusFailed),
},
},
},
@@ -2665,7 +2665,7 @@ var _ = Context("RayJob with different submission modes", func() {
{
Policy: rayv1.DeleteSelf,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusSucceeded,
+ JobStatus: ptr.To(rayv1.JobStatusSucceeded),
},
},
},
@@ -2780,7 +2780,7 @@ var _ = Context("RayJob with different submission modes", func() {
{
Policy: rayv1.DeleteSelf,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusFailed,
+ JobStatus: ptr.To(rayv1.JobStatusFailed),
},
},
},
@@ -2895,7 +2895,7 @@ var _ = Context("RayJob with different submission modes", func() {
{
Policy: rayv1.DeleteNone,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusSucceeded,
+ JobStatus: ptr.To(rayv1.JobStatusSucceeded),
},
},
},
@@ -2908,7 +2908,7 @@ var _ = Context("RayJob with different submission modes", func() {
{
Policy: rayv1.DeleteNone,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusSucceeded,
+ JobStatus: ptr.To(rayv1.JobStatusSucceeded),
},
},
},
@@ -3057,7 +3057,7 @@ var _ = Context("RayJob with different submission modes", func() {
{
Policy: rayv1.DeleteNone,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusFailed,
+ JobStatus: ptr.To(rayv1.JobStatusFailed),
},
},
},
@@ -3070,7 +3070,7 @@ var _ = Context("RayJob with different submission modes", func() {
{
Policy: rayv1.DeleteNone,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusFailed,
+ JobStatus: ptr.To(rayv1.JobStatusFailed),
},
},
},
@@ -3220,21 +3220,21 @@ var _ = Context("RayJob with different submission modes", func() {
{
Policy: rayv1.DeleteWorkers,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusSucceeded,
+ JobStatus: ptr.To(rayv1.JobStatusSucceeded),
TTLSeconds: 0,
},
},
{
Policy: rayv1.DeleteCluster,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusSucceeded,
+ JobStatus: ptr.To(rayv1.JobStatusSucceeded),
TTLSeconds: 0,
},
},
{
Policy: rayv1.DeleteSelf,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusSucceeded,
+ JobStatus: ptr.To(rayv1.JobStatusSucceeded),
TTLSeconds: 0,
},
},
@@ -3248,21 +3248,21 @@ var _ = Context("RayJob with different submission modes", func() {
{
Policy: rayv1.DeleteWorkers,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusSucceeded,
+ JobStatus: ptr.To(rayv1.JobStatusSucceeded),
TTLSeconds: 0,
},
},
{
Policy: rayv1.DeleteCluster,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusSucceeded,
+ JobStatus: ptr.To(rayv1.JobStatusSucceeded),
TTLSeconds: 0,
},
},
{
Policy: rayv1.DeleteSelf,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusSucceeded,
+ JobStatus: ptr.To(rayv1.JobStatusSucceeded),
TTLSeconds: 0,
},
},
@@ -3379,21 +3379,21 @@ var _ = Context("RayJob with different submission modes", func() {
{
Policy: rayv1.DeleteWorkers,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusFailed,
+ JobStatus: ptr.To(rayv1.JobStatusFailed),
TTLSeconds: 0,
},
},
{
Policy: rayv1.DeleteCluster,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusFailed,
+ JobStatus: ptr.To(rayv1.JobStatusFailed),
TTLSeconds: 0,
},
},
{
Policy: rayv1.DeleteSelf,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusFailed,
+ JobStatus: ptr.To(rayv1.JobStatusFailed),
TTLSeconds: 0,
},
},
@@ -3407,21 +3407,21 @@ var _ = Context("RayJob with different submission modes", func() {
{
Policy: rayv1.DeleteWorkers,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusFailed,
+ JobStatus: ptr.To(rayv1.JobStatusFailed),
TTLSeconds: 0,
},
},
{
Policy: rayv1.DeleteCluster,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusFailed,
+ JobStatus: ptr.To(rayv1.JobStatusFailed),
TTLSeconds: 0,
},
},
{
Policy: rayv1.DeleteSelf,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusFailed,
+ JobStatus: ptr.To(rayv1.JobStatusFailed),
TTLSeconds: 0,
},
},
@@ -3538,21 +3538,21 @@ var _ = Context("RayJob with different submission modes", func() {
{
Policy: rayv1.DeleteWorkers,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusSucceeded,
+ JobStatus: ptr.To(rayv1.JobStatusSucceeded),
TTLSeconds: 0, // Stage 1: Delete workers after 0 seconds
},
},
{
Policy: rayv1.DeleteCluster,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusSucceeded,
+ JobStatus: ptr.To(rayv1.JobStatusSucceeded),
TTLSeconds: 5, // Stage 2: Delete cluster after 5 seconds
},
},
{
Policy: rayv1.DeleteSelf,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusSucceeded,
+ JobStatus: ptr.To(rayv1.JobStatusSucceeded),
TTLSeconds: 10, // Stage 3: Delete self after 10 seconds
},
},
@@ -3566,21 +3566,21 @@ var _ = Context("RayJob with different submission modes", func() {
{
Policy: rayv1.DeleteWorkers,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusSucceeded,
+ JobStatus: ptr.To(rayv1.JobStatusSucceeded),
TTLSeconds: 0,
},
},
{
Policy: rayv1.DeleteCluster,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusSucceeded,
+ JobStatus: ptr.To(rayv1.JobStatusSucceeded),
TTLSeconds: 5,
},
},
{
Policy: rayv1.DeleteSelf,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusSucceeded,
+ JobStatus: ptr.To(rayv1.JobStatusSucceeded),
TTLSeconds: 10,
},
},
@@ -3746,21 +3746,21 @@ var _ = Context("RayJob with different submission modes", func() {
{
Policy: rayv1.DeleteWorkers,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusFailed,
+ JobStatus: ptr.To(rayv1.JobStatusFailed),
TTLSeconds: 0, // Stage 1: Delete workers after 0 seconds
},
},
{
Policy: rayv1.DeleteCluster,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusFailed,
+ JobStatus: ptr.To(rayv1.JobStatusFailed),
TTLSeconds: 5, // Stage 2: Delete cluster after 5 seconds
},
},
{
Policy: rayv1.DeleteSelf,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusFailed,
+ JobStatus: ptr.To(rayv1.JobStatusFailed),
TTLSeconds: 10, // Stage 3: Delete self after 10 seconds
},
},
@@ -3774,21 +3774,21 @@ var _ = Context("RayJob with different submission modes", func() {
{
Policy: rayv1.DeleteWorkers,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusFailed,
+ JobStatus: ptr.To(rayv1.JobStatusFailed),
TTLSeconds: 0,
},
},
{
Policy: rayv1.DeleteCluster,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusFailed,
+ JobStatus: ptr.To(rayv1.JobStatusFailed),
TTLSeconds: 5,
},
},
{
Policy: rayv1.DeleteSelf,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusFailed,
+ JobStatus: ptr.To(rayv1.JobStatusFailed),
TTLSeconds: 10,
},
},
diff --git a/ray-operator/controllers/ray/utils/validation.go b/ray-operator/controllers/ray/utils/validation.go
index 1bcda9af903..c936564a61a 100644
--- a/ray-operator/controllers/ray/utils/validation.go
+++ b/ray-operator/controllers/ray/utils/validation.go
@@ -470,15 +470,16 @@ func validateDeletionRules(rayJob *rayv1.RayJob) error {
rules := rayJob.Spec.DeletionStrategy.DeletionRules
isClusterSelectorMode := len(rayJob.Spec.ClusterSelector) != 0
- // Group TTLs by JobStatus for cross-rule validation and uniqueness checking.
- rulesByStatus := make(map[rayv1.JobStatus]map[rayv1.DeletionPolicyType]int32)
+ // Group TTLs by condition type for cross-rule validation and uniqueness checking.
+ // JobStatus and JobDeploymentStatus rules are grouped separately because uniqueness and TTL consistency are validated per condition type.
+ rulesByJobStatus := make(map[rayv1.JobStatus]map[rayv1.DeletionPolicyType]int32)
+ rulesByJobDeploymentStatus := make(map[rayv1.JobDeploymentStatus]map[rayv1.DeletionPolicyType]int32)
var errs []error
// Single pass: Validate each rule individually and group for later consistency checks.
for i, rule := range rules {
- // Validate TTL is non-negative.
- if rule.Condition.TTLSeconds < 0 {
- errs = append(errs, fmt.Errorf("deletionRules[%d]: TTLSeconds must be non-negative", i))
+ if err := validateDeletionCondition(&rule.Condition); err != nil {
+ errs = append(errs, fmt.Errorf("deletionRules[%d]: %w", i, err))
continue
}
@@ -494,24 +495,43 @@ func validateDeletionRules(rayJob *rayv1.RayJob) error {
}
// Group valid rule for consistency check.
- policyTTLs, ok := rulesByStatus[rule.Condition.JobStatus]
- if !ok {
- policyTTLs = make(map[rayv1.DeletionPolicyType]int32)
- rulesByStatus[rule.Condition.JobStatus] = policyTTLs
- }
+ if rule.Condition.JobStatus != nil {
+ if _, exists := rulesByJobStatus[*rule.Condition.JobStatus]; !exists {
+ rulesByJobStatus[*rule.Condition.JobStatus] = make(map[rayv1.DeletionPolicyType]int32)
+ }
- // Check for uniqueness of (JobStatus, DeletionPolicyType) pair.
- if _, exists := policyTTLs[rule.Policy]; exists {
- errs = append(errs, fmt.Errorf("deletionRules[%d]: duplicate rule for DeletionPolicyType '%s' and JobStatus '%s'", i, rule.Policy, rule.Condition.JobStatus))
- continue
- }
+ // Check for uniqueness of the current deletion rule, which can be identified by the (JobStatus, DeletionPolicyType) pair.
+ if _, exists := rulesByJobStatus[*rule.Condition.JobStatus][rule.Policy]; exists {
+ errs = append(errs, fmt.Errorf("deletionRules[%d]: duplicate rule for DeletionPolicyType '%s' and JobStatus '%s'", i, rule.Policy, *rule.Condition.JobStatus))
+ continue
+ }
- policyTTLs[rule.Policy] = rule.Condition.TTLSeconds
+ rulesByJobStatus[*rule.Condition.JobStatus][rule.Policy] = rule.Condition.TTLSeconds
+ } else {
+ if _, exists := rulesByJobDeploymentStatus[*rule.Condition.JobDeploymentStatus]; !exists {
+ rulesByJobDeploymentStatus[*rule.Condition.JobDeploymentStatus] = make(map[rayv1.DeletionPolicyType]int32)
+ }
+
+ // Check for uniqueness of the current deletion rule, which can be identified by the (JobDeploymentStatus, DeletionPolicyType) pair.
+ if _, exists := rulesByJobDeploymentStatus[*rule.Condition.JobDeploymentStatus][rule.Policy]; exists {
+ errs = append(errs, fmt.Errorf("deletionRules[%d]: duplicate rule for DeletionPolicyType '%s' and JobDeploymentStatus '%s'", i, rule.Policy, *rule.Condition.JobDeploymentStatus))
+ continue
+ }
+
+ rulesByJobDeploymentStatus[*rule.Condition.JobDeploymentStatus][rule.Policy] = rule.Condition.TTLSeconds
+ }
}
// Second pass: Validate TTL consistency per JobStatus.
- for status, policyTTLs := range rulesByStatus {
- if err := validateTTLConsistency(policyTTLs, status); err != nil {
+ for jobStatus, policyTTLs := range rulesByJobStatus {
+ if err := validateTTLConsistency(policyTTLs, "JobStatus", string(jobStatus)); err != nil {
+ errs = append(errs, err)
+ }
+ }
+
+ // Second pass: Validate TTL consistency per JobDeploymentStatus.
+ for jobDeploymentStatus, policyTTLs := range rulesByJobDeploymentStatus {
+ if err := validateTTLConsistency(policyTTLs, "JobDeploymentStatus", string(jobDeploymentStatus)); err != nil {
errs = append(errs, err)
}
}
@@ -519,9 +539,29 @@ func validateDeletionRules(rayJob *rayv1.RayJob) error {
return errstd.Join(errs...)
}
+// validateDeletionCondition ensures exactly one of JobStatus and JobDeploymentStatus is specified and TTLSeconds is non-negative.
+func validateDeletionCondition(deletionCondition *rayv1.DeletionCondition) error {
+ // Validate that exactly one of JobStatus and JobDeploymentStatus is specified.
+ hasJobStatus := deletionCondition.JobStatus != nil
+ hasJobDeploymentStatus := deletionCondition.JobDeploymentStatus != nil
+ if hasJobStatus && hasJobDeploymentStatus {
+ return fmt.Errorf("cannot set both JobStatus and JobDeploymentStatus at the same time")
+ }
+ if !hasJobStatus && !hasJobDeploymentStatus {
+ return fmt.Errorf("exactly one of JobStatus and JobDeploymentStatus must be set")
+ }
+
+ // Validate TTL is non-negative.
+ if deletionCondition.TTLSeconds < 0 {
+ return fmt.Errorf("TTLSeconds must be non-negative")
+ }
+
+ return nil
+}
+
// validateTTLConsistency ensures TTLs follow the deletion hierarchy: Workers <= Cluster <= Self.
// (Lower TTL means deletes earlier.)
-func validateTTLConsistency(policyTTLs map[rayv1.DeletionPolicyType]int32, status rayv1.JobStatus) error {
+func validateTTLConsistency(policyTTLs map[rayv1.DeletionPolicyType]int32, conditionType string, conditionValue string) error {
// Define the required deletion order. TTLs must be non-decreasing along this sequence.
deletionOrder := []rayv1.DeletionPolicyType{
rayv1.DeleteWorkers,
@@ -543,8 +583,8 @@ func validateTTLConsistency(policyTTLs map[rayv1.DeletionPolicyType]int32, statu
if hasPrev && ttl < prevTTL {
errs = append(errs, fmt.Errorf(
- "for JobStatus '%s': %s TTL (%d) must be >= %s TTL (%d)",
- status, policy, ttl, prevPolicy, prevTTL,
+ "for %s '%s': %s TTL (%d) must be >= %s TTL (%d)",
+ conditionType, conditionValue, policy, ttl, prevPolicy, prevTTL,
))
}
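Similarly, a hypothetical in-package test sketch (package `utils`; not part of this patch) showing how the two validators behave. Both functions are unexported, so the sketch assumes it sits next to them; the test name is made up and the standard `testing`, `rayv1`, and `ptr` imports are assumed.

```go
func TestDeletionValidatorsSketch(t *testing.T) { // hypothetical test name
	// Exactly one of JobStatus / JobDeploymentStatus may be set on a condition.
	bad := &rayv1.DeletionCondition{
		JobStatus:           ptr.To(rayv1.JobStatusFailed),
		JobDeploymentStatus: ptr.To(rayv1.JobDeploymentStatusFailed),
	}
	if err := validateDeletionCondition(bad); err == nil {
		t.Fatal("expected an error when both condition fields are set")
	}

	// TTLs must be non-decreasing along DeleteWorkers -> DeleteCluster -> DeleteSelf.
	ttls := map[rayv1.DeletionPolicyType]int32{
		rayv1.DeleteWorkers: 20,
		rayv1.DeleteCluster: 10, // cluster would be deleted before its workers: invalid
	}
	if err := validateTTLConsistency(ttls, "JobDeploymentStatus", string(rayv1.JobDeploymentStatusFailed)); err == nil {
		t.Fatal("expected a TTL ordering error")
	}
}
```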
diff --git a/ray-operator/controllers/ray/utils/validation_test.go b/ray-operator/controllers/ray/utils/validation_test.go
index 014d0917d68..c97ba6eb32b 100644
--- a/ray-operator/controllers/ray/utils/validation_test.go
+++ b/ray-operator/controllers/ray/utils/validation_test.go
@@ -1335,7 +1335,7 @@ func TestValidateRayJobSpecWithFeatureGate(t *testing.T) {
{
Policy: rayv1.DeleteSelf,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusSucceeded,
+ JobStatus: ptr.To(rayv1.JobStatusSucceeded),
TTLSeconds: 10,
},
},
@@ -1354,7 +1354,7 @@ func TestValidateRayJobSpecWithFeatureGate(t *testing.T) {
{
Policy: rayv1.DeleteSelf,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusSucceeded,
+ JobStatus: ptr.To(rayv1.JobStatusSucceeded),
TTLSeconds: 10,
},
},
@@ -1375,7 +1375,7 @@ func TestValidateRayJobSpecWithFeatureGate(t *testing.T) {
{
Policy: rayv1.DeleteSelf,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusSucceeded,
+ JobStatus: ptr.To(rayv1.JobStatusSucceeded),
TTLSeconds: 10,
},
},
@@ -1411,14 +1411,14 @@ func TestValidateRayJobSpecWithFeatureGate(t *testing.T) {
{
Policy: rayv1.DeleteSelf,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusSucceeded,
+ JobStatus: ptr.To(rayv1.JobStatusSucceeded),
TTLSeconds: 10,
},
},
{
Policy: rayv1.DeleteSelf,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusSucceeded,
+ JobStatus: ptr.To(rayv1.JobStatusSucceeded),
TTLSeconds: 20,
},
},
@@ -1436,7 +1436,7 @@ func TestValidateRayJobSpecWithFeatureGate(t *testing.T) {
{
Policy: rayv1.DeleteSelf,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusSucceeded,
+ JobStatus: ptr.To(rayv1.JobStatusSucceeded),
TTLSeconds: -10,
},
},
@@ -1455,7 +1455,7 @@ func TestValidateRayJobSpecWithFeatureGate(t *testing.T) {
{
Policy: rayv1.DeleteWorkers,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusSucceeded,
+ JobStatus: ptr.To(rayv1.JobStatusSucceeded),
TTLSeconds: 10,
},
},
@@ -1473,7 +1473,7 @@ func TestValidateRayJobSpecWithFeatureGate(t *testing.T) {
{
Policy: rayv1.DeleteCluster,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusSucceeded,
+ JobStatus: ptr.To(rayv1.JobStatusSucceeded),
TTLSeconds: 10,
},
},
@@ -1490,7 +1490,7 @@ func TestValidateRayJobSpecWithFeatureGate(t *testing.T) {
{
Policy: rayv1.DeleteWorkers,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusSucceeded,
+ JobStatus: ptr.To(rayv1.JobStatusSucceeded),
TTLSeconds: 10,
},
},
@@ -1511,14 +1511,14 @@ func TestValidateRayJobSpecWithFeatureGate(t *testing.T) {
{
Policy: rayv1.DeleteWorkers,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusSucceeded,
+ JobStatus: ptr.To(rayv1.JobStatusSucceeded),
TTLSeconds: 20,
},
},
{
Policy: rayv1.DeleteCluster,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusSucceeded,
+ JobStatus: ptr.To(rayv1.JobStatusSucceeded),
TTLSeconds: 10,
},
},
@@ -1536,14 +1536,14 @@ func TestValidateRayJobSpecWithFeatureGate(t *testing.T) {
{
Policy: rayv1.DeleteCluster,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusSucceeded,
+ JobStatus: ptr.To(rayv1.JobStatusSucceeded),
TTLSeconds: 20,
},
},
{
Policy: rayv1.DeleteSelf,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusSucceeded,
+ JobStatus: ptr.To(rayv1.JobStatusSucceeded),
TTLSeconds: 10,
},
},
@@ -1561,28 +1561,28 @@ func TestValidateRayJobSpecWithFeatureGate(t *testing.T) {
{
Policy: rayv1.DeleteWorkers,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusSucceeded,
+ JobStatus: ptr.To(rayv1.JobStatusSucceeded),
TTLSeconds: 10,
},
},
{
Policy: rayv1.DeleteCluster,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusSucceeded,
+ JobStatus: ptr.To(rayv1.JobStatusSucceeded),
TTLSeconds: 20,
},
},
{
Policy: rayv1.DeleteSelf,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusSucceeded,
+ JobStatus: ptr.To(rayv1.JobStatusSucceeded),
TTLSeconds: 30,
},
},
{
Policy: rayv1.DeleteSelf,
Condition: rayv1.DeletionCondition{
- JobStatus: rayv1.JobStatusFailed,
+ JobStatus: ptr.To(rayv1.JobStatusFailed),
TTLSeconds: 0,
},
},
@@ -1592,6 +1592,142 @@ func TestValidateRayJobSpecWithFeatureGate(t *testing.T) {
},
expectError: false,
},
+ {
+ name: "valid deletionRules with JobDeploymentStatus=Failed",
+ spec: rayv1.RayJobSpec{
+ DeletionStrategy: &rayv1.DeletionStrategy{
+ DeletionRules: []rayv1.DeletionRule{
+ {
+ Policy: rayv1.DeleteCluster,
+ Condition: rayv1.DeletionCondition{
+ JobDeploymentStatus: ptr.To(rayv1.JobDeploymentStatusFailed),
+ TTLSeconds: 10,
+ },
+ },
+ {
+ Policy: rayv1.DeleteSelf,
+ Condition: rayv1.DeletionCondition{
+ JobDeploymentStatus: ptr.To(rayv1.JobDeploymentStatusFailed),
+ TTLSeconds: 20,
+ },
+ },
+ },
+ },
+ RayClusterSpec: createBasicRayClusterSpec(),
+ },
+ expectError: false,
+ },
+ {
+ name: "invalid: both JobStatus and JobDeploymentStatus set",
+ spec: rayv1.RayJobSpec{
+ DeletionStrategy: &rayv1.DeletionStrategy{
+ DeletionRules: []rayv1.DeletionRule{
+ {
+ Policy: rayv1.DeleteCluster,
+ Condition: rayv1.DeletionCondition{
+ JobStatus: ptr.To(rayv1.JobStatusFailed),
+ JobDeploymentStatus: ptr.To(rayv1.JobDeploymentStatusFailed),
+ TTLSeconds: 10,
+ },
+ },
+ },
+ },
+ RayClusterSpec: createBasicRayClusterSpec(),
+ },
+ expectError: true,
+ },
+ {
+ name: "invalid: neither JobStatus nor JobDeploymentStatus set",
+ spec: rayv1.RayJobSpec{
+ DeletionStrategy: &rayv1.DeletionStrategy{
+ DeletionRules: []rayv1.DeletionRule{
+ {
+ Policy: rayv1.DeleteCluster,
+ Condition: rayv1.DeletionCondition{
+ TTLSeconds: 10,
+ },
+ },
+ },
+ },
+ RayClusterSpec: createBasicRayClusterSpec(),
+ },
+ expectError: true,
+ },
+ {
+ name: "duplicate rule with JobDeploymentStatus",
+ spec: rayv1.RayJobSpec{
+ DeletionStrategy: &rayv1.DeletionStrategy{
+ DeletionRules: []rayv1.DeletionRule{
+ {
+ Policy: rayv1.DeleteCluster,
+ Condition: rayv1.DeletionCondition{
+ JobDeploymentStatus: ptr.To(rayv1.JobDeploymentStatusFailed),
+ TTLSeconds: 10,
+ },
+ },
+ {
+ Policy: rayv1.DeleteCluster,
+ Condition: rayv1.DeletionCondition{
+ JobDeploymentStatus: ptr.To(rayv1.JobDeploymentStatusFailed),
+ TTLSeconds: 20,
+ },
+ },
+ },
+ },
+ RayClusterSpec: createBasicRayClusterSpec(),
+ },
+ expectError: true,
+ },
+ {
+ name: "valid: mixed JobStatus and JobDeploymentStatus rules",
+ spec: rayv1.RayJobSpec{
+ DeletionStrategy: &rayv1.DeletionStrategy{
+ DeletionRules: []rayv1.DeletionRule{
+ {
+ Policy: rayv1.DeleteWorkers,
+ Condition: rayv1.DeletionCondition{
+ JobStatus: ptr.To(rayv1.JobStatusFailed),
+ TTLSeconds: 10,
+ },
+ },
+ {
+ Policy: rayv1.DeleteCluster,
+ Condition: rayv1.DeletionCondition{
+ JobDeploymentStatus: ptr.To(rayv1.JobDeploymentStatusFailed),
+ TTLSeconds: 20,
+ },
+ },
+ },
+ },
+ RayClusterSpec: createBasicRayClusterSpec(),
+ },
+ expectError: false,
+ },
+ {
+ name: "inconsistent TTLs with JobDeploymentStatus (DeleteCluster < DeleteWorkers)",
+ spec: rayv1.RayJobSpec{
+ DeletionStrategy: &rayv1.DeletionStrategy{
+ DeletionRules: []rayv1.DeletionRule{
+ {
+ Policy: rayv1.DeleteWorkers,
+ Condition: rayv1.DeletionCondition{
+ JobDeploymentStatus: ptr.To(rayv1.JobDeploymentStatusFailed),
+ TTLSeconds: 20,
+ },
+ },
+ {
+ Policy: rayv1.DeleteCluster,
+ Condition: rayv1.DeletionCondition{
+ JobDeploymentStatus: ptr.To(rayv1.JobDeploymentStatusFailed),
+ TTLSeconds: 10,
+ },
+ },
+ },
+ },
+ RayClusterSpec: createBasicRayClusterSpec(),
+ },
+ expectError: true,
+ },
}
features.SetFeatureGateDuringTest(t, features.RayJobDeletionPolicy, true)
diff --git a/ray-operator/pkg/client/applyconfiguration/ray/v1/deletioncondition.go b/ray-operator/pkg/client/applyconfiguration/ray/v1/deletioncondition.go
index 36b8c006209..ee816ecf5bf 100644
--- a/ray-operator/pkg/client/applyconfiguration/ray/v1/deletioncondition.go
+++ b/ray-operator/pkg/client/applyconfiguration/ray/v1/deletioncondition.go
@@ -9,8 +9,9 @@ import (
// DeletionConditionApplyConfiguration represents a declarative configuration of the DeletionCondition type for use
// with apply.
type DeletionConditionApplyConfiguration struct {
- JobStatus *rayv1.JobStatus `json:"jobStatus,omitempty"`
- TTLSeconds *int32 `json:"ttlSeconds,omitempty"`
+ JobStatus *rayv1.JobStatus `json:"jobStatus,omitempty"`
+ JobDeploymentStatus *rayv1.JobDeploymentStatus `json:"jobDeploymentStatus,omitempty"`
+ TTLSeconds *int32 `json:"ttlSeconds,omitempty"`
}
// DeletionConditionApplyConfiguration constructs a declarative configuration of the DeletionCondition type for use with
@@ -27,6 +28,14 @@ func (b *DeletionConditionApplyConfiguration) WithJobStatus(value rayv1.JobStatu
return b
}
+// WithJobDeploymentStatus sets the JobDeploymentStatus field in the declarative configuration to the given value
+// and returns the receiver, so that objects can be built by chaining "With" function invocations.
+// If called multiple times, the JobDeploymentStatus field is set to the value of the last call.
+func (b *DeletionConditionApplyConfiguration) WithJobDeploymentStatus(value rayv1.JobDeploymentStatus) *DeletionConditionApplyConfiguration {
+ b.JobDeploymentStatus = &value
+ return b
+}
+
// WithTTLSeconds sets the TTLSeconds field in the declarative configuration to the given value
// and returns the receiver, so that objects can be built by chaining "With" function invocations.
// If called multiple times, the TTLSeconds field is set to the value of the last call.
diff --git a/ray-operator/test/e2erayjob/rayjob_deletion_strategy_test.go b/ray-operator/test/e2erayjob/rayjob_deletion_strategy_test.go
index 49718d3544b..533e77665fc 100644
--- a/ray-operator/test/e2erayjob/rayjob_deletion_strategy_test.go
+++ b/ray-operator/test/e2erayjob/rayjob_deletion_strategy_test.go
@@ -22,7 +22,7 @@ func TestDeletionStrategy(t *testing.T) {
// Job scripts - using existing counter.py for successful jobs and fail.py for failed jobs
// Note: This test suite requires the RayJobDeletionPolicy feature gate to be enabled
- jobsAC := NewConfigMap(namespace.Name, Files(test, "counter.py", "fail.py"))
+ jobsAC := NewConfigMap(namespace.Name, Files(test, "counter.py", "fail.py", "long_running.py"))
jobs, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), jobsAC, TestApplyOptions)
g.Expect(err).NotTo(HaveOccurred())
LogWithTimestamp(test.T(), "Created ConfigMap %s/%s successfully", jobs.Namespace, jobs.Name)
@@ -501,4 +501,295 @@ env_vars:
g.Eventually(func() error { _, err := GetRayJob(test, job.Namespace, job.Name); return err }, TestTimeoutMedium).Should(WithTransform(k8serrors.IsNotFound, BeTrue()))
LogWithTimestamp(test.T(), "Cleanup after legacy success scenario complete")
})
+
+ test.T().Run("DeletionRules with JobDeploymentStatus Failed and DeleteWorkers policy should delete only worker pods", func(_ *testing.T) {
+ // Create a RayJob with DeleteWorkers policy, short activeDeadlineSeconds, and short TTL for faster testing.
+ rayJobAC := rayv1ac.RayJob("delete-workers-after-jobdeploymentstatus-failed", namespace.Name).
+ WithSpec(rayv1ac.RayJobSpec().
+ WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))).
+ WithEntrypoint("python /home/ray/jobs/long_running.py").
+ WithActiveDeadlineSeconds(45). // Short deadline so the JobDeploymentStatus becomes Failed while the cluster is still running
+ WithShutdownAfterJobFinishes(false). // Required when using DeletionStrategy
+ WithDeletionStrategy(rayv1ac.DeletionStrategy().
+ WithDeletionRules(
+ rayv1ac.DeletionRule().
+ WithPolicy(rayv1.DeleteWorkers).
+ WithCondition(rayv1ac.DeletionCondition().
+ WithJobDeploymentStatus(rayv1.JobDeploymentStatusFailed).
+ WithTTLSeconds(10)), // 10 second TTL for testing
+ )).
+ WithSubmitterPodTemplate(JobSubmitterPodTemplateApplyConfiguration()))
+
+ rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions)
+ g.Expect(err).NotTo(HaveOccurred())
+ LogWithTimestamp(test.T(), "Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name)
+
+ // Wait for JobDeploymentStatus to become Failed due to activeDeadlineSeconds timeout.
+ g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium).
+ Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusFailed)))
+ LogWithTimestamp(test.T(), "RayJob %s/%s failed due to activeDeadlineSeconds timeout", rayJob.Namespace, rayJob.Name)
+
+ // Verify the JobStatus remains RUNNING and the reason is DeadlineExceeded.
+ rayJob, err = GetRayJob(test, rayJob.Namespace, rayJob.Name)
+ g.Expect(err).NotTo(HaveOccurred())
+ g.Expect(rayJob.Status.JobStatus).To(Equal(rayv1.JobStatusRunning))
+ g.Expect(rayJob.Status.Reason).To(Equal(rayv1.DeadlineExceeded))
+
+ // Get the associated RayCluster name; assert it is non-empty so that failures
+ // surface here with a clear message rather than later when an empty name is used.
+ rayClusterName := rayJob.Status.RayClusterName
+ g.Expect(rayClusterName).NotTo(BeEmpty())
+
+ // Verify cluster and workers exist initially.
+ g.Eventually(RayCluster(test, namespace.Name, rayClusterName), TestTimeoutShort).
+ Should(WithTransform(RayClusterState, Equal(rayv1.Ready)))
+
+ // Count initial worker pods.
+ cluster, err := GetRayCluster(test, namespace.Name, rayClusterName)
+ g.Expect(err).NotTo(HaveOccurred())
+ initialWorkerPods, err := GetWorkerPods(test, cluster)
+ g.Expect(err).NotTo(HaveOccurred())
+ g.Expect(initialWorkerPods).ToNot(BeEmpty())
+ LogWithTimestamp(test.T(), "Found %d worker pods initially", len(initialWorkerPods))
+
+ // Verify resources persist during TTL wait period (first 8 seconds of 10s TTL).
+ LogWithTimestamp(test.T(), "Verifying resources persist during TTL wait period...")
+ g.Consistently(func(gg Gomega) {
+ cluster, err := GetRayCluster(test, namespace.Name, rayClusterName)
+ gg.Expect(err).NotTo(HaveOccurred())
+ gg.Expect(cluster).NotTo(BeNil())
+ workerPods, err := GetWorkerPods(test, cluster)
+ gg.Expect(err).NotTo(HaveOccurred())
+ gg.Expect(workerPods).ToNot(BeEmpty())
+ headPod, err := GetHeadPod(test, cluster)
+ gg.Expect(err).NotTo(HaveOccurred())
+ gg.Expect(headPod).NotTo(BeNil())
+ jobObj, err := GetRayJob(test, rayJob.Namespace, rayJob.Name)
+ gg.Expect(err).NotTo(HaveOccurred())
+ gg.Expect(jobObj).NotTo(BeNil())
+ }, 8*time.Second, 2*time.Second).Should(Succeed()) // Check every 2s for 8s
+ LogWithTimestamp(test.T(), "Resources confirmed stable during TTL wait period")
+
+ // Wait for TTL to expire and workers to be deleted.
+ LogWithTimestamp(test.T(), "Waiting for TTL to expire and workers to be deleted...")
+ g.Eventually(func(gg Gomega) {
+ cluster, err := GetRayCluster(test, namespace.Name, rayClusterName)
+ gg.Expect(err).NotTo(HaveOccurred())
+ gg.Expect(cluster).NotTo(BeNil())
+ workerPods, err := GetWorkerPods(test, cluster)
+ gg.Expect(err).NotTo(HaveOccurred())
+ gg.Expect(workerPods).To(BeEmpty())
+ }, TestTimeoutMedium).Should(Succeed())
+ LogWithTimestamp(test.T(), "Worker pods deleted successfully")
+
+ // Verify cluster still exists (head pod should remain).
+ g.Consistently(RayCluster(test, namespace.Name, rayClusterName), 10*time.Second).
+ Should(WithTransform(RayClusterState, Equal(rayv1.Ready)))
+
+ // Verify head pod still exists.
+ cluster, err = GetRayCluster(test, namespace.Name, rayClusterName)
+ g.Expect(err).NotTo(HaveOccurred())
+ headPod, err := GetHeadPod(test, cluster)
+ g.Expect(err).NotTo(HaveOccurred())
+ g.Expect(headPod).NotTo(BeNil())
+ LogWithTimestamp(test.T(), "Head pod preserved as expected")
+
+ // Verify RayJob still exists.
+ jobObj, err := GetRayJob(test, rayJob.Namespace, rayJob.Name)
+ g.Expect(err).NotTo(HaveOccurred())
+ g.Expect(jobObj).NotTo(BeNil())
+ LogWithTimestamp(test.T(), "RayJob preserved as expected")
+
+ // Cleanup: delete RayJob to free resources (cluster should be GC'd eventually if owned).
+ LogWithTimestamp(test.T(), "Cleaning up RayJob %s/%s after DeleteWorkers scenario", jobObj.Namespace, jobObj.Name)
+ err = test.Client().Ray().RayV1().RayJobs(jobObj.Namespace).Delete(test.Ctx(), jobObj.Name, metav1.DeleteOptions{})
+ g.Expect(err).NotTo(HaveOccurred())
+ g.Eventually(func() error { _, err := GetRayJob(test, jobObj.Namespace, jobObj.Name); return err }, TestTimeoutMedium).Should(WithTransform(k8serrors.IsNotFound, BeTrue()))
+ // Cluster may take a moment to be garbage collected; tolerate already-deleted state.
+ g.Eventually(func() error {
+ _, err := GetRayCluster(test, namespace.Name, rayClusterName)
+ return err
+ }, TestTimeoutMedium).Should(WithTransform(k8serrors.IsNotFound, BeTrue()))
+ LogWithTimestamp(test.T(), "Cleanup after DeleteWorkers scenario complete")
+ })
+
+ test.T().Run("DeletionRules with JobDeploymentStatus Failed and DeleteCluster policy should delete entire cluster", func(_ *testing.T) {
+ // Create a RayJob with DeleteCluster policy, short activeDeadlineSeconds, and short TTL for faster testing.
+ rayJobAC := rayv1ac.RayJob("delete-cluster-after-jobdeploymentstatus-failed", namespace.Name).
+ WithSpec(rayv1ac.RayJobSpec().
+ WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))).
+ WithEntrypoint("python /home/ray/jobs/long_running.py").
+ WithActiveDeadlineSeconds(45). // Short deadline so the JobDeploymentStatus becomes Failed while the cluster is still running
+ WithShutdownAfterJobFinishes(false). // Required when using DeletionStrategy
+ WithDeletionStrategy(rayv1ac.DeletionStrategy().
+ WithDeletionRules(
+ rayv1ac.DeletionRule().
+ WithPolicy(rayv1.DeleteCluster).
+ WithCondition(rayv1ac.DeletionCondition().
+ WithJobDeploymentStatus(rayv1.JobDeploymentStatusFailed).
+ WithTTLSeconds(10)), // 10 second TTL for testing
+ )).
+ WithSubmitterPodTemplate(JobSubmitterPodTemplateApplyConfiguration()))
+
+ rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions)
+ g.Expect(err).NotTo(HaveOccurred())
+ LogWithTimestamp(test.T(), "Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name)
+
+ // Wait for JobDeploymentStatus to become Failed due to activeDeadlineSeconds timeout.
+ g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium).
+ Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusFailed)))
+ LogWithTimestamp(test.T(), "RayJob %s/%s failed due to activeDeadlineSeconds timeout", rayJob.Namespace, rayJob.Name)
+
+ // Verify the JobStatus remains RUNNING and the reason is DeadlineExceeded.
+ rayJob, err = GetRayJob(test, rayJob.Namespace, rayJob.Name)
+ g.Expect(err).NotTo(HaveOccurred())
+ g.Expect(rayJob.Status.JobStatus).To(Equal(rayv1.JobStatusRunning))
+ g.Expect(rayJob.Status.Reason).To(Equal(rayv1.DeadlineExceeded))
+
+ // Get the associated RayCluster name (early assertion for clearer diagnostics).
+ rayClusterName := rayJob.Status.RayClusterName
+ g.Expect(rayClusterName).NotTo(BeEmpty())
+
+ // Verify cluster exists initially.
+ g.Eventually(RayCluster(test, namespace.Name, rayClusterName), TestTimeoutShort).
+ Should(WithTransform(RayClusterState, Equal(rayv1.Ready)))
+
+ // Wait for TTL to expire and cluster to be deleted.
+ LogWithTimestamp(test.T(), "Waiting for TTL to expire and cluster to be deleted...")
+ g.Eventually(func() error {
+ _, err := GetRayCluster(test, namespace.Name, rayClusterName)
+ return err
+ }, TestTimeoutMedium).Should(WithTransform(k8serrors.IsNotFound, BeTrue()))
+ LogWithTimestamp(test.T(), "RayCluster deleted successfully")
+
+ // Verify RayJob still exists.
+ jobObj, err := GetRayJob(test, rayJob.Namespace, rayJob.Name)
+ g.Expect(err).NotTo(HaveOccurred())
+ g.Expect(jobObj).NotTo(BeNil())
+ LogWithTimestamp(test.T(), "RayJob preserved as expected")
+
+ // Cleanup: delete RayJob (cluster already deleted by policy).
+ LogWithTimestamp(test.T(), "Cleaning up RayJob %s/%s after DeleteCluster scenario", jobObj.Namespace, jobObj.Name)
+ err = test.Client().Ray().RayV1().RayJobs(jobObj.Namespace).Delete(test.Ctx(), jobObj.Name, metav1.DeleteOptions{})
+ g.Expect(err).NotTo(HaveOccurred())
+ g.Eventually(func() error { _, err := GetRayJob(test, jobObj.Namespace, jobObj.Name); return err }, TestTimeoutMedium).Should(WithTransform(k8serrors.IsNotFound, BeTrue()))
+ LogWithTimestamp(test.T(), "Cleanup after DeleteCluster scenario complete")
+ })
+
+ test.T().Run("DeletionRules with JobDeploymentStatus Failed and DeleteSelf policy should delete RayJob and cluster", func(_ *testing.T) {
+ // Create a RayJob with DeleteSelf policy, short activeDeadlineSeconds, and short TTL for faster testing.
+ rayJobAC := rayv1ac.RayJob("delete-self-after-jobdeploymentstatus-failed", namespace.Name).
+ WithSpec(rayv1ac.RayJobSpec().
+ WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))).
+ WithEntrypoint("python /home/ray/jobs/long_running.py").
+ WithActiveDeadlineSeconds(45). // Short deadline so the JobDeploymentStatus becomes Failed while the cluster is still running
+ WithShutdownAfterJobFinishes(false). // Required when using DeletionStrategy
+ WithDeletionStrategy(rayv1ac.DeletionStrategy().
+ WithDeletionRules(
+ rayv1ac.DeletionRule().
+ WithPolicy(rayv1.DeleteSelf).
+ WithCondition(rayv1ac.DeletionCondition().
+ WithJobDeploymentStatus(rayv1.JobDeploymentStatusFailed).
+ WithTTLSeconds(10)), // 10 second TTL for testing
+ )).
+ WithSubmitterPodTemplate(JobSubmitterPodTemplateApplyConfiguration()))
+
+ rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions)
+ g.Expect(err).NotTo(HaveOccurred())
+ LogWithTimestamp(test.T(), "Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name)
+
+ // Wait for JobDeploymentStatus to become Failed due to activeDeadlineSeconds timeout.
+ g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium).
+ Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusFailed)))
+ LogWithTimestamp(test.T(), "RayJob %s/%s failed due to activeDeadlineSeconds timeout", rayJob.Namespace, rayJob.Name)
+
+ // Verify the JobStatus remains RUNNING and the reason is DeadlineExceeded.
+ rayJob, err = GetRayJob(test, rayJob.Namespace, rayJob.Name)
+ g.Expect(err).NotTo(HaveOccurred())
+ g.Expect(rayJob.Status.JobStatus).To(Equal(rayv1.JobStatusRunning))
+ g.Expect(rayJob.Status.Reason).To(Equal(rayv1.DeadlineExceeded))
+
+ // Get the associated RayCluster name (early assertion for clearer diagnostics).
+ rayClusterName := rayJob.Status.RayClusterName
+ g.Expect(rayClusterName).NotTo(BeEmpty())
+
+ // Wait for TTL to expire and RayJob (and cluster) to be deleted.
+ LogWithTimestamp(test.T(), "Waiting for TTL to expire and RayJob to be deleted...")
+ g.Eventually(func() error {
+ _, err := GetRayJob(test, rayJob.Namespace, rayJob.Name)
+ return err
+ }, TestTimeoutMedium).Should(WithTransform(k8serrors.IsNotFound, BeTrue()))
+ LogWithTimestamp(test.T(), "RayJob deleted successfully")
+
+ // Verify associated cluster is also deleted.
+ g.Eventually(func() error {
+ _, err := GetRayCluster(test, namespace.Name, rayClusterName)
+ return err
+ }, TestTimeoutMedium).Should(WithTransform(k8serrors.IsNotFound, BeTrue()))
+ LogWithTimestamp(test.T(), "Associated RayCluster deleted successfully")
+ })
+
+ test.T().Run("DeletionRules with JobDeploymentStatus Failed and DeleteNone policy should preserve all resources", func(_ *testing.T) {
+ // Create a RayJob with DeleteNone policy, short activeDeadlineSeconds, and short TTL for faster testing.
+ rayJobAC := rayv1ac.RayJob("delete-none-after-jobdeploymentstatus-failed", namespace.Name).
+ WithSpec(rayv1ac.RayJobSpec().
+ WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))).
+ WithEntrypoint("python /home/ray/jobs/long_running.py").
+ WithActiveDeadlineSeconds(45). // Short deadline so the JobDeploymentStatus becomes Failed while the cluster is still running
+ WithShutdownAfterJobFinishes(false). // Required when using DeletionStrategy
+ WithDeletionStrategy(rayv1ac.DeletionStrategy().
+ WithDeletionRules(
+ rayv1ac.DeletionRule().
+ WithPolicy(rayv1.DeleteNone).
+ WithCondition(rayv1ac.DeletionCondition().
+ WithJobDeploymentStatus(rayv1.JobDeploymentStatusFailed).
+ WithTTLSeconds(10)), // 10 second TTL for testing
+ )).
+ WithSubmitterPodTemplate(JobSubmitterPodTemplateApplyConfiguration()))
+
+ rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions)
+ g.Expect(err).NotTo(HaveOccurred())
+ LogWithTimestamp(test.T(), "Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name)
+
+ // Wait for JobDeploymentStatus to become Failed due to activeDeadlineSeconds timeout.
+ g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium).
+ Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusFailed)))
+ LogWithTimestamp(test.T(), "RayJob %s/%s failed due to activeDeadlineSeconds timeout", rayJob.Namespace, rayJob.Name)
+
+ // Verify the JobStatus remains RUNNING and the reason is DeadlineExceeded.
+ rayJob, err = GetRayJob(test, rayJob.Namespace, rayJob.Name)
+ g.Expect(err).NotTo(HaveOccurred())
+ g.Expect(rayJob.Status.JobStatus).To(Equal(rayv1.JobStatusRunning))
+ g.Expect(rayJob.Status.Reason).To(Equal(rayv1.DeadlineExceeded))
+
+ // Get the associated RayCluster name (early assertion for clearer diagnostics).
+ rayClusterName := rayJob.Status.RayClusterName
+ g.Expect(rayClusterName).NotTo(BeEmpty())
+
+ // Wait well past the TTL and verify everything is preserved.
+ LogWithTimestamp(test.T(), "Waiting past TTL to verify resources are preserved...")
+ g.Consistently(func(gg Gomega) {
+ jobObj, err := GetRayJob(test, rayJob.Namespace, rayJob.Name)
+ gg.Expect(err).NotTo(HaveOccurred())
+ gg.Expect(jobObj).NotTo(BeNil())
+ cluster, err := GetRayCluster(test, namespace.Name, rayClusterName)
+ gg.Expect(err).NotTo(HaveOccurred())
+ gg.Expect(cluster).NotTo(BeNil())
+ workerPods, err := GetWorkerPods(test, cluster)
+ gg.Expect(err).NotTo(HaveOccurred())
+ gg.Expect(workerPods).ToNot(BeEmpty())
+ }, 10*time.Second, 2*time.Second).Should(Succeed())
+ LogWithTimestamp(test.T(), "All resources preserved as expected with DeleteNone policy")
+
+ // Cleanup: delete RayJob to release cluster and pods.
+ LogWithTimestamp(test.T(), "Cleaning up RayJob %s/%s after DeleteNone scenario", rayJob.Namespace, rayJob.Name)
+ err = test.Client().Ray().RayV1().RayJobs(rayJob.Namespace).Delete(test.Ctx(), rayJob.Name, metav1.DeleteOptions{})
+ g.Expect(err).NotTo(HaveOccurred())
+ g.Eventually(func() error { _, err := GetRayJob(test, rayJob.Namespace, rayJob.Name); return err }, TestTimeoutMedium).Should(WithTransform(k8serrors.IsNotFound, BeTrue()))
+ g.Eventually(func() error {
+ _, err := GetRayCluster(test, namespace.Name, rayClusterName)
+ return err
+ }, TestTimeoutMedium).Should(WithTransform(k8serrors.IsNotFound, BeTrue()))
+ LogWithTimestamp(test.T(), "Cleanup after DeleteNone scenario complete")
+ })
}