Skip to content

Commit d93459e

Browse files
junmuzymuzammil
authored and committed
Reject changelog-producer on non-PK tables and decouple changelog file format from data file format
1 parent 0b859ed commit d93459e

5 files changed

Lines changed: 136 additions & 29 deletions

File tree

paimon-python/pypaimon/common/options/core_options.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,14 @@ class CoreOptions:
292292
"Options: none, input, full-compaction, lookup.")
293293
)
294294

295+
# File format used for changelog files, decoupled from the table's data
# file format ('file.format'). No default value: when unset, the writer
# picks its own fallback.
CHANGELOG_FILE_FORMAT: ConfigOption[str] = (
    ConfigOptions.key("changelog-file.format")
    .string_type()
    .no_default_value()
    .with_description("Specify the file format of changelog files. "
                      "Currently parquet, avro and orc are supported.")
)
302+
295303
MERGE_ENGINE: ConfigOption[MergeEngine] = (
296304
ConfigOptions.key("merge-engine")
297305
.enum_type(MergeEngine)
@@ -542,6 +550,9 @@ def deletion_vectors_enabled(self, default=None):
542550
def changelog_producer(self, default=None):
543551
return self.options.get(CoreOptions.CHANGELOG_PRODUCER, default)
544552

553+
def changelog_file_format(self, default=None):
    """Return the configured 'changelog-file.format' value, or *default* when unset."""
    return self.options.get(CoreOptions.CHANGELOG_FILE_FORMAT, default)
555+
545556
def merge_engine(self, default=None):
546557
return self.options.get(CoreOptions.MERGE_ENGINE, default)
547558

paimon-python/pypaimon/schema/schema.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,4 +92,11 @@ def from_pyarrow_schema(pa_schema: pa.Schema, partition_keys: Optional[List[str]
9292
if primary_keys is not None:
9393
raise ValueError("Blob type is not supported with primary key.")
9494

95+
changelog_producer = (options or {}).get(CoreOptions.CHANGELOG_PRODUCER.key(), 'none')
96+
if changelog_producer != 'none' and not primary_keys:
97+
raise ValueError(
98+
f"Cannot set 'changelog-producer' to '{changelog_producer}' on a table without primary keys. "
99+
f"Changelog producer requires primary keys to be defined."
100+
)
101+
95102
return Schema(fields, partition_keys, primary_keys, options, comment)

paimon-python/pypaimon/tests/write/changelog_producer_test.py

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,94 @@ def test_abort_cleans_up_changelog_files(self):
266266
table_write.close()
267267
table_commit.close()
268268

269+
def test_reject_changelog_producer_on_append_only_table(self):
    """Any non-'none' changelog-producer on a table without primary keys
    must be rejected at schema-construction time with a ValueError."""
    no_pk_schema = pa.schema([
        ('user_id', pa.int32()),
        ('item_id', pa.int64()),
        ('behavior', pa.string()),
        ('dt', pa.string()),
    ])
    # Every producer mode except 'none' requires primary keys.
    for mode in ('input', 'full-compaction', 'lookup'):
        with self.assertRaises(ValueError, msg=f"Should reject changelog-producer={mode} without PKs"):
            Schema.from_pyarrow_schema(
                no_pk_schema,
                partition_keys=['dt'],
                options={'changelog-producer': mode, 'bucket': '1'},
            )
284+
def test_changelog_producer_none_allowed_on_append_only_table(self):
    """The explicit 'none' producer remains legal on a table without primary keys."""
    fields = [
        ('user_id', pa.int32()),
        ('item_id', pa.int64()),
        ('behavior', pa.string()),
        ('dt', pa.string()),
    ]
    result = Schema.from_pyarrow_schema(
        pa.schema(fields),
        partition_keys=['dt'],
        options={'changelog-producer': 'none', 'bucket': '1'},
    )
    self.assertIsNotNone(result)
def test_input_mode_changelog_uses_parquet_regardless_of_data_format(self):
    """Without an explicit 'changelog-file.format', changelog files fall back
    to parquet even when the table's data file format is orc."""
    table = self._create_table(
        'test_input_changelog_format',
        options={'changelog-producer': 'input', 'bucket': '1', 'file.format': 'orc'}
    )
    builder = table.new_batch_write_builder()
    writer = builder.new_write()
    committer = builder.new_commit()

    writer.write_arrow(self._sample_data())
    committer.commit(writer.prepare_commit())

    bucket_dir = os.path.join(
        self.warehouse, 'default.db', 'test_input_changelog_format', 'dt=p1', 'bucket-0')

    # Changelog files use the parquet fallback.
    changelog_files = glob.glob(os.path.join(bucket_dir, 'changelog-*'))
    self.assertTrue(len(changelog_files) > 0, "Should have changelog files")
    for f in changelog_files:
        self.assertTrue(f.endswith('.parquet'),
                        f"Changelog file should use parquet format by default, got {f}")

    # Data files keep the configured orc format.
    data_files = glob.glob(os.path.join(bucket_dir, 'data-*'))
    self.assertTrue(len(data_files) > 0, "Should have data files")
    for f in data_files:
        self.assertTrue(f.endswith('.orc'),
                        f"Data file should use orc format, got {f}")

    writer.close()
    committer.close()
def test_input_mode_changelog_respects_changelog_file_format(self):
    """An explicit 'changelog-file.format' overrides the data file format
    for changelog files only; data files keep 'file.format'."""
    table = self._create_table(
        'test_input_cl_file_fmt',
        options={'changelog-producer': 'input', 'bucket': '1',
                 'file.format': 'parquet', 'changelog-file.format': 'orc'}
    )
    builder = table.new_batch_write_builder()
    writer = builder.new_write()
    committer = builder.new_commit()

    writer.write_arrow(self._sample_data())
    committer.commit(writer.prepare_commit())

    bucket_dir = os.path.join(
        self.warehouse, 'default.db', 'test_input_cl_file_fmt', 'dt=p1', 'bucket-0')

    # Changelog files follow the dedicated changelog format option.
    changelog_files = glob.glob(os.path.join(bucket_dir, 'changelog-*'))
    self.assertTrue(len(changelog_files) > 0, "Should have changelog files")
    for f in changelog_files:
        self.assertTrue(f.endswith('.orc'),
                        f"Changelog file should use orc format, got {f}")

    # Data files follow the regular data format option.
    data_files = glob.glob(os.path.join(bucket_dir, 'data-*'))
    self.assertTrue(len(data_files) > 0, "Should have data files")
    for f in data_files:
        self.assertTrue(f.endswith('.parquet'),
                        f"Data file should use parquet format, got {f}")

    writer.close()
    committer.close()
269357

270358
# Allow running this test module directly.
if __name__ == '__main__':
    unittest.main()

paimon-python/pypaimon/write/file_store_commit.py

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,6 @@ def commit(self, commit_messages: List[CommitMessage], commit_identifier: int):
127127
len(commit_messages),
128128
)
129129
commit_entries = []
130-
changelog_entries = []
131130
for msg in commit_messages:
132131
partition = GenericRow(list(msg.partition), self.table.partition_keys_fields)
133132
for file in msg.new_files:
@@ -138,17 +137,10 @@ def commit(self, commit_messages: List[CommitMessage], commit_identifier: int):
138137
total_buckets=self.table.total_buckets,
139138
file=file
140139
))
141-
for file in msg.changelog_files:
142-
changelog_entries.append(ManifestEntry(
143-
kind=0,
144-
partition=partition,
145-
bucket=msg.bucket,
146-
total_buckets=self.table.total_buckets,
147-
file=file
148-
))
140+
changelog_entries = self._collect_changelog_entries(commit_messages)
149141

150142
logger.info("Finished collecting changes, including: %d entries, %d changelog entries",
151-
len(commit_entries), len(changelog_entries))
143+
len(commit_entries), len(changelog_entries))
152144

153145
commit_kind = "APPEND"
154146
detect_conflicts = False
@@ -193,17 +185,7 @@ def overwrite(self, overwrite_partition, commit_messages: List[CommitMessage], c
193185
raise RuntimeError(f"Trying to overwrite partition {overwrite_partition}, but the changes "
194186
f"in {msg.partition} does not belong to this partition")
195187

196-
changelog_entries = []
197-
for msg in commit_messages:
198-
partition = GenericRow(list(msg.partition), self.table.partition_keys_fields)
199-
for file in msg.changelog_files:
200-
changelog_entries.append(ManifestEntry(
201-
kind=0,
202-
partition=partition,
203-
bucket=msg.bucket,
204-
total_buckets=self.table.total_buckets,
205-
file=file
206-
))
188+
changelog_entries = self._collect_changelog_entries(commit_messages)
207189

208190
self._try_commit(
209191
commit_kind="OVERWRITE",
@@ -600,6 +582,20 @@ def _commit_retry_wait(self, retry_count: int):
600582

601583
time.sleep(total_wait_ms / 1000.0)
602584

585+
def _collect_changelog_entries(self, commit_messages: List[CommitMessage]) -> List[ManifestEntry]:
    """Build one manifest entry (kind=0) per changelog file in *commit_messages*.

    Shared by ``commit`` and ``overwrite`` so the entry construction is
    defined in exactly one place.
    """
    entries: List[ManifestEntry] = []
    for message in commit_messages:
        partition_row = GenericRow(list(message.partition), self.table.partition_keys_fields)
        entries.extend(
            ManifestEntry(
                kind=0,
                partition=partition_row,
                bucket=message.bucket,
                total_buckets=self.table.total_buckets,
                file=changelog_file,
            )
            for changelog_file in message.changelog_files
        )
    return entries
598+
603599
def _cleanup_preparation_failure(self,
604600
delta_manifest_list: Optional[str],
605601
base_manifest_list: Optional[str],

paimon-python/pypaimon/write/writer/data_writer.py

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int, op
6464
self.committed_files: List[DataFileMeta] = []
6565
self.committed_changelog_files: List[DataFileMeta] = []
6666
self.changelog_producer = changelog_producer
67+
self.changelog_file_format = (
68+
self.options.changelog_file_format()
69+
or CoreOptions.FILE_FORMAT_PARQUET
70+
)
6771
self.write_cols = write_cols
6872
self.blob_as_descriptor = self.options.blob_as_descriptor()
6973

@@ -252,25 +256,26 @@ def _write_data_to_file(self, data: pa.Table):
252256
def _write_changelog_file(self, data, min_key, max_key, key_stats, value_stats,
253257
min_seq, max_seq, creation_time,
254258
value_stats_enabled, is_external):
255-
changelog_file_name = f"changelog-{uuid.uuid4()}-0.{self.file_format}"
259+
cl_fmt = self.changelog_file_format
260+
changelog_file_name = f"changelog-{uuid.uuid4()}-0.{cl_fmt}"
256261
changelog_file_path = self._generate_file_path(changelog_file_name)
257262

258263
changelog_external_path = None
259264
if is_external and self._current_external_path:
260265
changelog_external_path = self._current_external_path
261266

262-
if self.file_format == CoreOptions.FILE_FORMAT_PARQUET:
267+
if cl_fmt == CoreOptions.FILE_FORMAT_PARQUET:
263268
self.file_io.write_parquet(changelog_file_path, data, compression=self.compression,
264269
zstd_level=self.zstd_level)
265-
elif self.file_format == CoreOptions.FILE_FORMAT_ORC:
270+
elif cl_fmt == CoreOptions.FILE_FORMAT_ORC:
266271
self.file_io.write_orc(changelog_file_path, data, compression=self.compression,
267-
zstd_level=self.zstd_level)
268-
elif self.file_format == CoreOptions.FILE_FORMAT_AVRO:
272+
zstd_level=self.zstd_level)
273+
elif cl_fmt == CoreOptions.FILE_FORMAT_AVRO:
269274
self.file_io.write_avro(changelog_file_path, data, compression=self.compression,
270-
zstd_level=self.zstd_level)
275+
zstd_level=self.zstd_level)
271276
else:
272-
self.file_io.write_parquet(changelog_file_path, data, compression=self.compression,
273-
zstd_level=self.zstd_level)
277+
raise ValueError(f"Unsupported changelog file format: {cl_fmt}. "
278+
f"Supported formats: parquet, orc, avro.")
274279

275280
self.committed_changelog_files.append(DataFileMeta.create(
276281
file_name=changelog_file_name,

0 commit comments

Comments
 (0)