This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 44c0e00f59f469ba5435351861a51381c753b4c4
Author: Peter Rozsa <[email protected]>
AuthorDate: Thu Oct 6 10:09:39 2022 +0200

    IMPALA-9460: ADD PARTITION doesn't accept SET FORMAT
    
    This change adds the possibility to set the partition file format
    at creation by allowing the use of the SET FILEFORMAT clause at the
    end of the ALTER TABLE ADD PARTITION statement. The file format
    specification will
    be applied to all listed partitions, for example:
    ALTER TABLE tbl ADD PARTITION (i=1) PARTITION (i=2) SET FILEFORMAT
    PARQUET; will create two Parquet-formatted partition metadata
    entries.
    
    Tests:
     - analyzer test for the new syntax element
     - e2e test for adding partition in a specified format
    
    Change-Id: I2f78cc3c7eba25383128cd8fd881dd41ddea8b69
    Reviewed-on: http://gerrit.cloudera.org:8080/19099
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 fe/src/main/cup/sql-parser.cup                     |  3 +++
 .../analysis/AlterTableAddPartitionStmt.java       | 24 ++++++++++++++++++++++
 .../apache/impala/service/CatalogOpExecutor.java   | 14 +++++++++++--
 .../org/apache/impala/analysis/ParserTest.java     |  1 +
 tests/metadata/test_ddl.py                         | 23 +++++++++++++++++++++
 5 files changed, 63 insertions(+), 2 deletions(-)

diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index 9656ceb40..52e4412bb 100755
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -1206,6 +1206,9 @@ alter_tbl_stmt ::=
   | KW_ALTER KW_TABLE table_name:table KW_ADD if_not_exists_val:if_not_exists
     partition_def_list:partitions
   {: RESULT = new AlterTableAddPartitionStmt(table, if_not_exists, 
partitions); :}
+  | KW_ALTER KW_TABLE table_name:table KW_ADD if_not_exists_val:if_not_exists
+    partition_def_list:partitions KW_SET KW_FILEFORMAT 
file_format_val:file_format
+  {: RESULT = new AlterTableAddPartitionStmt(table, if_not_exists, partitions, 
file_format); :}
   | KW_ALTER KW_TABLE table_name:table KW_DROP opt_kw_column 
ident_or_default:col_name
   {: RESULT = new AlterTableDropColStmt(table, col_name); :}
   | KW_ALTER KW_TABLE table_name:table KW_ADD if_not_exists_val:if_not_exists
diff --git 
a/fe/src/main/java/org/apache/impala/analysis/AlterTableAddPartitionStmt.java 
b/fe/src/main/java/org/apache/impala/analysis/AlterTableAddPartitionStmt.java
index efdd7e1d6..4ea4b9353 100644
--- 
a/fe/src/main/java/org/apache/impala/analysis/AlterTableAddPartitionStmt.java
+++ 
b/fe/src/main/java/org/apache/impala/analysis/AlterTableAddPartitionStmt.java
@@ -26,7 +26,9 @@ import org.apache.impala.catalog.FeTable;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.thrift.TAlterTableAddPartitionParams;
 import org.apache.impala.thrift.TAlterTableParams;
+import org.apache.impala.thrift.TAlterTableSetFileFormatParams;
 import org.apache.impala.thrift.TAlterTableType;
+import org.apache.impala.thrift.THdfsFileFormat;
 
 import java.util.HashSet;
 import java.util.List;
@@ -39,6 +41,8 @@ public class AlterTableAddPartitionStmt extends 
AlterTableStmt {
   private final boolean ifNotExists_;
   private final List<PartitionDef> partitions_;
 
+  private THdfsFileFormat fileFormat;
+
   public AlterTableAddPartitionStmt(TableName tableName,
       boolean ifNotExists, List<PartitionDef> partitions) {
     super(tableName);
@@ -55,6 +59,12 @@ public class AlterTableAddPartitionStmt extends 
AlterTableStmt {
     }
   }
 
+  public AlterTableAddPartitionStmt(TableName tableName,
+      boolean ifNotExists, List<PartitionDef> partitions, THdfsFileFormat 
fileFormat) {
+    this(tableName, ifNotExists, partitions);
+    this.fileFormat = fileFormat;
+  }
+
   public boolean getIfNotExists() { return ifNotExists_; }
 
   @Override
@@ -64,6 +74,12 @@ public class AlterTableAddPartitionStmt extends 
AlterTableStmt {
     sb.append(getTbl()).append(" ADD");
     if (ifNotExists_) sb.append(" IF NOT EXISTS");
     for (PartitionDef p : partitions_) sb.append(" " + p.toSql(options));
+
+    if (fileFormat != null) {
+      sb.append(" SET FILEFORMAT ");
+      sb.append(fileFormat);
+    }
+
     return sb.toString();
   }
 
@@ -75,6 +91,14 @@ public class AlterTableAddPartitionStmt extends 
AlterTableStmt {
     TAlterTableParams params = super.toThrift();
     params.setAlter_type(TAlterTableType.ADD_PARTITION);
     params.setAdd_partition_params(addPartParams);
+
+    if (fileFormat != null) {
+      TAlterTableSetFileFormatParams fileFormatParams =
+          new TAlterTableSetFileFormatParams();
+      fileFormatParams.setFile_format(fileFormat);
+      params.setSet_file_format_params(fileFormatParams);
+    }
+
     return params;
   }
 
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 403b204b5..cc734318f 100755
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -1049,7 +1049,12 @@ public class CatalogOpExecutor {
           // Create and add HdfsPartition objects to the corresponding 
HdfsTable and load
           // their block metadata. Get the new table object with an updated 
catalog
           // version.
-          refreshedTable = alterTableAddPartitions(tbl, 
params.getAdd_partition_params());
+          THdfsFileFormat format = null;
+          if(params.isSetSet_file_format_params()) {
+            format = params.getSet_file_format_params().file_format;
+          }
+          refreshedTable = alterTableAddPartitions(tbl, 
params.getAdd_partition_params(),
+              format);
           if (refreshedTable != null) {
             refreshedTable.setCatalogVersion(newCatalogVersion);
             // the alter table event is only generated when we add the 
partition. For
@@ -4046,7 +4051,8 @@ public class CatalogOpExecutor {
    * catalog cache and the HMS.
    */
   private Table alterTableAddPartitions(Table tbl,
-      TAlterTableAddPartitionParams addPartParams) throws ImpalaException {
+      TAlterTableAddPartitionParams addPartParams, THdfsFileFormat fileFormat)
+      throws ImpalaException {
     Preconditions.checkState(tbl.isWriteLockedByCurrentThread());
 
     TableName tableName = tbl.getTableName();
@@ -4072,6 +4078,10 @@ public class CatalogOpExecutor {
           createHmsPartition(partitionSpec, msTbl, tableName, 
partParams.getLocation());
       allHmsPartitionsToAdd.add(hmsPartition);
 
+      if (fileFormat != null) {
+        setStorageDescriptorFileFormat(hmsPartition.getSd(), fileFormat);
+      }
+
       THdfsCachingOp cacheOp = partParams.getCache_op();
       if (cacheOp != null) partitionCachingOpMap.put(hmsPartition.getValues(), 
cacheOp);
     }
diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java 
b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
index db78a3d70..524041c4f 100755
--- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
@@ -2305,6 +2305,7 @@ public class ParserTest extends FrontendTestBase {
     ParsesOk("ALTER TABLE Foo ADD PARTITION (i=NULL)");
     ParsesOk("ALTER TABLE Foo ADD PARTITION (i=NULL, j=2, k=NULL)");
     ParsesOk("ALTER TABLE Foo ADD PARTITION (i=abc, j=(5*8+10), k=!true and 
false)");
+    ParsesOk("ALTER TABLE Foo ADD PARTITION (i=1) SET FILEFORMAT PARQUET");
 
     // Multiple partition specs
     ParsesOk("ALTER TABLE Foo ADD PARTITION (i=1, s='one') " +
diff --git a/tests/metadata/test_ddl.py b/tests/metadata/test_ddl.py
index e4fd873a4..ba590f8b5 100644
--- a/tests/metadata/test_ddl.py
+++ b/tests/metadata/test_ddl.py
@@ -655,6 +655,29 @@ class TestDdlStatements(TestDdlBase):
         "insert into table {0} partition(j=1, s='1') select 
1".format(fq_tbl_name))
     assert '1' == self.execute_scalar("select count(*) from 
{0}".format(fq_tbl_name))
 
+  @SkipIfLocal.hdfs_client
+  def test_alter_table_set_fileformat(self, vector, unique_database):
+    # Tests that SET FILEFORMAT clause is set for ALTER TABLE ADD PARTITION 
statement
+    fq_tbl_name = unique_database + ".p_fileformat"
+    self.client.execute(
+        "create table {0}(i int) partitioned by (p int)".format(fq_tbl_name))
+
+    # Add a partition with Parquet fileformat
+    self.execute_query_expect_success(self.client,
+        "alter table {0} add partition(p=1) set fileformat parquet"
+        .format(fq_tbl_name))
+
+    # Add two partitions with ORC fileformat
+    self.execute_query_expect_success(self.client,
+        "alter table {0} add partition(p=2) partition(p=3) set fileformat orc"
+        .format(fq_tbl_name))
+
+    result = self.execute_query_expect_success(self.client,
+        "SHOW PARTITIONS %s" % fq_tbl_name)
+
+    assert 1 == len(filter(lambda line: line.find("PARQUET") != -1, 
result.data))
+    assert 2 == len(filter(lambda line: line.find("ORC") != -1, result.data))
+
   def test_alter_table_create_many_partitions(self, vector, unique_database):
     """
     Checks that creating more partitions than the MAX_PARTITION_UPDATES_PER_RPC

Reply via email to