Skip to content

Commit 5e89a4b

Browse files
bpiwowar and claude committed
fix: detect missing files when resource path coincides with datapath
Single-resource ArchiveDownloader returns dataset.datapath as its path, which always exists (holds .state.json), so the "COMPLETE but files missing" and "adopt pre-existing" checks were fooled into skipping downloads. - Add Resource.files_present() with FolderResource override that checks for actual content beyond metadata files when path == datapath - Fix _move_path to merge directory contents instead of nesting when destination already exists - Add 5 regression tests covering re-download and transient scenarios Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent c9663fa commit 5e89a4b

3 files changed

Lines changed: 217 additions & 4 deletions

File tree

src/datamaestro/definitions.py

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,24 @@ def _delete_path(path: Path) -> None:
125125

126126

127127
def _move_path(src: Path, dst: Path) -> None:
    """Move a file or directory from *src* to *dst*.

    If both *src* and *dst* are existing directories, the contents of
    *src* are merged into *dst* — a plain ``shutil.move`` would instead
    nest *src* inside *dst*, which is not what we want. A missing *src*
    is a no-op.
    """
    if not src.exists():
        return

    if src.is_dir() and dst.is_dir():
        # Merge: move every child of src into dst, replacing collisions.
        for entry in src.iterdir():
            target = dst / entry.name
            if target.exists():
                _delete_path(target)
            shutil.move(str(entry), str(target))
        # src should now be empty; remove the leftover directory.
        shutil.rmtree(src, ignore_errors=True)
        return

    dst.parent.mkdir(parents=True, exist_ok=True)
    shutil.move(str(src), str(dst))
132148

@@ -402,7 +418,7 @@ def _download_locked(self, force, ResourceState):
402418

403419
if current_state == ResourceState.COMPLETE and not force:
404420
# Verify files are actually present on disk
405-
if resource.has_files() and not resource.path.exists():
421+
if resource.has_files() and not resource.files_present():
406422
logging.warning(
407423
"Resource %s marked COMPLETE but files "
408424
"missing at %s — re-downloading",
@@ -420,7 +436,7 @@ def _download_locked(self, force, ResourceState):
420436
current_state == ResourceState.NONE
421437
and not force
422438
and resource.has_files()
423-
and resource.path.exists()
439+
and resource.files_present()
424440
):
425441
logging.info(
426442
"Resource %s already exists at %s — marking COMPLETE",

src/datamaestro/download/__init__.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,16 @@ def has_files(self) -> bool:
346346
"""
347347
return True
348348

349+
def files_present(self) -> bool:
    """Whether this resource's output files actually exist on disk.

    Used by the download orchestrator to detect COMPLETE resources
    whose files have been removed. The base implementation simply
    checks that ``path`` exists; subclasses may override it for more
    thorough checks (e.g., when ``path`` coincides with
    ``dataset.datapath``).
    """
    location = self.path
    return location.exists()
358+
349359
# Backward compat alias
350360
def hasfiles(self) -> bool:
351361
"""Deprecated: use has_files() instead."""
@@ -554,6 +564,8 @@ class FolderResource(Resource):
554564
the given destination (which is ``self.transient_path``).
555565
"""
556566

567+
# Bookkeeping files stored in datapath; they never count as downloaded content.
_METADATA_FILES = {".state.json", ".state.json.tmp", ".state.lock"}
568+
557569
@property
558570
def path(self) -> Path:
559571
"""Final path to the produced directory.
@@ -562,6 +574,27 @@ def path(self) -> Path:
562574
"""
563575
return self.dataset.datapath / self.name
564576

577+
def files_present(self) -> bool:
    """Check that the directory exists and holds real content.

    When ``path`` coincides with ``dataset.datapath`` (e.g., a
    single-resource ArchiveDownloader), the directory always exists
    because it holds metadata files (.state.json). In that case we
    require at least one non-metadata entry before reporting presence.
    """
    folder = self.path
    if not folder.exists():
        return False

    if folder != self.dataset.datapath:
        # A dedicated subdirectory: its mere existence is enough.
        return True

    # path == datapath: look for anything beyond the metadata files.
    return any(
        entry.name not in self._METADATA_FILES for entry in folder.iterdir()
    )
597+
565598
@property
566599
def transient_path(self) -> Path:
567600
"""Temporary path for writing during download.

src/datamaestro/test/test_resource.py

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1655,3 +1655,167 @@ class Squad(Base):
16551655
ann = dataset_dec(base=Squad, url="http://test.com")
16561656
dw = DatasetWrapper(ann, Squad)
16571657
assert dw.id == "stanford.qa.squad"
1658+
1659+
1660+
# ==== Redownload with folder-path == datapath ====
1661+
1662+
1663+
class FolderAtDatapath(FolderResource):
    """A FolderResource whose path collapses to dataset.datapath.

    Simulates ArchiveDownloader with a single resource, where ``path``
    returns ``dataset.datapath`` instead of a subdirectory.
    """

    def __init__(self, **kw):
        super().__init__(**kw)
        # Flag flipped by _download so tests can tell whether it ran.
        self._download_called = False

    @property
    def path(self) -> Path:
        # Collapse onto the dataset root instead of a subdirectory.
        return self.dataset.datapath

    def _download(self, destination: Path):
        destination.mkdir(parents=True, exist_ok=True)
        output = destination / "data.tsv"
        output.write_text("col1\tcol2\n")
        self._download_called = True
1682+
1683+
1684+
class TestRedownloadFolderAtDatapath:
    """Regression: single-resource FolderResource whose path == datapath.

    When resource.path is the same as dataset.datapath, the directory
    always exists (it holds .state.json), so the "COMPLETE but files
    missing" check must look deeper than just path.exists().
    """

    def test_redownload_when_content_missing(self, dataset):
        """COMPLETE folder resource at datapath re-downloads when empty."""
        res = FolderAtDatapath()
        res.bind("DATA", dataset)

        dataset.ordered_resources = [res]
        _compute_dependents(dataset.resources)

        # State says COMPLETE, but the real content is gone; the datapath
        # directory itself still exists since it stores .state.json.
        dataset.datapath.mkdir(parents=True, exist_ok=True)
        res.state = ResourceState.COMPLETE
        assert res.path.exists()  # datapath is there (metadata lives in it)
        assert not (res.path / "data.tsv").exists()  # yet no data files

        dataset.download()

        assert res._download_called is True
        assert res.state == ResourceState.COMPLETE
        assert (res.path / "data.tsv").exists()

    def test_no_redownload_when_content_present(self, dataset):
        """COMPLETE folder resource at datapath is NOT re-downloaded."""
        res = FolderAtDatapath()
        res.bind("DATA", dataset)

        dataset.ordered_resources = [res]
        _compute_dependents(dataset.resources)

        # State is COMPLETE and the content really is on disk.
        dataset.datapath.mkdir(parents=True, exist_ok=True)
        (dataset.datapath / "data.tsv").write_text("existing\n")
        res.state = ResourceState.COMPLETE

        dataset.download()

        assert res._download_called is False
        assert res.state == ResourceState.COMPLETE

    def _make_transient_dag(self, dataset):
        """Helper: transient source -> dependent file resource."""
        src = DummyFolderResource(transient=True)
        src.bind("SOURCE", dataset)

        dep = DummyFileResource("result.txt")
        dep._dependencies = [src]
        dep.bind("RESULT", dataset)

        dataset.ordered_resources = [src, dep]
        _compute_dependents(dataset.resources)
        return src, dep

    def test_transient_first_download(self, dataset):
        """First prep: transient source is downloaded then cleaned up."""
        src, dep = self._make_transient_dag(dataset)

        dataset.download()

        # Both resources were fetched.
        assert src._download_called is True
        assert dep._download_called is True
        assert dep.path.exists()
        assert dep.state == ResourceState.COMPLETE

        # The transient source was removed once the dependent completed.
        assert src.state == ResourceState.NONE
        assert not src.path.exists()

    def test_transient_second_download_all_ok(self, dataset):
        """Second prep: transient source is skipped (dependent is COMPLETE)."""
        src, dep = self._make_transient_dag(dataset)

        # First pass downloads everything.
        dataset.download()
        assert src._download_called is True
        assert dep._download_called is True

        # Clear the call flags before the second pass.
        src._download_called = False
        dep._download_called = False

        # Second pass: nothing left to do.
        dataset.download()

        # Transient source skipped — all of its dependents are COMPLETE.
        assert src._download_called is False
        # Dependent stays COMPLETE and is not fetched again.
        assert dep._download_called is False
        assert dep.state == ResourceState.COMPLETE

    def test_transient_second_download_after_first_failure(self, dataset):
        """Second prep after first failed: transient re-downloads."""
        src = DummyFolderResource(transient=True)
        src.bind("SOURCE", dataset)

        # The dependent fails on its first attempt.
        failing = FailingResource("result.txt")
        failing._dependencies = [src]
        failing.bind("RESULT", dataset)

        dataset.ordered_resources = [src, failing]
        _compute_dependents(dataset.resources)

        # First pass: source succeeds, dependent fails.
        outcome = dataset.download()
        assert outcome is False
        assert src._download_called is True
        # Source stays COMPLETE and is NOT cleaned up (dependent incomplete).
        assert src.state == ResourceState.COMPLETE
        assert failing.state == ResourceState.NONE

        # Swap in a working dependent and retry.
        src._download_called = False
        retry = DummyFileResource("result.txt")
        retry._dependencies = [src]
        # Re-bind: drop the failing resource, register the replacement.
        del dataset.resources["RESULT"]
        retry.bind("RESULT", dataset)
        dataset.ordered_resources = [src, retry]
        _compute_dependents(dataset.resources)

        dataset.download()

        # The already-COMPLETE source is not fetched again.
        assert src._download_called is False
        # The replacement dependent succeeds this time.
        assert retry._download_called is True
        assert retry.state == ResourceState.COMPLETE
        # With the dependent COMPLETE, the transient source is cleaned up.
        assert src.state == ResourceState.NONE

0 commit comments

Comments
 (0)