This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit dc912f016e4804331e5a9fe2960f8c89376195b3
Author: Zoltan Borok-Nagy <[email protected]>
AuthorDate: Thu Oct 13 18:02:46 2022 +0200

    IMPALA-11655: Impala should set write mode "merge-on-read" by default
    
    Similarly to HIVE-26596 Impala should set merge-on-read write mode
    for V2 tables, unless otherwise specified:
    
    * during table creation with 'format-version'='2'
    * during alter table set tblproperties 'format-version'='2'
    
    We do so because in the foreseeable future Impala will only support
    merge-on-read (on the write-side, on the read side copy-on-write is
    also supported). Also, currently Hive only supports merge-on-read.
    
    Testing:
     * e2e tests added
    
    Change-Id: Icaa32472cde98e21fb23f5461175db1bf401db3d
    Reviewed-on: http://gerrit.cloudera.org:8080/19138
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../apache/impala/analysis/CreateTableStmt.java    | 23 ++++++
 .../org/apache/impala/catalog/IcebergTable.java    |  6 ++
 .../apache/impala/service/CatalogOpExecutor.java   | 24 ++++++
 .../java/org/apache/impala/util/IcebergUtil.java   | 11 +++
 .../queries/QueryTest/iceberg-alter.test           | 85 ++++++++++++++++++++++
 .../queries/QueryTest/show-create-table.test       | 74 +++++++++++++++++++
 6 files changed, 223 insertions(+)

diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java 
b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
index c011b3bfd..819981a84 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
@@ -624,6 +624,7 @@ public class CreateTableStmt extends StatementBase {
     putGeneratedProperty(IcebergTable.KEY_STORAGE_HANDLER,
         IcebergTable.ICEBERG_STORAGE_HANDLER);
     putGeneratedProperty(TableProperties.ENGINE_HIVE_ENABLED, "true");
+    addMergeOnReadPropertiesIfNeeded();
 
     String fileformat = 
getTblProperties().get(IcebergTable.ICEBERG_FILE_FORMAT);
     TIcebergFileFormat icebergFileFormat = 
IcebergUtil.getIcebergFileFormat(fileformat);
@@ -652,6 +653,28 @@ public class CreateTableStmt extends StatementBase {
     validateIcebergTableProperties(catalog);
   }
 
+  /**
+   * When creating an Iceberg table that supports row-level modifications
+   * (format-version >= 2) we set write modes to "merge-on-read" which is the 
write
+   * mode Impala will eventually support (IMPALA-11664).
+   */
+  private void addMergeOnReadPropertiesIfNeeded() {
+    Map<String, String> tblProps = getTblProperties();
+    String formatVersion = tblProps.get(TableProperties.FORMAT_VERSION);
+    if (formatVersion == null ||
+        Integer.valueOf(formatVersion) < IcebergTable.ICEBERG_FORMAT_V2) {
+      return;
+    }
+
+    // Only add "merge-on-read" if none of the write modes are specified.
+    final String MERGE_ON_READ = IcebergTable.MERGE_ON_READ;
+    if (!IcebergUtil.isAnyWriteModeSet(tblProps)) {
+      putGeneratedProperty(TableProperties.DELETE_MODE, MERGE_ON_READ);
+      putGeneratedProperty(TableProperties.UPDATE_MODE, MERGE_ON_READ);
+      putGeneratedProperty(TableProperties.MERGE_MODE, MERGE_ON_READ);
+    }
+  }
+
   private void validateIcebergParquetCompressionCodec(
       TIcebergFileFormat icebergFileFormat) throws AnalysisException {
     if (icebergFileFormat != TIcebergFileFormat.PARQUET) {
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java 
b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
index 2b36bfa55..b10449d23 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
@@ -75,6 +75,10 @@ public class IcebergTable extends Table implements 
FeIcebergTable {
   // Iceberg catalog type key in tblproperties
   public static final String ICEBERG_CATALOG = "iceberg.catalog";
 
+  // Iceberg format version numbers
+  public static final int ICEBERG_FORMAT_V1 = 1;
+  public static final int ICEBERG_FORMAT_V2 = 2;
+
   // Iceberg table catalog location key in tblproperties when using 
HadoopCatalog
   // This property is necessary for both managed and external Iceberg table 
with
   // 'hadoop.catalog'
@@ -94,6 +98,8 @@ public class IcebergTable extends Table implements 
FeIcebergTable {
   public static final String PARQUET_COMPRESSION_LEVEL =
       "write.parquet.compression-level";
 
+  public static final String MERGE_ON_READ = "merge-on-read";
+
   // Default values for parquet compression codec.
   public static final THdfsCompression DEFAULT_PARQUET_COMPRESSION_CODEC =
       THdfsCompression.SNAPPY;
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index cc734318f..04e525c6d 100755
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -80,6 +80,7 @@ import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.impala.analysis.AlterTableSortByStmt;
@@ -1412,10 +1413,33 @@ public class CatalogOpExecutor {
     TAlterTableSetTblPropertiesParams setPropsParams =
         params.getSet_tbl_properties_params();
     if (setPropsParams.getTarget() != TTablePropertyType.TBL_PROPERTY) return 
false;
+
+    addMergeOnReadPropertiesIfNeeded(tbl, setPropsParams.getProperties());
     IcebergCatalogOpExecutor.setTblProperties(iceTxn, 
setPropsParams.getProperties());
     return true;
   }
 
+  /**
+   * Iceberg format from V2 supports row-level modifications. We set write 
modes to
+   * "merge-on-read" which is the write mode Impala will eventually
+   * support (IMPALA-11664). Unless the user specified otherwise in the table 
properties.
+   */
+  private void addMergeOnReadPropertiesIfNeeded(IcebergTable tbl,
+      Map<String, String> properties) {
+    String formatVersion = properties.get(TableProperties.FORMAT_VERSION);
+    if (formatVersion == null ||
+        Integer.valueOf(formatVersion) < IcebergTable.ICEBERG_FORMAT_V2) {
+      return;
+    }
+    if (!IcebergUtil.isAnyWriteModeSet(properties) &&
+        
!IcebergUtil.isAnyWriteModeSet(tbl.getMetaStoreTable().getParameters())) {
+      final String MERGE_ON_READ = IcebergTable.MERGE_ON_READ;
+      properties.put(TableProperties.DELETE_MODE, MERGE_ON_READ);
+      properties.put(TableProperties.UPDATE_MODE, MERGE_ON_READ);
+      properties.put(TableProperties.MERGE_MODE, MERGE_ON_READ);
+    }
+  }
+
   /**
    * Unsets table properties for an Iceberg table. Returns true on success, 
returns false
    * if the operation is not applicable at the Iceberg table level, e.g. 
setting SERDE
diff --git a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java 
b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
index b4b4a7bda..f3185c4bb 100644
--- a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
@@ -51,6 +51,7 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.Transaction;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -832,6 +833,16 @@ public class IcebergUtil {
     return IcebergTable.UNSET_PARQUET_ROW_GROUP_SIZE;
   }
 
+  /**
+   * @return true if any of the write modes is being set in 'tblProperties'.
+   */
+  public static boolean isAnyWriteModeSet(Map<String, String> tblProperties) {
+    return
+        tblProperties.get(TableProperties.DELETE_MODE) != null ||
+        tblProperties.get(TableProperties.UPDATE_MODE) != null ||
+        tblProperties.get(TableProperties.MERGE_MODE) != null;
+  }
+
   public static Long parseParquetPageSize(Map<String, String> tblProperties,
       String property, String descr, StringBuilder errMsg) {
     if (tblProperties.containsKey(property)) {
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-alter.test 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-alter.test
index 12518af2b..df2e7adb9 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-alter.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-alter.test
@@ -351,3 +351,88 @@ DESCRIBE FORMATTED iceberg_changing_parq_tblprops;
 ---- TYPES
 string, string, string
 ====
+---- QUERY
+CREATE TABLE iceberg_upgrade_v2_no_write_mode (i INT) STORED AS ICEBERG;
+DESCRIBE FORMATTED iceberg_upgrade_v2_no_write_mode;
+---- RESULTS: VERIFY_IS_NOT_IN
+'','write.delete.mode   ','merge-on-read       '
+'','write.update.mode   ','merge-on-read       '
+'','write.merge.mode    ','merge-on-read       '
+---- TYPES
+string, string, string
+====
+---- QUERY
+# Setting format-version to 1 doesn't add write modes.
+ALTER TABLE iceberg_upgrade_v2_no_write_mode SET 
TBLPROPERTIES('format-version'='1');
+DESCRIBE FORMATTED iceberg_upgrade_v2_no_write_mode;
+---- RESULTS: VERIFY_IS_NOT_IN
+'','write.delete.mode   ','merge-on-read       '
+'','write.update.mode   ','merge-on-read       '
+'','write.merge.mode    ','merge-on-read       '
+---- TYPES
+string, string, string
+====
+---- QUERY
+ALTER TABLE iceberg_upgrade_v2_no_write_mode SET 
TBLPROPERTIES('format-version'='2');
+DESCRIBE FORMATTED iceberg_upgrade_v2_no_write_mode;
+---- RESULTS: VERIFY_IS_SUBSET
+'','write.delete.mode   ','merge-on-read       '
+'','write.update.mode   ','merge-on-read       '
+'','write.merge.mode    ','merge-on-read       '
+---- TYPES
+string, string, string
+====
+---- QUERY
+CREATE TABLE iceberg_upgrade_v2_delete_mode (i INT) STORED AS ICEBERG;
+ALTER TABLE iceberg_upgrade_v2_delete_mode
+SET TBLPROPERTIES('format-version'='2', 'write.delete.mode'='copy-on-write');
+DESCRIBE FORMATTED iceberg_upgrade_v2_delete_mode;
+---- RESULTS: VERIFY_IS_SUBSET
+'','write.delete.mode   ','copy-on-write       '
+---- TYPES
+string, string, string
+====
+---- QUERY
+DESCRIBE FORMATTED iceberg_upgrade_v2_delete_mode;
+---- RESULTS: VERIFY_IS_NOT_IN
+'','write.update.mode   ','merge-on-read       '
+'','write.merge.mode    ','merge-on-read       '
+---- TYPES
+string, string, string
+====
+---- QUERY
+CREATE TABLE iceberg_upgrade_v2_update_mode (i INT) STORED AS ICEBERG;
+ALTER TABLE iceberg_upgrade_v2_update_mode
+SET TBLPROPERTIES('format-version'='2', 'write.update.mode'='copy-on-write');
+DESCRIBE FORMATTED iceberg_upgrade_v2_update_mode;
+---- RESULTS: VERIFY_IS_SUBSET
+'','write.update.mode   ','copy-on-write       '
+---- TYPES
+string, string, string
+====
+---- QUERY
+DESCRIBE FORMATTED iceberg_upgrade_v2_update_mode;
+---- RESULTS: VERIFY_IS_NOT_IN
+'','write.delete.mode   ','merge-on-read       '
+'','write.merge.mode    ','merge-on-read       '
+---- TYPES
+string, string, string
+====
+---- QUERY
+CREATE TABLE iceberg_upgrade_v2_merge_mode (i INT) STORED AS ICEBERG;
+ALTER TABLE iceberg_upgrade_v2_merge_mode
+SET TBLPROPERTIES('format-version'='2', 'write.merge.mode'='merge-on-read');
+DESCRIBE FORMATTED iceberg_upgrade_v2_merge_mode;
+---- RESULTS: VERIFY_IS_SUBSET
+'','write.merge.mode    ','merge-on-read       '
+---- TYPES
+string, string, string
+====
+---- QUERY
+DESCRIBE FORMATTED iceberg_upgrade_v2_merge_mode;
+---- RESULTS: VERIFY_IS_NOT_IN
+'','write.update.mode   ','merge-on-read       '
+'','write.delete.mode   ','merge-on-read       '
+---- TYPES
+string, string, string
+====
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test 
b/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
index ed1809229..c2485aafa 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
@@ -938,3 +938,77 @@ LOCATION '$$location_uri$$'
 TBLPROPERTIES ('external.table.purge'='TRUE', 'write.format.default'='parquet',
 'engine.hive.enabled'='true', 'table_type'='ICEBERG')
 ====
+---- CREATE_TABLE
+# Creating V1 tables explicitly should not set 'merge-on-read' write modes if 
no write mode is
+# specified.
+CREATE TABLE ice_explicit_v1 (i int)
+STORED AS ICEBERG
+TBLPROPERTIES('format-version'='1')
+---- RESULTS-HIVE-3
+CREATE EXTERNAL TABLE show_create_table_test_db.ice_explicit_v1 (i INT NULL)
+STORED AS ICEBERG
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('external.table.purge'='TRUE', 'write.format.default'='parquet',
+ 'engine.hive.enabled'='true', 'table_type'='ICEBERG')
+====
+---- CREATE_TABLE
+# Creating V2 tables should set 'merge-on-read' write modes if no write mode 
is specified.
+CREATE TABLE ice_v2 (i int)
+STORED AS ICEBERG
+TBLPROPERTIES('format-version'='2')
+---- RESULTS-HIVE-3
+CREATE EXTERNAL TABLE show_create_table_test_db.ice_v2 (i INT NULL)
+STORED AS ICEBERG
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('external.table.purge'='TRUE', 'write.format.default'='parquet',
+ 'engine.hive.enabled'='true', 'table_type'='ICEBERG', 
'write.delete.mode'='merge-on-read',
+ 'write.update.mode'='merge-on-read', 'write.merge.mode'='merge-on-read')
+====
+---- CREATE_TABLE
+# Creating V2 tables should not set write mode if user specified any of it to 
any value.
+CREATE TABLE ice_v2_explicit_delete (i int)
+STORED AS ICEBERG
+TBLPROPERTIES('format-version'='2', 'write.delete.mode'='merge-on-read')
+---- RESULTS-HIVE-3
+CREATE EXTERNAL TABLE show_create_table_test_db.ice_v2_explicit_delete (i INT 
NULL)
+STORED AS ICEBERG
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('external.table.purge'='TRUE', 'write.format.default'='parquet',
+ 'engine.hive.enabled'='true', 'table_type'='ICEBERG', 
'write.delete.mode'='merge-on-read')
+====
+---- CREATE_TABLE
+# Creating V2 tables should not set write mode if user specified any of it to 
any value.
+CREATE TABLE ice_v2_explicit_delete_2 (i int)
+STORED AS ICEBERG
+TBLPROPERTIES('format-version'='2', 'write.delete.mode'='copy-on-write')
+---- RESULTS-HIVE-3
+CREATE EXTERNAL TABLE show_create_table_test_db.ice_v2_explicit_delete_2 (i 
INT NULL)
+STORED AS ICEBERG
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('external.table.purge'='TRUE', 'write.format.default'='parquet',
+ 'engine.hive.enabled'='true', 'table_type'='ICEBERG', 
'write.delete.mode'='copy-on-write')
+====
+---- CREATE_TABLE
+# Creating V2 tables should not set write mode if user specified any of it to 
any value.
+CREATE TABLE ice_v2_explicit_update (i int)
+STORED AS ICEBERG
+TBLPROPERTIES('format-version'='2', 'write.update.mode'='copy-on-write')
+---- RESULTS-HIVE-3
+CREATE EXTERNAL TABLE show_create_table_test_db.ice_v2_explicit_update (i INT 
NULL)
+STORED AS ICEBERG
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('external.table.purge'='TRUE', 'write.format.default'='parquet',
+ 'engine.hive.enabled'='true', 'table_type'='ICEBERG', 
'write.update.mode'='copy-on-write')
+====
+---- CREATE_TABLE
+# Creating V2 tables should not set write mode if user specified any of it to 
any value.
+CREATE TABLE ice_v2_explicit_merge (i int)
+STORED AS ICEBERG
+TBLPROPERTIES('format-version'='2', 'write.merge.mode'='copy-on-write')
+---- RESULTS-HIVE-3
+CREATE EXTERNAL TABLE show_create_table_test_db.ice_v2_explicit_merge (i INT 
NULL)
+STORED AS ICEBERG
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('external.table.purge'='TRUE', 'write.format.default'='parquet',
+ 'engine.hive.enabled'='true', 'table_type'='ICEBERG', 
'write.merge.mode'='copy-on-write')
+====

Reply via email to