This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 4ae7b7bb9658568c521d8ab7be47a6879389b872
Author: Gabor Kaszab <[email protected]>
AuthorDate: Wed Oct 18 16:23:16 2023 +0200

    IMPALA-11387: Introduce virtual column to expose Iceberg's file-level data 
sequence number
    
    Data sequence number is used for deciding whether an equality
    delete file should be applied to a data file or not. New files have
    higher data sequence numbers than older files. So if the data sequence
    number of the data file is lower than that of the equality delete file,
    then we should apply the delete file's rows to that data file.
    
    Iceberg has two different sequence numbers on a ContentFile level: file
    and data sequence number.
    For details see the comments on the ContentFile class in Iceberg:
    
https://github.com/apache/iceberg/blob/ebce8538db20fd13859b6af841cf433d9423b53c/api/src/main/java/org/apache/iceberg/ContentFile.java#L130
    
    This patch adds the data sequence number as a virtual column for
    Iceberg tables; it can be queried like:
    SELECT ICEBERG__DATA__SEQUENCE__NUMBER FROM <iceberg_table>;
    
    Testing:
      - Added E2E tests to exercise the new virtual column for V1, V2
        tables both partitioned and unpartitioned cases.
    
    Change-Id: Id950e97782a2a29b505164470cfb646c5358dfca
    Reviewed-on: http://gerrit.cloudera.org:8080/20595
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/exec/file-metadata-utils.cc                 |  34 +++--
 be/src/exec/hdfs-scan-node-base.cc                 |   3 +
 common/fbs/IcebergObjects.fbs                      |   1 +
 common/thrift/CatalogObjects.thrift                |   3 +-
 .../org/apache/impala/catalog/IcebergTable.java    |   1 +
 .../org/apache/impala/catalog/VirtualColumn.java   |   5 +
 .../java/org/apache/impala/util/IcebergUtil.java   |  11 ++
 .../queries/QueryTest/iceberg-negative.test        |   8 +-
 .../queries/QueryTest/iceberg-virtual-columns.test | 160 ++++++++++++++++++++-
 9 files changed, 213 insertions(+), 13 deletions(-)

diff --git a/be/src/exec/file-metadata-utils.cc 
b/be/src/exec/file-metadata-utils.cc
index a510d47f2..7882052a4 100644
--- a/be/src/exec/file-metadata-utils.cc
+++ b/be/src/exec/file-metadata-utils.cc
@@ -59,17 +59,31 @@ void FileMetadataUtils::AddFileLevelVirtualColumns(MemPool* 
mem_pool,
   if (template_tuple == nullptr) return;
   for (int i = 0; i < scan_node_->virtual_column_slots().size(); ++i) {
     const SlotDescriptor* slot_desc = scan_node_->virtual_column_slots()[i];
-    if (slot_desc->virtual_column_type() != 
TVirtualColumnType::INPUT_FILE_NAME) {
-      continue;
+    if (slot_desc->virtual_column_type() == 
TVirtualColumnType::INPUT_FILE_NAME) {
+      StringValue* slot = 
template_tuple->GetStringSlot(slot_desc->tuple_offset());
+      const char* filename = file_desc_->filename.c_str();
+      int len = strlen(filename);
+      char* filename_copy = reinterpret_cast<char*>(mem_pool->Allocate(len));
+      Ubsan::MemCpy(filename_copy, filename, len);
+      slot->ptr = filename_copy;
+      slot->len = len;
+      template_tuple->SetNotNull(slot_desc->null_indicator_offset());
+    } else if (slot_desc->virtual_column_type() ==
+        TVirtualColumnType::ICEBERG_DATA_SEQUENCE_NUMBER) {
+      using namespace org::apache::impala::fb;
+      const FbIcebergMetadata* ice_metadata =
+          file_desc_->file_metadata->iceberg_metadata();
+      DCHECK(ice_metadata != nullptr);
+
+      int64_t data_seq_num = ice_metadata->data_sequence_number();
+      if (data_seq_num > -1) {
+        int64_t* slot = 
template_tuple->GetBigIntSlot(slot_desc->tuple_offset());
+        *slot = data_seq_num;
+        template_tuple->SetNotNull(slot_desc->null_indicator_offset());
+      } else {
+        template_tuple->SetNull(slot_desc->null_indicator_offset());
+      }
     }
-    StringValue* slot = 
template_tuple->GetStringSlot(slot_desc->tuple_offset());
-    const char* filename = file_desc_->filename.c_str();
-    int len = strlen(filename);
-    char* filename_copy = reinterpret_cast<char*>(mem_pool->Allocate(len));
-    Ubsan::MemCpy(filename_copy, filename, len);
-    slot->ptr = filename_copy;
-    slot->len = len;
-    template_tuple->SetNotNull(slot_desc->null_indicator_offset());
   }
 }
 
diff --git a/be/src/exec/hdfs-scan-node-base.cc 
b/be/src/exec/hdfs-scan-node-base.cc
index 517fdfe57..c1ceb5c66 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -1125,6 +1125,9 @@ bool HdfsScanPlanNode::HasVirtualColumnInTemplateTuple() 
const {
     } else if (sd->virtual_column_type() ==
         TVirtualColumnType::ICEBERG_PARTITION_SERIALIZED) {
       return true;
+    } else if (sd->virtual_column_type() ==
+        TVirtualColumnType::ICEBERG_DATA_SEQUENCE_NUMBER) {
+      return true;
     } else {
       // Adding DCHECK here so we don't forget to update this when adding new 
virtual
       // column.
diff --git a/common/fbs/IcebergObjects.fbs b/common/fbs/IcebergObjects.fbs
index 3a260a73d..e1245b540 100644
--- a/common/fbs/IcebergObjects.fbs
+++ b/common/fbs/IcebergObjects.fbs
@@ -47,6 +47,7 @@ table FbIcebergPartitionTransformValue {
 table FbIcebergMetadata {
   file_format : FbIcebergDataFileFormat;
   record_count : long;
+  data_sequence_number : long;
   spec_id : ushort;
   partition_keys : [FbIcebergPartitionTransformValue];
 }
diff --git a/common/thrift/CatalogObjects.thrift 
b/common/thrift/CatalogObjects.thrift
index 10c7c647f..891d86f28 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -79,7 +79,8 @@ enum TVirtualColumnType {
   INPUT_FILE_NAME,
   FILE_POSITION,
   PARTITION_SPEC_ID,
-  ICEBERG_PARTITION_SERIALIZED
+  ICEBERG_PARTITION_SERIALIZED,
+  ICEBERG_DATA_SEQUENCE_NUMBER
 }
 
 // TODO: Since compression is also enabled for Kudu columns, we should
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java 
b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
index c8e2ee618..076574023 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
@@ -488,6 +488,7 @@ public class IcebergTable extends Table implements 
FeIcebergTable {
     addVirtualColumn(VirtualColumn.FILE_POSITION);
     addVirtualColumn(VirtualColumn.PARTITION_SPEC_ID);
     addVirtualColumn(VirtualColumn.ICEBERG_PARTITION_SERIALIZED);
+    addVirtualColumn(VirtualColumn.ICEBERG_DATA_SEQUENCE_NUMBER);
   }
 
   @Override
diff --git a/fe/src/main/java/org/apache/impala/catalog/VirtualColumn.java 
b/fe/src/main/java/org/apache/impala/catalog/VirtualColumn.java
index 0d626e2d1..dc3547e02 100644
--- a/fe/src/main/java/org/apache/impala/catalog/VirtualColumn.java
+++ b/fe/src/main/java/org/apache/impala/catalog/VirtualColumn.java
@@ -45,6 +45,10 @@ public class VirtualColumn extends Column {
   public static VirtualColumn ICEBERG_PARTITION_SERIALIZED = new
       VirtualColumn("ICEBERG__PARTITION__SERIALIZED", Type.BINARY,
       TVirtualColumnType.ICEBERG_PARTITION_SERIALIZED);
+  public static VirtualColumn ICEBERG_DATA_SEQUENCE_NUMBER = new VirtualColumn(
+      "ICEBERG__DATA__SEQUENCE__NUMBER",
+      Type.BIGINT,
+      TVirtualColumnType.ICEBERG_DATA_SEQUENCE_NUMBER);
 
   public static VirtualColumn getVirtualColumn(TVirtualColumnType virtColType) 
{
     switch (virtColType) {
@@ -52,6 +56,7 @@ public class VirtualColumn extends Column {
       case FILE_POSITION: return FILE_POSITION;
       case PARTITION_SPEC_ID: return PARTITION_SPEC_ID;
       case ICEBERG_PARTITION_SERIALIZED: return ICEBERG_PARTITION_SERIALIZED;
+      case ICEBERG_DATA_SEQUENCE_NUMBER: return ICEBERG_DATA_SEQUENCE_NUMBER;
       default: break;
     }
     return null;
diff --git a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java 
b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
index a1e4729d7..71c200dab 100644
--- a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
@@ -942,6 +942,17 @@ public class IcebergUtil {
     if (partKeysOffset != -1) {
       FbIcebergMetadata.addPartitionKeys(fbb, partKeysOffset);
     }
+    if (cf.dataSequenceNumber() != null) {
+      FbIcebergMetadata.addDataSequenceNumber(fbb, cf.dataSequenceNumber());
+    } else {
+      // According to comments from the Iceberg code, data sequence numbers 
could be null
+      // when files were written with "older" Iceberg versions. Quote from the 
code
+      // comments of Iceberg's ContentFile.dataSequenceNumber():
+      // "This method can return null if the data sequence number is unknown. 
This may
+      // happen while reading a v2 manifest that did not persist the data 
sequence number
+      // for manifest entries with status DELETED (older Iceberg versions)."
+      FbIcebergMetadata.addDataSequenceNumber(fbb, -1l);
+    }
     return FbIcebergMetadata.endFbIcebergMetadata(fbb);
   }
 
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
index 7dfc2c287..ef8f7fe30 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
@@ -800,4 +800,10 @@ CREATE TABLE ice_complex (id BIGINT NULL, int_array 
ARRAY<INT> NULL) STORED AS I
 optimize table ice_complex;
 ---- CATCH
 AnalysisException: Unable to INSERT into target table ($DATABASE.ice_complex) 
because the column 'int_array' has a complex type 'ARRAY<INT>' and Impala 
doesn't support inserting into tables containing complex type columns
-====
\ No newline at end of file
+====
+---- QUERY
+# ICEBERG__DATA__SEQUENCE__NUMBER is not supported for non-Iceberg tables.
+SELECT ICEBERG__DATA__SEQUENCE__NUMBER FROM functional_parquet.alltypes;
+---- CATCH
+AnalysisException: Could not resolve column/field reference: 
'iceberg__data__sequence__number'
+====
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-virtual-columns.test
 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-virtual-columns.test
index a892093b5..8e1117189 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-virtual-columns.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-virtual-columns.test
@@ -36,4 +36,162 @@ 
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_tbl/data/col_b=false/.*.0.p
 
row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_tbl/data/col_b=true/.*.0.parq',0
 ---- TYPES
 STRING, BIGINT
-====
\ No newline at end of file
+====
+---- QUERY
+# For a V1 Iceberg table the data sequence number is zero.
+select col_i, ICEBERG__DATA__SEQUENCE__NUMBER from ice_tbl order by col_i;
+---- TYPES
+INT,BIGINT
+---- RESULTS
+0,0
+1,0
+3,0
+5,0
+====
+---- QUERY
+# Select virtual column without selecting any other slots.
+select max(ICEBERG__DATA__SEQUENCE__NUMBER) from ice_tbl;
+---- TYPES
+BIGINT
+---- RESULTS
+0
+====
+---- QUERY
+# Testing data sequence number for unpartitioned V2 tables.
+create table ice_tbl_v2 (col_i int, col_str string)
+stored as iceberg
+tblproperties ('format-version'='2');
+insert into ice_tbl_v2 values (1, "str1"), (2, "str2"), (3, "str3");
+select ICEBERG__DATA__SEQUENCE__NUMBER, * from ice_tbl_v2;
+---- RESULTS
+1,1,'str1'
+1,2,'str2'
+1,3,'str3'
+---- TYPES
+BIGINT,INT,STRING
+====
+---- QUERY
+insert into ice_tbl_v2 values (4, "str4"), (5, "str5");
+select ICEBERG__DATA__SEQUENCE__NUMBER, * from ice_tbl_v2;
+---- RESULTS
+1,1,'str1'
+1,2,'str2'
+1,3,'str3'
+2,4,'str4'
+2,5,'str5'
+---- TYPES
+BIGINT,INT,STRING
+====
+---- QUERY
+delete from ice_tbl_v2 where col_i % 2 = 0;
+select ICEBERG__DATA__SEQUENCE__NUMBER, * from ice_tbl_v2;
+---- RESULTS
+1,1,'str1'
+1,3,'str3'
+2,5,'str5'
+---- TYPES
+BIGINT,INT,STRING
+====
+---- QUERY
+insert into ice_tbl_v2 values (6, "str6"), (7, "str7");
+select ICEBERG__DATA__SEQUENCE__NUMBER, * from ice_tbl_v2;
+---- RESULTS
+1,1,'str1'
+1,3,'str3'
+2,5,'str5'
+4,6,'str6'
+4,7,'str7'
+---- TYPES
+BIGINT,INT,STRING
+====
+---- QUERY
+# Testing data sequence number for partitioned V2 tables.
+create table ice_tbl_v2_part (col_i int, col_str string)
+partitioned by spec (col_str)
+stored as iceberg
+tblproperties ('format-version'='2');
+insert into ice_tbl_v2_part values (1, "part1"), (2, "part1"), (3, "part2");
+select *, ICEBERG__DATA__SEQUENCE__NUMBER from ice_tbl_v2_part order by col_i;
+---- RESULTS
+1,'part1',1
+2,'part1',1
+3,'part2',1
+---- TYPES
+INT,STRING,BIGINT
+====
+---- QUERY
+insert into ice_tbl_v2_part values (4, "part1"), (5, "part2");
+select *, ICEBERG__DATA__SEQUENCE__NUMBER from ice_tbl_v2_part order by col_i;
+---- RESULTS
+1,'part1',1
+2,'part1',1
+3,'part2',1
+4,'part1',2
+5,'part2',2
+---- TYPES
+INT,STRING,BIGINT
+====
+---- QUERY
+# Delete from both partitions
+delete from ice_tbl_v2_part where col_i % 2 = 1;
+select *, ICEBERG__DATA__SEQUENCE__NUMBER from ice_tbl_v2_part order by col_i;
+---- RESULTS
+2,'part1',1
+4,'part1',2
+---- TYPES
+INT,STRING,BIGINT
+====
+---- QUERY
+insert into ice_tbl_v2_part values (6, "part1"), (7, "part2");
+select *, ICEBERG__DATA__SEQUENCE__NUMBER from ice_tbl_v2_part order by col_i;
+---- RESULTS
+2,'part1',1
+4,'part1',2
+6,'part1',4
+7,'part2',4
+---- TYPES
+INT,STRING,BIGINT
+====
+---- QUERY
+# Delete from one partition, insert into the other and check the data sequence 
number
+delete from ice_tbl_v2_part where col_i = 4;
+select *, ICEBERG__DATA__SEQUENCE__NUMBER from ice_tbl_v2_part order by col_i;
+---- RESULTS
+2,'part1',1
+6,'part1',4
+7,'part2',4
+---- TYPES
+INT,STRING,BIGINT
+====
+---- QUERY
+insert into ice_tbl_v2_part values (8, "part2");
+select *, ICEBERG__DATA__SEQUENCE__NUMBER from ice_tbl_v2_part order by col_i;
+---- RESULTS
+2,'part1',1
+6,'part1',4
+7,'part2',4
+8,'part2',6
+---- TYPES
+INT,STRING,BIGINT
+====
+---- QUERY
+# Order by ICEBERG__DATA__SEQUENCE__NUMBER while it's not in the select list.
+select * from ice_tbl_v2_part order by ICEBERG__DATA__SEQUENCE__NUMBER desc, 
col_i;
+---- RESULTS
+8,'part2'
+6,'part1'
+7,'part2'
+2,'part1'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+# Test when the sequence number comes from a view and is part of a join 
condition.
+with w as (select ICEBERG__DATA__SEQUENCE__NUMBER as seq from ice_tbl_v2_part)
+select seq, ap.i, ap.p_bigint from w
+join functional_parquet.iceberg_alltypes_part ap on seq = ap.i;
+---- RESULTS
+1,1,11
+---- TYPES
+BIGINT,INT,BIGINT
+====

Reply via email to