This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bfb1225f IMPALA-12058: Impala throw exception for CTAS with non 
unique primary key
7bfb1225f is described below

commit 7bfb1225f3dbf46f424b350c14142c8359af3046
Author: wzhou-code <[email protected]>
AuthorDate: Tue Apr 11 19:54:42 2023 -0700

    IMPALA-12058: Impala throw exception for CTAS with non unique primary key
    
    The auto-incrementing column is added to the target table by the Impala
    frontend for a CTAS statement that creates a Kudu table with a non-unique
    primary key, so that the Impala internal table has the same layout as the
    table created by the Kudu engine. This is required for the insertion
    operation of CTAS. But the auto-incrementing column is not in the
    expression, which causes the Planner to throw an exception when computing
    the Lineage graph for the column.
    
    This patch fixes the issue by omitting the auto-incrementing column for
    Kudu tables when adding target column labels for computing the Lineage
    graph.
    
    Testing:
     - Added end-to-end test case for creating Kudu tables with Lineage
       enabled.
     - Passed core tests.
    
    Change-Id: Ic87b7fac8eb857e9dc5c59bdb448270b23f6dd5a
    Reviewed-on: http://gerrit.cloudera.org:8080/19725
    Reviewed-by: Kurt Deschler <[email protected]>
    Reviewed-by: Abhishek Chennaka <[email protected]>
    Reviewed-by: Riza Suminto <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../main/java/org/apache/impala/planner/Planner.java   | 18 +++++++++++++++++-
 .../java/org/apache/impala/planner/PlannerContext.java |  1 +
 tests/custom_cluster/test_kudu.py                      | 13 +++++++++++++
 3 files changed, 31 insertions(+), 1 deletion(-)

diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java 
b/fe/src/main/java/org/apache/impala/planner/Planner.java
index 260d1e8cd..891a6004f 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -54,6 +54,7 @@ import org.apache.impala.util.EventSequence;
 import org.apache.impala.util.KuduUtil;
 import org.apache.impala.util.MathUtil;
 import org.apache.impala.util.MaxRowsProcessedVisitor;
+import org.apache.kudu.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -219,7 +220,22 @@ public class Planner {
             }
             graph.addTargetColumnLabels(targetColLabels);
           } else {
-            graph.addTargetColumnLabels(targetTable);
+            Preconditions.checkState(ctx_.isCtas());
+            if (((FeKuduTable)targetTable).hasAutoIncrementingColumn()) {
+              // Omit auto-incrementing column for Kudu table since the column 
is not in
+              // expression. The auto-incrementing column is only added to 
target table
+              // for CTAS statement so that the table has same layout as the 
table
+              // created by Kudu engine. We don't need to compute Lineage 
graph for the
+              // column.
+              List<ColumnLabel> targetColLabels = new ArrayList<>();
+              for (String column: targetTable.getColumnNames()) {
+                if (column.equals(Schema.getAutoIncrementingColumnName())) 
continue;
+                targetColLabels.add(new ColumnLabel(column, 
targetTable.getTableName()));
+              }
+              graph.addTargetColumnLabels(targetColLabels);
+            } else {
+              graph.addTargetColumnLabels(targetTable);
+            }
           }
         } else if (targetTable instanceof FeHBaseTable) {
           graph.addTargetColumnLabels(targetTable);
diff --git a/fe/src/main/java/org/apache/impala/planner/PlannerContext.java 
b/fe/src/main/java/org/apache/impala/planner/PlannerContext.java
index 7d77678bc..802b73893 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlannerContext.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlannerContext.java
@@ -106,6 +106,7 @@ public class PlannerContext {
     return analysisResult_.isInsertStmt() || 
analysisResult_.isCreateTableAsSelectStmt();
   }
   public boolean isInsert() { return analysisResult_.isInsertStmt(); }
+  public boolean isCtas() { return 
analysisResult_.isCreateTableAsSelectStmt(); }
   public boolean isUpdateOrDelete() {
     return analysisResult_.isUpdateStmt() || analysisResult_.isDeleteStmt(); }
   public boolean isQuery() { return analysisResult_.isQueryStmt(); }
diff --git a/tests/custom_cluster/test_kudu.py 
b/tests/custom_cluster/test_kudu.py
index 1e3d39a8b..98772d69c 100644
--- a/tests/custom_cluster/test_kudu.py
+++ b/tests/custom_cluster/test_kudu.py
@@ -19,6 +19,7 @@ from __future__ import absolute_import, division, 
print_function
 import logging
 import os
 import pytest
+import tempfile
 from kudu.schema import INT32
 from time import sleep
 
@@ -139,6 +140,8 @@ class TestKuduClientTimeout(CustomKuduTest):
 
 
 class TestKuduHMSIntegration(CustomKuduTest):
+  START_END_TIME_LINEAGE_LOG_DIR = tempfile.mkdtemp(prefix="start_end_time")
+
   # TODO(IMPALA-8614): parameterize the common tests in query_test/test_kudu.py
   # to run with HMS integration enabled. Also avoid restarting Impala to reduce
   # tests time.
@@ -173,6 +176,16 @@ class TestKuduHMSIntegration(CustomKuduTest):
     vector.get_value('exec_option')['kudu_read_mode'] = "READ_AT_SNAPSHOT"
     self.run_test_case('QueryTest/kudu_create', vector, use_db=unique_database)
 
+  @pytest.mark.execute_serially
+  @SkipIfKudu.no_hybrid_clock
+  
@CustomClusterTestSuite.with_args(impalad_args="-kudu_client_rpc_timeout_ms=30000
 "
+                                    "--lineage_event_log_dir={0}"
+                                    .format(START_END_TIME_LINEAGE_LOG_DIR))
+  def test_create_kudu_tables_with_lineage_enabled(self, vector, 
unique_database):
+    """Same as above test_create_managed_kudu_tables, but with lineage 
enabled."""
+    vector.get_value('exec_option')['kudu_read_mode'] = "READ_AT_SNAPSHOT"
+    self.run_test_case('QueryTest/kudu_create', vector, use_db=unique_database)
+
   @pytest.mark.execute_serially
   def test_implicit_external_table_props(self, cursor, kudu_client):
     """Check that table properties added internally for external table during

Reply via email to