This is an automated email from the ASF dual-hosted git repository. michaelsmith pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 19110b490d4e84af9bb4d37438b0ac93aecdedc1 Author: Peter Rozsa <[email protected]> AuthorDate: Mon Oct 21 09:28:36 2024 +0200 IMPALA-13362: Implement WHEN NOT MATCHED BY SOURCE syntax for MERGE statement This change adds support for a new MERGE clause that covers the condition when the source statement's rows do not match the target tables rows. Example: MERGE INTO target t using source s on t.id = s.id WHEN NOT MATCHED BY SOURCE THEN UPDATE set t.column = "a"; This change also adds support to use WHEN NOT MATCHED BY TARGET explicitly, this is equivalent to WHEN NOT MATCHED. Tests: - Parser tests for the new language elements. - Analyzer and planner test for WHEN NOT MATCHED BY SOURCE/TARGET clauses. - E2E tests for WHEN NOT MATCHED BY SOURCE clause. Change-Id: Ia0e0607682a616ef6ad9eccf499dc0c5c9278c5f Reviewed-on: http://gerrit.cloudera.org:8080/21988 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/exec/iceberg-merge-node.cc | 81 +++--- be/src/exec/iceberg-merge-node.h | 32 ++- common/thrift/PlanNodes.thrift | 7 + fe/src/main/cup/sql-parser.cup | 61 ++++- .../java/org/apache/impala/analysis/MergeCase.java | 33 ++- .../org/apache/impala/analysis/MergeDelete.java | 13 +- .../org/apache/impala/analysis/MergeInsert.java | 13 +- .../java/org/apache/impala/analysis/MergeStmt.java | 5 +- .../org/apache/impala/analysis/MergeUpdate.java | 12 +- .../apache/impala/planner/IcebergMergeNode.java | 5 +- .../impala/analysis/AnalyzeModifyStmtsTest.java | 20 +- .../org/apache/impala/analysis/ParserTest.java | 25 ++ .../queries/PlannerTest/iceberg-merge.test | 131 +++++++++- .../QueryTest/iceberg-merge-partition-sort.test | 120 +++++++++ ...erg-merge.test => iceberg-merge-partition.test} | 285 ++++++++------------- .../queries/QueryTest/iceberg-merge.test | 239 ++--------------- tests/query_test/test_iceberg.py | 6 + 17 files changed, 585 insertions(+), 503 deletions(-) diff --git 
a/be/src/exec/iceberg-merge-node.cc b/be/src/exec/iceberg-merge-node.cc index 8511ae425..f591ead46 100644 --- a/be/src/exec/iceberg-merge-node.cc +++ b/be/src/exec/iceberg-merge-node.cc @@ -33,7 +33,6 @@ #include "runtime/runtime-state.h" #include "runtime/tuple-row.h" #include "runtime/tuple.h" -#include "runtime/types.h" #include "util/debug-util.h" #include "util/runtime-profile-counters.h" @@ -70,7 +69,8 @@ Status IcebergMergeCasePlan::Init(const TIcebergMergeCase& tmerge_case, tmerge_case.output_expressions, *row_desc, state, pool, &output_exprs_)); RETURN_IF_ERROR(ScalarExpr::Create( tmerge_case.filter_conjuncts, *row_desc, state, pool, &filter_conjuncts_)); - type_ = tmerge_case.type; + case_type_ = tmerge_case.type; + match_type_ = tmerge_case.match_type; return Status::OK(); } @@ -97,10 +97,17 @@ IcebergMergeNode::IcebergMergeNode( for (auto* merge_case_plan : pnode.merge_case_plans_) { auto merge_case = pool->Add(new IcebergMergeCase(merge_case_plan)); - if (merge_case->IsMatchedCase()) { - matched_cases_.push_back(merge_case); - } else { - not_matched_cases_.push_back(merge_case); + all_cases_.push_back(merge_case); + switch (merge_case->match_type_) { + case TMergeMatchType::MATCHED: + matched_cases_.push_back(merge_case); + break; + case TMergeMatchType::NOT_MATCHED_BY_TARGET: + not_matched_by_target_cases_.push_back(merge_case); + break; + case TMergeMatchType::NOT_MATCHED_BY_SOURCE: + not_matched_by_source_cases_.push_back(merge_case); + break; } } } @@ -120,10 +127,7 @@ Status IcebergMergeNode::Prepare(RuntimeState* state) { ScalarExprEvaluator::Create(partition_meta_exprs_, state, state->obj_pool(), expr_perm_pool_.get(), expr_results_pool_.get(), &partition_meta_evaluators_)); - for (auto* merge_case : matched_cases_) { - RETURN_IF_ERROR(merge_case->Prepare(state, *this)); - } - for (auto* merge_case : not_matched_cases_) { + for (auto* merge_case : all_cases_) { RETURN_IF_ERROR(merge_case->Prepare(state, *this)); } @@ -142,10 +146,7 @@ Status 
IcebergMergeNode::Open(RuntimeState* state) { RETURN_IF_ERROR(ScalarExprEvaluator::Open(position_meta_evaluators_, state)); RETURN_IF_ERROR(ScalarExprEvaluator::Open(partition_meta_evaluators_, state)); - for (auto* merge_case : matched_cases_) { - RETURN_IF_ERROR(merge_case->Open(state)); - } - for (auto* merge_case : not_matched_cases_) { + for (auto* merge_case : all_cases_) { RETURN_IF_ERROR(merge_case->Open(state)); } @@ -185,27 +186,34 @@ Status IcebergMergeNode::EvaluateCases(RowBatch* output_batch) { } previous_row_target_tuple_ = row->GetTuple(target_tuple_idx_); - auto row_present = row_present_evaluator_->GetTinyIntVal(row); + auto row_present = row_present_evaluator_->GetTinyIntVal(row).val; IcebergMergeCase* selected_case = nullptr; - if (row_present == TIcebergMergeRowPresent::BOTH) { // Matches - for (auto* matched_case : matched_cases_) { - if (EvalConjuncts(matched_case->filter_evaluators_.data(), - matched_case->filter_evaluators_.size(), row)) { - selected_case = matched_case; - break; - } + + std::vector<IcebergMergeCase*>* cases = nullptr; + switch (row_present) { + case TIcebergMergeRowPresent::BOTH: { + cases = &matched_cases_; + break; + } + case TIcebergMergeRowPresent::SOURCE: { + cases = &not_matched_by_target_cases_; + break; } + case TIcebergMergeRowPresent::TARGET: { + cases = &not_matched_by_source_cases_; + break; + } + default: + return Status("Invalid row presence value in MERGE statement's result set."); } - if (row_present == TIcebergMergeRowPresent::SOURCE) { // Not matches - for (auto* not_matched_case : not_matched_cases_) { - if (EvalConjuncts(not_matched_case->filter_evaluators_.data(), - not_matched_case->filter_evaluators_.size(), row)) { - selected_case = not_matched_case; - break; - } + for (auto* merge_case : *cases) { + if (CheckCase(merge_case, row)) { + selected_case = merge_case; + break; } } + if (!selected_case) continue; // Add a new row to output_batch AddRow(output_batch, selected_case, row); @@ -214,6 +222,11 @@ 
Status IcebergMergeNode::EvaluateCases(RowBatch* output_batch) { return Status::OK(); } +bool IcebergMergeNode::CheckCase(const IcebergMergeCase* merge_case, TupleRow* row) { + return EvalConjuncts( + merge_case->filter_evaluators_.data(), merge_case->filter_evaluators_.size(), row); +} + void IcebergMergeNode::AddRow( RowBatch* output_batch, IcebergMergeCase* merge_case, TupleRow* row) { TupleRow* dst_row = output_batch->GetRow(output_batch->AddRow()); @@ -264,11 +277,8 @@ Status IcebergMergeNode::Reset(RuntimeState* state, RowBatch* row_batch) { void IcebergMergeNode::Close(RuntimeState* state) { if (is_closed()) return; child_row_batch_.reset(); - for (auto matched_case : matched_cases_) { - matched_case->Close(state); - } - for (auto not_matched_case : not_matched_cases_) { - not_matched_case->Close(state); + for (auto merge_case : all_cases_) { + merge_case->Close(state); } row_present_evaluator_->Close(state); ScalarExprEvaluator::Close(position_meta_evaluators_, state); @@ -287,7 +297,8 @@ const std::vector<ScalarExprEvaluator*>& IcebergMergeNode::PartitionMetaEvals() IcebergMergeCase::IcebergMergeCase(const IcebergMergeCasePlan* merge_case_plan) : filter_conjuncts_(merge_case_plan->filter_conjuncts_), output_exprs_(merge_case_plan->output_exprs_), - type_(merge_case_plan->type_) {} + case_type_(merge_case_plan->case_type_), + match_type_(merge_case_plan->match_type_) {} Status IcebergMergeCase::Prepare(RuntimeState* state, IcebergMergeNode& parent) { RETURN_IF_ERROR(ScalarExprEvaluator::Create(output_exprs_, state, state->obj_pool(), diff --git a/be/src/exec/iceberg-merge-node.h b/be/src/exec/iceberg-merge-node.h index 8ff059233..59c3ffbdd 100644 --- a/be/src/exec/iceberg-merge-node.h +++ b/be/src/exec/iceberg-merge-node.h @@ -98,17 +98,22 @@ class IcebergMergeNode : public ExecNode { /// is a duplicate, the second check tests if the row contains both the target and /// the source, or just the source. 
In the 'BOTH' case, the 'WHEN MATCHED' cases are /// tested by their filter evaluators. The evaluation respects the order of cases as - /// they are defined in the query. In the 'SOURCE' case, the 'WHEN NOT MATCHED' cases - /// are getting checked similarly to the 'BOTH' case. The 'selected_case' pointer - /// stores the first matched case. If the 'selected_case' is set, then a new row is - /// added to the output row batch, and the output expressions are evaluated into the - /// new row. The merge action is also set derived from the type of the selected case. + /// they are defined in the query. In the 'SOURCE' case, the + /// 'WHEN NOT MATCHED (BY TARGET)' cases are checked, in the 'TARGET' case the + /// 'WHEN NOT MATCHED BY SOURCE' cases are checked similarly to the 'BOTH' case. + /// The 'selected_case' pointer stores the first matched case. If the 'selected_case' + /// is set, then a new row is added to the output row batch, and the output + /// expressions are evaluated into the new row. The merge action is also set + /// derived from the type of the selected case. 
Status EvaluateCases(RowBatch* output_batch); void AddRow(RowBatch* output_batch, IcebergMergeCase* merge_case, TupleRow* row); + bool CheckCase(const IcebergMergeCase * merge_case, TupleRow* row); bool IsDuplicateRow(TupleRow* actual_row); std::vector<IcebergMergeCase*> matched_cases_; - std::vector<IcebergMergeCase*> not_matched_cases_; + std::vector<IcebergMergeCase*> not_matched_by_target_cases_; + std::vector<IcebergMergeCase*> not_matched_by_source_cases_; + std::vector<IcebergMergeCase*> all_cases_; std::unique_ptr<RowBatch> child_row_batch_; int child_row_idx_; bool child_eos_; @@ -147,7 +152,8 @@ class IcebergMergeCasePlan { /// Filter conjuncts applied after matching the case std::vector<ScalarExpr*> filter_conjuncts_; std::vector<ScalarExpr*> output_exprs_; - TMergeCaseType::type type_{}; + TMergeCaseType::type case_type_{}; + TMergeMatchType::type match_type_{}; }; class IcebergMergeCase { @@ -164,23 +170,21 @@ class IcebergMergeCase { Status Open(RuntimeState* state); void Close(RuntimeState* state); - [[nodiscard]] bool IsMatchedCase() const { - return type_ == TMergeCaseType::DELETE || type_ == TMergeCaseType::UPDATE; - } [[nodiscard]] TIcebergMergeSinkAction::type SinkAction() const { - if (type_ == TMergeCaseType::DELETE) { + if (case_type_ == TMergeCaseType::DELETE) { return TIcebergMergeSinkAction::DELETE; } - if (type_ == TMergeCaseType::INSERT) { + if (case_type_ == TMergeCaseType::INSERT) { return TIcebergMergeSinkAction::DATA; } - DCHECK(type_ == TMergeCaseType::UPDATE); + DCHECK(case_type_ == TMergeCaseType::UPDATE); return TIcebergMergeSinkAction::BOTH; } std::vector<ScalarExpr*> filter_conjuncts_; std::vector<ScalarExpr*> output_exprs_; - TMergeCaseType::type type_{}; + TMergeCaseType::type case_type_{}; + TMergeMatchType::type match_type_{}; std::vector<ScalarExprEvaluator*> filter_evaluators_; std::vector<ScalarExprEvaluator*> output_evaluators_; diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift index 
a500205b5..47872faf2 100644 --- a/common/thrift/PlanNodes.thrift +++ b/common/thrift/PlanNodes.thrift @@ -732,6 +732,12 @@ enum TMergeCaseType { DELETE = 2 } +enum TMergeMatchType { + MATCHED = 0 + NOT_MATCHED_BY_TARGET = 1 + NOT_MATCHED_BY_SOURCE = 2 +} + enum TIcebergMergeRowPresent { BOTH = 0 SOURCE = 1 @@ -746,6 +752,7 @@ struct TIcebergMergeCase { 2: optional list<Exprs.TExpr> filter_conjuncts // Type of the merge case that reflects the operation in it. 3: required TMergeCaseType type + 4: required TMergeMatchType match_type } struct TIcebergMergeNode { diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup index ab417fd79..7ccb032d2 100755 --- a/fe/src/main/cup/sql-parser.cup +++ b/fe/src/main/cup/sql-parser.cup @@ -65,6 +65,8 @@ import org.apache.impala.thrift.TShowStatsOp; import org.apache.impala.thrift.TTablePropertyType; import org.apache.impala.thrift.TPrincipalType; import org.apache.impala.thrift.TSortingOrder; +import org.apache.impala.thrift.TMergeCaseType; +import org.apache.impala.thrift.TMergeMatchType; import org.apache.impala.service.BackendConfig; import org.apache.impala.util.IcebergUtil; import org.apache.impala.common.NotImplementedException; @@ -469,10 +471,11 @@ nonterminal List<MergeCase> merge_case_list; nonterminal MergeCase merge_case; nonterminal MergeCase merge_when_matched; nonterminal MergeCase merge_when_not_matched; -nonterminal MergeCase merge_matched; +nonterminal MergeCase merge_update_and_delete; nonterminal MergeUpdate merge_update; nonterminal MergeDelete merge_delete; nonterminal MergeInsert merge_insert; +nonterminal List<Expr> merge_then; nonterminal OptimizeStmt optimize_stmt; nonterminal List<Pair<SlotRef, Expr>> update_set_expr_list; nonterminal StatementBase explain_stmt; @@ -607,6 +610,7 @@ nonterminal Boolean server_ident; nonterminal Boolean source_ident; nonterminal Boolean sources_ident; nonterminal Boolean uri_ident; +nonterminal String target_or_source_ident; nonterminal testcase_ident; 
// For Create/Drop/Show function ddl @@ -1042,24 +1046,54 @@ merge_case ::= {: RESULT = not_matched; :}; merge_when_matched ::= - KW_WHEN KW_MATCHED KW_THEN merge_matched:matched - {: RESULT = matched; :} - | KW_WHEN KW_MATCHED KW_AND expr_list:exprs KW_THEN merge_matched:matched + KW_WHEN KW_MATCHED merge_then:exprs merge_update_and_delete:matched {: matched.setFilterExprs(exprs); RESULT = matched; :}; merge_when_not_matched ::= - KW_WHEN KW_NOT KW_MATCHED KW_THEN merge_insert:insert - {: RESULT = insert; :} - | KW_WHEN KW_NOT KW_MATCHED KW_AND expr_list:exprs KW_THEN merge_insert:insert + KW_WHEN KW_NOT KW_MATCHED merge_then:exprs merge_insert:insert {: insert.setFilterExprs(exprs); + insert.setMatchType(TMergeMatchType.NOT_MATCHED_BY_TARGET); RESULT = insert; + :} + | KW_WHEN KW_NOT KW_MATCHED KW_BY target_or_source_ident:target_or_source merge_then:exprs merge_insert:insert + {: + // User wrote WHEN NOT MATCHED BY SOURCE with INSERT operation + // which doesn't make sense, as there is nothing to insert. + if (target_or_source.equals("SOURCE")) { + parser.parseError("SOURCE", SqlParserSymbols.KW_INSERT, "UPDATE, DELETE"); + } + insert.setFilterExprs(exprs); + insert.setMatchType(TMergeMatchType.NOT_MATCHED_BY_TARGET); + RESULT = insert; + :} + | KW_WHEN KW_NOT KW_MATCHED KW_BY target_or_source_ident:target_or_source merge_then:exprs merge_update_and_delete:matched + {: + // User wrote WHEN NOT MATCHED BY TARGET with DELETE/UPDATE operation + // which doesn't make sense, as there is nothing to delete/update. 
+ if (target_or_source.equals("TARGET")) { + int kw = SqlParserSymbols.KW_DELETE; + if (matched.caseType().equals(TMergeCaseType.UPDATE)) { + kw = SqlParserSymbols.KW_UPDATE; + } + parser.parseError("TARGET", kw, "INSERT"); + } + matched.setMatchType(TMergeMatchType.NOT_MATCHED_BY_SOURCE); + matched.setFilterExprs(exprs); + RESULT = matched; :}; -merge_matched ::= +merge_then ::= + KW_AND expr_list:exprs KW_THEN + {: RESULT = exprs; :} + | + KW_THEN + {: RESULT = Collections.emptyList(); :}; + +merge_update_and_delete ::= merge_update:update {: RESULT = update; :} | merge_delete:delete @@ -2492,6 +2526,17 @@ testcase_ident ::= :} ; +target_or_source_ident ::= + IDENT:ident + {: + String targetOrSource = ident.toUpperCase(); + if (!targetOrSource.equals("TARGET") && !targetOrSource.equals("SOURCE")) { + parser.parseError("identifier", SqlParserSymbols.IDENT, "TARGET, SOURCE"); + } + RESULT = targetOrSource; + :} + ; + option_ident ::= IDENT:ident {: diff --git a/fe/src/main/java/org/apache/impala/analysis/MergeCase.java b/fe/src/main/java/org/apache/impala/analysis/MergeCase.java index 08b3460a5..114998efe 100644 --- a/fe/src/main/java/org/apache/impala/analysis/MergeCase.java +++ b/fe/src/main/java/org/apache/impala/analysis/MergeCase.java @@ -26,6 +26,7 @@ import org.apache.impala.catalog.Type; import org.apache.impala.common.AnalysisException; import org.apache.impala.thrift.TExplainLevel; import org.apache.impala.thrift.TMergeCaseType; +import org.apache.impala.thrift.TMergeMatchType; /** * Base class for different merge cases in MERGE statements. 
Each merge case @@ -40,25 +41,29 @@ public abstract class MergeCase extends StatementBase { protected TableName targetTableName_; protected List<Column> targetTableColumns_; protected TableRef targetTableRef_; + protected TMergeMatchType matchType_; protected MergeCase() { + matchType_ = TMergeMatchType.MATCHED; filterExprs_ = Collections.emptyList(); resultExprs_ = Collections.emptyList(); } protected MergeCase(List<Expr> resultExprs, List<Expr> filterExprs, TableName targetTableName, List<Column> targetTableColumns, - TableRef targetTableRef) { + TableRef targetTableRef, TMergeMatchType matchType) { targetTableName_ = targetTableName; targetTableColumns_ = targetTableColumns; targetTableRef_ = targetTableRef; resultExprs_ = resultExprs; filterExprs_ = filterExprs; + matchType_ = matchType; } public List<Expr> getFilterExprs() { return filterExprs_; } public void setFilterExprs(List<Expr> exprs) { filterExprs_ = exprs; } + public void setMatchType(TMergeMatchType matchType) { matchType_ = matchType; } public void setParent(MergeStmt parent) { targetTableName_ = parent.getTargetTable().getTableName(); @@ -101,7 +106,7 @@ public abstract class MergeCase extends StatementBase { public String toSql(ToSqlOptions options) { StringBuilder builder = new StringBuilder(); builder.append("WHEN "); - builder.append(matchType().value()); + builder.append(matchTypeAsString()); if (!filterExprs_.isEmpty()) { StringJoiner expressionJoiner = new StringJoiner(" AND "); builder.append(" AND "); @@ -127,15 +132,23 @@ public abstract class MergeCase extends StatementBase { @Override public List<Expr> getResultExprs() { return resultExprs_; } - public enum MatchType { - MATCHED("MATCHED"), - NOT_MATCHED("NOT MATCHED"); - - private final String value_; + public TMergeMatchType matchType(){ + return matchType_; + } - MatchType(String value) { value_ = value; } - public String value() { return value_; } + public String matchTypeAsString() { + switch (matchType_) { + case MATCHED: + return 
"MATCHED"; + case NOT_MATCHED_BY_TARGET: + return "NOT MATCHED BY TARGET"; + case NOT_MATCHED_BY_SOURCE: + return "NOT MATCHED BY SOURCE"; + default: + throw new IllegalStateException( + String.format("Invalid TMergeMatchType value: %s", matchType_)); + } } - public abstract MatchType matchType(); + public abstract TMergeCaseType caseType(); } diff --git a/fe/src/main/java/org/apache/impala/analysis/MergeDelete.java b/fe/src/main/java/org/apache/impala/analysis/MergeDelete.java index f7e36f4dc..df9d05fe5 100644 --- a/fe/src/main/java/org/apache/impala/analysis/MergeDelete.java +++ b/fe/src/main/java/org/apache/impala/analysis/MergeDelete.java @@ -25,6 +25,7 @@ import java.util.List; import org.apache.impala.catalog.Column; import org.apache.impala.common.AnalysisException; import org.apache.impala.thrift.TMergeCaseType; +import org.apache.impala.thrift.TMergeMatchType; /** * Delete clause for MERGE statement. This clause does not have any extra clauses, it @@ -38,8 +39,9 @@ public class MergeDelete extends MergeCase { protected MergeDelete(List<Expr> resultExprs, List<Expr> filterExprs, TableName targetTableName, List<Column> targetTableColumns, - TableRef targetTableRef) { - super(resultExprs, filterExprs, targetTableName, targetTableColumns, targetTableRef); + TableRef targetTableRef, TMergeMatchType matchType) { + super(resultExprs, filterExprs, targetTableName, targetTableColumns, targetTableRef, + matchType); } @Override @@ -60,17 +62,12 @@ public class MergeDelete extends MergeCase { return String.format("%sDELETE", parent); } - @Override - public MatchType matchType() { - return MatchType.MATCHED; - } - @Override public TMergeCaseType caseType() { return TMergeCaseType.DELETE; } @Override public MergeDelete clone() { return new MergeDelete(Expr.cloneList(resultExprs_), Expr.cloneList(getFilterExprs()), - targetTableName_, targetTableColumns_, targetTableRef_); + targetTableName_, targetTableColumns_, targetTableRef_, matchType_); } } diff --git 
a/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java b/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java index 13d0311a1..119655f73 100644 --- a/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java +++ b/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java @@ -29,6 +29,7 @@ import java.util.StringJoiner; import org.apache.impala.catalog.Column; import org.apache.impala.common.AnalysisException; import org.apache.impala.thrift.TMergeCaseType; +import org.apache.impala.thrift.TMergeMatchType; /** * Insert clause for MERGE statements. This clause supports 2 different usage modes: @@ -49,8 +50,10 @@ public class MergeInsert extends MergeCase { protected MergeInsert(List<Expr> resultExprs, List<Expr> filterExprs, TableName targetTableName, List<Column> targetTableColumns, TableRef targetTableRef, - List<String> columnPermutation, SelectList selectList) { - super(resultExprs, filterExprs, targetTableName, targetTableColumns, targetTableRef); + TMergeMatchType matchType, List<String> columnPermutation, + SelectList selectList) { + super(resultExprs, filterExprs, targetTableName, targetTableColumns, targetTableRef, + matchType); columnPermutation_ = columnPermutation; selectList_ = selectList; } @@ -91,16 +94,14 @@ public class MergeInsert extends MergeCase { return builder.toString(); } - @Override - public MatchType matchType() { return MatchType.NOT_MATCHED; } - @Override public TMergeCaseType caseType() { return TMergeCaseType.INSERT; } @Override public MergeInsert clone() { return new MergeInsert(Expr.cloneList(resultExprs_), Expr.cloneList(getFilterExprs()), - targetTableName_, targetTableColumns_, targetTableRef_, columnPermutation_, + targetTableName_, targetTableColumns_, targetTableRef_, matchType_, + columnPermutation_, selectList_); } diff --git a/fe/src/main/java/org/apache/impala/analysis/MergeStmt.java b/fe/src/main/java/org/apache/impala/analysis/MergeStmt.java index 4153e3e38..0d0a3d043 100644 --- 
a/fe/src/main/java/org/apache/impala/analysis/MergeStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/MergeStmt.java @@ -26,6 +26,7 @@ import org.apache.impala.planner.PlanNode; import org.apache.impala.planner.DataSink; import org.apache.impala.planner.PlannerContext; import org.apache.impala.thrift.TMergeCaseType; +import org.apache.impala.thrift.TMergeMatchType; import org.apache.impala.thrift.TSortingOrder; /** @@ -163,8 +164,8 @@ public class MergeStmt extends DmlStatementBase { public List<MergeCase> getCases() { return cases_; } public boolean hasOnlyMatchedCases() { - return cases_.stream().noneMatch( - mergeCase -> mergeCase.caseType().equals(TMergeCaseType.INSERT)); + return cases_.stream().allMatch(mergeCase -> mergeCase.matchType().equals( + TMergeMatchType.MATCHED)); } public boolean hasOnlyInsertCases() { diff --git a/fe/src/main/java/org/apache/impala/analysis/MergeUpdate.java b/fe/src/main/java/org/apache/impala/analysis/MergeUpdate.java index 52ce89253..6eae3bd18 100644 --- a/fe/src/main/java/org/apache/impala/analysis/MergeUpdate.java +++ b/fe/src/main/java/org/apache/impala/analysis/MergeUpdate.java @@ -29,6 +29,7 @@ import org.apache.impala.catalog.Column; import org.apache.impala.common.AnalysisException; import org.apache.impala.common.Pair; import org.apache.impala.thrift.TMergeCaseType; +import org.apache.impala.thrift.TMergeMatchType; public class MergeUpdate extends MergeCase { private final List<Pair<SlotRef, Expr>> assignmentExprs_; @@ -39,8 +40,9 @@ public class MergeUpdate extends MergeCase { protected MergeUpdate(List<Expr> resultExprs, List<Expr> filterExprs, TableName targetTableName, List<Column> targetTableColumns, TableRef targetTableRef, - List<Pair<SlotRef, Expr>> assignmentExprs) { - super(resultExprs, filterExprs, targetTableName, targetTableColumns, targetTableRef); + TMergeMatchType matchType, List<Pair<SlotRef, Expr>> assignmentExprs) { + super(resultExprs, filterExprs, targetTableName, targetTableColumns, 
targetTableRef, + matchType); assignmentExprs_ = assignmentExprs; } @@ -93,9 +95,6 @@ public class MergeUpdate extends MergeCase { return builder.toString(); } - @Override - public MatchType matchType() { return MatchType.MATCHED; } - @Override public TMergeCaseType caseType() { return TMergeCaseType.UPDATE; } @@ -125,6 +124,7 @@ public class MergeUpdate extends MergeCase { @Override public MergeUpdate clone() { return new MergeUpdate(Expr.cloneList(resultExprs_), Expr.cloneList(getFilterExprs()), - targetTableName_, targetTableColumns_, targetTableRef_, assignmentExprs_); + targetTableName_, targetTableColumns_, targetTableRef_, matchType_, + assignmentExprs_); } } diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergMergeNode.java b/fe/src/main/java/org/apache/impala/planner/IcebergMergeNode.java index 4b8bb84cb..e608d131e 100644 --- a/fe/src/main/java/org/apache/impala/planner/IcebergMergeNode.java +++ b/fe/src/main/java/org/apache/impala/planner/IcebergMergeNode.java @@ -68,7 +68,8 @@ public class IcebergMergeNode extends PlanNode { List<TIcebergMergeCase> mergeCases = new ArrayList<>(); for (MergeCase mergeCase : cases_) { TIcebergMergeCase tMergeCase = new TIcebergMergeCase( - Expr.treesToThrift(mergeCase.getResultExprs()), mergeCase.caseType()); + Expr.treesToThrift(mergeCase.getResultExprs()), mergeCase.caseType(), + mergeCase.matchType()); if (!mergeCase.getFilterExprs().isEmpty()) { tMergeCase.setFilter_conjuncts(Expr.treesToThrift(mergeCase.getFilterExprs())); } @@ -134,7 +135,7 @@ public class IcebergMergeNode extends PlanNode { for (int i = 0; i < cases_.size(); i++) { MergeCase mergeCase = cases_.get(i); joiner.add(String.format("%sCASE %d: %s", detailPrefix, i, - cases_.get(i).matchType().value())); + cases_.get(i).matchTypeAsString())); for (String detail : mergeCase.getExplainStrings(detailLevel)) { joiner.add(String.format("%s%s%s", detailPrefix, detailPrefix, detail)); } diff --git 
a/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java index eab3fd753..2aeda2e0c 100644 --- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java @@ -281,7 +281,6 @@ public class AnalyzeModifyStmtsTest extends AnalyzerTest { + "using functional_parquet.iceberg_non_partitioned " + "on target.id = functional_parquet.iceberg_non_partitioned.id " + "when matched then delete"); - // INSERT // Inserting values originated from the target table (NULL values) AnalyzesOk("merge into " @@ -342,6 +341,25 @@ public class AnalyzeModifyStmtsTest extends AnalyzerTest { + "then insert (id, user) values (s.id, concat(s.user, 'string'))" + "when not matched then " + "insert values (s.id, s.user, 'ab', s.event_time);"); + // NOT MATCHED BY TARGET + AnalyzesOk("merge into " + + "functional_parquet.iceberg_v2_partitioned_position_deletes target " + + "using functional_parquet.iceberg_non_partitioned s " + + "on target.id = s.id " + + "when not matched by target and s.id <= 12 " + + "then insert (id, user) values (s.id, 'user')"); + // NOT MATCHED BY SOURCE + AnalyzesOk("merge into " + + "functional_parquet.iceberg_v2_partitioned_position_deletes target " + + "using functional_parquet.iceberg_non_partitioned s " + + "on target.id = s.id " + + "when not matched by source " + + "then update set user = 'updated'"); + AnalyzesOk("merge into " + + "functional_parquet.iceberg_v2_partitioned_position_deletes target " + + "using functional_parquet.iceberg_non_partitioned s " + + "on target.id = s.id " + + "when not matched by source and target.user = 'something' then delete"); // Inline view as target AnalysisError("merge into " diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java index 75d33f8ff..702acff21 100755 --- 
a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java @@ -2009,6 +2009,19 @@ public class ParserTest extends FrontendTestBase { + "then update set t.a = s.a"); ParsesOk("merge into t using s on t.id = s.id when not matched " + "then insert values(a,b,c)"); + ParsesOk("merge into t using s on t.id = s.id when not matched by target " + + "then insert values(a,b,c)"); + ParsesOk("merge into t using s on t.id = s.id when not matched by target " + + "and t.a > 10 then insert values(a,b,c)"); + ParsesOk("merge into t using s on t.id = s.id when not matched by source " + + "then delete"); + ParsesOk("merge into t using s on t.id = s.id when not matched by source " + + "and funcn(a) > 10 then delete"); + ParsesOk("merge into t using s on t.id = s.id when not matched by source " + + "then update set b = 12"); + ParsesOk("merge into t using s on t.id = s.id when not matched by source " + + "and func(b) != func(a) then update set b = 12"); + ParserError("merge into t using s on t.id = s.id " + "when matched and t.a > s.b then delete from"); @@ -2018,6 +2031,18 @@ public class ParserTest extends FrontendTestBase { + "when not matched and t.a > s.b then update set a = b"); ParserError("merge into t using s on t.id = s.id " + "when not matched and t.a > s.b then delete"); + ParserError("merge into t using s on t.id = s.id " + + "when not matched by target then delete"); + ParserError("merge into t using s on t.id = s.id " + + "when not matched by target and a = 1 then delete"); + ParserError("merge into t using s on t.id = s.id " + + "when not matched by target then update set b = a"); + ParserError("merge into t using s on t.id = s.id " + + "when not matched by target and x = y then update set b = a, c = d"); + ParserError("merge into t using s on t.id = s.id " + + "when not matched by source then insert values (1, 2, 3)"); + ParserError("merge into t using s on t.id = s.id " + + "when not matched by 
source and a <> b then insert values (1, 2, 3)"); // Invalid column permutation ParserError("merge into target t using (select * from source) s " diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge.test b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge.test index 3a6e31b34..1082cd650 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge.test @@ -63,7 +63,7 @@ when not matched then insert values(source.id, source.user) WRITE TO HDFS [functional_parquet.iceberg_v2_no_deletes, OVERWRITE=false] | 03:MERGE -| CASE 0: NOT MATCHED +| CASE 0: NOT MATCHED BY TARGET | | result expressions: source.id, source.`user` | | type: INSERT | row-size=80B cardinality=23 @@ -85,7 +85,7 @@ WRITE TO HDFS [functional_parquet.iceberg_v2_no_deletes, OVERWRITE=false] WRITE TO HDFS [functional_parquet.iceberg_v2_no_deletes, OVERWRITE=false] | 03:MERGE -| CASE 0: NOT MATCHED +| CASE 0: NOT MATCHED BY TARGET | | result expressions: source.id, source.`user` | | type: INSERT | row-size=80B cardinality=23 @@ -349,7 +349,7 @@ MERGE SINK | | filter predicates: target.id = 2 | | result expressions: target.id, target.`user`, target.action, source.event_time | | type: UPDATE -| CASE 1: NOT MATCHED +| CASE 1: NOT MATCHED BY TARGET | | filter predicates: source.id > 10 | | result expressions: source.id, source.`user`, source.action, target.event_time | | type: INSERT @@ -357,7 +357,7 @@ MERGE SINK | | filter predicates: target.id > 10 | | result expressions: target.id, target.`user`, target.action, target.event_time | | type: DELETE -| CASE 3: NOT MATCHED +| CASE 3: NOT MATCHED BY TARGET | | result expressions: source.id, source.`user`, NULL, NULL | | type: INSERT | CASE 4: MATCHED @@ -403,7 +403,7 @@ MERGE SINK | | filter predicates: target.id = 2 | | result expressions: target.id, target.`user`, target.action, source.event_time | | 
type: UPDATE -| CASE 1: NOT MATCHED +| CASE 1: NOT MATCHED BY TARGET | | filter predicates: source.id > 10 | | result expressions: source.id, source.`user`, source.action, target.event_time | | type: INSERT @@ -411,7 +411,7 @@ MERGE SINK | | filter predicates: target.id > 10 | | result expressions: target.id, target.`user`, target.action, target.event_time | | type: DELETE -| CASE 3: NOT MATCHED +| CASE 3: NOT MATCHED BY TARGET | | result expressions: source.id, source.`user`, NULL, NULL | | type: INSERT | CASE 4: MATCHED @@ -571,7 +571,7 @@ MERGE SINK | CASE 0: MATCHED | | result expressions: target.id, target.int_col, action, target.date_string_col, target.`year`, target.`month` | | type: UPDATE -| CASE 1: NOT MATCHED +| CASE 1: NOT MATCHED BY TARGET | | result expressions: source.id, NULL, source.action, NULL, NULL, NULL | | type: INSERT | row-size=120B cardinality=14.62K @@ -604,7 +604,7 @@ MERGE SINK | CASE 0: MATCHED | | result expressions: target.id, target.int_col, action, target.date_string_col, target.`year`, target.`month` | | type: UPDATE -| CASE 1: NOT MATCHED +| CASE 1: NOT MATCHED BY TARGET | | result expressions: source.id, NULL, source.action, NULL, NULL, NULL | | type: INSERT | row-size=120B cardinality=14.62K @@ -646,7 +646,7 @@ MERGE SINK | CASE 0: MATCHED | | result expressions: target.id, target.`user`, functional_parquet.iceberg_partition_evolution.string_col, target.event_time | | type: UPDATE -| CASE 1: NOT MATCHED +| CASE 1: NOT MATCHED BY TARGET | | result expressions: functional_parquet.iceberg_partition_evolution.id, functional_parquet.iceberg_partition_evolution.string_col, concat('something ', functional_parquet.iceberg_partition_evolution.string_col), NULL | | type: INSERT | row-size=120B cardinality=14.61K @@ -687,7 +687,7 @@ MERGE SINK | CASE 0: MATCHED | | result expressions: target.id, target.`user`, functional_parquet.iceberg_partition_evolution.string_col, target.event_time | | type: UPDATE -| CASE 1: NOT MATCHED +| CASE 1: NOT 
MATCHED BY TARGET | | result expressions: functional_parquet.iceberg_partition_evolution.id, functional_parquet.iceberg_partition_evolution.string_col, concat('something ', functional_parquet.iceberg_partition_evolution.string_col), NULL | | type: INSERT | row-size=120B cardinality=14.61K @@ -720,3 +720,114 @@ MERGE SINK Iceberg snapshot id: 8885697082976537578 row-size=80B cardinality=20 ==== +# Merge into a partitioned Iceberg table containing explicit NOT MATCHED BY TARGET, implicit NOT MATCHED BY TARGET and NOT MATCHED BY SOURCE merge cases +merge into functional_parquet.iceberg_v2_partitioned_position_deletes target +using (select * from functional_parquet.iceberg_partition_evolution) source +on target.id = source.id +when not matched then insert (id, user, action) values(source.id, source.string_col, concat("something ", source.string_col)) +when not matched by target and target.id < 15 then insert (id, user, action) values(source.id, source.string_col, concat("something ", source.string_col)) +when not matched by source and target.user = 'something' then delete +when not matched by source then update set target.id = 10; +---- PLAN +MERGE SINK +|->WRITE TO HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes, OVERWRITE=false, PARTITION-KEYS=(action)] +|->BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE] +| +06:SORT +| order by: action ASC NULLS LAST +| row-size=81B cardinality=14.61K +| +05:MERGE +| CASE 0: NOT MATCHED BY TARGET +| | result expressions: functional_parquet.iceberg_partition_evolution.id, functional_parquet.iceberg_partition_evolution.string_col, concat('something ', functional_parquet.iceberg_partition_evolution.string_col), NULL +| | type: INSERT +| CASE 1: NOT MATCHED BY TARGET +| | filter predicates: target.id < 15 +| | result expressions: functional_parquet.iceberg_partition_evolution.id, functional_parquet.iceberg_partition_evolution.string_col, concat('something ', 
functional_parquet.iceberg_partition_evolution.string_col), NULL +| | type: INSERT +| CASE 2: NOT MATCHED BY SOURCE +| | filter predicates: target.`user` = 'something' +| | result expressions: target.id, target.`user`, target.action, target.event_time +| | type: DELETE +| CASE 3: NOT MATCHED BY SOURCE +| | result expressions: 10, target.`user`, target.action, target.event_time +| | type: UPDATE +| row-size=120B cardinality=14.61K +| +04:HASH JOIN [FULL OUTER JOIN] +| hash predicates: target.id = functional_parquet.iceberg_partition_evolution.id +| row-size=120B cardinality=14.61K +| +|--03:SCAN HDFS [functional_parquet.iceberg_partition_evolution] +| HDFS partitions=1/1 files=1460 size=2.49MB +| Iceberg snapshot id: 547864005421580562 +| row-size=40B cardinality=14.60K +| +02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN] +| row-size=80B cardinality=10 +| +|--01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 target-position-delete] +| HDFS partitions=1/1 files=3 size=9.47KB +| Iceberg snapshot id: 8885697082976537578 +| row-size=204B cardinality=10 +| +00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes target] + HDFS partitions=1/1 files=3 size=3.48KB + Iceberg snapshot id: 8885697082976537578 + row-size=80B cardinality=20 +---- DISTRIBUTEDPLAN +MERGE SINK +|->WRITE TO HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes, OVERWRITE=false, PARTITION-KEYS=(action)] +|->BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE] +| +10:SORT +| order by: action ASC NULLS LAST +| row-size=81B cardinality=14.61K +| +09:EXCHANGE [HASH(target.action)] +| +05:MERGE +| CASE 0: NOT MATCHED BY TARGET +| | result expressions: functional_parquet.iceberg_partition_evolution.id, functional_parquet.iceberg_partition_evolution.string_col, concat('something ', functional_parquet.iceberg_partition_evolution.string_col), NULL +| | type: INSERT +| CASE 1: NOT 
MATCHED BY TARGET +| | filter predicates: target.id < 15 +| | result expressions: functional_parquet.iceberg_partition_evolution.id, functional_parquet.iceberg_partition_evolution.string_col, concat('something ', functional_parquet.iceberg_partition_evolution.string_col), NULL +| | type: INSERT +| CASE 2: NOT MATCHED BY SOURCE +| | filter predicates: target.`user` = 'something' +| | result expressions: target.id, target.`user`, target.action, target.event_time +| | type: DELETE +| CASE 3: NOT MATCHED BY SOURCE +| | result expressions: 10, target.`user`, target.action, target.event_time +| | type: UPDATE +| row-size=120B cardinality=14.61K +| +04:HASH JOIN [FULL OUTER JOIN, PARTITIONED] +| hash predicates: target.id = functional_parquet.iceberg_partition_evolution.id +| row-size=120B cardinality=14.61K +| +|--08:EXCHANGE [HASH(functional_parquet.iceberg_partition_evolution.id)] +| | +| 03:SCAN HDFS [functional_parquet.iceberg_partition_evolution] +| HDFS partitions=1/1 files=1460 size=2.49MB +| Iceberg snapshot id: 547864005421580562 +| row-size=40B cardinality=14.60K +| +07:EXCHANGE [HASH(target.id)] +| +02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED] +| row-size=80B cardinality=10 +| +|--06:EXCHANGE [DIRECTED] +| | +| 01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 target-position-delete] +| HDFS partitions=1/1 files=3 size=9.47KB +| Iceberg snapshot id: 8885697082976537578 +| row-size=204B cardinality=10 +| +00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes target] + HDFS partitions=1/1 files=3 size=3.48KB + Iceberg snapshot id: 8885697082976537578 + row-size=80B cardinality=20 +==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-partition-sort.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-partition-sort.test new file mode 100644 index 000000000..4bc57cb68 --- /dev/null +++ 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-partition-sort.test @@ -0,0 +1,120 @@ +==== +---- QUERY +# Table creation and initial data loading +create table target_part_sort(id int, bool_col boolean, int_col int, float_col float, + decimal_col decimal(20,0), date_col date, string_col string) + partitioned by spec (bucket(5, int_col), truncate(3, decimal_col), year(date_col), truncate(3, string_col)) + sort by (int_col) + stored as iceberg tblproperties("format-version"="2") ; + +create table source(id int, bool_col boolean, int_col int, float_col float, + decimal_col decimal(20,0), date_col date, string_col string) + stored as iceberg tblproperties("format-version"="2"); + +insert into source select id, bool_col, int_col, + float_col, cast(bigint_col as decimal(20,0)), to_date(timestamp_col), + string_col from functional.alltypes order by id limit 7; +==== +---- QUERY +# Merge into partitioned and sorted target table from the source table as an inline view +# using inserts as the target table is empty +merge into target_part_sort target using (select * from source) source on target.id = source.id +when not matched and source.id % 2 = 0 then insert values( + cast(source.id + 1000 as int), source.bool_col, source.int_col, + source.float_col, source.decimal_col, + source.date_col, 'constant string value') +when not matched then insert values( + cast(source.id + 2000 as int), source.bool_col, source.int_col, + source.float_col, source.decimal_col, + source.date_col, concat(source.string_col, " case 2")) +---- DML_RESULTS: target_part_sort +1000,true,0,0,0,2009-01-01,'constant string value' +1002,true,2,2.200000047683716,20,2009-01-01,'constant string value' +1004,true,4,4.400000095367432,40,2009-01-01,'constant string value' +1006,true,6,6.599999904632568,60,2009-01-01,'constant string value' +2001,false,1,1.100000023841858,10,2009-01-01,'1 case 2' +2003,false,3,3.299999952316284,30,2009-01-01,'3 case 2' +2005,false,5,5.5,50,2009-01-01,'5 case 
2' +---- TYPES +INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING +==== +---- QUERY +# Merge into partitioned and sorted target table from the source table as +# an union of inline views duplicating the source rows +# causing query abortion on the backend +merge into target_part_sort target using (select * from source union all select * from source) source on target.id = source.id + 1002 +when not matched by target then insert values( + cast(source.id + 1000 as int), source.bool_col, source.int_col, + source.float_col, source.decimal_col, + source.date_col, 'constant string value') +---- CATCH +Duplicate row found +==== +---- QUERY +# Merge into partitioned and sorted target table from the source table as +# an inline view using an update to combine target and source values as a source value +merge into target_part_sort target using (select * from source) source on target.id = source.id + 1000 +when matched then update set float_col = cast(target.float_col + source.float_col as float) +---- DML_RESULTS: target_part_sort +1000,true,0,0,0,2009-01-01,'constant string value' +1002,true,2,4.400000095367432,20,2009-01-01,'constant string value' +1004,true,4,8.8000001907348632,40,2009-01-01,'constant string value' +1006,true,6,13.19999980926514,60,2009-01-01,'constant string value' +2001,false,1,1.100000023841858,10,2009-01-01,'1 case 2' +2003,false,3,3.299999952316284,30,2009-01-01,'3 case 2' +2005,false,5,5.5,50,2009-01-01,'5 case 2' +---- TYPES +INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING +==== +---- QUERY +# Merge into partitioned and sorted target table from the source table as +# an inline view using an update to combine target and source values as a source value +# using join condition on string columns +merge into target_part_sort target using (select * from source) source on target.string_col = concat(source.string_col, ' case 2') +when matched then update set float_col = cast(target.float_col + source.float_col as float) +---- DML_RESULTS: target_part_sort 
+1000,true,0,0,0,2009-01-01,'constant string value' +1002,true,2,4.400000095367432,20,2009-01-01,'constant string value' +1004,true,4,8.8000001907348632,40,2009-01-01,'constant string value' +1006,true,6,13.19999980926514,60,2009-01-01,'constant string value' +2001,false,1,2.200000047683716,10,2009-01-01,'1 case 2' +2003,false,3,6.599999904632568,30,2009-01-01,'3 case 2' +2005,false,5,11,50,2009-01-01,'5 case 2' +---- TYPES +INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING +==== +---- QUERY +# Merge into partitioned and sorted target table from the source table +# using all permutation of merge clauses (with an unconditional delete) +merge into target_part_sort target using +(select id, bool_col, int_col, float_col, decimal_col, date_col, string_col from source union all + select cast(id + 2000 as int), bool_col, int_col, float_col, decimal_col, date_col, string_col from source) source +on target.id = source.id +when not matched by target and source.id = 2006 then insert values (source.id, source.bool_col, source.int_col, source.float_col, source.decimal_col, source.date_col, "first") +when not matched then insert values (source.id, source.bool_col, source.int_col, source.float_col, source.decimal_col, source.date_col, "second") +when matched and target.id = 2005 then update set string_col = "third" +when matched and target.id = 2003 then delete +when matched then delete +when not matched by source and target.id = 1000 then update set string_col = "fourth", decimal_col = 1000000 +when not matched by source then update set string_col = "fifth", float_col = -683925235.2 +---- DML_RESULTS: target_part_sort +0,true,0,0,0,2009-01-01,'second' +1,false,1,1.100000023841858,10,2009-01-01,'second' +2,true,2,2.200000047683716,20,2009-01-01,'second' +3,false,3,3.299999952316284,30,2009-01-01,'second' +4,true,4,4.400000095367432,40,2009-01-01,'second' +5,false,5,5.5,50,2009-01-01,'second' +6,true,6,6.599999904632568,60,2009-01-01,'second' +1000,true,0,0,1000000,2009-01-01,'fourth' 
+1002,true,2,-683925248,20,2009-01-01,'fifth' +1004,true,4,-683925248,40,2009-01-01,'fifth' +1006,true,6,-683925248,60,2009-01-01,'fifth' +2000,true,0,0,0,2009-01-01,'second' +2002,true,2,2.200000047683716,20,2009-01-01,'second' +2004,true,4,4.400000095367432,40,2009-01-01,'second' +2005,false,5,11,50,2009-01-01,'third' +2006,true,6,6.599999904632568,60,2009-01-01,'first' +---- TYPES +INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING +==== + diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-partition.test similarity index 60% copy from testdata/workloads/functional-query/queries/QueryTest/iceberg-merge.test copy to testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-partition.test index ad9c9e997..b5e3afdb3 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge.test +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-partition.test @@ -1,21 +1,11 @@ ==== ---- QUERY -# Table structure creation and initial data loading -create table target(id int, bool_col boolean, int_col int, float_col float, - decimal_col decimal(20,0), date_col date, string_col string) - stored as iceberg tblproperties("format-version"="2"); - +# Table creation and initial data loading create table target_part(id int, bool_col boolean, int_col int, float_col float, decimal_col decimal(20,0), date_col date, string_col string) partitioned by spec (bucket(5, int_col), truncate(3, decimal_col), year(date_col), truncate(3, string_col)) stored as iceberg tblproperties("format-version"="2"); -create table target_part_sort(id int, bool_col boolean, int_col int, float_col float, - decimal_col decimal(20,0), date_col date, string_col string) - partitioned by spec (bucket(5, int_col), truncate(3, decimal_col), year(date_col), truncate(3, string_col)) - sort by (int_col) - stored as iceberg tblproperties("format-version"="2") ; - create table 
source(id int, bool_col boolean, int_col int, float_col float, decimal_col decimal(20,0), date_col date, string_col string) stored as iceberg tblproperties("format-version"="2"); @@ -25,103 +15,6 @@ insert into source select id, bool_col, int_col, string_col from functional.alltypes order by id limit 7; ==== ---- QUERY -# Merge into unpartitioned target table from the source table -# using when not matched insert case as the target table is empty now -merge into target using source on target.id = source.id -when not matched then insert values( - source.id, source.bool_col, source.int_col, - source.float_col, source.decimal_col, - source.date_col, source.string_col) ----- DML_RESULTS: target -3,false,3,3.29999995232,30,2009-01-01,'3' -5,false,5,5.5,50,2009-01-01,'5' -1,false,1,1.10000002384,10,2009-01-01,'1' -4,true,4,4.40000009537,40,2009-01-01,'4' -0,true,0,0.0,0,2009-01-01,'0' -6,true,6,6.59999990463,60,2009-01-01,'6' -2,true,2,2.20000004768,20,2009-01-01,'2' ----- TYPES -INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING ----- RUNTIME_PROFILE -NumModifiedRows: 7 -NumDeletedRows: 0 -==== ----- QUERY -# Merge into unpartitioned target table from the source table -# using update case to update int_col to a constant values -merge into target using source on target.id = source.id -when matched and source.id % 2 = 1 then update set int_col = 555 -when matched and source.id % 2 = 0 then update set int_col = 222 ----- DML_RESULTS: target -3,false,555,3.29999995232,30,2009-01-01,'3' -5,false,555,5.5,50,2009-01-01,'5' -1,false,555,1.10000002384,10,2009-01-01,'1' -4,true,222,4.40000009537,40,2009-01-01,'4' -0,true,222,0.0,0,2009-01-01,'0' -6,true,222,6.59999990463,60,2009-01-01,'6' -2,true,222,2.20000004768,20,2009-01-01,'2' ----- TYPES -INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING ----- RUNTIME_PROFILE -NumModifiedRows: 7 -NumDeletedRows: 7 -==== ----- QUERY -# Merge into unpartitioned target table from the source table -# Using when matched delete case to delete some values from the 
target table -merge into target using source on target.id = source.id -when matched and source.id % 2 = 1 then delete ----- DML_RESULTS: target -4,true,222,4.40000009537,40,2009-01-01,'4' -0,true,222,0.0,0,2009-01-01,'0' -6,true,222,6.59999990463,60,2009-01-01,'6' -2,true,222,2.20000004768,20,2009-01-01,'2' ----- TYPES -INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING ----- RUNTIME_PROFILE -NumModifiedRows: 0 -NumDeletedRows: 3 -==== ----- QUERY -# Merge into unpartitioned target table from the source table -# using multiple cases to insert/update/delete specific rows -merge into target using source on target.id = source.id -when matched and source.id = 6 then delete -when matched and target.id % 2 = 0 then update set string_col = concat(source.string_col, " case 2") -when not matched and source.id = 5 then insert (id, int_col) values (source.id, source.int_col) ----- DML_RESULTS: target -4,true,222,4.40000009537,40,2009-01-01,'4 case 2' -0,true,222,0.0,0,2009-01-01,'0 case 2' -2,true,222,2.20000004768,20,2009-01-01,'2 case 2' -5,NULL,5,NULL,NULL,NULL,'NULL' ----- TYPES -INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING ----- RUNTIME_PROFILE -NumModifiedRows: 4 -NumDeletedRows: 4 -==== ----- QUERY -# Validate the number of snapshots written to target -select count(1) snapshots from $DATABASE.target.snapshots ----- RESULTS -4 ----- TYPES -BIGINT -==== ----- QUERY -# Validate the files written for target -show files in target ----- RESULTS: VERIFY_IS_SUBSET -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target/data/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY' -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target/data/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY' -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target/data/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY' -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target/data/delete-.*.parq','.*','','$ERASURECODE_POLICY' 
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target/data/delete-.*.parq','.*','','$ERASURECODE_POLICY' -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target/data/delete-.*.parq','.*','','$ERASURECODE_POLICY' ----- TYPES -STRING,STRING,STRING,STRING -==== ----- QUERY # Merge into partitioned target table from the source table # Using insert to fill target_part table merge into target_part target using source on target.id = source.id @@ -208,74 +101,6 @@ row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket STRING,STRING,STRING,STRING ==== ---- QUERY -# Merge into partitioned and sorted target table from the source table as an inline view -# using inserts as the target table is empty -merge into target_part_sort target using (select * from source) source on target.id = source.id -when not matched and source.id % 2 = 0 then insert values( - cast(source.id + 1000 as int), source.bool_col, source.int_col, - source.float_col, source.decimal_col, - source.date_col, 'constant string value') -when not matched then insert values( - cast(source.id + 2000 as int), source.bool_col, source.int_col, - source.float_col, source.decimal_col, - source.date_col, concat(source.string_col, " case 2")) ----- DML_RESULTS: target_part_sort -1000,true,0,0,0,2009-01-01,'constant string value' -1002,true,2,2.200000047683716,20,2009-01-01,'constant string value' -1004,true,4,4.400000095367432,40,2009-01-01,'constant string value' -1006,true,6,6.599999904632568,60,2009-01-01,'constant string value' -2001,false,1,1.100000023841858,10,2009-01-01,'1 case 2' -2003,false,3,3.299999952316284,30,2009-01-01,'3 case 2' -2005,false,5,5.5,50,2009-01-01,'5 case 2' ----- TYPES -INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING -==== ----- QUERY -# Merge into partitioned and sorted target table from the source table as -# an union of inline views duplicating the source rows -# causing query abortion on the backend -merge into target_part_sort target using (select * from source 
union all select * from source) source on target.id = source.id + 1002 -when not matched then insert values( - cast(source.id + 1000 as int), source.bool_col, source.int_col, - source.float_col, source.decimal_col, - source.date_col, 'constant string value') ----- CATCH -Duplicate row found -==== ----- QUERY -# Merge into partitioned and sorted target table from the source table as -# an inline view using an update to combine target and source values as a source value -merge into target_part_sort target using (select * from source) source on target.id = source.id + 1000 -when matched then update set float_col = cast(target.float_col + source.float_col as float) ----- DML_RESULTS: target_part_sort -1000,true,0,0,0,2009-01-01,'constant string value' -1002,true,2,4.400000095367432,20,2009-01-01,'constant string value' -1004,true,4,8.8000001907348632,40,2009-01-01,'constant string value' -1006,true,6,13.19999980926514,60,2009-01-01,'constant string value' -2001,false,1,1.100000023841858,10,2009-01-01,'1 case 2' -2003,false,3,3.299999952316284,30,2009-01-01,'3 case 2' -2005,false,5,5.5,50,2009-01-01,'5 case 2' ----- TYPES -INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING -==== ----- QUERY -# Merge into partitioned and sorted target table from the source table as -# an inline view using an update to combine target and source values as a source value -# using join condition on string columns -merge into target_part_sort target using (select * from source) source on target.string_col = concat(source.string_col, ' case 2') -when matched then update set float_col = cast(target.float_col + source.float_col as float) ----- DML_RESULTS: target_part_sort -1000,true,0,0,0,2009-01-01,'constant string value' -1002,true,2,4.400000095367432,20,2009-01-01,'constant string value' -1004,true,4,8.8000001907348632,40,2009-01-01,'constant string value' -1006,true,6,13.19999980926514,60,2009-01-01,'constant string value' -2001,false,1,2.200000047683716,10,2009-01-01,'1 case 2' 
-2003,false,3,6.599999904632568,30,2009-01-01,'3 case 2' -2005,false,5,11,50,2009-01-01,'5 case 2' ----- TYPES -INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING -==== ----- QUERY # Modifying the partition layout of 'target_part' by removing bucket partitions from int_col alter table target_part set partition spec (truncate(3, decimal_col), year(date_col), truncate(3, string_col)) ==== @@ -331,4 +156,110 @@ row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/decimal_col_tr row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/decimal_col_trunc=60/date_col_year=2009/string_col_trunc=5/delete-.*.parq','.*','','$ERASURECODE_POLICY' ---- TYPES STRING,STRING,STRING,STRING -==== \ No newline at end of file +==== +---- QUERY +# Merge into partitioned target table from the source table as +# an inline view using an update to set the value of int_col +# when source is not matching and target.id > 4 +merge into target_part target using (select * from source) source on target.id = cast(source.id + 10 as int) +when not matched by source and target.id > 4 then update set int_col = cast(target.int_col + 10 as int) +---- DML_RESULTS: target_part +3,false,13,3.29999995232,30,2009-01-01,'3' +5,false,25,5.5,50,2009-01-01,'4' +4,true,14,4.40000009537,40,2009-01-01,'3' +6,true,26,6.59999990463,60,2009-01-01,'5' +---- TYPES +INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING +==== +---- QUERY +# Merge into partitioned target table from the source table +# using not matched by source clause +merge into target_part target using source source on target.id = cast(source.id + 10 as int) +when not matched by source then update set bool_col = !target.bool_col +---- DML_RESULTS: target_part +3,true,13,3.29999995232,30,2009-01-01,'3' +4,false,14,4.40000009537,40,2009-01-01,'3' +5,true,25,5.5,50,2009-01-01,'4' +6,false,26,6.59999990463,60,2009-01-01,'5' +---- TYPES +INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING +==== +---- QUERY +# Merge into partitioned target table from the source table 
as +# an inline view using a delete when source is not matching and target.id < 4 +merge into target_part target using (select * from source) source on target.id = cast(source.id + 10 as int) +when not matched by source and target.id < 4 then delete +---- DML_RESULTS: target_part +5,true,25,5.5,50,2009-01-01,'4' +4,false,14,4.40000009537,40,2009-01-01,'3' +6,false,26,6.59999990463,60,2009-01-01,'5' +---- TYPES +INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING +==== +---- QUERY +# Merge into partitioned target table from the source table as +# an inline view using not matched by source clause with multiple filter predicates +merge into target_part target using source on target.id = cast(source.id + 10 as int) +when not matched by source +and target.id > 4 and target.id < 6 and (target.float_col > 3.0 or target.float_col < 7.0) +then update set date_col = '2024-12-12' +---- DML_RESULTS: target_part +5,true,25,5.5,50,2024-12-12,'4' +4,false,14,4.40000009537,40,2009-01-01,'3' +6,false,26,6.59999990463,60,2009-01-01,'5' +---- TYPES +INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING +==== +---- QUERY +# Merge into partitioned target table from the source table +# using an explicit not matched by target clause to insert rows +merge into target_part target using source on target.id = cast(source.id + 100 as int) +when not matched by target then insert values (cast(source.id + 7 as int), source.bool_col, source.int_col, source.float_col, source.decimal_col, source.date_col, source.string_col) +---- DML_RESULTS: target_part +4,false,14,4.400000095367432,40,2009-01-01,'3' +5,true,25,5.5,50,2024-12-12,'4' +6,false,26,6.599999904632568,60,2009-01-01,'5' +7,true,0,0,0,2009-01-01,'0' +8,false,1,1.100000023841858,10,2009-01-01,'1' +9,true,2,2.200000047683716,20,2009-01-01,'2' +10,false,3,3.299999952316284,30,2009-01-01,'3' +11,true,4,4.400000095367432,40,2009-01-01,'4' +12,false,5,5.5,50,2009-01-01,'5' +13,true,6,6.599999904632568,60,2009-01-01,'6' +---- TYPES 
+INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING +==== +---- QUERY +# Merge into partitioned target table from the source table +# using all permutation of merge clauses (minus the unconditional delete) +merge into target_part target using +(select id, bool_col, int_col, float_col, decimal_col, date_col, string_col from source union all + select cast(id + 10 as int), bool_col, int_col, float_col, decimal_col, date_col, string_col from source) source +on target.id = source.id +when not matched by target and source.id = 15 then insert values (source.id, source.bool_col, source.int_col, source.float_col, source.decimal_col, source.date_col, "first") +when not matched then insert values (source.id, source.bool_col, source.int_col, source.float_col, source.decimal_col, source.date_col, "second") +when matched and target.id = 6 then update set string_col = "third" +when matched and target.id = 4 then delete +when matched then update set string_col = "fourth", date_col = "1900-01-01" +when not matched by source and target.id = 9 then update set string_col = "fifth", decimal_col = 1000000 +when not matched by source then update set string_col = "sixth", float_col = -683925235.2 +---- DML_RESULTS: target_part +0,true,0,0,0,2009-01-01,'second' +1,false,1,1.100000023841858,10,2009-01-01,'second' +2,true,2,2.200000047683716,20,2009-01-01,'second' +3,false,3,3.299999952316284,30,2009-01-01,'second' +5,true,25,5.5,50,1900-01-01,'fourth' +6,false,26,6.599999904632568,60,2009-01-01,'third' +7,true,0,-683925248,0,2009-01-01,'sixth' +8,false,1,-683925248,10,2009-01-01,'sixth' +9,true,2,2.200000047683716,1000000,2009-01-01,'fifth' +10,false,3,3.299999952316284,30,1900-01-01,'fourth' +11,true,4,4.400000095367432,40,1900-01-01,'fourth' +12,false,5,5.5,50,1900-01-01,'fourth' +13,true,6,6.599999904632568,60,1900-01-01,'fourth' +14,true,4,4.400000095367432,40,2009-01-01,'second' +15,false,5,5.5,50,2009-01-01,'first' +16,true,6,6.599999904632568,60,2009-01-01,'second' +---- TYPES 
+INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING +==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge.test index ad9c9e997..24a971dfa 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge.test +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge.test @@ -1,21 +1,10 @@ ==== ---- QUERY -# Table structure creation and initial data loading +# Table creation and initial data loading create table target(id int, bool_col boolean, int_col int, float_col float, decimal_col decimal(20,0), date_col date, string_col string) stored as iceberg tblproperties("format-version"="2"); -create table target_part(id int, bool_col boolean, int_col int, float_col float, - decimal_col decimal(20,0), date_col date, string_col string) - partitioned by spec (bucket(5, int_col), truncate(3, decimal_col), year(date_col), truncate(3, string_col)) - stored as iceberg tblproperties("format-version"="2"); - -create table target_part_sort(id int, bool_col boolean, int_col int, float_col float, - decimal_col decimal(20,0), date_col date, string_col string) - partitioned by spec (bucket(5, int_col), truncate(3, decimal_col), year(date_col), truncate(3, string_col)) - sort by (int_col) - stored as iceberg tblproperties("format-version"="2") ; - create table source(id int, bool_col boolean, int_col int, float_col float, decimal_col decimal(20,0), date_col date, string_col string) stored as iceberg tblproperties("format-version"="2"); @@ -101,10 +90,23 @@ NumModifiedRows: 4 NumDeletedRows: 4 ==== ---- QUERY +# Merge into unpartitioned target table from the source table +# using not matched by source clause +merge into target using source on target.id = cast(source.id + 1 as int) +when not matched by source then update set date_col = '2022-12-12' +---- DML_RESULTS: target +4,true,222,4.40000009537,40,2009-01-01,'4 case 2' +0,true,222,0.0,0,2022-12-12,'0 
case 2' +2,true,222,2.20000004768,20,2009-01-01,'2 case 2' +5,NULL,5,NULL,NULL,NULL,'NULL' +---- TYPES +INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING +==== +---- QUERY # Validate the number of snapshots written to target select count(1) snapshots from $DATABASE.target.snapshots ---- RESULTS -4 +5 ---- TYPES BIGINT ==== @@ -120,215 +122,4 @@ row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target/data/delete-.*.parq','.* row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target/data/delete-.*.parq','.*','','$ERASURECODE_POLICY' ---- TYPES STRING,STRING,STRING,STRING -==== ----- QUERY -# Merge into partitioned target table from the source table -# Using insert to fill target_part table -merge into target_part target using source on target.id = source.id -when not matched then insert values( - cast(source.id + 10 as int), source.bool_col, source.int_col, - source.float_col, source.decimal_col, - source.date_col, 'constant string value') ----- DML_RESULTS: target_part -13,false,3,3.29999995232,30,2009-01-01,'constant string value' -15,false,5,5.5,50,2009-01-01,'constant string value' -11,false,1,1.10000002384,10,2009-01-01,'constant string value' -14,true,4,4.40000009537,40,2009-01-01,'constant string value' -10,true,0,0.0,0,2009-01-01,'constant string value' -16,true,6,6.59999990463,60,2009-01-01,'constant string value' -12,true,2,2.20000004768,20,2009-01-01,'constant string value' ----- TYPES -INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING -==== ----- QUERY -# Merge into partitioned target table from the source table -# Using update clause to update string_col value and delete clause -# to delete rows where source.id is lower than 3 -merge into target_part target using source on target.id = source.id + 10 -when matched and source.id < 3 then delete -when matched then update set string_col = source.string_col ----- DML_RESULTS: target_part -13,false,3,3.29999995232,30,2009-01-01,'3' -15,false,5,5.5,50,2009-01-01,'5' -14,true,4,4.40000009537,40,2009-01-01,'4' 
-16,true,6,6.59999990463,60,2009-01-01,'6' ----- TYPES -INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING -==== ----- QUERY -# Merge into partitioned target table from itself shifted by 1 -# using update clause to update string_col and int_col -merge into target_part target using target_part source on target.id = source.id + 1 -when matched then update set string_col = source.string_col, int_col = cast(source.int_col + 100 as int) ----- DML_RESULTS: target_part -13,false,3,3.29999995232,30,2009-01-01,'3' -15,false,104,5.5,50,2009-01-01,'4' -14,true,103,4.40000009537,40,2009-01-01,'3' -16,true,105,6.59999990463,60,2009-01-01,'5' ----- TYPES -INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING -==== ----- QUERY -# Validate the number of snapshots written to target_part -select count(1) snapshots from $DATABASE.target_part.snapshots ----- RESULTS -3 ----- TYPES -BIGINT -==== ----- QUERY -# Validate the files written for target_part -show files in target_part ----- RESULTS: VERIFY_IS_SUBSET -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=0/decimal_col_trunc=30/date_col_year=2009/string_col_trunc=3/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY' -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=0/decimal_col_trunc=30/date_col_year=2009/string_col_trunc=con/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY' -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=0/decimal_col_trunc=30/date_col_year=2009/string_col_trunc=con/delete-.*.parq','.*','','$ERASURECODE_POLICY' -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=0/decimal_col_trunc=39/date_col_year=2009/string_col_trunc=4/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY' -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=0/decimal_col_trunc=39/date_col_year=2009/string_col_trunc=4/delete-.*.parq','.*','','$ERASURECODE_POLICY' 
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=0/decimal_col_trunc=39/date_col_year=2009/string_col_trunc=con/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY' -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=0/decimal_col_trunc=39/date_col_year=2009/string_col_trunc=con/delete-.*.parq','.*','','$ERASURECODE_POLICY' -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=1/decimal_col_trunc=0/date_col_year=2009/string_col_trunc=con/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY' -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=1/decimal_col_trunc=0/date_col_year=2009/string_col_trunc=con/delete-.*.parq','.*','','$ERASURECODE_POLICY' -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=1/decimal_col_trunc=9/date_col_year=2009/string_col_trunc=con/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY' -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=1/decimal_col_trunc=9/date_col_year=2009/string_col_trunc=con/delete-.*.parq','.*','','$ERASURECODE_POLICY' -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=2/decimal_col_trunc=18/date_col_year=2009/string_col_trunc=con/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY' -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=2/decimal_col_trunc=18/date_col_year=2009/string_col_trunc=con/delete-.*.parq','.*','','$ERASURECODE_POLICY' -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=2/decimal_col_trunc=48/date_col_year=2009/string_col_trunc=4/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY' -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=3/decimal_col_trunc=48/date_col_year=2009/string_col_trunc=5/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY' 
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=3/decimal_col_trunc=48/date_col_year=2009/string_col_trunc=5/delete-.*.parq','.*','','$ERASURECODE_POLICY' -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=3/decimal_col_trunc=48/date_col_year=2009/string_col_trunc=con/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY' -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=3/decimal_col_trunc=48/date_col_year=2009/string_col_trunc=con/delete-.*.parq','.*','','$ERASURECODE_POLICY' -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=4/decimal_col_trunc=39/date_col_year=2009/string_col_trunc=3/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY' -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=4/decimal_col_trunc=60/date_col_year=2009/string_col_trunc=5/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY' -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=4/decimal_col_trunc=60/date_col_year=2009/string_col_trunc=6/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY' -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=4/decimal_col_trunc=60/date_col_year=2009/string_col_trunc=6/delete-.*.parq','.*','','$ERASURECODE_POLICY' -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=4/decimal_col_trunc=60/date_col_year=2009/string_col_trunc=con/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY' -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/int_col_bucket=4/decimal_col_trunc=60/date_col_year=2009/string_col_trunc=con/delete-.*.parq','.*','','$ERASURECODE_POLICY' ----- TYPES -STRING,STRING,STRING,STRING -==== ----- QUERY -# Merge into partitioned and sorted target table from the source table as an inline view -# using inserts as the target table is empty -merge into target_part_sort target using (select * from source) source on 
target.id = source.id -when not matched and source.id % 2 = 0 then insert values( - cast(source.id + 1000 as int), source.bool_col, source.int_col, - source.float_col, source.decimal_col, - source.date_col, 'constant string value') -when not matched then insert values( - cast(source.id + 2000 as int), source.bool_col, source.int_col, - source.float_col, source.decimal_col, - source.date_col, concat(source.string_col, " case 2")) ----- DML_RESULTS: target_part_sort -1000,true,0,0,0,2009-01-01,'constant string value' -1002,true,2,2.200000047683716,20,2009-01-01,'constant string value' -1004,true,4,4.400000095367432,40,2009-01-01,'constant string value' -1006,true,6,6.599999904632568,60,2009-01-01,'constant string value' -2001,false,1,1.100000023841858,10,2009-01-01,'1 case 2' -2003,false,3,3.299999952316284,30,2009-01-01,'3 case 2' -2005,false,5,5.5,50,2009-01-01,'5 case 2' ----- TYPES -INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING -==== ----- QUERY -# Merge into partitioned and sorted target table from the source table as -# an union of inline views duplicating the source rows -# causing query abortion on the backend -merge into target_part_sort target using (select * from source union all select * from source) source on target.id = source.id + 1002 -when not matched then insert values( - cast(source.id + 1000 as int), source.bool_col, source.int_col, - source.float_col, source.decimal_col, - source.date_col, 'constant string value') ----- CATCH -Duplicate row found -==== ----- QUERY -# Merge into partitioned and sorted target table from the source table as -# an inline view using an update to combine target and source values as a source value -merge into target_part_sort target using (select * from source) source on target.id = source.id + 1000 -when matched then update set float_col = cast(target.float_col + source.float_col as float) ----- DML_RESULTS: target_part_sort -1000,true,0,0,0,2009-01-01,'constant string value' 
-1002,true,2,4.400000095367432,20,2009-01-01,'constant string value' -1004,true,4,8.8000001907348632,40,2009-01-01,'constant string value' -1006,true,6,13.19999980926514,60,2009-01-01,'constant string value' -2001,false,1,1.100000023841858,10,2009-01-01,'1 case 2' -2003,false,3,3.299999952316284,30,2009-01-01,'3 case 2' -2005,false,5,5.5,50,2009-01-01,'5 case 2' ----- TYPES -INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING -==== ----- QUERY -# Merge into partitioned and sorted target table from the source table as -# an inline view using an update to combine target and source values as a source value -# using join condition on string columns -merge into target_part_sort target using (select * from source) source on target.string_col = concat(source.string_col, ' case 2') -when matched then update set float_col = cast(target.float_col + source.float_col as float) ----- DML_RESULTS: target_part_sort -1000,true,0,0,0,2009-01-01,'constant string value' -1002,true,2,4.400000095367432,20,2009-01-01,'constant string value' -1004,true,4,8.8000001907348632,40,2009-01-01,'constant string value' -1006,true,6,13.19999980926514,60,2009-01-01,'constant string value' -2001,false,1,2.200000047683716,10,2009-01-01,'1 case 2' -2003,false,3,6.599999904632568,30,2009-01-01,'3 case 2' -2005,false,5,11,50,2009-01-01,'5 case 2' ----- TYPES -INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING -==== ----- QUERY -# Modifying the partition layout of 'target_part' by removing bucket partitions from int_col -alter table target_part set partition spec (truncate(3, decimal_col), year(date_col), truncate(3, string_col)) -==== ----- QUERY -# Merge into partitioned target table from the source table as -# an inline view using an update to set the value of int_col -merge into target_part target using (select * from source) source on target.id = source.id + 10 -when matched then update set int_col = source.int_col, id = source.id ----- DML_RESULTS: target_part -3,false,3,3.29999995232,30,2009-01-01,'3' 
-5,false,5,5.5,50,2009-01-01,'4' -4,true,4,4.40000009537,40,2009-01-01,'3' -6,true,6,6.59999990463,60,2009-01-01,'5' ----- TYPES -INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING -==== ----- QUERY -# Validate the files written for target_part (different partitioning) -show files in target_part ----- RESULTS: VERIFY_IS_SUBSET -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/decimal_col_trunc=30/date_col_year=2009/string_col_trunc=3/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY' -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/decimal_col_trunc=39/date_col_year=2009/string_col_trunc=3/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY' -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/decimal_col_trunc=48/date_col_year=2009/string_col_trunc=4/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY' -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/decimal_col_trunc=60/date_col_year=2009/string_col_trunc=5/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY' ----- TYPES -STRING,STRING,STRING,STRING -==== ----- QUERY -# Modifying the partition layout of 'target_part' by removing all partition transforms -alter table target_part set partition spec (void(int_col)) -==== ----- QUERY -# Merge into partitioned target table from the source table as -# an inline view using an update to set the value of int_col -merge into target_part target using (select * from source) source on target.id = source.id -when matched then update set int_col = cast(source.int_col + 10 as int) ----- DML_RESULTS: target_part -3,false,13,3.29999995232,30,2009-01-01,'3' -5,false,15,5.5,50,2009-01-01,'4' -4,true,14,4.40000009537,40,2009-01-01,'3' -6,true,16,6.59999990463,60,2009-01-01,'5' ----- TYPES -INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING -==== ----- QUERY -# Validate the files written for target_part (no partitioning) -show files in target_part ----- RESULTS: VERIFY_IS_SUBSET 
-row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/[a-z0-9_-]+_data\.0\.parq','.*','','$ERASURECODE_POLICY' -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/decimal_col_trunc=30/date_col_year=2009/string_col_trunc=3/delete-.*.parq','.*','','$ERASURECODE_POLICY' -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/decimal_col_trunc=39/date_col_year=2009/string_col_trunc=3/delete-.*.parq','.*','','$ERASURECODE_POLICY' -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/decimal_col_trunc=48/date_col_year=2009/string_col_trunc=4/delete-.*.parq','.*','','$ERASURECODE_POLICY' -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/target_part/data/decimal_col_trunc=60/date_col_year=2009/string_col_trunc=5/delete-.*.parq','.*','','$ERASURECODE_POLICY' ----- TYPES -STRING,STRING,STRING,STRING ==== \ No newline at end of file diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py index 53a1e1834..624e4dd4e 100644 --- a/tests/query_test/test_iceberg.py +++ b/tests/query_test/test_iceberg.py @@ -1989,6 +1989,12 @@ class TestIcebergV2Table(IcebergTestSuite): def test_merge(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-merge', vector, unique_database) + def test_merge_partition(self, vector, unique_database): + self.run_test_case('QueryTest/iceberg-merge-partition', vector, unique_database) + + def test_merge_partition_sort(self, vector, unique_database): + self.run_test_case('QueryTest/iceberg-merge-partition-sort', vector, unique_database) + def test_merge_long(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-merge-long', vector, unique_database)
