Skip to content

Commit ca0fcc1

Browse files
Feature:4135 Orchestrator generic graceful stopping (#4247)
* Feature:4135 Orchestrator generic graceful stopping - Early stopping on unhealthy heartbeat - Token invalidation - Graceful step termination on pipeline stop event
1 parent c7b1dd7 commit ca0fcc1

File tree

20 files changed

+505
-81
lines changed

20 files changed

+505
-81
lines changed

docs/book/how-to/steps-pipelines/advanced_features.md

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,40 @@ If steps 2, 3, and 4 execute in parallel and step 2 fails:
147147
All three execution modes are currently only supported by the `local`, `local_docker`, and `kubernetes` orchestrator flavors. For any other orchestrator flavor, the default (and only available) behavior is `CONTINUE_ON_FAILURE`. If you would like to see any of the other orchestrators extended to support the other execution modes, reach out to us in [Slack](https://zenml.io/slack-invite).
148148
{% endhint %}
149149

150+
### Step Heartbeat
151+
152+
Step heartbeat is a background mechanism that runs alongside step executions and performs two core functions:
153+
154+
- Periodically pings the ZenML server to refresh the step's heartbeat value.
155+
- Retrieves the current pipeline and step status, and terminates the step if the pipeline has entered a stopping state.
156+
157+
This enables ZenML to:
158+
159+
- Track the liveness of a step execution and assess its health based on incoming heartbeats.
160+
- Gracefully interrupt running steps when a pipeline is being stopped.
161+
162+
*Scope and current behavior*
163+
164+
- Heartbeats are enabled only for steps executed in isolated environments. This excludes:
165+
- `Inline` steps in `dynamic` pipelines.
166+
- Steps run via the `local` orchestrator.
167+
- A step that becomes unhealthy automatically triggers a graceful shutdown (currently supported for the `kubernetes` orchestrator).
168+
- When using `CONTINUE_ON_FAILURE` execution mode, heartbeat status is also used to decide whether execution tokens should be invalidated.
169+
170+
*Configuration*
171+
172+
You can configure how long (in minutes) a step may go without sending a heartbeat before it is considered unhealthy using the `heartbeat_healthy_threshold` step parameter. Valid values range from 1 to 30.
173+
The default value is 30 minutes, which is also the maximum allowed value.
174+
175+
```python
176+
from zenml import step
177+
178+
@step(heartbeat_healthy_threshold=30)
179+
def my_step():
180+
...
181+
182+
```
183+
150184
## Data & Output Management
151185

152186
## Type annotations

src/zenml/client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4959,7 +4959,7 @@ def list_run_steps(
49594959
cache_expired: Whether the cache expiration time of the step run
49604960
has passed.
49614961
code_hash: The code hash of the step run to filter by.
4962-
status: The name of the run to filter by.
4962+
status: The status of the step run.
49634963
run_metadata: Filter by run metadata.
49644964
exclude_retried: Whether to exclude retried step runs.
49654965
hydrate: Flag deciding whether to hydrate the output model(s)

src/zenml/config/step_configurations.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,14 @@ class StepConfigurationUpdate(FrozenBaseModel):
221221
"run inline unless a step operator or docker/resource settings "
222222
"are configured. This is only applicable for dynamic pipelines.",
223223
)
224+
heartbeat_healthy_threshold: int = Field(
225+
default=30,
226+
description="The amount of time (in minutes) that a running step "
227+
"has not received heartbeat and is considered healthy. By default, "
228+
"set to the maximum value (30 minutes).",
229+
ge=1,
230+
le=30,
231+
)
224232

225233
outputs: Mapping[str, PartialArtifactConfiguration] = {}
226234

src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator_entrypoint.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -612,6 +612,24 @@ def stop_step(node: Node) -> None:
612612
)
613613
break
614614

615+
def is_node_heartbeat_unhealthy(node: Node) -> bool:
616+
from zenml.steps.heartbeat import is_heartbeat_unhealthy
617+
618+
sr_ = client.list_run_steps(
619+
name=node.id, pipeline_run_id=pipeline_run.id
620+
)
621+
622+
if sr_.items:
623+
return is_heartbeat_unhealthy(
624+
step_run_id=sr_.items[0].id,
625+
status=sr_.items[0].status,
626+
start_time=sr_.items[0].start_time,
627+
heartbeat_threshold=sr_.items[0].heartbeat_threshold,
628+
latest_heartbeat=sr_.items[0].latest_heartbeat,
629+
)
630+
631+
return False
632+
615633
def check_job_status(node: Node) -> NodeStatus:
616634
"""Check the status of a job.
617635
@@ -652,6 +670,13 @@ def check_job_status(node: Node) -> NodeStatus:
652670
error_message,
653671
)
654672
return NodeStatus.FAILED
673+
elif is_node_heartbeat_unhealthy(node):
674+
logger.error(
675+
"Heartbeat for step `%s` indicates unhealthy status.",
676+
step_name,
677+
)
678+
stop_step(node=node)
679+
return NodeStatus.FAILED
655680
else:
656681
return NodeStatus.RUNNING
657682

src/zenml/models/v2/core/step_run.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -617,6 +617,17 @@ def latest_heartbeat(self) -> Optional[datetime]:
617617
"""
618618
return self.get_body().latest_heartbeat
619619

620+
@property
621+
def heartbeat_threshold(self) -> Optional[int]:
622+
"""The `heartbeat_threshold` property.
623+
624+
Returns:
625+
the value of the property.
626+
"""
627+
if self.get_metadata().spec.enable_heartbeat:
628+
return self.get_metadata().config.heartbeat_healthy_threshold
629+
return None
630+
620631
@property
621632
def snapshot_id(self) -> UUID:
622633
"""The `snapshot_id` property.
@@ -843,3 +854,4 @@ class StepHeartbeatResponse(BaseModel, use_enum_values=True):
843854
id: UUID
844855
status: ExecutionStatus
845856
latest_heartbeat: datetime
857+
pipeline_run_status: ExecutionStatus | None = None

src/zenml/orchestrators/base_orchestrator.py

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -682,27 +682,16 @@ def stop_run(
682682
If False, forces immediate termination. Default is False.
683683
684684
Raises:
685-
NotImplementedError: If any orchestrator inheriting from the base
686-
class does not implement this logic.
687685
IllegalOperationError: If the run has no orchestrator run id yet.
688686
"""
689-
# Check if the orchestrator supports cancellation
690-
if (
691-
getattr(self._stop_run, "__func__", None)
692-
is BaseOrchestrator._stop_run
693-
):
694-
raise NotImplementedError(
695-
f"The '{self.__class__.__name__}' orchestrator does not "
696-
"support stopping pipeline runs."
697-
)
698-
699687
if not run.orchestrator_run_id:
700688
raise IllegalOperationError(
701689
"Cannot stop a pipeline run that has no orchestrator run id "
702690
"yet."
703691
)
704692

705693
# Update pipeline status to STOPPING before calling concrete implementation
694+
# Initiates graceful termination.
706695
publish_pipeline_run_status_update(
707696
pipeline_run_id=run.id,
708697
status=ExecutionStatus.STOPPING,
@@ -724,13 +713,24 @@ def _stop_run(
724713
run: A pipeline run response to stop (already updated to STOPPING status).
725714
graceful: If True, allows for graceful shutdown where possible.
726715
If False, forces immediate termination. Default is True.
727-
728-
Raises:
729-
NotImplementedError: If any orchestrator inheriting from the base
730-
class does not implement this logic.
731716
"""
717+
if graceful:
718+
# This should work out of the box for HeartBeat step termination.
719+
# Orchestrators should extend the functionality to cover other scenarios.
720+
self._stop_run_gracefully(pipeline_run=run)
721+
else:
722+
self._stop_run_forcefully(pipeline_run=run)
723+
724+
def _stop_run_gracefully(
725+
self, pipeline_run: "PipelineRunResponse"
726+
) -> None:
727+
pass
728+
729+
def _stop_run_forcefully(
730+
self, pipeline_run: "PipelineRunResponse"
731+
) -> None:
732732
raise NotImplementedError(
733-
"The stop run functionality is not implemented for the "
733+
"The forceful stop run functionality is not implemented for the "
734734
f"'{self.__class__.__name__}' orchestrator."
735735
)
736736

src/zenml/orchestrators/publish_utils.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,23 @@ def publish_failed_step_run(step_run_id: "UUID") -> "StepRunResponse":
118118
)
119119

120120

121+
def publish_stopped_step_run(step_run_id: "UUID") -> "StepRunResponse":
122+
"""Publishes a stopped step run.
123+
124+
Args:
125+
step_run_id: The ID of the step run to update.
126+
127+
Returns:
128+
The updated step run.
129+
"""
130+
return publish_step_run_status_update(
131+
step_run_id=step_run_id,
132+
status=ExecutionStatus.STOPPED,
133+
end_time=utc_now(),
134+
exception_info=step_exception_info.get(),
135+
)
136+
137+
121138
def publish_successful_pipeline_run(
122139
pipeline_run_id: "UUID",
123140
) -> "PipelineRunResponse":

src/zenml/orchestrators/step_launcher.py

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
from zenml.orchestrators import utils as orchestrator_utils
4040
from zenml.orchestrators.step_runner import StepRunner
4141
from zenml.stack import Stack
42+
from zenml.steps import StepHeartBeatTerminationException
4243
from zenml.utils import env_utils, exception_utils, string_utils
4344
from zenml.utils.logging_utils import (
4445
is_step_logging_enabled,
@@ -325,19 +326,32 @@ def launch(self) -> StepRunResponse:
325326
except RunStoppedException as e:
326327
raise e
327328
except BaseException as e: # noqa: E722
328-
logger.error(
329-
"Failed to run step `%s`: %s",
330-
self._invocation_id,
331-
e,
329+
step_run = Client().get_run_step(
330+
step_run_id=step_run.id
332331
)
333-
publish_utils.publish_failed_step_run(step_run.id)
332+
333+
if (
334+
isinstance(e, StepHeartBeatTerminationException)
335+
or step_run.status == ExecutionStatus.STOPPING
336+
):
337+
# Handle as a non-failure as exception is a propagation of graceful termination.
338+
publish_utils.publish_stopped_step_run(step_run.id)
339+
340+
else:
341+
logger.error(
342+
"Failed to run step `%s`: %s",
343+
self._invocation_id,
344+
e,
345+
)
346+
publish_utils.publish_failed_step_run(step_run.id)
334347
raise
335348

336349
duration = time.time() - start_time
337350
logger.info(
338351
f"Step `{self._invocation_id}` has finished in "
339352
f"`{string_utils.get_human_readable_time(duration)}`."
340353
)
354+
341355
else:
342356
logger.info(
343357
f"Using cached version of step `{self._invocation_id}`."

src/zenml/orchestrators/step_runner.py

Lines changed: 41 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
from zenml.constants import (
3636
ENV_ZENML_STEP_OPERATOR,
3737
)
38-
from zenml.enums import ArtifactSaveType
38+
from zenml.enums import ArtifactSaveType, ExecutionStatus
3939
from zenml.exceptions import StepInterfaceError
4040
from zenml.hooks.hook_validators import load_and_run_hook
4141
from zenml.logger import get_logger
@@ -236,46 +236,55 @@ def run(
236236
)
237237
except BaseException as step_exception: # noqa: E722
238238
step_failed = True
239-
240-
exception_info = (
241-
exception_utils.collect_exception_information(
242-
step_exception, step_instance
243-
)
244-
)
245-
246-
if ENV_ZENML_STEP_OPERATOR in os.environ:
247-
# We're running in a step operator environment, so we can't
248-
# depend on the step launcher to publish the exception info
239+
if (
240+
isinstance(step_exception, KeyboardInterrupt)
241+
and heartbeat_worker.is_terminated
242+
):
249243
Client().zen_store.update_run_step(
250244
step_run_id=step_run_info.step_run_id,
251245
step_run_update=StepRunUpdate(
252-
exception_info=exception_info,
246+
status=ExecutionStatus.STOPPING,
253247
),
254248
)
255-
else:
256-
# This will be published by the step launcher
257-
step_exception_info.set(exception_info)
258249

259-
if not step_run.is_retriable:
260-
if (
261-
failure_hook_source
262-
:= self.configuration.failure_hook_source
263-
):
264-
logger.info("Detected failure hook. Running...")
265-
with env_utils.temporary_environment(
266-
step_environment
267-
):
268-
load_and_run_hook(
269-
failure_hook_source,
270-
step_exception=step_exception,
271-
)
272-
if (
273-
isinstance(step_exception, KeyboardInterrupt)
274-
and heartbeat_worker.is_terminated
275-
):
276250
raise StepHeartBeatTerminationException(
277251
"Remotely stopped step - terminating execution."
278252
)
253+
else:
254+
exception_info = (
255+
exception_utils.collect_exception_information(
256+
step_exception, step_instance
257+
)
258+
)
259+
260+
if ENV_ZENML_STEP_OPERATOR in os.environ:
261+
# We're running in a step operator environment, so we can't
262+
# depend on the step launcher to publish the exception info
263+
Client().zen_store.update_run_step(
264+
step_run_id=step_run_info.step_run_id,
265+
step_run_update=StepRunUpdate(
266+
exception_info=exception_info,
267+
),
268+
)
269+
else:
270+
# This will be published by the step launcher
271+
step_exception_info.set(exception_info)
272+
273+
if not step_run.is_retriable:
274+
if (
275+
failure_hook_source
276+
:= self.configuration.failure_hook_source
277+
):
278+
logger.info(
279+
"Detected failure hook. Running..."
280+
)
281+
with env_utils.temporary_environment(
282+
step_environment
283+
):
284+
load_and_run_hook(
285+
failure_hook_source,
286+
step_exception=step_exception,
287+
)
279288
raise step_exception
280289
finally:
281290
heartbeat_worker.stop()

0 commit comments

Comments
 (0)