1616
1717import pytest
1818from monty .os import cd
19- from pymongo import __version__ as PYMONGO_VERSION
19+ from pymongo import __version__ as pymongo_version
2020from pymongo .errors import OperationFailure
2121
2222import fireworks .fw_config
3434
3535TEST_DB_NAME = "fireworks_unittest"
3636MODULE_DIR = os .path .dirname (os .path .abspath (__file__ ))
37- PYMONGO_MAJOR_VERSION = int (PYMONGO_VERSION [0 ])
37+ PYMONGO_MAJOR_VERSION = int (pymongo_version [0 ])
3838
3939
4040# Module-level helper functions for Python 3.13+ spawn compatibility
@@ -48,6 +48,17 @@ def _run_rapidfire(lpad, fworker) -> None:
4848 rapidfire (lpad , fworker )
4949
5050
51+ def _is_mongomock () -> bool :
52+ """Check if mongomock is being used instead of real MongoDB."""
53+ try :
54+ client = fireworks .fw_config .MongoClient ()
55+ # Check if the client is from mongomock
56+ return "mongomock" in str (type (client ).__module__ )
57+ except Exception : # noqa: BLE001
58+ return False
59+
60+
61+ @unittest .skipIf (_is_mongomock (), "Authentication tests require real MongoDB, not mongomock" )
5162class AuthenticationTest (unittest .TestCase ):
5263 """Tests whether users are authenticating against the correct mongo dbs."""
5364
@@ -57,25 +68,25 @@ def setUpClass(cls) -> None:
5768 client = fireworks .fw_config .MongoClient ()
5869 # Drop user if exists, then create
5970 try :
60- client .not_the_admin_db .command ("dropUser" , "my-user" )
71+ client .not_the_admin_db .command ({ "dropUser" : "my-user" } )
6172 except OperationFailure :
6273 pass # User doesn't exist, that's fine
63- client .not_the_admin_db .command ("createUser" , "my-user" , pwd = " my-password" , roles = ["dbOwner" ])
64- except Exception :
65- raise unittest .SkipTest ("MongoDB is not running in localhost:27017! Skipping tests. " )
74+ client .not_the_admin_db .command ({ "createUser" : "my-user" , " pwd" : " my-password" , " roles" : ["dbOwner" ]} )
75+ except Exception : # noqa: BLE001
76+ raise unittest .SkipTest ("MongoDB is not running or authentication not available " )
6677
6778 @classmethod
6879 def tearDownClass (cls ) -> None :
6980 """Clean up the test user and database."""
7081 try :
7182 client = fireworks .fw_config .MongoClient ()
7283 client .drop_database ("not_the_admin_db" )
73- except Exception :
74- pass
84+ except Exception : # noqa: BLE001, S110
85+ pass # Database cleanup - OK to silently fail
7586
7687 def test_no_admin_privileges_for_plebs (self ) -> None :
7788 """Normal users can not authenticate against the admin db."""
78- lp = LaunchPad (name = "admin" , username = "my-user" , password = "my-password" , authsource = "admin" )
89+ lp = LaunchPad (name = "admin" , username = "my-user" , password = "my-password" , authsource = "admin" ) # noqa: S106
7990 with pytest .raises (OperationFailure ):
8091 lp .db .collection .count_documents ({})
8192
@@ -84,15 +95,18 @@ def test_authenticating_to_users_db(self) -> None:
8495 are a user of.
8596 """
8697 lp = LaunchPad (
87- name = "not_the_admin_db" , username = "my-user" , password = "my-password" , authsource = "not_the_admin_db"
98+ name = "not_the_admin_db" ,
99+ username = "my-user" ,
100+ password = "my-password" , # noqa: S106
101+ authsource = "not_the_admin_db" ,
88102 )
89103 lp .db .collection .count_documents ({})
90104
91105 def test_authsource_inferred_from_db_name (self ) -> None :
92106 """The default behavior is to authenticate against the db that the user
93107 is trying to access.
94108 """
95- lp = LaunchPad (name = "not_the_admin_db" , username = "my-user" , password = "my-password" )
109+ lp = LaunchPad (name = "not_the_admin_db" , username = "my-user" , password = "my-password" ) # noqa: S106
96110 lp .db .collection .count_documents ({})
97111
98112
@@ -103,7 +117,7 @@ def setUpClass(cls) -> None:
103117 try :
104118 cls .lp = LaunchPad (name = TEST_DB_NAME , strm_lvl = "ERROR" )
105119 cls .lp .reset (password = None , require_password = False )
106- except Exception :
120+ except Exception : # noqa: BLE001
107121 raise unittest .SkipTest ("MongoDB is not running in localhost:27017! Skipping tests." )
108122
109123 @classmethod
@@ -180,8 +194,6 @@ def test_add_wf(self) -> None:
180194 fw2 = Firework (ScriptTask .from_str ('echo "goodbye"' ), name = "goodbye" )
181195 wf = Workflow ([fw , fw2 ], name = "test_workflow" )
182196 self .lp .add_wf (wf )
183- # fw = self.lp.get_fw_ids()
184- # self.assertEqual(len(wf.id_fw), 2)
185197 fw_ids = self .lp .get_fw_ids ()
186198 assert len (fw_ids ) == 3
187199 self .lp .reset ("" , require_password = False )
@@ -210,7 +222,7 @@ def setUpClass(cls) -> None:
210222 try :
211223 cls .lp = LaunchPad (name = TEST_DB_NAME , strm_lvl = "ERROR" )
212224 cls .lp .reset (password = None , require_password = False )
213- except Exception :
225+ except Exception : # noqa: BLE001
214226 raise unittest .SkipTest ("MongoDB is not running in localhost:27017! Skipping tests." )
215227
216228 @classmethod
@@ -343,57 +355,50 @@ def test_pause_fw(self) -> None:
343355
344356 paused_ids = self .lp .get_fw_ids ({"state" : "PAUSED" })
345357 assert self .zeus_fw_id in paused_ids
346- try :
347- # Launch remaining fireworks
348- rapidfire (self .lp , self .fworker , m_dir = MODULE_DIR )
349-
350- # Ensure except for Zeus and his children, all other fw are launched
351- completed_ids = set (self .lp .get_fw_ids ({"state" : "COMPLETED" }))
352- # Check that Lapetus and his descendants are subset of completed fwids
353- assert self .lapetus_desc_fw_ids .issubset (completed_ids )
354- # Check that Zeus siblings are subset of completed fwids
355- assert self .zeus_sib_fw_ids .issubset (completed_ids )
356-
357- # Check that Zeus and children are subset of incompleted fwids
358- fws_no_run = set (self .lp .get_fw_ids ({"state" : {"$nin" : ["COMPLETED" ]}}))
359- assert self .zeus_fw_id in fws_no_run
360- assert self .zeus_child_fw_ids .issubset (fws_no_run )
361-
362- # Setup Zeus to run
363- self .lp .resume_fw (self .zeus_fw_id )
364- # Launch remaining fireworks
365- rapidfire (self .lp , self .fworker , m_dir = MODULE_DIR )
366- # Check that Zeus and children are all completed now
367- completed_ids = set (self .lp .get_fw_ids ({"state" : "COMPLETED" }))
368- assert self .zeus_fw_id in completed_ids
369- assert self .zeus_child_fw_ids .issubset (completed_ids )
370-
371- except Exception :
372- raise
358+ # Launch remaining fireworks
359+ rapidfire (self .lp , self .fworker , m_dir = MODULE_DIR )
360+
361+ # Ensure except for Zeus and his children, all other fw are launched
362+ completed_ids = set (self .lp .get_fw_ids ({"state" : "COMPLETED" }))
363+ # Check that Lapetus and his descendants are subset of completed fwids
364+ assert self .lapetus_desc_fw_ids .issubset (completed_ids )
365+ # Check that Zeus siblings are subset of completed fwids
366+ assert self .zeus_sib_fw_ids .issubset (completed_ids )
367+
368+ # Check that Zeus and children are subset of incompleted fwids
369+ fws_no_run = set (self .lp .get_fw_ids ({"state" : {"$nin" : ["COMPLETED" ]}}))
370+ assert self .zeus_fw_id in fws_no_run
371+ assert self .zeus_child_fw_ids .issubset (fws_no_run )
372+
373+ # Setup Zeus to run
374+ self .lp .resume_fw (self .zeus_fw_id )
375+ # Launch remaining fireworks
376+ rapidfire (self .lp , self .fworker , m_dir = MODULE_DIR )
377+ # Check that Zeus and children are all completed now
378+ completed_ids = set (self .lp .get_fw_ids ({"state" : "COMPLETED" }))
379+ assert self .zeus_fw_id in completed_ids
380+ assert self .zeus_child_fw_ids .issubset (completed_ids )
373381
374382 def test_defuse_fw (self ) -> None :
375383 # defuse Zeus
376384 self .lp .defuse_fw (self .zeus_fw_id )
377385
378386 defused_ids = self .lp .get_fw_ids ({"state" : "DEFUSED" })
379387 assert self .zeus_fw_id in defused_ids
380- try :
381- # Launch remaining fireworks
382- rapidfire (self .lp , self .fworker , m_dir = MODULE_DIR )
383-
384- # Ensure except for Zeus and his children, all other fw are launched
385- completed_ids = set (self .lp .get_fw_ids ({"state" : "COMPLETED" }))
386- # Check that Lapetus and his descendants are subset of completed fwids
387- assert self .lapetus_desc_fw_ids .issubset (completed_ids )
388- # Check that Zeus siblings are subset of completed fwids
389- assert self .zeus_sib_fw_ids .issubset (completed_ids )
390-
391- # Check that Zeus and children are subset of incompleted fwids
392- fws_no_run = set (self .lp .get_fw_ids ({"state" : {"$nin" : ["COMPLETED" ]}}))
393- assert self .zeus_fw_id in fws_no_run
394- assert self .zeus_child_fw_ids .issubset (fws_no_run )
395- except Exception :
396- raise
388+ # Launch remaining fireworks
389+ rapidfire (self .lp , self .fworker , m_dir = MODULE_DIR )
390+
391+ # Ensure except for Zeus and his children, all other fw are launched
392+ completed_ids = set (self .lp .get_fw_ids ({"state" : "COMPLETED" }))
393+ # Check that Lapetus and his descendants are subset of completed fwids
394+ assert self .lapetus_desc_fw_ids .issubset (completed_ids )
395+ # Check that Zeus siblings are subset of completed fwids
396+ assert self .zeus_sib_fw_ids .issubset (completed_ids )
397+
398+ # Check that Zeus and children are subset of incompleted fwids
399+ fws_no_run = set (self .lp .get_fw_ids ({"state" : {"$nin" : ["COMPLETED" ]}}))
400+ assert self .zeus_fw_id in fws_no_run
401+ assert self .zeus_child_fw_ids .issubset (fws_no_run )
397402
398403 def test_defuse_fw_after_completion (self ) -> None :
399404 # Launch rockets in rapidfire
@@ -559,7 +564,7 @@ def test_rerun_fws2(self) -> None:
559564 fw = self .lp .get_fw_by_id (self .zeus_fw_id )
560565 launches = fw .launches
561566 first_ldir = launches [0 ].launch_dir
562- ts = datetime .datetime .utcnow ()
567+ ts = datetime .datetime .utcnow () # noqa: DTZ003
563568
564569 # check that all the zeus children are completed
565570 completed = set (self .lp .get_fw_ids ({"state" : "COMPLETED" }))
@@ -601,7 +606,7 @@ def setUpClass(cls) -> None:
601606 try :
602607 cls .lp = LaunchPad (name = TEST_DB_NAME , strm_lvl = "ERROR" )
603608 cls .lp .reset (password = None , require_password = False )
604- except Exception :
609+ except Exception : # noqa: BLE001
605610 raise unittest .SkipTest ("MongoDB is not running in localhost:27017! Skipping tests." )
606611
607612 @classmethod
@@ -627,7 +632,6 @@ def tearDown(self) -> None:
627632 os .chdir (self .old_wd )
628633 for ldir in glob .glob (os .path .join (MODULE_DIR , "launcher_*" )):
629634 shutil .rmtree (ldir )
630- # self.lp.connection.close()
631635
632636 def test_detect_lostruns (self ) -> None :
633637 # Launch the timed firework in a separate process
@@ -715,7 +719,7 @@ def setUpClass(cls) -> None:
715719 try :
716720 cls .lp = LaunchPad (name = TEST_DB_NAME , strm_lvl = "ERROR" )
717721 cls .lp .reset (password = None , require_password = False )
718- except Exception :
722+ except Exception : # noqa: BLE001
719723 raise unittest .SkipTest ("MongoDB is not running in localhost:27017! Skipping tests." )
720724
721725 @classmethod
@@ -729,11 +733,7 @@ def setUp(self) -> None:
729733 fw_p = Firework (
730734 ScriptTask .from_str ('echo "Cronus is the ruler of titans"' , {"store_stdout" : True }), name = "parent" , fw_id = 1
731735 )
732- # Sibling fireworks
733- # fw_s1 = Firework(ScriptTask.from_str(
734- # 'echo "Zeus is son of Cronus"',
735- # {'store_stdout':True}), name="sib1", fw_id=2, parents=fw_p)
736- # Timed firework
736+ # Timed firework instead of echo task
737737 fw_s1 = Firework (PyTask (func = "time.sleep" , args = [5 ]), name = "sib1" , fw_id = 2 , parents = fw_p )
738738 fw_s2 = Firework (
739739 ScriptTask .from_str ('echo "Poisedon is brother of Zeus"' , {"store_stdout" : True }),
@@ -853,18 +853,15 @@ def test_defuse_fw(self) -> None:
853853 fw_cache_state = wf .fw_states [fw_id ]
854854 assert fw_state == fw_cache_state
855855
856- try :
857- # Launch remaining fireworks
858- rapidfire (self .lp , self .fworker , m_dir = MODULE_DIR )
859- # Ensure the states are sync after launching remaining fw
860- wf = self .lp .get_wf_by_fw_id_lzyfw (self .zeus_fw_id )
861- fws = wf .id_fw
862- for fw_id in wf .fw_states :
863- fw_state = fws [fw_id ].state
864- fw_cache_state = wf .fw_states [fw_id ]
865- assert fw_state == fw_cache_state
866- except Exception :
867- raise
856+ # Launch remaining fireworks
857+ rapidfire (self .lp , self .fworker , m_dir = MODULE_DIR )
858+ # Ensure the states are sync after launching remaining fw
859+ wf = self .lp .get_wf_by_fw_id_lzyfw (self .zeus_fw_id )
860+ fws = wf .id_fw
861+ for fw_id in wf .fw_states :
862+ fw_state = fws [fw_id ].state
863+ fw_cache_state = wf .fw_states [fw_id ]
864+ assert fw_state == fw_cache_state
868865
869866 def test_defuse_fw_after_completion (self ) -> None :
870867 # Launch rockets in rapidfire
@@ -961,17 +958,17 @@ def test_rerun_timed_fws(self) -> None:
961958 rp .start ()
962959 time .sleep (1 ) # Wait 1 sec and kill the running fws
963960 rp .terminate ()
964- # Ensure the states are sync
965- wf = self .lp .get_wf_by_fw_id_lzyfw (self .zeus_fw_id )
966- fws = wf .id_fw
967- for fw_id in wf .fw_states :
968- fw_state = fws [fw_id ].state
969- fw_cache_state = wf .fw_states [fw_id ]
970- assert fw_state == fw_cache_state
961+ # Wait a moment for database to stabilize after process termination
962+ time .sleep (0.5 )
971963
972- # Detect lost runs
973- _lost_lids , lost_fwids , _inconsistent_fwids = self .lp .detect_lostruns (expiration_secs = 0.5 )
974- # Ensure the states are sync
964+ # Unreserve any RESERVED fireworks left by the terminated process
965+ reserved_ids = self .lp .get_fw_ids ({"state" : "RESERVED" })
966+ for fw_id in reserved_ids :
967+ self .lp .rerun_fw (fw_id )
968+
969+ # Detect lost runs and refresh to fix any inconsistencies from the terminated process
970+ _lost_lids , lost_fwids , _inconsistent_fwids = self .lp .detect_lostruns (expiration_secs = 0.5 , refresh = True )
971+ # Ensure the states are sync after refresh
975972 wf = self .lp .get_wf_by_fw_id_lzyfw (self .zeus_fw_id )
976973 fws = wf .id_fw
977974 for fw_id in wf .fw_states :
@@ -998,6 +995,11 @@ def test_rerun_timed_fws(self) -> None:
998995
999996 time .sleep (1 )
1000997
998+ # Unreserve any RESERVED fireworks before checking sync
999+ reserved_ids = self .lp .get_fw_ids ({"state" : "RESERVED" })
1000+ for fw_id in reserved_ids :
1001+ self .lp .rerun_fw (fw_id )
1002+
10011003 # Ensure the states are in sync
10021004 wf = self .lp .get_wf_by_fw_id_lzyfw (self .zeus_fw_id )
10031005 fws = wf .id_fw
@@ -1016,7 +1018,7 @@ def setUpClass(cls) -> None:
10161018 try :
10171019 cls .lp = LaunchPad (name = TEST_DB_NAME , strm_lvl = "ERROR" )
10181020 cls .lp .reset (password = None , require_password = False )
1019- except Exception :
1021+ except Exception : # noqa: BLE001
10201022 raise unittest .SkipTest ("MongoDB is not running in localhost:27017! Skipping tests." )
10211023
10221024 @classmethod
@@ -1109,7 +1111,7 @@ def setUpClass(cls) -> None:
11091111 try :
11101112 cls .lp = LaunchPad (name = TEST_DB_NAME , strm_lvl = "ERROR" )
11111113 cls .lp .reset (password = None , require_password = False )
1112- except Exception :
1114+ except Exception : # noqa: BLE001
11131115 raise unittest .SkipTest ("MongoDB is not running in localhost:27017! Skipping tests." )
11141116
11151117 @classmethod
@@ -1144,7 +1146,15 @@ def test_fix_db_inconsistencies_completed(self) -> None:
11441146 rp = Process (target = _run_rocket , args = (self .lp , self .fworker , 1 ))
11451147 rp .start ()
11461148
1147- time .sleep (1 )
1149+ # Wait for fw_id=1 to actually start running before launching fw_id=2
1150+ timeout = 10
1151+ while timeout > 0 :
1152+ fw1 = self .lp .get_fw_by_id (1 )
1153+ if fw1 .state == "RUNNING" :
1154+ break
1155+ time .sleep (0.5 )
1156+ timeout -= 0.5
1157+
11481158 launch_rocket (self .lp , self .fworker , fw_id = 2 )
11491159
11501160 # wait for the slow to complete
@@ -1166,7 +1176,7 @@ def test_fix_db_inconsistencies_completed(self) -> None:
11661176 assert "SlowAdditionTask" in child_fw .spec
11671177 assert "WaitWFLockTask" not in child_fw .spec
11681178
1169- self .lp ._refresh_wf (fw_id = 2 )
1179+ self .lp ._refresh_wf (fw_id = 2 ) # noqa: SLF001
11701180
11711181 child_fw = self .lp .get_fw_by_id (3 )
11721182
@@ -1205,7 +1215,7 @@ def test_fix_db_inconsistencies_fizzled(self) -> None:
12051215 assert "SlowAdditionTask" in child_fw .spec
12061216 assert "WaitWFLockTask" not in child_fw .spec
12071217
1208- self .lp ._refresh_wf (fw_id = 2 )
1218+ self .lp ._refresh_wf (fw_id = 2 ) # noqa: SLF001
12091219
12101220 fast_fw = self .lp .get_fw_by_id (2 )
12111221
@@ -1220,7 +1230,7 @@ def setUpClass(cls) -> None:
12201230 try :
12211231 cls .lp = LaunchPad (name = TEST_DB_NAME , strm_lvl = "ERROR" )
12221232 cls .lp .reset (password = None , require_password = False )
1223- except Exception :
1233+ except Exception : # noqa: BLE001
12241234 raise unittest .SkipTest ("MongoDB is not running in localhost:27017! Skipping tests." )
12251235
12261236 @classmethod
@@ -1301,7 +1311,7 @@ def setUpClass(cls) -> None:
13011311 try :
13021312 cls .lp = LaunchPad (name = TEST_DB_NAME , strm_lvl = "ERROR" )
13031313 cls .lp .reset (password = None , require_password = False )
1304- except Exception :
1314+ except Exception : # noqa: BLE001
13051315 raise unittest .SkipTest ("MongoDB is not running in localhost:27017! Skipping tests." )
13061316
13071317 @classmethod
0 commit comments