[Feature] Support JobDeploymentStatus as the deletion condition #4262
Conversation
The helm lint is failing.

Will fix after getting off work, thanks for reviewing!

cc @seanlaii and @win5923 for help. Note that we need to wait until @andrewsykim is back to discuss the API change.
Signed-off-by: JiangJiaWei1103 <[email protected]>
win5923 left a comment:
Hi @JiangJiaWei1103, can you also update the comment to mention that JobDeploymentStatus is also supported?
kuberay/ray-operator/config/samples/ray-job.deletion-rules.yaml
Lines 12 to 22 in e32405e
# DeletionStrategy defines the deletion policies for a RayJob.
# It allows for fine-grained control over resource cleanup after a job finishes.
# DeletionRules is a list of deletion rules, processed based on their trigger conditions.
# While the rules can be used to define a sequence, if multiple rules are overdue (e.g., due to controller downtime),
# the most impactful rule (e.g., DeleteCluster) will be executed first to prioritize resource cleanup and cost savings.
deletionStrategy:
  # This sample demonstrates a staged cleanup process for a RayJob.
  # Regardless of whether the job succeeds or fails, the cleanup follows these steps:
  # 1. After 30 seconds, the worker pods are deleted. This allows for quick resource release while keeping the head pod for debugging.
  # 2. After 60 seconds, the entire RayCluster (including the head pod) is deleted.
  # 3. After 90 seconds, the RayJob custom resource itself is deleted, removing it from the Kubernetes API server.
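For reference, a rough Go equivalent of the staged rules these comments describe (the actual sample is YAML; the DeletionRule type name is an assumption based on how the validation code in this PR accesses rule.Policy and rule.Condition):

// Staged cleanup for a succeeded job; rules for a FAILED job would mirror these.
succeeded := rayv1.JobStatusSucceeded
deletionRules := []rayv1.DeletionRule{ // type name assumed
    {Policy: rayv1.DeleteWorkers, Condition: rayv1.DeletionCondition{JobStatus: &succeeded, TTLSeconds: 30}},
    {Policy: rayv1.DeleteCluster, Condition: rayv1.DeletionCondition{JobStatus: &succeeded, TTLSeconds: 60}},
    {Policy: rayv1.DeleteSelf, Condition: rayv1.DeletionCondition{JobStatus: &succeeded, TTLSeconds: 90}},
}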
Hi @win5923, nice suggestion. I'm considering adding one more sample demonstrating JobDeploymentStatus-based deletion rules, wdyt?
// Group TTLs by condition type for cross-rule validation and uniqueness checking.
// We separate JobStatus and JobDeploymentStatus to avoid confusion.
rulesByJobStatus := make(map[rayv1.JobStatus]map[rayv1.DeletionPolicyType]int32)
rulesByJobDeploymentStatus := make(map[rayv1.JobDeploymentStatus]map[rayv1.DeletionPolicyType]int32)
I think this is more clear, WDYT?
// validateDeletionRules validates the deletion rules in the RayJob spec.
// It performs per-rule validations, checks for uniqueness, and ensures logical TTL consistency.
// Errors are collected and returned as a single aggregated error using errors.Join for better user feedback.
func validateDeletionRules(rayJob *rayv1.RayJob) error {
    rules := rayJob.Spec.DeletionStrategy.DeletionRules
    isClusterSelectorMode := len(rayJob.Spec.ClusterSelector) != 0

    // Group TTLs by condition type for cross-rule validation and uniqueness checking.
    rulesByCondition := make(map[string]map[rayv1.DeletionPolicyType]int32)

    var errs []error

    // Single pass: Validate each rule individually and group for later consistency checks.
    for i, rule := range rules {
        // Validate and extract the condition key.
        conditionKey, err := getDeletionCondition(&rule.Condition)
        if err != nil {
            errs = append(errs, fmt.Errorf("deletionRules[%d]: %w", i, err))
            continue
        }

        // Validate TTL is non-negative.
        if rule.Condition.TTLSeconds < 0 {
            errs = append(errs, fmt.Errorf("deletionRules[%d]: TTLSeconds must be non-negative", i))
            continue
        }

        // Contextual validations based on spec.
        if isClusterSelectorMode && (rule.Policy == rayv1.DeleteCluster || rule.Policy == rayv1.DeleteWorkers) {
            errs = append(errs, fmt.Errorf("deletionRules[%d]: DeletionPolicyType '%s' not supported when ClusterSelector is set", i, rule.Policy))
            continue
        }
        if IsAutoscalingEnabled(rayJob.Spec.RayClusterSpec) && rule.Policy == rayv1.DeleteWorkers {
            // TODO (rueian): Support in future Ray versions by checking RayVersion.
            errs = append(errs, fmt.Errorf("deletionRules[%d]: DeletionPolicyType 'DeleteWorkers' not supported with autoscaling enabled", i))
            continue
        }

        // Group valid rule for consistency check.
        if _, ok := rulesByCondition[conditionKey]; !ok {
            rulesByCondition[conditionKey] = make(map[rayv1.DeletionPolicyType]int32)
        }

        // Check for uniqueness of (condition, DeletionPolicyType) pair.
        if _, exists := rulesByCondition[conditionKey][rule.Policy]; exists {
            errs = append(errs, fmt.Errorf("deletionRules[%d]: duplicate rule for %s and DeletionPolicyType '%s'", i, conditionKey, rule.Policy))
            continue
        }
        rulesByCondition[conditionKey][rule.Policy] = rule.Condition.TTLSeconds
    }

    // Second pass: Validate TTL consistency for each condition group.
    for conditionKey, policyTTLs := range rulesByCondition {
        // Extract the condition type and value from the key (e.g., "JobStatus:FAILED" -> "JobStatus", "FAILED").
        parts := strings.SplitN(conditionKey, ":", 2)
        if len(parts) != 2 {
            // This should never happen due to the getDeletionCondition contract.
            errs = append(errs, fmt.Errorf("internal error: invalid condition key format: %s", conditionKey))
            continue
        }
        conditionType := parts[0]  // "JobStatus" or "JobDeploymentStatus"
        conditionValue := parts[1] // "SUCCEEDED", "FAILED", etc.

        if err := validateTTLConsistency(policyTTLs, conditionType, conditionValue); err != nil {
            errs = append(errs, err)
        }
    }

    return errstd.Join(errs...)
}

func getDeletionCondition(cond *rayv1.DeletionCondition) (string, error) {
    hasJobStatus := cond.JobStatus != nil
    hasJobDeploymentStatus := cond.JobDeploymentStatus != nil

    if hasJobStatus && hasJobDeploymentStatus {
        return "", fmt.Errorf("cannot set both JobStatus and JobDeploymentStatus at the same time")
    }
    if !hasJobStatus && !hasJobDeploymentStatus {
        return "", fmt.Errorf("exactly one of JobStatus and JobDeploymentStatus must be set")
    }
    if hasJobStatus {
        return fmt.Sprintf("JobStatus:%s", *cond.JobStatus), nil
    }
    return fmt.Sprintf("JobDeploymentStatus:%s", *cond.JobDeploymentStatus), nil
}

// validateTTLConsistency ensures TTLs follow the deletion hierarchy: Workers <= Cluster <= Self.
// (A lower TTL means the resource is deleted earlier.)
func validateTTLConsistency(policyTTLs map[rayv1.DeletionPolicyType]int32, conditionType string, conditionValue string) error {
    // Define the required deletion order. TTLs must be non-decreasing along this sequence.
    deletionOrder := []rayv1.DeletionPolicyType{
        rayv1.DeleteWorkers,
        rayv1.DeleteCluster,
        rayv1.DeleteSelf,
    }

    var prevPolicy rayv1.DeletionPolicyType
    var prevTTL int32
    var hasPrev bool
    var errs []error

    for _, policy := range deletionOrder {
        ttl, exists := policyTTLs[policy]
        if !exists {
            continue
        }
        if hasPrev && ttl < prevTTL {
            errs = append(errs, fmt.Errorf(
                "for %s '%s': %s TTL (%d) must be >= %s TTL (%d)",
                conditionType, conditionValue, policy, ttl, prevPolicy, prevTTL,
            ))
        }
        prevPolicy = policy
        prevTTL = ttl
        hasPrev = true
    }

    return errstd.Join(errs...)
}
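To illustrate the check, a hypothetical call where DeleteCluster is scheduled earlier than DeleteWorkers, violating the Workers <= Cluster <= Self ordering:

policyTTLs := map[rayv1.DeletionPolicyType]int32{
    rayv1.DeleteWorkers: 60,
    rayv1.DeleteCluster: 30, // earlier than DeleteWorkers
}
err := validateTTLConsistency(policyTTLs, "JobStatus", "FAILED")
// err reads roughly: for JobStatus 'FAILED': DeleteCluster TTL (30) must be >= DeleteWorkers TTL (60)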
WithSpec(rayv1ac.RayJobSpec().
    WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))).
    WithEntrypoint("python /home/ray/jobs/long_running.py").
    WithActiveDeadlineSeconds(45). // Short deadline for failing the JobDeploymentStatus, but making sure the cluster is running
Why set 45 seconds here? Is this stable?
rulesByStatus[rule.Condition.JobStatus] = policyTTLs
}
if hasJobStatus {
    policyTTLs, ok := rulesByJobStatus[*rule.Condition.JobStatus]
Curious, I couldn't find where the values are assigned to rulesByJobStatus.
Why are these changes needed?
The current deletionStrategy relies exclusively on the terminal states of JobStatus (SUCCEEDED or FAILED). However, there are several scenarios in which a user-deployed RayJob ends up with JobStatus == "" (JobStatusNew) while JobDeploymentStatus == "Failed". In these cases, the associated resources (e.g., RayJob, RayCluster, etc.) remain stuck and are never cleaned up, resulting in indefinite resource consumption.

Changes
- Add the JobDeploymentStatus field to DeletionCondition, currently supporting Failed only
- Allow exactly one of JobStatus and JobDeploymentStatus within a DeletionCondition (see the sketch below)
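For example, a sketch of a rule that cleans up a RayJob whose deployment failed before the job ever started (the DeletionRule type name and the 300-second TTL are assumptions for illustration):

// JobStatus stays "" (JobStatusNew) while JobDeploymentStatus becomes Failed.
deploymentFailed := rayv1.JobDeploymentStatusFailed
rule := rayv1.DeletionRule{ // type name assumed
    Policy: rayv1.DeleteSelf,
    Condition: rayv1.DeletionCondition{
        JobDeploymentStatus: &deploymentFailed,
        TTLSeconds:          300, // hypothetical TTL
    },
}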
Implementation Details

To determine which field the user specifies, we use pointers instead of raw values. Both JobStatus and JobDeploymentStatus have empty strings as their zero values, which correspond to a "new" state. Using nil allows us to reliably distinguish between "unspecified" and "explicitly set," avoiding unintended ambiguity.
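A sketch of the shape this implies for DeletionCondition (the JSON tags and exact field layout are assumptions, not the literal definition from the PR):

type DeletionCondition struct {
    // Exactly one of JobStatus and JobDeploymentStatus must be set.
    // Pointers let validation distinguish "unspecified" (nil) from an
    // explicitly set value, since both status types zero to "".
    JobStatus           *JobStatus           `json:"jobStatus,omitempty"`
    JobDeploymentStatus *JobDeploymentStatus `json:"jobDeploymentStatus,omitempty"`
    TTLSeconds          int32                `json:"ttlSeconds"`
}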
Related issue number

Closes #4233.