This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit eb0e2bbf9020af68c8cbce8baa87f37a66071653
Author: Zoltan Borok-Nagy <[email protected]>
AuthorDate: Tue Sep 12 17:07:41 2023 +0200

    IMPALA-12313: (part 1) Refactor modify statements
    
    This change refactors the classes and methods that implement
    modify statements like DELETE and UPDATE. ModifyStmt, DeleteStmt,
    UpdateStmt are created during parsing and contain information about
    the statement: FROM clause, WHERE clause, target table, etc.
    
    The logic that actually implements these operations is dependent
    on the type of the target table. Therefore during analysis, after
    the target table is resolved, we create the *Impl object (e.g.
    IcebergDeleteImpl, KuduUpdateImpl) that implements the logic. The
    impl object is in charge of creating the source statement of the
    operation, doing the necessary rewrites/masking, and also creating
    the data sink.
    
    Testing:
     * N/A: no new functionality / bug fix
    
    Change-Id: If15f64944f2e23064b7112ad5930abc775dd65ec
    Reviewed-on: http://gerrit.cloudera.org:8080/20477
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../org/apache/impala/analysis/DeleteStmt.java     |  25 +-
 .../apache/impala/analysis/DmlStatementBase.java   |   2 +
 .../apache/impala/analysis/IcebergDeleteImpl.java  |  44 +++
 .../apache/impala/analysis/IcebergModifyImpl.java  | 101 ++++++
 .../org/apache/impala/analysis/KuduDeleteImpl.java |  46 +++
 .../org/apache/impala/analysis/KuduModifyImpl.java |  63 ++++
 .../org/apache/impala/analysis/KuduUpdateImpl.java |  45 +++
 .../org/apache/impala/analysis/ModifyImpl.java     | 268 +++++++++++++++
 .../org/apache/impala/analysis/ModifyStmt.java     | 361 +++------------------
 .../org/apache/impala/analysis/UpdateStmt.java     |  23 +-
 .../java/org/apache/impala/planner/Planner.java    |   2 +-
 11 files changed, 639 insertions(+), 341 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java 
b/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
index 47164fb43..a02170fb6 100644
--- a/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
@@ -20,6 +20,9 @@ package org.apache.impala.analysis;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.catalog.FeKuduTable;
+import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.planner.DataSink;
 import org.apache.impala.planner.TableSink;
@@ -52,21 +55,21 @@ public class DeleteStmt extends ModifyStmt {
         new ArrayList<>(), other.wherePredicate_.clone());
   }
 
+  @Override
+  protected void createModifyImpl() {
+    if (table_ instanceof FeKuduTable) {
+      modifyImpl_ = new KuduDeleteImpl(this);
+    } else if (table_ instanceof FeIcebergTable) {
+      modifyImpl_ = new IcebergDeleteImpl(this);
+    }  // NOTE(review): no else branch, so modifyImpl_ stays null for other table types.
+  }
+
   public DataSink createDataSink() {
-    // analyze() must have been called before.
-    Preconditions.checkState(table_ != null);
-    TableSink tableSink = TableSink.create(table_, TableSink.Op.DELETE,
-        partitionKeyExprs_, resultExprs_, referencedColumns_, false, false,
-        new Pair<>(ImmutableList.<Integer>of(), TSortingOrder.LEXICAL), -1,
-        getKuduTransactionToken(),
-        maxTableSinks_);
-    Preconditions.checkState(!referencedColumns_.isEmpty());
-    return tableSink;
+    return modifyImpl_.createDataSink();  // Delegates to the table-type-specific impl.
   }
 
   public void substituteResultExprs(ExprSubstitutionMap smap, Analyzer 
analyzer) {
-    resultExprs_ = Expr.substituteList(resultExprs_, smap, analyzer, true);
-    partitionKeyExprs_ = Expr.substituteList(partitionKeyExprs_, smap, 
analyzer, true);
+    modifyImpl_.substituteResultExprs(smap, analyzer);  // Exprs now live in the impl.
   }
 
   @Override
diff --git a/fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java 
b/fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
index 1afcd98b8..cf3dfd63d 100644
--- a/fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
+++ b/fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
@@ -53,6 +53,8 @@ public abstract class DmlStatementBase extends StatementBase {
   }
 
   public FeTable getTargetTable() { return table_; }
+
+  protected void setTargetTable(FeTable tbl) { table_ = tbl; }  // e.g. IcebergModifyImpl.analyze()
   public void setMaxTableSinks(int maxTableSinks) { this.maxTableSinks_ = 
maxTableSinks; }
 
   public boolean hasShuffleHint() { return false; }
diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergDeleteImpl.java 
b/fe/src/main/java/org/apache/impala/analysis/IcebergDeleteImpl.java
new file mode 100644
index 000000000..f32ce3ff2
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/IcebergDeleteImpl.java
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.common.Pair;
+import org.apache.impala.planner.DataSink;
+import org.apache.impala.planner.TableSink;
+import org.apache.impala.thrift.TSortingOrder;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+public class IcebergDeleteImpl extends IcebergModifyImpl {
+  public IcebergDeleteImpl(ModifyStmt modifyStmt) {
+    super(modifyStmt);
+  }
+
+  @Override
+  public DataSink createDataSink() {
+    Preconditions.checkState(modifyStmt_.table_ instanceof FeIcebergTable);  // swapped in analyze()
+    TableSink tableSink = TableSink.create(modifyStmt_.table_, 
TableSink.Op.DELETE,
+        partitionKeyExprs_, resultExprs_, getReferencedColumns(), false, false,
+        new Pair<>(ImmutableList.<Integer>of(), TSortingOrder.LEXICAL), -1, 
null,
+        modifyStmt_.maxTableSinks_);
+    Preconditions.checkState(!getReferencedColumns().isEmpty());
+    return tableSink;
+  }
+}
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergModifyImpl.java 
b/fe/src/main/java/org/apache/impala/analysis/IcebergModifyImpl.java
new file mode 100644
index 000000000..30b4c375e
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/IcebergModifyImpl.java
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.catalog.IcebergPositionDeleteTable;
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.thrift.TIcebergFileFormat;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+abstract class IcebergModifyImpl extends ModifyImpl {
+  FeIcebergTable originalTargetTable_;  // Target table before analyze() swaps it.
+  IcebergPositionDeleteTable icePosDelTable_;  // Becomes the stmt's target in analyze().
+
+  public IcebergModifyImpl(ModifyStmt modifyStmt) {
+    super(modifyStmt);
+    originalTargetTable_ = (FeIcebergTable)modifyStmt_.getTargetTable();
+    icePosDelTable_ = new IcebergPositionDeleteTable(originalTargetTable_);
+  }
+
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {
+    // Make the virtual position delete table the new target table (before validation).
+    modifyStmt_.setTargetTable(icePosDelTable_);
+    
modifyStmt_.setMaxTableSinks(analyzer.getQueryOptions().getMax_fs_writers());
+    if (modifyStmt_ instanceof UpdateStmt) {
+      throw new AnalysisException("UPDATE is not supported for Iceberg table " 
+
+          originalTargetTable_.getFullName());
+    }
+
+    if (icePosDelTable_.getFormatVersion() == 1) {
+      throw new AnalysisException("Iceberg V1 table do not support 
DELETE/UPDATE " +
+          "operations: " + originalTargetTable_.getFullName());
+    }
+
+    String deleteMode = 
originalTargetTable_.getIcebergApiTable().properties().get(
+        org.apache.iceberg.TableProperties.DELETE_MODE);
+    if (deleteMode != null && !deleteMode.equals("merge-on-read")) {
+      throw new AnalysisException(String.format("Unsupported delete mode: '%s' 
for " +
+          "Iceberg table: %s", deleteMode, 
originalTargetTable_.getFullName()));
+    }
+
+    if (originalTargetTable_.getDeleteFileFormat() != 
TIcebergFileFormat.PARQUET) {
+      throw new AnalysisException("Impala can only write delete files in 
PARQUET, " +
+          "but the given table uses a different file format: " +
+          originalTargetTable_.getFullName());
+    }
+
+    Expr wherePredicate = modifyStmt_.getWherePredicate();
+    if (wherePredicate == null ||
+        org.apache.impala.analysis.Expr.IS_TRUE_LITERAL.apply(wherePredicate)) 
{
+      // TODO (IMPALA-12136): rewrite DELETE FROM t; statements to TRUNCATE 
TABLE t;
+      throw new AnalysisException("For deleting every row, please use 
TRUNCATE.");
+    }
+  }
+
+  @Override
+  public void addCastsToAssignmentsInSourceStmt(Analyzer analyzer)
+      throws AnalysisException {  // No-op: analyze() rejects UPDATE for Iceberg.
+  }
+
+  @Override
+  public void addKeyColumns(Analyzer analyzer, List<SelectListItem> selectList,
+      List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId> 
keySlots,
+      Map<String, Integer> colIndexMap) throws AnalysisException {
+    if (originalTargetTable_.isPartitioned()) {
+      String[] partitionCols;
+      partitionCols = new String[] {"PARTITION__SPEC__ID",
+          "ICEBERG__PARTITION__SERIALIZED"};
+      for (String k : partitionCols) {
+        addPartitioningColumn(analyzer, selectList, referencedColumns, 
uniqueSlots,
+            keySlots, colIndexMap, k);
+      }
+    }
+    String[] deleteCols;
+    deleteCols = new String[] {"INPUT__FILE__NAME", "FILE__POSITION"};
+    // Add the file name / position key columns as slot refs; they are also sort keys.
+    for (String k : deleteCols) {
+      addKeyColumn(analyzer, selectList, referencedColumns, uniqueSlots, 
keySlots,
+          colIndexMap, k, true);
+    }
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/analysis/KuduDeleteImpl.java 
b/fe/src/main/java/org/apache/impala/analysis/KuduDeleteImpl.java
new file mode 100644
index 000000000..54625fb03
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/KuduDeleteImpl.java
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import org.apache.impala.catalog.FeKuduTable;
+import org.apache.impala.common.Pair;
+import org.apache.impala.planner.DataSink;
+import org.apache.impala.planner.TableSink;
+import org.apache.impala.thrift.TSortingOrder;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+public class KuduDeleteImpl extends KuduModifyImpl {
+  public KuduDeleteImpl(ModifyStmt modifyStmt) {
+    super(modifyStmt);
+  }
+
+  @Override
+  public DataSink createDataSink() {
+    // analyze() must have been called before. The sink gets the key column exprs.
+    Preconditions.checkState(modifyStmt_.table_ instanceof FeKuduTable);
+    TableSink tableSink = TableSink.create(modifyStmt_.table_, 
TableSink.Op.DELETE,
+        partitionKeyExprs_, resultExprs_, getReferencedColumns(), false, false,
+        new Pair<>(ImmutableList.<Integer>of(), TSortingOrder.LEXICAL), -1,
+        modifyStmt_.getKuduTransactionToken(),
+        modifyStmt_.maxTableSinks_);
+        Preconditions.checkState(!getReferencedColumns().isEmpty());
+    return tableSink;
+  }
+}
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/impala/analysis/KuduModifyImpl.java 
b/fe/src/main/java/org/apache/impala/analysis/KuduModifyImpl.java
new file mode 100644
index 000000000..76290e977
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/KuduModifyImpl.java
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import org.apache.impala.catalog.FeKuduTable;
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.Pair;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+abstract class KuduModifyImpl extends ModifyImpl {
+  // Target Kudu table.
+  FeKuduTable kuduTable_;
+
+  public KuduModifyImpl(ModifyStmt modifyStmt) {
+    super(modifyStmt);
+    kuduTable_ = (FeKuduTable)modifyStmt.getTargetTable();
+  }
+
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {}  // No Kudu-specific checks.
+
+  @Override
+  public void addCastsToAssignmentsInSourceStmt(Analyzer analyzer)
+      throws AnalysisException {
+    // Cast the assignment exprs (which follow the key columns) to the target column types.
+    // NOTE(review): runs before sourceStmt_.analyze(); confirm resultExprs_ is set then.
+    List<Pair<SlotRef, Expr>> assignments = modifyStmt_.getAssignments();
+    int keyColumnsOffset = kuduTable_.getPrimaryKeyColumnNames().size();
+    for (int i = keyColumnsOffset; i < sourceStmt_.resultExprs_.size(); ++i) {
+      sourceStmt_.resultExprs_.set(i, sourceStmt_.resultExprs_.get(i).castTo(
+          assignments.get(i - keyColumnsOffset).first.getType()));
+    }
+  }
+
+  @Override
+  public void addKeyColumns(Analyzer analyzer, List<SelectListItem> selectList,
+      List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId> 
keySlots,
+      Map<String, Integer> colIndexMap) throws AnalysisException {
+    // Add the primary key columns as slot refs (isSortingColumn == false).
+    for (String k : kuduTable_.getPrimaryKeyColumnNames()) {
+      addKeyColumn(analyzer, selectList, referencedColumns, uniqueSlots, 
keySlots,
+          colIndexMap, k, false);
+    }
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/analysis/KuduUpdateImpl.java 
b/fe/src/main/java/org/apache/impala/analysis/KuduUpdateImpl.java
new file mode 100644
index 000000000..57e8df262
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/KuduUpdateImpl.java
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import org.apache.impala.catalog.FeKuduTable;
+import org.apache.impala.common.Pair;
+import org.apache.impala.planner.DataSink;
+import org.apache.impala.planner.TableSink;
+import org.apache.impala.thrift.TSortingOrder;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+public class KuduUpdateImpl extends KuduModifyImpl {
+  public KuduUpdateImpl(ModifyStmt modifyStmt) {
+    super(modifyStmt);
+  }
+
+  @Override
+  public DataSink createDataSink() {
+    // analyze() must have been called before. Uses sourceStmt_'s exprs: keys + assignments.
+    Preconditions.checkState(modifyStmt_.table_ instanceof FeKuduTable);
+    DataSink dataSink = TableSink.create(modifyStmt_.table_, 
TableSink.Op.UPDATE,
+        ImmutableList.<Expr>of(), sourceStmt_.getResultExprs(), 
getReferencedColumns(),
+        false, false, new Pair<>(ImmutableList.<Integer>of(), 
TSortingOrder.LEXICAL), -1,
+        modifyStmt_.getKuduTransactionToken(), 0);  // NOTE(review): KuduDeleteImpl passes maxTableSinks_ here; confirm 0 is intended.
+    Preconditions.checkState(!getReferencedColumns().isEmpty());
+    return dataSink;
+  }
+}
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java 
b/fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
new file mode 100644
index 000000000..08b97c764
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
@@ -0,0 +1,268 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import static java.lang.String.format;
+
+import org.apache.impala.catalog.Column;
+import org.apache.impala.catalog.KuduColumn;
+import org.apache.impala.catalog.Type;
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.Pair;
+import org.apache.impala.planner.DataSink;
+import org.apache.impala.rewrite.ExprRewriter;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Abstract class for implementing a Modify statement such as DELETE or UPDATE. Child
+ * classes implement logic specific to target table types (key columns, casts, sink).
+ */
+abstract class ModifyImpl {
+  abstract void addCastsToAssignmentsInSourceStmt(Analyzer analyzer)  // Called by createSourceStmt().
+      throws AnalysisException;
+
+  abstract void addKeyColumns(Analyzer analyzer,  // Called by buildAndValidateAssignmentExprs().
+      List<SelectListItem> selectList, List<Integer> referencedColumns,
+      Set<SlotId> uniqueSlots, Set<SlotId> keySlots, Map<String, Integer> 
colIndexMap)
+      throws AnalysisException;
+
+  abstract void analyze(Analyzer analyzer) throws AnalysisException;
+
+  abstract DataSink createDataSink();
+
+  // The Modify statement for this modify impl. The ModifyStmt class holds 
information
+  // about the statement (e.g. target table type, FROM, WHERE clause, etc.)
+  ModifyStmt modifyStmt_;
+  /////////////////////////////////////////
+  // START: Members that are set in createSourceStmt().
+
+  // Result of the analysis of the internal SelectStmt that produces the rows 
that
+  // will be modified.
+  protected SelectStmt sourceStmt_;
+
+  // Exprs for the key columns that identify the modified rows (e.g. Kudu primary
+  // keys, Iceberg INPUT__FILE__NAME / FILE__POSITION); initially the SlotRefs added by
+  //
+  // addKeyColumn(). It does not contain the right-hand sides of UPDATE assignments;
+  // those only appear in the select list of sourceStmt_.
+  protected List<Expr> resultExprs_ = new ArrayList<>();
+
+  // Exprs for the partitioning columns of the target table, added by
+  // addPartitioningColumn() (e.g. Iceberg PARTITION__SPEC__ID / partition data).
+  protected List<Expr> partitionKeyExprs_ = new ArrayList<>();
+
+  // Exprs for sorting the rows before they are written to the target table. In the
+  // order they were added, it holds the key columns added via addKeyColumn() with
+  // isSortingColumn == true (e.g. Iceberg INPUT__FILE__NAME / FILE__POSITION) and the
+  // partitioning columns added via addPartitioningColumn(), which are always sort keys.
+  // It stays empty for Kudu targets, whose primary key columns are added with
+  // isSortingColumn == false.
+  protected List<Expr> sortExprs_ = new ArrayList<>();
+
+  // Position mapping of output expressions of the sourceStmt_ to column 
indices in the
+  // target table. The i'th position in this list maps to the 
referencedColumns_[i]'th
+  // position in the target table.
+  protected List<Integer> referencedColumns_ = new ArrayList<>();
+  // END: Members that are set in createSourceStmt().
+  /////////////////////////////////////////
+
+  public ModifyImpl(ModifyStmt modifyStmt) {
+    modifyStmt_ = modifyStmt;
+  }
+
+  public void reset() {
+    if (sourceStmt_ != null) sourceStmt_.reset();
+  }
+
+  /**
+   * Builds and validates the sourceStmt_. The select list of the sourceStmt_ 
contains
+   * first the SlotRefs for the key Columns, followed by the expressions 
representing the
+   * assignments. This method sets the member variables for the sourceStmt_ 
and the
+   * referencedColumns_.
+   *
+   * This only creates the sourceStmt_ once; later invocations re-analyze the previously
+   * created statement. NOTE(review): the casts are added before the first analyze().
+   */
+  protected void createSourceStmt(Analyzer analyzer)
+      throws AnalysisException {
+    if (sourceStmt_ == null) {
+      // Builds the select list and column position mapping for the target 
table.
+      ArrayList<SelectListItem> selectList = new ArrayList<>();
+      buildAndValidateAssignmentExprs(analyzer, selectList);
+
+      // Create the select statement; it is analyzed at the end of this method.
+      sourceStmt_ = new SelectStmt(new SelectList(selectList), 
modifyStmt_.fromClause_,
+          modifyStmt_.wherePredicate_,
+          null, null, null, null);
+
+      addCastsToAssignmentsInSourceStmt(analyzer);
+    }
+    sourceStmt_.analyze(analyzer);
+  }
+
+  /**
+   * Validates the list of value assignments that should be used to modify the 
target
+   * table. It verifies that only those columns are referenced that belong to 
the target
+   * table, no key columns are modified, and that a single column is not 
modified multiple
+   * times. Analyzes the Exprs and SlotRefs of assignments_ and writes a list 
of
+   * SelectListItems to the out parameter selectList that is used to build the 
select list
+   * for sourceStmt_. A list of integers indicating the column position of an 
entry in the
+   * select list in the target table is written to the field 
referencedColumns_.
+   *
+   * In addition to the expressions that are generated for each assignment, the
+   * expression list contains an expression for each key column. The key 
columns
+   * are always prepended to the list of expression representing the 
assignments.
+   */
+  private void buildAndValidateAssignmentExprs(Analyzer analyzer,
+      List<SelectListItem> selectList)
+      throws AnalysisException {
+    // The order of the referenced columns equals the order of the result 
expressions
+    Set<SlotId> uniqueSlots = new HashSet<>();
+    Set<SlotId> keySlots = new HashSet<>();
+
+    // Mapping from column name to index
+    List<Column> cols = modifyStmt_.table_.getColumnsInHiveOrder();
+    Map<String, Integer> colIndexMap = new HashMap<>();
+    for (int i = 0; i < cols.size(); i++) {
+      colIndexMap.put(cols.get(i).getName(), i);
+    }
+
+    addKeyColumns(analyzer, selectList, referencedColumns_, uniqueSlots,
+        keySlots, colIndexMap);
+
+    // Assignments are only used in the context of updates.
+    for (Pair<SlotRef, Expr> valueAssignment : modifyStmt_.assignments_) {
+      SlotRef lhsSlotRef = valueAssignment.first;
+      lhsSlotRef.analyze(analyzer);
+
+      Expr rhsExpr = valueAssignment.second;
+      // No subqueries for rhs expression
+      if (rhsExpr.contains(Subquery.class)) {
+        throw new AnalysisException(
+            format("Subqueries are not supported as update expressions for 
column '%s'",
+                lhsSlotRef.toSql()));
+      }
+      rhsExpr.analyze(analyzer);
+
+      // The left-hand side column must belong to the target table.
+      if 
(!lhsSlotRef.isBoundByTupleIds(modifyStmt_.targetTableRef_.getId().asList())) {
+        throw new AnalysisException(
+            format("Left-hand side column '%s' in assignment expression 
'%s=%s' does not "
+                + "belong to target table '%s'", lhsSlotRef.toSql(), 
lhsSlotRef.toSql(),
+                rhsExpr.toSql(),
+                
modifyStmt_.targetTableRef_.getDesc().getTable().getFullName()));
+      }
+
+      Column c = lhsSlotRef.getResolvedPath().destColumn();
+      // TODO(Kudu) Add test for this code-path when Kudu supports nested types
+      if (c == null) {
+        throw new AnalysisException(
+            format("Left-hand side in assignment expression '%s=%s' must be a 
column " +
+                "reference", lhsSlotRef.toSql(), rhsExpr.toSql()));
+      }
+
+      if (keySlots.contains(lhsSlotRef.getSlotId())) {
+        boolean isSystemGeneratedColumn =
+            c instanceof KuduColumn && ((KuduColumn)c).isAutoIncrementing();
+        throw new AnalysisException(format("%s column '%s' cannot be updated.",
+            isSystemGeneratedColumn ? "System generated key" : "Key",
+            lhsSlotRef.toSql()));
+      }
+
+      if (uniqueSlots.contains(lhsSlotRef.getSlotId())) {
+        throw new AnalysisException(
+            format("Duplicate value assignment to column: '%s'", 
lhsSlotRef.toSql()));
+      }
+
+      rhsExpr = StatementBase.checkTypeCompatibility(
+          modifyStmt_.targetTableRef_.getDesc().getTable().getFullName(),
+          c, rhsExpr, analyzer, null /*widestTypeSrcExpr*/);
+      uniqueSlots.add(lhsSlotRef.getSlotId());
+      selectList.add(new SelectListItem(rhsExpr, null));
+      referencedColumns_.add(colIndexMap.get(c.getName()));
+    }
+  }
+
+  protected void addKeyColumn(Analyzer analyzer, List<SelectListItem> 
selectList,
+      List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId> 
keySlots,
+      Map<String, Integer> colIndexMap, String colName, boolean 
isSortingColumn)
+      throws AnalysisException {
+    SlotRef ref = addSlotRef(analyzer, selectList, referencedColumns, 
uniqueSlots,
+        keySlots, colIndexMap, colName);
+    resultExprs_.add(ref);
+    if (isSortingColumn) sortExprs_.add(ref);
+  }
+
+  protected void addPartitioningColumn(Analyzer analyzer, List<SelectListItem> 
selectList,
+  List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId> 
keySlots,
+  Map<String, Integer> colIndexMap, String colName) throws AnalysisException {
+    SlotRef ref = addSlotRef(analyzer, selectList, referencedColumns, 
uniqueSlots,
+        keySlots, colIndexMap, colName);
+    partitionKeyExprs_.add(ref);
+    sortExprs_.add(ref);  // Partitioning columns are always sorting columns.
+  }
+
+  private SlotRef addSlotRef(Analyzer analyzer, List<SelectListItem> 
selectList,
+      List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId> 
keySlots,
+      Map<String, Integer> colIndexMap, String colName) throws 
AnalysisException {
+    List<String> path = 
Path.createRawPath(modifyStmt_.targetTableRef_.getUniqueAlias(),
+        colName);
+    SlotRef ref = new SlotRef(path);
+    ref.analyze(analyzer);
+    selectList.add(new SelectListItem(ref, null));
+    uniqueSlots.add(ref.getSlotId());
+    keySlots.add(ref.getSlotId());  // Also makes partitioning columns non-assignable.
+    referencedColumns.add(colIndexMap.get(colName));  // NOTE(review): adds null if colName is missing.
+    return ref;
+  }
+
+  public List<Expr> getPartitionKeyExprs() {
+     return partitionKeyExprs_;
+  }
+
+  public List<Expr> getSortExprs() {
+    return sortExprs_;
+  }
+
+  public QueryStmt getQueryStmt() {
+    return sourceStmt_;
+  }
+
+  public List<Integer> getReferencedColumns() {
+    return referencedColumns_;
+  }
+
+  public void castResultExprs(List<Type> types) throws AnalysisException {
+    sourceStmt_.castResultExprs(types);
+  }
+
+  public void rewriteExprs(ExprRewriter rewriter) throws AnalysisException {
+    sourceStmt_.rewriteExprs(rewriter);
+  }
+
+  public void substituteResultExprs(ExprSubstitutionMap smap, Analyzer 
analyzer) {
+    resultExprs_ = Expr.substituteList(resultExprs_, smap, analyzer, true);
+    partitionKeyExprs_ = Expr.substituteList(partitionKeyExprs_, smap, 
analyzer, true);
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java 
b/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
index 197f72bad..b3a798918 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
@@ -20,25 +20,16 @@ package org.apache.impala.analysis;
 import static java.lang.String.format;
 
 import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 
 import org.apache.impala.authorization.Privilege;
-import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
-import org.apache.impala.catalog.IcebergPositionDeleteTable;
-import org.apache.impala.catalog.KuduColumn;
-import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.rewrite.ExprRewriter;
-import org.apache.impala.thrift.TIcebergFileFormat;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -53,12 +44,21 @@ import com.google.common.base.Preconditions;
  *   - assignmentExprs (not null, can be empty)
  *   - wherePredicate (nullable)
  *
- * In the analysis phase, a SelectStmt is created with the result expressions 
set to
- * match the right-hand side of the assignments in addition to projecting the 
key columns
- * of the underlying table. During query execution, the plan that
- * is generated from this SelectStmt produces all rows that need to be 
modified.
+ * This class holds information from parsing and semantic analysis. Then it 
delegates
+ * implementation logic to the *Impl classes, e.g. KuduDeleteImpl, 
IcebergDeleteImpl,
+ * etc.
+ * In the analysis phase, the impl object creates a SelectStmt with the result 
expressions
+ * which hold information about the modified records (e.g. primary keys of 
Kudu tables,
+ * file_path / pos information of Iceberg data records).
+ * During query execution, the plan that is generated from this SelectStmt 
produces
+ * all rows that need to be modified.
+ *
+ * UPDATEs:
+ * The result of the SelectStmt contains the right-hand side of the assignments 
in addition
+ * to projecting the key columns of the underlying table.
  */
 public abstract class ModifyStmt extends DmlStatementBase {
+
   // List of explicitly mentioned assignment expressions in the UPDATE's SET 
clause
   protected final List<Pair<SlotRef, Expr>> assignments_;
 
@@ -71,40 +71,14 @@ public abstract class ModifyStmt extends DmlStatementBase {
   // TableRef identifying the target table, set during analysis.
   protected TableRef targetTableRef_;
 
+  // FROM clause of the statement
   protected FromClause fromClause_;
 
   /////////////////////////////////////////
   // START: Members that are set in first run of analyze().
 
-  // Exprs correspond to the partitionKeyValues, if specified, or to the 
partition columns
-  // for tables.
-  protected List<Expr> partitionKeyExprs_ = new ArrayList<>();
-
-  // For every column of the target table that is referenced in the optional
-  // 'sort.columns' table property, this list will contain the corresponding 
result expr
-  // from 'resultExprs_'. Before insertion, all rows will be sorted by these 
exprs. If the
-  // list is empty, no additional sorting by non-partitioning columns will be 
performed.
-  // The column list must not contain partition columns and must be empty for 
non-Hdfs
-  // tables.
-  protected List<Expr> sortExprs_ = new ArrayList<>();
-
-  // Output expressions that produce the final results to write to the target 
table. May
-  // include casts. Set in first run of analyze().
-  //
-  // In case of DELETE statements it contains the columns that identify the 
deleted rows.
-  protected List<Expr> resultExprs_ = new ArrayList<>();
-
-  // Result of the analysis of the internal SelectStmt that produces the rows 
that
-  // will be modified.
-  protected SelectStmt sourceStmt_;
-
   // Implementation of the modify statement. Depends on the target table type.
-  private ModifyImpl modifyImpl_;
-
-  // Position mapping of output expressions of the sourceStmt_ to column 
indices in the
-  // target table. The i'th position in this list maps to the 
referencedColumns_[i]'th
-  // position in the target table. Set in createSourceStmt() during analysis.
-  protected List<Integer> referencedColumns_ = new ArrayList<>();
+  protected ModifyImpl modifyImpl_;
 
   // END: Members that are set in first run of analyze
   /////////////////////////////////////////
@@ -137,10 +111,8 @@ public abstract class ModifyStmt extends DmlStatementBase {
   /**
    * The analysis of the ModifyStmt proceeds as follows: First, the FROM 
clause is
    * analyzed and the targetTablePath is verified to be a valid alias into the 
FROM
-   * clause. When the target table is identified, the assignment expressions 
are
-   * validated and as a last step the internal SelectStmt is produced and 
analyzed.
-   * Potential query rewrites for the select statement are implemented here 
and are not
-   * triggered externally by the statement rewriter.
+   * clause. It also identifies the target table. Raises errors for 
unsupported table
+   * types.
    */
   @Override
   public void analyze(Analyzer analyzer) throws AnalysisException {
@@ -172,7 +144,6 @@ public abstract class ModifyStmt extends DmlStatementBase {
 
     Preconditions.checkNotNull(targetTableRef_);
     FeTable dstTbl = targetTableRef_.getTable();
-    table_ = dstTbl;
     // Only Kudu and Iceberg tables can be updated.
     if (!(dstTbl instanceof FeKuduTable) && !(dstTbl instanceof 
FeIcebergTable)) {
       throw new AnalysisException(
@@ -180,154 +151,59 @@ public abstract class ModifyStmt extends 
DmlStatementBase {
               "but the following table is neither: %s",
               dstTbl.getFullName()));
     }
-    if (dstTbl instanceof FeKuduTable) {
-      modifyImpl_ = this.new ModifyKudu();
-    } else if (dstTbl instanceof FeIcebergTable) {
-      modifyImpl_ = this.new ModifyIceberg();
-    }
-
-    modifyImpl_.analyze(analyzer);
-
     // Make sure that the user is allowed to modify the target table. Use ALL 
because no
     // UPDATE / DELETE privilege exists yet (IMPALA-3840).
     analyzer.registerAuthAndAuditEvent(dstTbl, Privilege.ALL);
-
-    // Validates the assignments_ and creates the sourceStmt_.
-    if (sourceStmt_ == null) createSourceStmt(analyzer);
-    sourceStmt_.analyze(analyzer);
+    table_ = dstTbl;
+    if (modifyImpl_ == null) createModifyImpl();
+    modifyImpl_.analyze(analyzer);
+    // Create and analyze the source statement.
+    modifyImpl_.createSourceStmt(analyzer);
     // Add target table to descriptor table.
     analyzer.getDescTbl().setTargetTable(table_);
 
     sqlString_ = toSql();
   }
 
+  /**
+   * Creates the implementation class for this statement. Only called once 
during the
+   * first run of analyze().
+   */
+  abstract protected void createModifyImpl();
+
   @Override
   public void reset() {
     super.reset();
     fromClause_.reset();
-    if (sourceStmt_ != null) sourceStmt_.reset();
-    modifyImpl_ = null;
+    modifyImpl_.reset();
   }
 
   @Override
-  public List<Expr> getPartitionKeyExprs() { return partitionKeyExprs_; }
+  public List<Expr> getPartitionKeyExprs() { return 
modifyImpl_.getPartitionKeyExprs(); }
   @Override
-  public List<Expr> getSortExprs() { return sortExprs_; }
+  public List<Expr> getSortExprs() { return modifyImpl_.getSortExprs(); }
 
-  @Override
-  public boolean resolveTableMask(Analyzer analyzer) throws AnalysisException {
-    return sourceStmt_.resolveTableMask(analyzer);
+  public List<Integer> getReferencedColumns() {
+    return modifyImpl_.getReferencedColumns();
   }
 
-  /**
-   * Builds and validates the sourceStmt_. The select list of the sourceStmt_ 
contains
-   * first the SlotRefs for the key Columns, followed by the expressions 
representing the
-   * assignments. This method sets the member variables for the sourceStmt_ 
and the
-   * referencedColumns_.
-   *
-   * This is only run once, on the first analysis. Following analysis will 
reset() and
-   * reuse previously created statements.
-   */
-  private void createSourceStmt(Analyzer analyzer)
-      throws AnalysisException {
-    // Builds the select list and column position mapping for the target table.
-    ArrayList<SelectListItem> selectList = new ArrayList<>();
-    buildAndValidateAssignmentExprs(analyzer, selectList);
-
-    // Analyze the generated select statement.
-    sourceStmt_ = new SelectStmt(new SelectList(selectList), fromClause_, 
wherePredicate_,
-        null, null, null, null);
-
-    modifyImpl_.addCastsToAssignmentsInSourceStmt(analyzer);
-  }
-
-  /**
-   * Validates the list of value assignments that should be used to modify the 
target
-   * table. It verifies that only those columns are referenced that belong to 
the target
-   * table, no key columns are modified, and that a single column is not 
modified multiple
-   * times. Analyzes the Exprs and SlotRefs of assignments_ and writes a list 
of
-   * SelectListItems to the out parameter selectList that is used to build the 
select list
-   * for sourceStmt_. A list of integers indicating the column position of an 
entry in the
-   * select list in the target table is written to the out parameter 
referencedColumns.
-   *
-   * In addition to the expressions that are generated for each assignment, the
-   * expression list contains an expression for each key column. The key 
columns
-   * are always prepended to the list of expression representing the 
assignments.
-   */
-  private void buildAndValidateAssignmentExprs(Analyzer analyzer,
-      List<SelectListItem> selectList)
-      throws AnalysisException {
-    // The order of the referenced columns equals the order of the result 
expressions
-    Set<SlotId> uniqueSlots = new HashSet<>();
-    Set<SlotId> keySlots = new HashSet<>();
-
-    // Mapping from column name to index
-    List<Column> cols = table_.getColumnsInHiveOrder();
-    Map<String, Integer> colIndexMap = new HashMap<>();
-    for (int i = 0; i < cols.size(); i++) {
-      colIndexMap.put(cols.get(i).getName(), i);
-    }
-
-    modifyImpl_.addKeyColumns(analyzer, selectList, referencedColumns_, 
uniqueSlots,
-        keySlots, colIndexMap);
-
-    // Assignments are only used in the context of updates.
-    for (Pair<SlotRef, Expr> valueAssignment : assignments_) {
-      SlotRef lhsSlotRef = valueAssignment.first;
-      lhsSlotRef.analyze(analyzer);
+  public Expr getWherePredicate() { return wherePredicate_; }
 
-      Expr rhsExpr = valueAssignment.second;
-      // No subqueries for rhs expression
-      if (rhsExpr.contains(Subquery.class)) {
-        throw new AnalysisException(
-            format("Subqueries are not supported as update expressions for 
column '%s'",
-                lhsSlotRef.toSql()));
-      }
-      rhsExpr.analyze(analyzer);
-
-      // Correct target table
-      if (!lhsSlotRef.isBoundByTupleIds(targetTableRef_.getId().asList())) {
-        throw new AnalysisException(
-            format("Left-hand side column '%s' in assignment expression 
'%s=%s' does not "
-                + "belong to target table '%s'", lhsSlotRef.toSql(), 
lhsSlotRef.toSql(),
-                rhsExpr.toSql(), 
targetTableRef_.getDesc().getTable().getFullName()));
-      }
-
-      Column c = lhsSlotRef.getResolvedPath().destColumn();
-      // TODO(Kudu) Add test for this code-path when Kudu supports nested types
-      if (c == null) {
-        throw new AnalysisException(
-            format("Left-hand side in assignment expression '%s=%s' must be a 
column " +
-                "reference", lhsSlotRef.toSql(), rhsExpr.toSql()));
-      }
-
-      if (keySlots.contains(lhsSlotRef.getSlotId())) {
-        boolean isSystemGeneratedColumn =
-            c instanceof KuduColumn && ((KuduColumn)c).isAutoIncrementing();
-        throw new AnalysisException(format("%s column '%s' cannot be updated.",
-            isSystemGeneratedColumn ? "System generated key" : "Key",
-            lhsSlotRef.toSql()));
-      }
-
-      if (uniqueSlots.contains(lhsSlotRef.getSlotId())) {
-        throw new AnalysisException(
-            format("Duplicate value assignment to column: '%s'", 
lhsSlotRef.toSql()));
-      }
+  public List<Pair<SlotRef, Expr>> getAssignments() { return assignments_; }
 
-      rhsExpr = 
checkTypeCompatibility(targetTableRef_.getDesc().getTable().getFullName(),
-          c, rhsExpr, analyzer, null /*widestTypeSrcExpr*/);
-      uniqueSlots.add(lhsSlotRef.getSlotId());
-      selectList.add(new SelectListItem(rhsExpr, null));
-      referencedColumns_.add(colIndexMap.get(c.getName()));
-    }
+  @Override
+  public boolean resolveTableMask(Analyzer analyzer) throws AnalysisException {
+    return getQueryStmt().resolveTableMask(analyzer);
   }
 
   @Override
-  public List<Expr> getResultExprs() { return sourceStmt_.getResultExprs(); }
+  public List<Expr> getResultExprs() {
+    return modifyImpl_.getQueryStmt().getResultExprs();
+  }
 
   @Override
   public void castResultExprs(List<Type> types) throws AnalysisException {
-    sourceStmt_.castResultExprs(types);
+    modifyImpl_.castResultExprs(types);
   }
 
   @Override
@@ -336,165 +212,16 @@ public abstract class ModifyStmt extends 
DmlStatementBase {
     for (Pair<SlotRef, Expr> assignment: assignments_) {
       assignment.second = rewriter.rewrite(assignment.second, analyzer_);
     }
-    sourceStmt_.rewriteExprs(rewriter);
+    modifyImpl_.rewriteExprs(rewriter);
   }
 
-  public QueryStmt getQueryStmt() { return sourceStmt_; }
+  public QueryStmt getQueryStmt() { return modifyImpl_.getQueryStmt(); }
 
   /**
    * Return true if the target table is Kudu table.
-   * Since only Kudu tables can be updated, it must be true.
    */
   public boolean isTargetTableKuduTable() { return (table_ instanceof 
FeKuduTable); }
 
-  private void addKeyColumn(Analyzer analyzer, List<SelectListItem> selectList,
-      List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId> 
keySlots,
-      Map<String, Integer> colIndexMap, String colName, boolean 
isSortingColumn)
-      throws AnalysisException {
-    SlotRef ref = addSlotRef(analyzer, selectList, referencedColumns, 
uniqueSlots,
-        keySlots, colIndexMap, colName);
-    resultExprs_.add(ref);
-    if (isSortingColumn) sortExprs_.add(ref);
-  }
-
-  private void addPartitioningColumn(Analyzer analyzer, List<SelectListItem> 
selectList,
-  List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId> 
keySlots,
-  Map<String, Integer> colIndexMap, String colName) throws AnalysisException {
-    SlotRef ref = addSlotRef(analyzer, selectList, referencedColumns, 
uniqueSlots,
-        keySlots, colIndexMap, colName);
-    partitionKeyExprs_.add(ref);
-    sortExprs_.add(ref);
-  }
-
-  private SlotRef addSlotRef(Analyzer analyzer, List<SelectListItem> 
selectList,
-      List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId> 
keySlots,
-      Map<String, Integer> colIndexMap, String colName) throws 
AnalysisException {
-    List<String> path = Path.createRawPath(targetTableRef_.getUniqueAlias(), 
colName);
-    SlotRef ref = new SlotRef(path);
-    ref.analyze(analyzer);
-    selectList.add(new SelectListItem(ref, null));
-    uniqueSlots.add(ref.getSlotId());
-    keySlots.add(ref.getSlotId());
-    referencedColumns.add(colIndexMap.get(colName));
-    return ref;
-  }
-
   @Override
   public abstract String toSql(ToSqlOptions options);
-
-  private interface ModifyImpl {
-    void analyze(Analyzer analyzer) throws AnalysisException;
-
-    void addCastsToAssignmentsInSourceStmt(Analyzer analyzer)
-      throws AnalysisException;
-
-    void addKeyColumns(Analyzer analyzer,
-        List<SelectListItem> selectList, List<Integer> referencedColumns,
-        Set<SlotId> uniqueSlots, Set<SlotId> keySlots, Map<String, Integer> 
colIndexMap)
-        throws AnalysisException;
-  }
-
-  private class ModifyKudu implements ModifyImpl {
-    // Target Kudu table. Result of analysis.
-    FeKuduTable kuduTable_ = (FeKuduTable)table_;
-
-    @Override
-    public void analyze(Analyzer analyzer) throws AnalysisException {}
-
-    @Override
-    public void addCastsToAssignmentsInSourceStmt(Analyzer analyzer)
-        throws AnalysisException {
-      // cast result expressions to the correct type of the referenced slot of 
the
-      // target table
-      int keyColumnsOffset = kuduTable_.getPrimaryKeyColumnNames().size();
-      for (int i = keyColumnsOffset; i < sourceStmt_.resultExprs_.size(); ++i) 
{
-        sourceStmt_.resultExprs_.set(i, sourceStmt_.resultExprs_.get(i).castTo(
-            assignments_.get(i - keyColumnsOffset).first.getType()));
-      }
-    }
-
-    @Override
-    public void addKeyColumns(Analyzer analyzer, List<SelectListItem> 
selectList,
-        List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId> 
keySlots,
-        Map<String, Integer> colIndexMap) throws AnalysisException {
-      // Add the key columns as slot refs
-      for (String k : kuduTable_.getPrimaryKeyColumnNames()) {
-        addKeyColumn(analyzer, selectList, referencedColumns, uniqueSlots, 
keySlots,
-            colIndexMap, k, false);
-      }
-    }
-  }
-
-  private class ModifyIceberg implements ModifyImpl {
-    FeIcebergTable originalTargetTable_;
-    IcebergPositionDeleteTable icePosDelTable_;
-
-    public ModifyIceberg() {
-      originalTargetTable_ = (FeIcebergTable)table_;
-      icePosDelTable_ = new IcebergPositionDeleteTable((FeIcebergTable)table_);
-      // Make the virtual position delete table the new target table.
-      table_ = icePosDelTable_;
-    }
-
-    @Override
-    public void analyze(Analyzer analyzer) throws AnalysisException {
-      setMaxTableSinks(analyzer_.getQueryOptions().getMax_fs_writers());
-      if (ModifyStmt.this instanceof UpdateStmt) {
-        throw new AnalysisException("UPDATE is not supported for Iceberg table 
" +
-            originalTargetTable_.getFullName());
-      }
-
-      if (icePosDelTable_.getFormatVersion() == 1) {
-        throw new AnalysisException("Iceberg V1 table do not support 
DELETE/UPDATE " +
-            "operations: " + originalTargetTable_.getFullName());
-      }
-
-      String deleteMode = 
originalTargetTable_.getIcebergApiTable().properties().get(
-          org.apache.iceberg.TableProperties.DELETE_MODE);
-      if (deleteMode != null && !deleteMode.equals("merge-on-read")) {
-        throw new AnalysisException(String.format("Unsupported delete mode: 
'%s' for " +
-            "Iceberg table: %s", deleteMode, 
originalTargetTable_.getFullName()));
-      }
-
-      if (originalTargetTable_.getDeleteFileFormat() != 
TIcebergFileFormat.PARQUET) {
-        throw new AnalysisException("Impala can only write delete files in 
PARQUET, " +
-            "but the given table uses a different file format: " +
-            originalTargetTable_.getFullName());
-      }
-
-      if (wherePredicate_ == null ||
-          
org.apache.impala.analysis.Expr.IS_TRUE_LITERAL.apply(wherePredicate_)) {
-        // TODO (IMPALA-12136): rewrite DELETE FROM t; statements to TRUNCATE 
TABLE t;
-        throw new AnalysisException("For deleting every row, please use 
TRUNCATE.");
-      }
-    }
-
-    @Override
-    public void addCastsToAssignmentsInSourceStmt(Analyzer analyzer)
-        throws AnalysisException {
-    }
-
-    @Override
-    public void addKeyColumns(Analyzer analyzer, List<SelectListItem> 
selectList,
-        List<Integer> referencedColumns,
-        Set<SlotId> uniqueSlots, Set<SlotId> keySlots, Map<String, Integer> 
colIndexMap)
-        throws AnalysisException {
-      if (originalTargetTable_.isPartitioned()) {
-        String[] partitionCols;
-        partitionCols = new String[] {"PARTITION__SPEC__ID",
-            "ICEBERG__PARTITION__SERIALIZED"};
-        for (String k : partitionCols) {
-          addPartitioningColumn(analyzer, selectList, referencedColumns, 
uniqueSlots,
-              keySlots, colIndexMap, k);
-        }
-      }
-      String[] deleteCols;
-      deleteCols = new String[] {"INPUT__FILE__NAME", "FILE__POSITION"};
-      // Add the key columns as slot refs
-      for (String k : deleteCols) {
-        addKeyColumn(analyzer, selectList, referencedColumns, uniqueSlots, 
keySlots,
-            colIndexMap, k, true);
-      }
-    }
-  }
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java 
b/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
index 8f8dfee71..9e36ed1db 100644
--- a/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
@@ -22,13 +22,12 @@ import static java.lang.String.format;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.impala.catalog.FeKuduTable;
+import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.planner.DataSink;
-import org.apache.impala.planner.TableSink;
-import org.apache.impala.thrift.TSortingOrder;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 
 /**
  * Representation of an Update statement.
@@ -58,19 +57,19 @@ public class UpdateStmt extends ModifyStmt {
         new ArrayList<>(), other.wherePredicate_);
   }
 
+  @Override
+  protected void createModifyImpl() {
+    // Currently only Kudu tables are supported.
+    Preconditions.checkState(table_ instanceof FeKuduTable);
+    modifyImpl_ = new KuduUpdateImpl(this);
+  }
+
   /**
    * Return an instance of a KuduTableSink specialized as an Update operation.
    */
-  public DataSink createDataSink(List<Expr> resultExprs) {
+  public DataSink createDataSink() {
     // analyze() must have been called before.
-    Preconditions.checkState(table_ != null);
-    DataSink dataSink = TableSink.create(table_, TableSink.Op.UPDATE,
-        ImmutableList.<Expr>of(), resultExprs, referencedColumns_, false, 
false,
-        new Pair<>(ImmutableList.<Integer>of(), TSortingOrder.LEXICAL), -1,
-        getKuduTransactionToken(),
-        0);
-    Preconditions.checkState(!referencedColumns_.isEmpty());
-    return dataSink;
+    return modifyImpl_.createDataSink();
   }
 
   @Override
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java 
b/fe/src/main/java/org/apache/impala/planner/Planner.java
index 9a35bef7e..c33761a8d 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -175,7 +175,7 @@ public class Planner {
       if (ctx_.isUpdate()) {
         // Set up update sink for root fragment
         rootFragment.setSink(
-            
ctx_.getAnalysisResult().getUpdateStmt().createDataSink(resultExprs));
+            ctx_.getAnalysisResult().getUpdateStmt().createDataSink());
       } else if (ctx_.isDelete()) {
         // Set up delete sink for root fragment
         DeleteStmt deleteStmt = ctx_.getAnalysisResult().getDeleteStmt();

Reply via email to