Skip to content
9 changes: 9 additions & 0 deletions BOTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ Tests use pytest. Example commands:
./venv/bin/python -m pytest src/toil/test -k "safe" -v
```

## Running Make Targets (mypy, tests, etc.)

The `Makefile` targets require the virtualenv to be activated. Some targets (like `test_debug`) enforce this with a `check_venv` guard that checks for `VIRTUAL_ENV` in the environment. Set `PATH` and `VIRTUAL_ENV` to satisfy this without needing `source`:

```bash
PATH="./venv/bin:$PATH" VIRTUAL_ENV=./venv make mypy
PATH="./venv/bin:$PATH" VIRTUAL_ENV=./venv make test_debug tests='src/toil/test/path/to/test.py::TestClass::test_name'
```

## Running Individual WDL Spec Unit Tests

The WDL spec embeds example workflows as unit tests (under the `wdl-1.1` and `wdl-1.2` branches of `https://github.com/openwdl/wdl`). `TestWDLConformance.test_single_unit_test` in `src/toil/test/wdl/wdltoil_test.py` runs one such test at a time and is controlled by environment variables:
Expand Down
1 change: 1 addition & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,4 @@ pytest-xdist
build
check-jsonschema
strip_ansi==0.1.1
edit_distance>=1.0.7,<2
21 changes: 19 additions & 2 deletions src/toil/fileStores/abstractFileStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ def getLocalTempFileName(

# Functions related to reading, writing and removing files to/from the job store
@abstractmethod
def writeGlobalFile(self, localFileName: str, cleanup: bool = False) -> FileID:
def writeGlobalFile(self, localFileName: str, cleanup: bool = False, hints: list[str] | None = None) -> FileID:
"""
Upload a file (as a path) to the job store.

Expand All @@ -319,6 +319,11 @@ def writeGlobalFile(self, localFileName: str, cleanup: bool = False) -> FileID:
:param cleanup: if True then the copy of the global file will be deleted once
the job and all its successors have completed running. If not the global
file must be deleted manually.
:param hints: String values such as workflow names or task names that
should be used to store the file at a human-findable location.
Large numbers of files stored under the same non-empty hints
may be inefficient; hints are intended for human-navigable
categorization, not for high-throughput bulk file creation.

:return: an ID that can be used to retrieve the file.
"""
Expand All @@ -329,6 +334,7 @@ def writeGlobalFileStream(
self,
cleanup: bool = False,
basename: str | None = None,
hints: list[str] | None = None,
encoding: str | None = None,
errors: str | None = None,
) -> Iterator[tuple[WriteWatchingStream, FileID]]:
Expand All @@ -349,12 +355,23 @@ def writeGlobalFileStream(
file basename so that when searching the job store with a query
matching that basename, the file will be detected.

:param hints: String values such as workflow names or task names that
should be used to store the file at a human-findable location.
Large numbers of files stored under the same non-empty hints
may be inefficient; hints are intended for human-navigable
categorization, not for high-throughput bulk file creation.

:return: A context manager yielding a tuple of
1) a file handle which can be written to and
2) the toil.fileStores.FileID of the resulting file in the job store.
"""
with self.jobStore.write_file_stream(
str(self.jobDesc.jobStoreID), cleanup, basename, encoding, errors
str(self.jobDesc.jobStoreID),
cleanup=cleanup,
basename=basename,
hints=hints,
encoding=encoding,
errors=errors,
) as (backingStream, fileStoreID):
# We have a string version of the file ID, and the backing stream.
# We need to yield a stream the caller can write to, and a FileID
Expand Down
4 changes: 2 additions & 2 deletions src/toil/fileStores/cachingFileStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -1207,7 +1207,7 @@ def open(self, job: Job) -> Generator[None, None, None]:
# its temp dir and database entry.
self._deallocateSpaceForJob()

def writeGlobalFile(self, localFileName, cleanup=False):
def writeGlobalFile(self, localFileName, cleanup=False, hints=None):
"""
Creates a file in the jobstore and returns a FileID reference.
"""
Expand All @@ -1224,7 +1224,7 @@ def writeGlobalFile(self, localFileName, cleanup=False):
# Make sure to pass along the file basename.
# TODO: this empty file could leak if we die now...
fileID = self.jobStore.get_empty_file_store_id(
creatorID, cleanup, os.path.basename(localFileName)
creatorID, cleanup, os.path.basename(localFileName), hints=hints
)
# Work out who we are
with self.as_process() as me:
Expand Down
4 changes: 2 additions & 2 deletions src/toil/fileStores/nonCachingFileStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,10 @@ def open(self, job: Job) -> Generator[None, None, None]:
self.jobStateFile,
)

def writeGlobalFile(self, localFileName: str, cleanup: bool = False) -> FileID:
def writeGlobalFile(self, localFileName: str, cleanup: bool = False, hints: list[str] | None = None) -> FileID:
absLocalFileName = self._resolveAbsoluteLocalPath(localFileName)
creatorID = str(self.jobDesc.jobStoreID)
fileStoreID = self.jobStore.write_file(absLocalFileName, creatorID, cleanup)
fileStoreID = self.jobStore.write_file(absLocalFileName, creatorID, cleanup, hints=hints)
if absLocalFileName.startswith(self.localTempDir):
# Only files in the appropriate directory should become local files
# we can delete with deleteLocalFile
Expand Down
93 changes: 88 additions & 5 deletions src/toil/jobStores/abstractJobStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -1088,18 +1088,60 @@ def jobs(self) -> Iterator[JobDescription]:
# associated with a given job.
##########################################

# Pattern matching the characters that are NOT allowed in sanitized hints.
# The complementary allowed set (letters, digits, "_", "-", ".") survives
# urllib quote() unchanged and is safe in filesystem paths and S3 keys.
HINT_SAFE_RE = re.compile(r"[^a-zA-Z0-9_\-.]")
# Maximum length of a single sanitized hint path component, in characters.
MAX_HINT_LENGTH = 40
# Maximum total length of the hints portion of the path (components joined
# with "/"), so the resulting file ID stays a usable size.
MAX_HINTS_PATH_LENGTH = 120

def _sanitize_hints(self, hints: list[str] | None) -> list[str]:
"""
Turn user-supplied hints into path-safe components usable as
directory names on a filesystem or key segments in an object store.

Drops empty hints and hints that become empty after sanitization.
Truncates individual hints and the overall joined path to bounded
lengths so that the resulting file ID stays under a usable size.
"""
if not hints:
return []
result: list[str] = []
total_length = 0
for hint in hints:
sanitized = self.HINT_SAFE_RE.sub("", hint)
sanitized = sanitized[: self.MAX_HINT_LENGTH]
if not sanitized:
continue
# +1 for the separator between components
if total_length + len(sanitized) + (1 if result else 0) > self.MAX_HINTS_PATH_LENGTH:
break
total_length += len(sanitized) + (1 if result else 0)
result.append(sanitized)
return result

# Don't add any new arguments to this old version; make people use the new one!
@deprecated(new_function_name="write_file")
def writeFile(
    self,
    localFilePath: str,
    jobStoreID: str | None = None,
    cleanup: bool = False,
) -> str:
    """
    Deprecated camelCase alias for :meth:`write_file`.

    :param localFilePath: path of the local file to store.
    :param jobStoreID: ID of the job to associate the file with for cleanup.
    :param cleanup: if True, delete the file when the associated job is deleted.
    :return: the job store ID of the stored file.
    """
    # Forward with keyword arguments so this wrapper stays correct if the
    # new function grows additional parameters ahead of the old ones.
    return self.write_file(
        localFilePath,
        job_id=jobStoreID,
        cleanup=cleanup,
    )

@abstractmethod
def write_file(
self, local_path: str, job_id: str | None = None, cleanup: bool = False
self,
local_path: str,
job_id: str | None = None,
cleanup: bool = False,
hints: list[str] | None = None,
) -> str:
"""
Takes a file (as a path) and places it in this job store. Returns an ID that can be used
Expand All @@ -1118,6 +1160,15 @@ def write_file(
whose jobStoreID was given as jobStoreID is deleted with
jobStore.delete(job). If jobStoreID was not given, does nothing.

:param hints: String values such as workflow names or task names that
should be used to store the file at a human-findable location.
Two files with the same basename and same hints are still
guaranteed to never collide and to have distinct assigned IDs.
Large numbers of files stored under the same non-empty hints
may be inefficient, as slot allocation scans existing entries;
hints are intended for human-navigable categorization, not for
high-throughput bulk file creation.

:raise ConcurrentFileModificationException: if the file was modified concurrently during
an invocation of this method

Expand All @@ -1131,6 +1182,7 @@ def write_file(
"""
raise NotImplementedError()

# Don't add any new arguments to this old version; make people use the new one!
@deprecated(new_function_name="write_file_stream")
def writeFileStream(
self,
Expand All @@ -1140,14 +1192,21 @@ def writeFileStream(
encoding: str | None = None,
errors: str | None = None,
) -> ContextManager[tuple[IO[bytes], str]]:
return self.write_file_stream(jobStoreID, cleanup, basename, encoding, errors)
return self.write_file_stream(
jobStoreID,
cleanup=cleanup,
basename=basename,
encoding=encoding,
errors=errors
)

@abstractmethod
@contextmanager
def write_file_stream(
self,
job_id: str | None = None,
cleanup: bool = False,
hints: list[str] | None = None,
basename: str | None = None,
encoding: str | None = None,
errors: str | None = None,
Expand All @@ -1171,6 +1230,15 @@ def write_file_stream(
file basename so that when searching the job store with a query
matching that basename, the file will be detected.

:param hints: String values such as workflow names or task names that
should be used to store the file at a human-findable location.
Two files with the same basename and same hints are still
guaranteed to never collide and to have distinct assigned IDs.
Large numbers of files stored under the same non-empty hints
may be inefficient, as slot allocation scans existing entries;
hints are intended for human-navigable categorization, not for
high-throughput bulk file creation.

:param str encoding: the name of the encoding used to encode the file. Encodings are the same
as for encode(). Defaults to None which represents binary mode.

Expand All @@ -1189,22 +1257,28 @@ def write_file_stream(
:rtype: Iterator[Tuple[IO[bytes], str]]
"""
raise NotImplementedError()


# Don't add any new arguments to this old version; make people use the new one!
@deprecated(new_function_name="get_empty_file_store_id")
def getEmptyFileStoreID(
    self,
    jobStoreID: str | None = None,
    cleanup: bool = False,
    basename: str | None = None,
) -> str:
    """
    Deprecated camelCase alias for :meth:`get_empty_file_store_id`.

    :param jobStoreID: ID of the job to associate the file with for cleanup.
    :param cleanup: if True, delete the file when the associated job is deleted.
    :param basename: optional basename to record for the new empty file.
    :return: the job store ID of the newly created empty file.
    """
    # Forward with keyword arguments so this wrapper stays correct if the
    # new function grows additional parameters ahead of the old ones.
    return self.get_empty_file_store_id(
        job_id=jobStoreID,
        cleanup=cleanup,
        basename=basename,
    )

@abstractmethod
def get_empty_file_store_id(
self,
job_id: str | None = None,
cleanup: bool = False,
basename: str | None = None,
hints: list[str] | None = None,
) -> str:
"""
Creates an empty file in the job store and returns its ID.
Expand All @@ -1221,6 +1295,15 @@ def get_empty_file_store_id(
file basename so that when searching the job store with a query
matching that basename, the file will be detected.

:param hints: String values such as workflow names or task names that
should be used to store the file at a human-findable location.
Two files with the same basename and same hints are still
guaranteed to never collide and to have distinct assigned IDs.
Large numbers of files stored under the same non-empty hints
may be inefficient, as slot allocation scans existing entries;
hints are intended for human-navigable categorization, not for
high-throughput bulk file creation.

:return: a jobStoreFileID that references the newly created file and can be used to reference the
file in the future.
:rtype: str
Expand Down
Loading
Loading