Skip to content

Conversation

@machichima
Copy link
Collaborator

@machichima machichima commented Oct 30, 2025

Why are these changes needed?

Support cron job scheduling. Following this design docs and implement milestone 1 in this PR

Main changes:

  • Add RayCronJob CRD and controller
  • Add feature gate for enabling RayCronJob
  • Add unit test

Test

Apply the sample YAML ray-operator/config/samples/ray-cronjob.sample.yaml. RayJobs are being scheduled every minute:

image

Trigger validation error

image

Related issue number

Following comment: #2426 (comment)

Closes #2426

Checks

  • I've made sure the tests are passing.
  • Testing Strategy
    • Unit tests
    • Manual tests
    • This PR is not tested :(

@machichima machichima changed the title [POC] Add Ray Cron Job [Feat] Add Ray Cron Job Nov 24, 2025
@machichima machichima marked this pull request as ready for review November 24, 2025 13:03
@CheyuWu CheyuWu self-requested a review November 24, 2025 17:01
Comment on lines 11 to 16
type RayCronJobSpec struct {
// JobTemplate defines the job spec that will be created by cron scheduling
JobTemplate *RayJobSpec `json:"jobTemplate"`
// Schedule is the cron schedule string
Schedule string `json:"schedule"`
}
Copy link
Collaborator

@win5923 win5923 Nov 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this is alright, but I think we should introduce a separate struct (e.g. RayJobTemplateSpec) to hold both the metadata and the spec for the generated RayJob, similar to how Kubernetes models JobTemplateSpec in CronJob.

This would allow users to specify metadata inside jobTemplate, which we can then propagate to the created RayJob. It also keeps the API aligned with common Kubernetes patterns.

WDYT?

Reference:
https://github.com/kubernetes/kubernetes/blob/af9fb799ef09bbdb0b2b40b4e441f2ffccaffe18/pkg/apis/batch/types.go#L94-L105

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if we want this. In RayJobSpec when we specify the RayClusterSpec, we only set RayClusterSpec itself rather than also letting user set the ObjectMeta.

RayClusterSpec *RayClusterSpec `json:"rayClusterSpec,omitempty"`

cc @rueian for confirmation.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for it at the moment, this can be a future work.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current structure looks good to me already.

Comment on lines 44 to 45
rayVersion: '2.46.0' # should match the Ray version in the image of the containers
# Ray head pod template
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use 2.52.0 version? thank you!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated in 6e3c37a. Thank you!

Comment on lines 82 to 87
// This is the only 2 places where we update the RayCronJob status. This will directly
// update the ScheduleStatus to ValidationFailed if there's validation error
if err = r.updateRayCronJobStatus(ctx, originalRayCronJobInstance, rayCronJobInstance); err != nil {
logger.Info("Failed to update RayCronJob status", "error", err)
return ctrl.Result{RequeueAfter: RayCronJobDefaultRequeueDuration}, err
}
Copy link
Collaborator

@win5923 win5923 Dec 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to update status for validation failed since we not use ScheduleStatus for RayCronJob.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry this is the left over code, I forgot to delete it. Removed in 612fdb4

Comment on lines 26 to 52
// Create RayCronJob with invalid cron schedule
rayCronJob := &rayv1.RayCronJob{
ObjectMeta: metav1.ObjectMeta{
Name: "invalid-cronjob",
Namespace: "default",
},
Spec: rayv1.RayCronJobSpec{
Schedule: "invalid cron string",
JobTemplate: &rayv1.RayJobSpec{
Entrypoint: "python test.py",
RayClusterSpec: &rayv1.RayClusterSpec{
HeadGroupSpec: rayv1.HeadGroupSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "ray-head",
Image: "rayproject/ray:2.9.0",
},
},
},
},
},
},
},
},
}
Copy link
Collaborator

@win5923 win5923 Dec 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we create a template function for the ut? like this:

func rayClusterTemplate(name string, namespace string) *rayv1.RayCluster {

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes of course! Modified in 9a44ff3

Copy link
Collaborator

@win5923 win5923 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, Really cool!
image


if rayCronJobInstance.Status.LastScheduleTime == nil {
// The new RayCronJob, not yet scheduled
rayCronJobInstance.Status.LastScheduleTime = &metav1.Time{Time: now}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we only update the LastScheduleTime without creating the rayjob in the first reconciliation?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The first reconciliation is just to prepare the schedule. We don't create the job yet because we need to check if the scheduled time has actually arrived (e.g., if the job runs every minute but it's currently 15:10:30, we shouldn't create it yet)

Therefore, the first reconcile just sets the current time as LastScheduleTime and sets the requeue delay to future schedule time - now."

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have the same question, is this how kuberentes job api works?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have the same question, is this how kuberentes job api works?

Not really, Kubernetes CronJob uses the CreationTimestamp.Time to compute the next schedule time when evaluating it for the first time:

  func mostRecentScheduleTime(cj *batchv1.CronJob, now time.Time, ...) {
      earliestTime := cj.ObjectMeta.CreationTimestamp.Time
      if cj.Status.LastScheduleTime != nil {
          earliestTime = cj.Status.LastScheduleTime.Time
      }
      // ...
  }

https://github.com/kubernetes/kubernetes/blob/ec1bf8a4f3a5f054065225dc8275c66b93310d17/pkg/controller/cronjob/utils.go#L94-L123

The CronJob also use two schedule times (t1 and t2) calculate the next time to handle missed schedules, if the current time falls between t1 and t2, the controller will create a Job.

	t1 := schedule.Next(earliestTime)
	t2 := schedule.Next(t1)

	if now.Before(t1) {
		return earliestTime, nil, missedSchedules, nil
	}
	if now.Before(t2) {
		return earliestTime, &t1, missedSchedules, nil
	}

https://github.com/kubernetes/kubernetes/blob/ec1bf8a4f3a5f054065225dc8275c66b93310d17/pkg/controller/cronjob/utils.go#L115-L180

  sequenceDiagram
      participant R as Reconciler
      participant M as mostRecentScheduleTime()
      participant K as K8s API

      Note over R: CronJob created at 10:05<br/>Schedule: "0 * * * *"

      R->>M: First reconcile (10:05)
      M->>M: earliestTime = CreationTimestamp (10:05)
      M->>M: t1 = schedule.Next(10:05) = 11:00
      M->>M: now (10:05) < t1 (11:00) ?
      M-->>R: Return nil (not time yet)
      Note over R: LastScheduleTime stays nil<br/>Requeue until 11:00

      R->>M: Second reconcile (11:00)
      M->>M: earliestTime = CreationTimestamp (10:05)
      M->>M: t1 = 11:00, t2 = 12:00
      M->>M: now (11:00) >= t1 && now < t2 ?
      M-->>R: Return &t1 (11:00) ✅

      R->>K: CreateJob(scheduledTime: 11:00)
      K-->>R: Job created ✅
      R->>K: Update Status.LastScheduleTime = 11:00
      Note over R: First Job created!
Loading

And the LastScheduleTime is only updated after creating a Job.

scheduledTime, err := nextScheduleTime(logger, cronJob, now, sched, jm.recorder)
....

cronJob.Status.Active = append(cronJob.Status.Active, *jobRef)
cronJob.Status.LastScheduleTime = &metav1.Time{Time: *scheduledTime}
updateStatus = true

https://github.com/kubernetes/kubernetes/blob/ec1bf8a4f3a5f054065225dc8275c66b93310d17/pkg/controller/cronjob/cronjob_controllerv2.go#L668-L670

If i was wrong, please correct me.

Copy link
Collaborator Author

@machichima machichima Dec 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @win5923 for giving more details!

@Future-Outlier It's a bit different as Kubernetes use a more complex way of handling the schedule. But basically the first reconcile will not create a job. Details are as below:

In the first reconciliation (when cronjob first created), the LastScheduleTime will be nil, which the
earliestTime will be set to the cron job's creation time (link)

Then, it will calculate the expected schedule time t1 based on the earliestTime, just like what @win5923
mentioned. Note that when the current time is "before" the expected schedule time now < t1, we will "not" create a job. Detailed code trace as below:

  1. if now < t1, return mostRecentTime as nil (second return parameter)
  2. This mostRecentTime will be passed up to nextScheduleTime then syncCronJob -> the reconcile logic lies in this function
    • here, its value will be passed into scheduledTime -> scheduledTime is nil
  3. If scheduledTime is nil, we will requeue without creating a job


if rayCronJobInstance.Status.LastScheduleTime == nil {
// The new RayCronJob, not yet scheduled
rayCronJobInstance.Status.LastScheduleTime = &metav1.Time{Time: now}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have the same question, is this how kuberentes job api works?

Comment on lines +137 to +149
func (r *RayCronJobReconciler) updateRayCronJobStatus(ctx context.Context, oldRayCronJob *rayv1.RayCronJob, newRayCronJob *rayv1.RayCronJob) error {
logger := ctrl.LoggerFrom(ctx)
oldRayCronJobStatus := oldRayCronJob.Status
newRayCronJobStatus := newRayCronJob.Status
if oldRayCronJobStatus.LastScheduleTime != newRayCronJobStatus.LastScheduleTime {

logger.Info("updateRayCronJobStatus", "old RayCronJobStatus", oldRayCronJobStatus, "new RayCronJobStatus", newRayCronJobStatus)
if err := r.Status().Update(ctx, newRayCronJob); err != nil {
return err
}
}
return nil
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this include the first time we initialize LastScheduleTime?

Copy link
Collaborator Author

@machichima machichima Dec 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. For the first time the oldRayCronJobStatus.LastScheduleTime will be nil, and we will update the status with the new LastScheduleTime set here

Copy link
Member

@Future-Outlier Future-Outlier left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, @alimaazamat
are you interested in reviewing this PR?

Comment on lines +45 to +47
//+kubebuilder:rbac:groups=ray.io,resources=raycronjobs,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=ray.io,resources=raycronjobs/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=ray.io,resources=raycronjobs/finalizers,verbs=update
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
//+kubebuilder:rbac:groups=ray.io,resources=raycronjobs,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=ray.io,resources=raycronjobs/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=ray.io,resources=raycronjobs/finalizers,verbs=update
//+kubebuilder:rbac:groups=ray.io,resources=raycronjobs,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=ray.io,resources=raycronjobs/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=ray.io,resources=raycronjobs/finalizers,verbs=update
//+kubebuilder:rbac:groups=ray.io,resources=rayjobs,verbs=create
//+kubebuilder:rbac:groups=core,resources=events,verbs=create;patch

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in b03a22d

Comment on lines 149 to 151
Labels: map[string]string{
"ray.io/cronjob-name": cronJob.Name,
},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Labels: map[string]string{
"ray.io/cronjob-name": cronJob.Name,
},
Labels: map[string]string{
utils.RayCronJobLabelKey: cronJob.Name,
},

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed in 342e5a9

Comment on lines 164 to 169
// SetupWithManager sets up the controller with the Manager.
func (r *RayCronJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&rayv1.RayCronJob{}).
Complete(r)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// SetupWithManager sets up the controller with the Manager.
func (r *RayCronJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&rayv1.RayCronJob{}).
Complete(r)
}
// SetupWithManager sets up the controller with the Manager.
func (r *RayCronJobReconciler) SetupWithManager(mgr ctrl.Manager, reconcileConcurrency int) error {
return ctrl.NewControllerManagedBy(mgr).
For(&rayv1.RayCronJob{}).
WithOptions(controller.Options{
MaxConcurrentReconciles: reconcileConcurrency,
LogConstructor: func(request *reconcile.Request) logr.Logger {
logger := ctrl.Log.WithName("controllers").WithName("RayCronJob")
if request != nil {
logger = logger.WithValues("RayCronJob", request.NamespacedName)
}
return logger
},
}).
Complete(r)
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh so sorry I forgot to update this. Thank you for pointing this out!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modified in d29b748

Comment on lines 295 to 301
if features.Enabled(features.RayCronJob) {
setupLog.Info("RayCronJob feature gate is enabled, starting RayCronJob controller")
exitOnError(ray.NewRayCronJobReconciler(mgr).SetupWithManager(mgr),
"unable to create controller", "controller", "RayCronJob")
} else {
setupLog.Info("RayCronJob feature gate is disabled, skipping RayCronJob controller setup")
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if features.Enabled(features.RayCronJob) {
setupLog.Info("RayCronJob feature gate is enabled, starting RayCronJob controller")
exitOnError(ray.NewRayCronJobReconciler(mgr).SetupWithManager(mgr),
"unable to create controller", "controller", "RayCronJob")
} else {
setupLog.Info("RayCronJob feature gate is disabled, skipping RayCronJob controller setup")
}
if features.Enabled(features.RayCronJob) {
setupLog.Info("RayCronJob feature gate is enabled, starting RayCronJob controller")
exitOnError(ray.NewRayCronJobReconciler(mgr).SetupWithManager(mgr, config.ReconcileConcurrency),
"unable to create controller", "controller", "RayCronJob")
} else {
setupLog.Info("RayCronJob feature gate is disabled, skipping RayCronJob controller setup")
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated in d29b748

Copy link
Member

@Future-Outlier Future-Outlier left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

overall its good, I haven't had time to review validation files, go.mod, and go.sum.

cc @jiaweijiang @seanlaii for help if you have time

Copy link
Contributor

@seanlaii seanlaii left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the contributions!
Just a reminder, for e2e tests, we need to enable feature gate here and here.

// RayCronJobSpec defines the desired state of RayCronJob
type RayCronJobSpec struct {
// JobTemplate defines the job spec that will be created by cron scheduling
JobTemplate *RayJobSpec `json:"jobTemplate"`
Copy link
Contributor

@seanlaii seanlaii Dec 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a plan to make it optional, such as referring to an existing job? If it will always be required, should we use a pointer type?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this will always be required. Let me change it to normal object. Thank you!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed in c3f3092

return ctrl.Result{RequeueAfter: nextScheduleTime.Sub(now)}, nil
}

rayJob, err := r.constructRayJob(rayCronJobInstance)
Copy link
Contributor

@seanlaii seanlaii Dec 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could also add something similar to cronjob-scheduled-timestamp to the RayJob annotation for debugging.
Don't have to be in this PR if this is not trivial. Overall LGTM.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SG!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added in bb353fb

Copy link
Member

@Future-Outlier Future-Outlier left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, really nice PR, cc @rueian to do final pass and merge this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature] Support cron scheduling for RayJob

6 participants