This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit dc912f016e4804331e5a9fe2960f8c89376195b3 Author: Zoltan Borok-Nagy <[email protected]> AuthorDate: Thu Oct 13 18:02:46 2022 +0200 IMPALA-11655: Impala should set write mode "merge-on-read" by default Similarly to HIVE-26596 Impala should set merge-on-read write mode for V2 tables, unless otherwise specified: * during table creation with 'format-version'='2' * during alter table set tblproperties 'format-version'='2' We do so because in the foreseeable future Impala will only support merge-on-read (on the write-side, on the read side copy-on-write is also supported). Also, currently Hive only supports merge-on-read. Testing: * e2e tests added Change-Id: Icaa32472cde98e21fb23f5461175db1bf401db3d Reviewed-on: http://gerrit.cloudera.org:8080/19138 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../apache/impala/analysis/CreateTableStmt.java | 23 ++++++ .../org/apache/impala/catalog/IcebergTable.java | 6 ++ .../apache/impala/service/CatalogOpExecutor.java | 24 ++++++ .../java/org/apache/impala/util/IcebergUtil.java | 11 +++ .../queries/QueryTest/iceberg-alter.test | 85 ++++++++++++++++++++++ .../queries/QueryTest/show-create-table.test | 74 +++++++++++++++++++ 6 files changed, 223 insertions(+) diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java index c011b3bfd..819981a84 100644 --- a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java @@ -624,6 +624,7 @@ public class CreateTableStmt extends StatementBase { putGeneratedProperty(IcebergTable.KEY_STORAGE_HANDLER, IcebergTable.ICEBERG_STORAGE_HANDLER); putGeneratedProperty(TableProperties.ENGINE_HIVE_ENABLED, "true"); + addMergeOnReadPropertiesIfNeeded(); String fileformat = getTblProperties().get(IcebergTable.ICEBERG_FILE_FORMAT); TIcebergFileFormat icebergFileFormat = 
IcebergUtil.getIcebergFileFormat(fileformat); @@ -652,6 +653,28 @@ public class CreateTableStmt extends StatementBase { validateIcebergTableProperties(catalog); } + /** + * When creating an Iceberg table that supports row-level modifications + * (format-version >= 2) we set write modes to "merge-on-read" which is the write + * mode Impala will eventually support (IMPALA-11664). + */ + private void addMergeOnReadPropertiesIfNeeded() { + Map<String, String> tblProps = getTblProperties(); + String formatVersion = tblProps.get(TableProperties.FORMAT_VERSION); + if (formatVersion == null || + Integer.valueOf(formatVersion) < IcebergTable.ICEBERG_FORMAT_V2) { + return; + } + + // Only add "merge-on-read" if none of the write modes are specified. + final String MERGE_ON_READ = IcebergTable.MERGE_ON_READ; + if (!IcebergUtil.isAnyWriteModeSet(tblProps)) { + putGeneratedProperty(TableProperties.DELETE_MODE, MERGE_ON_READ); + putGeneratedProperty(TableProperties.UPDATE_MODE, MERGE_ON_READ); + putGeneratedProperty(TableProperties.MERGE_MODE, MERGE_ON_READ); + } + } + private void validateIcebergParquetCompressionCodec( TIcebergFileFormat icebergFileFormat) throws AnalysisException { if (icebergFileFormat != TIcebergFileFormat.PARQUET) { diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java index 2b36bfa55..b10449d23 100644 --- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java @@ -75,6 +75,10 @@ public class IcebergTable extends Table implements FeIcebergTable { // Iceberg catalog type key in tblproperties public static final String ICEBERG_CATALOG = "iceberg.catalog"; + // Iceberg format version numbers + public static final int ICEBERG_FORMAT_V1 = 1; + public static final int ICEBERG_FORMAT_V2 = 2; + // Iceberg table catalog location key in tblproperties when using HadoopCatalog // This property is necessary for 
both managed and external Iceberg table with // 'hadoop.catalog' @@ -94,6 +98,8 @@ public class IcebergTable extends Table implements FeIcebergTable { public static final String PARQUET_COMPRESSION_LEVEL = "write.parquet.compression-level"; + public static final String MERGE_ON_READ = "merge-on-read"; + // Default values for parquet compression codec. public static final THdfsCompression DEFAULT_PARQUET_COMPRESSION_CODEC = THdfsCompression.SNAPPY; diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index cc734318f..04e525c6d 100755 --- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -80,6 +80,7 @@ import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.StringUtils; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.mr.Catalogs; import org.apache.impala.analysis.AlterTableSortByStmt; @@ -1412,10 +1413,33 @@ public class CatalogOpExecutor { TAlterTableSetTblPropertiesParams setPropsParams = params.getSet_tbl_properties_params(); if (setPropsParams.getTarget() != TTablePropertyType.TBL_PROPERTY) return false; + + addMergeOnReadPropertiesIfNeeded(tbl, setPropsParams.getProperties()); IcebergCatalogOpExecutor.setTblProperties(iceTxn, setPropsParams.getProperties()); return true; } + /** + * Iceberg format from V2 supports row-level modifications. Unless the user specified + * otherwise in the table properties, we set write modes to "merge-on-read", which is + * the write mode Impala will eventually support (IMPALA-11664). 
+ */ + private void addMergeOnReadPropertiesIfNeeded(IcebergTable tbl, + Map<String, String> properties) { + String formatVersion = properties.get(TableProperties.FORMAT_VERSION); + if (formatVersion == null || + Integer.valueOf(formatVersion) < IcebergTable.ICEBERG_FORMAT_V2) { + return; + } + if (!IcebergUtil.isAnyWriteModeSet(properties) && + !IcebergUtil.isAnyWriteModeSet(tbl.getMetaStoreTable().getParameters())) { + final String MERGE_ON_READ = IcebergTable.MERGE_ON_READ; + properties.put(TableProperties.DELETE_MODE, MERGE_ON_READ); + properties.put(TableProperties.UPDATE_MODE, MERGE_ON_READ); + properties.put(TableProperties.MERGE_MODE, MERGE_ON_READ); + } + } + /** * Unsets table properties for an Iceberg table. Returns true on success, returns false * if the operation is not applicable at the Iceberg table level, e.g. setting SERDE diff --git a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java index b4b4a7bda..f3185c4bb 100644 --- a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java +++ b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java @@ -51,6 +51,7 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.TableScan; import org.apache.iceberg.Transaction; import org.apache.iceberg.catalog.TableIdentifier; @@ -832,6 +833,16 @@ public class IcebergUtil { return IcebergTable.UNSET_PARQUET_ROW_GROUP_SIZE; } + /** + * @return true if any of the write modes is being set in 'tblProperties'. 
+ */ + public static boolean isAnyWriteModeSet(Map<String, String> tblProperties) { + return + tblProperties.get(TableProperties.DELETE_MODE) != null || + tblProperties.get(TableProperties.UPDATE_MODE) != null || + tblProperties.get(TableProperties.MERGE_MODE) != null; + } + public static Long parseParquetPageSize(Map<String, String> tblProperties, String property, String descr, StringBuilder errMsg) { if (tblProperties.containsKey(property)) { diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-alter.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-alter.test index 12518af2b..df2e7adb9 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-alter.test +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-alter.test @@ -351,3 +351,88 @@ DESCRIBE FORMATTED iceberg_changing_parq_tblprops; ---- TYPES string, string, string ==== +---- QUERY +CREATE TABLE iceberg_upgrade_v2_no_write_mode (i INT) STORED AS ICEBERG; +DESCRIBE FORMATTED iceberg_upgrade_v2_no_write_mode; +---- RESULTS: VERIFY_IS_NOT_IN +'','write.delete.mode ','merge-on-read ' +'','write.update.mode ','merge-on-read ' +'','write.merge.mode ','merge-on-read ' +---- TYPES +string, string, string +==== +---- QUERY +# Setting format-version to 1 doesn't add write modes. 
+ALTER TABLE iceberg_upgrade_v2_no_write_mode SET TBLPROPERTIES('format-version'='1'); +DESCRIBE FORMATTED iceberg_upgrade_v2_no_write_mode; +---- RESULTS: VERIFY_IS_NOT_IN +'','write.delete.mode ','merge-on-read ' +'','write.update.mode ','merge-on-read ' +'','write.merge.mode ','merge-on-read ' +---- TYPES +string, string, string +==== +---- QUERY +ALTER TABLE iceberg_upgrade_v2_no_write_mode SET TBLPROPERTIES('format-version'='2'); +DESCRIBE FORMATTED iceberg_upgrade_v2_no_write_mode; +---- RESULTS: VERIFY_IS_SUBSET +'','write.delete.mode ','merge-on-read ' +'','write.update.mode ','merge-on-read ' +'','write.merge.mode ','merge-on-read ' +---- TYPES +string, string, string +==== +---- QUERY +CREATE TABLE iceberg_upgrade_v2_delete_mode (i INT) STORED AS ICEBERG; +ALTER TABLE iceberg_upgrade_v2_delete_mode +SET TBLPROPERTIES('format-version'='2', 'write.delete.mode'='copy-on-write'); +DESCRIBE FORMATTED iceberg_upgrade_v2_delete_mode; +---- RESULTS: VERIFY_IS_SUBSET +'','write.delete.mode ','copy-on-write ' +---- TYPES +string, string, string +==== +---- QUERY +DESCRIBE FORMATTED iceberg_upgrade_v2_delete_mode; +---- RESULTS: VERIFY_IS_NOT_IN +'','write.update.mode ','merge-on-read ' +'','write.merge.mode ','merge-on-read ' +---- TYPES +string, string, string +==== +---- QUERY +CREATE TABLE iceberg_upgrade_v2_update_mode (i INT) STORED AS ICEBERG; +ALTER TABLE iceberg_upgrade_v2_update_mode +SET TBLPROPERTIES('format-version'='2', 'write.update.mode'='copy-on-write'); +DESCRIBE FORMATTED iceberg_upgrade_v2_update_mode; +---- RESULTS: VERIFY_IS_SUBSET +'','write.update.mode ','copy-on-write ' +---- TYPES +string, string, string +==== +---- QUERY +DESCRIBE FORMATTED iceberg_upgrade_v2_update_mode; +---- RESULTS: VERIFY_IS_NOT_IN +'','write.delete.mode ','merge-on-read ' +'','write.merge.mode ','merge-on-read ' +---- TYPES +string, string, string +==== +---- QUERY +CREATE TABLE iceberg_upgrade_v2_merge_mode (i INT) STORED AS ICEBERG; +ALTER TABLE 
iceberg_upgrade_v2_merge_mode +SET TBLPROPERTIES('format-version'='2', 'write.merge.mode'='merge-on-read'); +DESCRIBE FORMATTED iceberg_upgrade_v2_merge_mode; +---- RESULTS: VERIFY_IS_SUBSET +'','write.merge.mode ','merge-on-read ' +---- TYPES +string, string, string +==== +---- QUERY +DESCRIBE FORMATTED iceberg_upgrade_v2_merge_mode; +---- RESULTS: VERIFY_IS_NOT_IN +'','write.update.mode ','merge-on-read ' +'','write.delete.mode ','merge-on-read ' +---- TYPES +string, string, string +==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test b/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test index ed1809229..c2485aafa 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test +++ b/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test @@ -938,3 +938,77 @@ LOCATION '$$location_uri$$' TBLPROPERTIES ('external.table.purge'='TRUE', 'write.format.default'='parquet', 'engine.hive.enabled'='true', 'table_type'='ICEBERG') ==== +---- CREATE_TABLE +# Creating V1 tables explicitly should not set 'merge-on-read' write modes if no write mode is +# specified. +CREATE TABLE ice_explicit_v1 (i int) +STORED AS ICEBERG +TBLPROPERTIES('format-version'='1') +---- RESULTS-HIVE-3 +CREATE EXTERNAL TABLE show_create_table_test_db.ice_explicit_v1 (i INT NULL) +STORED AS ICEBERG +LOCATION '$$location_uri$$' +TBLPROPERTIES ('external.table.purge'='TRUE', 'write.format.default'='parquet', + 'engine.hive.enabled'='true', 'table_type'='ICEBERG') +==== +---- CREATE_TABLE +# Creating V2 tables should set 'merge-on-read' write modes if no write mode is specified. 
+CREATE TABLE ice_v2 (i int) +STORED AS ICEBERG +TBLPROPERTIES('format-version'='2') +---- RESULTS-HIVE-3 +CREATE EXTERNAL TABLE show_create_table_test_db.ice_v2 (i INT NULL) +STORED AS ICEBERG +LOCATION '$$location_uri$$' +TBLPROPERTIES ('external.table.purge'='TRUE', 'write.format.default'='parquet', + 'engine.hive.enabled'='true', 'table_type'='ICEBERG', 'write.delete.mode'='merge-on-read', + 'write.update.mode'='merge-on-read', 'write.merge.mode'='merge-on-read') +==== +---- CREATE_TABLE +# Creating V2 tables should not set write mode if user specified any of it to any value. +CREATE TABLE ice_v2_explicit_delete (i int) +STORED AS ICEBERG +TBLPROPERTIES('format-version'='2', 'write.delete.mode'='merge-on-read') +---- RESULTS-HIVE-3 +CREATE EXTERNAL TABLE show_create_table_test_db.ice_v2_explicit_delete (i INT NULL) +STORED AS ICEBERG +LOCATION '$$location_uri$$' +TBLPROPERTIES ('external.table.purge'='TRUE', 'write.format.default'='parquet', + 'engine.hive.enabled'='true', 'table_type'='ICEBERG', 'write.delete.mode'='merge-on-read') +==== +---- CREATE_TABLE +# Creating V2 tables should not set write mode if user specified any of it to any value. +CREATE TABLE ice_v2_explicit_delete_2 (i int) +STORED AS ICEBERG +TBLPROPERTIES('format-version'='2', 'write.delete.mode'='copy-on-write') +---- RESULTS-HIVE-3 +CREATE EXTERNAL TABLE show_create_table_test_db.ice_v2_explicit_delete_2 (i INT NULL) +STORED AS ICEBERG +LOCATION '$$location_uri$$' +TBLPROPERTIES ('external.table.purge'='TRUE', 'write.format.default'='parquet', + 'engine.hive.enabled'='true', 'table_type'='ICEBERG', 'write.delete.mode'='copy-on-write') +==== +---- CREATE_TABLE +# Creating V2 tables should not set write mode if user specified any of it to any value. 
+CREATE TABLE ice_v2_explicit_update (i int) +STORED AS ICEBERG +TBLPROPERTIES('format-version'='2', 'write.update.mode'='copy-on-write') +---- RESULTS-HIVE-3 +CREATE EXTERNAL TABLE show_create_table_test_db.ice_v2_explicit_update (i INT NULL) +STORED AS ICEBERG +LOCATION '$$location_uri$$' +TBLPROPERTIES ('external.table.purge'='TRUE', 'write.format.default'='parquet', + 'engine.hive.enabled'='true', 'table_type'='ICEBERG', 'write.update.mode'='copy-on-write') +==== +---- CREATE_TABLE +# Creating V2 tables should not set write mode if user specified any of it to any value. +CREATE TABLE ice_v2_explicit_merge (i int) +STORED AS ICEBERG +TBLPROPERTIES('format-version'='2', 'write.merge.mode'='copy-on-write') +---- RESULTS-HIVE-3 +CREATE EXTERNAL TABLE show_create_table_test_db.ice_v2_explicit_merge (i INT NULL) +STORED AS ICEBERG +LOCATION '$$location_uri$$' +TBLPROPERTIES ('external.table.purge'='TRUE', 'write.format.default'='parquet', + 'engine.hive.enabled'='true', 'table_type'='ICEBERG', 'write.merge.mode'='copy-on-write') +====
