Commit 08099eb

Author: xiaohongbo (committed)

[python] optimize schema validation and support binary/large_binary type conversion

Add docs for Ray converting large_binary to binary; fix test case failure; clean up code.
1 parent 0525bc0 commit 08099eb

8 files changed: +181, -9 lines

docs/content/program-api/python-api.md

Lines changed: 4 additions & 0 deletions

@@ -195,6 +195,8 @@ record_batch = ...
 table_write.write_arrow_batch(record_batch)

 # 2.4 Write Ray Dataset (requires ray to be installed)
+
+**Note:** Ray Data converts `large_binary()` to `binary()` when reading. `write_ray()` automatically converts `binary()` back to `large_binary()` to match the table schema.
 import ray
 ray_dataset = ray.data.read_json("/path/to/data.jsonl")
 table_write.write_ray(ray_dataset, overwrite=False, concurrency=2)

@@ -471,6 +473,8 @@ df = ray_dataset.to_pandas()
 - `**read_args`: Additional kwargs passed to the datasource (e.g., `per_task_row_limit`
   in Ray 2.52.0+).

+**Note:** Ray Data converts `large_binary()` to `binary()` when reading. When writing back via `write_ray()`, the conversion is handled automatically.
+
 **Ray Block Size Configuration:**

 If you need to configure Ray's block size (e.g., when Paimon splits exceed Ray's default
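For orientation, here is a minimal round-trip sketch of the behavior described in the notes above. It assumes `table_read`, `table_write`, and `splits` are already set up as in the surrounding examples and that the table declares a `data` column as `large_binary()`:

# Read the Paimon table into Ray Data; 'data' surfaces as binary()
ray_dataset = table_read.to_ray(splits)

# Write it back; write_ray() casts binary() back to large_binary()
# so the batches match the table schema before they are committed
table_write.write_ray(ray_dataset, overwrite=False, concurrency=2)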

paimon-python/pypaimon/read/datasource.py

Lines changed: 4 additions & 0 deletions

@@ -50,6 +50,10 @@ class RayDatasource(Datasource):
     This datasource enables distributed parallel reading of Paimon table splits,
     allowing Ray to read multiple splits concurrently across the cluster.
+
+    .. note::
+        Ray Data converts ``large_binary()`` to ``binary()`` when reading.
+        When writing back via :meth:`TableWrite.write_ray`, the conversion is handled automatically.
     """

     def __init__(self, table_read: TableRead, splits: List[Split]):

paimon-python/pypaimon/read/table_read.py

Lines changed: 5 additions & 0 deletions

@@ -194,6 +194,11 @@ def to_ray(
         **read_args,
     ) -> "ray.data.dataset.Dataset":
         """Convert Paimon table data to Ray Dataset.
+
+        .. note::
+            Ray Data converts ``large_binary()`` to ``binary()`` when reading.
+            When writing back via :meth:`write_ray`, the conversion is handled automatically.
+
         Args:
             splits: List of splits to read from the Paimon table.
             ray_remote_args: Optional kwargs passed to :func:`ray.remote` in read tasks.
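To make the note concrete, a small check mirroring the test added in this commit: a column declared as `large_binary()` in the Paimon schema comes back as `binary()` after passing through Ray Data (`table_read` and `splits` are assumed to be set up as elsewhere on this page):

import pyarrow as pa
import pyarrow.types as pa_types

ray_dataset = table_read.to_ray(splits)
arrow_table = pa.Table.from_pandas(ray_dataset.to_pandas())

data_type = arrow_table.schema.field('data').type
assert pa_types.is_binary(data_type)            # large_binary() was read back as binary()
assert not pa_types.is_large_binary(data_type)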

paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py

Lines changed: 1 addition & 1 deletion

@@ -577,7 +577,7 @@ def test_write_wrong_schema(self):

         with self.assertRaises(ValueError) as e:
             table_write.write_arrow_batch(record_batch)
-        self.assertTrue(str(e.exception).startswith("Input schema isn't consistent with table schema and write cols."))
+        self.assertTrue(str(e.exception).startswith("Input schema doesn't match table schema."))

     def test_write_wide_table_large_data(self):
         logging.basicConfig(level=logging.INFO)

paimon-python/pypaimon/tests/ray_data_test.py

Lines changed: 68 additions & 0 deletions

@@ -22,6 +22,7 @@
 import shutil

 import pyarrow as pa
+import pyarrow.types as pa_types
 import ray

 from pypaimon import CatalogFactory, Schema

@@ -115,6 +116,66 @@ def test_basic_ray_data_read(self):
         self.assertIsNotNone(ray_dataset, "Ray dataset should not be None")
         self.assertEqual(ray_dataset.count(), 5, "Should have 5 rows")

+    def test_ray_data_read_with_blob(self):
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('data', pa.large_binary()),  # BLOB type in Paimon
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true',
+                'blob-field': 'data',
+            }
+        )
+        import time
+        table_name = f'default.test_ray_blob_{int(time.time() * 1000000)}'
+
+        self.catalog.create_table(table_name, schema, False)
+        table = self.catalog.get_table(table_name)
+
+        test_data = pa.Table.from_pydict({
+            'id': [1, 2, 3, 4, 5],
+            'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
+            'data': [b'data1', b'data2', b'data3', b'data4', b'data5'],
+        }, schema=pa_schema)
+
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(test_data)
+        commit_messages = writer.prepare_commit()
+        commit = write_builder.new_commit()
+        commit.commit(commit_messages)
+        writer.close()
+
+        read_builder = table.new_read_builder()
+        table_read = read_builder.new_read()
+        table_scan = read_builder.new_scan()
+        splits = table_scan.plan().splits()
+
+        ray_dataset = table_read.to_ray(splits, override_num_blocks=2)
+
+        self.assertIsNotNone(ray_dataset, "Ray dataset should not be None")
+
+        df_check = ray_dataset.to_pandas()
+        ray_table_check = pa.Table.from_pandas(df_check)
+        ray_schema_check = ray_table_check.schema
+        ray_data_field = ray_schema_check.field('data')
+
+        self.assertTrue(
+            pa_types.is_binary(ray_data_field.type),
+            f"Ray Dataset should convert large_binary to binary when reading, "
+            f"but got {ray_data_field.type}"
+        )
+        self.assertFalse(
+            pa_types.is_large_binary(ray_data_field.type),
+            f"Ray Dataset should NOT have large_binary type after reading, "
+            f"but got {ray_data_field.type}"
+        )
+
         # Test basic operations
         sample_data = ray_dataset.take(3)
         self.assertEqual(len(sample_data), 3, "Should have 3 sample rows")

@@ -130,6 +191,13 @@ def test_basic_ray_data_read(self):
             ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
             "Name column should match"
         )
+
+        data_values = [bytes(d) if d is not None else None for d in df_sorted['data']]
+        self.assertEqual(
+            data_values,
+            [b'data1', b'data2', b'data3', b'data4', b'data5'],
+            "Data column should match"
+        )

     def test_basic_ray_data_write(self):
         """Test basic Ray Data write from PyPaimon table."""

paimon-python/pypaimon/tests/reader_base_test.py

Lines changed: 1 addition & 1 deletion

@@ -273,7 +273,7 @@ def test_write_wrong_schema(self):

         with self.assertRaises(ValueError) as e:
             table_write.write_arrow_batch(record_batch)
-        self.assertTrue(str(e.exception).startswith("Input schema isn't consistent with table schema and write cols."))
+        self.assertTrue(str(e.exception).startswith("Input schema doesn't match table schema."))

     def test_reader_iterator(self):
         read_builder = self.table.new_read_builder()

paimon-python/pypaimon/tests/rest/rest_read_write_test.py

Lines changed: 1 addition & 1 deletion

@@ -438,7 +438,7 @@ def test_write_wrong_schema(self):

         with self.assertRaises(ValueError) as e:
             table_write.write_arrow_batch(record_batch)
-        self.assertTrue(str(e.exception).startswith("Input schema isn't consistent with table schema and write cols."))
+        self.assertTrue(str(e.exception).startswith("Input schema doesn't match table schema."))

     def test_reader_iterator(self):
         read_builder = self.table.new_read_builder()

paimon-python/pypaimon/write/table_write.py

Lines changed: 97 additions & 6 deletions

@@ -19,6 +19,7 @@
 from typing import TYPE_CHECKING, Any, Dict, List, Optional

 import pyarrow as pa
+import pyarrow.types as pa_types

 from pypaimon.schema.data_types import PyarrowFieldParser
 from pypaimon.snapshot.snapshot import BATCH_COMMIT_IDENTIFIER

@@ -29,6 +30,14 @@
     from ray.data import Dataset


+def _is_binary_type_compatible(input_type: pa.DataType, table_type: pa.DataType) -> bool:
+    if pa_types.is_binary(input_type) and pa_types.is_large_binary(table_type):
+        return True
+    if pa_types.is_large_binary(input_type) and pa_types.is_binary(table_type):
+        return True
+    return False
+
+
 class TableWrite:
     def __init__(self, table, commit_user):
         from pypaimon.table.file_store_table import FileStoreTable

@@ -44,8 +53,46 @@ def write_arrow(self, table: pa.Table):
         for batch in batches_iterator:
             self.write_arrow_batch(batch)

+    def _convert_binary_types(self, data: pa.RecordBatch) -> pa.RecordBatch:
+        write_cols = self.file_store_write.write_cols
+        table_schema = self.table_pyarrow_schema
+
+        converted_arrays = []
+        needs_conversion = False
+
+        for i, field in enumerate(data.schema):
+            array = data.column(i)
+            expected_type = None
+
+            if write_cols is None or field.name in write_cols:
+                try:
+                    expected_type = table_schema.field(field.name).type
+                except KeyError:
+                    pass
+
+            if expected_type and field.type != expected_type and _is_binary_type_compatible(field.type, expected_type):
+                try:
+                    array = pa.compute.cast(array, expected_type)
+                    needs_conversion = True
+                except (pa.ArrowInvalid, pa.ArrowCapacityError, ValueError) as e:
+                    direction = f"{field.type} to {expected_type}"
+                    raise ValueError(
+                        f"Failed to convert field '{field.name}' from {direction}. "
+                        f"If converting to binary(), ensure no value exceeds 2GB limit: {e}"
+                    ) from e
+
+            converted_arrays.append(array)
+
+        if needs_conversion:
+            new_fields = [pa.field(field.name, arr.type, nullable=field.nullable)
+                          for field, arr in zip(data.schema, converted_arrays)]
+            return pa.RecordBatch.from_arrays(converted_arrays, schema=pa.schema(new_fields))
+
+        return data
+
     def write_arrow_batch(self, data: pa.RecordBatch):
         self._validate_pyarrow_schema(data.schema)
+        data = self._convert_binary_types(data)
         partitions, buckets = self.row_key_extractor.extract_partition_bucket_batch(data)

         partition_bucket_groups = defaultdict(list)

@@ -59,7 +106,7 @@ def write_arrow_batch(self, data: pa.RecordBatch):

     def write_pandas(self, dataframe):
         pa_schema = PyarrowFieldParser.from_paimon_schema(self.table.table_schema.fields)
-        record_batch = pa.RecordBatch.from_pandas(dataframe, schema=pa_schema)
+        record_batch = pa.RecordBatch.from_pandas(dataframe, schema=pa_schema, preserve_index=False)
         return self.write_arrow_batch(record_batch)

     def with_write_type(self, write_cols: List[str]):

@@ -81,6 +128,11 @@ def write_ray(
         """
         Write a Ray Dataset to Paimon table.

+        .. note::
+            Ray Data converts ``large_binary()`` to ``binary()`` when reading.
+            This method automatically converts ``binary()`` back to ``large_binary()``
+            to match the table schema.
+
         Args:
             dataset: Ray Dataset to write. This is a distributed data collection
                 from Ray Data (ray.data.Dataset).

@@ -102,11 +154,50 @@ def close(self):
         self.file_store_write.close()

     def _validate_pyarrow_schema(self, data_schema: pa.Schema):
-        if data_schema != self.table_pyarrow_schema and data_schema.names != self.file_store_write.write_cols:
-            raise ValueError(f"Input schema isn't consistent with table schema and write cols. "
-                             f"Input schema is: {data_schema} "
-                             f"Table schema is: {self.table_pyarrow_schema} "
-                             f"Write cols is: {self.file_store_write.write_cols}")
+        write_cols = self.file_store_write.write_cols
+
+        if write_cols is None:
+            if data_schema.names != self.table_pyarrow_schema.names:
+                raise ValueError(
+                    f"Input schema doesn't match table schema. "
+                    f"Field names and order must exactly match.\n"
+                    f"Input schema: {data_schema}\n"
+                    f"Table schema: {self.table_pyarrow_schema}"
+                )
+            for input_field, table_field in zip(data_schema, self.table_pyarrow_schema):
+                if input_field.type != table_field.type:
+                    if not _is_binary_type_compatible(input_field.type, table_field.type):
+                        raise ValueError(
+                            f"Input schema doesn't match table schema. "
+                            f"Field '{input_field.name}' type mismatch.\n"
+                            f"Input type: {input_field.type}\n"
+                            f"Table type: {table_field.type}\n"
+                            f"Input schema: {data_schema}\n"
+                            f"Table schema: {self.table_pyarrow_schema}"
+                        )
+        else:
+            if list(data_schema.names) != write_cols:
+                raise ValueError(
+                    f"Input schema field names don't match write_cols. "
+                    f"Field names and order must match write_cols.\n"
+                    f"Input schema names: {list(data_schema.names)}\n"
+                    f"Write cols: {write_cols}"
+                )
+            table_field_map = {field.name: field for field in self.table_pyarrow_schema}
+            for field_name in write_cols:
+                if field_name not in table_field_map:
+                    raise ValueError(
+                        f"Field '{field_name}' in write_cols is not in table schema."
+                    )
+                input_field = data_schema.field(field_name)
+                table_field = table_field_map[field_name]
+                if input_field.type != table_field.type:
+                    if not _is_binary_type_compatible(input_field.type, table_field.type):
+                        raise ValueError(
+                            f"Field '{field_name}' type mismatch.\n"
+                            f"Input type: {input_field.type}\n"
+                            f"Table type: {table_field.type}"
+                        )


 class BatchTableWrite(TableWrite):
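For reference, the cast that `_convert_binary_types` relies on can be exercised with plain PyArrow. `binary()` and `large_binary()` store the same values and differ only in offset width (32-bit vs 64-bit), so the conversion is lossless as long as the data fits within the 32-bit offsets of `binary()`, which is the 2GB caveat in the error message above. A minimal sketch:

import pyarrow as pa
import pyarrow.compute as pc

small = pa.array([b'data1', b'data2'], type=pa.binary())
large = pc.cast(small, pa.large_binary())   # binary() -> large_binary(), always safe
back = pc.cast(large, pa.binary())          # large_binary() -> binary(), may exceed 32-bit offsets

assert large.type == pa.large_binary()
assert back.equals(small)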
