This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 44c0e00f59f469ba5435351861a51381c753b4c4 Author: Peter Rozsa <[email protected]> AuthorDate: Thu Oct 6 10:09:39 2022 +0200 IMPALA-9460: ADD PARTITION doesn't accept SET FORMAT This change makes it possible to set the file format of partitions at creation time by allowing a SET FILEFORMAT clause at the end of the ALTER TABLE ADD PARTITION statement. The file format is applied to every listed partition; for example, ALTER TABLE tbl ADD PARTITION (i=1) PARTITION (i=2) SET FILEFORMAT PARQUET; creates metadata entries for two Parquet-formatted partitions. Tests: - parser test for the new syntax element - e2e test for adding partitions in a specified format Change-Id: I2f78cc3c7eba25383128cd8fd881dd41ddea8b69 Reviewed-on: http://gerrit.cloudera.org:8080/19099 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- fe/src/main/cup/sql-parser.cup | 3 +++ .../analysis/AlterTableAddPartitionStmt.java | 24 ++++++++++++++++++++++ .../apache/impala/service/CatalogOpExecutor.java | 14 +++++++++++-- .../org/apache/impala/analysis/ParserTest.java | 1 + tests/metadata/test_ddl.py | 23 +++++++++++++++++++++ 5 files changed, 63 insertions(+), 2 deletions(-) diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup index 9656ceb40..52e4412bb 100755 --- a/fe/src/main/cup/sql-parser.cup +++ b/fe/src/main/cup/sql-parser.cup @@ -1206,6 +1206,9 @@ alter_tbl_stmt ::= | KW_ALTER KW_TABLE table_name:table KW_ADD if_not_exists_val:if_not_exists partition_def_list:partitions {: RESULT = new AlterTableAddPartitionStmt(table, if_not_exists, partitions); :} + | KW_ALTER KW_TABLE table_name:table KW_ADD if_not_exists_val:if_not_exists + partition_def_list:partitions KW_SET KW_FILEFORMAT file_format_val:file_format + {: RESULT = new AlterTableAddPartitionStmt(table, if_not_exists, partitions, file_format); :} | KW_ALTER KW_TABLE table_name:table KW_DROP opt_kw_column ident_or_default:col_name {: RESULT = new 
AlterTableDropColStmt(table, col_name); :} | KW_ALTER KW_TABLE table_name:table KW_ADD if_not_exists_val:if_not_exists diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableAddPartitionStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableAddPartitionStmt.java index efdd7e1d6..4ea4b9353 100644 --- a/fe/src/main/java/org/apache/impala/analysis/AlterTableAddPartitionStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableAddPartitionStmt.java @@ -26,7 +26,9 @@ import org.apache.impala.catalog.FeTable; import org.apache.impala.common.AnalysisException; import org.apache.impala.thrift.TAlterTableAddPartitionParams; import org.apache.impala.thrift.TAlterTableParams; +import org.apache.impala.thrift.TAlterTableSetFileFormatParams; import org.apache.impala.thrift.TAlterTableType; +import org.apache.impala.thrift.THdfsFileFormat; import java.util.HashSet; import java.util.List; @@ -39,6 +41,8 @@ public class AlterTableAddPartitionStmt extends AlterTableStmt { private final boolean ifNotExists_; private final List<PartitionDef> partitions_; + private THdfsFileFormat fileFormat; + public AlterTableAddPartitionStmt(TableName tableName, boolean ifNotExists, List<PartitionDef> partitions) { super(tableName); @@ -55,6 +59,12 @@ public class AlterTableAddPartitionStmt extends AlterTableStmt { } } + public AlterTableAddPartitionStmt(TableName tableName, + boolean ifNotExists, List<PartitionDef> partitions, THdfsFileFormat fileFormat) { + this(tableName, ifNotExists, partitions); + this.fileFormat = fileFormat; + } + public boolean getIfNotExists() { return ifNotExists_; } @Override @@ -64,6 +74,12 @@ public class AlterTableAddPartitionStmt extends AlterTableStmt { sb.append(getTbl()).append(" ADD"); if (ifNotExists_) sb.append(" IF NOT EXISTS"); for (PartitionDef p : partitions_) sb.append(" " + p.toSql(options)); + + if (fileFormat != null) { + sb.append(" SET FILEFORMAT "); + sb.append(fileFormat); + } + return sb.toString(); } @@ -75,6 
+91,14 @@ public class AlterTableAddPartitionStmt extends AlterTableStmt { TAlterTableParams params = super.toThrift(); params.setAlter_type(TAlterTableType.ADD_PARTITION); params.setAdd_partition_params(addPartParams); + + if (fileFormat != null) { + TAlterTableSetFileFormatParams fileFormatParams = + new TAlterTableSetFileFormatParams(); + fileFormatParams.setFile_format(fileFormat); + params.setSet_file_format_params(fileFormatParams); + } + return params; } diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index 403b204b5..cc734318f 100755 --- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -1049,7 +1049,12 @@ public class CatalogOpExecutor { // Create and add HdfsPartition objects to the corresponding HdfsTable and load // their block metadata. Get the new table object with an updated catalog // version. - refreshedTable = alterTableAddPartitions(tbl, params.getAdd_partition_params()); + THdfsFileFormat format = null; + if(params.isSetSet_file_format_params()) { + format = params.getSet_file_format_params().file_format; + } + refreshedTable = alterTableAddPartitions(tbl, params.getAdd_partition_params(), + format); if (refreshedTable != null) { refreshedTable.setCatalogVersion(newCatalogVersion); // the alter table event is only generated when we add the partition. For @@ -4046,7 +4051,8 @@ public class CatalogOpExecutor { * catalog cache and the HMS. 
*/ private Table alterTableAddPartitions(Table tbl, - TAlterTableAddPartitionParams addPartParams) throws ImpalaException { + TAlterTableAddPartitionParams addPartParams, THdfsFileFormat fileFormat) + throws ImpalaException { Preconditions.checkState(tbl.isWriteLockedByCurrentThread()); TableName tableName = tbl.getTableName(); @@ -4072,6 +4078,10 @@ public class CatalogOpExecutor { createHmsPartition(partitionSpec, msTbl, tableName, partParams.getLocation()); allHmsPartitionsToAdd.add(hmsPartition); + if (fileFormat != null) { + setStorageDescriptorFileFormat(hmsPartition.getSd(), fileFormat); + } + THdfsCachingOp cacheOp = partParams.getCache_op(); if (cacheOp != null) partitionCachingOpMap.put(hmsPartition.getValues(), cacheOp); } diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java index db78a3d70..524041c4f 100755 --- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java @@ -2305,6 +2305,7 @@ public class ParserTest extends FrontendTestBase { ParsesOk("ALTER TABLE Foo ADD PARTITION (i=NULL)"); ParsesOk("ALTER TABLE Foo ADD PARTITION (i=NULL, j=2, k=NULL)"); ParsesOk("ALTER TABLE Foo ADD PARTITION (i=abc, j=(5*8+10), k=!true and false)"); + ParsesOk("ALTER TABLE Foo ADD PARTITION (i=1) SET FILEFORMAT PARQUET"); // Multiple partition specs ParsesOk("ALTER TABLE Foo ADD PARTITION (i=1, s='one') " + diff --git a/tests/metadata/test_ddl.py b/tests/metadata/test_ddl.py index e4fd873a4..ba590f8b5 100644 --- a/tests/metadata/test_ddl.py +++ b/tests/metadata/test_ddl.py @@ -655,6 +655,29 @@ class TestDdlStatements(TestDdlBase): "insert into table {0} partition(j=1, s='1') select 1".format(fq_tbl_name)) assert '1' == self.execute_scalar("select count(*) from {0}".format(fq_tbl_name)) + @SkipIfLocal.hdfs_client + def test_alter_table_set_fileformat(self, vector, unique_database): + # Tests that SET FILEFORMAT clause 
is set for ALTER TABLE ADD PARTITION statement + fq_tbl_name = unique_database + ".p_fileformat" + self.client.execute( + "create table {0}(i int) partitioned by (p int)".format(fq_tbl_name)) + + # Add a partition with Parquet fileformat + self.execute_query_expect_success(self.client, + "alter table {0} add partition(p=1) set fileformat parquet" + .format(fq_tbl_name)) + + # Add two partitions with ORC fileformat + self.execute_query_expect_success(self.client, + "alter table {0} add partition(p=2) partition(p=3) set fileformat orc" + .format(fq_tbl_name)) + + result = self.execute_query_expect_success(self.client, + "SHOW PARTITIONS %s" % fq_tbl_name) + + assert 1 == len(filter(lambda line: line.find("PARQUET") != -1, result.data)) + assert 2 == len(filter(lambda line: line.find("ORC") != -1, result.data)) + def test_alter_table_create_many_partitions(self, vector, unique_database): """ Checks that creating more partitions than the MAX_PARTITION_UPDATES_PER_RPC
