Zoltan Borok-Nagy has posted comments on this change. ( http://gerrit.cloudera.org:8080/21423 )

Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables
......................................................................


Patch Set 6:

(55 comments)

Awesome work, this will be a huge milestone for the Impala project!
Did a first round, but I'm planning to do more rounds as this change is massive :)

http://gerrit.cloudera.org:8080/#/c/21423/6//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/21423/6//COMMIT_MSG@24
PS6, Line 24: level
nit: maybe 'phase' is a better word here


http://gerrit.cloudera.org:8080/#/c/21423/6//COMMIT_MSG@47
PS6, Line 47:
We should also have authorization tests, including fine-grained authz tests: e.g. test
that if the source table is masked/filtered, the MERGE statement cannot be
used to expose the sensitive data.


http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.h
File be/src/exec/iceberg-merge-node.h:

http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.h@18
PS6, Line 18: #ifndef IMPALA_EXEC_ICEBERG_MERGE_NODE_H
            : #define IMPALA_EXEC_ICEBERG_MERGE_NODE_H
nit: we prefer #pragma once


http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.h@43
PS6, Line 43: class IcebergMergePlanNode : public PlanNode {
Please add a class comment.


http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.h@101
PS6, Line 101: boost::scoped_ptr
We prefer std::unique_ptr
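For illustration, a minimal sketch of the swap, assuming the member has a single owner; `MergeNodeSketch` and `CaseEvaluator` are hypothetical names, not the classes in this patch. `std::unique_ptr` keeps the single-owner, non-copyable semantics of `boost::scoped_ptr` without the Boost dependency, and it can also be moved:

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-in for whatever the boost::scoped_ptr member points to.
struct CaseEvaluator {
  int num_exprs = 0;
};

// Hypothetical node holding an exclusively owned member.
class MergeNodeSketch {
 public:
  // std::make_unique (C++14) allocates the object and hands ownership to the member.
  MergeNodeSketch() : evaluator_(std::make_unique<CaseEvaluator>()) {}

  CaseEvaluator* evaluator() const { return evaluator_.get(); }

  // As with boost::scoped_ptr, the object is freed when the owner is destroyed
  // or when reset() is called explicitly.
  void ReleaseEvaluator() { evaluator_.reset(); }

 private:
  // Was: boost::scoped_ptr<CaseEvaluator> evaluator_;
  std::unique_ptr<CaseEvaluator> evaluator_;
};
```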


http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.h@158
PS6, Line 158: inline
No need for 'inline'; member functions defined inside the class definition are
implicitly inline, see https://en.cppreference.com/w/cpp/language/inline
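A small hypothetical example (`RowCounter` is made up for illustration): the body of `count()` appears inside the class definition, so it is already inline. The keyword only makes a difference for a member function defined outside the class in a header, like `Add()` here:

```cpp
#include <cassert>

// Hypothetical class, e.g. the contents of a header included by several .cc files.
class RowCounter {
 public:
  // Defined inside the class body: implicitly inline, no keyword needed.
  int count() const { return count_; }

  void Add(int n);

 private:
  int count_ = 0;
};

// Defined outside the class body in a header: here 'inline' is required to avoid
// multiple-definition errors when the header is included in several translation units.
inline void RowCounter::Add(int n) { count_ += n; }
```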


http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.cc
File be/src/exec/iceberg-merge-node.cc:

http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.cc@165
PS6, Line 165:
nit: too much indent


http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.cc@178
PS6, Line 178: last_row_ = nullptr;
I'm not sure we want to reset last_row_ each time EvaluateCases() is
invoked. E.g. what happens if output_batch becomes full while there are still
duplicates in child_row_batch_?
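To make the concern concrete, here is a heavily simplified, hypothetical model (not the patch's code): each input row is reduced to the ID of the target row it matched, duplicates are assumed to be adjacent, and each call may only emit `capacity` rows. With `reset_per_call` set, mimicking `last_row_ = nullptr;` at the start of EvaluateCases(), a duplicate pair that straddles two calls goes unnoticed; keeping the state and clearing it only in Reset() catches it:

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <vector>

// Hypothetical model: input holds matched target row IDs, duplicates are adjacent,
// and one input batch may need several calls because the output batch is bounded.
class DuplicateCheckSketch {
 public:
  explicit DuplicateCheckSketch(bool reset_per_call) : reset_per_call_(reset_per_call) {}

  // Processes input[pos_...] until 'capacity' rows are emitted or the input ends.
  // Returns false if a duplicate target row is found.
  bool EvaluateCases(const std::vector<int>& input, std::size_t capacity,
                     std::vector<int>* output) {
    // Mimics resetting last_row_ on every call.
    if (reset_per_call_) last_target_id_.reset();
    while (pos_ < input.size() && output->size() < capacity) {
      int id = input[pos_++];
      if (last_target_id_ && *last_target_id_ == id) return false;
      last_target_id_ = id;
      output->push_back(id);
    }
    return true;
  }

  // The state is cleared only when the node itself is reset.
  void Reset() {
    pos_ = 0;
    last_target_id_.reset();
  }

 private:
  bool reset_per_call_;
  std::size_t pos_ = 0;
  std::optional<int> last_target_id_;
};
```

With input {1, 2, 2, 3} and a capacity of 2, the resetting variant accepts both calls, while the other one reports the duplicate on the second call.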


http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.cc@186
PS6, Line 186:       return Status(
             :           "Duplicate row found: one target table row matched more than one source row");
Can we also output the duplicated row with PrintTuple()?
It would be nice to print both the target tuple and the matching source tuples.


http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.cc@209
PS6, Line 209:     if (selected_case) {
nit: to reduce nesting, we could have

 if (!selected_case) continue;
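Roughly the suggested shape, with hypothetical names and a trivial loop body just to show the guard clause:

```cpp
#include <cassert>
#include <vector>

// Hypothetical case type; in the patch this would be the selected merge case.
struct MergeCaseSketch {
  int action = 0;
};

// Returns the number of rows that would be written. 'selected' holds, per input row,
// the case chosen for it, or nullptr if no case applied.
int CountWrittenRows(const std::vector<const MergeCaseSketch*>& selected) {
  int written = 0;
  for (const MergeCaseSketch* selected_case : selected) {
    // Guard clause: skip rows without a matching case instead of nesting the body.
    if (!selected_case) continue;
    // The former body of 'if (selected_case) { ... }' now sits one level shallower.
    ++written;
  }
  return written;
}
```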


http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.cc@210
PS6, Line 210:       // Add a new row to output_batch
             :       int dst_row_idx = output_batch->AddRow();
             :       TupleRow* dst_row = output_batch->GetRow(dst_row_idx);
             :
             :       auto* target_tuple = Tuple::Create(
             :           row_descriptor_.tuple_descriptors()[target_tuple_idx_]->byte_size(),
             :           output_batch->tuple_data_pool());
             :       auto* merge_action_tuple = Tuple::Create(
             :           row_descriptor_.tuple_descriptors()[merge_action_tuple_idx_]->byte_size(),
             :           output_batch->tuple_data_pool());
             :
             :       TIcebergMergeSinkAction::type action = selected_case->SinkAction();
             :
             :       dst_row->SetTuple(target_tuple_idx_, target_tuple);
             :       dst_row->SetTuple(merge_action_tuple_idx_, merge_action_tuple);
             :
             :       for (int i = 0; i < row_descriptor_.tuple_descriptors().size(); i++) {
             :         if (i != target_tuple_idx_ && i != merge_action_tuple_idx_) {
             :           dst_row->SetTuple(i, nullptr);
             :         }
             :       }
             :
             :       target_tuple->MaterializeExprs<false, false>(source_row,
             :           *row_descriptor_.tuple_descriptors()[target_tuple_idx_],
             :           selected_case->combined_evaluators_, output_batch->tuple_data_pool());
             :
             :       RawValue::WriteNonNullPrimitive(&action, merge_action_tuple, type, nullptr);
             :
             :       output_batch->CommitLastRow();
             :       IncrementNumRowsReturned(1);
nit: for readability, this could go to its own member function
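One possible shape for the extraction, as a hypothetical and heavily simplified sketch: `SourceRow`, `OutputRow`, `CaseSketch`, and `AddRow()` are made-up names, and the tuple creation and MaterializeExprs() call are reduced to building a plain struct. The point is only that the loop keeps the control flow while the helper owns the row-building details:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical, heavily simplified stand-ins for the types used in the patch.
using SourceRow = std::vector<int>;

struct OutputRow {
  std::vector<int> target_values;  // plays the role of the materialized target tuple
  std::int8_t action;              // plays the role of the merge action tuple
};

struct CaseSketch {
  std::int8_t sink_action;
  // Plays the role of combined_evaluators_: computes target values from a source row.
  std::vector<int> (*materialize)(const SourceRow&);
};

class MergeNodeSketch {
 public:
  // The loop only selects a case and delegates building the output row.
  void EvaluateCases(const std::vector<SourceRow>& rows,
                     const std::vector<const CaseSketch*>& selected_cases,
                     std::vector<OutputRow>* output) {
    for (std::size_t i = 0; i < rows.size(); ++i) {
      if (selected_cases[i] == nullptr) continue;
      AddRow(rows[i], *selected_cases[i], output);
    }
  }

  int num_rows_returned() const { return num_rows_returned_; }

 private:
  // Hypothetical helper: the code from AddRow() to IncrementNumRowsReturned()
  // in the quoted block would move into a member function like this one.
  void AddRow(const SourceRow& source_row, const CaseSketch& selected_case,
              std::vector<OutputRow>* output) {
    output->push_back({selected_case.materialize(source_row), selected_case.sink_action});
    ++num_rows_returned_;
  }

  int num_rows_returned_ = 0;
};
```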


http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.cc@248
PS6, Line 248:   if (previous_row == nullptr) {
             :     return false;
             :   }
nit: fits single line


http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.cc@252
PS6, Line 252:   if (previous_row_target_tuple == nullptr) {
             :     return false;
             :   }
nit: fits single line


http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.cc@262
PS6, Line 262:   child_eos_ = false;
last_row_ could be nulled out.


http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-sink.h
File be/src/exec/iceberg-merge-sink.h:

http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-sink.h@18
PS6, Line 18: #ifndef IMPALA_EXEC_ICEBERG_MERGE_SINK_H
            : #define IMPALA_EXEC_ICEBERG_MERGE_SINK_H
nit: please use #pragma once


http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-sink.h@22
PS6, Line 22: namespace impala {
nit: please add a blank line


http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-sink.h@38
PS6, Line 38: IcebergMergeSinkConfig
nit: this is usually const T&


http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-sink.h@40
PS6, Line 40: IcebergMergeSinkConfig
nit: missing const here as well
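For both constructors, something along these lines (`MergeSinkConfigSketch` and `MergeSinkSketch` are hypothetical stand-ins): taking the config by const reference avoids a copy and makes it clear that the callee does not modify it.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical config type; passing it by value would copy all of its members.
struct MergeSinkConfigSketch {
  std::string insert_sink_name;
  std::vector<int> output_expr_ids;
};

class MergeSinkSketch {
 public:
  // 'const T&': no copy at the call site, and the config cannot be modified here.
  explicit MergeSinkSketch(const MergeSinkConfigSketch& config)
    : num_output_exprs_(config.output_expr_ids.size()) {}

  std::size_t num_output_exprs() const { return num_output_exprs_; }

 private:
  std::size_t num_output_exprs_;
};
```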


http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-sink.cc
File be/src/exec/iceberg-merge-sink.cc:

http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-sink.cc@110
PS6, Line 110:       row->DeepCopy(output_row, batch->row_desc()->tuple_descriptors(),
             :           batch->tuple_data_pool(), false);
Do we really need to DeepCopy the incoming rows? Wouldn't setting the pointers
be enough?


http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/service/client-request-state.cc
File be/src/service/client-request-state.cc:

http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/service/client-request-state.cc@1671
PS6, Line 1671:
nit: this requires a 4-space indent


http://gerrit.cloudera.org:8080/#/c/21423/6/common/thrift/PlanNodes.thrift
File common/thrift/PlanNodes.thrift:

http://gerrit.cloudera.org:8080/#/c/21423/6/common/thrift/PlanNodes.thrift@734
PS6, Line 734:   1: required list<Exprs.TExpr> output_expressions
             :   2: optional list<Exprs.TExpr> filter_conjuncts
             :   3: required TMergeCaseType type
Comments for the fields would be helpful


http://gerrit.cloudera.org:8080/#/c/21423/6/common/thrift/PlanNodes.thrift@789
PS6, Line 789:   22: optional TIcebergMergeNode merge_node
field ids don't need to be in order. Reassigning existing field ids can make 
backports difficult.


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/cup/sql-parser.cup
File fe/src/main/cup/sql-parser.cup:

http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/cup/sql-parser.cup@1020
PS6, Line 1020: table_ref:source
Can we also allow queries here (in a follow-up patch)?


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
File fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java:

http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java@105
PS6, Line 105:
nit: unnecessary line break


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java@131
PS6, Line 131:
nit: unnecessary line break


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java@139
PS6, Line 139:
nit: unnecessary line break


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java
File fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java:

http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@32
PS6, Line 32: *;
nit: please import individual classes


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@41
PS6, Line 41: Merge
nit: could be all capital


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@91
PS6, Line 91:     icebergTable_ = (FeIcebergTable) targetTableRef_.getTable();
We should check that the target is indeed an Iceberg table, otherwise we should 
raise an error.


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@103
PS6, Line 103:     mergeActionTuple_ =
             :         analyzer.getDescTbl().createTupleDescriptor(MERGE_ACTION_TUPLE_NAME);
             :
             :     targetTupleId_ = targetTableRef_.getId();
             :
             :     SlotDescriptor sd = analyzer.addSlotDescriptor(mergeActionTuple_);
             :     sd.setType(Type.TINYINT);
             :     sd.setIsMaterialized(true);
             :     sd.setIsNullable(false);
             :
             :     Expr mergeActionExpr = new SlotRef(sd);
             :     sd.setSourceExpr(mergeActionExpr);
             :     expressions_.mergeActionExpression(mergeActionExpr);
The merge action related parts could be moved to a helper method for readability


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@136
PS6, Line 136:   public DataSink createDataSink() {
             :     TableSink insertSink = TableSink.create(icebergTable_, TableSink.Op.INSERT,
             :         expressions_.targetPartitionExpressions(),
             :         expressions_.targetExpressions(),
             :         Collections.emptyList(), false, false,
             :         expressions_.sortingColumnsAndOrder(), -1,
             :         null, mergeStmt_.maxTableSinks_);
             :     List<Expr> deletePartitionKeys = Collections.emptyList();
             :     if (icebergTable_.isPartitioned()) {
             :       deletePartitionKeys = expressions_.targetPartitionMetaExpressions();
             :     }
             :     TableSink deleteSink =
             :         new IcebergBufferedDeleteSink(icebergPositionalDeleteTable_,
             :             deletePartitionKeys,
             :             expressions_.targetPositionMetaExpressions(), deleteTableId_);
             :
             :     return new IcebergMergeSink(insertSink, deleteSink,
             :         Collections.singletonList(expressions_.mergeActionExpression()));
             :   }
nit: can we move it next to getPlanNode()?


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@159
PS6, Line 159:     expressions_.targetExpressions(
             :         Expr.substituteList(expressions_.targetExpressions(), smap, analyzer,
             :             true));
             :     expressions_.targetPartitionMetaExpressions(Expr.substituteList(
             :         expressions_.targetPartitionMetaExpressions(), smap, analyzer, true));
             :     expressions_.targetPositionMetaExpressions(Expr.substituteList(
             :         expressions_.targetPositionMetaExpressions(), smap, analyzer, true));
             :     expressions_.mergeActionExpression(
             :         expressions_.mergeActionExpression().substitute(smap, analyzer, true));
             :     expressions_.targetPartitionExpressions(Expr.substituteList(
             :         expressions_.targetPartitionExpressions(), smap, analyzer, true));
This could be moved to a method in MergeExpressions.


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@240
PS6, Line 240: *
nit: missing space before *


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@240
PS6, Line 240: SMALLINT
Isn't it TINYINT?


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@440
PS6, Line 440:     public void targetPositionMetaExpressions(List<Expr> list) {
             :       targetPositionMetaExpressions = list;
             :     }
Please move it next to the getter.


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@444
PS6, Line 444:     public List<Expr> targetPartitionExpressions() {
             :       return targetPartitionExpressions;
             :     }
Please move it next to the setter.


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/MergeCase.java
File fe/src/main/java/org/apache/impala/analysis/MergeCase.java:

http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/MergeCase.java@33
PS6, Line 33: this.
nit: redundant 'this.'


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/MergeCase.java@40
PS6, Line 40: this.
nit: redundant 'this.'


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/MergeCase.java@47
PS6, Line 47:     this.analyzer_ = analyzer;
nit: you could follow the usual pattern:

    if (isAnalyzed()) return;
    super.analyze(analyzer);


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java
File fe/src/main/java/org/apache/impala/analysis/MergeInsert.java:

http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java@35
PS6, Line 35: columnPermutation_
Could you please add a comment?


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java@83
PS6, Line 83: analyzeColumnPermutation
Could you please add a comment to this method?


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java@142
PS6, Line 142: collateColumns
Could you please add a comment for this method?


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/MergeStmt.java
File fe/src/main/java/org/apache/impala/analysis/MergeStmt.java:

http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/MergeStmt.java@48
PS6, Line 48:     this.analyzer_ = analyzer;
nit: You could follow the usual pattern:

    if (isAnalyzed()) return;
    super.analyze(analyzer);


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/MergeStmt.java@68
PS6, Line 68: 
nit: redundant empty line


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/MergeStmt.java@78
PS6, Line 78: (!(targetTableRef_ instanceof InlineViewRef
This could be a precondition


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/planner/IcebergMergeNode.java
File fe/src/main/java/org/apache/impala/planner/IcebergMergeNode.java:

http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/planner/IcebergMergeNode.java@129
PS6, Line 129:     cardinality_ = capCardinalityAtLimit(cardinality_);
Can this statement have a limit?


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/planner/Planner.java
File fe/src/main/java/org/apache/impala/planner/Planner.java:

http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/planner/Planner.java@143
PS6, Line 143:       singleNodePlan = addMergeNode(singleNodePlan, ctx_.getRootAnalyzer());
Would it be possible to move it to L188? Or do you want it to be before 
validatePlan()?


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/planner/Planner.java@228
PS6, Line 228: isUpdateOrDelete
nit: this is the only place where this method is used. Could it be renamed to
isUpdateOrDeleteOrMerge()? Or, if that is too ugly, we could just have

 ctx_.isUpdate() || ctx_.isDelete() || ctx_.isMerge()


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
File fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java:

http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java@442
PS6, Line 442:
indentation is off


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
File fe/src/main/java/org/apache/impala/util/IcebergUtil.java:

http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/util/IcebergUtil.java@1300
PS6, Line 1300: _
Since PartitionExprBundle is basically a struct and these fields are public, we 
don't need the underscores at the end.


http://gerrit.cloudera.org:8080/#/c/21423/6/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge.test
File testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge.test:

http://gerrit.cloudera.org:8080/#/c/21423/6/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge.test@5
PS6, Line 5: when matched then delete
We could use an INNER JOIN for MERGE statements that only have MATCHED
clauses.
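A hypothetical sketch of the rule I have in mind, assuming the join is written as 'target JOIN source': unmatched source rows are only needed by NOT MATCHED clauses, so if every clause is a MATCHED clause, an inner join already produces all the rows the merge node can act on. The enums are made up for illustration, and the NOT_MATCHED_BY_SOURCE kind is included only to show the general rule; I'm not assuming this patch supports it.

```cpp
#include <cassert>
#include <vector>

// Hypothetical clause and join kinds, for illustration only.
enum class MergeClauseKind { MATCHED, NOT_MATCHED_BY_TARGET, NOT_MATCHED_BY_SOURCE };
enum class JoinKind { INNER, LEFT_OUTER, RIGHT_OUTER, FULL_OUTER };

// Assumes 'target JOIN source' (target on the left). Unmatched source rows are needed
// only for NOT MATCHED [BY TARGET] clauses, unmatched target rows only for
// NOT MATCHED BY SOURCE clauses; with MATCHED clauses alone, INNER is sufficient.
JoinKind ChooseJoinKind(const std::vector<MergeClauseKind>& clauses) {
  bool need_unmatched_source = false;
  bool need_unmatched_target = false;
  for (MergeClauseKind c : clauses) {
    if (c == MergeClauseKind::NOT_MATCHED_BY_TARGET) need_unmatched_source = true;
    if (c == MergeClauseKind::NOT_MATCHED_BY_SOURCE) need_unmatched_target = true;
  }
  if (need_unmatched_source && need_unmatched_target) return JoinKind::FULL_OUTER;
  if (need_unmatched_source) return JoinKind::RIGHT_OUTER;  // keep all source rows
  if (need_unmatched_target) return JoinKind::LEFT_OUTER;   // keep all target rows
  return JoinKind::INNER;
}
```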


http://gerrit.cloudera.org:8080/#/c/21423/6/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge.test@13
PS6, Line 13: result expressions: target.id, target.`user`, target.action, target.event_time
Do we need the result expressions when the type is DELETE?


http://gerrit.cloudera.org:8080/#/c/21423/6/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge.test@223
PS6, Line 223: UNPARTITIONED
I think this should be a PARTITIONED exchange on 
'from_timestamp(target.event_time, 'yyyy-MM-dd-HH')'?
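To illustrate why, with a hypothetical model rather than Impala's exchange implementation: with a hash-partitioned exchange on the partition expression, every row of a given partition is routed to the same writer instance and the writing work is spread across instances, whereas an UNPARTITIONED exchange sends all rows to a single instance. Here each row is reduced to its precomputed partition key (the hypothetical `HashPartition()` stands in for the exchange):

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Hypothetical model of a hash-partitioned exchange: each row is represented by the
// value of its partition expression (e.g. the from_timestamp(...) result) and is sent
// to the writer selected by the hash of that value, so equal keys share a writer.
std::vector<std::vector<std::string>> HashPartition(
    const std::vector<std::string>& partition_keys, std::size_t num_writers) {
  std::vector<std::vector<std::string>> per_writer(num_writers);
  for (const std::string& key : partition_keys) {
    std::size_t dest = std::hash<std::string>{}(key) % num_writers;
    per_writer[dest].push_back(key);
  }
  return per_writer;
}
```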


http://gerrit.cloudera.org:8080/#/c/21423/6/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge.test@291
PS6, Line 291: UNPARTITIONED
Again, this should be a PARTITIONED exchange.


http://gerrit.cloudera.org:8080/#/c/21423/6/tests/query_test/test_iceberg.py
File tests/query_test/test_iceberg.py:

http://gerrit.cloudera.org:8080/#/c/21423/6/tests/query_test/test_iceberg.py@1791
PS6, Line 1791: self.run_test_case('QueryTest/iceberg-optimize', vector, unique_database)
Was it intentional to add this line?



--
To view, visit http://gerrit.cloudera.org:8080/21423

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I3416a79740eddc446c87f72bf1a85ed3f71af268
Gerrit-Change-Number: 21423
Gerrit-PatchSet: 6
Gerrit-Owner: Peter Rozsa <[email protected]>
Gerrit-Reviewer: Daniel Becker <[email protected]>
Gerrit-Reviewer: Gabor Kaszab <[email protected]>
Gerrit-Reviewer: Noemi Pap-Takacs <[email protected]>
Gerrit-Reviewer: Peter Rozsa <[email protected]>
Gerrit-Reviewer: Zoltan Borok-Nagy <[email protected]>
Gerrit-Comment-Date: Tue, 28 May 2024 17:15:51 +0000
Gerrit-HasComments: Yes
