
Conversation

@JiangJiaWei1103
Contributor

Why are these changes needed?

The current deletionStrategy relies exclusively on the terminal states of JobStatus (SUCCEEDED or FAILED). However, there are several scenarios in which a user-deployed RayJob ends up with JobStatus == "" (JobStatusNew) while JobDeploymentStatus == "Failed". In these cases, the associated resources (e.g., RayJob, RayCluster, etc.) remain stuck and are never cleaned up, resulting in indefinite resource consumption.

Changes

  • Add the JobDeploymentStatus field to DeletionCondition
    • Currently supports Failed only
  • Enforce mutual exclusivity between JobStatus and JobDeploymentStatus within DeletionCondition (see the sketch below)
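
A minimal sketch of the new condition shape (the rule type name rayv1.DeletionRule and the constant rayv1.JobDeploymentStatusFailed are assumed here; the field names follow this PR):

failed := rayv1.JobDeploymentStatusFailed // assumed name of the "Failed" deployment status constant
rule := rayv1.DeletionRule{               // assumed element type of DeletionStrategy.DeletionRules
	Policy: rayv1.DeleteCluster,
	Condition: rayv1.DeletionCondition{
		// Exactly one of JobStatus / JobDeploymentStatus may be set; setting both is rejected by validation.
		JobDeploymentStatus: &failed,
		TTLSeconds:          60,
	},
}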

Implementation Details

To determine which field the user specifies, we use pointers instead of raw values. Both JobStatus and JobDeploymentStatus have empty strings as their zero values, which correspond to a "new" state. Using nil allows us to reliably distinguish between "unspecified" and "explicitly set," avoiding unintended ambiguity.
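
A minimal sketch of how this plays out (illustrative helper only, not part of the PR; assumes the rayv1 and fmt imports used elsewhere):

// nil means "condition not specified"; a pointer to the empty string means
// "explicitly set to the zero value" (e.g., JobStatusNew).
func conditionTrigger(cond *rayv1.DeletionCondition) string {
	switch {
	case cond.JobStatus != nil:
		return fmt.Sprintf("JobStatus %q", *cond.JobStatus)
	case cond.JobDeploymentStatus != nil:
		return fmt.Sprintf("JobDeploymentStatus %q", *cond.JobDeploymentStatus)
	default:
		return "unspecified"
	}
}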

Related issue number

Closes #4233 ([Feature] Automatic Cleanup for RayJobs that exceed activeDeadlineSeconds).

Checks

  • I've made sure the tests are passing.
  • Testing Strategy
    • Unit tests
    • Manual tests
    • This PR is not tested :(

@rueian
Collaborator

rueian commented Dec 8, 2025

The helm lint is failing.

@pickymodel

The helm lint is failing.

Will fix after getting off work, thanks for reviewing!

@Future-Outlier
Member

cc @seanlaii and @win5923 for help. Note that we need to wait until @andrewsykim is back to discuss the API change.

Collaborator

@win5923 left a comment


Hi @JiangJiaWei1103, can you also update the comment to mention that JobDeploymentStatus is also supported?

# DeletionStrategy defines the deletion policies for a RayJob.
# It allows for fine-grained control over resource cleanup after a job finishes.
# DeletionRules is a list of deletion rules, processed based on their trigger conditions.
# While the rules can be used to define a sequence, if multiple rules are overdue (e.g., due to controller downtime),
# the most impactful rule (e.g., DeleteCluster) will be executed first to prioritize resource cleanup and cost savings.
deletionStrategy:
# This sample demonstrates a staged cleanup process for a RayJob.
# Regardless of whether the job succeeds or fails, the cleanup follows these steps:
# 1. After 30 seconds, the worker pods are deleted. This allows for quick resource release while keeping the head pod for debugging.
# 2. After 60 seconds, the entire RayCluster (including the head pod) is deleted.
# 3. After 90 seconds, the RayJob custom resource itself is deleted, removing it from the Kubernetes API server.

@JiangJiaWei1103
Contributor Author

Hi @JiangJiaWei1103, can you also update the comment to mention that JobDeploymentStatus is also supported?

# DeletionStrategy defines the deletion policies for a RayJob.
# It allows for fine-grained control over resource cleanup after a job finishes.
# DeletionRules is a list of deletion rules, processed based on their trigger conditions.
# While the rules can be used to define a sequence, if multiple rules are overdue (e.g., due to controller downtime),
# the most impactful rule (e.g., DeleteCluster) will be executed first to prioritize resource cleanup and cost savings.
deletionStrategy:
# This sample demonstrates a staged cleanup process for a RayJob.
# Regardless of whether the job succeeds or fails, the cleanup follows these steps:
# 1. After 30 seconds, the worker pods are deleted. This allows for quick resource release while keeping the head pod for debugging.
# 2. After 60 seconds, the entire RayCluster (including the head pod) is deleted.
# 3. After 90 seconds, the RayJob custom resource itself is deleted, removing it from the Kubernetes API server.

Hi @win5923, nice suggestion. I'm considering adding one more sample demonstrating JobDeploymentStatus-based deletion rules, wdyt?

Comment on lines +473 to +476
// Group TTLs by condition type for cross-rule validation and uniqueness checking.
// We separate JobStatus and JobDeploymentStatus to avoid confusion.
rulesByJobStatus := make(map[rayv1.JobStatus]map[rayv1.DeletionPolicyType]int32)
rulesByJobDeploymentStatus := make(map[rayv1.JobDeploymentStatus]map[rayv1.DeletionPolicyType]int32)
Collaborator

@win5923 Dec 9, 2025


I think this is clearer, WDYT?

// validateDeletionRules validates the deletion rules in the RayJob spec.
// It performs per-rule validations, checks for uniqueness, and ensures logical TTL consistency.
// Errors are collected and returned as a single aggregated error using errors.Join for better user feedback.
func validateDeletionRules(rayJob *rayv1.RayJob) error {
	rules := rayJob.Spec.DeletionStrategy.DeletionRules
	isClusterSelectorMode := len(rayJob.Spec.ClusterSelector) != 0

	// Group TTLs by condition type for cross-rule validation and uniqueness checking.
	rulesByCondition := make(map[string]map[rayv1.DeletionPolicyType]int32)
	var errs []error

	// Single pass: Validate each rule individually and group for later consistency checks.
	for i, rule := range rules {
		// Validate and extract the condition key.
		conditionKey, err := getDeletionCondition(&rule.Condition)
		if err != nil {
			errs = append(errs, fmt.Errorf("deletionRules[%d]: %w", i, err))
			continue
		}

		// Validate TTL is non-negative.
		if rule.Condition.TTLSeconds < 0 {
			errs = append(errs, fmt.Errorf("deletionRules[%d]: TTLSeconds must be non-negative", i))
			continue
		}

		// Contextual validations based on spec.
		if isClusterSelectorMode && (rule.Policy == rayv1.DeleteCluster || rule.Policy == rayv1.DeleteWorkers) {
			errs = append(errs, fmt.Errorf("deletionRules[%d]: DeletionPolicyType '%s' not supported when ClusterSelector is set", i, rule.Policy))
			continue
		}
		if IsAutoscalingEnabled(rayJob.Spec.RayClusterSpec) && rule.Policy == rayv1.DeleteWorkers {
			// TODO (rueian): Support in future Ray versions by checking RayVersion.
			errs = append(errs, fmt.Errorf("deletionRules[%d]: DeletionPolicyType 'DeleteWorkers' not supported with autoscaling enabled", i))
			continue
		}

		// Group valid rule for consistency check
		if _, ok := rulesByCondition[conditionKey]; !ok {
			rulesByCondition[conditionKey] = make(map[rayv1.DeletionPolicyType]int32)
		}

		// Check for uniqueness of (condition, DeletionPolicyType) pair.
		if _, exists := rulesByCondition[conditionKey][rule.Policy]; exists {
			errs = append(errs, fmt.Errorf("deletionRules[%d]: duplicate rule for %s and DeletionPolicyType '%s'", i, conditionKey, rule.Policy))
			continue
		}

		rulesByCondition[conditionKey][rule.Policy] = rule.Condition.TTLSeconds
	}

	// Second pass: Validate TTL consistency for each condition group.
	for conditionKey, policyTTLs := range rulesByCondition {
		// Extract the condition type and value from the key (e.g., "JobStatus:FAILED" -> "JobStatus", "FAILED").
		parts := strings.SplitN(conditionKey, ":", 2)
		if len(parts) != 2 {
			// This should never happen due to the getDeletionCondition contract.
			errs = append(errs, fmt.Errorf("internal error: invalid condition key format: %s", conditionKey))
			continue
		}

		conditionType := parts[0]  // "JobStatus" or "JobDeploymentStatus"
		conditionValue := parts[1] // "SUCCEEDED", "FAILED", etc.

		if err := validateTTLConsistency(policyTTLs, conditionType, conditionValue); err != nil {
			errs = append(errs, err)
		}
	}

	return errstd.Join(errs...)
}

func getDeletionCondition(cond *rayv1.DeletionCondition) (string, error) {
	hasJobStatus := cond.JobStatus != nil
	hasJobDeploymentStatus := cond.JobDeploymentStatus != nil

	if hasJobStatus && hasJobDeploymentStatus {
		return "", fmt.Errorf("cannot set both JobStatus and JobDeploymentStatus at the same time")
	}
	if !hasJobStatus && !hasJobDeploymentStatus {
		return "", fmt.Errorf("exactly one of JobStatus and JobDeploymentStatus must be set")
	}

	if hasJobStatus {
		return fmt.Sprintf("JobStatus:%s", *cond.JobStatus), nil
	}
	return fmt.Sprintf("JobDeploymentStatus:%s", *cond.JobDeploymentStatus), nil
}

// validateTTLConsistency ensures TTLs follow the deletion hierarchy: Workers <= Cluster <= Self.
// (Lower TTL means deletes earlier.)
func validateTTLConsistency(policyTTLs map[rayv1.DeletionPolicyType]int32, conditionType string, conditionValue string) error {
	// Define the required deletion order. TTLs must be non-decreasing along this sequence.
	deletionOrder := []rayv1.DeletionPolicyType{
		rayv1.DeleteWorkers,
		rayv1.DeleteCluster,
		rayv1.DeleteSelf,
	}

	var prevPolicy rayv1.DeletionPolicyType
	var prevTTL int32
	var hasPrev bool

	var errs []error

	for _, policy := range deletionOrder {
		ttl, exists := policyTTLs[policy]
		if !exists {
			continue
		}

		if hasPrev && ttl < prevTTL {
			errs = append(errs, fmt.Errorf(
				"for %s '%s': %s TTL (%d) must be >= %s TTL (%d)",
				conditionType, conditionValue, policy, ttl, prevPolicy, prevTTL,
			))
		}

		prevPolicy = policy
		prevTTL = ttl
		hasPrev = true
	}

	return errstd.Join(errs...)
}
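
For instance, this version would reject a rule set where the cluster is deleted before the workers-only cleanup (a minimal illustration reusing validateTTLConsistency above):

policyTTLs := map[rayv1.DeletionPolicyType]int32{
	rayv1.DeleteWorkers: 60,
	rayv1.DeleteCluster: 30, // out of order: the cluster would be gone before the workers-only step
}
// err: "for JobStatus 'FAILED': DeleteCluster TTL (30) must be >= DeleteWorkers TTL (60)"
err := validateTTLConsistency(policyTTLs, "JobStatus", "FAILED")
_ = err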

WithSpec(rayv1ac.RayJobSpec().
WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))).
WithEntrypoint("python /home/ray/jobs/long_running.py").
WithActiveDeadlineSeconds(45). // Short deadline to fail the JobDeploymentStatus while making sure the cluster is running
Collaborator

@win5923 Dec 9, 2025


Why set 45 seconds here? Is this stable?

rulesByStatus[rule.Condition.JobStatus] = policyTTLs
}
if hasJobStatus {
policyTTLs, ok := rulesByJobStatus[*rule.Condition.JobStatus]
Collaborator


Curious: I couldn't find where the values are assigned to rulesByJobStatus.
