Skip to content

Commit 595e135

Browse files
committed
feat: fix bugs in shutdown and add stop (#4)
1 parent 69d0c42 commit 595e135

File tree

2 files changed

+34
-6
lines changed

2 files changed

+34
-6
lines changed

postgresql_watcher/watcher.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ def __init__(
5050
logger (Optional[Logger], optional): Custom logger to use. Defaults to None.
5151
"""
5252
self.update_callback = None
53-
self.parent_conn = None
5453
self.host = host
5554
self.port = port
5655
self.user = user
@@ -83,7 +82,7 @@ def _create_subscription_process(
8382
self._cleanup_connections_and_processes()
8483

8584
self.parent_conn, self.child_conn = Pipe()
86-
self.subscription_proces = Process(
85+
self.subscription_process = Process(
8786
target=casbin_channel_subscription,
8887
args=(
8988
self.child_conn,
@@ -109,9 +108,12 @@ def start(
109108
self,
110109
timeout=20, # seconds
111110
):
112-
if not self.subscription_proces.is_alive():
111+
if self.subscription_process is None:
112+
self._create_subscription_process(start_listening=False)
113+
114+
if not self.subscription_process.is_alive():
113115
# Start listening to messages
114-
self.subscription_proces.start()
116+
self.subscription_process.start()
115117
# And wait for the Process to be ready to listen for updates
116118
# from PostgreSQL
117119
timeout_time = time() + timeout
@@ -124,6 +126,9 @@ def start(
124126
raise PostgresqlWatcherChannelSubscriptionTimeoutError(timeout)
125127
sleep(1 / 1000) # wait for 1 ms
126128

129+
def stop(self):
130+
self._cleanup_connections_and_processes()
131+
127132
def _cleanup_connections_and_processes(self) -> None:
128133
# Clean up potentially existing Connections and Processes
129134
if self.parent_conn is not None:
@@ -132,8 +137,9 @@ def _cleanup_connections_and_processes(self) -> None:
132137
if self.child_conn is not None:
133138
self.child_conn.close()
134139
self.child_conn = None
135-
if self.subscription_process is not None:
140+
if self.subscription_process is not None and self.subscription_process.pid is not None:
136141
self.subscription_process.terminate()
142+
self.subscription_process.join()
137143
self.subscription_process = None
138144

139145
def set_update_callback(self, update_handler: Optional[Callable[[None], None]]):

tests/test_postgresql_watcher.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ def test_pg_watcher_init(self):
5050
assert isinstance(pg_watcher.parent_conn, connection.PipeConnection)
5151
else:
5252
assert isinstance(pg_watcher.parent_conn, connection.Connection)
53-
assert isinstance(pg_watcher.subscription_proces, context.Process)
53+
assert isinstance(pg_watcher.subscription_process, context.Process)
5454

5555
def test_update_single_pg_watcher(self):
5656
pg_watcher = get_watcher("test_update_single_pg_watcher")
@@ -115,6 +115,28 @@ def test_update_handler_not_called(self):
115115
self.assertFalse(main_watcher.should_reload())
116116
self.assertTrue(handler.call_count == 0)
117117

118+
def test_stop_and_restart(self):
119+
channel_name = "test_stop_and_restart"
120+
pg_watcher = get_watcher(channel_name)
121+
122+
# Verify initially started
123+
self.assertTrue(pg_watcher.subscription_process.is_alive())
124+
125+
# Stop the watcher
126+
pg_watcher.stop()
127+
self.assertIsNone(pg_watcher.subscription_process)
128+
129+
# Restart the watcher
130+
pg_watcher.start()
131+
132+
# Verify resources are recreated and process is alive
133+
self.assertTrue(pg_watcher.subscription_process.is_alive())
134+
135+
# Verify it still works after restart
136+
pg_watcher.update()
137+
sleep(CASBIN_CHANNEL_SELECT_TIMEOUT * 2)
138+
self.assertTrue(pg_watcher.should_reload())
139+
118140

119141
if __name__ == "__main__":
120142
main()

0 commit comments

Comments
 (0)