This is an automated email from the ASF dual-hosted git repository. yuzelin pushed a commit to branch release-1.4 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit e87c30c4e12656eace5329b11cc5e91a97819d02 Author: XiaoHongbo <[email protected]> AuthorDate: Thu Apr 2 14:28:44 2026 +0800 [python] Fix rolling stack overflow by replacing recursion with iteration (#7578) Fix a potential stack overflow (raised in Python as a `RecursionError`) in `_check_and_roll_if_needed` by replacing its self-recursive call with a while loop. The overflow can occur when `target_file_size` is small relative to the pending data, because each split previously added one more level of recursion, so a large batch could exceed the interpreter's recursion limit. --- .../pypaimon/tests/write/table_write_test.py | 35 ++++++++++++++++++++++ paimon-python/pypaimon/write/writer/data_writer.py | 22 +++++++------- 2 files changed, 45 insertions(+), 12 deletions(-) diff --git a/paimon-python/pypaimon/tests/write/table_write_test.py b/paimon-python/pypaimon/tests/write/table_write_test.py index 60ddcdc0bd..2c685f5830 100644 --- a/paimon-python/pypaimon/tests/write/table_write_test.py +++ b/paimon-python/pypaimon/tests/write/table_write_test.py @@ -28,6 +28,8 @@ import pyarrow as pa from parameterized import parameterized from pypaimon.common.json_util import JSON +from pypaimon.common.options.core_options import CoreOptions +from pypaimon.write.writer.append_only_data_writer import AppendOnlyDataWriter class TableWriteTest(unittest.TestCase): @@ -436,3 +438,36 @@ class TableWriteTest(unittest.TestCase): splits = read_builder.new_scan().plan().splits() actual = table_read.to_arrow(splits) self.assertEqual(expected, actual) + + def test_rolling(self): + pa_schema = pa.schema([('name', pa.string())]) + schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=[]) + self.catalog.create_table('default.test_rolling_recursion', schema, True) + table = self.catalog.get_table('default.test_rolling_recursion') + + row_value = 'x' * 100 + sample = pa.Table.from_batches([ + pa.RecordBatch.from_pydict({'name': pa.array([row_value], type=pa.string())}) + ]) + # Set target just above single chunk nbytes so best_split=1 every time + target = sample.nbytes + 1 + + options = CoreOptions.copy(table.options) 
options.set(CoreOptions.TARGET_FILE_SIZE, str(target)) + writer = AppendOnlyDataWriter( + table=table, partition=(), bucket=0, + max_seq_number=0, options=options, + ) + + num_rows = 1500 + big_batch = pa.RecordBatch.from_pydict( + {'name': pa.array([row_value] * num_rows, type=pa.string())} + ) + writer.write(big_batch) + + pending_rows = writer.pending_data.num_rows if writer.pending_data is not None else 0 + committed_rows = sum(f.row_count for f in writer.committed_files) + self.assertEqual(committed_rows + pending_rows, num_rows) + self.assertGreater(len(writer.committed_files), 0) + if writer.pending_data is not None: + self.assertLessEqual(writer.pending_data.nbytes, target) diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py index e0cbe607ca..4d2513a28c 100644 --- a/paimon-python/pypaimon/write/writer/data_writer.py +++ b/paimon-python/pypaimon/write/writer/data_writer.py @@ -142,19 +142,17 @@ class DataWriter(ABC): """Merge existing data with new data. Must be implemented by subclasses.""" def _check_and_roll_if_needed(self): - if self.pending_data is None: - return - - current_size = self.pending_data.nbytes - if current_size > self.target_file_size: + while self.pending_data is not None: + current_size = self.pending_data.nbytes + if current_size <= self.target_file_size: + break split_row = self._find_optimal_split_point(self.pending_data, self.target_file_size) - if split_row > 0: - data_to_write = self.pending_data.slice(0, split_row) - remaining_data = self.pending_data.slice(split_row) - - self._write_data_to_file(data_to_write) - self.pending_data = remaining_data - self._check_and_roll_if_needed() + if split_row <= 0: + break + data_to_write = self.pending_data.slice(0, split_row) + remaining_data = self.pending_data.slice(split_row) + self._write_data_to_file(data_to_write) + self.pending_data = remaining_data def _write_data_to_file(self, data: pa.Table): if data.num_rows == 0:
