-
Notifications
You must be signed in to change notification settings - Fork 664
[Feat] Add Ray Cron Job #4159
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
[Feat] Add Ray Cron Job #4159
Conversation
| type RayCronJobSpec struct { | ||
| // JobTemplate defines the job spec that will be created by cron scheduling | ||
| JobTemplate *RayJobSpec `json:"jobTemplate"` | ||
| // Schedule is the cron schedule string | ||
| Schedule string `json:"schedule"` | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if this is alright, but I think we should introduce a separate struct (e.g. RayJobTemplateSpec) to hold both the metadata and the spec for the generated RayJob, similar to how Kubernetes models JobTemplateSpec in CronJob.
This would allow users to specify metadata inside jobTemplate, which we can then propagate to the created RayJob. It also keeps the API aligned with common Kubernetes patterns.
WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if we want this. In RayJobSpec when we specify the RayClusterSpec, we only set RayClusterSpec itself rather than also letting user set the ObjectMeta.
| RayClusterSpec *RayClusterSpec `json:"rayClusterSpec,omitempty"` |
cc @rueian for confirmation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need for it at the moment, this can be a future work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The current structure looks good to me already.
Signed-off-by: machichima <[email protected]>
Signed-off-by: machichima <[email protected]>
Signed-off-by: machichima <[email protected]>
| rayVersion: '2.46.0' # should match the Ray version in the image of the containers | ||
| # Ray head pod template |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we use 2.52.0 version? thank you!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated in 6e3c37a. Thank you!
Signed-off-by: machichima <[email protected]>
| // This is the only 2 places where we update the RayCronJob status. This will directly | ||
| // update the ScheduleStatus to ValidationFailed if there's validation error | ||
| if err = r.updateRayCronJobStatus(ctx, originalRayCronJobInstance, rayCronJobInstance); err != nil { | ||
| logger.Info("Failed to update RayCronJob status", "error", err) | ||
| return ctrl.Result{RequeueAfter: RayCronJobDefaultRequeueDuration}, err | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to update status for validation failed since we not use ScheduleStatus for RayCronJob.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry this is the left over code, I forgot to delete it. Removed in 612fdb4
| // Create RayCronJob with invalid cron schedule | ||
| rayCronJob := &rayv1.RayCronJob{ | ||
| ObjectMeta: metav1.ObjectMeta{ | ||
| Name: "invalid-cronjob", | ||
| Namespace: "default", | ||
| }, | ||
| Spec: rayv1.RayCronJobSpec{ | ||
| Schedule: "invalid cron string", | ||
| JobTemplate: &rayv1.RayJobSpec{ | ||
| Entrypoint: "python test.py", | ||
| RayClusterSpec: &rayv1.RayClusterSpec{ | ||
| HeadGroupSpec: rayv1.HeadGroupSpec{ | ||
| Template: corev1.PodTemplateSpec{ | ||
| Spec: corev1.PodSpec{ | ||
| Containers: []corev1.Container{ | ||
| { | ||
| Name: "ray-head", | ||
| Image: "rayproject/ray:2.9.0", | ||
| }, | ||
| }, | ||
| }, | ||
| }, | ||
| }, | ||
| }, | ||
| }, | ||
| }, | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we create a template function for the ut? like this:
| func rayClusterTemplate(name string, namespace string) *rayv1.RayCluster { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes of course! Modified in 9a44ff3
win5923
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
|
||
| if rayCronJobInstance.Status.LastScheduleTime == nil { | ||
| // The new RayCronJob, not yet scheduled | ||
| rayCronJobInstance.Status.LastScheduleTime = &metav1.Time{Time: now} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we only update the LastScheduleTime without creating the rayjob in the first reconciliation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The first reconciliation is just to prepare the schedule. We don't create the job yet because we need to check if the scheduled time has actually arrived (e.g., if the job runs every minute but it's currently 15:10:30, we shouldn't create it yet)
Therefore, the first reconcile just sets the current time as LastScheduleTime and sets the requeue delay to future schedule time - now."
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have the same question, is this how kuberentes job api works?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have the same question, is this how kuberentes job api works?
Not really, Kubernetes CronJob uses the CreationTimestamp.Time to compute the next schedule time when evaluating it for the first time:
func mostRecentScheduleTime(cj *batchv1.CronJob, now time.Time, ...) {
earliestTime := cj.ObjectMeta.CreationTimestamp.Time
if cj.Status.LastScheduleTime != nil {
earliestTime = cj.Status.LastScheduleTime.Time
}
// ...
}The CronJob also use two schedule times (t1 and t2) calculate the next time to handle missed schedules, if the current time falls between t1 and t2, the controller will create a Job.
t1 := schedule.Next(earliestTime)
t2 := schedule.Next(t1)
if now.Before(t1) {
return earliestTime, nil, missedSchedules, nil
}
if now.Before(t2) {
return earliestTime, &t1, missedSchedules, nil
} sequenceDiagram
participant R as Reconciler
participant M as mostRecentScheduleTime()
participant K as K8s API
Note over R: CronJob created at 10:05<br/>Schedule: "0 * * * *"
R->>M: First reconcile (10:05)
M->>M: earliestTime = CreationTimestamp (10:05)
M->>M: t1 = schedule.Next(10:05) = 11:00
M->>M: now (10:05) < t1 (11:00) ?
M-->>R: Return nil (not time yet)
Note over R: LastScheduleTime stays nil<br/>Requeue until 11:00
R->>M: Second reconcile (11:00)
M->>M: earliestTime = CreationTimestamp (10:05)
M->>M: t1 = 11:00, t2 = 12:00
M->>M: now (11:00) >= t1 && now < t2 ?
M-->>R: Return &t1 (11:00) ✅
R->>K: CreateJob(scheduledTime: 11:00)
K-->>R: Job created ✅
R->>K: Update Status.LastScheduleTime = 11:00
Note over R: First Job created!
And the LastScheduleTime is only updated after creating a Job.
scheduledTime, err := nextScheduleTime(logger, cronJob, now, sched, jm.recorder)
....
cronJob.Status.Active = append(cronJob.Status.Active, *jobRef)
cronJob.Status.LastScheduleTime = &metav1.Time{Time: *scheduledTime}
updateStatus = trueIf i was wrong, please correct me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @win5923 for giving more details!
@Future-Outlier It's a bit different as Kubernetes use a more complex way of handling the schedule. But basically the first reconcile will not create a job. Details are as below:
In the first reconciliation (when cronjob first created), the LastScheduleTime will be nil, which the
earliestTime will be set to the cron job's creation time (link)
Then, it will calculate the expected schedule time t1 based on the earliestTime, just like what @win5923
mentioned. Note that when the current time is "before" the expected schedule time now < t1, we will "not" create a job. Detailed code trace as below:
- if
now < t1, returnmostRecentTimeasnil(second return parameter) - This
mostRecentTimewill be passed up tonextScheduleTimethensyncCronJob-> the reconcile logic lies in this function- here, its value will be passed into
scheduledTime->scheduledTimeis nil
- here, its value will be passed into
- If
scheduledTimeis nil, we will requeue without creating a job
|
|
||
| if rayCronJobInstance.Status.LastScheduleTime == nil { | ||
| // The new RayCronJob, not yet scheduled | ||
| rayCronJobInstance.Status.LastScheduleTime = &metav1.Time{Time: now} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have the same question, is this how kuberentes job api works?
| func (r *RayCronJobReconciler) updateRayCronJobStatus(ctx context.Context, oldRayCronJob *rayv1.RayCronJob, newRayCronJob *rayv1.RayCronJob) error { | ||
| logger := ctrl.LoggerFrom(ctx) | ||
| oldRayCronJobStatus := oldRayCronJob.Status | ||
| newRayCronJobStatus := newRayCronJob.Status | ||
| if oldRayCronJobStatus.LastScheduleTime != newRayCronJobStatus.LastScheduleTime { | ||
|
|
||
| logger.Info("updateRayCronJobStatus", "old RayCronJobStatus", oldRayCronJobStatus, "new RayCronJobStatus", newRayCronJobStatus) | ||
| if err := r.Status().Update(ctx, newRayCronJob); err != nil { | ||
| return err | ||
| } | ||
| } | ||
| return nil | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this include the first time we initialize LastScheduleTime?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. For the first time the oldRayCronJobStatus.LastScheduleTime will be nil, and we will update the status with the new LastScheduleTime set here
Signed-off-by: machichima <[email protected]>
Signed-off-by: machichima <[email protected]>
Future-Outlier
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi, @alimaazamat
are you interested in reviewing this PR?
| //+kubebuilder:rbac:groups=ray.io,resources=raycronjobs,verbs=get;list;watch;create;update;patch;delete | ||
| //+kubebuilder:rbac:groups=ray.io,resources=raycronjobs/status,verbs=get;update;patch | ||
| //+kubebuilder:rbac:groups=ray.io,resources=raycronjobs/finalizers,verbs=update |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| //+kubebuilder:rbac:groups=ray.io,resources=raycronjobs,verbs=get;list;watch;create;update;patch;delete | |
| //+kubebuilder:rbac:groups=ray.io,resources=raycronjobs/status,verbs=get;update;patch | |
| //+kubebuilder:rbac:groups=ray.io,resources=raycronjobs/finalizers,verbs=update | |
| //+kubebuilder:rbac:groups=ray.io,resources=raycronjobs,verbs=get;list;watch;create;update;patch;delete | |
| //+kubebuilder:rbac:groups=ray.io,resources=raycronjobs/status,verbs=get;update;patch | |
| //+kubebuilder:rbac:groups=ray.io,resources=raycronjobs/finalizers,verbs=update | |
| //+kubebuilder:rbac:groups=ray.io,resources=rayjobs,verbs=create | |
| //+kubebuilder:rbac:groups=core,resources=events,verbs=create;patch |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed in b03a22d
| Labels: map[string]string{ | ||
| "ray.io/cronjob-name": cronJob.Name, | ||
| }, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| Labels: map[string]string{ | |
| "ray.io/cronjob-name": cronJob.Name, | |
| }, | |
| Labels: map[string]string{ | |
| utils.RayCronJobLabelKey: cronJob.Name, | |
| }, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed in 342e5a9
| // SetupWithManager sets up the controller with the Manager. | ||
| func (r *RayCronJobReconciler) SetupWithManager(mgr ctrl.Manager) error { | ||
| return ctrl.NewControllerManagedBy(mgr). | ||
| For(&rayv1.RayCronJob{}). | ||
| Complete(r) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| // SetupWithManager sets up the controller with the Manager. | |
| func (r *RayCronJobReconciler) SetupWithManager(mgr ctrl.Manager) error { | |
| return ctrl.NewControllerManagedBy(mgr). | |
| For(&rayv1.RayCronJob{}). | |
| Complete(r) | |
| } | |
| // SetupWithManager sets up the controller with the Manager. | |
| func (r *RayCronJobReconciler) SetupWithManager(mgr ctrl.Manager, reconcileConcurrency int) error { | |
| return ctrl.NewControllerManagedBy(mgr). | |
| For(&rayv1.RayCronJob{}). | |
| WithOptions(controller.Options{ | |
| MaxConcurrentReconciles: reconcileConcurrency, | |
| LogConstructor: func(request *reconcile.Request) logr.Logger { | |
| logger := ctrl.Log.WithName("controllers").WithName("RayCronJob") | |
| if request != nil { | |
| logger = logger.WithValues("RayCronJob", request.NamespacedName) | |
| } | |
| return logger | |
| }, | |
| }). | |
| Complete(r) | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh so sorry I forgot to update this. Thank you for pointing this out!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Modified in d29b748
| if features.Enabled(features.RayCronJob) { | ||
| setupLog.Info("RayCronJob feature gate is enabled, starting RayCronJob controller") | ||
| exitOnError(ray.NewRayCronJobReconciler(mgr).SetupWithManager(mgr), | ||
| "unable to create controller", "controller", "RayCronJob") | ||
| } else { | ||
| setupLog.Info("RayCronJob feature gate is disabled, skipping RayCronJob controller setup") | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| if features.Enabled(features.RayCronJob) { | |
| setupLog.Info("RayCronJob feature gate is enabled, starting RayCronJob controller") | |
| exitOnError(ray.NewRayCronJobReconciler(mgr).SetupWithManager(mgr), | |
| "unable to create controller", "controller", "RayCronJob") | |
| } else { | |
| setupLog.Info("RayCronJob feature gate is disabled, skipping RayCronJob controller setup") | |
| } | |
| if features.Enabled(features.RayCronJob) { | |
| setupLog.Info("RayCronJob feature gate is enabled, starting RayCronJob controller") | |
| exitOnError(ray.NewRayCronJobReconciler(mgr).SetupWithManager(mgr, config.ReconcileConcurrency), | |
| "unable to create controller", "controller", "RayCronJob") | |
| } else { | |
| setupLog.Info("RayCronJob feature gate is disabled, skipping RayCronJob controller setup") | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated in d29b748
Future-Outlier
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
overall its good, I haven't had time to review validation files, go.mod, and go.sum.
cc @jiaweijiang @seanlaii for help if you have time
seanlaii
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| // RayCronJobSpec defines the desired state of RayCronJob | ||
| type RayCronJobSpec struct { | ||
| // JobTemplate defines the job spec that will be created by cron scheduling | ||
| JobTemplate *RayJobSpec `json:"jobTemplate"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a plan to make it optional, such as referring to an existing job? If it will always be required, should we use a pointer type?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this will always be required. Let me change it to normal object. Thank you!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed in c3f3092
| return ctrl.Result{RequeueAfter: nextScheduleTime.Sub(now)}, nil | ||
| } | ||
|
|
||
| rayJob, err := r.constructRayJob(rayCronJobInstance) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we could also add something similar to cronjob-scheduled-timestamp to the RayJob annotation for debugging.
Don't have to be in this PR if this is not trivial. Overall LGTM.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SG!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added in bb353fb
Signed-off-by: machichima <[email protected]>
Signed-off-by: machichima <[email protected]>
Signed-off-by: machichima <[email protected]>
Signed-off-by: machichima <[email protected]>
Signed-off-by: machichima <[email protected]>
Future-Outlier
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, really nice PR, cc @rueian to do final pass and merge this.

Why are these changes needed?
Support cron job scheduling. Following this design docs and implement milestone 1 in this PR
Main changes:
RayCronJobCRD and controllerRayCronJobTest
Apply the sample YAML
ray-operator/config/samples/ray-cronjob.sample.yaml. RayJobs are being scheduled every minute:Trigger validation error
Related issue number
Following comment: #2426 (comment)
Closes #2426
Checks