Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/murfey/client/multigrid_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,9 @@ def _start_rsyncer(
stop_callback=self._rsyncer_stopped,
do_transfer=self.do_transfer,
remove_files=remove_files,
substrings_blacklist=self._machine_config.get(
"substrings_blacklist", {"directories": [], "files": []}
),
end_time=self.visit_end_time,
)

Expand Down
96 changes: 66 additions & 30 deletions src/murfey/client/rsync.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import logging
import os
import queue
import shutil
import subprocess
import threading
import time
Expand Down Expand Up @@ -61,33 +62,32 @@ def __init__(
do_transfer: bool = True,
remove_files: bool = False,
required_substrings_for_removal: List[str] = [],
substrings_blacklist: dict[str, list[str]] = {},
notify: bool = True,
end_time: datetime | None = None,
):
super().__init__()
self._basepath = basepath_local.absolute()
self._basepath_remote = basepath_remote
self._rsync_module = rsync_module
self._server_url = server_url
self._stop_callback = stop_callback
self._local = local
self._do_transfer = do_transfer
self._remove_files = remove_files
self._required_substrings_for_removal = required_substrings_for_removal
self._stop_callback = stop_callback
self._local = local
self._server_url = server_url
self._substrings_blacklist = substrings_blacklist
self._notify = notify
self._finalised = False
self._end_time = end_time
self._finalising = False

self._skipped_files: List[Path] = []

# Set rsync destination
if local:
self._remote = str(basepath_remote)
else:
self._remote = (
f"{server_url.hostname}::{self._rsync_module}/{basepath_remote}/"
)
self._remote = (
str(basepath_remote)
if local
else f"{server_url.hostname}::{self._rsync_module}/{basepath_remote}/"
)
logger.debug(f"rsync destination path set to {self._remote}")

# For local tests you can use something along the lines of
Expand All @@ -105,9 +105,24 @@ def __init__(
)
self._stopping = False
self._halt_thread = False
self._finalising = False
self._finalised = False

@property
def status(self) -> str:
if self._stopping:
if self.thread.is_alive():
return "stopping"
else:
return "finished"
else:
if self.thread.is_alive():
return "running"
else:
return "ready"

def __repr__(self) -> str:
return f"<RSyncer ({self._basepath} → {self._remote}) [{self.status}]"
return f"<RSyncer ({self._basepath} → {self._remote})>"

@classmethod
def from_rsyncer(cls, rsyncer: RSyncer, **kwargs):
Expand All @@ -133,19 +148,6 @@ def from_rsyncer(cls, rsyncer: RSyncer, **kwargs):
notify=kwarguments_from_rsyncer["notify"],
)

@property
def status(self) -> str:
if self._stopping:
if self.thread.is_alive():
return "stopping"
else:
return "finished"
else:
if self.thread.is_alive():
return "running"
else:
return "ready"

def notify(self, *args, secondary: bool = False, **kwargs) -> None:
if self._notify:
super().notify(*args, secondary=secondary, **kwargs)
Expand All @@ -169,7 +171,7 @@ def restart(self):
self.start()

def stop(self):
logger.debug("RSync thread stop requested")
logger.info(f"Stopping RSync thread {self}")
self._stopping = True
if self.thread.is_alive():
logger.info("Waiting for ongoing transfers to complete...")
Expand All @@ -179,7 +181,7 @@ def stop(self):
if self.thread.is_alive():
self.queue.put(None)
self.thread.join()
logger.debug("RSync thread successfully stopped")
logger.info(f"RSync thread {self} successfully stopped")

def request_stop(self):
self._stopping = True
Expand All @@ -195,18 +197,52 @@ def finalise(
self._notify = False
self._end_time = None
self._finalising = True

# Perform recursive cleanup on current directory
logger.info(f"Starting file cleanup for RSync thread {self}")
files_to_transfer: list[Path] = []

def recursive_cleanup(dirpath: str | Path):
for entry in os.scandir(dirpath):
if entry.is_dir():
# Recursively delete directories with blacklisted substrings
if any(
pattern in entry.name
for pattern in self._substrings_blacklist.get("directories", [])
):
logger.debug(f"Deleting blacklisted directory {entry.path}")
shutil.rmtree(entry.path)
continue
# Recursively search in whitelisted ones
recursive_cleanup(entry.path)
elif entry.is_file():
# Delete blacklisted files
if any(
pattern in entry.name
for pattern in self._substrings_blacklist.get("files", [])
):
logger.debug(f"Deleting blacklisted file {entry.path}")
Path(entry.path).unlink()
continue
# Append others for transfer
files_to_transfer.append(Path(entry.path))

recursive_cleanup(self._basepath)
logger.debug(f"Number of files to transfer: {len(files_to_transfer)}")

if thread:
self.thread = threading.Thread(
name=f"RSync finalisation {self._basepath}:{self._remote}",
target=self._process,
daemon=True,
)
for f in self._basepath.glob("**/*"):
for f in files_to_transfer:
self.queue.put(f)
self.stop()
else:
self._transfer(list(self._basepath.glob("**/*")))
self._transfer(files_to_transfer)
self._finalised = True
logger.info(f"File cleanup for RSync thread {self} successfully completed")
if callback:
callback()

Expand All @@ -221,7 +257,7 @@ def flush_skipped(self):
self._skipped_files = []

def _process(self):
logger.info("RSync thread starting")
logger.info(f"Starting main process loop for RSync thread {self}")
files_to_transfer: list[Path]
backoff = 0
while not self._halt_thread:
Expand Down
8 changes: 4 additions & 4 deletions src/murfey/client/watchdir.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,8 @@ def _scan_directory(
entry_name = os.path.join(path, entry.name)
# Skip any directories with matching blacklisted substrings
if entry.is_dir() and any(
char in entry.name
for char in self._substrings_blacklist.get("directories", [])
pattern in entry.name
for pattern in self._substrings_blacklist.get("directories", [])
):
log.debug(f"Skipping blacklisted directory {str(entry.name)!r}")
continue
Expand All @@ -262,8 +262,8 @@ def _scan_directory(
continue
# Exclude files with blacklisted substrings
if any(
char in entry.name
for char in self._substrings_blacklist.get("files", [])
pattern in entry.name
for pattern in self._substrings_blacklist.get("files", [])
):
log.debug(f"Skipping blacklisted file {str(entry.name)!r}")
continue
Expand Down
172 changes: 0 additions & 172 deletions src/murfey/util/rsync.py

This file was deleted.

Loading