|
| 1 | +# Licensed to the Apache Software Foundation (ASF) under one |
| 2 | +# or more contributor license agreements. See the NOTICE file |
| 3 | +# distributed with this work for additional information |
| 4 | +# regarding copyright ownership. The ASF licenses this file |
| 5 | +# to you under the Apache License, Version 2.0 (the |
| 6 | +# "License"); you may not use this file except in compliance |
| 7 | +# with the License. You may obtain a copy of the License at |
| 8 | +# |
| 9 | +# http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | +# |
| 11 | +# Unless required by applicable law or agreed to in writing, |
| 12 | +# software distributed under the License is distributed on an |
| 13 | +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 14 | +# KIND, either express or implied. See the License for the |
| 15 | +# specific language governing permissions and limitations |
| 16 | +# under the License. |
| 17 | + |
| 18 | +"""End-to-end tests for Paimon system tables via pypaimon SQL. |
| 19 | + |
| 20 | +Exercises the `<table>$<system_name>` syntax handled by the paimon-rust |
| 21 | +DataFusion integration. A non-partitioned table with one snapshot is |
| 22 | +created in setUpClass and queried by each test. |
| 23 | +""" |
| 24 | + |
| 25 | +import os |
| 26 | +import tempfile |
| 27 | +import unittest |
| 28 | + |
| 29 | +import pyarrow as pa |
| 30 | + |
| 31 | + |
# Warehouse path taken from the environment; None when the variable is unset,
# in which case setUpClass falls back to a temporary directory.
WAREHOUSE = os.getenv("PAIMON_TEST_WAREHOUSE")

# Unqualified name (used in SQL) and database-qualified name (used with the
# catalog API) of the table the tests create.
TABLE_NAME = "sql_system_test_table"
TABLE_FQN = "default." + TABLE_NAME

# Number of rows written by setUpClass; compared against total_record_count.
ROW_COUNT = 3
| 36 | + |
| 37 | + |
class SQLSystemTableTest(unittest.TestCase):
    """Query Paimon system tables through ``SQLContext`` using ``<table>$<name>``.

    ``setUpClass`` (re)creates ``TABLE_FQN`` with columns ``id INT`` and
    ``name STRING`` (no primary or partition keys), writes three rows in a
    single batch commit, and registers the warehouse as catalog ``paimon``
    with ``default`` as the current database. Each test then selects from one
    system table and checks its column names and/or row contents.
    """

    @classmethod
    def setUpClass(cls):
        """Create the warehouse, the populated test table and the SQL context.

        Uses ``WAREHOUSE`` when it is set; otherwise a temporary directory is
        created and removed again in ``tearDownClass``.
        """
        from pypaimon.sql.sql_context import SQLContext

        cls._tmpdir = None
        if WAREHOUSE:
            cls.warehouse = WAREHOUSE
        else:
            cls._tmpdir = tempfile.TemporaryDirectory(prefix="paimon-sql-systest-")
            cls.warehouse = cls._tmpdir.name

        try:
            cls._create_and_populate_table()

            cls.ctx = SQLContext()
            cls.ctx.register_catalog("paimon", {"warehouse": cls.warehouse})
            cls.ctx.set_current_catalog("paimon")
            cls.ctx.set_current_database("default")
        except BaseException:
            # unittest does not call tearDownClass when setUpClass raises, so
            # the temporary warehouse has to be released here before re-raising.
            cls._cleanup_tmpdir()
            raise

    @classmethod
    def tearDownClass(cls):
        """Drop the test table and remove the temporary warehouse, if any."""
        try:
            from pypaimon import CatalogFactory

            catalog = CatalogFactory.create({"warehouse": cls.warehouse})
            catalog.drop_table(TABLE_FQN, ignore_if_not_exists=True)
        finally:
            # Remove the temporary directory even if dropping the table failed.
            cls._cleanup_tmpdir()

    @classmethod
    def _create_and_populate_table(cls):
        """Recreate ``TABLE_FQN`` in ``cls.warehouse`` and commit one batch."""
        from pypaimon import CatalogFactory, Schema
        from pypaimon.schema.data_types import AtomicType, DataField

        catalog = CatalogFactory.create({"warehouse": cls.warehouse})
        catalog.create_database("default", ignore_if_exists=True)
        # Start from a clean slate in case an earlier run left the table behind.
        catalog.drop_table(TABLE_FQN, ignore_if_not_exists=True)
        schema = Schema(
            fields=[
                DataField(0, "id", AtomicType("INT")),
                DataField(1, "name", AtomicType("STRING")),
            ],
            primary_keys=[],
            partition_keys=[],
            options={},
            comment="",
        )
        catalog.create_table(TABLE_FQN, schema, ignore_if_exists=False)

        table = catalog.get_table(TABLE_FQN)
        write_builder = table.new_batch_write_builder()
        table_write = write_builder.new_write()
        table_commit = write_builder.new_commit()
        try:
            # Three rows: keep in sync with ROW_COUNT, which the snapshot test
            # compares against total_record_count.
            pa_table = pa.table({
                "id": pa.array([1, 2, 3], type=pa.int32()),
                "name": pa.array(["alice", "bob", "carol"], type=pa.string()),
            })
            table_write.write_arrow(pa_table)
            table_commit.commit(table_write.prepare_commit())
        finally:
            # Close the committer even when closing the writer raises.
            try:
                table_write.close()
            finally:
                table_commit.close()

    @classmethod
    def _cleanup_tmpdir(cls):
        """Remove the temporary warehouse directory if this class created one."""
        if cls._tmpdir is not None:
            cls._tmpdir.cleanup()
            cls._tmpdir = None

    def _query(self, system_name: str) -> pa.Table:
        """Return ``SELECT *`` from the ``<TABLE_NAME>$<system_name>`` table."""
        return self.ctx.sql(f"SELECT * FROM {TABLE_NAME}${system_name}")

    def _assert_has_columns(self, table, required):
        """Fail, listing every absent name, unless ``table`` has all ``required`` columns."""
        names = table.schema.names
        missing = [column for column in required if column not in names]
        self.assertEqual(missing, [], f"missing columns; found {names}")

    def test_options_system_table(self):
        table = self._query("options")
        self.assertListEqual(table.schema.names, ["key", "value"])

    def test_schemas_system_table(self):
        table = self._query("schemas")
        self.assertListEqual(
            table.schema.names,
            ["schema_id", "fields", "partition_keys", "primary_keys",
             "options", "comment", "update_time"],
        )
        self.assertGreaterEqual(table.num_rows, 1, "should have at least one schema")
        ids = table.column("schema_id").to_pylist()
        self.assertEqual(sorted(ids), sorted(set(ids)), "schema_id should be unique")
        # The first schema row must mention both columns created in setUpClass.
        fields_json = table.column("fields").to_pylist()[0]
        self.assertIn("id", fields_json)
        self.assertIn("name", fields_json)

    def test_snapshots_system_table(self):
        table = self._query("snapshots")
        self._assert_has_columns(table, (
            "snapshot_id", "schema_id", "commit_user", "commit_identifier",
            "commit_kind", "commit_time", "base_manifest_list",
            "delta_manifest_list", "total_record_count",
        ))
        self.assertEqual(table.num_rows, 1, "single batch write should produce one snapshot")
        self.assertEqual(table.column("total_record_count").to_pylist()[0], ROW_COUNT)

    def test_tags_system_table_empty(self):
        table = self._query("tags")
        self.assertListEqual(
            table.schema.names,
            ["tag_name", "snapshot_id", "schema_id", "commit_time",
             "record_count", "create_time", "time_retained"],
        )
        self.assertEqual(table.num_rows, 0, "no tags created")

    def test_branches_system_table_empty(self):
        table = self._query("branches")
        self.assertListEqual(table.schema.names, ["branch_name", "create_time"])
        self.assertEqual(table.num_rows, 0, "implicit main branch is not listed")

    def test_manifests_system_table(self):
        table = self._query("manifests")
        self._assert_has_columns(table, (
            "file_name", "file_size", "num_added_files",
            "num_deleted_files", "schema_id",
        ))
        self.assertGreaterEqual(table.num_rows, 1, "snapshot should have manifests")
        for size in table.column("file_size").to_pylist():
            self.assertGreater(size, 0)
        total_added = sum(table.column("num_added_files").to_pylist())
        self.assertGreaterEqual(total_added, 1, "single write should add at least one file")
| 155 | + |
| 156 | + |
# Run the test suite when this file is executed directly rather than imported.
if __name__ == "__main__":
    unittest.main()
0 commit comments