Zoltan Borok-Nagy has posted comments on this change. ( http://gerrit.cloudera.org:8080/21423 )
Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables
......................................................................


Patch Set 6:

(55 comments)

Awesome work, this will be a huge milestone for the Impala project!
Did a first round, but I'm planning to do more rounds as this change is massive :)

http://gerrit.cloudera.org:8080/#/c/21423/6//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/21423/6//COMMIT_MSG@24
PS6, Line 24: level
nit: maybe 'phase' is a better word here


http://gerrit.cloudera.org:8080/#/c/21423/6//COMMIT_MSG@47
PS6, Line 47:
We should also have authorization tests. Also for fine-grained authz, e.g. test that if the source table is masked/filtered, then the MERGE statement cannot be used to expose the sensitive data.


http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.h
File be/src/exec/iceberg-merge-node.h:

http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.h@18
PS6, Line 18: #ifndef IMPALA_EXEC_ICEBERG_MERGE_NODE_H
            : #define IMPALA_EXEC_ICEBERG_MERGE_NODE_H
nit: we prefer #pragma once


http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.h@43
PS6, Line 43: class IcebergMergePlanNode : public PlanNode {
Please add a class comment


http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.h@101
PS6, Line 101: boost::scoped_ptr
We prefer std::unique_ptr


http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.h@158
PS6, Line 158: inline
No need for 'inline': member functions defined inside class definitions are implicitly inline:
https://en.cppreference.com/w/cpp/language/inline


http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.cc
File be/src/exec/iceberg-merge-node.cc:

http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.cc@165
PS6, Line 165:
nit: too much indent


http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.cc@178
PS6, Line 178:
last_row_ = nullptr;
I'm not sure if we want to reset last_row_ each time EvaluateCases() is invoked. E.g. what happens if output_batch becomes full while there are still duplicates in child_row_batch_?


http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.cc@186
PS6, Line 186: return Status(
             :     "Duplicate row found: one target table row matched more than one source row");
Can we also output the duplicated row with PrintTuple()? It would be nice to print both the target tuple and the matching source tuples.


http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.cc@209
PS6, Line 209: if (selected_case) {
nit: to reduce nesting, we could have
  if (!selected_case) continue;


http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.cc@210
PS6, Line 210: // Add a new row to output_batch
             : int dst_row_idx = output_batch->AddRow();
             : TupleRow* dst_row = output_batch->GetRow(dst_row_idx);
             :
             : auto* target_tuple = Tuple::Create(
             :     row_descriptor_.tuple_descriptors()[target_tuple_idx_]->byte_size(),
             :     output_batch->tuple_data_pool());
             : auto* merge_action_tuple = Tuple::Create(
             :     row_descriptor_.tuple_descriptors()[merge_action_tuple_idx_]->byte_size(),
             :     output_batch->tuple_data_pool());
             :
             : TIcebergMergeSinkAction::type action = selected_case->SinkAction();
             :
             : dst_row->SetTuple(target_tuple_idx_, target_tuple);
             : dst_row->SetTuple(merge_action_tuple_idx_, merge_action_tuple);
             :
             : for (int i = 0; i < row_descriptor_.tuple_descriptors().size(); i++) {
             :   if (i != target_tuple_idx_ && i != merge_action_tuple_idx_) {
             :     dst_row->SetTuple(i, nullptr);
             :   }
             : }
             :
             : target_tuple->MaterializeExprs<false, false>(source_row,
             :     *row_descriptor_.tuple_descriptors()[target_tuple_idx_],
             :     selected_case->combined_evaluators_, output_batch->tuple_data_pool());
             :
             : RawValue::WriteNonNullPrimitive(&action, merge_action_tuple, type, nullptr);
             :
             : output_batch->CommitLastRow();
             : IncrementNumRowsReturned(1);
nit: for readability, this could
go to its own member function


http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.cc@248
PS6, Line 248: if (previous_row == nullptr) {
             :   return false;
             : }
nit: fits on a single line


http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.cc@252
PS6, Line 252: if (previous_row_target_tuple == nullptr) {
             :   return false;
             : }
nit: fits on a single line


http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-node.cc@262
PS6, Line 262: child_eos_ = false;
last_row_ could be nulled out.


http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-sink.h
File be/src/exec/iceberg-merge-sink.h:

http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-sink.h@18
PS6, Line 18: #ifndef IMPALA_EXEC_ICEBERG_MERGE_SINK_H
            : #define IMPALA_EXEC_ICEBERG_MERGE_SINK_H
nit: please use #pragma once


http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-sink.h@22
PS6, Line 22: namespace impala {
nit: please add blank line


http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-sink.h@38
PS6, Line 38: IcebergMergeSinkConfig
nit: this is usually const T&


http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-sink.h@40
PS6, Line 40: IcebergMergeSinkConfig
nit: missing const here as well


http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-sink.cc
File be/src/exec/iceberg-merge-sink.cc:

http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/exec/iceberg-merge-sink.cc@110
PS6, Line 110: row->DeepCopy(output_row, batch->row_desc()->tuple_descriptors(),
             :     batch->tuple_data_pool(), false);
Do we really need to DeepCopy the incoming rows? Isn't setting the pointers enough?

http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/service/client-request-state.cc
File be/src/service/client-request-state.cc:

http://gerrit.cloudera.org:8080/#/c/21423/6/be/src/service/client-request-state.cc@1671
PS6, Line 1671:
nit: requires 4 spaces indent


http://gerrit.cloudera.org:8080/#/c/21423/6/common/thrift/PlanNodes.thrift
File common/thrift/PlanNodes.thrift:

http://gerrit.cloudera.org:8080/#/c/21423/6/common/thrift/PlanNodes.thrift@734
PS6, Line 734: 1: required list<Exprs.TExpr> output_expressions
             : 2: optional list<Exprs.TExpr> filter_conjuncts
             : 3: required TMergeCaseType type
Comments for the fields would be helpful


http://gerrit.cloudera.org:8080/#/c/21423/6/common/thrift/PlanNodes.thrift@789
PS6, Line 789: 22: optional TIcebergMergeNode merge_node
field ids don't need to be in order. Reassigning existing field ids can make backports difficult.


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/cup/sql-parser.cup
File fe/src/main/cup/sql-parser.cup:

http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/cup/sql-parser.cup@1020
PS6, Line 1020: table_ref:source
Can we also allow queries here (in a follow-up patch)?

http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
File fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java:

http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java@105
PS6, Line 105:
nit: unnecessary line break


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java@131
PS6, Line 131:
nit: unnecessary line break


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java@139
PS6, Line 139:
nit: unnecessary line break


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java
File fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java:

http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@32
PS6, Line 32: *;
nit: please import individual classes


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@41
PS6, Line 41: Merge
nit: could be all capital


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@91
PS6, Line 91: icebergTable_ = (FeIcebergTable) targetTableRef_.getTable();
We should check that the target is indeed an Iceberg table, otherwise we should raise an error.

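For the unchecked cast at IcebergMergeImpl.java line 91, the guard could look roughly like the sketch below. This is only an illustration under stated assumptions: FeTable, FeIcebergTable and AnalysisException here are simplified stand-ins declared locally so the snippet compiles on its own, not the real Impala classes, and MergeTargetCheck, requireIcebergTarget() and the error text are hypothetical.

```java
// Simplified stand-ins (assumption): the real Impala types carry far more state.
interface FeTable {
  String getFullName();
}

interface FeIcebergTable extends FeTable {}

class AnalysisException extends Exception {
  AnalysisException(String msg) {
    super(msg);
  }
}

final class MergeTargetCheck {
  private MergeTargetCheck() {}

  /**
   * Returns the target as an Iceberg table, or raises an analysis error
   * instead of letting the cast fail with a ClassCastException.
   * The method name and message wording are hypothetical.
   */
  static FeIcebergTable requireIcebergTarget(FeTable table) throws AnalysisException {
    if (!(table instanceof FeIcebergTable)) {
      throw new AnalysisException(
          "Target of MERGE must be an Iceberg table: " + table.getFullName());
    }
    return (FeIcebergTable) table;
  }
}
```

With a guard like this, a non-Iceberg target would surface as a regular analysis error naming the table rather than as an internal exception.
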
http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@103
PS6, Line 103: mergeActionTuple_ =
             :     analyzer.getDescTbl().createTupleDescriptor(MERGE_ACTION_TUPLE_NAME);
             :
             : targetTupleId_ = targetTableRef_.getId();
             :
             : SlotDescriptor sd = analyzer.addSlotDescriptor(mergeActionTuple_);
             : sd.setType(Type.TINYINT);
             : sd.setIsMaterialized(true);
             : sd.setIsNullable(false);
             :
             : Expr mergeActionExpr = new SlotRef(sd);
             : sd.setSourceExpr(mergeActionExpr);
             : expressions_.mergeActionExpression(mergeActionExpr);
The merge action related parts could be moved to a helper method for readability


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@136
PS6, Line 136: public DataSink createDataSink() {
             :   TableSink insertSink = TableSink.create(icebergTable_, TableSink.Op.INSERT,
             :       expressions_.targetPartitionExpressions(),
             :       expressions_.targetExpressions(),
             :       Collections.emptyList(), false, false,
             :       expressions_.sortingColumnsAndOrder(), -1,
             :       null, mergeStmt_.maxTableSinks_);
             :   List<Expr> deletePartitionKeys = Collections.emptyList();
             :   if (icebergTable_.isPartitioned()) {
             :     deletePartitionKeys = expressions_.targetPartitionMetaExpressions();
             :   }
             :   TableSink deleteSink =
             :       new IcebergBufferedDeleteSink(icebergPositionalDeleteTable_,
             :           deletePartitionKeys,
             :           expressions_.targetPositionMetaExpressions(), deleteTableId_);
             :
             :   return new IcebergMergeSink(insertSink, deleteSink,
             :       Collections.singletonList(expressions_.mergeActionExpression()));
             : }
nit: can we move it next to getPlanNode()?

http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@159
PS6, Line 159: expressions_.targetExpressions(
             :     Expr.substituteList(expressions_.targetExpressions(), smap, analyzer,
             :         true));
             : expressions_.targetPartitionMetaExpressions(Expr.substituteList(
             :     expressions_.targetPartitionMetaExpressions(), smap, analyzer, true));
             : expressions_.targetPositionMetaExpressions(Expr.substituteList(
             :     expressions_.targetPositionMetaExpressions(), smap, analyzer, true));
             : expressions_.mergeActionExpression(
             :     expressions_.mergeActionExpression().substitute(smap, analyzer, true));
             : expressions_.targetPartitionExpressions(Expr.substituteList(
             :     expressions_.targetPartitionExpressions(), smap, analyzer, true));
Could be moved into a method in MergeExpressions.


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@240
PS6, Line 240: *
nit: missing space before *


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@240
PS6, Line 240: SMALLINT
Isn't it TINYINT?


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@440
PS6, Line 440: public void targetPositionMetaExpressions(List<Expr> list) {
             :   targetPositionMetaExpressions = list;
             : }
Please move next to the getter


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@444
PS6, Line 444: public List<Expr> targetPartitionExpressions() {
             :   return targetPartitionExpressions;
             : }
Please move next to the setter


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/MergeCase.java
File fe/src/main/java/org/apache/impala/analysis/MergeCase.java:

http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/MergeCase.java@33
PS6, Line 33: this.
nit: redundant 'this.'

http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/MergeCase.java@40
PS6, Line 40: this.
nit: redundant 'this.'


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/MergeCase.java@47
PS6, Line 47: this.analyzer_ = analyzer;
nit: you could follow the usual pattern:
  if (isAnalyzed()) return;
  super.analyze(analyzer);


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java
File fe/src/main/java/org/apache/impala/analysis/MergeInsert.java:

http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java@35
PS6, Line 35: columnPermutation_
Could you please add a comment?


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java@83
PS6, Line 83: analyzeColumnPermutation
Could you please add a comment to this method?


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java@142
PS6, Line 142: collateColumns
Could you please add a comment for this method?

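To spell out the "usual pattern" suggested for MergeCase.analyze(), here is a minimal sketch of an idempotent analyze() that delegates the bookkeeping to the base class. Everything in it is an assumption made for illustration: Analyzer, StatementSketch and MergeCaseSketch are local stand-ins rather than Impala's classes, and the idea that the base class records the analyzer and derives isAnalyzed() from it is my reading of the pattern, not something this review states.

```java
// Stand-in for the real analyzer (assumption): no behaviour needed for the sketch.
class Analyzer {}

// Hypothetical base class: records the analyzer and reports whether analysis ran.
abstract class StatementSketch {
  private Analyzer analyzer_;

  boolean isAnalyzed() {
    return analyzer_ != null;
  }

  void analyze(Analyzer analyzer) {
    analyzer_ = analyzer;
  }
}

// Hypothetical subclass following the suggested pattern.
class MergeCaseSketch extends StatementSketch {
  // Counts how often the subclass-specific analysis actually ran.
  int analysisRuns = 0;

  @Override
  void analyze(Analyzer analyzer) {
    if (isAnalyzed()) return;   // repeated calls become no-ops
    super.analyze(analyzer);    // base class stores the analyzer, no manual assignment
    analysisRuns++;             // subclass-specific analysis would go here
  }
}
```

Calling analyze() twice on the same MergeCaseSketch runs the subclass-specific part only once, which is the property the early return is meant to provide.
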
http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/MergeStmt.java
File fe/src/main/java/org/apache/impala/analysis/MergeStmt.java:

http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/MergeStmt.java@48
PS6, Line 48: this.analyzer_ = analyzer;
nit: You could follow the usual pattern:
  if (isAnalyzed()) return;
  super.analyze(analyzer);


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/MergeStmt.java@68
PS6, Line 68:
nit: redundant empty line


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/analysis/MergeStmt.java@78
PS6, Line 78: (!(targetTableRef_ instanceof InlineViewRef
This could be a precondition


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/planner/IcebergMergeNode.java
File fe/src/main/java/org/apache/impala/planner/IcebergMergeNode.java:

http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/planner/IcebergMergeNode.java@129
PS6, Line 129: cardinality_ = capCardinalityAtLimit(cardinality_);
Can this statement have a limit?


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/planner/Planner.java
File fe/src/main/java/org/apache/impala/planner/Planner.java:

http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/planner/Planner.java@143
PS6, Line 143: singleNodePlan = addMergeNode(singleNodePlan, ctx_.getRootAnalyzer());
Would it be possible to move it to L188? Or do you want it to be before validatePlan()?


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/planner/Planner.java@228
PS6, Line 228: isUpdateOrDelete
nit: this is the only place this method is being used. Could be isUpdateOrDeleteOrMerge()?
Or if it is ugly we could just have ctx_.isUpdate() || ctx_.isDelete() || ctx_.isMerge()


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
File fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java:

http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java@442
PS6, Line 442:
indentation is off


http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
File fe/src/main/java/org/apache/impala/util/IcebergUtil.java:

http://gerrit.cloudera.org:8080/#/c/21423/6/fe/src/main/java/org/apache/impala/util/IcebergUtil.java@1300
PS6, Line 1300: _
Since PartitionExprBundle is basically a struct and these fields are public, we don't need the underscores at the end.


http://gerrit.cloudera.org:8080/#/c/21423/6/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge.test
File testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge.test:

http://gerrit.cloudera.org:8080/#/c/21423/6/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge.test@5
PS6, Line 5: when matched then delete
We could use INNER JOIN for such MERGE statements that only have MATCHED clauses.


http://gerrit.cloudera.org:8080/#/c/21423/6/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge.test@13
PS6, Line 13: result expressions: target.id, target.`user`, target.action, target.event_time
Do we need the result expressions when the type is DELETE?


http://gerrit.cloudera.org:8080/#/c/21423/6/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge.test@223
PS6, Line 223: UNPARTITIONED
I think this should be a PARTITIONED exchange on 'from_timestamp(target.event_time, 'yyyy-MM-dd-HH')'?

http://gerrit.cloudera.org:8080/#/c/21423/6/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge.test@291
PS6, Line 291: UNPARTITIONED
Again, this should be a PARTITIONED exchange.


http://gerrit.cloudera.org:8080/#/c/21423/6/tests/query_test/test_iceberg.py
File tests/query_test/test_iceberg.py:

http://gerrit.cloudera.org:8080/#/c/21423/6/tests/query_test/test_iceberg.py@1791
PS6, Line 1791: self.run_test_case('QueryTest/iceberg-optimize', vector, unique_database)
Was it intentional to add this line?



--
To view, visit http://gerrit.cloudera.org:8080/21423
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I3416a79740eddc446c87f72bf1a85ed3f71af268
Gerrit-Change-Number: 21423
Gerrit-PatchSet: 6
Gerrit-Owner: Peter Rozsa
Gerrit-Reviewer: Daniel Becker
Gerrit-Reviewer: Gabor Kaszab
Gerrit-Reviewer: Noemi Pap-Takacs
Gerrit-Reviewer: Peter Rozsa
Gerrit-Reviewer: Zoltan Borok-Nagy
Gerrit-Comment-Date: Tue, 28 May 2024 17:15:51 +0000
Gerrit-HasComments: Yes
