Skip to content

[Fix][Zeta] Guard state cleanup races after node failure#10687

Open
zhangshenghang wants to merge 18 commits intoapache:devfrom
zhangshenghang:fix/zeta-state-cleanup-convergence
Open

[Fix][Zeta] Guard state cleanup races after node failure#10687
zhangshenghang wants to merge 18 commits intoapache:devfrom
zhangshenghang:fix/zeta-state-cleanup-convergence

Conversation

@zhangshenghang
Copy link
Copy Markdown
Member

@zhangshenghang zhangshenghang commented Apr 1, 2026

Purpose of this pull request

This PR fixes an engine-side terminal-state convergence bug after worker node failure.

When a worker goes offline, the engine can start cleaning distributed state from the running job state maps before all asynchronous task/pipeline/job callbacks have finished. In the current code path, PhysicalVertex, SubPlan, and PhysicalPlan can observe missing state entries and throw NullPointerException, which interrupts terminal-state convergence and may leave the job hanging in an intermediate state.

This PR changes the cleanup strategy instead of relying on local fallback state:

  • keep terminal job/pipeline/task state in distributed maps for a short cleanup delay window
  • remove runningJobInfoIMap immediately so terminal jobs are not restored on master switch
  • delay physical removal of distributed state maps until late callbacks have time to drain
  • treat already-cleaned state as a no-op defensive path instead of rebuilding distributed state

It also adds:

  • targeted regression tests for terminal tombstone behavior
  • a delay-based cleanup regression test
  • an engine E2E scenario for the BATCH + no checkpoint + job.retry.times=0 no-restore path

Does this PR introduce any user-facing change?

No user-facing API/config change in normal operation. This improves failure handling so jobs are less likely to hang in an intermediate state after node failure.

How was this patch tested?

Verified locally:

  • ./mvnw -nsu -pl seatunnel-engine/seatunnel-engine-common spotless:check
  • ./mvnw -nsu -pl seatunnel-engine/seatunnel-engine-server spotless:check
  • ./mvnw -nsu -pl seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base spotless:check

Additional notes:

  • Added targeted regression tests: StateTransitionCleanupTest, JobStateCleanupDelayTest
  • Added engine E2E coverage: ClusterFailureNoRestoreIT
  • Full Maven test/compile validation in this checkout is currently blocked by unrelated upstream build issues in other modules (for example seatunnel-engine-server references missing types in the current checkout, and reactor builds are also blocked by seatunnel-config-shade compilation issues), so this PR remains draft.

@zhangshenghang zhangshenghang marked this pull request as ready for review April 1, 2026 09:52
Copy link
Copy Markdown

@DanielLeens DanielLeens left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with the tombstone approach for late callbacks, but I think the delayed cleanup currently loses its cleanup owner across a second master failover.

scheduleRemoveJobStateMaps() removes runningJobInfoIMap immediately and then schedules removeJobStateMaps() only in the local monitorService. If that master dies during the delay window, the scheduled task disappears with it. When the next master restores, there is no runningJobInfoIMap entry left to rediscover this job, and restoreJobFromMasterActiveSwitch() just returns for terminal states.

That leaves the terminal entries in runningJobStateIMap / runningJobStateTimestampsIMap orphaned permanently. I think the delayed-cleanup intent needs to be persisted in distributed state (or another recoverable cleanup record), otherwise this closes the race only as long as the same master survives until the timer fires.

Copy link
Copy Markdown

@DanielLeens DanielLeens left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update. I re-reviewed the latest HEAD locally, and I still see the same failover hole during the delayed-cleanup window.

cleanJob() still calls scheduleRemoveJobStateMaps() (JobMaster.java:778-782), and that method still removes runningJobInfoIMap immediately (JobMaster.java:644-647) before scheduling the delayed cleanup on the local monitorService (JobMaster.java:658-672). But master-switch restore only scans runningJobInfoIMap.entrySet() (CoordinatorService.java:636-642, 665-681).

So if the active master dies before the delayed task fires, the next master has no distributed record left to rediscover this terminal job, and the remaining state maps are still orphaned. The new end-state guard in restoreJobFromMasterActiveSwitch() does not close that gap, because it only runs for jobs that still have a runningJobInfoIMap entry.

The new JobStateCleanupDelayTest currently asserts that runningJobInfoIMap is already null immediately after terminal completion, which effectively codifies the same gap instead of covering the second-master-failover case.

I think the delayed-cleanup intent still needs to be persisted in recoverable distributed state, or runningJobInfoIMap needs to remain until delayed cleanup actually executes, before this can merge.

Copy link
Copy Markdown

@DanielLeens DanielLeens left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update. I pulled the latest HEAD locally and re-reviewed the delayed-cleanup path.

I still see the same blocking failover hole during the cleanup-delay window. cleanJob() still calls scheduleRemoveJobStateMaps(), and that method still removes runningJobInfoIMap immediately before scheduling the delayed cleanup only on the local monitorService. But master-switch restore still discovers jobs only by scanning runningJobInfoIMap. So if the active master dies before the delayed task fires, the next master has no distributed record left to rediscover this terminal job, and the remaining state maps are still orphaned.

The new end-state guard in restoreJobFromMasterActiveSwitch() does not close that gap because it only runs for jobs that still have a runningJobInfoIMap entry. The new JobStateCleanupDelayTest currently asserts that runningJobInfoIMap is already null during the delay window, which codifies the same hole instead of covering the second-master-failover case.

I think the delayed cleanup intent still needs to be persisted in recoverable distributed state, or runningJobInfoIMap needs to stay until the delayed cleanup actually executes. After that, this will be much closer.

Copy link
Copy Markdown

@DanielLeens DanielLeens left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pulled the latest HEAD locally again and I still see the same blocking failover hole during the cleanup-delay window.

JobMaster.scheduleRemoveJobStateMaps() still removes runningJobInfoIMap immediately before scheduling the delayed cleanup only on the local monitorService. But master-switch restore still discovers jobs by scanning runningJobInfoIMap in CoordinatorService.restoreAllRunningJobFromMasterNodeSwitch(). So if the active master dies before the delayed task fires, the next master still has no distributed record left to rediscover this terminal job, and the remaining state maps can still be orphaned.

The new stateCleanupDelayMillis=0 test config and the late-checkpoint guard do not close that gap, because they do not persist the delayed-cleanup intent across a second master failover.

I still think this needs one of these two directions before merge:

  • keep runningJobInfoIMap until the delayed cleanup actually executes, or
  • persist the delayed-cleanup intent in recoverable distributed state.

After that, I am happy to re-review.

Copy link
Copy Markdown

@DanielLeens DanielLeens left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pulled the latest HEAD locally again and re-checked the terminal-cleanup / master-switch path.

The previous failover hole looks closed now: JobMaster.scheduleRemoveJobStateMaps() persists a JobCleanupRecord in IMAP_PENDING_JOB_CLEANUP, CoordinatorService.restoreJobFromMasterActiveSwitch() reschedules terminal cleanup instead of dropping the job blindly, and the REST / overview paths filter delayed-cleanup tombstones so finished jobs are not shown as running. With the new unit / E2E coverage around delayed cleanup and no-restore cluster failure, I do not see the previous blocker in the current revision.

Copy link
Copy Markdown

@DanielLeens DanielLeens left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the latest update. I re-reviewed the current head locally as seatunnel-review-10687 at commit 17750abb9, comparing it with upstream/dev.

What This PR Fixes

  • User pain: after terminal job/pipeline state cleanup, late asynchronous callbacks or active-master switch recovery can observe missing runtime state and either report misleading errors or leave stale job metadata behind.
  • Fix approach: the PR delays terminal job-state cleanup, records pending cleanup metadata in Hazelcast, reschedules cleanup after master failover, and prevents a new non-savepoint submission from reusing a job id while the old terminal state is still waiting for cleanup.
  • One-line value: terminal state becomes a short-lived tombstone instead of disappearing immediately, which makes late callbacks and master failover safer.

Core Logic Review

Key changed files and methods:

  • CoordinatorService.java: schedulePendingJobCleanup(...), processPendingJobCleanup(...), restoreAllRunningJobFromMasterNodeSwitch(...), and the submitJob(...) pending-cleanup guard.
  • JobMaster.java: createJobCleanupRecord() and scheduleRemoveJobStateMaps().
  • JobCleanupRecord.java: distributed cleanup metadata for job-level state keys.
  • JobInfoService.java: hides terminal jobs that are retained only as cleanup tombstones.

Important before/after point:

// Before: terminal state could be removed immediately, so late callbacks saw null state.
removeJobStateMaps();
// After: terminal state is retained and removed later by a cleanup record.
pendingJobCleanupIMap.put(jobId, cleanupRecord);
coordinatorService.schedulePendingJobCleanup(jobId, cleanupRecord);

The normal Zeta lifecycle does hit this change:

Job reaches terminal state
  -> PhysicalPlan.addPipelineEndCallback()
      -> JobMaster.initStateFuture() completion handler
          -> JobMaster.cleanJob()
              -> createJobCleanupRecord()
              -> pendingJobCleanupIMap.put(jobId, record)
              -> CoordinatorService.schedulePendingJobCleanup(jobId, record)

Delayed cleanup
  -> CoordinatorService.processPendingJobCleanup(jobId, record)
      -> verifies initializationTimestamp still matches current JobInfo
      -> verifies current job state is terminal
      -> removes runningJobInfoIMap
      -> removes recorded state/timestamp keys

Active-master switch before cleanup fires
  -> CoordinatorService.restoreAllRunningJobFromMasterNodeSwitch()
      -> terminal-state zombie jobs are pre-filtered
      -> restoreJobFromMasterActiveSwitch()
          -> reschedules pending cleanup if a cleanup record exists
          -> otherwise removes stale runningJobInfoIMap

Local static verification:

  • git diff --stat upstream/dev...seatunnel-review-10687: 25 files changed, +1272/-45.
  • git diff --name-status upstream/dev...seatunnel-review-10687: Zeta coordinator/job-master cleanup code, serialization hook, REST visibility, tests, and one E2E are touched.
  • gh pr checks 10687: Build is CANCELLED; label and notification checks are successful.
  • Local build/tests: not run. This review is based on the local branch, full diff, and the job lifecycle / failover call-chain inspection.

Compatibility, Side Effects, Errors, and Logs

Compatibility impact: mostly compatible, with one intentional operational behavior change. Finished jobs are retained in runtime maps for state-cleanup-delay-ms before cleanup, while JobInfoService.shouldShowAsRunningJob() prevents these tombstones from showing as running jobs. No public API, protocol, or serialization format used by clients is removed, but a new internal Hazelcast data type is added via ResourceDataSerializerHook.

Performance and side effects: the default 60s retention adds bounded temporary IMap entries for terminal jobs/pipelines/tasks and checkpoint state keys. Cleanup scheduling uses the existing monitor service and should not add hot-path CPU/network cost. The owner timestamp guard is important and is present, so delayed cleanup should not remove a newly submitted job with the same id.

Error handling and logs: cleanup failures are logged and retried through retained cleanup records. Late state transitions now log and skip when the state entry is already missing or terminal, instead of trying to force an invalid transition.

Findings

I did not find a new source-level blocker in the latest code. The remaining blocker is CI status.

Merge Conclusion

Conclusion: can merge after CI is rerun successfully

  1. Blocking items:
    • CI: Build is currently CANCELLED on the latest head (17750abb9). This must be rerun and pass before merge.
  2. Suggested non-blocking items:
    • None from this latest source review.

Overall assessment: the design is a reasonable long-term fix for the terminal-state cleanup race. It keeps state retention bounded, records enough ownership data to avoid deleting a newer job, and includes targeted unit coverage for cleanup ownership, submit blocking, delayed cleanup, and late state transitions. Once Build is green, I think this PR can move forward.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants