This is an automated email from the ASF dual-hosted git repository.
michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new d086babdb IMPALA-13598: OPTIMIZE redundantly accumulates memory in HDFS WRITER
d086babdb is described below
commit d086babdbd249df0069900739f24da280b06a279
Author: Zoltan Borok-Nagy <[email protected]>
AuthorDate: Tue Dec 10 18:53:29 2024 +0100
IMPALA-13598: OPTIMIZE redundantly accumulates memory in HDFS WRITER
When OptimizeStmt created the table sink it didn't set
'inputIsClustered' to true. Therefore HdfsTableSink expected
random input and kept an output writer open for every partition,
which resulted in high memory consumption and potentially an
OOM error when the number of partitions is high.
Since we actually sort the rows before the sink, we can set
'inputIsClustered' to true, which means HdfsTableSink can write
files one by one: whenever it gets a row that belongs to a new
partition, it knows it can close the current output writer and
open a new one.
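The clustered-write behaviour described above can be sketched roughly as follows. This is a hypothetical, simplified model, not Impala's actual HdfsTableSink; the class and field names (ClusteredSinkSketch, openPartition, closedOrder) are invented for illustration. The key point it demonstrates: when rows arrive grouped by partition key, at most one writer needs to be open at any time.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a clustered table sink. Because input rows
// arrive grouped (clustered) by partition key, seeing a row for a new
// partition means the previous partition is finished, so its writer
// can be closed immediately instead of being kept open until the end.
public class ClusteredSinkSketch {
    // Partition keys in the order their writers were closed.
    static List<String> closedOrder = new ArrayList<>();
    // Partition key of the single currently-open writer (null if none).
    static String openPartition = null;

    static void write(String partitionKey, String row) {
        if (!partitionKey.equals(openPartition)) {
            if (openPartition != null) {
                closedOrder.add(openPartition); // close the current writer
            }
            openPartition = partitionKey;       // open a writer for the new partition
        }
        // append 'row' to the open writer (file I/O elided in this sketch)
    }

    static void finish() {
        if (openPartition != null) {
            closedOrder.add(openPartition);     // close the last writer
            openPartition = null;
        }
    }

    public static void main(String[] args) {
        // Input is clustered by partition key, as it is after the sort.
        write("p0", "r1");
        write("p0", "r2");
        write("p1", "r3");
        write("p2", "r4");
        finish();
        System.out.println(closedOrder); // prints [p0, p1, p2]
    }
}
```

With unclustered (random) input the same sink would have to keep all three writers open simultaneously, which is exactly the memory blow-up this commit avoids.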
Testing:
* added e2e test
Change-Id: I8d451c50c4b6dff9433ab105493051bee106bc63
Reviewed-on: http://gerrit.cloudera.org:8080/22192
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
.../org/apache/impala/analysis/OptimizeStmt.java | 2 +-
.../queries/QueryTest/iceberg-optimize.test | 24 +++++++++++++++++++++-
2 files changed, 24 insertions(+), 2 deletions(-)
diff --git a/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java b/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
index a011e8064..6fc16892c 100644
--- a/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
@@ -199,7 +199,7 @@ public class OptimizeStmt extends DmlStatementBase {
 public DataSink createDataSink() {
   TableSink tableSink = TableSink.create(table_, TableSink.Op.INSERT,
- partitionKeyExprs_, resultExprs_, new ArrayList<>(), false, false,
+ partitionKeyExprs_, resultExprs_, new ArrayList<>(), false, true,
       new Pair<>(sortColumns_, sortingOrder_), -1, null,
       maxTableSinks_);
   return tableSink;
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-optimize.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-optimize.test
index ce3a8a483..fd9993c9d 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-optimize.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-optimize.test
@@ -281,4 +281,26 @@ SELECT * FROM ice_optimize;
 11,true,2024-01-01,11.11,'iceberg'
 ---- TYPES
 INT,BOOLEAN,DATE,DOUBLE,STRING
-====
\ No newline at end of file
+====
+---- QUERY
+# Regression test for IMPALA-13598.
+# OPTIMIZE table with many partitions (with low mem_limit).
+# Then we also check the number of files to verify that
+# the inputs of the writers are actually sorted(/clustered).
+CREATE TABLE ice_tpch_many_parts
+PARTITIONED BY SPEC(truncate(1000, l_orderkey))
+STORED BY ICEBERG
+AS SELECT * FROM tpch_parquet.lineitem
+WHERE l_orderkey % 2 = 0;
+
+INSERT INTO ice_tpch_many_parts
+SELECT * FROM tpch_parquet.lineitem WHERE l_orderkey % 2 = 1;
+
+SET MEM_LIMIT=600m;
+OPTIMIZE TABLE ice_tpch_many_parts;
+SELECT count(*) FROM $DATABASE.ice_tpch_many_parts.`files`;
+---- RESULTS
+6001
+---- TYPES
+BIGINT
+====