This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit e4c05fce5998a1c2d10e111cf72fed00995ec85f
Author: Xuebin Su <[email protected]>
AuthorDate: Sat Aug 31 00:38:58 2024 +0800

    IMPALA-13286: Make stream row counters 64-bit
    
    Previously, adding a large number of zero-sized rows to a
    BufferedTupleStream could cause impalad to crash. This was because
    the row counters for a page, namely
    
    - Page::num_rows, and
    - ReadIterator::read_page_rows_returned_
    
    were 32-bit integers. When the row size is zero, all rows are
    added to a single page, so if the number of rows in the whole stream
    exceeds the range of a 32-bit integer, the two counters overflow and
    a DCHECK fails.
    
    This patch fixes the issue by making the row counters 64-bit integers
    so that they will not easily overflow even when row size is zero.
    DCHECKs are also added to make sure that overflow will not happen.
    
    Testing:
    - Added unit test StreamStateTest::TestAddAndGetZeroSizedRows to ensure
      it is OK to add and get rows when the number of rows in the stream
      exceeds INT_MAX or UINT_MAX.
    - Ran the previously failing query manually; it completed with the
      correct result after the patch.
    
    Change-Id: I4d4cf8f424360717de0c4a5571a638a4c11b9606
    Reviewed-on: http://gerrit.cloudera.org:8080/21741
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/runtime/buffered-tuple-stream-test.cc | 96 ++++++++++++++++++++++++++++
 be/src/runtime/buffered-tuple-stream.cc      | 11 +++-
 be/src/runtime/buffered-tuple-stream.h       | 11 ++--
 3 files changed, 112 insertions(+), 6 deletions(-)

diff --git a/be/src/runtime/buffered-tuple-stream-test.cc 
b/be/src/runtime/buffered-tuple-stream-test.cc
index 55dce370d..80b2069bb 100644
--- a/be/src/runtime/buffered-tuple-stream-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-test.cc
@@ -91,6 +91,12 @@ class SimpleTupleStreamTest : public testing::Test {
     string_desc_ =
         pool_.Add(new RowDescriptor(*string_builder.Build(), tuple_ids, 
nullable_tuples));
 
+    DescriptorTblBuilder zero_sized_row_builder(
+        test_env_->exec_env()->frontend(), &pool_);
+    zero_sized_row_builder.DeclareTuple();
+    zero_sized_row_desc_ = pool_.Add(
+        new RowDescriptor(*zero_sized_row_builder.Build(), tuple_ids, 
nullable_tuples));
+
     // Construct descriptors for big rows with and without nullable tuples.
     // Each tuple contains 8 slots of TYPE_INT and a single byte for null 
indicator.
     DescriptorTblBuilder big_row_builder(test_env_->exec_env()->frontend(), 
&pool_);
@@ -454,6 +460,7 @@ class SimpleTupleStreamTest : public testing::Test {
   ObjectPool pool_;
   RowDescriptor* int_desc_;
   RowDescriptor* string_desc_;
+  RowDescriptor* zero_sized_row_desc_;
 
   static const int64_t BIG_ROW_BYTES = 16 * 1024;
   RowDescriptor* big_row_desc_;
@@ -601,6 +608,15 @@ class StreamStateTest : public SimpleTupleStreamTest {
   // Test that stream's debug string is capped only for the first
   // BufferedTupleStream::MAX_PAGE_ITER_DEBUG.
   void TestShortDebugString();
+
+  // Helper method to add one zero-sized row to the stream.
+  void TestAddOneZeroSizedRow(BufferedTupleStream& stream, RowBatch& batch);
+
+  // Helper method to get one zero-sized row from the stream.
+  void TestGetOneZeroSizedRow(BufferedTupleStream& stream);
+
+  // Test adding more than INT_MAX or UINT_MAX zero-size rows to a stream.
+  void TestAddAndGetZeroSizedRows();
 };
 
 // Basic API test. No data should be going to disk.
@@ -2243,6 +2259,86 @@ TEST_F(StreamStateTest, 
UnpinFullyExhaustedReadPageOnReadOnlyStreamNoAttach) {
 TEST_F(StreamStateTest, ShortDebugString) {
   TestShortDebugString();
 }
+
+void StreamStateTest::TestAddOneZeroSizedRow(
+    BufferedTupleStream& stream, RowBatch& batch) {
+  Status status;
+  uint8_t* write_ptr_before = stream.write_ptr_;
+  int64_t num_rows_before =
+      stream.write_page_ == nullptr ? 0 : stream.write_page_->num_rows;
+  bool b = stream.AddRow(batch.GetRow(0), &status);
+  ASSERT_OK(status);
+  ASSERT_TRUE(b);
+  if (write_ptr_before != nullptr) {
+    ASSERT_EQ(stream.write_ptr_, write_ptr_before);
+  }
+  ASSERT_EQ(stream.write_page_->num_rows, num_rows_before + 1);
+}
+
+void StreamStateTest::TestGetOneZeroSizedRow(BufferedTupleStream& stream) {
+  RowBatch read_batch(zero_sized_row_desc_, 1, &tracker_);
+  uint8_t* read_ptr_before = stream.read_it_.read_ptr_;
+  int64_t num_rows_before = stream.read_it_.read_page_rows_returned_;
+  bool eos = false;
+  Status status = stream.GetNext(&read_batch, &eos);
+  ASSERT_OK(status);
+  if (read_ptr_before != nullptr) {
+    ASSERT_EQ(stream.read_it_.read_ptr_, read_ptr_before);
+  }
+  ASSERT_EQ(stream.read_it_.read_page_rows_returned_, num_rows_before + 1);
+}
+
+void StreamStateTest::TestAddAndGetZeroSizedRows() {
+  Init(BUFFER_POOL_LIMIT);
+  BufferedTupleStream stream(
+      runtime_state_, zero_sized_row_desc_, &client_, PAGE_LEN, PAGE_LEN);
+  ASSERT_OK(stream.Init("StreamStateTest::TestAddAndGetZeroSizedRows", false));
+  bool got_reservation = false;
+  ASSERT_OK(stream.PrepareForReadWrite(true, &got_reservation));
+  ASSERT_TRUE(got_reservation);
+
+  RowBatch write_batch(zero_sized_row_desc_, 1, &tracker_);
+  write_batch.CommitRows(1);
+
+  // Adding 1 row to initialize the row counters
+  TestAddOneZeroSizedRow(stream, write_batch);
+
+  // Set the row counters to mock a stream with INT_MAX rows
+  stream.num_rows_ = INT_MAX;
+  stream.write_page_->num_rows = INT_MAX;
+
+  // Test if the stream can hold more than INT_MAX rows.
+  TestAddOneZeroSizedRow(stream, write_batch);
+
+  // Set the row counters to mock a stream with UINT_MAX rows
+  stream.num_rows_ = UINT_MAX;
+  stream.write_page_->num_rows = UINT_MAX;
+
+  // Test if the stream can hold more than UINT_MAX rows.
+  TestAddOneZeroSizedRow(stream, write_batch);
+
+  stream.DoneWriting();
+
+  // Test if we can get 1 row after getting 0 rows.
+  TestGetOneZeroSizedRow(stream);
+
+  // Test if we can get 1 row after getting INT_MAX rows.
+  stream.read_it_.IncrRowsReturned(INT_MAX - 1);
+  ASSERT_EQ(stream.read_it_.read_page_rows_returned_, INT_MAX);
+  TestGetOneZeroSizedRow(stream);
+
+  // Test if we can get 1 row after getting UINT_MAX rows.
+  stream.read_it_.IncrRowsReturned(UINT_MAX - INT_MAX - 1);
+  ASSERT_EQ(stream.read_it_.read_page_rows_returned_, UINT_MAX);
+  TestGetOneZeroSizedRow(stream);
+
+  ASSERT_EQ(stream.read_it_.GetRowsLeftInPage(), 0);
+  stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+}
+
+TEST_F(StreamStateTest, AddAndGetZeroSizedRows) {
+  TestAddAndGetZeroSizedRows();
+}
 }
 
 int main(int argc, char** argv) {
diff --git a/be/src/runtime/buffered-tuple-stream.cc 
b/be/src/runtime/buffered-tuple-stream.cc
index 19dcb9ff6..2202b6124 100644
--- a/be/src/runtime/buffered-tuple-stream.cc
+++ b/be/src/runtime/buffered-tuple-stream.cc
@@ -832,8 +832,13 @@ Status BufferedTupleStream::GetNextInternal(ReadIterator* 
RESTRICT read_iter,
   DCHECK(read_iter->read_page_->is_pinned()) << DebugString();
   DCHECK_GE(read_iter->read_page_rows_returned_, 0);
 
-  int rows_left_in_page = read_iter->GetRowsLeftInPage();
-  int rows_to_fill = std::min(batch->capacity() - batch->num_rows(), 
rows_left_in_page);
+  int64_t rows_left_in_page = read_iter->GetRowsLeftInPage();
+  // We are casting an int64_t to int here but this is OK because rows_to_fill 
is
+  // no greater than batch->capacity(), which in turn is no greater than 
INT_MAX.
+  int64_t rows_to_fill_temp = std::min(
+      static_cast<int64_t>(batch->capacity() - batch->num_rows()), 
rows_left_in_page);
+  DCHECK_LE(rows_to_fill_temp, INT_MAX);
+  int rows_to_fill = static_cast<int>(rows_to_fill_temp);
   DCHECK_GE(rows_to_fill, 1);
   uint8_t* tuple_row_mem = 
reinterpret_cast<uint8_t*>(batch->GetRow(batch->num_rows()));
 
@@ -1034,6 +1039,8 @@ bool BufferedTupleStream::AddRow(TupleRow* row, Status* 
status) noexcept {
   if (UNLIKELY(write_page_ == nullptr || !DeepCopy(row, &write_ptr_, 
write_end_ptr_))) {
     return AddRowSlow(row, status);
   }
+  DCHECK_LT(num_rows_, INT64_MAX);
+  DCHECK_LT(write_page_->num_rows, INT64_MAX);
   ++num_rows_;
   ++write_page_->num_rows;
   return true;
diff --git a/be/src/runtime/buffered-tuple-stream.h 
b/be/src/runtime/buffered-tuple-stream.h
index 6002d112d..0191d96de 100644
--- a/be/src/runtime/buffered-tuple-stream.h
+++ b/be/src/runtime/buffered-tuple-stream.h
@@ -450,8 +450,10 @@ class BufferedTupleStream {
 
     BufferPool::PageHandle handle;
 
-    /// Number of rows written to the page.
-    int num_rows = 0;
+    /// Number of rows written to the page. Its type is int64_t since when the 
row size
+    /// is zero, all rows will be added to one page. And the number of rows in 
the whole
+    /// stream can exceed INT_MAX.
+    int64_t num_rows = 0;
 
     /// Whether we called GetBuffer() on the page since it was last pinned. 
This means
     /// that GetBuffer() and ExtractBuffer() cannot fail and that GetNext() 
may have
@@ -506,8 +508,9 @@ class BufferedTupleStream {
     /// Total number of rows returned via this read iterator since Init() was 
called.
     int64_t rows_returned_ = 0;
 
-    /// Number of rows returned from the current 'read_page_'.
-    uint32_t read_page_rows_returned_ = -1;
+    /// Number of rows returned from the current 'read_page_'. The type needs 
to be
+    /// compatible with 'Page::num_rows' for them to compare.
+    int64_t read_page_rows_returned_ = -1;
 
     /// Pointer into 'read_page_' to the byte after the last row read.
     uint8_t* read_ptr_ = nullptr;

Reply via email to