This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 19110b490d4e84af9bb4d37438b0ac93aecdedc1
Author: Peter Rozsa <[email protected]>
AuthorDate: Mon Oct 21 09:28:36 2024 +0200

    IMPALA-13362: Implement WHEN NOT MATCHED BY SOURCE syntax for MERGE 
statement
    
    This change adds support for a new MERGE clause that covers the
    condition when the source statement's rows do not match the target
    table's rows. Example: MERGE INTO target t using source s on t.id = s.id
    WHEN NOT MATCHED BY SOURCE THEN UPDATE set t.column = "a";
    
    This change also adds support to use WHEN NOT MATCHED BY TARGET
    explicitly; this is equivalent to WHEN NOT MATCHED.
    
    Tests:
     - Parser tests for the new language elements.
     - Analyzer and planner test for WHEN NOT MATCHED BY SOURCE/TARGET
       clauses.
     - E2E tests for WHEN NOT MATCHED BY SOURCE clause.
    
    Change-Id: Ia0e0607682a616ef6ad9eccf499dc0c5c9278c5f
    Reviewed-on: http://gerrit.cloudera.org:8080/21988
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/exec/iceberg-merge-node.cc                  |  81 +++---
 be/src/exec/iceberg-merge-node.h                   |  32 ++-
 common/thrift/PlanNodes.thrift                     |   7 +
 fe/src/main/cup/sql-parser.cup                     |  61 ++++-
 .../java/org/apache/impala/analysis/MergeCase.java |  33 ++-
 .../org/apache/impala/analysis/MergeDelete.java    |  13 +-
 .../org/apache/impala/analysis/MergeInsert.java    |  13 +-
 .../java/org/apache/impala/analysis/MergeStmt.java |   5 +-
 .../org/apache/impala/analysis/MergeUpdate.java    |  12 +-
 .../apache/impala/planner/IcebergMergeNode.java    |   5 +-
 .../impala/analysis/AnalyzeModifyStmtsTest.java    |  20 +-
 .../org/apache/impala/analysis/ParserTest.java     |  25 ++
 .../queries/PlannerTest/iceberg-merge.test         | 131 +++++++++-
 .../QueryTest/iceberg-merge-partition-sort.test    | 120 +++++++++
 ...erg-merge.test => iceberg-merge-partition.test} | 285 ++++++++-------------
 .../queries/QueryTest/iceberg-merge.test           | 239 ++---------------
 tests/query_test/test_iceberg.py                   |   6 +
 17 files changed, 585 insertions(+), 503 deletions(-)

diff --git a/be/src/exec/iceberg-merge-node.cc 
b/be/src/exec/iceberg-merge-node.cc
index 8511ae425..f591ead46 100644
--- a/be/src/exec/iceberg-merge-node.cc
+++ b/be/src/exec/iceberg-merge-node.cc
@@ -33,7 +33,6 @@
 #include "runtime/runtime-state.h"
 #include "runtime/tuple-row.h"
 #include "runtime/tuple.h"
-#include "runtime/types.h"
 #include "util/debug-util.h"
 #include "util/runtime-profile-counters.h"
 
@@ -70,7 +69,8 @@ Status IcebergMergeCasePlan::Init(const TIcebergMergeCase& 
tmerge_case,
       tmerge_case.output_expressions, *row_desc, state, pool, &output_exprs_));
   RETURN_IF_ERROR(ScalarExpr::Create(
       tmerge_case.filter_conjuncts, *row_desc, state, pool, 
&filter_conjuncts_));
-  type_ = tmerge_case.type;
+  case_type_ = tmerge_case.type;
+  match_type_ = tmerge_case.match_type;
   return Status::OK();
 }
 
@@ -97,10 +97,17 @@ IcebergMergeNode::IcebergMergeNode(
 
   for (auto* merge_case_plan : pnode.merge_case_plans_) {
     auto merge_case = pool->Add(new IcebergMergeCase(merge_case_plan));
-    if (merge_case->IsMatchedCase()) {
-      matched_cases_.push_back(merge_case);
-    } else {
-      not_matched_cases_.push_back(merge_case);
+    all_cases_.push_back(merge_case);
+    switch (merge_case->match_type_) {
+      case TMergeMatchType::MATCHED:
+        matched_cases_.push_back(merge_case);
+        break;
+      case TMergeMatchType::NOT_MATCHED_BY_TARGET:
+        not_matched_by_target_cases_.push_back(merge_case);
+        break;
+      case TMergeMatchType::NOT_MATCHED_BY_SOURCE:
+        not_matched_by_source_cases_.push_back(merge_case);
+        break;
     }
   }
 }
@@ -120,10 +127,7 @@ Status IcebergMergeNode::Prepare(RuntimeState* state) {
       ScalarExprEvaluator::Create(partition_meta_exprs_, state, 
state->obj_pool(),
           expr_perm_pool_.get(), expr_results_pool_.get(), 
&partition_meta_evaluators_));
 
-  for (auto* merge_case : matched_cases_) {
-    RETURN_IF_ERROR(merge_case->Prepare(state, *this));
-  }
-  for (auto* merge_case : not_matched_cases_) {
+  for (auto* merge_case : all_cases_) {
     RETURN_IF_ERROR(merge_case->Prepare(state, *this));
   }
 
@@ -142,10 +146,7 @@ Status IcebergMergeNode::Open(RuntimeState* state) {
   RETURN_IF_ERROR(ScalarExprEvaluator::Open(position_meta_evaluators_, state));
   RETURN_IF_ERROR(ScalarExprEvaluator::Open(partition_meta_evaluators_, 
state));
 
-  for (auto* merge_case : matched_cases_) {
-    RETURN_IF_ERROR(merge_case->Open(state));
-  }
-  for (auto* merge_case : not_matched_cases_) {
+  for (auto* merge_case : all_cases_) {
     RETURN_IF_ERROR(merge_case->Open(state));
   }
 
@@ -185,27 +186,34 @@ Status IcebergMergeNode::EvaluateCases(RowBatch* 
output_batch) {
     }
     previous_row_target_tuple_ = row->GetTuple(target_tuple_idx_);
 
-    auto row_present = row_present_evaluator_->GetTinyIntVal(row);
+    auto row_present = row_present_evaluator_->GetTinyIntVal(row).val;
     IcebergMergeCase* selected_case = nullptr;
 
-    if (row_present == TIcebergMergeRowPresent::BOTH) { // Matches
-      for (auto* matched_case : matched_cases_) {
-        if (EvalConjuncts(matched_case->filter_evaluators_.data(),
-                matched_case->filter_evaluators_.size(), row)) {
-          selected_case = matched_case;
-          break;
-        }
+
+    std::vector<IcebergMergeCase*>* cases = nullptr;
+    switch (row_present) {
+      case TIcebergMergeRowPresent::BOTH: {
+        cases = &matched_cases_;
+        break;
+      }
+      case TIcebergMergeRowPresent::SOURCE: {
+        cases = &not_matched_by_target_cases_;
+        break;
       }
+      case TIcebergMergeRowPresent::TARGET: {
+        cases = &not_matched_by_source_cases_;
+        break;
+      }
+      default:
+        return Status("Invalid row presence value in MERGE statement's result 
set.");
     }
-    if (row_present == TIcebergMergeRowPresent::SOURCE) { // Not matches
-      for (auto* not_matched_case : not_matched_cases_) {
-        if (EvalConjuncts(not_matched_case->filter_evaluators_.data(),
-                not_matched_case->filter_evaluators_.size(), row)) {
-          selected_case = not_matched_case;
-          break;
-        }
+    for (auto* merge_case : *cases) {
+      if (CheckCase(merge_case, row)) {
+        selected_case = merge_case;
+        break;
       }
     }
+
     if (!selected_case) continue;
     // Add a new row to output_batch
     AddRow(output_batch, selected_case, row);
@@ -214,6 +222,11 @@ Status IcebergMergeNode::EvaluateCases(RowBatch* 
output_batch) {
   return Status::OK();
 }
 
+bool IcebergMergeNode::CheckCase(const IcebergMergeCase* merge_case, TupleRow* 
row) {
+  return EvalConjuncts(
+      merge_case->filter_evaluators_.data(), 
merge_case->filter_evaluators_.size(), row);
+}
+
 void IcebergMergeNode::AddRow(
     RowBatch* output_batch, IcebergMergeCase* merge_case, TupleRow* row) {
   TupleRow* dst_row = output_batch->GetRow(output_batch->AddRow());
@@ -264,11 +277,8 @@ Status IcebergMergeNode::Reset(RuntimeState* state, 
RowBatch* row_batch) {
 void IcebergMergeNode::Close(RuntimeState* state) {
   if (is_closed()) return;
   child_row_batch_.reset();
-  for (auto matched_case : matched_cases_) {
-    matched_case->Close(state);
-  }
-  for (auto not_matched_case : not_matched_cases_) {
-    not_matched_case->Close(state);
+  for (auto merge_case : all_cases_) {
+    merge_case->Close(state);
   }
   row_present_evaluator_->Close(state);
   ScalarExprEvaluator::Close(position_meta_evaluators_, state);
@@ -287,7 +297,8 @@ const std::vector<ScalarExprEvaluator*>& 
IcebergMergeNode::PartitionMetaEvals()
 IcebergMergeCase::IcebergMergeCase(const IcebergMergeCasePlan* merge_case_plan)
   : filter_conjuncts_(merge_case_plan->filter_conjuncts_),
     output_exprs_(merge_case_plan->output_exprs_),
-    type_(merge_case_plan->type_) {}
+    case_type_(merge_case_plan->case_type_),
+    match_type_(merge_case_plan->match_type_) {}
 
 Status IcebergMergeCase::Prepare(RuntimeState* state, IcebergMergeNode& 
parent) {
   RETURN_IF_ERROR(ScalarExprEvaluator::Create(output_exprs_, state, 
state->obj_pool(),
diff --git a/be/src/exec/iceberg-merge-node.h b/be/src/exec/iceberg-merge-node.h
index 8ff059233..59c3ffbdd 100644
--- a/be/src/exec/iceberg-merge-node.h
+++ b/be/src/exec/iceberg-merge-node.h
@@ -98,17 +98,22 @@ class IcebergMergeNode : public ExecNode {
   /// is a duplicate, the second check tests if the row contains both the 
target and
   /// the source, or just the source. In the 'BOTH' case, the 'WHEN MATCHED' 
cases are
   /// tested by their filter evaluators. The evaluation respects the order of 
cases as
-  /// they are defined in the query. In the 'SOURCE' case, the 'WHEN NOT 
MATCHED' cases
-  /// are getting checked similarly to the 'BOTH' case. The 'selected_case' 
pointer
-  /// stores the first matched case. If the 'selected_case' is set, then a new 
row is
-  /// added to the output row batch, and the output expressions are evaluated 
into the
-  /// new row. The merge action is also set derived from the type of the 
selected case.
+  /// they are defined in the query. In the 'SOURCE' case, the
+  /// 'WHEN NOT MATCHED (BY TARGET)' cases are checked, in the 'TARGET' case 
the
+  /// 'WHEN NOT MATCHED BY SOURCE' cases are checked similarly to the 'BOTH' 
case.
+  /// The 'selected_case' pointer stores the first matched case. If the 
'selected_case'
+  /// is set, then a new row is added to the output row batch, and the output
+  /// expressions are evaluated into the new row. The merge action is also set
+  /// derived from the type of the selected case.
   Status EvaluateCases(RowBatch* output_batch);
   void AddRow(RowBatch* output_batch, IcebergMergeCase* merge_case, TupleRow* 
row);
+  bool CheckCase(const IcebergMergeCase * merge_case, TupleRow* row);
   bool IsDuplicateRow(TupleRow* actual_row);
 
   std::vector<IcebergMergeCase*> matched_cases_;
-  std::vector<IcebergMergeCase*> not_matched_cases_;
+  std::vector<IcebergMergeCase*> not_matched_by_target_cases_;
+  std::vector<IcebergMergeCase*> not_matched_by_source_cases_;
+  std::vector<IcebergMergeCase*> all_cases_;
   std::unique_ptr<RowBatch> child_row_batch_;
   int child_row_idx_;
   bool child_eos_;
@@ -147,7 +152,8 @@ class IcebergMergeCasePlan {
   /// Filter conjuncts applied after matching the case
   std::vector<ScalarExpr*> filter_conjuncts_;
   std::vector<ScalarExpr*> output_exprs_;
-  TMergeCaseType::type type_{};
+  TMergeCaseType::type case_type_{};
+  TMergeMatchType::type match_type_{};
 };
 
 class IcebergMergeCase {
@@ -164,23 +170,21 @@ class IcebergMergeCase {
   Status Open(RuntimeState* state);
   void Close(RuntimeState* state);
 
-  [[nodiscard]] bool IsMatchedCase() const {
-    return type_ == TMergeCaseType::DELETE || type_ == TMergeCaseType::UPDATE;
-  }
   [[nodiscard]] TIcebergMergeSinkAction::type SinkAction() const {
-    if (type_ == TMergeCaseType::DELETE) {
+    if (case_type_ == TMergeCaseType::DELETE) {
       return TIcebergMergeSinkAction::DELETE;
     }
-    if (type_ == TMergeCaseType::INSERT) {
+    if (case_type_ == TMergeCaseType::INSERT) {
       return TIcebergMergeSinkAction::DATA;
     }
-    DCHECK(type_ == TMergeCaseType::UPDATE);
+    DCHECK(case_type_ == TMergeCaseType::UPDATE);
     return TIcebergMergeSinkAction::BOTH;
   }
 
   std::vector<ScalarExpr*> filter_conjuncts_;
   std::vector<ScalarExpr*> output_exprs_;
-  TMergeCaseType::type type_{};
+  TMergeCaseType::type case_type_{};
+  TMergeMatchType::type match_type_{};
 
   std::vector<ScalarExprEvaluator*> filter_evaluators_;
   std::vector<ScalarExprEvaluator*> output_evaluators_;
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index a500205b5..47872faf2 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -732,6 +732,12 @@ enum TMergeCaseType {
   DELETE = 2
 }
 
+enum TMergeMatchType {
+  MATCHED = 0
+  NOT_MATCHED_BY_TARGET = 1
+  NOT_MATCHED_BY_SOURCE = 2
+}
+
 enum TIcebergMergeRowPresent {
   BOTH = 0
   SOURCE = 1
@@ -746,6 +752,7 @@ struct TIcebergMergeCase {
   2: optional list<Exprs.TExpr> filter_conjuncts
   // Type of the merge case that reflects the operation in it.
   3: required TMergeCaseType type
+  4: required TMergeMatchType match_type
 }
 
 struct TIcebergMergeNode {
diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index ab417fd79..7ccb032d2 100755
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -65,6 +65,8 @@ import org.apache.impala.thrift.TShowStatsOp;
 import org.apache.impala.thrift.TTablePropertyType;
 import org.apache.impala.thrift.TPrincipalType;
 import org.apache.impala.thrift.TSortingOrder;
+import org.apache.impala.thrift.TMergeCaseType;
+import org.apache.impala.thrift.TMergeMatchType;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.util.IcebergUtil;
 import org.apache.impala.common.NotImplementedException;
@@ -469,10 +471,11 @@ nonterminal List<MergeCase> merge_case_list;
 nonterminal MergeCase merge_case;
 nonterminal MergeCase merge_when_matched;
 nonterminal MergeCase merge_when_not_matched;
-nonterminal MergeCase merge_matched;
+nonterminal MergeCase merge_update_and_delete;
 nonterminal MergeUpdate merge_update;
 nonterminal MergeDelete merge_delete;
 nonterminal MergeInsert merge_insert;
+nonterminal List<Expr> merge_then;
 nonterminal OptimizeStmt optimize_stmt;
 nonterminal List<Pair<SlotRef, Expr>> update_set_expr_list;
 nonterminal StatementBase explain_stmt;
@@ -607,6 +610,7 @@ nonterminal Boolean server_ident;
 nonterminal Boolean source_ident;
 nonterminal Boolean sources_ident;
 nonterminal Boolean uri_ident;
+nonterminal String target_or_source_ident;
 nonterminal testcase_ident;
 
 // For Create/Drop/Show function ddl
@@ -1042,24 +1046,54 @@ merge_case ::=
   {: RESULT = not_matched; :};
 
 merge_when_matched ::=
-  KW_WHEN KW_MATCHED KW_THEN merge_matched:matched
-  {: RESULT = matched; :}
-  | KW_WHEN KW_MATCHED KW_AND expr_list:exprs KW_THEN merge_matched:matched
+  KW_WHEN KW_MATCHED merge_then:exprs merge_update_and_delete:matched
   {:
     matched.setFilterExprs(exprs);
     RESULT = matched;
   :};
 
 merge_when_not_matched ::=
-  KW_WHEN KW_NOT KW_MATCHED KW_THEN merge_insert:insert
-  {: RESULT = insert; :}
-  | KW_WHEN KW_NOT KW_MATCHED KW_AND expr_list:exprs KW_THEN 
merge_insert:insert
+  KW_WHEN KW_NOT KW_MATCHED merge_then:exprs merge_insert:insert
   {:
     insert.setFilterExprs(exprs);
+    insert.setMatchType(TMergeMatchType.NOT_MATCHED_BY_TARGET);
     RESULT = insert;
+  :}
+  | KW_WHEN KW_NOT KW_MATCHED KW_BY target_or_source_ident:target_or_source 
merge_then:exprs merge_insert:insert
+  {:
+    // User wrote WHEN NOT MATCHED BY SOURCE with INSERT operation
+    // which doesn't make sense, as there is nothing to insert.
+    if (target_or_source.equals("SOURCE")) {
+      parser.parseError("SOURCE", SqlParserSymbols.KW_INSERT, "UPDATE, 
DELETE");
+    }
+    insert.setFilterExprs(exprs);
+    insert.setMatchType(TMergeMatchType.NOT_MATCHED_BY_TARGET);
+    RESULT = insert;
+  :}
+  | KW_WHEN KW_NOT KW_MATCHED KW_BY target_or_source_ident:target_or_source 
merge_then:exprs merge_update_and_delete:matched
+  {:
+    // User wrote WHEN NOT MATCHED BY TARGET with DELETE/UPDATE operation
+    // which doesn't make sense, as there is nothing to delete/update.
+    if (target_or_source.equals("TARGET")) {
+      int kw = SqlParserSymbols.KW_DELETE;
+      if (matched.caseType().equals(TMergeCaseType.UPDATE)) {
+        kw = SqlParserSymbols.KW_UPDATE;
+      }
+      parser.parseError("TARGET", kw, "INSERT");
+    }
+    matched.setMatchType(TMergeMatchType.NOT_MATCHED_BY_SOURCE);
+    matched.setFilterExprs(exprs);
+    RESULT = matched;
   :};
 
-merge_matched ::=
+merge_then ::=
+  KW_AND expr_list:exprs KW_THEN
+  {: RESULT = exprs; :}
+  |
+  KW_THEN
+  {: RESULT = Collections.emptyList(); :};
+
+merge_update_and_delete ::=
   merge_update:update
   {: RESULT = update; :}
   | merge_delete:delete
@@ -2492,6 +2526,17 @@ testcase_ident ::=
   :}
   ;
 
+target_or_source_ident ::=
+  IDENT:ident
+  {:
+    String targetOrSource = ident.toUpperCase();
+    if (!targetOrSource.equals("TARGET") && !targetOrSource.equals("SOURCE")) {
+      parser.parseError("identifier", SqlParserSymbols.IDENT, "TARGET, 
SOURCE");
+    }
+    RESULT = targetOrSource;
+  :}
+  ;
+
 option_ident ::=
   IDENT:ident
   {:
diff --git a/fe/src/main/java/org/apache/impala/analysis/MergeCase.java 
b/fe/src/main/java/org/apache/impala/analysis/MergeCase.java
index 08b3460a5..114998efe 100644
--- a/fe/src/main/java/org/apache/impala/analysis/MergeCase.java
+++ b/fe/src/main/java/org/apache/impala/analysis/MergeCase.java
@@ -26,6 +26,7 @@ import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TMergeCaseType;
+import org.apache.impala.thrift.TMergeMatchType;
 
 /**
  * Base class for different merge cases in MERGE statements. Each merge case
@@ -40,25 +41,29 @@ public abstract class MergeCase extends StatementBase {
   protected TableName targetTableName_;
   protected List<Column> targetTableColumns_;
   protected TableRef targetTableRef_;
+  protected TMergeMatchType matchType_;
 
   protected MergeCase() {
+    matchType_ = TMergeMatchType.MATCHED;
     filterExprs_ = Collections.emptyList();
     resultExprs_ = Collections.emptyList();
   }
 
   protected MergeCase(List<Expr> resultExprs, List<Expr> filterExprs,
       TableName targetTableName, List<Column> targetTableColumns,
-      TableRef targetTableRef) {
+      TableRef targetTableRef, TMergeMatchType matchType) {
     targetTableName_ = targetTableName;
     targetTableColumns_ = targetTableColumns;
     targetTableRef_ = targetTableRef;
     resultExprs_ = resultExprs;
     filterExprs_ = filterExprs;
+    matchType_ = matchType;
   }
 
   public List<Expr> getFilterExprs() { return filterExprs_; }
 
   public void setFilterExprs(List<Expr> exprs) { filterExprs_ = exprs; }
+  public void setMatchType(TMergeMatchType matchType) { matchType_ = 
matchType; }
 
   public void setParent(MergeStmt parent) {
     targetTableName_ = parent.getTargetTable().getTableName();
@@ -101,7 +106,7 @@ public abstract class MergeCase extends StatementBase {
   public String toSql(ToSqlOptions options) {
     StringBuilder builder = new StringBuilder();
     builder.append("WHEN ");
-    builder.append(matchType().value());
+    builder.append(matchTypeAsString());
     if (!filterExprs_.isEmpty()) {
       StringJoiner expressionJoiner = new StringJoiner(" AND ");
       builder.append(" AND ");
@@ -127,15 +132,23 @@ public abstract class MergeCase extends StatementBase {
   @Override
   public List<Expr> getResultExprs() { return resultExprs_; }
 
-  public enum MatchType {
-    MATCHED("MATCHED"),
-    NOT_MATCHED("NOT MATCHED");
-
-    private final String value_;
+  public TMergeMatchType matchType(){
+    return matchType_;
+  }
 
-    MatchType(String value) { value_ = value; }
-    public String value() { return value_; }
+  public String matchTypeAsString() {
+    switch (matchType_) {
+      case MATCHED:
+        return "MATCHED";
+      case NOT_MATCHED_BY_TARGET:
+        return "NOT MATCHED BY TARGET";
+      case NOT_MATCHED_BY_SOURCE:
+        return "NOT MATCHED BY SOURCE";
+      default:
+        throw new IllegalStateException(
+            String.format("Invalid TMergeMatchType value: %s", matchType_));
+    }
   }
-  public abstract MatchType matchType();
+
   public abstract TMergeCaseType caseType();
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/MergeDelete.java 
b/fe/src/main/java/org/apache/impala/analysis/MergeDelete.java
index f7e36f4dc..df9d05fe5 100644
--- a/fe/src/main/java/org/apache/impala/analysis/MergeDelete.java
+++ b/fe/src/main/java/org/apache/impala/analysis/MergeDelete.java
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.thrift.TMergeCaseType;
+import org.apache.impala.thrift.TMergeMatchType;
 
 /**
  * Delete clause for MERGE statement. This clause does not have any extra 
clauses, it
@@ -38,8 +39,9 @@ public class MergeDelete extends MergeCase {
 
   protected MergeDelete(List<Expr> resultExprs, List<Expr> filterExprs,
       TableName targetTableName, List<Column> targetTableColumns,
-      TableRef targetTableRef) {
-    super(resultExprs, filterExprs, targetTableName, targetTableColumns, 
targetTableRef);
+      TableRef targetTableRef, TMergeMatchType matchType) {
+    super(resultExprs, filterExprs, targetTableName, targetTableColumns, 
targetTableRef,
+        matchType);
   }
 
   @Override
@@ -60,17 +62,12 @@ public class MergeDelete extends MergeCase {
     return String.format("%sDELETE", parent);
   }
 
-  @Override
-  public MatchType matchType() {
-    return MatchType.MATCHED;
-  }
-
   @Override
   public TMergeCaseType caseType() { return TMergeCaseType.DELETE; }
 
   @Override
   public MergeDelete clone() {
     return new MergeDelete(Expr.cloneList(resultExprs_), 
Expr.cloneList(getFilterExprs()),
-        targetTableName_, targetTableColumns_, targetTableRef_);
+        targetTableName_, targetTableColumns_, targetTableRef_, matchType_);
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java 
b/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java
index 13d0311a1..119655f73 100644
--- a/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java
+++ b/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java
@@ -29,6 +29,7 @@ import java.util.StringJoiner;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.thrift.TMergeCaseType;
+import org.apache.impala.thrift.TMergeMatchType;
 
 /**
  * Insert clause for MERGE statements. This clause supports 2 different usage 
modes:
@@ -49,8 +50,10 @@ public class MergeInsert extends MergeCase {
 
   protected MergeInsert(List<Expr> resultExprs, List<Expr> filterExprs,
       TableName targetTableName, List<Column> targetTableColumns, TableRef 
targetTableRef,
-      List<String> columnPermutation, SelectList selectList) {
-    super(resultExprs, filterExprs, targetTableName, targetTableColumns, 
targetTableRef);
+      TMergeMatchType matchType, List<String> columnPermutation,
+      SelectList selectList) {
+    super(resultExprs, filterExprs, targetTableName, targetTableColumns, 
targetTableRef,
+        matchType);
     columnPermutation_ = columnPermutation;
     selectList_ = selectList;
   }
@@ -91,16 +94,14 @@ public class MergeInsert extends MergeCase {
     return builder.toString();
   }
 
-  @Override
-  public MatchType matchType() { return MatchType.NOT_MATCHED; }
-
   @Override
   public TMergeCaseType caseType() { return TMergeCaseType.INSERT; }
 
   @Override
   public MergeInsert clone() {
     return new MergeInsert(Expr.cloneList(resultExprs_), 
Expr.cloneList(getFilterExprs()),
-        targetTableName_, targetTableColumns_, targetTableRef_, 
columnPermutation_,
+        targetTableName_, targetTableColumns_, targetTableRef_, matchType_,
+        columnPermutation_,
         selectList_);
   }
 
diff --git a/fe/src/main/java/org/apache/impala/analysis/MergeStmt.java 
b/fe/src/main/java/org/apache/impala/analysis/MergeStmt.java
index 4153e3e38..0d0a3d043 100644
--- a/fe/src/main/java/org/apache/impala/analysis/MergeStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/MergeStmt.java
@@ -26,6 +26,7 @@ import org.apache.impala.planner.PlanNode;
 import org.apache.impala.planner.DataSink;
 import org.apache.impala.planner.PlannerContext;
 import org.apache.impala.thrift.TMergeCaseType;
+import org.apache.impala.thrift.TMergeMatchType;
 import org.apache.impala.thrift.TSortingOrder;
 
 /**
@@ -163,8 +164,8 @@ public class MergeStmt extends DmlStatementBase {
   public List<MergeCase> getCases() { return cases_; }
 
   public boolean hasOnlyMatchedCases() {
-    return cases_.stream().noneMatch(
-        mergeCase -> mergeCase.caseType().equals(TMergeCaseType.INSERT));
+    return cases_.stream().allMatch(mergeCase -> mergeCase.matchType().equals(
+        TMergeMatchType.MATCHED));
   }
 
   public boolean hasOnlyInsertCases() {
diff --git a/fe/src/main/java/org/apache/impala/analysis/MergeUpdate.java 
b/fe/src/main/java/org/apache/impala/analysis/MergeUpdate.java
index 52ce89253..6eae3bd18 100644
--- a/fe/src/main/java/org/apache/impala/analysis/MergeUpdate.java
+++ b/fe/src/main/java/org/apache/impala/analysis/MergeUpdate.java
@@ -29,6 +29,7 @@ import org.apache.impala.catalog.Column;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.thrift.TMergeCaseType;
+import org.apache.impala.thrift.TMergeMatchType;
 
 public class MergeUpdate extends MergeCase {
   private final List<Pair<SlotRef, Expr>> assignmentExprs_;
@@ -39,8 +40,9 @@ public class MergeUpdate extends MergeCase {
 
   protected MergeUpdate(List<Expr> resultExprs, List<Expr> filterExprs,
       TableName targetTableName, List<Column> targetTableColumns, TableRef 
targetTableRef,
-      List<Pair<SlotRef, Expr>> assignmentExprs) {
-    super(resultExprs, filterExprs, targetTableName, targetTableColumns, 
targetTableRef);
+      TMergeMatchType matchType, List<Pair<SlotRef, Expr>> assignmentExprs) {
+    super(resultExprs, filterExprs, targetTableName, targetTableColumns, 
targetTableRef,
+        matchType);
     assignmentExprs_ = assignmentExprs;
   }
 
@@ -93,9 +95,6 @@ public class MergeUpdate extends MergeCase {
     return builder.toString();
   }
 
-  @Override
-  public MatchType matchType() { return MatchType.MATCHED; }
-
   @Override
   public TMergeCaseType caseType() { return TMergeCaseType.UPDATE; }
 
@@ -125,6 +124,7 @@ public class MergeUpdate extends MergeCase {
   @Override
   public MergeUpdate clone() {
     return new MergeUpdate(Expr.cloneList(resultExprs_), 
Expr.cloneList(getFilterExprs()),
-        targetTableName_, targetTableColumns_, targetTableRef_, 
assignmentExprs_);
+        targetTableName_, targetTableColumns_, targetTableRef_, matchType_,
+        assignmentExprs_);
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergMergeNode.java 
b/fe/src/main/java/org/apache/impala/planner/IcebergMergeNode.java
index 4b8bb84cb..e608d131e 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergMergeNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergMergeNode.java
@@ -68,7 +68,8 @@ public class IcebergMergeNode extends PlanNode {
     List<TIcebergMergeCase> mergeCases = new ArrayList<>();
     for (MergeCase mergeCase : cases_) {
       TIcebergMergeCase tMergeCase = new TIcebergMergeCase(
-          Expr.treesToThrift(mergeCase.getResultExprs()), 
mergeCase.caseType());
+          Expr.treesToThrift(mergeCase.getResultExprs()), mergeCase.caseType(),
+          mergeCase.matchType());
       if (!mergeCase.getFilterExprs().isEmpty()) {
         
tMergeCase.setFilter_conjuncts(Expr.treesToThrift(mergeCase.getFilterExprs()));
       }
@@ -134,7 +135,7 @@ public class IcebergMergeNode extends PlanNode {
     for (int i = 0; i < cases_.size(); i++) {
       MergeCase mergeCase = cases_.get(i);
       joiner.add(String.format("%sCASE %d: %s", detailPrefix, i,
-          cases_.get(i).matchType().value()));
+          cases_.get(i).matchTypeAsString()));
       for (String detail : mergeCase.getExplainStrings(detailLevel)) {
         joiner.add(String.format("%s%s%s", detailPrefix, detailPrefix, 
detail));
       }
diff --git 
a/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java 
b/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java
index eab3fd753..2aeda2e0c 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java
@@ -281,7 +281,6 @@ public class AnalyzeModifyStmtsTest extends AnalyzerTest {
         + "using functional_parquet.iceberg_non_partitioned "
         + "on target.id = functional_parquet.iceberg_non_partitioned.id "
         + "when matched then delete");
-
     // INSERT
     // Inserting values originated from the target table (NULL values)
     AnalyzesOk("merge into "
@@ -342,6 +341,25 @@ public class AnalyzeModifyStmtsTest extends AnalyzerTest {
         + "then insert (id, user) values (s.id, concat(s.user, 'string'))"
         + "when not matched then "
         + "insert values (s.id, s.user, 'ab', s.event_time);");
+    // NOT MATCHED BY TARGET
+    AnalyzesOk("merge into "
+        + "functional_parquet.iceberg_v2_partitioned_position_deletes target "
+        + "using functional_parquet.iceberg_non_partitioned s "
+        + "on target.id = s.id "
+        + "when not matched by target and s.id <= 12 "
+        + "then insert (id, user) values (s.id, 'user')");
+    // NOT MATCHED BY SOURCE
+    AnalyzesOk("merge into "
+        + "functional_parquet.iceberg_v2_partitioned_position_deletes target "
+        + "using functional_parquet.iceberg_non_partitioned s "
+        + "on target.id = s.id "
+        + "when not matched by source "
+        + "then update set user = 'updated'");
+    AnalyzesOk("merge into "
+        + "functional_parquet.iceberg_v2_partitioned_position_deletes target "
+        + "using functional_parquet.iceberg_non_partitioned s "
+        + "on target.id = s.id "
+        + "when not matched by source and target.user = 'something' then 
delete");
 
     // Inline view as target
     AnalysisError("merge into "
diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java 
b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
index 75d33f8ff..702acff21 100755
--- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
@@ -2009,6 +2009,19 @@ public class ParserTest extends FrontendTestBase {
         + "then update set t.a = s.a");
     ParsesOk("merge into t using s on t.id = s.id when not matched "
         + "then insert values(a,b,c)");
+    ParsesOk("merge into t using s on t.id = s.id when not matched by target "
+        + "then insert values(a,b,c)");
+    ParsesOk("merge into t using s on t.id = s.id when not matched by target "
+        + "and t.a > 10 then insert values(a,b,c)");
+    ParsesOk("merge into t using s on t.id = s.id when not matched by source "
+        + "then delete");
+    ParsesOk("merge into t using s on t.id = s.id when not matched by source "
+        + "and funcn(a) > 10 then delete");
+    ParsesOk("merge into t using s on t.id = s.id when not matched by source "
+        + "then update set b = 12");
+    ParsesOk("merge into t using s on t.id = s.id when not matched by source "
+        + "and func(b) != func(a) then update set b = 12");
+
 
     ParserError("merge into t using s on t.id = s.id "
         + "when matched and t.a > s.b then delete from");
@@ -2018,6 +2031,18 @@ public class ParserTest extends FrontendTestBase {
         + "when not matched and t.a > s.b then update set a = b");
     ParserError("merge into t using s on t.id = s.id "
         + "when not matched and t.a > s.b then delete");
+    ParserError("merge into t using s on t.id = s.id "
+        + "when not matched by target then delete");
+    ParserError("merge into t using s on t.id = s.id "
+        + "when not matched by target and a = 1 then delete");
+    ParserError("merge into t using s on t.id = s.id "
+        + "when not matched by target then update set b = a");
+    ParserError("merge into t using s on t.id = s.id "
+        + "when not matched by target and x = y then update set b = a, c = d");
+    ParserError("merge into t using s on t.id = s.id "
+        + "when not matched by source then insert values (1, 2, 3)");
+    ParserError("merge into t using s on t.id = s.id "
+        + "when not matched by source and a <> b then insert values (1, 2, 
3)");
 
     // Invalid column permutation
     ParserError("merge into target t using (select * from source) s "
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge.test 
b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge.test
index 3a6e31b34..1082cd650 100644
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge.test
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge.test
@@ -63,7 +63,7 @@ when not matched then insert values(source.id, source.user)
 WRITE TO HDFS [functional_parquet.iceberg_v2_no_deletes, OVERWRITE=false]
 |
 03:MERGE
-|  CASE 0: NOT MATCHED
+|  CASE 0: NOT MATCHED BY TARGET
 |  |  result expressions: source.id, source.`user`
 |  |  type: INSERT
 |  row-size=80B cardinality=23
@@ -85,7 +85,7 @@ WRITE TO HDFS [functional_parquet.iceberg_v2_no_deletes, 
OVERWRITE=false]
 WRITE TO HDFS [functional_parquet.iceberg_v2_no_deletes, OVERWRITE=false]
 |
 03:MERGE
-|  CASE 0: NOT MATCHED
+|  CASE 0: NOT MATCHED BY TARGET
 |  |  result expressions: source.id, source.`user`
 |  |  type: INSERT
 |  row-size=80B cardinality=23
@@ -349,7 +349,7 @@ MERGE SINK
 |  |  filter predicates: target.id = 2
 |  |  result expressions: target.id, target.`user`, target.action, 
source.event_time
 |  |  type: UPDATE
-|  CASE 1: NOT MATCHED
+|  CASE 1: NOT MATCHED BY TARGET
 |  |  filter predicates: source.id > 10
 |  |  result expressions: source.id, source.`user`, source.action, 
target.event_time
 |  |  type: INSERT
@@ -357,7 +357,7 @@ MERGE SINK
 |  |  filter predicates: target.id > 10
 |  |  result expressions: target.id, target.`user`, target.action, 
target.event_time
 |  |  type: DELETE
-|  CASE 3: NOT MATCHED
+|  CASE 3: NOT MATCHED BY TARGET
 |  |  result expressions: source.id, source.`user`, NULL, NULL
 |  |  type: INSERT
 |  CASE 4: MATCHED
@@ -403,7 +403,7 @@ MERGE SINK
 |  |  filter predicates: target.id = 2
 |  |  result expressions: target.id, target.`user`, target.action, 
source.event_time
 |  |  type: UPDATE
-|  CASE 1: NOT MATCHED
+|  CASE 1: NOT MATCHED BY TARGET
 |  |  filter predicates: source.id > 10
 |  |  result expressions: source.id, source.`user`, source.action, 
target.event_time
 |  |  type: INSERT
@@ -411,7 +411,7 @@ MERGE SINK
 |  |  filter predicates: target.id > 10
 |  |  result expressions: target.id, target.`user`, target.action, 
target.event_time
 |  |  type: DELETE
-|  CASE 3: NOT MATCHED
+|  CASE 3: NOT MATCHED BY TARGET
 |  |  result expressions: source.id, source.`user`, NULL, NULL
 |  |  type: INSERT
 |  CASE 4: MATCHED
@@ -571,7 +571,7 @@ MERGE SINK
 |  CASE 0: MATCHED
 |  |  result expressions: target.id, target.int_col, action, 
target.date_string_col, target.`year`, target.`month`
 |  |  type: UPDATE
-|  CASE 1: NOT MATCHED
+|  CASE 1: NOT MATCHED BY TARGET
 |  |  result expressions: source.id, NULL, source.action, NULL, NULL, NULL
 |  |  type: INSERT
 |  row-size=120B cardinality=14.62K
@@ -604,7 +604,7 @@ MERGE SINK
 |  CASE 0: MATCHED
 |  |  result expressions: target.id, target.int_col, action, 
target.date_string_col, target.`year`, target.`month`
 |  |  type: UPDATE
-|  CASE 1: NOT MATCHED
+|  CASE 1: NOT MATCHED BY TARGET
 |  |  result expressions: source.id, NULL, source.action, NULL, NULL, NULL
 |  |  type: INSERT
 |  row-size=120B cardinality=14.62K
@@ -646,7 +646,7 @@ MERGE SINK
 |  CASE 0: MATCHED
 |  |  result expressions: target.id, target.`user`, 
functional_parquet.iceberg_partition_evolution.string_col, target.event_time
 |  |  type: UPDATE
-|  CASE 1: NOT MATCHED
+|  CASE 1: NOT MATCHED BY TARGET
 |  |  result expressions: functional_parquet.iceberg_partition_evolution.id, 
functional_parquet.iceberg_partition_evolution.string_col, concat('something ', 
functional_parquet.iceberg_partition_evolution.string_col), NULL
 |  |  type: INSERT
 |  row-size=120B cardinality=14.61K
@@ -687,7 +687,7 @@ MERGE SINK
 |  CASE 0: MATCHED
 |  |  result expressions: target.id, target.`user`, 
functional_parquet.iceberg_partition_evolution.string_col, target.event_time
 |  |  type: UPDATE
-|  CASE 1: NOT MATCHED
+|  CASE 1: NOT MATCHED BY TARGET
 |  |  result expressions: functional_parquet.iceberg_partition_evolution.id, 
functional_parquet.iceberg_partition_evolution.string_col, concat('something ', 
functional_parquet.iceberg_partition_evolution.string_col), NULL
 |  |  type: INSERT
 |  row-size=120B cardinality=14.61K
@@ -720,3 +720,114 @@ MERGE SINK
    Iceberg snapshot id: 8885697082976537578
    row-size=80B cardinality=20
 ====
+# Merge into a partitioned Iceberg table containing explicit NOT MATCHED BY 
TARGET, implicit NOT MATCHED BY TARGET and NOT MATCHED BY SOURCE merge cases
+merge into functional_parquet.iceberg_v2_partitioned_position_deletes target
+using (select * from functional_parquet.iceberg_partition_evolution) source
+on target.id = source.id
+when not matched then insert (id, user, action) values(source.id, 
source.string_col, concat("something ", source.string_col))
+when not matched by target and target.id < 15 then insert (id, user, action) 
values(source.id, source.string_col, concat("something ", source.string_col))
+when not matched by source and target.user = 'something' then delete
+when not matched by source then update set target.id = 10;
+---- PLAN
+MERGE SINK
+|->WRITE TO HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes, 
OVERWRITE=false, PARTITION-KEYS=(action)]
+|->BUFFERED DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
+|
+06:SORT
+|  order by: action ASC NULLS LAST
+|  row-size=81B cardinality=14.61K
+|
+05:MERGE
+|  CASE 0: NOT MATCHED BY TARGET
+|  |  result expressions: functional_parquet.iceberg_partition_evolution.id, 
functional_parquet.iceberg_partition_evolution.string_col, concat('something ', 
functional_parquet.iceberg_partition_evolution.string_col), NULL
+|  |  type: INSERT
+|  CASE 1: NOT MATCHED BY TARGET
+|  |  filter predicates: target.id < 15
+|  |  result expressions: functional_parquet.iceberg_partition_evolution.id, 
functional_parquet.iceberg_partition_evolution.string_col, concat('something ', 
functional_parquet.iceberg_partition_evolution.string_col), NULL
+|  |  type: INSERT
+|  CASE 2: NOT MATCHED BY SOURCE
+|  |  filter predicates: target.`user` = 'something'
+|  |  result expressions: target.id, target.`user`, target.action, 
target.event_time
+|  |  type: DELETE
+|  CASE 3: NOT MATCHED BY SOURCE
+|  |  result expressions: 10, target.`user`, target.action, target.event_time
+|  |  type: UPDATE
+|  row-size=120B cardinality=14.61K
+|
+04:HASH JOIN [FULL OUTER JOIN]
+|  hash predicates: target.id = 
functional_parquet.iceberg_partition_evolution.id
+|  row-size=120B cardinality=14.61K
+|
+|--03:SCAN HDFS [functional_parquet.iceberg_partition_evolution]
+|     HDFS partitions=1/1 files=1460 size=2.49MB
+|     Iceberg snapshot id: 547864005421580562
+|     row-size=40B cardinality=14.60K
+|
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
+|  row-size=80B cardinality=10
+|
+|--01:SCAN HDFS 
[functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 
target-position-delete]
+|     HDFS partitions=1/1 files=3 size=9.47KB
+|     Iceberg snapshot id: 8885697082976537578
+|     row-size=204B cardinality=10
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes 
target]
+   HDFS partitions=1/1 files=3 size=3.48KB
+   Iceberg snapshot id: 8885697082976537578
+   row-size=80B cardinality=20
+---- DISTRIBUTEDPLAN
+MERGE SINK
+|->WRITE TO HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes, 
OVERWRITE=false, PARTITION-KEYS=(action)]
+|->BUFFERED DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
+|
+10:SORT
+|  order by: action ASC NULLS LAST
+|  row-size=81B cardinality=14.61K
+|
+09:EXCHANGE [HASH(target.action)]
+|
+05:MERGE
+|  CASE 0: NOT MATCHED BY TARGET
+|  |  result expressions: functional_parquet.iceberg_partition_evolution.id, 
functional_parquet.iceberg_partition_evolution.string_col, concat('something ', 
functional_parquet.iceberg_partition_evolution.string_col), NULL
+|  |  type: INSERT
+|  CASE 1: NOT MATCHED BY TARGET
+|  |  filter predicates: target.id < 15
+|  |  result expressions: functional_parquet.iceberg_partition_evolution.id, 
functional_parquet.iceberg_partition_evolution.string_col, concat('something ', 
functional_parquet.iceberg_partition_evolution.string_col), NULL
+|  |  type: INSERT
+|  CASE 2: NOT MATCHED BY SOURCE
+|  |  filter predicates: target.`user` = 'something'
+|  |  result expressions: target.id, target.`user`, target.action, 
target.event_time
+|  |  type: DELETE
+|  CASE 3: NOT MATCHED BY SOURCE
+|  |  result expressions: 10, target.`user`, target.action, target.event_time
+|  |  type: UPDATE
+|  row-size=120B cardinality=14.61K
+|
+04:HASH JOIN [FULL OUTER JOIN, PARTITIONED]
+|  hash predicates: target.id = 
functional_parquet.iceberg_partition_evolution.id
+|  row-size=120B cardinality=14.61K
+|
+|--08:EXCHANGE [HASH(functional_parquet.iceberg_partition_evolution.id)]
+|  |
+|  03:SCAN HDFS [functional_parquet.iceberg_partition_evolution]
+|     HDFS partitions=1/1 files=1460 size=2.49MB
+|     Iceberg snapshot id: 547864005421580562
+|     row-size=40B cardinality=14.60K
+|
+07:EXCHANGE [HASH(target.id)]
+|
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED]
+|  row-size=80B cardinality=10
+|
+|--06:EXCHANGE [DIRECTED]
+|  |
+|  01:SCAN HDFS 
[functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 
target-position-delete]
+|     HDFS partitions=1/1 files=3 size=9.47KB
+|     Iceberg snapshot id: 8885697082976537578
+|     row-size=204B cardinality=10
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes 
target]
+   HDFS partitions=1/1 files=3 size=3.48KB
+   Iceberg snapshot id: 8885697082976537578
+   row-size=80B cardinality=20
+====
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-partition-sort.test
 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-partition-sort.test
new file mode 100644
index 000000000..4bc57cb68
--- /dev/null
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-partition-sort.test
@@ -0,0 +1,120 @@
+====
+---- QUERY
+# Table creation and initial data loading
+create table target_part_sort(id int, bool_col boolean, int_col int, float_col 
float,
+    decimal_col decimal(20,0), date_col date, string_col string)
+  partitioned by spec (bucket(5, int_col), truncate(3, decimal_col), 
year(date_col), truncate(3, string_col))
+  sort by (int_col)
+  stored as iceberg tblproperties("format-version"="2") ;
+
+create table source(id int, bool_col boolean, int_col int, float_col float,
+    decimal_col decimal(20,0), date_col date, string_col string)
+  stored as iceberg tblproperties("format-version"="2");
+
+insert into source select id, bool_col, int_col,
+  float_col, cast(bigint_col as decimal(20,0)), to_date(timestamp_col),
+  string_col from functional.alltypes order by id limit 7;
+====
+---- QUERY
+# Merge into partitioned and sorted target table from the source table as an 
inline view
+# using inserts as the target table is empty
+merge into target_part_sort target using (select * from source) source on 
target.id = source.id
+when not matched and source.id % 2 = 0 then insert values(
+    cast(source.id + 1000 as int), source.bool_col, source.int_col,
+    source.float_col, source.decimal_col,
+    source.date_col, 'constant string value')
+when not matched then insert values(
+    cast(source.id + 2000 as int), source.bool_col, source.int_col,
+    source.float_col, source.decimal_col,
+    source.date_col, concat(source.string_col, " case 2"))
+---- DML_RESULTS: target_part_sort
+1000,true,0,0,0,2009-01-01,'constant string value'
+1002,true,2,2.200000047683716,20,2009-01-01,'constant string value'
+1004,true,4,4.400000095367432,40,2009-01-01,'constant string value'
+1006,true,6,6.599999904632568,60,2009-01-01,'constant string value'
+2001,false,1,1.100000023841858,10,2009-01-01,'1 case 2'
+2003,false,3,3.299999952316284,30,2009-01-01,'3 case 2'
+2005,false,5,5.5,50,2009-01-01,'5 case 2'
+---- TYPES
+INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING
+====
+---- QUERY
+# Merge into partitioned and sorted target table from the source table as
+# a union of inline views duplicating the source rows
+# causing query abortion on the backend
+merge into target_part_sort target using (select * from source union all 
select * from source) source on target.id = source.id + 1002
+when not matched by target then insert values(
+    cast(source.id + 1000 as int), source.bool_col, source.int_col,
+    source.float_col, source.decimal_col,
+    source.date_col, 'constant string value')
+---- CATCH
+Duplicate row found
+====
+---- QUERY
+# Merge into partitioned and sorted target table from the source table as
+# an inline view using an update to combine target and source values as a 
source value
+merge into target_part_sort target using (select * from source) source on 
target.id = source.id + 1000
+when matched then update set float_col = cast(target.float_col + 
source.float_col as float)
+---- DML_RESULTS: target_part_sort
+1000,true,0,0,0,2009-01-01,'constant string value'
+1002,true,2,4.400000095367432,20,2009-01-01,'constant string value'
+1004,true,4,8.8000001907348632,40,2009-01-01,'constant string value'
+1006,true,6,13.19999980926514,60,2009-01-01,'constant string value'
+2001,false,1,1.100000023841858,10,2009-01-01,'1 case 2'
+2003,false,3,3.299999952316284,30,2009-01-01,'3 case 2'
+2005,false,5,5.5,50,2009-01-01,'5 case 2'
+---- TYPES
+INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING
+====
+---- QUERY
+# Merge into partitioned and sorted target table from the source table as
+# an inline view using an update to combine target and source values as a 
source value
+# using join condition on string columns
+merge into target_part_sort target using (select * from source) source on 
target.string_col = concat(source.string_col, ' case 2')
+when matched then update set float_col = cast(target.float_col + 
source.float_col as float)
+---- DML_RESULTS: target_part_sort
+1000,true,0,0,0,2009-01-01,'constant string value'
+1002,true,2,4.400000095367432,20,2009-01-01,'constant string value'
+1004,true,4,8.8000001907348632,40,2009-01-01,'constant string value'
+1006,true,6,13.19999980926514,60,2009-01-01,'constant string value'
+2001,false,1,2.200000047683716,10,2009-01-01,'1 case 2'
+2003,false,3,6.599999904632568,30,2009-01-01,'3 case 2'
+2005,false,5,11,50,2009-01-01,'5 case 2'
+---- TYPES
+INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING
+====
+---- QUERY
+# Merge into partitioned and sorted target table from the source table
+# using all permutations of merge clauses (with an unconditional delete)
+merge into target_part_sort target using
+(select id, bool_col, int_col, float_col, decimal_col, date_col, string_col 
from source union all
+ select cast(id + 2000 as int), bool_col, int_col, float_col, decimal_col, 
date_col, string_col from source) source
+on target.id = source.id
+when not matched by target and source.id = 2006 then insert values (source.id, 
source.bool_col, source.int_col, source.float_col, source.decimal_col, 
source.date_col, "first")
+when not matched then insert values (source.id, source.bool_col, 
source.int_col, source.float_col, source.decimal_col, source.date_col, "second")
+when matched and target.id = 2005 then update set string_col = "third"
+when matched and target.id = 2003 then delete
+when matched then delete
+when not matched by source and target.id = 1000 then update set string_col = 
"fourth", decimal_col = 1000000
+when not matched by source then update set string_col = "fifth", float_col = 
-683925235.2
+---- DML_RESULTS: target_part_sort
+0,true,0,0,0,2009-01-01,'second'
+1,false,1,1.100000023841858,10,2009-01-01,'second'
+2,true,2,2.200000047683716,20,2009-01-01,'second'
+3,false,3,3.299999952316284,30,2009-01-01,'second'
+4,true,4,4.400000095367432,40,2009-01-01,'second'
+5,false,5,5.5,50,2009-01-01,'second'
+6,true,6,6.599999904632568,60,2009-01-01,'second'
+1000,true,0,0,1000000,2009-01-01,'fourth'
+1002,true,2,-683925248,20,2009-01-01,'fifth'
+1004,true,4,-683925248,40,2009-01-01,'fifth'
+1006,true,6,-683925248,60,2009-01-01,'fifth'
+2000,true,0,0,0,2009-01-01,'second'
+2002,true,2,2.200000047683716,20,2009-01-01,'second'
+2004,true,4,4.400000095367432,40,2009-01-01,'second'
+2005,false,5,11,50,2009-01-01,'third'
+2006,true,6,6.599999904632568,60,2009-01-01,'first'
+---- TYPES
+INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING
+====
+
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge.test 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-partition.test
similarity index 60%
copy from 
testdata/workloads/functional-query/queries/QueryTest/iceberg-merge.test
copy to 
testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-partition.test
index ad9c9e997..b5e3afdb3 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-partition.test
@@ -1,21 +1,11 @@
 ====
 ---- QUERY
-# Table structure creation and initial data loading
-create table target(id int, bool_col boolean, int_col int, float_col float,
-    decimal_col decimal(20,0), date_col date, string_col string)
-  stored as iceberg tblproperties("format-version"="2");
-
+# Table creation and initial data loading
 create table target_part(id int, bool_col boolean, int_col int, float_col 
float,
     decimal_col decimal(20,0), date_col date, string_col string)
   partitioned by spec (bucket(5, int_col), truncate(3, decimal_col), 
year(date_col), truncate(3, string_col))
   stored as iceberg tblproperties("format-version"="2");
 
-create table target_part_sort(id int, bool_col boolean, int_col int, float_col 
float,
-    decimal_col decimal(20,0), date_col date, string_col string)
-  partitioned by spec (bucket(5, int_col), truncate(3, decimal_col), 
year(date_col), truncate(3, string_col))
-  sort by (int_col)
-  stored as iceberg tblproperties("format-version"="2") ;
-
 create table source(id int, bool_col boolean, int_col int, float_col float,
     decimal_col decimal(20,0), date_col date, string_col string)
   stored as iceberg tblproperties("format-version"="2");
@@ -25,103 +15,6 @@ insert into source select id, bool_col, int_col,
   string_col from functional.alltypes order by id limit 7;
 ====
 ---- QUERY
-# Merge into unpartitioned target table from the source table
-# using when not matched insert case as the target table is empty now
-merge into target using source on target.id = source.id
-when not matched then insert values(
-    source.id, source.bool_col, source.int_col,
-    source.float_col, source.decimal_col,
-    source.date_col, source.string_col)
----- DML_RESULTS: target
-3,false,3,3.29999995232,30,2009-01-01,'3'
-5,false,5,5.5,50,2009-01-01,'5'
-1,false,1,1.10000002384,10,2009-01-01,'1'
-4,true,4,4.40000009537,40,2009-01-01,'4'
-0,true,0,0.0,0,2009-01-01,'0'
-6,true,6,6.59999990463,60,2009-01-01,'6'
-2,true,2,2.20000004768,20,2009-01-01,'2'
----- TYPES
-INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING
----- RUNTIME_PROFILE
-NumModifiedRows: 7
-NumDeletedRows: 0
-====
----- QUERY
-# Merge into unpartitioned target table from the source table
-# using update case to update int_col to a constant values
-merge into target using source on target.id = source.id
-when matched and source.id % 2 = 1 then update set int_col = 555
-when matched and source.id % 2 = 0 then update set int_col = 222
----- DML_RESULTS: target
-3,false,555,3.29999995232,30,2009-01-01,'3'
-5,false,555,5.5,50,2009-01-01,'5'
-1,false,555,1.10000002384,10,2009-01-01,'1'
-4,true,222,4.40000009537,40,2009-01-01,'4'
-0,true,222,0.0,0,2009-01-01,'0'
-6,true,222,6.59999990463,60,2009-01-01,'6'
-2,true,222,2.20000004768,20,2009-01-01,'2'
----- TYPES
-INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING
----- RUNTIME_PROFILE
-NumModifiedRows: 7
-NumDeletedRows: 7
-====
----- QUERY
-# Merge into unpartitioned target table from the source table
-# Using when matched delete case to delete some values from the target table
-merge into target using source on target.id = source.id
-when matched and source.id % 2 = 1 then delete
----- DML_RESULTS: target
-4,true,222,4.40000009537,40,2009-01-01,'4'
-0,true,222,0.0,0,2009-01-01,'0'
-6,true,222,6.59999990463,60,2009-01-01,'6'
-2,true,222,2.20000004768,20,2009-01-01,'2'
----- TYPES
-INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING
----- RUNTIME_PROFILE
-NumModifiedRows: 0
-NumDeletedRows: 3
-====
----- QUERY
-# Merge into unpartitioned target table from the source table
-# using multiple cases to insert/update/delete specific rows
-merge into target using source on target.id = source.id
-when matched and source.id = 6 then delete
-when matched and target.id % 2 = 0 then update set string_col = 
concat(source.string_col, " case 2")
-when not matched and source.id = 5 then insert (id, int_col) values 
(source.id, source.int_col)
----- DML_RESULTS: target
-4,true,222,4.40000009537,40,2009-01-01,'4 case 2'
-0,true,222,0.0,0,2009-01-01,'0 case 2'
-2,true,222,2.20000004768,20,2009-01-01,'2 case 2'
-5,NULL,5,NULL,NULL,NULL,'NULL'
----- TYPES
-INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING
----- RUNTIME_PROFILE
-NumModifiedRows: 4
-NumDeletedRows: 4
-====
----- QUERY
-# Validate the number of snapshots written to target
-select count(1) snapshots from $DATABASE.target.snapshots
----- RESULTS
-4
----- TYPES
-BIGINT
-====
----- QUERY
-# Validate the files written for target
-show files in target
----- RESULTS: VERIFY_IS_SUBSET
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target/data/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY'
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target/data/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY'
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target/data/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY'
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target/data/delete-.*.parq','.*','','$ERASURECODE_POLICY'
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target/data/delete-.*.parq','.*','','$ERASURECODE_POLICY'
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target/data/delete-.*.parq','.*','','$ERASURECODE_POLICY'
----- TYPES
-STRING,STRING,STRING,STRING
-====
----- QUERY
 # Merge into partitioned target table from the source table
 # Using insert to fill target_part table
 merge into target_part target using source on target.id = source.id
@@ -208,74 +101,6 @@ 
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket
 STRING,STRING,STRING,STRING
 ====
 ---- QUERY
-# Merge into partitioned and sorted target table from the source table as an 
inline view
-# using inserts as the target table is empty
-merge into target_part_sort target using (select * from source) source on 
target.id = source.id
-when not matched and source.id % 2 = 0 then insert values(
-    cast(source.id + 1000 as int), source.bool_col, source.int_col,
-    source.float_col, source.decimal_col,
-    source.date_col, 'constant string value')
-when not matched then insert values(
-    cast(source.id + 2000 as int), source.bool_col, source.int_col,
-    source.float_col, source.decimal_col,
-    source.date_col, concat(source.string_col, " case 2"))
----- DML_RESULTS: target_part_sort
-1000,true,0,0,0,2009-01-01,'constant string value'
-1002,true,2,2.200000047683716,20,2009-01-01,'constant string value'
-1004,true,4,4.400000095367432,40,2009-01-01,'constant string value'
-1006,true,6,6.599999904632568,60,2009-01-01,'constant string value'
-2001,false,1,1.100000023841858,10,2009-01-01,'1 case 2'
-2003,false,3,3.299999952316284,30,2009-01-01,'3 case 2'
-2005,false,5,5.5,50,2009-01-01,'5 case 2'
----- TYPES
-INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING
-====
----- QUERY
-# Merge into partitioned and sorted target table from the source table as
-# an union of inline views duplicating the source rows
-# causing query abortion on the backend
-merge into target_part_sort target using (select * from source union all 
select * from source) source on target.id = source.id + 1002
-when not matched then insert values(
-    cast(source.id + 1000 as int), source.bool_col, source.int_col,
-    source.float_col, source.decimal_col,
-    source.date_col, 'constant string value')
----- CATCH
-Duplicate row found
-====
----- QUERY
-# Merge into partitioned and sorted target table from the source table as
-# an inline view using an update to combine target and source values as a 
source value
-merge into target_part_sort target using (select * from source) source on 
target.id = source.id + 1000
-when matched then update set float_col = cast(target.float_col + 
source.float_col as float)
----- DML_RESULTS: target_part_sort
-1000,true,0,0,0,2009-01-01,'constant string value'
-1002,true,2,4.400000095367432,20,2009-01-01,'constant string value'
-1004,true,4,8.8000001907348632,40,2009-01-01,'constant string value'
-1006,true,6,13.19999980926514,60,2009-01-01,'constant string value'
-2001,false,1,1.100000023841858,10,2009-01-01,'1 case 2'
-2003,false,3,3.299999952316284,30,2009-01-01,'3 case 2'
-2005,false,5,5.5,50,2009-01-01,'5 case 2'
----- TYPES
-INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING
-====
----- QUERY
-# Merge into partitioned and sorted target table from the source table as
-# an inline view using an update to combine target and source values as a 
source value
-# using join condition on string columns
-merge into target_part_sort target using (select * from source) source on 
target.string_col = concat(source.string_col, ' case 2')
-when matched then update set float_col = cast(target.float_col + 
source.float_col as float)
----- DML_RESULTS: target_part_sort
-1000,true,0,0,0,2009-01-01,'constant string value'
-1002,true,2,4.400000095367432,20,2009-01-01,'constant string value'
-1004,true,4,8.8000001907348632,40,2009-01-01,'constant string value'
-1006,true,6,13.19999980926514,60,2009-01-01,'constant string value'
-2001,false,1,2.200000047683716,10,2009-01-01,'1 case 2'
-2003,false,3,6.599999904632568,30,2009-01-01,'3 case 2'
-2005,false,5,11,50,2009-01-01,'5 case 2'
----- TYPES
-INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING
-====
----- QUERY
 # Modifying the partition layout of 'target_part' by removing bucket 
partitions from int_col
 alter table target_part set partition spec (truncate(3, decimal_col), 
year(date_col), truncate(3, string_col))
 ====
@@ -331,4 +156,110 @@ 
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/decimal_col_tr
 
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/decimal_col_trunc=60/date_col_year=2009/string_col_trunc=5/delete-.*.parq','.*','','$ERASURECODE_POLICY'
 ---- TYPES
 STRING,STRING,STRING,STRING
-====
\ No newline at end of file
+====
+---- QUERY
+# Merge into partitioned target table from the source table as
+# an inline view using an update to set the value of int_col
+# when the source does not match and target.id > 4
+merge into target_part target using (select * from source) source on target.id 
= cast(source.id + 10 as int)
+when not matched by source and target.id > 4 then update set int_col = 
cast(target.int_col + 10 as int)
+---- DML_RESULTS: target_part
+3,false,13,3.29999995232,30,2009-01-01,'3'
+5,false,25,5.5,50,2009-01-01,'4'
+4,true,14,4.40000009537,40,2009-01-01,'3'
+6,true,26,6.59999990463,60,2009-01-01,'5'
+---- TYPES
+INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING
+====
+---- QUERY
+# Merge into partitioned target table from the source table
+# using not matched by source clause
+merge into target_part target using source source on target.id = 
cast(source.id + 10 as int)
+when not matched by source then update set bool_col = !target.bool_col
+---- DML_RESULTS: target_part
+3,true,13,3.29999995232,30,2009-01-01,'3'
+4,false,14,4.40000009537,40,2009-01-01,'3'
+5,true,25,5.5,50,2009-01-01,'4'
+6,false,26,6.59999990463,60,2009-01-01,'5'
+---- TYPES
+INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING
+====
+---- QUERY
+# Merge into partitioned target table from the source table as
+# an inline view using a delete when source is not matching and target.id < 4
+merge into target_part target using (select * from source) source on target.id 
= cast(source.id + 10 as int)
+when not matched by source and target.id < 4 then delete
+---- DML_RESULTS: target_part
+5,true,25,5.5,50,2009-01-01,'4'
+4,false,14,4.40000009537,40,2009-01-01,'3'
+6,false,26,6.59999990463,60,2009-01-01,'5'
+---- TYPES
+INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING
+====
+---- QUERY
+# Merge into partitioned target table from the source table as
+# an inline view using not matched by source clause with multiple filter 
predicates
+merge into target_part target using source on target.id = cast(source.id + 10 
as int)
+when not matched by source
+and target.id > 4 and target.id < 6 and (target.float_col > 3.0 or 
target.float_col < 7.0)
+then update set date_col = '2024-12-12'
+---- DML_RESULTS: target_part
+5,true,25,5.5,50,2024-12-12,'4'
+4,false,14,4.40000009537,40,2009-01-01,'3'
+6,false,26,6.59999990463,60,2009-01-01,'5'
+---- TYPES
+INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING
+====
+---- QUERY
+# Merge into partitioned target table from the source table
+# using an explicit not matched by target clause to insert rows
+merge into target_part target using source on target.id = cast(source.id + 100 
as int)
+when not matched by target then insert values (cast(source.id + 7 as int), 
source.bool_col, source.int_col, source.float_col, source.decimal_col, 
source.date_col, source.string_col)
+---- DML_RESULTS: target_part
+4,false,14,4.400000095367432,40,2009-01-01,'3'
+5,true,25,5.5,50,2024-12-12,'4'
+6,false,26,6.599999904632568,60,2009-01-01,'5'
+7,true,0,0,0,2009-01-01,'0'
+8,false,1,1.100000023841858,10,2009-01-01,'1'
+9,true,2,2.200000047683716,20,2009-01-01,'2'
+10,false,3,3.299999952316284,30,2009-01-01,'3'
+11,true,4,4.400000095367432,40,2009-01-01,'4'
+12,false,5,5.5,50,2009-01-01,'5'
+13,true,6,6.599999904632568,60,2009-01-01,'6'
+---- TYPES
+INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING
+====
+---- QUERY
+# Merge into partitioned target table from the source table
+# using all permutations of merge clauses (minus the unconditional delete)
+merge into target_part target using
+(select id, bool_col, int_col, float_col, decimal_col, date_col, string_col 
from source union all
+ select cast(id + 10 as int), bool_col, int_col, float_col, decimal_col, 
date_col, string_col from source) source
+on target.id = source.id
+when not matched by target and source.id = 15 then insert values (source.id, 
source.bool_col, source.int_col, source.float_col, source.decimal_col, 
source.date_col, "first")
+when not matched then insert values (source.id, source.bool_col, 
source.int_col, source.float_col, source.decimal_col, source.date_col, "second")
+when matched and target.id = 6 then update set string_col = "third"
+when matched and target.id = 4 then delete
+when matched then update set string_col = "fourth", date_col = "1900-01-01"
+when not matched by source and target.id = 9 then update set string_col = 
"fifth", decimal_col = 1000000
+when not matched by source then update set string_col = "sixth", float_col = 
-683925235.2
+---- DML_RESULTS: target_part
+0,true,0,0,0,2009-01-01,'second'
+1,false,1,1.100000023841858,10,2009-01-01,'second'
+2,true,2,2.200000047683716,20,2009-01-01,'second'
+3,false,3,3.299999952316284,30,2009-01-01,'second'
+5,true,25,5.5,50,1900-01-01,'fourth'
+6,false,26,6.599999904632568,60,2009-01-01,'third'
+7,true,0,-683925248,0,2009-01-01,'sixth'
+8,false,1,-683925248,10,2009-01-01,'sixth'
+9,true,2,2.200000047683716,1000000,2009-01-01,'fifth'
+10,false,3,3.299999952316284,30,1900-01-01,'fourth'
+11,true,4,4.400000095367432,40,1900-01-01,'fourth'
+12,false,5,5.5,50,1900-01-01,'fourth'
+13,true,6,6.599999904632568,60,1900-01-01,'fourth'
+14,true,4,4.400000095367432,40,2009-01-01,'second'
+15,false,5,5.5,50,2009-01-01,'first'
+16,true,6,6.599999904632568,60,2009-01-01,'second'
+---- TYPES
+INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING
+====
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge.test 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge.test
index ad9c9e997..24a971dfa 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge.test
@@ -1,21 +1,10 @@
 ====
 ---- QUERY
-# Table structure creation and initial data loading
+# Table creation and initial data loading
 create table target(id int, bool_col boolean, int_col int, float_col float,
     decimal_col decimal(20,0), date_col date, string_col string)
   stored as iceberg tblproperties("format-version"="2");
 
-create table target_part(id int, bool_col boolean, int_col int, float_col 
float,
-    decimal_col decimal(20,0), date_col date, string_col string)
-  partitioned by spec (bucket(5, int_col), truncate(3, decimal_col), 
year(date_col), truncate(3, string_col))
-  stored as iceberg tblproperties("format-version"="2");
-
-create table target_part_sort(id int, bool_col boolean, int_col int, float_col 
float,
-    decimal_col decimal(20,0), date_col date, string_col string)
-  partitioned by spec (bucket(5, int_col), truncate(3, decimal_col), 
year(date_col), truncate(3, string_col))
-  sort by (int_col)
-  stored as iceberg tblproperties("format-version"="2") ;
-
 create table source(id int, bool_col boolean, int_col int, float_col float,
     decimal_col decimal(20,0), date_col date, string_col string)
   stored as iceberg tblproperties("format-version"="2");
@@ -101,10 +90,23 @@ NumModifiedRows: 4
 NumDeletedRows: 4
 ====
 ---- QUERY
+# Merge into unpartitioned target table from the source table
+# using not matched by source clause
+merge into target using source on target.id = cast(source.id + 1 as int)
+when not matched by source then update set date_col = '2022-12-12'
+---- DML_RESULTS: target
+4,true,222,4.40000009537,40,2009-01-01,'4 case 2'
+0,true,222,0.0,0,2022-12-12,'0 case 2'
+2,true,222,2.20000004768,20,2009-01-01,'2 case 2'
+5,NULL,5,NULL,NULL,NULL,'NULL'
+---- TYPES
+INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING
+====
+---- QUERY
 # Validate the number of snapshots written to target
 select count(1) snapshots from $DATABASE.target.snapshots
 ---- RESULTS
-4
+5
 ---- TYPES
 BIGINT
 ====
@@ -120,215 +122,4 @@ 
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target/data/delete-.*.parq','.*
 
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target/data/delete-.*.parq','.*','','$ERASURECODE_POLICY'
 ---- TYPES
 STRING,STRING,STRING,STRING
-====
----- QUERY
-# Merge into partitioned target table from the source table
-# Using insert to fill target_part table
-merge into target_part target using source on target.id = source.id
-when not matched then insert values(
-    cast(source.id + 10 as int), source.bool_col, source.int_col,
-    source.float_col, source.decimal_col,
-    source.date_col, 'constant string value')
----- DML_RESULTS: target_part
-13,false,3,3.29999995232,30,2009-01-01,'constant string value'
-15,false,5,5.5,50,2009-01-01,'constant string value'
-11,false,1,1.10000002384,10,2009-01-01,'constant string value'
-14,true,4,4.40000009537,40,2009-01-01,'constant string value'
-10,true,0,0.0,0,2009-01-01,'constant string value'
-16,true,6,6.59999990463,60,2009-01-01,'constant string value'
-12,true,2,2.20000004768,20,2009-01-01,'constant string value'
----- TYPES
-INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING
-====
----- QUERY
-# Merge into partitioned target table from the source table
-# Using update clause to update string_col value and delete clause
-# to delete rows where source.id is lower than 3
-merge into target_part target using source on target.id = source.id + 10
-when matched and source.id < 3 then delete
-when matched then update set string_col = source.string_col
----- DML_RESULTS: target_part
-13,false,3,3.29999995232,30,2009-01-01,'3'
-15,false,5,5.5,50,2009-01-01,'5'
-14,true,4,4.40000009537,40,2009-01-01,'4'
-16,true,6,6.59999990463,60,2009-01-01,'6'
----- TYPES
-INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING
-====
----- QUERY
-# Merge into partitioned target table from itself shifted by 1
-# using update clause to update string_col and int_col
-merge into target_part target using target_part source on target.id = 
source.id + 1
-when matched then update set string_col = source.string_col, int_col = 
cast(source.int_col + 100 as int)
----- DML_RESULTS: target_part
-13,false,3,3.29999995232,30,2009-01-01,'3'
-15,false,104,5.5,50,2009-01-01,'4'
-14,true,103,4.40000009537,40,2009-01-01,'3'
-16,true,105,6.59999990463,60,2009-01-01,'5'
----- TYPES
-INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING
-====
----- QUERY
-# Validate the number of snapshots written to target_part
-select count(1) snapshots from $DATABASE.target_part.snapshots
----- RESULTS
-3
----- TYPES
-BIGINT
-====
----- QUERY
-# Validate the files written for target_part
-show files in target_part
----- RESULTS: VERIFY_IS_SUBSET
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=0/decimal_col_trunc=30/date_col_year=2009/string_col_trunc=3/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY'
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=0/decimal_col_trunc=30/date_col_year=2009/string_col_trunc=con/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY'
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=0/decimal_col_trunc=30/date_col_year=2009/string_col_trunc=con/delete-.*.parq','.*','','$ERASURECODE_POLICY'
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=0/decimal_col_trunc=39/date_col_year=2009/string_col_trunc=4/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY'
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=0/decimal_col_trunc=39/date_col_year=2009/string_col_trunc=4/delete-.*.parq','.*','','$ERASURECODE_POLICY'
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=0/decimal_col_trunc=39/date_col_year=2009/string_col_trunc=con/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY'
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=0/decimal_col_trunc=39/date_col_year=2009/string_col_trunc=con/delete-.*.parq','.*','','$ERASURECODE_POLICY'
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=1/decimal_col_trunc=0/date_col_year=2009/string_col_trunc=con/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY'
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=1/decimal_col_trunc=0/date_col_year=2009/string_col_trunc=con/delete-.*.parq','.*','','$ERASURECODE_POLICY'
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=1/decimal_col_trunc=9/date_col_year=2009/string_col_trunc=con/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY'
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=1/decimal_col_trunc=9/date_col_year=2009/string_col_trunc=con/delete-.*.parq','.*','','$ERASURECODE_POLICY'
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=2/decimal_col_trunc=18/date_col_year=2009/string_col_trunc=con/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY'
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=2/decimal_col_trunc=18/date_col_year=2009/string_col_trunc=con/delete-.*.parq','.*','','$ERASURECODE_POLICY'
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=2/decimal_col_trunc=48/date_col_year=2009/string_col_trunc=4/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY'
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=3/decimal_col_trunc=48/date_col_year=2009/string_col_trunc=5/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY'
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=3/decimal_col_trunc=48/date_col_year=2009/string_col_trunc=5/delete-.*.parq','.*','','$ERASURECODE_POLICY'
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=3/decimal_col_trunc=48/date_col_year=2009/string_col_trunc=con/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY'
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=3/decimal_col_trunc=48/date_col_year=2009/string_col_trunc=con/delete-.*.parq','.*','','$ERASURECODE_POLICY'
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=4/decimal_col_trunc=39/date_col_year=2009/string_col_trunc=3/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY'
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=4/decimal_col_trunc=60/date_col_year=2009/string_col_trunc=5/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY'
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=4/decimal_col_trunc=60/date_col_year=2009/string_col_trunc=6/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY'
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=4/decimal_col_trunc=60/date_col_year=2009/string_col_trunc=6/delete-.*.parq','.*','','$ERASURECODE_POLICY'
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=4/decimal_col_trunc=60/date_col_year=2009/string_col_trunc=con/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY'
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=4/decimal_col_trunc=60/date_col_year=2009/string_col_trunc=con/delete-.*.parq','.*','','$ERASURECODE_POLICY'
----- TYPES
-STRING,STRING,STRING,STRING
-====
----- QUERY
-# Merge into partitioned and sorted target table from the source table as an 
inline view
-# using inserts as the target table is empty
-merge into target_part_sort target using (select * from source) source on 
target.id = source.id
-when not matched and source.id % 2 = 0 then insert values(
-    cast(source.id + 1000 as int), source.bool_col, source.int_col,
-    source.float_col, source.decimal_col,
-    source.date_col, 'constant string value')
-when not matched then insert values(
-    cast(source.id + 2000 as int), source.bool_col, source.int_col,
-    source.float_col, source.decimal_col,
-    source.date_col, concat(source.string_col, " case 2"))
----- DML_RESULTS: target_part_sort
-1000,true,0,0,0,2009-01-01,'constant string value'
-1002,true,2,2.200000047683716,20,2009-01-01,'constant string value'
-1004,true,4,4.400000095367432,40,2009-01-01,'constant string value'
-1006,true,6,6.599999904632568,60,2009-01-01,'constant string value'
-2001,false,1,1.100000023841858,10,2009-01-01,'1 case 2'
-2003,false,3,3.299999952316284,30,2009-01-01,'3 case 2'
-2005,false,5,5.5,50,2009-01-01,'5 case 2'
----- TYPES
-INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING
-====
----- QUERY
-# Merge into partitioned and sorted target table from the source table as
-# an union of inline views duplicating the source rows
-# causing query abortion on the backend
-merge into target_part_sort target using (select * from source union all 
select * from source) source on target.id = source.id + 1002
-when not matched then insert values(
-    cast(source.id + 1000 as int), source.bool_col, source.int_col,
-    source.float_col, source.decimal_col,
-    source.date_col, 'constant string value')
----- CATCH
-Duplicate row found
-====
----- QUERY
-# Merge into partitioned and sorted target table from the source table as
-# an inline view using an update to combine target and source values as a 
source value
-merge into target_part_sort target using (select * from source) source on 
target.id = source.id + 1000
-when matched then update set float_col = cast(target.float_col + 
source.float_col as float)
----- DML_RESULTS: target_part_sort
-1000,true,0,0,0,2009-01-01,'constant string value'
-1002,true,2,4.400000095367432,20,2009-01-01,'constant string value'
-1004,true,4,8.8000001907348632,40,2009-01-01,'constant string value'
-1006,true,6,13.19999980926514,60,2009-01-01,'constant string value'
-2001,false,1,1.100000023841858,10,2009-01-01,'1 case 2'
-2003,false,3,3.299999952316284,30,2009-01-01,'3 case 2'
-2005,false,5,5.5,50,2009-01-01,'5 case 2'
----- TYPES
-INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING
-====
----- QUERY
-# Merge into partitioned and sorted target table from the source table as
-# an inline view using an update to combine target and source values as a 
source value
-# using join condition on string columns
-merge into target_part_sort target using (select * from source) source on 
target.string_col = concat(source.string_col, ' case 2')
-when matched then update set float_col = cast(target.float_col + 
source.float_col as float)
----- DML_RESULTS: target_part_sort
-1000,true,0,0,0,2009-01-01,'constant string value'
-1002,true,2,4.400000095367432,20,2009-01-01,'constant string value'
-1004,true,4,8.8000001907348632,40,2009-01-01,'constant string value'
-1006,true,6,13.19999980926514,60,2009-01-01,'constant string value'
-2001,false,1,2.200000047683716,10,2009-01-01,'1 case 2'
-2003,false,3,6.599999904632568,30,2009-01-01,'3 case 2'
-2005,false,5,11,50,2009-01-01,'5 case 2'
----- TYPES
-INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING
-====
----- QUERY
-# Modifying the partition layout of 'target_part' by removing bucket 
partitions from int_col
-alter table target_part set partition spec (truncate(3, decimal_col), 
year(date_col), truncate(3, string_col))
-====
----- QUERY
-# Merge into partitioned target table from the source table as
-# an inline view using an update to set the value of int_col
-merge into target_part target using (select * from source) source on target.id 
= source.id + 10
-when matched then update set int_col = source.int_col, id = source.id
----- DML_RESULTS: target_part
-3,false,3,3.29999995232,30,2009-01-01,'3'
-5,false,5,5.5,50,2009-01-01,'4'
-4,true,4,4.40000009537,40,2009-01-01,'3'
-6,true,6,6.59999990463,60,2009-01-01,'5'
----- TYPES
-INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING
-====
----- QUERY
-# Validate the files written for target_part (different partitioning)
-show files in target_part
----- RESULTS: VERIFY_IS_SUBSET
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/decimal_col_trunc=30/date_col_year=2009/string_col_trunc=3/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY'
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/decimal_col_trunc=39/date_col_year=2009/string_col_trunc=3/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY'
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/decimal_col_trunc=48/date_col_year=2009/string_col_trunc=4/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY'
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/decimal_col_trunc=60/date_col_year=2009/string_col_trunc=5/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY'
----- TYPES
-STRING,STRING,STRING,STRING
-====
----- QUERY
-# Modifying the partition layout of 'target_part' by removing all partition 
transforms
-alter table target_part set partition spec (void(int_col))
-====
----- QUERY
-# Merge into partitioned target table from the source table as
-# an inline view using an update to set the value of int_col
-merge into target_part target using (select * from source) source on target.id 
= source.id
-when matched then update set int_col = cast(source.int_col + 10 as int)
----- DML_RESULTS: target_part
-3,false,13,3.29999995232,30,2009-01-01,'3'
-5,false,15,5.5,50,2009-01-01,'4'
-4,true,14,4.40000009537,40,2009-01-01,'3'
-6,true,16,6.59999990463,60,2009-01-01,'5'
----- TYPES
-INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING
-====
----- QUERY
-# Validate the files written for target_part (no partitioning)
-show files in target_part
----- RESULTS: VERIFY_IS_SUBSET
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/[a-z0-9_-]+_data\.0\.parq','.*','','$ERASURECODE_POLICY'
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/decimal_col_trunc=30/date_col_year=2009/string_col_trunc=3/delete-.*.parq','.*','','$ERASURECODE_POLICY'
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/decimal_col_trunc=39/date_col_year=2009/string_col_trunc=3/delete-.*.parq','.*','','$ERASURECODE_POLICY'
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/decimal_col_trunc=48/date_col_year=2009/string_col_trunc=4/delete-.*.parq','.*','','$ERASURECODE_POLICY'
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/decimal_col_trunc=60/date_col_year=2009/string_col_trunc=5/delete-.*.parq','.*','','$ERASURECODE_POLICY'
----- TYPES
-STRING,STRING,STRING,STRING
 ====
\ No newline at end of file
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 53a1e1834..624e4dd4e 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -1989,6 +1989,12 @@ class TestIcebergV2Table(IcebergTestSuite):
   def test_merge(self, vector, unique_database):
     self.run_test_case('QueryTest/iceberg-merge', vector, unique_database)
 
+  def test_merge_partition(self, vector, unique_database):
+    self.run_test_case('QueryTest/iceberg-merge-partition', vector, 
unique_database)
+
+  def test_merge_partition_sort(self, vector, unique_database):
+    self.run_test_case('QueryTest/iceberg-merge-partition-sort', vector, 
unique_database)
+
   def test_merge_long(self, vector, unique_database):
     self.run_test_case('QueryTest/iceberg-merge-long', vector, unique_database)
 

Reply via email to