Skip to content

Commit 0b859ed

Browse files
junmuzymuzammil
authored andcommitted
Implementing input changelog generation support in python lib
1 parent 1d05829 commit 0b859ed

5 files changed

Lines changed: 419 additions & 24 deletions

File tree

Lines changed: 271 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,271 @@
1+
################################################################################
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
################################################################################
18+
19+
import glob
20+
import os
21+
import shutil
22+
import tempfile
23+
import unittest
24+
25+
import pyarrow as pa
26+
27+
import json
28+
29+
from pypaimon import CatalogFactory, Schema
30+
from pypaimon.manifest.manifest_list_manager import ManifestListManager
31+
from pypaimon.write.commit_message import CommitMessage
32+
33+
34+
class ChangelogProducerTest(unittest.TestCase):
35+
36+
@classmethod
37+
def setUpClass(cls):
38+
cls.tempdir = tempfile.mkdtemp()
39+
cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
40+
cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
41+
cls.catalog.create_database('default', True)
42+
cls.pk_schema = pa.schema([
43+
pa.field('user_id', pa.int32(), nullable=False),
44+
('item_id', pa.int64()),
45+
('behavior', pa.string()),
46+
pa.field('dt', pa.string(), nullable=False)
47+
])
48+
49+
@classmethod
50+
def tearDownClass(cls):
51+
shutil.rmtree(cls.tempdir, ignore_errors=True)
52+
53+
def _create_table(self, table_name, options=None):
54+
schema = Schema.from_pyarrow_schema(
55+
self.pk_schema,
56+
partition_keys=['dt'],
57+
primary_keys=['user_id', 'dt'],
58+
options=options or {}
59+
)
60+
self.catalog.create_table(f'default.{table_name}', schema, False)
61+
return self.catalog.get_table(f'default.{table_name}')
62+
63+
def _sample_data(self):
64+
return pa.Table.from_pydict({
65+
'user_id': [1, 2, 3],
66+
'item_id': [101, 102, 103],
67+
'behavior': ['click', 'buy', 'view'],
68+
'dt': ['p1', 'p1', 'p1']
69+
}, schema=self.pk_schema)
70+
71+
def test_commit_message_with_changelog(self):
72+
msg = CommitMessage(partition=('p1',), bucket=0, new_files=[], changelog_files=[])
73+
self.assertTrue(msg.is_empty())
74+
75+
msg2 = CommitMessage(partition=('p1',), bucket=0, new_files=['fake'])
76+
self.assertFalse(msg2.is_empty())
77+
self.assertEqual(msg2.changelog_files, [])
78+
79+
def test_full_compaction_and_lookup_no_changelog_from_writer(self):
80+
"""FULL_COMPACTION and LOOKUP rely on dedicated compaction for changelog,
81+
so the Python writer should not produce changelog files for these modes."""
82+
for mode in ['full-compaction', 'lookup']:
83+
table = self._create_table(
84+
f'test_no_changelog_{mode.replace("-", "_")}',
85+
options={'changelog-producer': mode, 'bucket': '1'}
86+
)
87+
write_builder = table.new_batch_write_builder()
88+
table_write = write_builder.new_write()
89+
table_commit = write_builder.new_commit()
90+
91+
table_write.write_arrow(self._sample_data())
92+
table_commit.commit(table_write.prepare_commit())
93+
94+
bucket_dir = os.path.join(
95+
self.warehouse, 'default.db',
96+
f'test_no_changelog_{mode.replace("-", "_")}', 'dt=p1', 'bucket-0')
97+
changelog_files = glob.glob(os.path.join(bucket_dir, 'changelog-*'))
98+
self.assertEqual(len(changelog_files), 0,
99+
f"Writer should not produce changelog files for {mode}")
100+
101+
table_write.close()
102+
table_commit.close()
103+
104+
def test_none_mode_no_changelog(self):
105+
table = self._create_table(
106+
'test_none_mode',
107+
options={'changelog-producer': 'none', 'bucket': '1'}
108+
)
109+
write_builder = table.new_batch_write_builder()
110+
table_write = write_builder.new_write()
111+
table_commit = write_builder.new_commit()
112+
113+
table_write.write_arrow(self._sample_data())
114+
table_commit.commit(table_write.prepare_commit())
115+
116+
snapshot_path = glob.glob(
117+
os.path.join(self.warehouse, 'default.db', 'test_none_mode', 'snapshot', 'snapshot-*'))
118+
self.assertTrue(len(snapshot_path) > 0)
119+
120+
snapshot_json = open(snapshot_path[0]).read()
121+
snapshot = json.loads(snapshot_json)
122+
self.assertNotIn('changelogManifestList', snapshot)
123+
124+
table_write.close()
125+
table_commit.close()
126+
127+
def test_input_mode_produces_changelog_files(self):
128+
table = self._create_table(
129+
'test_input_files',
130+
options={'changelog-producer': 'input', 'bucket': '1'}
131+
)
132+
write_builder = table.new_batch_write_builder()
133+
table_write = write_builder.new_write()
134+
table_commit = write_builder.new_commit()
135+
136+
table_write.write_arrow(self._sample_data())
137+
table_commit.commit(table_write.prepare_commit())
138+
139+
bucket_dir = os.path.join(
140+
self.warehouse, 'default.db', 'test_input_files', 'dt=p1', 'bucket-0')
141+
data_files = glob.glob(os.path.join(bucket_dir, 'data-*'))
142+
changelog_files = glob.glob(os.path.join(bucket_dir, 'changelog-*'))
143+
self.assertTrue(len(data_files) > 0, "Should have data files")
144+
self.assertTrue(len(changelog_files) > 0, "Should have changelog files")
145+
146+
table_write.close()
147+
table_commit.close()
148+
149+
def test_input_mode_snapshot_has_changelog_manifest(self):
150+
table = self._create_table(
151+
'test_input_snapshot',
152+
options={'changelog-producer': 'input', 'bucket': '1'}
153+
)
154+
write_builder = table.new_batch_write_builder()
155+
table_write = write_builder.new_write()
156+
table_commit = write_builder.new_commit()
157+
158+
table_write.write_arrow(self._sample_data())
159+
table_commit.commit(table_write.prepare_commit())
160+
161+
snapshot_path = glob.glob(
162+
os.path.join(self.warehouse, 'default.db', 'test_input_snapshot',
163+
'snapshot', 'snapshot-*'))
164+
self.assertTrue(len(snapshot_path) > 0)
165+
166+
snapshot_json = open(snapshot_path[0]).read()
167+
snapshot = json.loads(snapshot_json)
168+
self.assertIn('changelogManifestList', snapshot)
169+
self.assertIsNotNone(snapshot['changelogManifestList'])
170+
self.assertIn('changelogRecordCount', snapshot)
171+
self.assertEqual(snapshot['changelogRecordCount'], 3)
172+
self.assertIn('changelogManifestListSize', snapshot)
173+
174+
table_write.close()
175+
table_commit.close()
176+
177+
def test_input_mode_changelog_manifest_readable(self):
178+
table = self._create_table(
179+
'test_input_readable',
180+
options={'changelog-producer': 'input', 'bucket': '1'}
181+
)
182+
write_builder = table.new_batch_write_builder()
183+
table_write = write_builder.new_write()
184+
table_commit = write_builder.new_commit()
185+
186+
table_write.write_arrow(self._sample_data())
187+
table_commit.commit(table_write.prepare_commit())
188+
189+
from pypaimon.snapshot.snapshot_manager import SnapshotManager
190+
snapshot_manager = SnapshotManager(table)
191+
snapshot = snapshot_manager.get_latest_snapshot()
192+
193+
self.assertIsNotNone(snapshot.changelog_manifest_list)
194+
195+
manifest_list_manager = ManifestListManager(table)
196+
changelog_manifests = manifest_list_manager.read_changelog(snapshot)
197+
self.assertTrue(len(changelog_manifests) > 0)
198+
199+
total_changelog_added = sum(m.num_added_files for m in changelog_manifests)
200+
self.assertTrue(total_changelog_added > 0)
201+
202+
table_write.close()
203+
table_commit.close()
204+
205+
def test_input_mode_multiple_commits(self):
206+
table = self._create_table(
207+
'test_input_multi',
208+
options={'changelog-producer': 'input', 'bucket': '1'}
209+
)
210+
211+
# First commit
212+
write_builder1 = table.new_batch_write_builder()
213+
table_write1 = write_builder1.new_write()
214+
table_commit1 = write_builder1.new_commit()
215+
table_write1.write_arrow(self._sample_data())
216+
table_commit1.commit(table_write1.prepare_commit())
217+
table_write1.close()
218+
table_commit1.close()
219+
220+
# Second commit with different data
221+
write_builder2 = table.new_batch_write_builder()
222+
table_write2 = write_builder2.new_write()
223+
table_commit2 = write_builder2.new_commit()
224+
data2 = pa.Table.from_pydict({
225+
'user_id': [4, 5],
226+
'item_id': [104, 105],
227+
'behavior': ['click', 'buy'],
228+
'dt': ['p1', 'p1']
229+
}, schema=self.pk_schema)
230+
table_write2.write_arrow(data2)
231+
table_commit2.commit(table_write2.prepare_commit())
232+
table_write2.close()
233+
table_commit2.close()
234+
235+
from pypaimon.snapshot.snapshot_manager import SnapshotManager
236+
snapshot_manager = SnapshotManager(table)
237+
snapshot = snapshot_manager.get_latest_snapshot()
238+
self.assertEqual(snapshot.id, 2)
239+
self.assertIsNotNone(snapshot.changelog_manifest_list)
240+
self.assertEqual(snapshot.changelog_record_count, 2)
241+
242+
def test_abort_cleans_up_changelog_files(self):
243+
table = self._create_table(
244+
'test_input_abort',
245+
options={'changelog-producer': 'input', 'bucket': '1'}
246+
)
247+
write_builder = table.new_batch_write_builder()
248+
table_write = write_builder.new_write()
249+
table_commit = write_builder.new_commit()
250+
251+
table_write.write_arrow(self._sample_data())
252+
commit_messages = table_write.prepare_commit()
253+
254+
bucket_dir = os.path.join(
255+
self.warehouse, 'default.db', 'test_input_abort', 'dt=p1', 'bucket-0')
256+
changelog_files_before = glob.glob(os.path.join(bucket_dir, 'changelog-*'))
257+
self.assertTrue(len(changelog_files_before) > 0)
258+
259+
table_commit.abort(commit_messages)
260+
261+
data_files_after = glob.glob(os.path.join(bucket_dir, 'data-*'))
262+
changelog_files_after = glob.glob(os.path.join(bucket_dir, 'changelog-*'))
263+
self.assertEqual(len(data_files_after), 0, "Data files should be cleaned up after abort")
264+
self.assertEqual(len(changelog_files_after), 0, "Changelog files should be cleaned up after abort")
265+
266+
table_write.close()
267+
table_commit.close()
268+
269+
270+
if __name__ == '__main__':
271+
unittest.main()

paimon-python/pypaimon/write/commit_message.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
# limitations under the License.
1717
################################################################################
1818

19-
from dataclasses import dataclass
19+
from dataclasses import dataclass, field
2020
from typing import List, Tuple, Optional
2121

2222
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
@@ -27,7 +27,8 @@ class CommitMessage:
2727
partition: Tuple
2828
bucket: int
2929
new_files: List[DataFileMeta]
30+
changelog_files: List[DataFileMeta] = field(default_factory=list)
3031
check_from_snapshot: Optional[int] = -1
3132

3233
def is_empty(self):
33-
return not self.new_files
34+
return not self.new_files and not self.changelog_files

0 commit comments

Comments
 (0)