Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
run: uv run pre-commit run --all-files

build-n-test:
name: Build and test
name: Build and test (Daft native runner)
runs-on: ubuntu-latest
strategy:
matrix:
Expand All @@ -40,8 +40,10 @@ jobs:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: uv sync --all-extras
- name: Run unit tests + benchmarks
run: uv run pytest -m "not integration" --benchmark-json output.json
- name: Run unit tests + benchmarks (Daft native runner)
run: uv run pytest -m "not distributed and not integration" --benchmark-json output.json
env:
DAFT_RUNNER: native
# Download previous benchmark result from cache (if exists)
- name: Download previous benchmark data
uses: actions/cache@v4
Expand All @@ -61,3 +63,27 @@ jobs:

# Enable Job Summary for PRs
summary-always: true

# Runs only the tests selected by the `distributed` pytest marker, once per
# Python version in the matrix, with DAFT_RUNNER set from the matrix
# (currently the single value `ray`).
distributed-tests:
  name: Build and distributed test (Daft Ray runner)
  runs-on: ubuntu-latest
  strategy:
    matrix:
      # Versions are quoted so that "3.10" is not read as the float 3.1.
      python-version: ["3.9", "3.10"]
      daft-runner: [ray]
  timeout-minutes: 45
  steps:
    - name: "checkout repository"
      uses: actions/checkout@v4
      with:
        fetch-depth: 0
    - name: Install uv
      uses: astral-sh/setup-uv@v7
      with:
        python-version: ${{ matrix.python-version }}
    - name: Install dependencies
      run: uv sync --all-extras
    # Step label matches the job name and the matrix runner value (ray).
    - name: Run distributed tests (Daft Ray runner)
      run: uv run pytest -m "distributed" --tb=short
      env:
        DAFT_RUNNER: ${{ matrix.daft-runner }}
6 changes: 3 additions & 3 deletions deltacat/tests/storage/main/test_main_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -7628,7 +7628,7 @@ def test_download_delta_distributed_error_handling(self):
)

# ========== DAFT DISTRIBUTED TESTS ==========
@pytest.mark.integration
@pytest.mark.distributed
def test_download_delta_distributed_daft_basic(self):
"""Test basic distributed download with DAFT dataset type."""

Expand Down Expand Up @@ -7685,7 +7685,7 @@ def test_download_delta_distributed_daft_basic(self):
), "Column names mismatch"
pd.testing.assert_frame_equal(downloaded_df, expected_df)

@pytest.mark.integration
@pytest.mark.distributed
def test_download_delta_distributed_daft_with_delta_locator(self):
"""Test DAFT distributed download using DeltaLocator instead of Delta object."""

Expand Down Expand Up @@ -7725,7 +7725,7 @@ def test_download_delta_distributed_daft_with_delta_locator(self):
expected_df = test_data.sort_values("id").reset_index(drop=True)
pd.testing.assert_frame_equal(downloaded_df, expected_df)

@pytest.mark.integration
@pytest.mark.distributed
def test_download_delta_distributed_daft_vs_ray_consistency(self):
"""Test that DAFT and Ray distributed downloads return the same data."""

Expand Down
8 changes: 7 additions & 1 deletion deltacat/tests/utils/test_daft.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,10 @@ def test_read_from_local_single_column_with_row_groups(self):
self.assertEqual(table.num_rows, 10)


@pytest.mark.integration
class TestFilesToDataFrame(unittest.TestCase):
MVP_PATH = "deltacat/tests/utils/data/mvp.parquet"

@pytest.mark.distributed
def test_read_local_files_all_columns(self):
df = files_to_dataframe(
uris=[self.MVP_PATH],
Expand All @@ -178,6 +178,7 @@ def test_read_local_files_all_columns(self):
self.assertEqual(table.schema.names, ["a", "b"])
self.assertEqual(table.num_rows, 100)

@pytest.mark.distributed
def test_read_local_files_with_column_selection(self):
df = files_to_dataframe(
uris=[self.MVP_PATH],
Expand All @@ -191,6 +192,7 @@ def test_read_local_files_with_column_selection(self):
self.assertEqual(table.schema.names, ["b"])
self.assertEqual(table.num_rows, 100)

@pytest.mark.distributed
def test_read_local_files_does_not_materialize_by_default(self):
df = files_to_dataframe(
uris=[self.MVP_PATH],
Expand All @@ -206,6 +208,7 @@ def test_read_local_files_does_not_materialize_by_default(self):
df.collect()
self.assertEqual(len(df), 100)

@pytest.mark.distributed
def test_supports_unescaped_tsv_content_type(self):
# Test that UNESCAPED_TSV is now supported (was previously unsupported)
# Use a CSV file since we're testing TSV reader functionality
Expand All @@ -222,6 +225,7 @@ def test_supports_unescaped_tsv_content_type(self):
self.assertGreater(table.num_rows, 0)
self.assertGreater(len(table.schema.names), 0)

@pytest.mark.distributed
def test_supports_gzip_content_encoding(self):
# Test that GZIP encoding is now supported (was previously unsupported)
df = files_to_dataframe(
Expand Down Expand Up @@ -259,6 +263,7 @@ def test_raises_error_if_not_supported_content_encoding(self):
),
)

@pytest.mark.distributed
def test_accepts_custom_kwargs(self):
# Test that custom kwargs are passed through to daft.read_parquet
df = files_to_dataframe(
Expand All @@ -274,6 +279,7 @@ def test_accepts_custom_kwargs(self):
self.assertEqual(table.schema.names, ["a", "b"])
self.assertEqual(table.num_rows, 100)

@pytest.mark.distributed
def test_accepts_io_config(self):
# Test that io_config parameter is accepted and passed correctly
df = files_to_dataframe(
Expand Down
2 changes: 2 additions & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# Custom markers registered with pytest; select or deselect them with `-m`.
[pytest]
markers =
    integration: integration tests (deselect with '-m "not integration"')
    distributed: tests that require distributed runner (Ray/Daft distributed)
    native: tests that require native runner (for future use)
    serial