-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathprocess_utils.py
More file actions
116 lines (103 loc) · 3.24 KB
/
process_utils.py
File metadata and controls
116 lines (103 loc) · 3.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import logging
import subprocess
from omegaconf import DictConfig
from typing import List, Tuple
logger = logging.getLogger(__name__)


def get_output_or_exit(command: str) -> str:
    """
    Run `command` in a bash shell, streaming each stdout line to the logger.

    Args:
        command: the shell command to execute (run under /bin/bash).

    Returns:
        The full stdout of the command as a single string.

    Raises:
        ValueError: if the command exits with a non-zero return code.
    """
    logger.info(f"Running command: {command}")
    p = subprocess.Popen(
        args=command,
        shell=True,
        executable="/bin/bash",
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    # Collect stdout while streaming it. The original iterated p.stdout and
    # *then* returned communicate()'s stdout — but by that point the stdout
    # pipe was already drained, so the function returned an empty string
    # instead of the command's output.
    stdout_lines = []
    for line in p.stdout:
        logger.info(f"Stdout: {line}")
        stdout_lines.append(line)
    # communicate() drains stderr and waits for the process to exit.
    # Calling wait() first (as the original did) risks deadlock if the
    # child fills the stderr pipe before exiting.
    _, stderr = p.communicate()
    stdout = "".join(stdout_lines)
    if p.returncode != 0:
        raise ValueError(
            f"Command '{command}' has failed.\nStdout: {stdout}\nStderr: {stderr}"
        )
    return stdout
def create_slurm_submission(
    job_name: str,
    py_commands: str,
    slurm_script_path: str,
    wait: bool = True,
) -> str:
    """
    Build an `sbatch` command line that submits `slurm_script_path`,
    passing `py_commands` as the script's (quoted) argument.

    Args:
        job_name: value for sbatch's --job-name flag.
        py_commands: command string forwarded to the SLURM script.
        slurm_script_path: path of the SLURM batch script to submit.
        wait: when True, include --wait so sbatch blocks until the job ends.

    Returns:
        The fully-formatted sbatch command string.
    """
    wait_str = "--wait" if wait else ""
    return (
        f"sbatch {wait_str} --parsable --job-name={job_name} "
        f'{slurm_script_path} "{py_commands}"'
    )
def create_lsf_submission(
    job_name: str,
    py_commands: str,
    lsf_script_path: str,
    str_to_replace: str = "command",
    wait: bool = True,
) -> str:
    """
    Build a shell pipeline that substitutes `py_commands` for the
    `str_to_replace` placeholder inside `lsf_script_path` (via sed) and
    pipes the result to `bsub`.

    Args:
        job_name: value for bsub's -J (job name) flag.
        py_commands: command string spliced into the LSF script template.
        lsf_script_path: path of the LSF script template.
        str_to_replace: placeholder text in the template to replace.
        wait: when True, include -K so bsub blocks until the job completes.

    Returns:
        The fully-formatted sed-pipe-bsub command string.
    """
    wait_str = "-K" if wait else ""
    return (
        f'sed "s|{str_to_replace}|{py_commands}|g" '
        f"< {lsf_script_path} | bsub -J {job_name} {wait_str}"
    )
def create_job_submission(
    cfg: DictConfig,
    job_name: str,
    py_commands: str,
    lsf_script_path: str,
    slurm_script_path: str,
    wait: bool = True,
) -> str:
    """
    Build the submission command for the scheduler named by
    `cfg.job_submission_system` ("slurm" or "lsf").

    Args:
        cfg: config object exposing `job_submission_system`.
        job_name: scheduler job name.
        py_commands: command string forwarded to the scheduler script.
        lsf_script_path: LSF script template path (used when system is "lsf").
        slurm_script_path: SLURM script path (used when system is "slurm").
        wait: when True, the submission command blocks until job completion.

    Returns:
        The scheduler-specific submission command string.

    Raises:
        ValueError: if `cfg.job_submission_system` is not "slurm" or "lsf".
    """
    system = cfg.job_submission_system
    if system == "slurm":
        return create_slurm_submission(
            job_name, py_commands, slurm_script_path, wait=wait
        )
    if system == "lsf":
        return create_lsf_submission(job_name, py_commands, lsf_script_path, wait=wait)
    raise ValueError(
        f"Unrecognized job submission system: {cfg.job_submission_system}"
    )
def run_all_commands_and_wait_until_all_completed(
    commands: List[str], ignore_failures: bool = False
) -> Tuple[List[str], List[str], List[int]]:
    """
    Run all `commands` in parallel bash shells and wait until every child
    process has completed.

    Args:
        commands: bash command strings, all launched concurrently.
        ignore_failures: when False, raise on the first command (in
            submission order) with a non-zero return code; when True,
            record the failure and keep collecting results.

    Returns:
        Tuple of (stdouts, stderrs, return codes), each ordered like
        `commands`.

    Raises:
        ValueError: if a command fails and `ignore_failures` is False.
    """
    all_processes = []
    for cmd in commands:
        p = subprocess.Popen(
            args=cmd,
            shell=True,
            executable="/bin/bash",
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        logger.info(f"Running command: {cmd}")
        all_processes.append(p)
    # Stream/collect stdout of each process, in submission order.
    # NOTE(review): draining sequentially mirrors the original; a process
    # whose stderr pipe fills before its stdout is drained here could stall.
    all_outputs = []
    for p in all_processes:
        stdout_lines = []
        for line in p.stdout:
            logger.info(f"Stdout: {line}")
            # Lines from iterating p.stdout already end in "\n"; the original
            # appended f"{line}\n", doubling every newline in the output.
            stdout_lines.append(line)
        all_outputs.append("".join(stdout_lines))
    all_stderrs = []
    returncodes = []
    for p, cmd in zip(all_processes, commands):
        # communicate() drains stderr and waits for exit; stdout is already
        # consumed above. (The original's wait() before communicate() could
        # deadlock on a full stderr pipe.)
        _, stderr = p.communicate()
        if p.returncode != 0 and not ignore_failures:
            raise ValueError(f"Command '{cmd}' has failed.\nStderr: {stderr}")
        all_stderrs.append(stderr)
        returncodes.append(p.returncode)
    return all_outputs, all_stderrs, returncodes