This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 05585c19bfcc235ab9d7574c970db04125fb9743 Author: Noemi Pap-Takacs <[email protected]> AuthorDate: Thu Jul 11 16:12:31 2024 +0200 IMPALA-12857: Add flag to enable merge-on-read even if tables are configured with copy-on-write Impala can only modify an Iceberg table via 'merge-on-read'. The 'iceberg_always_allow_merge_on_read_operations' backend flag makes it possible to execute 'merge-on-read' operations (DELETE, UPDATE, MERGE) even if the table property is 'copy-on-write'. Testing: - custom cluster test - negative E2E test Change-Id: I3800043e135beeedfb655a238c0644aaa0ef11f4 Reviewed-on: http://gerrit.cloudera.org:8080/21578 Reviewed-by: Daniel Becker <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/common/global-flags.cc | 5 ++ be/src/util/backend-gflag-util.cc | 3 + common/thrift/BackendGflags.thrift | 2 + .../apache/impala/analysis/IcebergDeleteImpl.java | 12 ++-- .../apache/impala/analysis/IcebergModifyImpl.java | 17 +++++ .../apache/impala/analysis/IcebergUpdateImpl.java | 11 ++-- .../org/apache/impala/service/BackendConfig.java | 4 ++ .../queries/QueryTest/iceberg-negative.test | 4 +- .../test_iceberg_always_allow_merge_on_read.py | 73 ++++++++++++++++++++++ 9 files changed, 115 insertions(+), 16 deletions(-) diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc index 20fdab4a3..072ed4dc4 100644 --- a/be/src/common/global-flags.cc +++ b/be/src/common/global-flags.cc @@ -425,6 +425,11 @@ DEFINE_bool(iceberg_allow_datafiles_in_table_location_only, true, "If true, Impa "does not allow Iceberg data file locations outside of the table directory during " "reads"); +DEFINE_bool(iceberg_always_allow_merge_on_read_operations, false, "Impala can only " + "write delete files with 'merge-on-read'. 
If this flag is true, Impala allows " + "executing DELETE, UPDATE and MERGE operations on Iceberg tables even if the table " + "property is 'copy-on-write'."); + // Host and port of Statestore Service DEFINE_string(state_store_host, "localhost", "hostname where StatestoreService is running"); diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc index d7cf69125..5d0f11908 100644 --- a/be/src/util/backend-gflag-util.cc +++ b/be/src/util/backend-gflag-util.cc @@ -112,6 +112,7 @@ DECLARE_int32(iceberg_reload_new_files_threshold); DECLARE_bool(enable_skipping_older_events); DECLARE_bool(enable_json_scanner); DECLARE_bool(iceberg_allow_datafiles_in_table_location_only); +DECLARE_bool(iceberg_always_allow_merge_on_read_operations); DECLARE_int32(catalog_operation_log_size); DECLARE_string(hostname); DECLARE_bool(allow_catalog_cache_op_from_masked_users); @@ -464,6 +465,8 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) { cfg.__set_enable_json_scanner(FLAGS_enable_json_scanner); cfg.__set_iceberg_allow_datafiles_in_table_location_only( FLAGS_iceberg_allow_datafiles_in_table_location_only); + cfg.__set_iceberg_always_allow_merge_on_read_operations( + FLAGS_iceberg_always_allow_merge_on_read_operations); cfg.__set_max_filter_error_rate_from_full_scan( FLAGS_max_filter_error_rate_from_full_scan); cfg.__set_catalog_operation_log_size(FLAGS_catalog_operation_log_size); diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift index 234d0f231..e0121b671 100644 --- a/common/thrift/BackendGflags.thrift +++ b/common/thrift/BackendGflags.thrift @@ -308,4 +308,6 @@ struct TBackendGflags { 138: required bool is_release_build 139: required bool enable_catalogd_ha + + 140: required bool iceberg_always_allow_merge_on_read_operations } diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergDeleteImpl.java b/fe/src/main/java/org/apache/impala/analysis/IcebergDeleteImpl.java index d1138d00f..35192150e 100644 --- 
a/fe/src/main/java/org/apache/impala/analysis/IcebergDeleteImpl.java +++ b/fe/src/main/java/org/apache/impala/analysis/IcebergDeleteImpl.java @@ -19,6 +19,7 @@ package org.apache.impala.analysis; import java.util.List; +import org.apache.iceberg.TableProperties; import org.apache.impala.catalog.FeIcebergTable; import org.apache.impala.common.AnalysisException; import org.apache.impala.common.Pair; @@ -42,13 +43,6 @@ public class IcebergDeleteImpl extends IcebergModifyImpl { // Make the virtual position delete table the new target table. modifyStmt_.setTargetTable(icePosDelTable_); - String deleteMode = originalTargetTable_.getIcebergApiTable().properties().get( - org.apache.iceberg.TableProperties.DELETE_MODE); - if (deleteMode != null && !deleteMode.equals("merge-on-read")) { - throw new AnalysisException(String.format("Unsupported delete mode: '%s' for " + - "Iceberg table: %s", deleteMode, originalTargetTable_.getFullName())); - } - Expr wherePredicate = modifyStmt_.getWherePredicate(); if (wherePredicate == null || org.apache.impala.analysis.Expr.IS_TRUE_LITERAL.apply(wherePredicate)) { @@ -82,4 +76,8 @@ public class IcebergDeleteImpl extends IcebergModifyImpl { return new IcebergBufferedDeleteSink(icePosDelTable_, deletePartitionKeyExprs_, deleteResultExprs_); } + + String getModifyMode() { + return TableProperties.DELETE_MODE; + } } \ No newline at end of file diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergModifyImpl.java b/fe/src/main/java/org/apache/impala/analysis/IcebergModifyImpl.java index d1084ab60..57f2595df 100644 --- a/fe/src/main/java/org/apache/impala/analysis/IcebergModifyImpl.java +++ b/fe/src/main/java/org/apache/impala/analysis/IcebergModifyImpl.java @@ -20,11 +20,13 @@ package org.apache.impala.analysis; import org.apache.impala.catalog.FeIcebergTable; import org.apache.impala.catalog.IcebergPositionDeleteTable; import org.apache.impala.common.AnalysisException; +import org.apache.impala.service.BackendConfig; import 
org.apache.impala.thrift.TIcebergFileFormat; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Objects; import com.google.common.collect.Lists; @@ -69,6 +71,15 @@ abstract class IcebergModifyImpl extends ModifyImpl { "but the given table uses a different file format: " + originalTargetTable_.getFullName()); } + String modifyMode = getModifyMode(); + String modifyWriteMode = originalTargetTable_.getIcebergApiTable().properties() + .get(modifyMode); + if (modifyWriteMode != null && !Objects.equals(modifyWriteMode, "merge-on-read") + && !isMergeOnReadAlwaysAllowed()) { + throw new AnalysisException(String.format( + "Unsupported '%s': '%s' for Iceberg table: %s", + modifyMode, modifyWriteMode, originalTargetTable_.getFullName())); + } } @Override @@ -113,4 +124,10 @@ abstract class IcebergModifyImpl extends ModifyImpl { ref.analyze(analyzer); return ref; } + + abstract String getModifyMode(); + + protected boolean isMergeOnReadAlwaysAllowed() { + return BackendConfig.INSTANCE.icebergAlwaysAllowMergeOnReadOperations(); + } } diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java b/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java index a7f455c00..62be5e9d0 100644 --- a/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java +++ b/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java @@ -33,7 +33,6 @@ import org.apache.impala.planner.DataSink; import org.apache.impala.planner.IcebergBufferedDeleteSink; import org.apache.impala.planner.MultiDataSink; import org.apache.impala.planner.TableSink; -import org.apache.impala.thrift.TIcebergFileFormat; import org.apache.impala.thrift.TSortingOrder; import com.google.common.base.Preconditions; @@ -64,12 +63,6 @@ public class IcebergUpdateImpl extends IcebergModifyImpl { super.analyze(analyzer); deleteTableId_ = analyzer.getDescTbl().addTargetTable(icePosDelTable_); 
IcebergUtil.validateIcebergTableForInsert(originalTargetTable_); - String updateMode = originalTargetTable_.getIcebergApiTable().properties().get( - TableProperties.UPDATE_MODE); - if (updateMode != null && !updateMode.equals("merge-on-read")) { - throw new AnalysisException(String.format("Unsupported update mode: '%s' for " + - "Iceberg table: %s", updateMode, originalTargetTable_.getFullName())); - } } @Override @@ -188,4 +181,8 @@ public class IcebergUpdateImpl extends IcebergModifyImpl { ret.addDataSink(deleteSink); return ret; } + + String getModifyMode() { + return TableProperties.UPDATE_MODE; + } } diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java index 326ee5ebf..e00985df8 100644 --- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java +++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java @@ -440,6 +440,10 @@ public class BackendConfig { backendCfg_.iceberg_allow_datafiles_in_table_location_only = flag; } + public boolean icebergAlwaysAllowMergeOnReadOperations() { + return backendCfg_.iceberg_always_allow_merge_on_read_operations; + } + public boolean isJsonScannerEnabled() { return backendCfg_.enable_json_scanner; } diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test index 96d0b78ea..605df7f5f 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test @@ -757,7 +757,7 @@ AnalysisException: Impala can only write delete files in PARQUET, but the given alter table ice_delete set tblproperties ('write.delete.format.default'='PARQUET', 'write.delete.mode'='copy-on-write'); delete from ice_delete where i = 1; ---- CATCH -AnalysisException: Unsupported delete mode: 'copy-on-write' for Iceberg table: $DATABASE.ice_delete 
+AnalysisException: Unsupported 'write.delete.mode': 'copy-on-write' for Iceberg table: $DATABASE.ice_delete ==== ---- QUERY optimize table non_iceberg_table; @@ -792,7 +792,7 @@ AnalysisException: Left-hand side in assignment appears multiple times 'k=2' create table cow (i int, j int) stored by iceberg tblproperties ('format-version'='2', 'write.update.mode'='copy-on-write'); update cow set i = 2; ---- CATCH -AnalysisException: Unsupported update mode: 'copy-on-write' for Iceberg table: $DATABASE.cow +AnalysisException: Unsupported 'write.update.mode': 'copy-on-write' for Iceberg table: $DATABASE.cow ==== ---- QUERY update ice_complex set id = id + 1; diff --git a/tests/custom_cluster/test_iceberg_always_allow_merge_on_read.py b/tests/custom_cluster/test_iceberg_always_allow_merge_on_read.py new file mode 100644 index 000000000..0750b26eb --- /dev/null +++ b/tests/custom_cluster/test_iceberg_always_allow_merge_on_read.py @@ -0,0 +1,73 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from __future__ import absolute_import, division, print_function +import pytest + +from tests.common.custom_cluster_test_suite import CustomClusterTestSuite + + +class TestIcebergAlwaysAllowMergeOnRead(CustomClusterTestSuite): + """Tests for checking the behaviour of startup flag + 'iceberg_always_allow_merge_on_read_operations'.""" + + TABLE_NAME = "iceberg_copy_on_write" + + def create_test_table(self, db): + self.execute_query("CREATE TABLE {0}.{1} (i int) STORED BY ICEBERG " + "tblproperties('format-version'='2', 'write.delete.mode'='copy-on-write', " + "'write.update.mode'='copy-on-write');".format(db, self.TABLE_NAME)) + self.execute_query("INSERT INTO {0}.{1} values(1),(2);".format(db, self.TABLE_NAME)) + + def delete_statement(self, db): + return "DELETE FROM {0}.{1} WHERE i=1;".format(db, self.TABLE_NAME) + + def update_statement(self, db): + return "UPDATE {0}.{1} SET i=3 WHERE i=2;".format(db, self.TABLE_NAME) + + def analysis_exception(self, db, operation): + return "AnalysisException: Unsupported '{0}': 'copy-on-write' for " \ + "Iceberg table: {1}.{2}".format(operation, db, self.TABLE_NAME) + + @classmethod + def get_workload(self): + return 'functional-query' + + @CustomClusterTestSuite.with_args( + impalad_args='--iceberg_always_allow_merge_on_read_operations=true') + @pytest.mark.execute_serially + def test_enable_merge_on_read(self, unique_database): + """If the flag is enabled, Impala can execute DELETE, UPDATE and MERGE operations + even if the table property is 'copy-on-write'.""" + # TODO IMPALA-12732: add test case for MERGE. 
+ self.create_test_table(unique_database) + self.execute_query_expect_success(self.client, self.delete_statement(unique_database)) + self.execute_query_expect_success(self.client, self.update_statement(unique_database)) + + @CustomClusterTestSuite.with_args( + impalad_args='--iceberg_always_allow_merge_on_read_operations=false') + @pytest.mark.execute_serially + def test_disable_merge_on_read(self, unique_database): + """If the flag is disabled, Impala cannot write delete files with 'merge-on-read' + strategy if the table property is 'copy-on-write'.""" + self.create_test_table(unique_database) + result = self.execute_query_expect_failure(self.client, + self.delete_statement(unique_database)) + assert self.analysis_exception(unique_database, "write.delete.mode") in str(result) + result = self.execute_query_expect_failure(self.client, + self.update_statement(unique_database)) + assert self.analysis_exception(unique_database, "write.update.mode") in str(result)
