Skip to content

Commit 7e84f6c

Browse files
authored
Merge pull request #567 from ikondov/recovery-rerun
[BUG] Recovery rerun
2 parents adec6ef + a3db8ee commit 7e84f6c

4 files changed

Lines changed: 78 additions & 18 deletions

File tree

fireworks/core/launchpad.py

Lines changed: 40 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,7 @@ def bulk_add_wfs(self, wfs) -> None:
410410
411411
"""
412412
# Make all fireworks workflows
413-
wfs = [Workflow.from_Firework(wf) if isinstance(wf, Firework) else wf for wf in wfs]
413+
wfs = [Workflow.from_firework(wf) if isinstance(wf, Firework) else wf for wf in wfs]
414414

415415
# Initialize new firework counter, starting from the next fw id
416416
total_num_fws = sum(len(wf) for wf in wfs)
@@ -1700,8 +1700,11 @@ def rerun_fw(self, fw_id, rerun_duplicates=True, recover_launch=None, recover_mo
17001700
17011701
Returns:
17021702
[int]: list of firework ids that were rerun
1703+
1704+
Raises:
1705+
ValueError: raised if the firework, the recover_launch or the recovery info is not found
17031706
"""
1704-
m_fw = self.fireworks.find_one({"fw_id": fw_id}, {"state": 1})
1707+
m_fw = self.fireworks.find_one({"fw_id": fw_id}, {"state": True, "launches": True})
17051708

17061709
if not m_fw:
17071710
raise ValueError(f"FW with id: {fw_id} not found!")
@@ -1722,14 +1725,25 @@ def rerun_fw(self, fw_id, rerun_duplicates=True, recover_launch=None, recover_mo
17221725

17231726
# Launch recovery
17241727
if recover_launch is not None:
1725-
recovery = self.get_recovery(fw_id, recover_launch)
1728+
if not m_fw["launches"]:
1729+
raise ValueError(f"FW with id: {fw_id} has no active launches")
1730+
if recover_launch == "last":
1731+
rec_launch_id = m_fw["launches"][-1]
1732+
else:
1733+
if recover_launch not in m_fw["launches"]:
1734+
raise ValueError(f"launch_id: {recover_launch} is no launch of fw_id: {fw_id}")
1735+
rec_launch_id = recover_launch
1736+
recovery = self.get_recovery(rec_launch_id)
1737+
if not recovery:
1738+
raise ValueError(f"No recovery info found in launch {rec_launch_id}")
17261739
recovery.update(_mode=recover_mode)
17271740
set_spec = recursive_dict({"$set": {"spec._recovery": recovery}})
17281741
if recover_mode == "prev_dir":
1729-
prev_dir = self.get_launch_by_id(recovery.get("_launch_id")).launch_dir
1742+
launch_f = {"launch_id": recovery.get("_launch_id")}
1743+
launch_p = {"launch_dir": True}
1744+
prev_dir = self.launches.find_one(launch_f, launch_p)["launch_dir"]
17301745
set_spec["$set"]["spec._launch_dir"] = prev_dir
17311746
self.fireworks.find_one_and_update({"fw_id": fw_id}, set_spec)
1732-
17331747
# If no launch recovery specified, unset the firework recovery spec
17341748
else:
17351749
set_spec = {"$unset": {"spec._recovery": ""}}
@@ -1756,17 +1770,28 @@ def rerun_fw(self, fw_id, rerun_duplicates=True, recover_launch=None, recover_mo
17561770

17571771
return reruns
17581772

1759-
def get_recovery(self, fw_id, launch_id="last"):
1760-
"""Function to get recovery data for a given fw and launch
1773+
def get_recovery(self, launch_id):
1774+
"""Function to get recovery data for a given launch.
1775+
17611776
Args:
1762-
fw_id (int): fw id to get recovery data for
1763-
launch_id (int or 'last'): launch_id to get recovery data for, if 'last'
1764-
recovery data is generated from last launch.
1765-
"""
1766-
m_fw = self.get_fw_by_id(fw_id)
1767-
launch = m_fw.launches[-1] if launch_id == "last" else self.get_launch_by_id(launch_id)
1768-
recovery = launch.state_history[-1].get("checkpoint")
1769-
recovery.update(_prev_dir=launch.launch_dir, _launch_id=launch.launch_id)
1777+
launch_id (int): launch_id to get recovery data for
1778+
1779+
Returns:
1780+
recovery (dict): recovery metadata, None when no recovery retrieved
1781+
1782+
Raises:
1783+
ValueError: raised when no launch under the launch_id is found
1784+
"""
1785+
launch_f = {"launch_id": launch_id}
1786+
launch_p = {"launch_dir": True, "state_history": True}
1787+
launch_dct = self.launches.find_one(launch_f, launch_p)
1788+
if launch_dct is None:
1789+
raise ValueError(f"launch_id: {launch_id} does not exist")
1790+
if not launch_dct["state_history"]:
1791+
return None
1792+
recovery = launch_dct["state_history"][-1].get("checkpoint")
1793+
if recovery:
1794+
recovery.update(_prev_dir=launch_dct["launch_dir"], _launch_id=launch_id)
17701795
return recovery
17711796

17721797
def _refresh_wf(self, fw_id) -> None:

fireworks/core/rocket.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,9 @@ def do_ping(launchpad: LaunchPad, launch_id: int) -> None:
5454

5555
def ping_launch(launchpad: LaunchPad, launch_id: int, stop_event: Event, master_thread: Thread) -> None:
5656
while not stop_event.is_set() and master_thread.is_alive():
57-
do_ping(launchpad, launch_id)
57+
# wait before pinging to avoid a race condition with main thread
5858
stop_event.wait(PING_TIME_SECS)
59+
do_ping(launchpad, launch_id)
5960

6061

6162
def start_ping_launch(launchpad: LaunchPad, launch_id: int) -> Event | None:

fireworks/core/tests/test_launchpad.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1102,6 +1102,38 @@ def test_task_level_rerun_prev_dir(self) -> None:
11021102
assert ExecutionCounterTask.exec_counter == 1
11031103
assert ExceptionTestTask.exec_counter == 2
11041104

1105+
def test_task_level_rerun_wrong_fw_id(self) -> None:
1106+
with pytest.raises(ValueError, match="FW with id: 999 not found!"):
1107+
self.lp.rerun_fw(999)
1108+
1109+
def test_task_level_rerun_recover_launch_id(self) -> None:
1110+
rapidfire(self.lp, self.fworker, m_dir=MODULE_DIR)
1111+
self.lp.rerun_fw(fw_id=1, recover_launch=1)
1112+
fw = self.lp.get_fw_by_id(1)
1113+
assert "_recovery" in fw.spec
1114+
assert isinstance(fw.spec["_recovery"], dict)
1115+
assert fw.spec["_recovery"]["_task_n"] == 2
1116+
1117+
def test_task_level_rerun_wrong_launch_id(self) -> None:
1118+
rapidfire(self.lp, self.fworker, m_dir=MODULE_DIR)
1119+
assert os.getcwd() == MODULE_DIR
1120+
with pytest.raises(ValueError, match="launch_id: 999 is no launch of fw_id: 1"):
1121+
self.lp.rerun_fw(1, recover_launch=999)
1122+
1123+
def test_task_level_rerun_wrong_state(self) -> None:
1124+
with pytest.raises(ValueError, match="FW with id: 1 has no active launches"):
1125+
self.lp.rerun_fw(1, recover_launch="last")
1126+
1127+
def test_task_level_rerun_no_recovery_info(self) -> None:
1128+
self.lp.add_wf(Firework(ScriptTask.from_str('echo')))
1129+
launch_rocket(self.lp, self.fworker, fw_id=2)
1130+
with pytest.raises(ValueError, match="No recovery info found in launch 1"):
1131+
self.lp.rerun_fw(2, recover_launch="last")
1132+
1133+
def test_get_recovery_wrong_launch_id(self) -> None:
1134+
with pytest.raises(ValueError, match="launch_id: 999 does not exist"):
1135+
self.lp.get_recovery(launch_id=999)
1136+
11051137

11061138
class WFLockTest(unittest.TestCase):
11071139
@classmethod

fireworks/user_objects/queue_adapters/pbs_newt_adapter.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,14 @@
66
import os
77

88
from fireworks.queue.queue_adapter import QueueAdapterBase
9+
from fireworks.utilities.fw_utilities import get_fw_logger
910

1011
try:
1112
from requests import Session
1213
except ImportError:
1314
Session = None
14-
print("pip install requests to use PBSAdapterNEWT")
15+
msg = "install the fireworks[newt] extra or the requests package to use PBSAdapterNEWT"
16+
get_fw_logger(__name__).warning(msg)
1517

1618
__author__ = "Shreyas Cholia, Anubhav Jain"
1719
__copyright__ = "Copyright 2013, The Materials Project"
@@ -41,7 +43,7 @@ def submit_to_queue(self, script_file: str) -> int:
4143

4244
def get_njobs_in_queue(self, username: str | None = None) -> int:
4345
if username is None:
44-
username = getpass.getus
46+
username = getpass.getuser()
4547

4648
resp = Session().get(f"https://newt.nersc.gov/newt/queue/{self.resource}/?user={username}")
4749
return len(resp.json())

0 commit comments

Comments
 (0)