diff --git a/packages/climate-ref/src/climate_ref/executor/hpc.py b/packages/climate-ref/src/climate_ref/executor/hpc.py index 8d9563bc0..458f3983a 100644 --- a/packages/climate-ref/src/climate_ref/executor/hpc.py +++ b/packages/climate-ref/src/climate_ref/executor/hpc.py @@ -20,8 +20,10 @@ import os import re +import resource import time -from typing import Annotated, Any, Literal +from collections.abc import Callable +from typing import Annotated, Any, Literal, TypeVar, cast import parsl from loguru import logger @@ -44,6 +46,8 @@ from .local import ExecutionFuture, process_result from .pbs_scheduler import SmartPBSProvider +F = TypeVar("F", bound=Callable[..., Any]) + class SlurmConfig(BaseModel): """Slurm Configurations""" @@ -61,7 +65,7 @@ class SlurmConfig(BaseModel): validation: StrictBool = False walltime: str = "00:30:00" scheduler_options: str = "" - retries: Annotated[int, Field(strict=True, ge=1, le=3)] = 2 + retries: Annotated[int, Field(strict=True, ge=0, le=3)] = 2 max_blocks: Annotated[int, Field(strict=True, ge=1)] = 1 # one block mean one job? worker_init: str = "" overrides: str = "" @@ -111,7 +115,23 @@ def _validate_walltime(cls, v: str) -> str: return v +def with_memory_limit(limit_gb: float) -> Callable[[F], F]: + """Set memory limit for a parsl worker""" + + def decorator(func: F) -> F: + def wrapper(*args: Any, **kwargs: Any) -> Any: + bytes_limit = int(limit_gb * 1024 * 1024 * 1024) + soft, hard = bytes_limit, bytes_limit + resource.setrlimit(resource.RLIMIT_AS, (soft, hard)) + return func(*args, **kwargs) + + return cast(F, wrapper) + + return decorator + + @python_app +@with_memory_limit(7.0) def _process_run(definition: ExecutionDefinition, log_level: str) -> ExecutionResult: """Run the function on computer nodes""" # This is a catch-all for any exceptions that occur in the process and need to raise for