This is an automated email from the ASF dual-hosted git repository.
boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 8093c3fa6 IMPALA-13854: IcebergPositionDeleteChannel uses incorrect
capacity
8093c3fa6 is described below
commit 8093c3fa6b44f7f6ec699d2dd47581401f75f363
Author: Zoltan Borok-Nagy <[email protected]>
AuthorDate: Wed Mar 12 16:00:31 2025 +0100
IMPALA-13854: IcebergPositionDeleteChannel uses incorrect capacity
IcebergPositionDeleteChannel uses incorrect capacity since IMPALA-13509.
It is set to -1, which means it collects delete records until it
runs out of memory. This patch moves the Channel's capacity calculation
from the Init() function to the constructor.
Testing
* e2e test added
Change-Id: I207869c97a699d2706227285595ec7d7dbe1e249
Reviewed-on: http://gerrit.cloudera.org:8080/22616
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/runtime/krpc-data-stream-sender.cc | 3 +--
.../QueryTest/iceberg-large-scale-deletes.test | 25 ++++++++++++++++++++++
tests/query_test/test_iceberg.py | 6 ++++++
3 files changed, 32 insertions(+), 2 deletions(-)
diff --git a/be/src/runtime/krpc-data-stream-sender.cc
b/be/src/runtime/krpc-data-stream-sender.cc
index 127ced2a1..56dba8c59 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -169,6 +169,7 @@ class KrpcDataStreamSender::Channel : public
CacheLineAligned {
fragment_instance_id_(fragment_instance_id),
dest_node_id_(dest_node_id),
is_local_(is_local) {
+ row_batch_capacity_ = CalculateRowBatchCapacity();
DCHECK(IsResolvedAddress(address_));
}
@@ -378,8 +379,6 @@ class KrpcDataStreamSender::Channel : public
CacheLineAligned {
Status KrpcDataStreamSender::Channel::Init(
RuntimeState* state, const shared_ptr<CharMemTrackerAllocator>& allocator)
{
- row_batch_capacity_ = CalculateRowBatchCapacity();
-
// Create a DataStreamService proxy to the destination.
RETURN_IF_ERROR(DataStreamService::GetProxy(address_, hostname_, &proxy_));
diff --git
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-large-scale-deletes.test
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-large-scale-deletes.test
new file mode 100644
index 000000000..5da7536fe
--- /dev/null
+++
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-large-scale-deletes.test
@@ -0,0 +1,25 @@
+====
+---- QUERY
+# Regression test for IMPALA-13854.
+create table range_5_million (i bigint) stored by iceberg
+tblproperties('format-version'='2');
+with v as (values (0 as x), (1), (2), (3), (4), (5), (6), (7), (8),
(9)),
+ v10 as (select 10*x as x from v),
+ v100 as (select 10*x as x from v10),
+ v1000 as (select 10*x as x from v100),
+ v10000 as (select 10*x as x from v1000),
+ v100000 as (select 10*x as x from v10000),
+ v1000000 as (select 10*x as x from v100000),
+ range_i as (select v1000000.x + v100000.x + v10000.x + v1000.x + v100.x
+ v10.x + v.x as i
+ from v1000000, v100000, v10000, v1000, v100, v10, v)
+insert into range_5_million select i from range_i where i < 5000000;
+delete from range_5_million where i > 0;
+====
+---- QUERY
+SET MEM_LIMIT=60m;
+select count(*) from range_5_million;
+---- RESULTS
+1
+---- TYPES
+BIGINT
+====
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 10c0ae5d5..e98674148 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -1697,6 +1697,12 @@ class TestIcebergV2Table(IcebergTestSuite):
assert hive_output == \
"2,true,1,11,1.1,2.222,123.321,2022-02-22,impala\n"
+ def test_large_scale_deletes(self, vector, unique_database):
+ if vector.get_value('exec_option')['disable_optimized_iceberg_v2_read'] ==
1:
+ pytest.skip("Only test the optimized v2 operator")
+ self.run_test_case('QueryTest/iceberg-large-scale-deletes', vector,
+ unique_database)
+
@SkipIfFS.hive
def test_delete_hive_read(self, vector, unique_database):
ice_delete = unique_database + ".ice_delete"