This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 8093c3fa6 IMPALA-13854: IcebergPositionDeleteChannel uses incorrect capacity
8093c3fa6 is described below

commit 8093c3fa6b44f7f6ec699d2dd47581401f75f363
Author: Zoltan Borok-Nagy <[email protected]>
AuthorDate: Wed Mar 12 16:00:31 2025 +0100

    IMPALA-13854: IcebergPositionDeleteChannel uses incorrect capacity
    
    IcebergPositionDeleteChannel has used an incorrect capacity since
    IMPALA-13509. The capacity is set to -1, which means the channel keeps
    collecting delete records until it runs out of memory. This patch moves
    the Channel's capacity calculation from the Init() function to the
    constructor.
    
    Testing
     * e2e test added
    
    Change-Id: I207869c97a699d2706227285595ec7d7dbe1e249
    Reviewed-on: http://gerrit.cloudera.org:8080/22616
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/runtime/krpc-data-stream-sender.cc          |  3 +--
 .../QueryTest/iceberg-large-scale-deletes.test     | 25 ++++++++++++++++++++++
 tests/query_test/test_iceberg.py                   |  6 ++++++
 3 files changed, 32 insertions(+), 2 deletions(-)

diff --git a/be/src/runtime/krpc-data-stream-sender.cc 
b/be/src/runtime/krpc-data-stream-sender.cc
index 127ced2a1..56dba8c59 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -169,6 +169,7 @@ class KrpcDataStreamSender::Channel : public 
CacheLineAligned {
       fragment_instance_id_(fragment_instance_id),
       dest_node_id_(dest_node_id),
       is_local_(is_local) {
+    row_batch_capacity_ = CalculateRowBatchCapacity();
     DCHECK(IsResolvedAddress(address_));
   }
 
@@ -378,8 +379,6 @@ class KrpcDataStreamSender::Channel : public 
CacheLineAligned {
 
 Status KrpcDataStreamSender::Channel::Init(
     RuntimeState* state, const shared_ptr<CharMemTrackerAllocator>& allocator) 
{
-  row_batch_capacity_ = CalculateRowBatchCapacity();
-
   // Create a DataStreamService proxy to the destination.
   RETURN_IF_ERROR(DataStreamService::GetProxy(address_, hostname_, &proxy_));
 
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-large-scale-deletes.test
 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-large-scale-deletes.test
new file mode 100644
index 000000000..5da7536fe
--- /dev/null
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-large-scale-deletes.test
@@ -0,0 +1,25 @@
+====
+---- QUERY
+# Regression test for IMPALA-13854.
+create table range_5_million (i bigint) stored by iceberg
+tblproperties('format-version'='2');
+with v            as (values (0 as x), (1), (2), (3), (4), (5), (6), (7), (8), 
(9)),
+     v10          as (select 10*x as x from v),
+     v100         as (select 10*x as x from v10),
+     v1000        as (select 10*x as x from v100),
+     v10000       as (select 10*x as x from v1000),
+     v100000      as (select 10*x as x from v10000),
+     v1000000     as (select 10*x as x from v100000),
+     range_i   as (select v1000000.x + v100000.x + v10000.x + v1000.x + v100.x 
+ v10.x + v.x as i
+                   from v1000000, v100000, v10000, v1000, v100, v10, v)
+insert into range_5_million select i from range_i where i < 5000000;
+delete from range_5_million where i > 0;
+====
+---- QUERY
+SET MEM_LIMIT=60m;
+select count(*) from range_5_million;
+---- RESULTS
+1
+---- TYPES
+BIGINT
+====
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 10c0ae5d5..e98674148 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -1697,6 +1697,12 @@ class TestIcebergV2Table(IcebergTestSuite):
     assert hive_output == \
         "2,true,1,11,1.1,2.222,123.321,2022-02-22,impala\n"
 
+  def test_large_scale_deletes(self, vector, unique_database):
+    if vector.get_value('exec_option')['disable_optimized_iceberg_v2_read'] == 
1:
+      pytest.skip("Only test the optimized v2 operator")
+    self.run_test_case('QueryTest/iceberg-large-scale-deletes', vector,
+        unique_database)
+
   @SkipIfFS.hive
   def test_delete_hive_read(self, vector, unique_database):
     ice_delete = unique_database + ".ice_delete"

Reply via email to