This is an automated email from the ASF dual-hosted git repository.
laszlog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new ff2f2ba77 IMPALA-13324: Enable statement rewrite for merge queries for
IcebergMergeImpl
ff2f2ba77 is described below
commit ff2f2ba77ebe5bc83ef7f3e62d43c14af1f777eb
Author: Peter Rozsa <[email protected]>
AuthorDate: Wed Nov 6 10:59:09 2024 +0100
IMPALA-13324: Enable statement rewrite for merge queries for
IcebergMergeImpl
This change enables MERGE statements with source expressions containing
subqueries that require a rewrite. The change adds implementations of the
reset methods for each merge case, and properly handles resets for
MergeStmt and IcebergMergeImpl.
Tests:
- Planner test added with a merge query that requires a rewrite
- Analyzer test modified
Change-Id: I26e5661274aade3f74a386802c0ed20e5cb068b5
Reviewed-on: http://gerrit.cloudera.org:8080/22039
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
.../apache/impala/analysis/AnalysisContext.java | 2 +-
.../apache/impala/analysis/IcebergMergeImpl.java | 36 +++-
.../java/org/apache/impala/analysis/MergeCase.java | 12 +-
.../java/org/apache/impala/analysis/MergeImpl.java | 2 +
.../org/apache/impala/analysis/MergeInsert.java | 6 +
.../java/org/apache/impala/analysis/MergeStmt.java | 37 +++--
.../org/apache/impala/analysis/MergeUpdate.java | 9 +
.../org/apache/impala/analysis/StmtRewriter.java | 6 +-
.../impala/analysis/AnalyzeModifyStmtsTest.java | 13 +-
.../queries/PlannerTest/iceberg-merge.test | 182 +++++++++++++++++++++
.../queries/QueryTest/iceberg-merge-partition.test | 40 ++++-
.../queries/QueryTest/ranger_column_masking.test | 22 +++
12 files changed, 331 insertions(+), 36 deletions(-)
diff --git a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
index 6bf1431ca..6f28c3a9f 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
@@ -435,7 +435,7 @@ public class AnalysisContext {
}
public boolean requiresExprRewrite() {
return isQueryStmt() || isInsertStmt() || isCreateTableAsSelectStmt()
- || isUpdateStmt() || isDeleteStmt() || isOptimizeStmt();
+ || isUpdateStmt() || isDeleteStmt() || isOptimizeStmt() ||
isMergeStmt();
}
public boolean requiresSetOperationRewrite() {
return analyzer_.containsSetOperation() && !isCreateViewStmt() &&
!isAlterViewStmt()
diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java
b/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java
index ffcd433fd..5956e59ec 100644
--- a/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java
+++ b/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java
@@ -68,12 +68,12 @@ public class IcebergMergeImpl implements MergeImpl {
private static final String MERGE_ACTION_TUPLE_NAME = "merge-action";
private final MergeStmt mergeStmt_;
- private final TableRef targetTableRef_;
- private final TableRef sourceTableRef_;
+ private TableRef targetTableRef_;
+ private TableRef sourceTableRef_;
private final Expr on_;
private FeIcebergTable icebergTable_;
private IcebergPositionDeleteTable icebergPositionalDeleteTable_;
- private QueryStmt queryStmt_;
+ private SelectStmt queryStmt_;
private int deleteTableId_;
private final FeTable table_;
private TupleDescriptor mergeActionTuple_;
@@ -101,9 +101,20 @@ public class IcebergMergeImpl implements MergeImpl {
@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
+ if (queryStmt_ != null ) {
+ Preconditions.checkState(queryStmt_.fromClause_.size() == 2);
+ sourceTableRef_ = analyzer.resolveTableRef(sourceTableRef_);
+ targetTableRef_ = analyzer.resolveTableRef(targetTableRef_);
+ queryStmt_.fromClause_.set(0, targetTableRef_);
+ queryStmt_.fromClause_.set(1, sourceTableRef_);
+ }
setJoinParams();
-
targetTableRef_.analyze(analyzer);
+
+ if (targetTableRef_.isTableMaskingView()) {
+ targetTableRef_= ((InlineViewRef)targetTableRef_).getUnMaskedTableRef();
+ }
+
FeTable table = targetTableRef_.getTable();
Preconditions.checkState(table instanceof FeIcebergTable);
icebergTable_ = (FeIcebergTable) table;
@@ -235,6 +246,15 @@ public class IcebergMergeImpl implements MergeImpl {
return mergeNode;
}
+ @Override
+ public void reset() {
+ queryStmt_.reset();
+ // TableRef resets are replacing references instead of resetting the
object state
+ targetTableRef_ = queryStmt_.fromClause_.get(0);
+ sourceTableRef_ = queryStmt_.fromClause_.get(1);
+ targetPartitionExpressions_.clear();
+ }
+
@Override
public DataSink createDataSink() {
if (mergeStmt_.hasOnlyDeleteCases()) { return createDeleteSink(); }
@@ -281,7 +301,7 @@ public class IcebergMergeImpl implements MergeImpl {
*
* @return Query statement that contains every target and source columns
*/
- public QueryStmt prepareQuery() {
+ public SelectStmt prepareQuery() {
List<SelectListItem> selectListItems = Lists.newArrayList();
SelectList selectList = new SelectList(selectListItems);
// Straight join hint is required to fix the join sides.
@@ -331,9 +351,9 @@ public class IcebergMergeImpl implements MergeImpl {
selectListItems.add(sourceColumns);
rowPresentExpression_ = rowPresentExpression;
- targetPartitionMetaExpressions_.addAll(partitionMetaExpressions);
- targetPositionMetaExpressions_.addAll(positionMetaExpressions);
- targetExpressions_.addAll(targetSlotRefs);
+ targetPartitionMetaExpressions_ = partitionMetaExpressions;
+ targetPositionMetaExpressions_ = positionMetaExpressions;
+ targetExpressions_ = targetSlotRefs;
FromClause fromClause =
new FromClause(Lists.newArrayList(targetTableRef_, sourceTableRef_));
diff --git a/fe/src/main/java/org/apache/impala/analysis/MergeCase.java
b/fe/src/main/java/org/apache/impala/analysis/MergeCase.java
index c15639dfe..42758e415 100644
--- a/fe/src/main/java/org/apache/impala/analysis/MergeCase.java
+++ b/fe/src/main/java/org/apache/impala/analysis/MergeCase.java
@@ -24,6 +24,7 @@ import java.util.StringJoiner;
import org.apache.impala.catalog.Column;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.AnalysisException;
+import org.apache.impala.rewrite.ExprRewriter;
import org.apache.impala.thrift.TExplainLevel;
import org.apache.impala.thrift.TMergeCaseType;
import org.apache.impala.thrift.TMergeMatchType;
@@ -134,8 +135,10 @@ public abstract class MergeCase extends StatementBase {
@Override
public void reset() {
super.reset();
- filterExprs_ = Collections.emptyList();
resultExprs_ = Collections.emptyList();
+ for(Expr expr : filterExprs_) {
+ expr.reset();
+ }
}
@Override
@@ -144,9 +147,14 @@ public abstract class MergeCase extends StatementBase {
@Override
public List<Expr> getResultExprs() { return resultExprs_; }
- public TMergeMatchType matchType(){
+ public TMergeMatchType matchType() {
return matchType_;
}
+ @Override
+ public void rewriteExprs(ExprRewriter rewriter) throws AnalysisException {
+ rewriter.rewriteList(filterExprs_, analyzer_);
+ rewriter.rewriteList(resultExprs_, analyzer_);
+ }
public String matchTypeAsString() {
switch (matchType_) {
diff --git a/fe/src/main/java/org/apache/impala/analysis/MergeImpl.java
b/fe/src/main/java/org/apache/impala/analysis/MergeImpl.java
index 9f7b3c2a8..b497e9bd8 100644
--- a/fe/src/main/java/org/apache/impala/analysis/MergeImpl.java
+++ b/fe/src/main/java/org/apache/impala/analysis/MergeImpl.java
@@ -94,4 +94,6 @@ public interface MergeImpl {
*/
PlanNode getPlanNode(PlannerContext ctx, PlanNode child, Analyzer analyzer)
throws ImpalaException;
+
+ void reset();
}
diff --git a/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java
b/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java
index b96e13a03..ca5faf4bb 100644
--- a/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java
+++ b/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java
@@ -97,6 +97,12 @@ public class MergeInsert extends MergeCase {
@Override
public TMergeCaseType caseType() { return TMergeCaseType.INSERT; }
+ @Override
+ public void reset() {
+ super.reset();
+ selectList_.reset();
+ }
+
@Override
public MergeInsert clone() {
return new MergeInsert(Expr.cloneList(resultExprs_),
Expr.cloneList(getFilterExprs()),
diff --git a/fe/src/main/java/org/apache/impala/analysis/MergeStmt.java
b/fe/src/main/java/org/apache/impala/analysis/MergeStmt.java
index bf4dcafb1..e9c1da3fe 100644
--- a/fe/src/main/java/org/apache/impala/analysis/MergeStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/MergeStmt.java
@@ -25,6 +25,7 @@ import org.apache.impala.common.ImpalaException;
import org.apache.impala.planner.PlanNode;
import org.apache.impala.planner.DataSink;
import org.apache.impala.planner.PlannerContext;
+import org.apache.impala.rewrite.ExprRewriter;
import org.apache.impala.thrift.TMergeCaseType;
import org.apache.impala.thrift.TMergeMatchType;
import org.apache.impala.thrift.TSortingOrder;
@@ -41,9 +42,7 @@ import org.apache.impala.thrift.TSortingOrder;
public class MergeStmt extends DmlStatementBase {
private static final int MERGE_CASE_LIMIT = 1000;
private TableRef targetTableRef_;
- private TableRef originalTargetTableRef_;
private TableRef sourceTableRef_;
- private TableRef originalSourceTableRef_;
private final List<MergeCase> cases_;
private final Expr onClause_;
private MergeImpl impl_;
@@ -51,9 +50,7 @@ public class MergeStmt extends DmlStatementBase {
public MergeStmt(TableRef target, TableRef source, Expr onClause,
List<MergeCase> cases) {
targetTableRef_ = target;
- originalTargetTableRef_ = target;
sourceTableRef_ = source;
- originalSourceTableRef_ = source;
onClause_ = onClause;
cases_ = cases;
}
@@ -80,13 +77,16 @@ public class MergeStmt extends DmlStatementBase {
table_ = targetTableRef_.getTable();
- if (table_ instanceof FeIcebergTable) {
- impl_ = new IcebergMergeImpl(this, targetTableRef_, sourceTableRef_,
onClause_);
- setMaxTableSinks(analyzer_.getQueryOptions().getMax_fs_writers());
- } else {
- throw new AnalysisException(String.format(
- "Target table must be an Iceberg table: %s", table_.getFullName()));
+ if (impl_ == null) {
+ if (table_ instanceof FeIcebergTable) {
+ impl_ = new IcebergMergeImpl(this, targetTableRef_, sourceTableRef_,
onClause_);
+ setMaxTableSinks(analyzer_.getQueryOptions().getMax_fs_writers());
+ } else {
+ throw new AnalysisException(String.format(
+ "Target table must be an Iceberg table: %s",
table_.getFullName()));
+ }
}
+
impl_.analyze(analyzer);
for (MergeCase mergeCase : getCases()) {
@@ -99,7 +99,7 @@ public class MergeStmt extends DmlStatementBase {
public void collectTableRefs(List<TableRef> tblRefs) {
super.collectTableRefs(tblRefs);
if (sourceTableRef_ instanceof InlineViewRef) {
- ((InlineViewRef) sourceTableRef_).queryStmt_.collectTableRefs(tblRefs,
true);
+ ((InlineViewRef) sourceTableRef_).queryStmt_.collectTableRefs(tblRefs);
} else {
tblRefs.add(sourceTableRef_);
}
@@ -144,13 +144,24 @@ public class MergeStmt extends DmlStatementBase {
@Override
public void reset() {
super.reset();
- targetTableRef_ = originalTargetTableRef_;
- sourceTableRef_ = originalSourceTableRef_;
+ impl_.reset();
onClause_.reset();
for (MergeCase mergeCase : cases_) {
mergeCase.reset();
}
}
+ @Override
+ public boolean resolveTableMask(Analyzer analyzer) throws AnalysisException {
+ return getQueryStmt().resolveTableMask(analyzer);
+ }
+
+ @Override
+ public void rewriteExprs(ExprRewriter rewriter) throws AnalysisException {
+ getQueryStmt().rewriteExprs(rewriter);
+ for (MergeCase mergeCase : cases_) {
+ mergeCase.rewriteExprs(rewriter);
+ }
+ }
public QueryStmt getQueryStmt() { return impl_.getQueryStmt(); }
diff --git a/fe/src/main/java/org/apache/impala/analysis/MergeUpdate.java
b/fe/src/main/java/org/apache/impala/analysis/MergeUpdate.java
index 47cea51b9..912ea5dd5 100644
--- a/fe/src/main/java/org/apache/impala/analysis/MergeUpdate.java
+++ b/fe/src/main/java/org/apache/impala/analysis/MergeUpdate.java
@@ -122,6 +122,15 @@ public class MergeUpdate extends MergeCase {
}
}
+ @Override
+ public void reset() {
+ super.reset();
+ for (Pair<SlotRef, Expr> expr : assignmentExprs_) {
+ expr.first.reset();
+ expr.second.reset();
+ }
+ }
+
@Override
public MergeUpdate clone() {
return new MergeUpdate(Expr.cloneList(resultExprs_),
Expr.cloneList(getFilterExprs()),
diff --git a/fe/src/main/java/org/apache/impala/analysis/StmtRewriter.java
b/fe/src/main/java/org/apache/impala/analysis/StmtRewriter.java
index 6b30b4bde..0a2185f40 100644
--- a/fe/src/main/java/org/apache/impala/analysis/StmtRewriter.java
+++ b/fe/src/main/java/org/apache/impala/analysis/StmtRewriter.java
@@ -68,12 +68,10 @@ public class StmtRewriter {
queryStmt = ((UpdateStmt) analysisResult.getStmt()).getQueryStmt();
} else if (analysisResult.isDeleteStmt()) {
queryStmt = ((DeleteStmt) analysisResult.getStmt()).getQueryStmt();
+ } else if (analysisResult.isMergeStmt()) {
+ queryStmt = ((MergeStmt) analysisResult.getStmt()).getQueryStmt();
} else if (analysisResult.isTestCaseStmt()) {
queryStmt = ((CopyTestCaseStmt) analysisResult.getStmt()).getQueryStmt();
- } else if (analysisResult.isMergeStmt()) {
- // TODO: IMPALA-13324
- throw new AnalysisException("Unable to rewrite MERGE query statement: "
- + ((MergeStmt) analysisResult.getStmt()).getQueryStmt().toSql());
} else {
throw new AnalysisException("Unsupported statement: " + stmt.toSql());
}
diff --git
a/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java
b/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java
index 34299e7ed..da0bf9f61 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java
@@ -378,6 +378,12 @@ public class AnalyzeModifyStmtsTest extends AnalyzerTest {
+ "functional_parquet.iceberg_v2_partitioned_position_deletes target "
+ "using functional_parquet.iceberg_non_partitioned s "
+ "on target.id = s.id when not matched by source then update set *");
+ // Query rewrite in the source subquery
+ AnalyzesOk("merge into functional_parquet.iceberg_partition_evolution t "
+ + "using (select * from functional_parquet.iceberg_non_partitioned
where "
+ + "id in (select max(id) from
functional_parquet.iceberg_non_partitioned)) s "
+ + "on t.id = s.id "
+ + "when matched and s.id > 2 then delete");
// Inline view as target
AnalysisError("merge into "
@@ -479,13 +485,6 @@ public class AnalyzeModifyStmtsTest extends AnalyzerTest {
+ "on t.id = s.id "
+ "when matched and s.id then delete",
"Filter expression requires return type 'BOOLEAN'. Actual type is
'INT'");
- // Subquery rewrite in query statement
- AnalysisError("merge into functional_parquet.iceberg_partition_evolution t
"
- + "using (select * from functional_parquet.iceberg_non_partitioned
where "
- + "id in (select max(id) from
functional_parquet.iceberg_non_partitioned)) s "
- + "on t.id = s.id "
- + "when matched and s.id > 2 then delete",
- "Unable to rewrite MERGE query statement");
// UPDATE SET * with different column lists
AnalysisError("merge into functional_parquet.iceberg_partition_evolution t
"
+ "using functional_parquet.iceberg_non_partitioned s "
diff --git
a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge.test
b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge.test
index 1082cd650..fc515368e 100644
---
a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge.test
+++
b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge.test
@@ -831,3 +831,185 @@ MERGE SINK
Iceberg snapshot id: 8885697082976537578
row-size=80B cardinality=20
====
+# Merge into a partitioned Iceberg table using multiple merge cases and using
a query that will be rewritten as source
+merge into functional_parquet.iceberg_v2_partitioned_position_deletes target
+using (select distinct 1 id, "string value" string_col
+from functional.alltypesagg a
+where exists
+ (select id
+ from functional.alltypestiny b
+ where a.tinyint_col = b.tinyint_col and a.string_col = b.string_col
+ group by rollup(id, int_col, bool_col)
+ having int_col is null)
+and tinyint_col < 10) source
+on target.id = source.id
+when matched then update set action = string_col
+when not matched then insert (id, user, action) values(source.id,
source.string_col, concat("something ", source.string_col))
+---- PLAN
+MERGE SINK
+|->WRITE TO HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes,
OVERWRITE=false, PARTITION-KEYS=(action)]
+|->BUFFERED DELETE FROM ICEBERG
[functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
+|
+11:SORT
+| order by: action ASC NULLS LAST
+| row-size=81B cardinality=11
+|
+10:MERGE
+| CASE 0: MATCHED
+| | result expressions: target.id, target.`user`, 'string value',
target.event_time
+| | type: UPDATE
+| CASE 1: NOT MATCHED BY TARGET
+| | result expressions: 1, 'string value', concat('something ', 'string
value'), NULL
+| | type: INSERT
+| row-size=93B cardinality=11
+|
+09:HASH JOIN [FULL OUTER JOIN]
+| hash predicates: target.id = 1
+| row-size=93B cardinality=11
+|
+|--08:AGGREGATE [FINALIZE]
+| | group by: 1, 'string value'
+| | row-size=13B cardinality=1
+| |
+| 07:HASH JOIN [LEFT SEMI JOIN]
+| | hash predicates: a.tinyint_col = CASE valid_tid(3,4,5,6) WHEN 3 THEN
b.tinyint_col WHEN 4 THEN b.tinyint_col WHEN 5 THEN b.tinyint_col WHEN 6 THEN
b.tinyint_col END, a.string_col = CASE valid_tid(3,4,5,6) WHEN 3 THEN
b.string_col WHEN 4 THEN b.string_col WHEN 5 THEN b.string_col WHEN 6 THEN
b.string_col END
+| | runtime filters: RF000 <- CASE valid_tid(3,4,5,6) WHEN 3 THEN
b.tinyint_col WHEN 4 THEN b.tinyint_col WHEN 5 THEN b.tinyint_col WHEN 6 THEN
b.tinyint_col END, RF001 <- CASE valid_tid(3,4,5,6) WHEN 3 THEN b.string_col
WHEN 4 THEN b.string_col WHEN 5 THEN b.string_col WHEN 6 THEN b.string_col END
+| | row-size=16B cardinality=1
+| |
+| |--06:AGGREGATE [FINALIZE]
+| | | group by: CASE valid_tid(3,4,5,6) WHEN 3 THEN id WHEN 4 THEN id WHEN
5 THEN id WHEN 6 THEN NULL END, CASE valid_tid(3,4,5,6) WHEN 3 THEN int_col
WHEN 4 THEN int_col WHEN 5 THEN NULL WHEN 6 THEN NULL END, CASE
valid_tid(3,4,5,6) WHEN 3 THEN bool_col WHEN 4 THEN NULL WHEN 5 THEN NULL WHEN
6 THEN NULL END, CASE valid_tid(3,4,5,6) WHEN 3 THEN b.tinyint_col WHEN 4 THEN
b.tinyint_col WHEN 5 THEN b.tinyint_col WHEN 6 THEN b.tinyint_col END, CASE
valid_tid(3,4,5,6) WHEN 3 THEN b.string_ [...]
+| | | having: CASE valid_tid(3,4,5,6) WHEN 3 THEN int_col WHEN 4 THEN
int_col WHEN 5 THEN NULL WHEN 6 THEN NULL END IS NULL
+| | | row-size=26B cardinality=1
+| | |
+| | 05:AGGREGATE [FINALIZE]
+| | | Class 0
+| | | group by: id, int_col, bool_col, b.tinyint_col, b.string_col
+| | | Class 1
+| | | group by: id, int_col, NULL, b.tinyint_col, b.string_col
+| | | Class 2
+| | | group by: id, NULL, NULL, b.tinyint_col, b.string_col
+| | | Class 3
+| | | group by: NULL, NULL, NULL, b.tinyint_col, b.string_col
+| | | row-size=92B cardinality=28
+| | |
+| | 04:SCAN HDFS [functional.alltypestiny b]
+| | HDFS partitions=4/4 files=4 size=460B
+| | row-size=23B cardinality=8
+| |
+| 03:SCAN HDFS [functional.alltypesagg a]
+| HDFS partitions=11/11 files=11 size=814.73KB
+| predicates: tinyint_col < 10
+| runtime filters: RF000 -> a.tinyint_col, RF001 -> a.string_col
+| row-size=16B cardinality=1.10K
+|
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
+| row-size=80B cardinality=10
+|
+|--01:SCAN HDFS
[functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01
target-position-delete]
+| HDFS partitions=1/1 files=3 size=9.47KB
+| Iceberg snapshot id: 8885697082976537578
+| row-size=204B cardinality=10
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes
target]
+ HDFS partitions=1/1 files=3 size=3.48KB
+ Iceberg snapshot id: 8885697082976537578
+ row-size=80B cardinality=20
+---- DISTRIBUTEDPLAN
+MERGE SINK
+|->WRITE TO HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes,
OVERWRITE=false, PARTITION-KEYS=(action)]
+|->BUFFERED DELETE FROM ICEBERG
[functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
+|
+20:SORT
+| order by: action ASC NULLS LAST
+| row-size=81B cardinality=11
+|
+19:EXCHANGE [HASH(target.action)]
+|
+10:MERGE
+| CASE 0: MATCHED
+| | result expressions: target.id, target.`user`, 'string value',
target.event_time
+| | type: UPDATE
+| CASE 1: NOT MATCHED BY TARGET
+| | result expressions: 1, 'string value', concat('something ', 'string
value'), NULL
+| | type: INSERT
+| row-size=93B cardinality=11
+|
+09:HASH JOIN [FULL OUTER JOIN, PARTITIONED]
+| hash predicates: target.id = 1
+| row-size=93B cardinality=11
+|
+|--18:EXCHANGE [HASH(1)]
+| |
+| 16:AGGREGATE [FINALIZE]
+| | group by: 1, 'string value'
+| | row-size=13B cardinality=1
+| |
+| 15:EXCHANGE [HASH(1,'string value')]
+| |
+| 08:AGGREGATE [STREAMING]
+| | group by: 1, 'string value'
+| | row-size=13B cardinality=1
+| |
+| 07:HASH JOIN [LEFT SEMI JOIN, BROADCAST]
+| | hash predicates: a.tinyint_col = CASE valid_tid(3,4,5,6) WHEN 3 THEN
b.tinyint_col WHEN 4 THEN b.tinyint_col WHEN 5 THEN b.tinyint_col WHEN 6 THEN
b.tinyint_col END, a.string_col = CASE valid_tid(3,4,5,6) WHEN 3 THEN
b.string_col WHEN 4 THEN b.string_col WHEN 5 THEN b.string_col WHEN 6 THEN
b.string_col END
+| | runtime filters: RF000 <- CASE valid_tid(3,4,5,6) WHEN 3 THEN
b.tinyint_col WHEN 4 THEN b.tinyint_col WHEN 5 THEN b.tinyint_col WHEN 6 THEN
b.tinyint_col END, RF001 <- CASE valid_tid(3,4,5,6) WHEN 3 THEN b.string_col
WHEN 4 THEN b.string_col WHEN 5 THEN b.string_col WHEN 6 THEN b.string_col END
+| | row-size=16B cardinality=1
+| |
+| |--14:EXCHANGE [BROADCAST]
+| | |
+| | 06:AGGREGATE [FINALIZE]
+| | | group by: CASE valid_tid(3,4,5,6) WHEN 3 THEN id WHEN 4 THEN id WHEN
5 THEN id WHEN 6 THEN NULL END, CASE valid_tid(3,4,5,6) WHEN 3 THEN int_col
WHEN 4 THEN int_col WHEN 5 THEN NULL WHEN 6 THEN NULL END, CASE
valid_tid(3,4,5,6) WHEN 3 THEN bool_col WHEN 4 THEN NULL WHEN 5 THEN NULL WHEN
6 THEN NULL END, CASE valid_tid(3,4,5,6) WHEN 3 THEN b.tinyint_col WHEN 4 THEN
b.tinyint_col WHEN 5 THEN b.tinyint_col WHEN 6 THEN b.tinyint_col END, CASE
valid_tid(3,4,5,6) WHEN 3 THEN b.string_ [...]
+| | | having: CASE valid_tid(3,4,5,6) WHEN 3 THEN int_col WHEN 4 THEN
int_col WHEN 5 THEN NULL WHEN 6 THEN NULL END IS NULL
+| | | row-size=26B cardinality=1
+| | |
+| | 13:AGGREGATE [FINALIZE]
+| | | Class 0
+| | | group by: id, int_col, bool_col, b.tinyint_col, b.string_col
+| | | Class 1
+| | | group by: id, int_col, NULL, b.tinyint_col, b.string_col
+| | | Class 2
+| | | group by: id, NULL, NULL, b.tinyint_col, b.string_col
+| | | Class 3
+| | | group by: NULL, NULL, NULL, b.tinyint_col, b.string_col
+| | | row-size=92B cardinality=28
+| | |
+| | 12:EXCHANGE [HASH(CASE valid_tid(3,4,5,6) WHEN 3 THEN murmur_hash(id)
WHEN 4 THEN murmur_hash(id) WHEN 5 THEN murmur_hash(id) WHEN 6 THEN
murmur_hash(NULL) END,CASE valid_tid(3,4,5,6) WHEN 3 THEN murmur_hash(int_col)
WHEN 4 THEN murmur_hash(int_col) WHEN 5 THEN murmur_hash(NULL) WHEN 6 THEN
murmur_hash(NULL) END,CASE valid_tid(3,4,5,6) WHEN 3 THEN murmur_hash(bool_col)
WHEN 4 THEN murmur_hash(NULL) WHEN 5 THEN murmur_hash(NULL) WHEN 6 THEN
murmur_hash(NULL) END,CASE valid_tid(3,4,5 [...]
+| | |
+| | 05:AGGREGATE [STREAMING]
+| | | Class 0
+| | | group by: id, int_col, bool_col, b.tinyint_col, b.string_col
+| | | Class 1
+| | | group by: id, int_col, NULL, b.tinyint_col, b.string_col
+| | | Class 2
+| | | group by: id, NULL, NULL, b.tinyint_col, b.string_col
+| | | Class 3
+| | | group by: NULL, NULL, NULL, b.tinyint_col, b.string_col
+| | | row-size=92B cardinality=28
+| | |
+| | 04:SCAN HDFS [functional.alltypestiny b]
+| | HDFS partitions=4/4 files=4 size=460B
+| | row-size=23B cardinality=8
+| |
+| 03:SCAN HDFS [functional.alltypesagg a]
+| HDFS partitions=11/11 files=11 size=814.73KB
+| predicates: tinyint_col < 10
+| runtime filters: RF000 -> a.tinyint_col, RF001 -> a.string_col
+| row-size=16B cardinality=1.10K
+|
+17:EXCHANGE [HASH(target.id)]
+|
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED]
+| row-size=80B cardinality=10
+|
+|--11:EXCHANGE [DIRECTED]
+| |
+| 01:SCAN HDFS
[functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01
target-position-delete]
+| HDFS partitions=1/1 files=3 size=9.47KB
+| Iceberg snapshot id: 8885697082976537578
+| row-size=204B cardinality=10
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes
target]
+ HDFS partitions=1/1 files=3 size=3.48KB
+ Iceberg snapshot id: 8885697082976537578
+ row-size=80B cardinality=20
+====
diff --git
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-partition.test
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-partition.test
index 7de09e601..70cb459e0 100644
---
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-partition.test
+++
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-partition.test
@@ -300,4 +300,42 @@ when matched and source.id < 4 then update set *
---- TYPES
INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING
====
-
+---- QUERY
+# Merge into partitioned target table from an inline view that is guaranteed
to be rewritten
+# query using an update to set the value of int_col
+merge into target_part target using (select distinct 3 id, "string value"
string_col
+from functional.alltypesagg a
+where exists
+ (select id
+ from functional.alltypestiny b
+ where a.tinyint_col = b.tinyint_col and a.string_col = b.string_col
+ group by rollup(id, int_col, bool_col)
+ having int_col is null)
+and tinyint_col < 10) source on target.id = source.id
+when matched then update set string_col = source.string_col
+when not matched by source and target.id > 7 then delete
+---- DML_RESULTS: target_part
+0,true,0,0,0,2009-01-01,'0'
+1,false,1,1.100000023841858,10,2009-01-01,'1'
+2,true,2,2.200000047683716,20,2009-01-01,'2'
+3,false,0,0,0,2009-01-01,'string value'
+5,false,5,5.5,50,2009-01-01,'5'
+6,true,6,6.599999904632568,60,2009-01-01,'6'
+---- TYPES
+INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING
+====
+---- QUERY
+# Merge into partitioned target table from source table using a matched
+# case with a between filter predicate
+merge into target_part target using source on target.id = source.id
+when matched and target.id between 2 and 3 then update set string_col =
"different string value"
+---- DML_RESULTS: target_part
+0,true,0,0,0,2009-01-01,'0'
+1,false,1,1.100000023841858,10,2009-01-01,'1'
+2,true,2,2.200000047683716,20,2009-01-01,'different string value'
+3,false,0,0,0,2009-01-01,'different string value'
+5,false,5,5.5,50,2009-01-01,'5'
+6,true,6,6.599999904632568,60,2009-01-01,'6'
+---- TYPES
+INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING
+====
diff --git
a/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test
b/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test
index 207440680..d81103c25 100644
---
a/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test
+++
b/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test
@@ -709,6 +709,28 @@ select * from $UNIQUE_DB.masked_iceberg;
INT, STRING
====
---- QUERY
+# Test MERGE stmt for Iceberg target when source is a masked table.
+merge into $UNIQUE_DB.masked_iceberg target using alltypestiny source on
target.id = cast(source.id + 100 as int)
+when matched then update set target.string_col = source.string_col
+when not matched then insert values (cast(source.id + 100 as int), "string");
+select * from $UNIQUE_DB.masked_iceberg;
+---- RESULTS
+0,'0aaa'
+1,'NULL'
+3,'NULL'
+5,'NULL'
+100,'0aaa'
+200,'1aaa'
+300,'0aaa'
+400,'1aaa'
+500,'0aaa'
+600,'1aaa'
+700,'0aaa'
+800,'string'
+---- TYPES
+INT, STRING
+====
+---- QUERY
# Test on CreateView. Should not mask the columns when used in sql generations.
create view $UNIQUE_DB.masked_view as select * from alltypestiny;
show create view $UNIQUE_DB.masked_view;