This is an automated email from the ASF dual-hosted git repository. yuzelin pushed a commit to branch release-1.4 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit da948fa1cd6d65e531417408aba702b9aa9344a1 Author: Jiajia Li <[email protected]> AuthorDate: Tue Apr 7 09:58:38 2026 +0800 [python] Fix BlockHandle varlen decoding in SstFileIterator.read_batch (#7597) read_batch() incorrectly used fixed-width struct.unpack to decode BlockHandle, while the SST format uses variable-length encoding. Extract shared _parse_block_handle() to ensure both seek_to() and read_batch() use consistent varlen decoding. --- .../pypaimon/globalindex/btree/sst_file_reader.py | 33 ++- .../pypaimon/tests/e2e/java_py_read_write_test.py | 7 + .../pypaimon/tests/sst_file_iterator_test.py | 221 +++++++++++++++++++++ 3 files changed, 242 insertions(+), 19 deletions(-) diff --git a/paimon-python/pypaimon/globalindex/btree/sst_file_reader.py b/paimon-python/pypaimon/globalindex/btree/sst_file_reader.py index 9d7f2eafc2..c7a28850ca 100644 --- a/paimon-python/pypaimon/globalindex/btree/sst_file_reader.py +++ b/paimon-python/pypaimon/globalindex/btree/sst_file_reader.py @@ -30,7 +30,7 @@ import zlib from typing import Optional, Callable from typing import BinaryIO -from pypaimon.globalindex.btree.btree_file_footer import BlockHandle +from pypaimon.globalindex.btree.block_handle import BlockHandle from pypaimon.globalindex.btree.block_entry import BlockEntry from pypaimon.globalindex.btree.block_reader import BlockReader, BlockIterator from pypaimon.globalindex.btree.memory_slice_input import MemorySliceInput @@ -48,27 +48,28 @@ class SstFileIterator: self.index_iterator = index_block_iterator self.sought_data_block: Optional[BlockIterator] = None + @staticmethod + def _parse_block_handle(block_handle_bytes: bytes) -> BlockHandle: + handle_input = MemorySliceInput(block_handle_bytes) + return BlockHandle( + handle_input.read_var_len_long(), + handle_input.read_var_len_int() + ) + def seek_to(self, key: bytes) -> None: """ Seek to the position of the record whose key is exactly equal to or greater than the specified key. - + Args: key: The key to seek to """ self.index_iterator.seek_to(key) - + if self.index_iterator.has_next(): index_entry: BlockEntry = self.index_iterator.__next__() - block_handle_bytes = index_entry.__getattribute__("value") - handle_input = MemorySliceInput(block_handle_bytes) - - # Parse block handle - block_handle = BlockHandle( - handle_input.read_var_len_long(), - handle_input.read_var_len_int() - ) - + block_handle = self._parse_block_handle(index_entry.value) + # Create data block reader and seek data_block_reader = self.read_block(block_handle) self.sought_data_block = data_block_reader.iterator() @@ -93,13 +94,7 @@ class SstFileIterator: return None index_entry = self.index_iterator.__next__() - block_handle_bytes = index_entry.value - - # Parse block handle - block_handle = BlockHandle( - struct.unpack('<Q', block_handle_bytes[0:8])[0], - struct.unpack('<I', block_handle_bytes[8:12])[0] - ) + block_handle = self._parse_block_handle(index_entry.value) # Create data block reader data_block_reader = self.read_block(block_handle) diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py index 4d888c3cb4..1f906c656d 100644 --- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py +++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py @@ -452,6 +452,13 @@ class JavaPyReadWriteTest(unittest.TestCase): }) self.assertEqual(expected, actual) + # read is_not_null index (full scan across all data blocks) + read_builder.with_filter(predicate_builder.is_not_null('k')) + table_read = read_builder.new_read() + splits = read_builder.new_scan().plan().splits() + actual = table_read.to_arrow(splits) + self.assertEqual(len(actual), 2000) + def _test_read_btree_index_null(self): table = self.catalog.get_table('default.test_btree_index_null') diff --git a/paimon-python/pypaimon/tests/sst_file_iterator_test.py b/paimon-python/pypaimon/tests/sst_file_iterator_test.py new file mode 100644 index 0000000000..c7c0a9b3a9 --- /dev/null +++ b/paimon-python/pypaimon/tests/sst_file_iterator_test.py @@ -0,0 +1,221 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +"""Tests for SstFileIterator BlockHandle varlen decoding.""" + +import unittest +from unittest.mock import MagicMock + +from pypaimon.globalindex.btree.block_handle import BlockHandle +from pypaimon.globalindex.btree.block_entry import BlockEntry +from pypaimon.globalindex.btree.memory_slice_input import MemorySliceInput +from pypaimon.globalindex.btree.sst_file_reader import SstFileIterator + + +def _encode_var_len(value): + result = bytearray() + while value > 0x7F: + result.append((value & 0x7F) | 0x80) + value >>= 7 + result.append(value & 0x7F) + return bytes(result) + + +def _encode_block_handle(offset, size): + return _encode_var_len(offset) + _encode_var_len(size) + + +def _mock_block_iterator(entries): + """Mock a BlockIterator with has_next/next/seek_to over a list of BlockEntry.""" + state = {'pos': 0} + entry_list = list(entries) + + mock = MagicMock() + mock.has_next = lambda: state['pos'] < len(entry_list) + + def next_entry(_self=None): + if state['pos'] >= len(entry_list): + raise StopIteration + entry = entry_list[state['pos']] + state['pos'] += 1 + return entry + mock.__next__ = next_entry + mock.__iter__ = lambda _self=None: mock + + def seek_to(target_key): + for i, entry in enumerate(entry_list): + if entry.key >= target_key: + state['pos'] = i + return entry.key == target_key + state['pos'] = len(entry_list) + return False + mock.seek_to = seek_to + + return mock + + +class SstFileIteratorTest(unittest.TestCase): + + def _make_iterator(self, index_entries, data_blocks): + mock_index_entries = [] + for key, handle in index_entries: + value = _encode_block_handle(handle.offset, handle.size) + mock_index_entries.append(BlockEntry(key, value)) + + index_iter = _mock_block_iterator(mock_index_entries) + + def read_block(block_handle): + entries = data_blocks.get((block_handle.offset, block_handle.size)) + if entries is None: + raise ValueError( + "Unexpected BlockHandle(offset={}, size={})".format( + block_handle.offset, block_handle.size)) + reader = MagicMock() + reader.iterator = lambda e=entries: _mock_block_iterator(e) + return reader + + return SstFileIterator(read_block, index_iter) + + def test_read_batch_varlen_small_values(self): + handle = BlockHandle(100, 50) + data = [BlockEntry(b"k1", b"v1"), BlockEntry(b"k2", b"v2")] + + it = self._make_iterator( + [(b"k2", handle)], + {(100, 50): data} + ) + + batch = it.read_batch() + self.assertIsNotNone(batch) + entries = [batch.__next__() for _ in range(2)] + self.assertEqual(len(entries), 2) + self.assertEqual(entries[0].key, b"k1") + self.assertEqual(entries[1].key, b"k2") + self.assertIsNone(it.read_batch()) + + def test_read_batch_varlen_large_offset(self): + handle = BlockHandle(300, 200) + data = [BlockEntry(b"a", b"1")] + + it = self._make_iterator( + [(b"a", handle)], + {(300, 200): data} + ) + + batch = it.read_batch() + self.assertIsNotNone(batch) + entry = batch.__next__() + self.assertEqual(entry.key, b"a") + + def test_read_batch_varlen_very_large_offset(self): + handle = BlockHandle(1000000, 65535) + data = [BlockEntry(b"big", b"val")] + + it = self._make_iterator( + [(b"big", handle)], + {(1000000, 65535): data} + ) + + batch = it.read_batch() + self.assertIsNotNone(batch) + entry = batch.__next__() + self.assertEqual(entry.key, b"big") + + def test_read_batch_multiple_blocks(self): + h1 = BlockHandle(0, 100) + h2 = BlockHandle(200, 150) + h3 = BlockHandle(500, 80) + + it = self._make_iterator( + [(b"b", h1), (b"d", h2), (b"f", h3)], + { + (0, 100): [BlockEntry(b"a", b"1"), BlockEntry(b"b", b"2")], + (200, 150): [BlockEntry(b"c", b"3"), BlockEntry(b"d", b"4")], + (500, 80): [BlockEntry(b"e", b"5"), BlockEntry(b"f", b"6")], + } + ) + + all_entries = [] + while True: + batch = it.read_batch() + if batch is None: + break + while batch.has_next(): + all_entries.append(batch.__next__()) + + self.assertEqual(len(all_entries), 6) + keys = [e.key for e in all_entries] + self.assertEqual(keys, [b"a", b"b", b"c", b"d", b"e", b"f"]) + + def test_seek_then_read_batch_crosses_blocks(self): + h1 = BlockHandle(0, 100) + h2 = BlockHandle(256, 128) + + it = self._make_iterator( + [(b"b", h1), (b"d", h2)], + { + (0, 100): [BlockEntry(b"a", b"1"), BlockEntry(b"b", b"2")], + (256, 128): [BlockEntry(b"c", b"3"), BlockEntry(b"d", b"4")], + } + ) + + it.seek_to(b"a") + self.assertIsNotNone(it.sought_data_block) + + batch1 = it.read_batch() + self.assertIsNotNone(batch1) + self.assertEqual(batch1.__next__().key, b"a") + + batch2 = it.read_batch() + self.assertIsNotNone(batch2) + entries2 = [] + while batch2.has_next(): + entries2.append(batch2.__next__()) + self.assertEqual(len(entries2), 2) + self.assertEqual(entries2[0].key, b"c") + self.assertEqual(entries2[1].key, b"d") + + self.assertIsNone(it.read_batch()) + + def test_read_batch_empty_index(self): + it = self._make_iterator([], {}) + self.assertIsNone(it.read_batch()) + + def test_varlen_encoding_roundtrip(self): + test_cases = [ + (0, 0), + (127, 127), + (128, 128), + (300, 200), + (16384, 255), + (1000000, 65535), + (2**31 - 1, 2**31 - 1), + ] + for offset, size in test_cases: + encoded = _encode_block_handle(offset, size) + inp = MemorySliceInput(encoded) + decoded_offset = inp.read_var_len_long() + decoded_size = inp.read_var_len_int() + self.assertEqual(decoded_offset, offset, + "offset mismatch for ({}, {})".format(offset, size)) + self.assertEqual(decoded_size, size, + "size mismatch for ({}, {})".format(offset, size)) + + +if __name__ == '__main__': + unittest.main()
