This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 826b113fd719369955079c96f968a3be4d0b9dab
Author: Gabor Kaszab <[email protected]>
AuthorDate: Tue Mar 21 09:51:30 2023 +0100

    IMPALA-11954: Fix for URL encoded partition columns for Iceberg tables
    
    There is a bug when an Iceberg table has a string partition column and
    Impala inserts special chars into this column that need to be URL
    encoded. In this case the partition name is URL encoded not to confuse
    the file paths for that partition. E.g. 'b=1/2' value is converted to
    'b=1%2F2'.
    This is fine for path creation, however, for Iceberg tables
    the same URL encoded partition name is saved into catalog as the
    partition name also used for Iceberg column stats. This leads to
    incorrect results when querying the table as the URL encoded values
    are returned in a SELECT * query instead of what the user inserted.
    Additionally, when adding a filter to the query, Iceberg will filter
    out all the rows because it compares the non-encoded values to the URL
    encoded values.
    
    Testing:
      - Added new tests to iceberg-partitioned-insert.test to cover this
        scenario.
      - Re-ran the existing test suite.
    
    Change-Id: I67edc3d04738306fed0d4ebc5312f3d8d4f14254
    Reviewed-on: http://gerrit.cloudera.org:8080/19654
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/exec/hdfs-table-sink.cc                     |  80 +++++++++------
 be/src/exec/hdfs-table-sink.h                      |  14 +++
 be/src/exec/output-partition.h                     |   7 +-
 be/src/runtime/dml-exec-state.cc                   |  16 ++-
 common/fbs/IcebergObjects.fbs                      |   1 +
 .../impala/service/IcebergCatalogOpExecutor.java   |   4 +-
 .../java/org/apache/impala/util/IcebergUtil.java   |  23 +++--
 .../QueryTest/iceberg-partitioned-insert.test      | 111 +++++++++++++++++++++
 8 files changed, 204 insertions(+), 52 deletions(-)

diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index 37e6dcc25..355cb7479 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -508,30 +508,35 @@ string HdfsTableSink::GetPartitionName(int i) {
   }
 }
 
-Status HdfsTableSink::InitOutputPartition(RuntimeState* state,
-    const HdfsPartitionDescriptor& partition_descriptor, const TupleRow* row,
-    OutputPartition* output_partition, bool empty_partition) {
-  // Build the unique name for this partition from the partition keys, e.g. 
"j=1/f=foo/"
-  // etc.
-  stringstream partition_name_ss;
+void HdfsTableSink::ConstructPartitionNames(
+    const TupleRow* row,
+    string* url_encoded_partition_name,
+    vector<string>* raw_partition_names,
+    string* external_partition_name) {
+  DCHECK(url_encoded_partition_name != nullptr);
+  DCHECK(external_partition_name != nullptr);
+  DCHECK(raw_partition_names != nullptr);
+  DCHECK(raw_partition_names->empty());
+
+  stringstream url_encoded_partition_name_ss;
   stringstream external_partition_name_ss;
-  for (int j = 0; j < partition_key_expr_evals_.size(); ++j) {
-    bool is_external_part = HasExternalOutputDir() &&
-        j >= external_output_partition_depth_;
-    if (is_external_part) {
-      external_partition_name_ss << GetPartitionName(j) << "=";
-    }
-    partition_name_ss << GetPartitionName(j) << "=";
-    void* value = partition_key_expr_evals_[j]->GetValue(row);
-    // nullptr partition keys get a special value to be compatible with Hive.
+  for (int i = 0; i < partition_key_expr_evals_.size(); ++i) {
+    stringstream raw_partition_key_value_ss;
+    stringstream encoded_partition_key_value_ss;
+
+    raw_partition_key_value_ss << GetPartitionName(i) << "=";
+    encoded_partition_key_value_ss << GetPartitionName(i) << "=";
+
+    void* value = partition_key_expr_evals_[i]->GetValue(row);
     if (value == nullptr) {
-      partition_name_ss << table_desc_->null_partition_key_value();
-      if (is_external_part) {
-        external_partition_name_ss << table_desc_->null_partition_key_value();
-      }
+      raw_partition_key_value_ss << table_desc_->null_partition_key_value();
+      encoded_partition_key_value_ss << 
table_desc_->null_partition_key_value();
     } else {
       string value_str;
-      partition_key_expr_evals_[j]->PrintValue(value, &value_str);
+      partition_key_expr_evals_[i]->PrintValue(value, &value_str);
+
+      raw_partition_key_value_ss << value_str;
+
       // Directory names containing partition-key values need to be 
UrlEncoded, in
       // particular to avoid problems when '/' is part of the key value (which 
might
       // occur, for example, with date strings). Hive will URL decode the value
@@ -545,23 +550,32 @@ Status HdfsTableSink::InitOutputPartition(RuntimeState* 
state,
       string part_key_value = (encoded_str.empty() ?
                               table_desc_->null_partition_key_value() : 
encoded_str);
       // If the string is empty, map it to nullptr (mimicking Hive's behaviour)
-      partition_name_ss << part_key_value;
-      if (is_external_part) {
-        external_partition_name_ss << part_key_value;
-      }
+      encoded_partition_key_value_ss << part_key_value;
     }
-    if (j < partition_key_expr_evals_.size() - 1) {
-      partition_name_ss << "/";
-      if (is_external_part) {
-        external_partition_name_ss << "/";
-      }
+    if (i < partition_key_expr_evals_.size() - 1) 
encoded_partition_key_value_ss << "/";
+
+    url_encoded_partition_name_ss << encoded_partition_key_value_ss.str();
+    if (HasExternalOutputDir() && i >= external_output_partition_depth_) {
+      external_partition_name_ss << encoded_partition_key_value_ss.str();
     }
+
+    raw_partition_names->push_back(raw_partition_key_value_ss.str());
   }
 
-  // partition_name_ss now holds the unique descriptor for this partition,
-  output_partition->partition_name = partition_name_ss.str();
-  BuildHdfsFileNames(partition_descriptor, output_partition,
-      external_partition_name_ss.str());
+  *url_encoded_partition_name = url_encoded_partition_name_ss.str();
+  *external_partition_name = external_partition_name_ss.str();
+}
+
+Status HdfsTableSink::InitOutputPartition(RuntimeState* state,
+    const HdfsPartitionDescriptor& partition_descriptor, const TupleRow* row,
+    OutputPartition* output_partition, bool empty_partition) {
+  // Build the unique name for this partition from the partition keys, e.g. 
"j=1/f=foo/"
+  // etc.
+  string external_partition_name;
+  ConstructPartitionNames(row, &output_partition->partition_name,
+      &output_partition->raw_partition_names, &external_partition_name);
+
+  BuildHdfsFileNames(partition_descriptor, output_partition, 
external_partition_name);
 
   if (ShouldSkipStaging(state, output_partition)) {
     // We will be writing to the final file if we're skipping staging, so get 
a connection
diff --git a/be/src/exec/hdfs-table-sink.h b/be/src/exec/hdfs-table-sink.h
index 13d44866e..5aa637949 100644
--- a/be/src/exec/hdfs-table-sink.h
+++ b/be/src/exec/hdfs-table-sink.h
@@ -140,6 +140,20 @@ class HdfsTableSink : public DataSink {
       const HdfsPartitionDescriptor& partition_descriptor, const TupleRow* row,
       OutputPartition* output_partition, bool empty_partition) 
WARN_UNUSED_RESULT;
 
+  /// Constructs the partition name using 'partition_key_expr_evals_'.
+  /// 'url_encoded_partition_name' is the full partition name in URL encoded 
form. E.g.:
+  /// it's "a=12%2F31%2F11/b=10" if we have 2 partition columns "a" and "b", 
and "a" has
+  /// the value of "12/31/11" and "b" has the value of 10. Since this is URL 
encoded,
+  /// can be used for paths.
+  /// 'raw_partition_name' is a vector of partition key-values in a 
non-encoded format.
+  /// Staying with the above example this would hold ["a=12/31/11", "b=10"].
+  /// 'external_partition_name' is a subset of 'url_encoded_partition_name'.
+  void ConstructPartitionNames(
+      const TupleRow* row,
+      string* url_encoded_partition_name,
+      std::vector<std::string>* raw_partition_names,
+      string* external_partition_name);
+
   /// Add a temporary file to an output partition.  Files are created in a
   /// temporary directory and then moved to the real partition directory by the
   /// coordinator in a finalization step. The temporary file's current location
diff --git a/be/src/exec/output-partition.h b/be/src/exec/output-partition.h
index 821b6a893..38f66645b 100644
--- a/be/src/exec/output-partition.h
+++ b/be/src/exec/output-partition.h
@@ -63,9 +63,14 @@ struct OutputPartition {
   /// Path: tmp_hdfs_dir_name/partition_name/<unique_id_str>
   std::string tmp_hdfs_file_name_prefix;
 
-  /// key1=val1/key2=val2/ etc. Used to identify partitions to the metastore.
+  /// key1=val1/key2=val2/ etc. Used to identify partitions to the metastore. 
Note, the
+  /// value in this member is URL encoded for the sake of e.g. data file name 
creation.
   std::string partition_name;
 
+  /// This is a split of the 'partition_name' variable by '/'. Note, the 
partition keys
+  /// and values in this variable are not URL encoded.
+  std::vector<std::string> raw_partition_names;
+
   /// Connection to hdfs.
   hdfsFS hdfs_connection = nullptr;
 
diff --git a/be/src/runtime/dml-exec-state.cc b/be/src/runtime/dml-exec-state.cc
index 94c0ba5af..3f8c32439 100644
--- a/be/src/runtime/dml-exec-state.cc
+++ b/be/src/runtime/dml-exec-state.cc
@@ -500,7 +500,7 @@ createIcebergColumnStats(
 }
 
 string createIcebergDataFileString(
-    const string& partition_name, const string& final_path, int64_t num_rows,
+    const OutputPartition& partition, const string& final_path, int64_t 
num_rows,
     int64_t file_size, const IcebergFileStats& insert_stats) {
   using namespace org::apache::impala::fb;
   flatbuffers::FlatBufferBuilder fbb;
@@ -510,13 +510,19 @@ string createIcebergDataFileString(
     ice_col_stats_vec.push_back(createIcebergColumnStats(fbb, it->first, 
it->second));
   }
 
+  vector<flatbuffers::Offset<flatbuffers::String>> raw_partition_fields;
+  for (const string& partition_name : partition.raw_partition_names) {
+    raw_partition_fields.push_back(fbb.CreateString(partition_name));
+  }
+
   flatbuffers::Offset<FbIcebergDataFile> data_file = 
CreateFbIcebergDataFile(fbb,
       fbb.CreateString(final_path),
       // Currently we can only write Parquet to Iceberg
       FbIcebergDataFileFormat::FbIcebergDataFileFormat_PARQUET,
       num_rows,
       file_size,
-      fbb.CreateString(partition_name),
+      fbb.CreateString(partition.partition_name),
+      fbb.CreateVector(raw_partition_fields),
       fbb.CreateVector(ice_col_stats_vec));
   fbb.Finish(data_file);
   return string(reinterpret_cast<char*>(fbb.GetBufferPointer()), 
fbb.GetSize());
@@ -527,8 +533,8 @@ string createIcebergDataFileString(
 void DmlExecState::AddCreatedFile(const OutputPartition& partition, bool 
is_iceberg,
     const IcebergFileStats& insert_stats) {
   lock_guard<mutex> l(lock_);
-  const string& partition_name = partition.partition_name;
-  PartitionStatusMap::iterator entry = 
per_partition_status_.find(partition_name);
+  PartitionStatusMap::iterator entry =
+      per_partition_status_.find(partition.partition_name);
   DCHECK(entry != per_partition_status_.end());
   DmlFileStatusPb* file = entry->second.add_created_files();
   if (partition.current_file_final_name.empty()) {
@@ -541,7 +547,7 @@ void DmlExecState::AddCreatedFile(const OutputPartition& 
partition, bool is_iceb
   file->set_size(partition.current_file_bytes);
   if (is_iceberg) {
     file->set_iceberg_data_file_fb(
-        createIcebergDataFileString(partition_name, file->final_path(), 
file->num_rows(),
+        createIcebergDataFileString(partition, file->final_path(), 
file->num_rows(),
         file->size(), insert_stats));
   }
 }
diff --git a/common/fbs/IcebergObjects.fbs b/common/fbs/IcebergObjects.fbs
index db7fac7d1..bec9547e5 100644
--- a/common/fbs/IcebergObjects.fbs
+++ b/common/fbs/IcebergObjects.fbs
@@ -65,6 +65,7 @@ table FbIcebergDataFile {
   record_count: long = 0;
   file_size_in_bytes: long = 0;
   partition_path: string;
+  raw_partition_fields: [string];
   per_column_stats: [FbIcebergColumnStats];
 }
 
diff --git 
a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
index a49a5b3bf..03566774c 100644
--- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
@@ -351,9 +351,9 @@ public class IcebergCatalogOpExecutor {
           
.withFormat(IcebergUtil.fbFileFormatToIcebergFileFormat(dataFile.format()))
           .withRecordCount(dataFile.recordCount())
           .withFileSizeInBytes(dataFile.fileSizeInBytes());
-      IcebergUtil.PartitionData partitionData = 
IcebergUtil.partitionDataFromPath(
+      IcebergUtil.PartitionData partitionData = 
IcebergUtil.partitionDataFromDataFile(
           partSpec.partitionType(),
-          feIcebergTable.getDefaultPartitionSpec(), dataFile.partitionPath());
+          feIcebergTable.getDefaultPartitionSpec(), dataFile);
       if (partitionData != null) builder.withPartition(partitionData);
       batchWrite.addFile(builder.build());
     }
diff --git a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java 
b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
index c0dc4c3e6..fb5bdf2cd 100644
--- a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
@@ -85,6 +85,7 @@ import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.fb.FbFileMetadata;
+import org.apache.impala.fb.FbIcebergDataFile;
 import org.apache.impala.fb.FbIcebergDataFileFormat;
 import org.apache.impala.fb.FbIcebergMetadata;
 import org.apache.impala.fb.FbIcebergPartitionTransformValue;
@@ -690,28 +691,28 @@ public class IcebergUtil {
   }
 
   /**
-   * Create a PartitionData object from a partition path and its descriptors.
+   * Create a PartitionData object using partition information from 
FbIcebergDataFile.
    */
-  public static PartitionData partitionDataFromPath(Types.StructType 
partitionType,
-      IcebergPartitionSpec spec, String path) throws ImpalaRuntimeException {
-    if (path == null || path.isEmpty()) return null;
+  public static PartitionData partitionDataFromDataFile(Types.StructType 
partitionType,
+      IcebergPartitionSpec spec, FbIcebergDataFile dataFile)
+      throws ImpalaRuntimeException {
+    if (dataFile == null || dataFile.rawPartitionFieldsLength() == 0) return 
null;
 
     PartitionData data = new 
PartitionData(spec.getIcebergPartitionFieldsSize());
-    String[] partitions = path.split("/", -1);
     int path_i = 0;
     for (int i = 0; i < spec.getIcebergPartitionFieldsSize(); ++i) {
       IcebergPartitionField field = spec.getIcebergPartitionFields().get(i);
-      if (field.getTransformType() == TIcebergPartitionTransformType.VOID) {
-        continue;
-      }
-      String[] parts = partitions[path_i].split("=", 2);
+      if (field.getTransformType() == TIcebergPartitionTransformType.VOID) 
continue;
+
+      Preconditions.checkState(path_i < dataFile.rawPartitionFieldsLength());
+      String[] parts = dataFile.rawPartitionFields(path_i).split("=", 2);
       Preconditions.checkArgument(parts.length == 2 && parts[0] != null &&
           field.getFieldName().equals(parts[0]), "Invalid partition: %s",
-          partitions[path_i]);
+          dataFile.rawPartitionFields(path_i));
       TIcebergPartitionTransformType transformType = field.getTransformType();
       data.set(i, getPartitionValue(
           partitionType.fields().get(i).type(), transformType, parts[1]));
-      path_i += 1;
+      ++path_i;
     }
     return data;
   }
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test
 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test
index 6f6abc917..faeef6e4f 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test
@@ -666,3 +666,114 @@ select count(*) from store_sales where ss_sold_date_sk is 
null;
 ---- TYPES
 BIGINT
 ====
+---- QUERY
+# Insert into a string partition column some chars that have to be URL encoded 
for the path creation.
+# Check that result strings are not URL encoded.
+create table special_char_partitions (i int, s string, s2 string)
+partitioned by spec (i, s, truncate(4, s))
+stored as iceberg;
+insert into special_char_partitions
+    values (1, '11/14/31', '44/1'), (2, '11"14"31', '43"3'), (3, '11=14=31', 
'65=2'), (4, '', 'a'), (5, cast(null as string), 'b');
+select * from special_char_partitions;
+---- RESULTS
+1,'11/14/31','44/1'
+2,'11"14"31','43"3'
+3,'11=14=31','65=2'
+4,'','a'
+5,'NULL','b'
+---- TYPES
+INT,STRING,STRING
+====
+---- QUERY
+# Check that filtering using special chars work as expected.
+select * from special_char_partitions where s = '11/14/31';
+---- RESULTS
+1,'11/14/31','44/1'
+---- TYPES
+INT,STRING,STRING
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 1
+aggregation(SUM, NumRowGroups): 1
+====
+---- QUERY
+select * from special_char_partitions where s = '';
+---- RESULTS
+4,'','a'
+---- TYPES
+INT,STRING,STRING
+====
+---- QUERY
+select * from special_char_partitions where s is NULL;
+---- RESULTS
+5,'NULL','b'
+---- TYPES
+INT,STRING,STRING
+====
+---- QUERY
+# Check that the file path contains URL encoded strings.
+show files in special_char_partitions;
+---- LABELS
+Path,Size,Partition,EC Policy
+---- RESULTS
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/special_char_partitions/data/i=1/s=11%2F14%2F31/s_trunc=11%2F1/.*parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/special_char_partitions/data/i=2/s=11%2214%2231/s_trunc=11%221/.*parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/special_char_partitions/data/i=3/s=11%3D14%3D31/s_trunc=11%3D1/.*parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/special_char_partitions/data/i=4/s=__HIVE_DEFAULT_PARTITION__/s_trunc=__HIVE_DEFAULT_PARTITION__/.*parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/special_char_partitions/data/i=5/s=__HIVE_DEFAULT_PARTITION__/s_trunc=__HIVE_DEFAULT_PARTITION__/.*parq','.*','','$ERASURECODE_POLICY'
+---- TYPES
+STRING, STRING, STRING, STRING
+====
+---- QUERY
+# Check that values in SHOW PARTITIONS are not URL encoded (but simply 
escaped).
+show partitions special_char_partitions;
+---- RESULTS
+'{"i":"1","s":"11\\/14\\/31","s_trunc":"11\\/1"}',1,1
+'{"i":"2","s":"11\\"14\\"31","s_trunc":"11\\"1"}',1,1
+'{"i":"3","s":"11=14=31","s_trunc":"11=1"}',1,1
+'{"i":"4","s":"","s_trunc":""}',1,1
+'{"i":"5","s":null,"s_trunc":null}',1,1
+---- TYPES
+STRING,BIGINT,BIGINT
+====
+---- QUERY
+# Check special chars in a string partition column after partition evolution.
+alter table special_char_partitions set partition spec (s2);
+insert into special_char_partitions values (6, '11/22/33', '98/22');
+select * from special_char_partitions;
+---- RESULTS
+1,'11/14/31','44/1'
+2,'11"14"31','43"3'
+3,'11=14=31','65=2'
+4,'','a'
+5,'NULL','b'
+6,'11/22/33','98/22'
+---- TYPES
+INT,STRING,STRING
+====
+---- QUERY
+# Check that the new partition column's path contains URL encoded strings.
+show files in special_char_partitions;
+---- LABELS
+Path,Size,Partition,EC Policy
+---- RESULTS
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/special_char_partitions/data/i=1/s=11%2F14%2F31/s_trunc=11%2F1/.*parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/special_char_partitions/data/i=2/s=11%2214%2231/s_trunc=11%221/.*parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/special_char_partitions/data/i=3/s=11%3D14%3D31/s_trunc=11%3D1/.*parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/special_char_partitions/data/i=4/s=__HIVE_DEFAULT_PARTITION__/s_trunc=__HIVE_DEFAULT_PARTITION__/.*parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/special_char_partitions/data/i=5/s=__HIVE_DEFAULT_PARTITION__/s_trunc=__HIVE_DEFAULT_PARTITION__/.*parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/special_char_partitions/data/s2=98%2F22/.*parq','.*','','$ERASURECODE_POLICY'
+---- TYPES
+STRING, STRING, STRING, STRING
+====
+---- QUERY
+show partitions special_char_partitions;
+---- RESULTS
+'{"i":"1","s":"11\\/14\\/31","s_trunc":"11\\/1"}',1,1
+'{"i":"2","s":"11\\"14\\"31","s_trunc":"11\\"1"}',1,1
+'{"i":"3","s":"11=14=31","s_trunc":"11=1"}',1,1
+'{"i":"4","s":"","s_trunc":""}',1,1
+'{"i":"5","s":null,"s_trunc":null}',1,1
+'{"s2":"98\\/22"}',1,1
+---- TYPES
+STRING,BIGINT,BIGINT
+====

Reply via email to