This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new d086babdb IMPALA-13598: OPTIMIZE redundantly accumulates memory in 
HDFS WRITER
d086babdb is described below

commit d086babdbd249df0069900739f24da280b06a279
Author: Zoltan Borok-Nagy <[email protected]>
AuthorDate: Tue Dec 10 18:53:29 2024 +0100

    IMPALA-13598: OPTIMIZE redundantly accumulates memory in HDFS WRITER
    
    When OptimizeStmt created the table sink it didn't set
    'inputIsClustered' to true. Therefore HdfsTableSink expected
    random input and kept the output writers open for every partition,
    which resulted in high memory consumption and potentially an
    OOM error when the number of partitions is high.
    
    Since we actually sort the rows before the sink we can set
    'inputIsClustered' to true, which means HdfsTableSink can write
    files one by one, because whenever it gets a row that belongs
    to a new partition it knows that it can close the current output
    writer, and open a new one.
    
    Testing:
     * added e2e test
    
    Change-Id: I8d451c50c4b6dff9433ab105493051bee106bc63
    Reviewed-on: http://gerrit.cloudera.org:8080/22192
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../org/apache/impala/analysis/OptimizeStmt.java   |  2 +-
 .../queries/QueryTest/iceberg-optimize.test        | 24 +++++++++++++++++++++-
 2 files changed, 24 insertions(+), 2 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java 
b/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
index a011e8064..6fc16892c 100644
--- a/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
@@ -199,7 +199,7 @@ public class OptimizeStmt extends DmlStatementBase {
 
   public DataSink createDataSink() {
     TableSink tableSink = TableSink.create(table_, TableSink.Op.INSERT,
-        partitionKeyExprs_, resultExprs_, new ArrayList<>(), false, false,
+        partitionKeyExprs_, resultExprs_, new ArrayList<>(), false, true,
         new Pair<>(sortColumns_, sortingOrder_), -1, null,
         maxTableSinks_);
     return tableSink;
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-optimize.test 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-optimize.test
index ce3a8a483..fd9993c9d 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-optimize.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-optimize.test
@@ -281,4 +281,26 @@ SELECT * FROM ice_optimize;
 11,true,2024-01-01,11.11,'iceberg'
 ---- TYPES
 INT,BOOLEAN,DATE,DOUBLE,STRING
-====
\ No newline at end of file
+====
+---- QUERY
+# Regression test for IMPALA-13598.
+# OPTIMIZE table with many partitions (with low mem_limit).
+# Then we also check the number of files to verify that
+# the inputs of the writers are actually sorted(/clustered).
+CREATE TABLE ice_tpch_many_parts
+PARTITIONED BY SPEC(truncate(1000, l_orderkey))
+STORED BY ICEBERG
+AS SELECT * FROM tpch_parquet.lineitem
+WHERE l_orderkey % 2 = 0;
+
+INSERT INTO ice_tpch_many_parts
+SELECT * FROM tpch_parquet.lineitem WHERE l_orderkey % 2 = 1;
+
+SET MEM_LIMIT=600m;
+OPTIMIZE TABLE ice_tpch_many_parts;
+SELECT count(*) FROM $DATABASE.ice_tpch_many_parts.`files`;
+---- RESULTS
+6001
+---- TYPES
+BIGINT
+====

Reply via email to