This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 05585c19bfcc235ab9d7574c970db04125fb9743
Author: Noemi Pap-Takacs <[email protected]>
AuthorDate: Thu Jul 11 16:12:31 2024 +0200

    IMPALA-12857: Add flag to enable merge-on-read even if tables are configured with copy-on-write
    
    Impala can only modify an Iceberg table via 'merge-on-read'. The new
    'iceberg_always_allow_merge_on_read_operations' backend flag (false by
    default) makes it possible to execute 'merge-on-read' operations
    (DELETE, UPDATE, MERGE) even if the corresponding table property
    (e.g. 'write.delete.mode' or 'write.update.mode') is 'copy-on-write'.
    
    Testing:
     - custom cluster test
     - negative E2E test
    
    Change-Id: I3800043e135beeedfb655a238c0644aaa0ef11f4
    Reviewed-on: http://gerrit.cloudera.org:8080/21578
    Reviewed-by: Daniel Becker <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/common/global-flags.cc                      |  5 ++
 be/src/util/backend-gflag-util.cc                  |  3 +
 common/thrift/BackendGflags.thrift                 |  2 +
 .../apache/impala/analysis/IcebergDeleteImpl.java  | 12 ++--
 .../apache/impala/analysis/IcebergModifyImpl.java  | 17 +++++
 .../apache/impala/analysis/IcebergUpdateImpl.java  | 11 ++--
 .../org/apache/impala/service/BackendConfig.java   |  4 ++
 .../queries/QueryTest/iceberg-negative.test        |  4 +-
 .../test_iceberg_always_allow_merge_on_read.py     | 73 ++++++++++++++++++++++
 9 files changed, 115 insertions(+), 16 deletions(-)

diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 20fdab4a3..072ed4dc4 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -425,6 +425,11 @@ 
DEFINE_bool(iceberg_allow_datafiles_in_table_location_only, true, "If true, Impa
     "does not allow Iceberg data file locations outside of the table directory 
during "
     "reads");
 
+DEFINE_bool(iceberg_always_allow_merge_on_read_operations, false, "Impala can 
only "
+    "write delete files with 'merge-on-read'. If this flag is true, Impala 
allows "
+    "executing DELETE, UPDATE and MERGE operations on Iceberg tables even if 
the table "
+    "property is 'copy-on-write'.");
+
 // Host and port of Statestore Service
 DEFINE_string(state_store_host, "localhost",
     "hostname where StatestoreService is running");
diff --git a/be/src/util/backend-gflag-util.cc 
b/be/src/util/backend-gflag-util.cc
index d7cf69125..5d0f11908 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -112,6 +112,7 @@ DECLARE_int32(iceberg_reload_new_files_threshold);
 DECLARE_bool(enable_skipping_older_events);
 DECLARE_bool(enable_json_scanner);
 DECLARE_bool(iceberg_allow_datafiles_in_table_location_only);
+DECLARE_bool(iceberg_always_allow_merge_on_read_operations);
 DECLARE_int32(catalog_operation_log_size);
 DECLARE_string(hostname);
 DECLARE_bool(allow_catalog_cache_op_from_masked_users);
@@ -464,6 +465,8 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
   cfg.__set_enable_json_scanner(FLAGS_enable_json_scanner);
   cfg.__set_iceberg_allow_datafiles_in_table_location_only(
       FLAGS_iceberg_allow_datafiles_in_table_location_only);
+  cfg.__set_iceberg_always_allow_merge_on_read_operations(
+      FLAGS_iceberg_always_allow_merge_on_read_operations);
   cfg.__set_max_filter_error_rate_from_full_scan(
       FLAGS_max_filter_error_rate_from_full_scan);
   cfg.__set_catalog_operation_log_size(FLAGS_catalog_operation_log_size);
diff --git a/common/thrift/BackendGflags.thrift 
b/common/thrift/BackendGflags.thrift
index 234d0f231..e0121b671 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -308,4 +308,6 @@ struct TBackendGflags {
   138: required bool is_release_build
 
   139: required bool enable_catalogd_ha
+
+  140: required bool iceberg_always_allow_merge_on_read_operations
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergDeleteImpl.java 
b/fe/src/main/java/org/apache/impala/analysis/IcebergDeleteImpl.java
index d1138d00f..35192150e 100644
--- a/fe/src/main/java/org/apache/impala/analysis/IcebergDeleteImpl.java
+++ b/fe/src/main/java/org/apache/impala/analysis/IcebergDeleteImpl.java
@@ -19,6 +19,7 @@ package org.apache.impala.analysis;
 
 import java.util.List;
 
+import org.apache.iceberg.TableProperties;
 import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.Pair;
@@ -42,13 +43,6 @@ public class IcebergDeleteImpl extends IcebergModifyImpl {
     // Make the virtual position delete table the new target table.
     modifyStmt_.setTargetTable(icePosDelTable_);
 
-    String deleteMode = 
originalTargetTable_.getIcebergApiTable().properties().get(
-      org.apache.iceberg.TableProperties.DELETE_MODE);
-    if (deleteMode != null && !deleteMode.equals("merge-on-read")) {
-      throw new AnalysisException(String.format("Unsupported delete mode: '%s' 
for " +
-          "Iceberg table: %s", deleteMode, 
originalTargetTable_.getFullName()));
-    }
-
     Expr wherePredicate = modifyStmt_.getWherePredicate();
     if (wherePredicate == null ||
         org.apache.impala.analysis.Expr.IS_TRUE_LITERAL.apply(wherePredicate)) 
{
@@ -82,4 +76,8 @@ public class IcebergDeleteImpl extends IcebergModifyImpl {
     return new IcebergBufferedDeleteSink(icePosDelTable_, 
deletePartitionKeyExprs_,
         deleteResultExprs_);
   }
+
+  String getModifyMode() {
+    return TableProperties.DELETE_MODE;
+  }
 }
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergModifyImpl.java 
b/fe/src/main/java/org/apache/impala/analysis/IcebergModifyImpl.java
index d1084ab60..57f2595df 100644
--- a/fe/src/main/java/org/apache/impala/analysis/IcebergModifyImpl.java
+++ b/fe/src/main/java/org/apache/impala/analysis/IcebergModifyImpl.java
@@ -20,11 +20,13 @@ package org.apache.impala.analysis;
 import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.IcebergPositionDeleteTable;
 import org.apache.impala.common.AnalysisException;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TIcebergFileFormat;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 
 import com.google.common.collect.Lists;
 
@@ -69,6 +71,15 @@ abstract class IcebergModifyImpl extends ModifyImpl {
           "but the given table uses a different file format: " +
           originalTargetTable_.getFullName());
     }
+    String modifyMode = getModifyMode();
+    String modifyWriteMode = 
originalTargetTable_.getIcebergApiTable().properties()
+        .get(modifyMode);
+    if (modifyWriteMode != null && !Objects.equals(modifyWriteMode, 
"merge-on-read")
+        && !isMergeOnReadAlwaysAllowed()) {
+      throw new AnalysisException(String.format(
+          "Unsupported '%s': '%s' for Iceberg table: %s",
+          modifyMode, modifyWriteMode, originalTargetTable_.getFullName()));
+    }
   }
 
   @Override
@@ -113,4 +124,10 @@ abstract class IcebergModifyImpl extends ModifyImpl {
     ref.analyze(analyzer);
     return ref;
   }
+
+  abstract String getModifyMode();
+
+  protected boolean isMergeOnReadAlwaysAllowed() {
+    return BackendConfig.INSTANCE.icebergAlwaysAllowMergeOnReadOperations();
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java 
b/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
index a7f455c00..62be5e9d0 100644
--- a/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
+++ b/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
@@ -33,7 +33,6 @@ import org.apache.impala.planner.DataSink;
 import org.apache.impala.planner.IcebergBufferedDeleteSink;
 import org.apache.impala.planner.MultiDataSink;
 import org.apache.impala.planner.TableSink;
-import org.apache.impala.thrift.TIcebergFileFormat;
 import org.apache.impala.thrift.TSortingOrder;
 
 import com.google.common.base.Preconditions;
@@ -64,12 +63,6 @@ public class IcebergUpdateImpl extends IcebergModifyImpl {
     super.analyze(analyzer);
     deleteTableId_ = analyzer.getDescTbl().addTargetTable(icePosDelTable_);
     IcebergUtil.validateIcebergTableForInsert(originalTargetTable_);
-    String updateMode = 
originalTargetTable_.getIcebergApiTable().properties().get(
-        TableProperties.UPDATE_MODE);
-    if (updateMode != null && !updateMode.equals("merge-on-read")) {
-      throw new AnalysisException(String.format("Unsupported update mode: '%s' 
for " +
-          "Iceberg table: %s", updateMode, 
originalTargetTable_.getFullName()));
-    }
   }
 
   @Override
@@ -188,4 +181,8 @@ public class IcebergUpdateImpl extends IcebergModifyImpl {
     ret.addDataSink(deleteSink);
     return ret;
   }
+
+  String getModifyMode() {
+    return TableProperties.UPDATE_MODE;
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java 
b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 326ee5ebf..e00985df8 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -440,6 +440,10 @@ public class BackendConfig {
     backendCfg_.iceberg_allow_datafiles_in_table_location_only = flag;
   }
 
+  public boolean icebergAlwaysAllowMergeOnReadOperations() {
+    return backendCfg_.iceberg_always_allow_merge_on_read_operations;
+  }
+
   public boolean isJsonScannerEnabled() {
     return backendCfg_.enable_json_scanner;
   }
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
index 96d0b78ea..605df7f5f 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
@@ -757,7 +757,7 @@ AnalysisException: Impala can only write delete files in 
PARQUET, but the given
 alter table ice_delete set tblproperties 
('write.delete.format.default'='PARQUET', 'write.delete.mode'='copy-on-write');
 delete from ice_delete where i = 1;
 ---- CATCH
-AnalysisException: Unsupported delete mode: 'copy-on-write' for Iceberg table: 
$DATABASE.ice_delete
+AnalysisException: Unsupported 'write.delete.mode': 'copy-on-write' for 
Iceberg table: $DATABASE.ice_delete
 ====
 ---- QUERY
 optimize table non_iceberg_table;
@@ -792,7 +792,7 @@ AnalysisException: Left-hand side in assignment appears 
multiple times 'k=2'
 create table cow (i int, j int) stored by iceberg tblproperties 
('format-version'='2', 'write.update.mode'='copy-on-write');
 update cow set i = 2;
 ---- CATCH
-AnalysisException: Unsupported update mode: 'copy-on-write' for Iceberg table: 
$DATABASE.cow
+AnalysisException: Unsupported 'write.update.mode': 'copy-on-write' for 
Iceberg table: $DATABASE.cow
 ====
 ---- QUERY
 update ice_complex set id = id + 1;
diff --git a/tests/custom_cluster/test_iceberg_always_allow_merge_on_read.py 
b/tests/custom_cluster/test_iceberg_always_allow_merge_on_read.py
new file mode 100644
index 000000000..0750b26eb
--- /dev/null
+++ b/tests/custom_cluster/test_iceberg_always_allow_merge_on_read.py
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import, division, print_function
+import pytest
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+
+
+class TestIcebergAlwaysAllowMergeOnRead(CustomClusterTestSuite):
+  """Tests for checking the behaviour of startup flag
+   'iceberg_always_allow_merge_on_read_operations'."""
+
+  TABLE_NAME = "iceberg_copy_on_write"
+
+  def create_test_table(self, db):
+    self.execute_query("CREATE TABLE {0}.{1} (i int) STORED BY ICEBERG "
+        "tblproperties('format-version'='2', 
'write.delete.mode'='copy-on-write', "
+        "'write.update.mode'='copy-on-write');".format(db, self.TABLE_NAME))
+    self.execute_query("INSERT INTO {0}.{1} values(1),(2);".format(db, 
self.TABLE_NAME))
+
+  def delete_statement(self, db):
+    return "DELETE FROM {0}.{1} WHERE i=1;".format(db, self.TABLE_NAME)
+
+  def update_statement(self, db):
+    return "UPDATE {0}.{1} SET i=3 WHERE i=2;".format(db, self.TABLE_NAME)
+
+  def analysis_exception(self, db, operation):
+    return "AnalysisException: Unsupported '{0}': 'copy-on-write' for " \
+        "Iceberg table: {1}.{2}".format(operation, db, self.TABLE_NAME)
+
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @CustomClusterTestSuite.with_args(
+      impalad_args='--iceberg_always_allow_merge_on_read_operations=true')
+  @pytest.mark.execute_serially
+  def test_enable_merge_on_read(self, unique_database):
+    """If the flag is enabled, Impala can execute DELETE, UPDATE and MERGE 
operations
+    even if the table property is 'copy-on-write'."""
+    # TODO IMPALA-12732: add test case for MERGE.
+    self.create_test_table(unique_database)
+    self.execute_query_expect_success(self.client, 
self.delete_statement(unique_database))
+    self.execute_query_expect_success(self.client, 
self.update_statement(unique_database))
+
+  @CustomClusterTestSuite.with_args(
+      impalad_args='--iceberg_always_allow_merge_on_read_operations=false')
+  @pytest.mark.execute_serially
+  def test_disable_merge_on_read(self, unique_database):
+    """If the flag is disabled, Impala cannot write delete files with 
'merge-on-read'
+    strategy if the table property is 'copy-on-write'."""
+    self.create_test_table(unique_database)
+    result = self.execute_query_expect_failure(self.client,
+                    self.delete_statement(unique_database))
+    assert self.analysis_exception(unique_database, "write.delete.mode") in 
str(result)
+    result = self.execute_query_expect_failure(self.client,
+                    self.update_statement(unique_database))
+    assert self.analysis_exception(unique_database, "write.update.mode") in 
str(result)

Reply via email to