Peter Rozsa has posted comments on this change. ( http://gerrit.cloudera.org:8080/21423 )
Change subject: IMPALA-12732: Add support for MERGE statements for Iceberg tables
......................................................................

Patch Set 12:

(39 comments)

Thank you Noemi and Zoltan!

http://gerrit.cloudera.org:8080/#/c/21423/11/be/src/exec/iceberg-merge-node.h
File be/src/exec/iceberg-merge-node.h:

http://gerrit.cloudera.org:8080/#/c/21423/11/be/src/exec/iceberg-merge-node.h@22
PS11, Line 22: #include "exec/exec-node.h"
> I don't think we need this #include here.
Done

http://gerrit.cloudera.org:8080/#/c/21423/11/be/src/exec/iceberg-merge-node.h@70
PS11, Line 70: /// the output row should be updated, deleted or inserted.
: TTupleId merge_action_tuple_id_ = -1;
:
> nit: Comment could fit two lines only.
Done

http://gerrit.cloudera.org:8080/#/c/21423/11/be/src/exec/iceberg-merge-node.h@110
PS11, Line 110: ergeCase*> matched_cases_;
> Maybe it's safer to and simpler to use Tuple* only that points to the targe
It would require the null checks to be moved into EvaluateCases, as the result set may contain a tuple row where the target table's tuple is null (not matched case).

http://gerrit.cloudera.org:8080/#/c/21423/11/be/src/exec/iceberg-merge-node.h@134
PS11, Line 134:
> nit: Is 'inline' keyword needed?
The other way to initialize a non-trivial object is a separate line like
'const ColumnType IcebergMergeNode::merge_action_tuple_type_ = ColumnType(TYPE_TINYINT);'
With this form, it's more compact, but it also generates an init guard for cxx_global_var_init.

http://gerrit.cloudera.org:8080/#/c/21423/11/be/src/exec/iceberg-merge-node.cc
File be/src/exec/iceberg-merge-node.cc:

http://gerrit.cloudera.org:8080/#/c/21423/11/be/src/exec/iceberg-merge-node.cc@167
PS11, Line 167: *e
> nit: +4 indent needed
Done

http://gerrit.cloudera.org:8080/#/c/21423/11/be/src/exec/iceberg-merge-node.cc@180
PS11, Line 180: auto row = iter.Get();
:
> nit: could be moved after duplicate check
Done

http://gerrit.cloudera.org:8080/#/c/21423/11/be/src/exec/iceberg-merge-sink.cc
File be/src/exec/iceberg-merge-sink.cc:

http://gerrit.cloudera.org:8080/#/c/21423/11/be/src/exec/iceberg-merge-sink.cc@108
PS11, Line 108: AddRow(dele
> DispatchRow() just adds the row to the row batch, so maybe simply 'AddRow()
Done

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
File fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java:

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java@312
PS11, Line 312:
> nit: missing space
Done

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
File fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java:

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java@122
PS11, Line 122: tBase.checkTypeCompatibility(
: tableRef.g
> nit: no need for line break
Done

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java
File fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java:

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@61
PS11, Line 61: row,
> row/record?
Done

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@122
PS11, Line 122: ergeActionTuple(analyzer);
> Hard to read because of the negations.
> MergeStmt could have a 'hasMacthedCa
Renamed.

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@245
PS11, Line 245: return new IcebergBufferedDeleteSink(icebergPositionalDeleteTable_,
:
> nit: missing newline
Done

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@258
PS11, Line 258: ulat
> nit: /
Unfortunately, it's required: if it's a simple slash (/), then it terminates the comment block.

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java@261
PS11, Line 261: * SELECT /* +straight_join */
: * CAST(TupleIsNull(0) + TupleIsNull(1) *
> These are only needed if there's a DELETE sink. It's not a problem to alway
Added sub-task: IMPALA-13205

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
File fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java:

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java@66
PS11, Line 66: deleteTableId_ = analyzer.getDescTbl().addTargetTable(icePosDelTable_);
: IcebergUtil.validateIcebergTableForInsert(originalTargetTable_);
: IcebergUtil.checkUpdateMode(originalTargetTable_);
: }
:
> Similarly to this check, the analyze() function of IcebergMergeImpl.java st
Done

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/MergeCase.java
File fe/src/main/java/org/apache/impala/analysis/MergeCase.java:

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/MergeCase.java@31
PS11, Line 31: cases in MERGE statements.
Each merge case
: * could contain additional filtering predicates, for example: 'WHEN MATCHED AND
: * source.id
> nit: To help readability, here 'predicate' might be a better word than 'exp
Done

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/MergeCase.java@36
PS11, Line 36: */
> We usually have a reset() method to reset the members that are modified dur
Done

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/MergeCase.java@69
PS11, Line 69: public void substituteResultExprs(ExprSubstitutionMap smap, Analyzer analyzer) {
> Don't we need to analyze the resultExprs_?
The result expressions are analyzed in the child classes.

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/MergeCase.java@89
PS11, Line 89: super.analyze(analyzer);
: for (Expr expr : filterExprs_) {
: expr.analyze(analyzer);
: if (expr.type_ != Type.BOOLEAN) {
: throw new AnalysisException(String.format(
: "Filter expression requires return type '%s'. Actual type is '%s'",
:
> We also have such a method in IcebergModifyImpl. Can we put it to some comm
Moved to DmlStatementBase.

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/MergeCase.java@115
PS11, Line 115:
> StatementBase.clone() throws NotImplementedException. I think it'd be clean
I changed it to an abstract method.

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/MergeDelete.java
File fe/src/main/java/org/apache/impala/analysis/MergeDelete.java:

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/MergeDelete.java@49
PS11, Line 49: resultExprs_ = Lists.newArrayList();
: Preconditions.checkNotNull(targetTableColumns_);
: f
> Could you please add a comment why we need all the target table columns, an
Done

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java
File fe/src/main/java/org/apache/impala/analysis/MergeInsert.java:

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java@125
PS11, Line 125: // Comparing by reference
: boolean emptyColumnPermutation = columnPermutation_ == Collections.EMPTY_LIST;
> Why do you compare by reference? columnPermutation_.isEmpty() feels safer a
At parse time, I pass an empty list to the ctor, which means that the column permutation is not provided:
{: RESULT = new MergeInsert(Collections.emptyList(), operands); :};
It's required because the merge insert can provide an empty list (not the object from emptyList()) for column permutations:
WHEN NOT MATCHED THEN INSERT () VALUES (...)
I think the immutable emptyList() causes fewer headaches than a null, but I can replace it with a null.

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java@188
PS11, Line 188:
> Unnecessary comment: // name
It was a todo comment for myself that was left in by mistake.

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/MergeInsert.java@188
PS11, Line 188: columnNames = new H
> It doesn't store the frequency as it is not a map.
> So how about just 'colum
Done

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/MergeStmt.java
File fe/src/main/java/org/apache/impala/analysis/MergeStmt.java:

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/MergeStmt.java@34
PS11, Line 34:
> nit: you could add a space before this
Done

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/MergeStmt.java@36
PS11, Line 36: additional filter expression
> nit: "an additional filter expression" / "additional filter expressions"
Done

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/MergeStmt.java@69
PS11, Line 69: ses_
> nit: 4 spaces are enough I think. That will be also consistent with L83
Done

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/MergeStmt.java@80
PS11, Line 80: ble_ = targetTableRef_.getTable();
> Does it work? (I know it doesn't work for UPDATE e.g.)
I followed the analogy from UPDATE; I checked, it is not respected during the calculation of instances. I filed IMPALA-13206.

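For readers following the MergeInsert.java@125 thread above, the reference comparison can be illustrated with a minimal, self-contained sketch. The class and method names below are hypothetical and are not Impala's actual code; the sketch also assumes that Collections.emptyList() returns the Collections.EMPTY_LIST singleton, which holds for OpenJDK but is not promised by the Javadoc.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the column permutation handling described in the
// reply above; none of these names come from the Impala code base.
final class ColumnPermutationSketch {
  private final List<String> columnPermutation_;

  ColumnPermutationSketch(List<String> columnPermutation) {
    columnPermutation_ = columnPermutation;
  }

  // True only when the caller passed the shared Collections.emptyList() object,
  // used here as a sentinel for "no column list was written at all".
  // Assumption: emptyList() hands out the EMPTY_LIST singleton (true on OpenJDK).
  boolean isPermutationOmitted() {
    return columnPermutation_ == Collections.EMPTY_LIST;
  }

  // True for the "INSERT () VALUES (...)" shape: a list was written, but it
  // contains no columns. isEmpty() alone could not tell the two cases apart.
  boolean isPermutationExplicitlyEmpty() {
    return !isPermutationOmitted() && columnPermutation_.isEmpty();
  }

  public static void main(String[] args) {
    ColumnPermutationSketch omitted =
        new ColumnPermutationSketch(Collections.emptyList());
    ColumnPermutationSketch explicit =
        new ColumnPermutationSketch(new ArrayList<>());
    System.out.println("omitted sentinel: " + omitted.isPermutationOmitted());
    System.out.println("explicit empty list: " + explicit.isPermutationExplicitlyEmpty());
  }
}
```

Replacing the sentinel with null, as offered in the reply, would avoid depending on the singleton behavior at the cost of null checks at every use.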
http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/MergeStmt.java@101
PS11, Line 101: ((InlineViewRef) sourceTableRe
> nit: I think it's good practice to call super methods first, this way we ca
Done

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/MergeUpdate.java
File fe/src/main/java/org/apache/impala/analysis/MergeUpdate.java:

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/analysis/MergeUpdate.java@98
PS11, Line 98:
> Please add comment to this method
Done

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
File fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java:

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java@160
PS11, Line 160:
> nit: missing space
Done

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/planner/IcebergMergeNode.java
File fe/src/main/java/org/apache/impala/planner/IcebergMergeNode.java:

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/planner/IcebergMergeNode.java@121
PS11, Line 121: if (getChild(0).cardinality_ == -1) {
: cardinality_ = -1;
: } else {
: cardinality_ = applyConjunctsSelectivity(getChild(0).cardinality_);
: Preconditions.checkState(cardinality_ >= 0);
: }
> If we have both MATCHED and NOT MATCHED claues without any filters we can a
Yes, my assumption is this. I think this is the simplest approach that gives a "usable" cardinality for this node; it could be refined by counting the different kinds of merge cases, but since this method gives a reliable upper bound, it should be fine initially.

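The cardinality logic quoted above for IcebergMergeNode.java can be sketched in isolation as follows. This is only an illustration under stated assumptions: the class name, the explicit selectivity parameter, and the rounding are hypothetical stand-ins for applyConjunctsSelectivity(), whose real behavior is not shown in this thread. The clamp to the child's cardinality reflects the upper-bound assumption stated in the reply.

```java
// Hypothetical, simplified model of the estimate discussed above. It is not
// Impala's implementation; the selectivity argument replaces the planner's
// applyConjunctsSelectivity() for the sake of a self-contained example.
final class MergeCardinalitySketch {
  // -1 means "unknown", mirroring the convention in the quoted snippet.
  static final long UNKNOWN = -1;

  static long estimate(long childCardinality, double selectivity) {
    if (childCardinality == UNKNOWN) return UNKNOWN;
    if (childCardinality < 0 || selectivity < 0.0 || selectivity > 1.0) {
      throw new IllegalArgumentException("invalid cardinality or selectivity");
    }
    // Scale the child's row count by the assumed selectivity of the conjuncts.
    long scaled = Math.round(childCardinality * selectivity);
    // Never report more rows than the child produces, so the child's
    // cardinality stays an upper bound even after floating-point rounding.
    return Math.min(childCardinality, scaled);
  }

  public static void main(String[] args) {
    System.out.println(estimate(UNKNOWN, 0.5));
    System.out.println(estimate(1000, 1.0));
    System.out.println(estimate(1000, 0.5));
  }
}
```

A refinement along the lines mentioned in the reply would weight the estimate by the kinds of merge cases present, which this sketch deliberately leaves out.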
http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/service/Frontend.java
File fe/src/main/java/org/apache/impala/service/Frontend.java:

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/service/Frontend.java@2726
PS11, Line 2726:
> nit: missing space
Done

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/service/Frontend.java@2794
PS11, Line 2794: DELETE, UPDATE
> You could mention MERGE in the comment.
Done

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
File fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java:

http://gerrit.cloudera.org:8080/#/c/21423/11/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java@432
PS11, Line 432: FbIcebergDataFile dataFile = FbIcebergDataFile.getRootAsFbIcebergDataFile(buf);
:
: PartitionSpec partSpec = feIcebergTable.getIcebergApiTable().specs().get(
: dataFile.specId());
: IcebergPartitionSpec impPartSpec =
: feIcebergTable.getPartitionSpec(dataFile.specId());
: Metrics metrics = buildDataFileMetrics(dataFile);
: DataFiles.Builder builder =
: DataFiles.builder(partSpec)
:
> Because of the validation checks I think it's safer to always invoke update
Done

http://gerrit.cloudera.org:8080/#/c/21423/11/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge.test
File testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge.test:

http://gerrit.cloudera.org:8080/#/c/21423/11/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge.test@219
PS11, Line 219:
> My mistake, target.action is a column of the table that is included in the
Ack

http://gerrit.cloudera.org:8080/#/c/21423/11/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge.test@317
PS11, Line 317:
> Could you please add tests where SOURCE is an inline view?
Done

http://gerrit.cloudera.org:8080/#/c/21423/11/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-merge.test@418
PS11, Line 418: | | result expressions: target.id, concat(source.`user`, 'string'), target.action, target.event_time
> Could you please add tests for tables with partition evolution, e.g.
Done

http://gerrit.cloudera.org:8080/#/c/21423/11/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge.test
File testdata/workloads/functional-query/queries/QueryTest/iceberg-merge.test:

http://gerrit.cloudera.org:8080/#/c/21423/11/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge.test@11
PS11, Line 11: stored as iceberg tblproperties("format-version"="2");
> You could also modify the partition layout of these tables at some point.
Done

--
To view, visit http://gerrit.cloudera.org:8080/21423
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I3416a79740eddc446c87f72bf1a85ed3f71af268
Gerrit-Change-Number: 21423
Gerrit-PatchSet: 12
Gerrit-Owner: Peter Rozsa <[email protected]>
Gerrit-Reviewer: Daniel Becker <[email protected]>
Gerrit-Reviewer: Gabor Kaszab <[email protected]>
Gerrit-Reviewer: Impala Public Jenkins <[email protected]>
Gerrit-Reviewer: Noemi Pap-Takacs <[email protected]>
Gerrit-Reviewer: Peter Rozsa <[email protected]>
Gerrit-Reviewer: Zoltan Borok-Nagy <[email protected]>
Gerrit-Comment-Date: Wed, 07 Aug 2024 13:25:11 +0000
Gerrit-HasComments: Yes
