This is an automated email from the ASF dual-hosted git repository. michaelsmith pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 4ae7b7bb9658568c521d8ab7be47a6879389b872 Author: Gabor Kaszab <[email protected]> AuthorDate: Wed Oct 18 16:23:16 2023 +0200 IMPALA-11387: Introduce virtual column to expose Iceberg's file-level data sequence number Data sequence number is used for deciding whether an equality delete file should be applied to a data file or not. New files have higher data sequence numbers than older files. So if the data sequence number is lower on the data file than on the equality delete file then we should apply the delete rows on that data file. Iceberg has two different sequence numbers on a ContentFile level: file and data sequence number. For details see the comments on the ContentFile class in Iceberg: https://github.com/apache/iceberg/blob/ebce8538db20fd13859b6af841cf433d9423b53c/api/src/main/java/org/apache/iceberg/ContentFile.java#L130 This patch adds the data sequence number as a virtual column for Iceberg tables, which can be queried like: SELECT ICEBERG__DATA__SEQUENCE__NUMBER FROM <iceberg_table>; Testing: - Added E2E tests to exercise the new virtual column for V1 and V2 tables, covering both partitioned and unpartitioned cases. 
Change-Id: Id950e97782a2a29b505164470cfb646c5358dfca Reviewed-on: http://gerrit.cloudera.org:8080/20595 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/exec/file-metadata-utils.cc | 34 +++-- be/src/exec/hdfs-scan-node-base.cc | 3 + common/fbs/IcebergObjects.fbs | 1 + common/thrift/CatalogObjects.thrift | 3 +- .../org/apache/impala/catalog/IcebergTable.java | 1 + .../org/apache/impala/catalog/VirtualColumn.java | 5 + .../java/org/apache/impala/util/IcebergUtil.java | 11 ++ .../queries/QueryTest/iceberg-negative.test | 8 +- .../queries/QueryTest/iceberg-virtual-columns.test | 160 ++++++++++++++++++++- 9 files changed, 213 insertions(+), 13 deletions(-) diff --git a/be/src/exec/file-metadata-utils.cc b/be/src/exec/file-metadata-utils.cc index a510d47f2..7882052a4 100644 --- a/be/src/exec/file-metadata-utils.cc +++ b/be/src/exec/file-metadata-utils.cc @@ -59,17 +59,31 @@ void FileMetadataUtils::AddFileLevelVirtualColumns(MemPool* mem_pool, if (template_tuple == nullptr) return; for (int i = 0; i < scan_node_->virtual_column_slots().size(); ++i) { const SlotDescriptor* slot_desc = scan_node_->virtual_column_slots()[i]; - if (slot_desc->virtual_column_type() != TVirtualColumnType::INPUT_FILE_NAME) { - continue; + if (slot_desc->virtual_column_type() == TVirtualColumnType::INPUT_FILE_NAME) { + StringValue* slot = template_tuple->GetStringSlot(slot_desc->tuple_offset()); + const char* filename = file_desc_->filename.c_str(); + int len = strlen(filename); + char* filename_copy = reinterpret_cast<char*>(mem_pool->Allocate(len)); + Ubsan::MemCpy(filename_copy, filename, len); + slot->ptr = filename_copy; + slot->len = len; + template_tuple->SetNotNull(slot_desc->null_indicator_offset()); + } else if (slot_desc->virtual_column_type() == + TVirtualColumnType::ICEBERG_DATA_SEQUENCE_NUMBER) { + using namespace org::apache::impala::fb; + const FbIcebergMetadata* ice_metadata = + 
file_desc_->file_metadata->iceberg_metadata(); + DCHECK(ice_metadata != nullptr); + + int64_t data_seq_num = ice_metadata->data_sequence_number(); + if (data_seq_num > -1) { + int64_t* slot = template_tuple->GetBigIntSlot(slot_desc->tuple_offset()); + *slot = data_seq_num; + template_tuple->SetNotNull(slot_desc->null_indicator_offset()); + } else { + template_tuple->SetNull(slot_desc->null_indicator_offset()); + } } - StringValue* slot = template_tuple->GetStringSlot(slot_desc->tuple_offset()); - const char* filename = file_desc_->filename.c_str(); - int len = strlen(filename); - char* filename_copy = reinterpret_cast<char*>(mem_pool->Allocate(len)); - Ubsan::MemCpy(filename_copy, filename, len); - slot->ptr = filename_copy; - slot->len = len; - template_tuple->SetNotNull(slot_desc->null_indicator_offset()); } } diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc index 517fdfe57..c1ceb5c66 100644 --- a/be/src/exec/hdfs-scan-node-base.cc +++ b/be/src/exec/hdfs-scan-node-base.cc @@ -1125,6 +1125,9 @@ bool HdfsScanPlanNode::HasVirtualColumnInTemplateTuple() const { } else if (sd->virtual_column_type() == TVirtualColumnType::ICEBERG_PARTITION_SERIALIZED) { return true; + } else if (sd->virtual_column_type() == + TVirtualColumnType::ICEBERG_DATA_SEQUENCE_NUMBER) { + return true; } else { // Adding DCHECK here so we don't forget to update this when adding new virtual // column. 
diff --git a/common/fbs/IcebergObjects.fbs b/common/fbs/IcebergObjects.fbs index 3a260a73d..e1245b540 100644 --- a/common/fbs/IcebergObjects.fbs +++ b/common/fbs/IcebergObjects.fbs @@ -47,6 +47,7 @@ table FbIcebergPartitionTransformValue { table FbIcebergMetadata { file_format : FbIcebergDataFileFormat; record_count : long; + data_sequence_number : long; spec_id : ushort; partition_keys : [FbIcebergPartitionTransformValue]; } diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift index 10c7c647f..891d86f28 100644 --- a/common/thrift/CatalogObjects.thrift +++ b/common/thrift/CatalogObjects.thrift @@ -79,7 +79,8 @@ enum TVirtualColumnType { INPUT_FILE_NAME, FILE_POSITION, PARTITION_SPEC_ID, - ICEBERG_PARTITION_SERIALIZED + ICEBERG_PARTITION_SERIALIZED, + ICEBERG_DATA_SEQUENCE_NUMBER } // TODO: Since compression is also enabled for Kudu columns, we should diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java index c8e2ee618..076574023 100644 --- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java @@ -488,6 +488,7 @@ public class IcebergTable extends Table implements FeIcebergTable { addVirtualColumn(VirtualColumn.FILE_POSITION); addVirtualColumn(VirtualColumn.PARTITION_SPEC_ID); addVirtualColumn(VirtualColumn.ICEBERG_PARTITION_SERIALIZED); + addVirtualColumn(VirtualColumn.ICEBERG_DATA_SEQUENCE_NUMBER); } @Override diff --git a/fe/src/main/java/org/apache/impala/catalog/VirtualColumn.java b/fe/src/main/java/org/apache/impala/catalog/VirtualColumn.java index 0d626e2d1..dc3547e02 100644 --- a/fe/src/main/java/org/apache/impala/catalog/VirtualColumn.java +++ b/fe/src/main/java/org/apache/impala/catalog/VirtualColumn.java @@ -45,6 +45,10 @@ public class VirtualColumn extends Column { public static VirtualColumn ICEBERG_PARTITION_SERIALIZED = new 
VirtualColumn("ICEBERG__PARTITION__SERIALIZED", Type.BINARY, TVirtualColumnType.ICEBERG_PARTITION_SERIALIZED); + public static VirtualColumn ICEBERG_DATA_SEQUENCE_NUMBER = new VirtualColumn( + "ICEBERG__DATA__SEQUENCE__NUMBER", + Type.BIGINT, + TVirtualColumnType.ICEBERG_DATA_SEQUENCE_NUMBER); public static VirtualColumn getVirtualColumn(TVirtualColumnType virtColType) { switch (virtColType) { @@ -52,6 +56,7 @@ public class VirtualColumn extends Column { case FILE_POSITION: return FILE_POSITION; case PARTITION_SPEC_ID: return PARTITION_SPEC_ID; case ICEBERG_PARTITION_SERIALIZED: return ICEBERG_PARTITION_SERIALIZED; + case ICEBERG_DATA_SEQUENCE_NUMBER: return ICEBERG_DATA_SEQUENCE_NUMBER; default: break; } return null; diff --git a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java index a1e4729d7..71c200dab 100644 --- a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java +++ b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java @@ -942,6 +942,17 @@ public class IcebergUtil { if (partKeysOffset != -1) { FbIcebergMetadata.addPartitionKeys(fbb, partKeysOffset); } + if (cf.dataSequenceNumber() != null) { + FbIcebergMetadata.addDataSequenceNumber(fbb, cf.dataSequenceNumber()); + } else { + // According to comments from the Iceberg code, data sequence numbers could be null + // when files were written with "older" Iceberg versions. Quote from the code + // comments of Iceberg's ContentFile.dataSequenceNumber(): + // "This method can return null if the data sequence number is unknown. This may + // happen while reading a v2 manifest that did not persist the data sequence number + // for manifest entries with status DELETED (older Iceberg versions)." 
+ FbIcebergMetadata.addDataSequenceNumber(fbb, -1l); + } return FbIcebergMetadata.endFbIcebergMetadata(fbb); } diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test index 7dfc2c287..ef8f7fe30 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test @@ -800,4 +800,10 @@ CREATE TABLE ice_complex (id BIGINT NULL, int_array ARRAY<INT> NULL) STORED AS I optimize table ice_complex; ---- CATCH AnalysisException: Unable to INSERT into target table ($DATABASE.ice_complex) because the column 'int_array' has a complex type 'ARRAY<INT>' and Impala doesn't support inserting into tables containing complex type columns -==== \ No newline at end of file +==== +---- QUERY +# ICEBERG__DATA__SEQUENCE__NUMBER is not supported for non-Iceberg tables. +SELECT ICEBERG__DATA__SEQUENCE__NUMBER FROM functional_parquet.alltypes; +---- CATCH +AnalysisException: Could not resolve column/field reference: 'iceberg__data__sequence__number' +==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-virtual-columns.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-virtual-columns.test index a892093b5..8e1117189 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-virtual-columns.test +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-virtual-columns.test @@ -36,4 +36,162 @@ row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_tbl/data/col_b=false/.*.0.p row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_tbl/data/col_b=true/.*.0.parq',0 ---- TYPES STRING, BIGINT -==== \ No newline at end of file +==== +---- QUERY +# For a V1 Iceberg table the data sequence number is zero. 
+select col_i, ICEBERG__DATA__SEQUENCE__NUMBER from ice_tbl order by col_i; +---- TYPES +INT,BIGINT +---- RESULTS +0,0 +1,0 +3,0 +5,0 +==== +---- QUERY +# select virtual colum without selecting any other slots. +select max(ICEBERG__DATA__SEQUENCE__NUMBER) from ice_tbl; +---- TYPES +BIGINT +---- RESULTS +0 +==== +---- QUERY +# Testing data sequence number for unpartitioned V2 tables. +create table ice_tbl_v2 (col_i int, col_str string) +stored as iceberg +tblproperties ('format-version'='2'); +insert into ice_tbl_v2 values (1, "str1"), (2, "str2"), (3, "str3"); +select ICEBERG__DATA__SEQUENCE__NUMBER, * from ice_tbl_v2; +---- RESULTS +1,1,'str1' +1,2,'str2' +1,3,'str3' +---- TYPES +BIGINT,INT,STRING +==== +---- QUERY +insert into ice_tbl_v2 values (4, "str4"), (5, "str5"); +select ICEBERG__DATA__SEQUENCE__NUMBER, * from ice_tbl_v2; +---- RESULTS +1,1,'str1' +1,2,'str2' +1,3,'str3' +2,4,'str4' +2,5,'str5' +---- TYPES +BIGINT,INT,STRING +==== +---- QUERY +delete from ice_tbl_v2 where col_i % 2 = 0; +select ICEBERG__DATA__SEQUENCE__NUMBER, * from ice_tbl_v2; +---- RESULTS +1,1,'str1' +1,3,'str3' +2,5,'str5' +---- TYPES +BIGINT,INT,STRING +==== +---- QUERY +insert into ice_tbl_v2 values (6, "str6"), (7, "str7"); +select ICEBERG__DATA__SEQUENCE__NUMBER, * from ice_tbl_v2; +---- RESULTS +1,1,'str1' +1,3,'str3' +2,5,'str5' +4,6,'str6' +4,7,'str7' +---- TYPES +BIGINT,INT,STRING +==== +---- QUERY +# Testing data sequence number for partitioned V2 tables. 
+create table ice_tbl_v2_part (col_i int, col_str string) +partitioned by spec (col_str) +stored as iceberg +tblproperties ('format-version'='2'); +insert into ice_tbl_v2_part values (1, "part1"), (2, "part1"), (3, "part2"); +select *, ICEBERG__DATA__SEQUENCE__NUMBER from ice_tbl_v2_part order by col_i; +---- RESULTS +1,'part1',1 +2,'part1',1 +3,'part2',1 +---- TYPES +INT,STRING,BIGINT +==== +---- QUERY +insert into ice_tbl_v2_part values (4, "part1"), (5, "part2"); +select *, ICEBERG__DATA__SEQUENCE__NUMBER from ice_tbl_v2_part order by col_i; +---- RESULTS +1,'part1',1 +2,'part1',1 +3,'part2',1 +4,'part1',2 +5,'part2',2 +---- TYPES +INT,STRING,BIGINT +==== +---- QUERY +# Delete from both partitions +delete from ice_tbl_v2_part where col_i % 2 = 1; +select *, ICEBERG__DATA__SEQUENCE__NUMBER from ice_tbl_v2_part order by col_i; +---- RESULTS +2,'part1',1 +4,'part1',2 +---- TYPES +INT,STRING,BIGINT +==== +---- QUERY +insert into ice_tbl_v2_part values (6, "part1"), (7, "part2"); +select *, ICEBERG__DATA__SEQUENCE__NUMBER from ice_tbl_v2_part order by col_i; +---- RESULTS +2,'part1',1 +4,'part1',2 +6,'part1',4 +7,'part2',4 +---- TYPES +INT,STRING,BIGINT +==== +---- QUERY +# Delete from one partition, insert into the other and check the data sequence number +delete from ice_tbl_v2_part where col_i = 4; +select *, ICEBERG__DATA__SEQUENCE__NUMBER from ice_tbl_v2_part order by col_i; +---- RESULTS +2,'part1',1 +6,'part1',4 +7,'part2',4 +---- TYPES +INT,STRING,BIGINT +==== +---- QUERY +insert into ice_tbl_v2_part values (8, "part2"); +select *, ICEBERG__DATA__SEQUENCE__NUMBER from ice_tbl_v2_part order by col_i; +---- RESULTS +2,'part1',1 +6,'part1',4 +7,'part2',4 +8,'part2',6 +---- TYPES +INT,STRING,BIGINT +==== +---- QUERY +# Order by ICEBERG__DATA__SEQUENCE__NUMBER while it's not in the select list. 
+select * from ice_tbl_v2_part order by ICEBERG__DATA__SEQUENCE__NUMBER desc, col_i; +---- RESULTS +8,'part2' +6,'part1' +7,'part2' +2,'part1' +---- TYPES +INT,STRING +==== +---- QUERY +# Test when the sequence number comes from a view and is part of a join condition. +with w as (select ICEBERG__DATA__SEQUENCE__NUMBER as seq from ice_tbl_v2_part) +select seq, ap.i, ap.p_bigint from w +join functional_parquet.iceberg_alltypes_part ap on seq = ap.i; +---- RESULTS +1,1,11 +---- TYPES +BIGINT,INT,BIGINT +====
