Zoltan Borok-Nagy has posted comments on this change. ( http://gerrit.cloudera.org:8080/20753 )
Change subject: IMPALA-12597: Basic Equality delete read support for Iceberg tables
......................................................................

Patch Set 8: (12 comments)

Left a few minor comments, but the change looks great overall!

http://gerrit.cloudera.org:8080/#/c/20753/8//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/20753/8//COMMIT_MSG@14
PS8, Line 14: data sequence number of a delete file has to
: be greater than the data sequence number of the data file being
: investigated
nit: improvement idea for the future:

If Flink always writes EQ-delete files and reuses the same primary key a lot, we will have the same entry in the HashMap with multiple data sequence numbers. Then during probing, for each hash table lookup we need to loop over all the sequence numbers and check them. Actually, we only need the largest data sequence number; the lower sequence numbers with the same primary keys don't add any value.

So we could add an aggregation node to the right side of the join, like "PK1, PK2, ..., max(data_sequence_number) GROUP BY PK1, PK2, ...". We would then need to decide when to add this node to the plan and when we shouldn't. We should also avoid having an EXCHANGE between the aggregation node and the JOIN node, as it would be redundant: they would use the same partition key expressions (the primary keys).

If we had "hash teams" in Impala, we could always add this aggregation operator, as it would be in the same "hash team" as the JOIN operator, i.e. we wouldn't need to build the hash table twice.
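To make the idea concrete, here is a hypothetical, self-contained sketch (not Impala code; all class and method names are made up) of why keeping only max(data_sequence_number) per primary key is sufficient:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Impala code. Shows why only the largest data
// sequence number per primary key matters when probing equality-delete
// entries.
public class EqDeleteDedup {
  // Each entry is {primaryKey, dataSequenceNumber} as collected from the
  // equality delete files. Pre-aggregating keeps one value per key, so a
  // probe needs a single comparison instead of a loop over all entries.
  static Map<Long, Long> maxSeqPerKey(List<long[]> eqDeleteEntries) {
    Map<Long, Long> result = new HashMap<>();
    for (long[] e : eqDeleteEntries) {
      result.merge(e[0], e[1], Math::max);
    }
    return result;
  }

  // A row is deleted iff some delete entry with its key has a data sequence
  // number greater than the data file's sequence number. Checking only the
  // max is equivalent: if the max is <= dataSeq, every entry is <= dataSeq.
  static boolean isDeleted(Map<Long, Long> maxSeq, long key, long dataSeq) {
    Long s = maxSeq.get(key);
    return s != null && s > dataSeq;
  }
}
```

This is exactly what the "PK1, PK2, ..., max(data_sequence_number) GROUP BY PK1, PK2, ..." aggregation would compute before the build side of the join.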
Microsoft's paper about hash joins and hash teams:
https://citeseerx.ist.psu.edu/document?repid=rep1&type=pdf&doi=fc1c78cbef5062cf49fdb309b1935af08b759d2d

http://gerrit.cloudera.org:8080/#/c/20753/8//COMMIT_MSG@39
PS8, Line 39: has
have

http://gerrit.cloudera.org:8080/#/c/20753/8//COMMIT_MSG@45
PS8, Line 45: table
nit: tables

http://gerrit.cloudera.org:8080/#/c/20753/8/common/thrift/PlanNodes.thrift
File common/thrift/PlanNodes.thrift:

http://gerrit.cloudera.org:8080/#/c/20753/8/common/thrift/PlanNodes.thrift@401
PS8, Line 401: vallues
typo: values

http://gerrit.cloudera.org:8080/#/c/20753/8/common/thrift/PlanNodes.thrift@406
PS8, Line 406: exaple
typo: example

http://gerrit.cloudera.org:8080/#/c/20753/8/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
File fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java:

http://gerrit.cloudera.org:8080/#/c/20753/8/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java@504
PS8, Line 504: equalityDeleteFiles_.add(delFileDesc.first);
: if (equalityIds_.isEmpty()) {
:   equalityIds_.addAll(delFile.equalityFieldIds());
: } else if (equalityIds_.size() != delFile.equalityFieldIds().size() ||
:     !equalityIds_.containsAll(delFile.equalityFieldIds())) {
:   throw new ImpalaRuntimeException(String.format("Equality delete files " +
:       "with different equality field ID lists aren't supported. %s vs %s",
:       equalityIds_, delFile.equalityFieldIds()));
: }
nit: for readability, this could go to its own method.

http://gerrit.cloudera.org:8080/#/c/20753/8/testdata/data/README
File testdata/data/README:

http://gerrit.cloudera.org:8080/#/c/20753/8/testdata/data/README@776
PS8, Line 776: when running a DELETE FROM statement
Could we also hack INSERT INTO? In that case Impala would just write the Parquet files for us, but we would register them as equality delete files. During the hack we could also hard-code the equality delete columns.
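Regarding the IcebergScanPlanner.java@504 comment above, a possible shape for the extracted method, sketched with plain Java types (the class name, method name, and exception type here are assumptions, not taken from the patch):

```java
import java.util.List;

// Hypothetical sketch of the suggested extraction; names and the exception
// type are assumptions, not taken from the patch.
public class EqFieldIdCheck {
  // Records the field-ID list of the first equality delete file seen and
  // rejects any later file whose ID list differs (compared as a set).
  static void checkEqualityFieldIds(List<Integer> equalityIds,
      List<Integer> fileEqualityIds) {
    if (equalityIds.isEmpty()) {
      equalityIds.addAll(fileEqualityIds);
    } else if (equalityIds.size() != fileEqualityIds.size()
        || !equalityIds.containsAll(fileEqualityIds)) {
      throw new IllegalStateException(String.format("Equality delete files "
          + "with different equality field ID lists aren't supported. %s vs %s",
          equalityIds, fileEqualityIds));
    }
  }
}
```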
http://gerrit.cloudera.org:8080/#/c/20753/8/testdata/data/README@850
PS8, Line 850:
Thank you for adding these detailed steps!

http://gerrit.cloudera.org:8080/#/c/20753/8/tests/query_test/test_iceberg.py
File tests/query_test/test_iceberg.py:

http://gerrit.cloudera.org:8080/#/c/20753/8/tests/query_test/test_iceberg.py@1262
PS8, Line 1262:
nit: we usually don't add a space here

http://gerrit.cloudera.org:8080/#/c/20753/8/tests/query_test/test_iceberg.py@1264
PS8, Line 1264: SRC_DIR = os.path.join(os.environ['IMPALA_HOME'],
:   "testdata/data/iceberg_test/hadoop_catalog/ice/"
:   "iceberg_v2_delete_different_equality_ids")
: DST_DIR = "/test-warehouse/iceberg_test/hadoop_catalog/ice/" \
:   "iceberg_v2_delete_different_equality_ids"
:
: try:
:   self.filesystem_client.make_dir(DST_DIR, permission=777)
:
:   self.filesystem_client.copy_from_local(os.path.join(SRC_DIR, "data"), DST_DIR)
:   self.filesystem_client.copy_from_local(os.path.join(SRC_DIR, "metadata"), DST_DIR)
Can we use 'file_utils.create_iceberg_table_from_directory'? With equality deletes, it might even work in non-HDFS environments. create_iceberg_table_from_directory might need some enhancements to deal with hadoop.catalog, etc., but it is probably worth the effort.

http://gerrit.cloudera.org:8080/#/c/20753/8/tests/query_test/test_iceberg.py@1278
PS8, Line 1278: external
Is there a reason we create an external table?

http://gerrit.cloudera.org:8080/#/c/20753/8/tests/query_test/test_iceberg.py@1290
PS8, Line 1290: assert "[1, 2]" in str(err)
: assert "[1]" in str(err)
What are these lists?
--
To view, visit http://gerrit.cloudera.org:8080/20753
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I2053e6f321c69f1c82059a84a5d99aeaa9814cad
Gerrit-Change-Number: 20753
Gerrit-PatchSet: 8
Gerrit-Owner: Gabor Kaszab <[email protected]>
Gerrit-Reviewer: Andrew Sherman <[email protected]>
Gerrit-Reviewer: Daniel Becker <[email protected]>
Gerrit-Reviewer: Gabor Kaszab <[email protected]>
Gerrit-Reviewer: Impala Public Jenkins <[email protected]>
Gerrit-Reviewer: Tamas Mate <[email protected]>
Gerrit-Reviewer: Zoltan Borok-Nagy <[email protected]>
Gerrit-Comment-Date: Fri, 15 Dec 2023 14:45:14 +0000
Gerrit-HasComments: Yes
