diff --git a/src/murfey/client/multigrid_control.py b/src/murfey/client/multigrid_control.py index ad1020abd..f52414f52 100644 --- a/src/murfey/client/multigrid_control.py +++ b/src/murfey/client/multigrid_control.py @@ -380,6 +380,9 @@ def _start_rsyncer( stop_callback=self._rsyncer_stopped, do_transfer=self.do_transfer, remove_files=remove_files, + substrings_blacklist=self._machine_config.get( + "substrings_blacklist", {"directories": [], "files": []} + ), end_time=self.visit_end_time, ) diff --git a/src/murfey/client/rsync.py b/src/murfey/client/rsync.py index f9729cf0b..deae2f443 100644 --- a/src/murfey/client/rsync.py +++ b/src/murfey/client/rsync.py @@ -10,6 +10,7 @@ import logging import os import queue +import shutil import subprocess import threading import time @@ -61,6 +62,7 @@ def __init__( do_transfer: bool = True, remove_files: bool = False, required_substrings_for_removal: List[str] = [], + substrings_blacklist: dict[str, list[str]] = {}, notify: bool = True, end_time: datetime | None = None, ): @@ -68,26 +70,24 @@ def __init__( self._basepath = basepath_local.absolute() self._basepath_remote = basepath_remote self._rsync_module = rsync_module + self._server_url = server_url + self._stop_callback = stop_callback + self._local = local self._do_transfer = do_transfer self._remove_files = remove_files self._required_substrings_for_removal = required_substrings_for_removal - self._stop_callback = stop_callback - self._local = local - self._server_url = server_url + self._substrings_blacklist = substrings_blacklist self._notify = notify - self._finalised = False self._end_time = end_time - self._finalising = False self._skipped_files: List[Path] = [] # Set rsync destination - if local: - self._remote = str(basepath_remote) - else: - self._remote = ( - f"{server_url.hostname}::{self._rsync_module}/{basepath_remote}/" - ) + self._remote = ( + str(basepath_remote) + if local + else f"{server_url.hostname}::{self._rsync_module}/{basepath_remote}/" + ) 
logger.debug(f"rsync destination path set to {self._remote}") # For local tests you can use something along the lines of @@ -105,9 +105,24 @@ def __init__( ) self._stopping = False self._halt_thread = False + self._finalising = False + self._finalised = False + + @property + def status(self) -> str: + if self._stopping: + if self.thread.is_alive(): + return "stopping" + else: + return "finished" + else: + if self.thread.is_alive(): + return "running" + else: + return "ready" def __repr__(self) -> str: - return f"" @classmethod def from_rsyncer(cls, rsyncer: RSyncer, **kwargs): @@ -133,19 +148,6 @@ def from_rsyncer(cls, rsyncer: RSyncer, **kwargs): notify=kwarguments_from_rsyncer["notify"], ) - @property - def status(self) -> str: - if self._stopping: - if self.thread.is_alive(): - return "stopping" - else: - return "finished" - else: - if self.thread.is_alive(): - return "running" - else: - return "ready" - def notify(self, *args, secondary: bool = False, **kwargs) -> None: if self._notify: super().notify(*args, secondary=secondary, **kwargs) @@ -169,7 +171,7 @@ def restart(self): self.start() def stop(self): - logger.debug("RSync thread stop requested") + logger.info(f"Stopping RSync thread {self}") self._stopping = True if self.thread.is_alive(): logger.info("Waiting for ongoing transfers to complete...") @@ -179,7 +181,7 @@ def stop(self): if self.thread.is_alive(): self.queue.put(None) self.thread.join() - logger.debug("RSync thread successfully stopped") + logger.info(f"RSync thread {self} successfully stopped") def request_stop(self): self._stopping = True @@ -195,18 +197,52 @@ def finalise( self._notify = False self._end_time = None self._finalising = True + + # Perform recursive cleanup on current directory + logger.info(f"Starting file cleanup for RSync thread {self}") + files_to_transfer: list[Path] = [] + + def recursive_cleanup(dirpath: str | Path): + for entry in os.scandir(dirpath): + if entry.is_dir(): + # Recursively delete directories with 
blacklisted substrings + if any( + pattern in entry.name + for pattern in self._substrings_blacklist.get("directories", []) + ): + logger.debug(f"Deleting blacklisted directory {entry.path}") + shutil.rmtree(entry.path) + continue + # Recursively search in whitelisted ones + recursive_cleanup(entry.path) + elif entry.is_file(): + # Delete blacklisted files + if any( + pattern in entry.name + for pattern in self._substrings_blacklist.get("files", []) + ): + logger.debug(f"Deleting blacklisted file {entry.path}") + Path(entry.path).unlink() + continue + # Append others for transfer + files_to_transfer.append(Path(entry.path)) + + recursive_cleanup(self._basepath) + logger.debug(f"Number of files to transfer: {len(files_to_transfer)}") + if thread: self.thread = threading.Thread( name=f"RSync finalisation {self._basepath}:{self._remote}", target=self._process, daemon=True, ) - for f in self._basepath.glob("**/*"): + for f in files_to_transfer: self.queue.put(f) self.stop() else: - self._transfer(list(self._basepath.glob("**/*"))) + self._transfer(files_to_transfer) self._finalised = True + logger.info(f"File cleanup for RSync thread {self} successfully completed") if callback: callback() @@ -221,7 +257,7 @@ def flush_skipped(self): self._skipped_files = [] def _process(self): - logger.info("RSync thread starting") + logger.info(f"Starting main process loop for RSync thread {self}") files_to_transfer: list[Path] backoff = 0 while not self._halt_thread: diff --git a/src/murfey/client/watchdir.py b/src/murfey/client/watchdir.py index fdb3d363b..24abe0293 100644 --- a/src/murfey/client/watchdir.py +++ b/src/murfey/client/watchdir.py @@ -247,8 +247,8 @@ def _scan_directory( entry_name = os.path.join(path, entry.name) # Skip any directories with matching blacklisted substrings if entry.is_dir() and any( - char in entry.name - for char in self._substrings_blacklist.get("directories", []) + pattern in entry.name + for pattern in self._substrings_blacklist.get("directories", 
[]) ): log.debug(f"Skipping blacklisted directory {str(entry.name)!r}") continue @@ -262,8 +262,8 @@ def _scan_directory( continue # Exclude files with blacklisted substrings if any( - char in entry.name - for char in self._substrings_blacklist.get("files", []) + pattern in entry.name + for pattern in self._substrings_blacklist.get("files", []) ): log.debug(f"Skipping blacklisted file {str(entry.name)!r}") continue diff --git a/src/murfey/util/rsync.py b/src/murfey/util/rsync.py deleted file mode 100644 index bcbb4b059..000000000 --- a/src/murfey/util/rsync.py +++ /dev/null @@ -1,172 +0,0 @@ -from __future__ import annotations - -import logging -import subprocess -from pathlib import Path -from typing import Callable, Dict, List, Optional, Tuple, Union - -from murfey.util import Processor -from murfey.util.file_monitor import Monitor - -logger = logging.getLogger("murfey.util.rsync") - - -class RsyncPipe(Processor): - def __init__( - self, - finaldir: Path, - name: str = "rsync_pipe", - root: Optional[Path] = None, - notify: Optional[Callable[[Path], Optional[dict]]] = None, - destination_structure: Optional[ - Callable[[Path, Path], Tuple[Path, str]] - ] = None, - ): - super().__init__(name=name) - self._finaldir = finaldir - self.failed: List[Path] = [] - self._failed_tmp: List[str] = [] - self._transferring = False - self.sent_bytes = 0 - self.received_bytes = 0 - self.byte_rate: float = 0 - self.total_size = 0 - self.runner_return: List[subprocess.CompletedProcess] = [] - self._root = root - self._sub_structure: Optional[Path] = None - self._notify = notify or (lambda f: None) - self._destination_structure = destination_structure - - def _process(self, retry: bool = True, **kwargs): - if isinstance(self._previous, Monitor) and self._previous.thread: - while self._previous.thread.is_alive(): - files_for_transfer = self._in.get() - if not files_for_transfer: - continue - self._run_rsync(self._previous.dir, files_for_transfer, retry=retry) - - def _run_rsync( - 
self, - root: Path, - files: List[Path], - retry: bool = True, - ): - """ - Run rsync -v on a list of files using subprocess. - - :param root: root path of files for transferring; structure below the root is preserved - :type root: pathlib.Path object - :param files: List of files to be transferred - :type files: list of strigs or pathlib.Path objects - :param destination: Directory that files are to be copied to. - :type destination: string or pathlib.Path object - :param retry: If True put failed files back into the queue to be consumed - :type retry: bool - """ - self._root = root - - def _structure(p: Path) -> Path: - return (p.relative_to(root)).parent - - divided_files: Dict[Path, List[Path]] = {} - for f in files: - s = _structure(f) - try: - divided_files[s].append(f) - except KeyError: - divided_files[s] = [f] - for s in divided_files.keys(): - if self._destination_structure: - for f in divided_files[s]: - self._sub_structure, new_file_name = self._destination_structure( - s, f - ) - self._single_rsync( - root, - self._sub_structure, - [f], - file_name=Path(new_file_name), - retry=retry, - ) - else: - self._sub_structure = s - self._single_rsync(root, s, divided_files[s], retry=retry) - - def _single_rsync( - self, - root: Path, - sub_struct: Union[str, Path], - sources: List[Path], - file_name: Optional[Path] = None, - retry: bool = True, - ): - cmd: List[str] = ["rsync", "-v"] - self._failed_tmp = [] - cmd.extend(str(f) for f in sources) - if file_name: - cmd.append(str(self._finaldir / sub_struct / file_name)) - else: - cmd.append(str(self._finaldir / sub_struct) + "/") - self._transferring = True - - runner = subprocess.run( - cmd, - capture_output=True, - ) - for line in runner.stdout.decode("utf-8", "replace").split("\n"): - self._parse_rsync_stdout(line) - for line in runner.stderr.decode("utf-8", "replace").split("\n"): - self._parse_rsync_stderr(line) - self.runner_return.append(runner) - self.failed.extend(root / sub_struct / f for f in 
self._failed_tmp) - if retry: - self._in.put(root / sub_struct / f for f in self._failed_tmp) - - def _parse_rsync_stdout(self, line: str): - """ - Parse rsync stdout to collect information such as the paths of transferred - files and the amount of data transferred. - - :param stdout: stdout of rsync process - :type stdout: bytes - """ - if self._transferring: - if line.startswith("sent"): - self._transferring = False - byte_info = line.split() - self.sent_bytes = int( - byte_info[byte_info.index("sent") + 1].replace(",", "") - ) - self.received_bytes = int( - byte_info[byte_info.index("received") + 1].replace(",", "") - ) - self.byte_rate = float( - byte_info[byte_info.index("bytes/sec") - 1].replace(",", "") - ) - elif len(line.split()) == 1: - if self._root and self._sub_structure: - self._notify(self._finaldir / self._sub_structure / line) - self._out.put(self._root / self._sub_structure / line) - else: - logger.warning( - f"root or substructure not set for transfer of {line}" - ) - else: - if "total size" in line: - self.total_size = int(line.replace("total size", "").split()[1]) - - def _parse_rsync_stderr(self, line: str): - """ - Parse rsync stderr to collect information on any files that failed to transfer. 
- - :param stderr: stderr of rsync process - :type stderr: bytes - """ - if ( - line.startswith("rsync: link_stat") - or line.startswith("rsync: [sender] link_stat") - ) and "failed" in line: - failed_msg = line.split() - self._failed_tmp.append( - failed_msg[failed_msg.index("failed:") - 1].replace('"', "") - ) diff --git a/tests/client/test_rsync.py b/tests/client/test_rsync.py new file mode 100644 index 000000000..96e1e4fb3 --- /dev/null +++ b/tests/client/test_rsync.py @@ -0,0 +1,584 @@ +import queue +import threading +from datetime import datetime +from pathlib import Path +from unittest import mock +from unittest.mock import MagicMock + +import pytest +from pytest_mock import MockerFixture + +from murfey.client.rsync import RSyncer +from tests.conftest import ExampleVisit + + +@pytest.fixture +def rsync_module(): + return "data" + + +@pytest.fixture +def mock_server_url(): + mock_url = MagicMock() + mock_url.hostname = "10.0.0.1" + return mock_url + + +# Create a dummy callback function +def dummy_callback(): + return None + + +@pytest.mark.parametrize("is_local", (True, False)) +def test_rsyncer_initialises( + tmp_path: Path, + rsync_module: str, + mock_server_url: MagicMock, + is_local: bool, +): + # Assign values to parameters + basepath_local = tmp_path / "local" + basepath_remote = tmp_path / "remote" + + # Create a test substrings blacklist dict + substrings_blacklist = { + "directories": ["1", "2", "3"], + "files": ["a", "b", "c"], + } + + # Create a timestamp + timestamp = datetime.now() + + # Initialise the RSyncer + rsyncer = RSyncer( + basepath_local=basepath_local, + basepath_remote=basepath_remote, + rsync_module=rsync_module, + server_url=mock_server_url, + stop_callback=dummy_callback, + local=is_local, + substrings_blacklist=substrings_blacklist, + end_time=timestamp, + ) + + # Check that the attributes are as expected + assert rsyncer._basepath == basepath_local.absolute() + assert rsyncer._basepath_remote == basepath_remote + assert 
rsyncer._rsync_module == rsync_module + assert rsyncer._server_url == mock_server_url + assert rsyncer._stop_callback == dummy_callback + assert rsyncer._local == is_local + assert rsyncer._do_transfer + assert not rsyncer._remove_files + assert rsyncer._required_substrings_for_removal == [] + assert rsyncer._substrings_blacklist == substrings_blacklist + assert rsyncer._notify + assert rsyncer._end_time == timestamp + assert rsyncer._skipped_files == [] + assert ( + rsyncer._remote == str(basepath_remote) + if is_local + else f"{mock_server_url.hostname}::{rsync_module}/{basepath_remote}" + ) + assert rsyncer._files_transferred == 0 + assert rsyncer._bytes_transferred == 0 + assert isinstance(rsyncer.queue, queue.Queue) + assert isinstance(rsyncer.thread, threading.Thread) + assert not rsyncer._stopping + assert not rsyncer._halt_thread + assert not rsyncer._finalising + assert not rsyncer._finalised + + +@pytest.mark.parametrize( + "test_params", + ( + # Is stopping? | Is thread alive? 
| Expected status + (False, False, "ready"), + (False, True, "running"), + (True, True, "stopping"), + (True, False, "finished"), + ), +) +def test_rsyncer_status( + tmp_path: Path, + mock_server_url: MagicMock, + test_params: tuple[bool, bool, str], +): + # Unpack test params + is_stopping, is_thread_alive, expected_status = test_params + + # Mock the thread + mock_thread = MagicMock() + mock_thread.is_alive.return_value = is_thread_alive + + # Initialise the RSyncer and patch the attributes to be tested + rsyncer = RSyncer( + basepath_local=tmp_path / "local", + basepath_remote=tmp_path / "remote", + rsync_module=mock.ANY, + server_url=mock_server_url, + ) + rsyncer.thread = mock_thread + rsyncer._stopping = is_stopping + + # Check that its status is correct + assert rsyncer.status == expected_status + + # Check that its canonical representation is correct + assert str(rsyncer) == f"" + + +@pytest.mark.parametrize("notify", (True, False)) +def test_rsyncer_notify( + mocker: MockerFixture, + tmp_path: Path, + mock_server_url: MagicMock, + notify: bool, +): + # Patch the superclass that RSyncer stems from + mock_notify = mocker.patch("murfey.client.rsync.Observer.notify") + mock_notify.return_value = None + + # Initialise the RSyncer + rsyncer = RSyncer( + basepath_local=tmp_path / "local", + basepath_remote=tmp_path / "remote", + rsync_module=mock.ANY, + server_url=mock_server_url, + notify=notify, + ) + # Check that the 'notify' attribute is set correctly + assert rsyncer._notify == notify + + # Run 'notify' and check that the expected calls were made + rsyncer.notify("arg1", "arg2", kwarg1="kwarg1", kwarg2="kwarg2") + if notify: + mock_notify.assert_called_once_with( + "arg1", + "arg2", + secondary=False, + kwarg1="kwarg1", + kwarg2="kwarg2", + ) + else: + mock_notify.assert_not_called() + + +@pytest.mark.parametrize("rsyncer_status", ("default", "is_alive", "stopping")) +def test_rsyncer_start( + tmp_path: Path, + mock_server_url: MagicMock, + rsyncer_status: 
str, +): + # Mock the thread attribute so that it doesn't start an actual Thread + mock_thread = MagicMock() + mock_thread.start.return_value = None + mock_thread.is_alive.return_value = rsyncer_status == "is_alive" + + # Initialise the RSyncer and patch the attributes to be tested + rsyncer = RSyncer( + basepath_local=tmp_path / "local", + basepath_remote=tmp_path / "remote", + rsync_module=mock.ANY, + server_url=mock_server_url, + ) + rsyncer.thread = mock_thread + rsyncer._stopping = rsyncer_status == "stopping" + + # Start the RSyncer + if rsyncer_status == "default": + rsyncer.start() + mock_thread.start.assert_called_once() + else: + with pytest.raises(RuntimeError): + rsyncer.start() + + +def test_rsyncer_restart( + mocker: MockerFixture, + tmp_path: Path, + mock_server_url: MagicMock, +): + # Patch the 'start' class method, which is called by 'restart' + mock_start = mocker.patch.object(RSyncer, "start") + mock_start.return_value = None + + # Mock the thread and the attributes used + mock_thread = MagicMock() + mock_thread.join.return_value = None + + # Initialise the RSyncer and patch the attributes to be tested + rsyncer = RSyncer( + basepath_local=tmp_path / "local", + basepath_remote=tmp_path / "remote", + rsync_module=mock.ANY, + server_url=mock_server_url, + ) + rsyncer.thread = mock_thread + + # Run 'restart' + rsyncer.restart() + + # Check that the correct calls and attributes are present + mock_thread.join.assert_called_once() + assert not rsyncer._halt_thread + assert isinstance(rsyncer.thread, threading.Thread) + mock_start.assert_called_once() + + +@pytest.mark.parametrize("thread_is_alive", (True, False)) +def test_rsyncer_stop( + tmp_path: Path, + mock_server_url: MagicMock, + thread_is_alive: bool, +): + # Mock the thread + mock_thread = MagicMock() + mock_thread.is_alive.return_value = thread_is_alive + mock_thread.join.return_value = None + + # Mock the queue + mock_queue = MagicMock() + mock_queue.join.return_value = None + 
mock_queue.put.return_value = None + + # Initialise the RSyncer and patch the attributes to be tested + rsyncer = RSyncer( + basepath_local=tmp_path / "local", + basepath_remote=tmp_path / "remote", + rsync_module=mock.ANY, + server_url=mock_server_url, + ) + rsyncer.thread = mock_thread + rsyncer.queue = mock_queue + + # Check that initial attributes are as expected + assert not rsyncer._stopping + assert not rsyncer._halt_thread + + # Run 'stop' and check that the calls are as expected + rsyncer.stop() + + assert rsyncer._stopping + assert rsyncer._halt_thread + if thread_is_alive: + mock_queue.join.assert_called_once() + mock_queue.put.assert_called_with(None) + mock_thread.join.assert_called_once() + else: + mock_queue.join.assert_not_called() + mock_queue.put.assert_not_called() + mock_thread.join.assert_not_called() + + +def test_rsyncer_request_stop( + tmp_path: Path, + mock_server_url: MagicMock, +): + # Initialise the RSyncer + rsyncer = RSyncer( + basepath_local=tmp_path / "local", + basepath_remote=tmp_path / "remote", + rsync_module=mock.ANY, + server_url=mock_server_url, + ) + + # Check that initial attributes are as expected + assert not rsyncer._stopping + assert not rsyncer._halt_thread + + # Run 'request_stop' and check that attributes have changed + rsyncer.request_stop() + assert rsyncer._stopping + assert rsyncer._halt_thread + + +@pytest.fixture +def clem_visit_dir(tmp_path: Path): + visit_name = f"{ExampleVisit.proposal_code}{ExampleVisit.proposal_number}-{ExampleVisit.visit_number}" + visit_dir = tmp_path / "local" / visit_name + visit_dir.mkdir(parents=True, exist_ok=True) + return visit_dir + + +@pytest.fixture +def clem_test_files(clem_visit_dir: Path): + # Create test files for the DirWatcher to scan + file_list: list[Path] = [] + project_dir = clem_visit_dir / "images" / "test_grid" + + # Example atlas collection + for s in range(20): + file_list.append( + project_dir + / "Overview 1" + / "Image 1" + / f"Image 
1--Stage{str(s).zfill(2)}.tif" + ) + file_list.append( + project_dir / "Overview 1" / "Image 1" / "Metadata" / "Image 1.xlif" + ) + + # Example image stack collection + for c in range(3): + for z in range(10): + file_list.append( + project_dir + / "TileScan 1" + / "Position 1" + / f"Position 1--C{str(c).zfill(2)}--Z{str(z).zfill(2)}.tif" + ) + file_list.append( + project_dir / "TileScan 1" / "Position 1" / "Metadata" / "Position 1.xlif" + ) + + # Create all files and directories specified + for file in file_list: + if not file.parent.exists(): + file.parent.mkdir(parents=True) + if not file.exists(): + file.touch() + return sorted(file_list) + + +@pytest.fixture +def clem_junk_files(clem_visit_dir: Path): + # Create junk files that are to be blacklisted from the CLEM workflow + file_list: list[Path] = [] + project_dir = clem_visit_dir / "images" / "test_grid" + + # Create junk atlas data + for n in range(5): + for s in range(20): + file_list.append( + project_dir + / "Image 1" + / f"Image 1_pmd_{n}" + / f"Image 1_pmd_{n}--Stage{str(s).zfill(2)}.tif" + ) + file_list.append( + project_dir / "Image 1" / f"Image 1_pmd_{n}" / "Metadata" / "Image 1.xlif" + ) + + # Create junk image data + for n in range(5): + for c in range(3): + for z in range(10): + file_list.append( + project_dir + / "Position 1" + / f"Position 1_pmd_{n}" + / f"Position 1_pmd_{n}--C{str(c).zfill(2)}--Z{str(z).zfill(2)}.tif" + ) + file_list.append( + project_dir + / "Position 1" + / f"Position 1_pmd_{n}" + / "Metadata" + / "Position 1.xlif" + ) + + # Create remaining junk files + for file_path in ( + "1.xlef", + "Metadata/IOManagerConfiguation.xlif", + "Metadata/Overview 1.xlcf", + "Metadata/TileScan 1.xlcf", + "Overview 1/Image 1/Image 1_histo.lof", + "TileScan 1/Position 1/Position 1_histo.lof", + "Overview 1/Image 1/Metadata/Image 1_histo.xlif", + "TileScan 1/Position 1/Metadata/Position 1_histo.xlif", + ): + file_list.append(project_dir / file_path) + + # Create files and directories + for file in 
file_list: + if not file.parent.exists(): + file.parent.mkdir(parents=True) + if not file.exists(): + file.touch() + return sorted(file_list) + + +clem_substrings_blacklist = { + "directories": [ + "_pmd_", + ], + "files": [ + ".xlef", + ".xlcf", + "_histo.lof", + "_histo.xlif", + "IOManagerConfiguation.xlif", + ], +} + + +@pytest.mark.parametrize( + "test_params", + ( + # Workflow type | Use thread? | Use callback function? | Use blacklist? + ("clem", False, False, False), + ("clem", False, False, True), + ("clem", False, True, False), + ("clem", False, True, True), + ("clem", True, False, False), + ("clem", True, False, True), + ("clem", True, True, False), + ("clem", True, True, True), + ), +) +def test_rsyncer_finalise( + mocker: MockerFixture, + rsync_module: str, + mock_server_url: MagicMock, + clem_visit_dir: Path, + clem_test_files: list[Path], + clem_junk_files: list[Path], + test_params: tuple[str, bool, bool, bool], +): + # Unpack test params + workflow_type, use_thread, use_callback, use_blacklist = test_params + + # Create a test end time + timestamp = datetime.now() + + # Mock the class functions/attributes called by the 'finalise' class function + mock_queue = MagicMock() + mock_queue.put.return_value = None + + mock_transfer = mocker.patch.object(RSyncer, "_transfer") + mock_transfer.return_value = True + + mock_stop = mocker.patch.object(RSyncer, "stop") + mock_stop.return_value = None + + mock_process = mocker.patch.object(RSyncer, "_process") + mock_process.return_value = None + + mock_callback = MagicMock(return_value=None) + + # Initialise the RSyncer class based on the workflow type being tested + if workflow_type == "clem": + rsyncer = RSyncer( + basepath_local=clem_visit_dir / "images", + basepath_remote=Path(clem_visit_dir.name) / "images", + rsync_module=rsync_module, + server_url=mock_server_url, + stop_callback=dummy_callback, + substrings_blacklist=clem_substrings_blacklist if use_blacklist else {}, + end_time=timestamp, + ) + # Patch 
the 'queue' attribute with the mocked one + rsyncer.queue = mock_queue + + # Check the initial state of attributes that will be changed by 'finalise' + assert not rsyncer._remove_files + assert rsyncer._notify + assert rsyncer._end_time == timestamp + assert not rsyncer._finalising + assert not rsyncer._finalised + + # Run the 'finalise' class function with the workflow-specific paths + rsyncer.finalise( + thread=use_thread, + callback=mock_callback if use_callback else None, + ) + + # Check that attributes are set correctly at the start of the function + assert rsyncer._remove_files + assert not rsyncer._notify + assert rsyncer._end_time is None + assert rsyncer._finalising + + # Check that list of files with and without using a blacklist are correct + if use_thread: + for file in clem_test_files: + mock_queue.put.assert_any_call(file) + if not use_blacklist: + for file in clem_junk_files: + mock_queue.put.assert_any_call(file) + else: + transfer_args = mock_transfer.call_args.args + assert sorted(transfer_args[0]) == ( + sorted(clem_test_files) + if use_blacklist + else sorted([*clem_test_files, *clem_junk_files]) + ) + + # Transfer is being mocked, so check that files to transfer are all present + for file in clem_test_files: + assert file.exists() + for file in clem_junk_files: + assert not file.exists() if use_blacklist else file.exists() + + # Check that stop was called the correct number of times depending on the setup + assert mock_stop.call_count == 2 if use_thread else 1 + + # Check that the RSyncer is marked as finalised at the end + assert rsyncer._finalised + + # Check that the callback was set at the end + if use_callback: + mock_callback.assert_called_once() + + +@pytest.mark.parametrize("is_stopping", (True, False)) +def test_rsyncer_enqueue( + tmp_path: Path, + mock_server_url: MagicMock, + is_stopping: bool, +): + # Mock the queue + mock_queue = MagicMock() + mock_queue.put.return_value = None + + # Initialise the RSyncer and patch the attributes 
used by the test + rsyncer = RSyncer( + basepath_local=tmp_path / "local", + basepath_remote=tmp_path / "remote", + rsync_module=mock.ANY, + server_url=mock_server_url, + ) + rsyncer._stopping = is_stopping + rsyncer.queue = mock_queue + + # Run enqueue with a test file and check that the expected calls were made + rsyncer.enqueue(Path("test_file")) + if is_stopping: + mock_queue.put.assert_not_called() + else: + mock_queue.put.assert_called_once_with( + (tmp_path / "local" / "test_file").absolute() + ) + + +def test_rsyncer_flush_skipped( + tmp_path: Path, + mock_server_url: MagicMock, +): + # Mock the queue + mock_queue = MagicMock() + mock_queue.put.return_value = None + + # Create a list of test files + skipped_files = [ + tmp_path / "local" / f"file_{str(n).zfill(2)}.txt" for n in range(20) + ] + + # Initialise the RSyncer and patch the attributes used by the test + rsyncer = RSyncer( + basepath_local=tmp_path / "local", + basepath_remote=tmp_path / "remote", + rsync_module=mock.ANY, + server_url=mock_server_url, + ) + rsyncer.queue = mock_queue + rsyncer._skipped_files = skipped_files + + # Run 'flush_skipped' and check that it works as intended + rsyncer.flush_skipped() + for f in skipped_files: + mock_queue.put.assert_any_call(f) + assert rsyncer._skipped_files == [] diff --git a/tests/util/test_rsync.py b/tests/util/test_rsync.py deleted file mode 100644 index 567c4a4a7..000000000 --- a/tests/util/test_rsync.py +++ /dev/null @@ -1,133 +0,0 @@ -from __future__ import annotations - -from pathlib import Path -from typing import Tuple - -from murfey.util.file_monitor import Monitor -from murfey.util.rsync import RsyncPipe - - -def test_a_simple_rsync_instance(tmp_path): - (tmp_path / "from").mkdir() - destination = tmp_path / "to" - destination.mkdir() - f01 = tmp_path / "from" / "file01.txt" - f01.touch() - rp = RsyncPipe(destination) - rp._run_rsync(tmp_path / "from", [f01]) - assert rp._out.qsize() == 1 - transferred = rp._out.get() - assert transferred == 
f01 - - -def test_rsync_multiple_files(tmp_path): - (tmp_path / "from").mkdir() - destination = tmp_path / "to" - destination.mkdir() - f01 = tmp_path / "from" / "file01.txt" - f01.touch() - f02 = tmp_path / "from" / "file02.txt" - f02.touch() - rp = RsyncPipe(destination) - rp._run_rsync(tmp_path / "from", [f01, f02]) - assert rp._out.qsize() == 2 - transferred = [rp._out.get()] - transferred.append(rp._out.get()) - assert len(transferred) == 2 - assert set(transferred) == {f01, f02} - - -def test_rsync_a_nonexistant_file(tmp_path): - (tmp_path / "from").mkdir() - destination = tmp_path / "to" - destination.mkdir() - f01 = tmp_path / "from" / "file01.txt" - f01.touch() - f02 = tmp_path / "from" / "file02.txt" - rp = RsyncPipe(destination) - rp._run_rsync(tmp_path / "from", [f01, f02], retry=False) - assert rp._out.qsize() == 1 - transferred = rp._out.get() - assert transferred == f01 - assert len(rp.failed) == 1 - - -def test_rsync_instance_on_nested_directory_structure(tmp_path): - initial_dir = tmp_path / "from" / "nest" - initial_dir.mkdir(parents=True) - destination = tmp_path / "to" - destination.mkdir() - f01 = initial_dir / "file01.txt" - f01.touch() - rp = RsyncPipe(destination) - rp._run_rsync(tmp_path / "from", [f01]) - assert rp._out.qsize() == 1 - transferred = rp._out.get() - assert transferred == f01 - assert not len(rp.failed) - assert (destination / "nest" / "file01.txt").exists() - - -def test_rsync_pipe_from_monitor(tmp_path): - (tmp_path / "from").mkdir() - destination = tmp_path / "to" - destination.mkdir() - f01 = tmp_path / "from" / "file01.txt" - f01.touch() - monitor = Monitor(tmp_path / "from") - monitor.process(in_thread=True, sleep=0.1) - rp = RsyncPipe(destination) - monitor >> rp - rp.process(in_thread=True) - assert rp.thread - monitor.stop() - monitor.wait() - rp.wait() - assert (destination / "file01.txt").exists() - - -def test_rsync_with_additional_structure_without_changing_file_name(tmp_path): - def _new_structure(s: Path, p: 
Path) -> Tuple[Path, str]: - new_name = p.name - new_dest = s / "extra" - return new_dest, new_name - - (tmp_path / "from").mkdir() - destination = tmp_path / "to" - destination.mkdir() - (destination / "extra").mkdir() - f01 = tmp_path / "from" / "file01.txt" - f01.touch() - monitor = Monitor(tmp_path / "from") - monitor.process(in_thread=True, sleep=0.1) - rp = RsyncPipe(destination, destination_structure=_new_structure) - monitor >> rp - rp.process(in_thread=True) - assert rp.thread - monitor.stop() - monitor.wait() - rp.wait() - assert (destination / "extra" / "file01.txt").exists() - - -def test_rsync_with_changed_file_name(tmp_path): - def _new_structure(s: Path, p: Path) -> Tuple[Path, str]: - new_name = p.name.replace("01", "05") - return s, new_name - - (tmp_path / "from").mkdir() - destination = tmp_path / "to" - destination.mkdir() - f01 = tmp_path / "from" / "file01.txt" - f01.touch() - monitor = Monitor(tmp_path / "from") - monitor.process(in_thread=True, sleep=0.1) - rp = RsyncPipe(destination, destination_structure=_new_structure) - monitor >> rp - rp.process(in_thread=True) - assert rp.thread - monitor.stop() - monitor.wait() - rp.wait() - assert not (destination / "file01.txt").exists() - assert (destination / "file05.txt").exists()