This is an automated email from the ASF dual-hosted git repository.

dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 63f52807f0641ced7560dac2f616f352e7b5a86c
Author: Daniel Becker <[email protected]>
AuthorDate: Tue Mar 5 16:27:27 2024 +0100

    IMPALA-12611: Add support for MAP type Iceberg Metadata table columns
    
    This change adds support for querying MAP types from Iceberg Metadata
    tables.
    
    The 'IcebergMetadataScanner.ArrayScanner' Java class is renamed to
    'CollectionScanner' and extended to handle maps. For arrays the
    iteration returns the elements as before; for maps it returns
    'Map.Entry' objects.
    
    Note that collections in the FROM clause are still not supported.
    
    Testing:
    - Added E2E tests in iceberg-metadata-tables.test.
    
    Change-Id: I8a8b3a574ca45c893315c3b41b33ce4e0eff865a
    Reviewed-on: http://gerrit.cloudera.org:8080/21125
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../iceberg-metadata/iceberg-metadata-scanner.cc   |  96 +++++++---
 .../iceberg-metadata/iceberg-metadata-scanner.h    |  68 ++++---
 be/src/exec/iceberg-metadata/iceberg-row-reader.cc | 197 +++++++++++++++------
 be/src/exec/iceberg-metadata/iceberg-row-reader.h  |  46 +++--
 .../apache/impala/util/IcebergMetadataScanner.java |  29 ++-
 .../queries/QueryTest/iceberg-metadata-tables.test |  90 +++++++++-
 6 files changed, 401 insertions(+), 125 deletions(-)

diff --git a/be/src/exec/iceberg-metadata/iceberg-metadata-scanner.cc b/be/src/exec/iceberg-metadata/iceberg-metadata-scanner.cc
index 3443eca67..514335563 100644
--- a/be/src/exec/iceberg-metadata/iceberg-metadata-scanner.cc
+++ b/be/src/exec/iceberg-metadata/iceberg-metadata-scanner.cc
@@ -36,14 +36,14 @@ Status IcebergMetadataScanner::InitJNI() {
       "org/apache/impala/util/IcebergMetadataScanner",
       &iceberg_metadata_scanner_cl_));
   RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env,
-      "org/apache/impala/util/IcebergMetadataScanner$ArrayScanner",
-      &iceberg_metadata_scanner_array_scanner_cl_));
+      "org/apache/impala/util/IcebergMetadataScanner$CollectionScanner",
+      &iceberg_metadata_scanner_collection_scanner_cl_));
   RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env,
-      "org/apache/iceberg/Accessor", &accessor_cl_));
+      "java/util/Map$Entry", &map_entry_cl_));
+  RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/util/List", 
&list_cl_));
+  RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/util/Map", &map_cl_));
 
   // Method ids:
-  RETURN_IF_ERROR(JniUtil::GetMethodID(env, accessor_cl_, "get",
-      "(Ljava/lang/Object;)Ljava/lang/Object;", &accessor_get_));
   RETURN_IF_ERROR(JniUtil::GetMethodID(env, iceberg_metadata_scanner_cl_,
       "<init>", 
"(Lorg/apache/impala/catalog/FeIcebergTable;Ljava/lang/String;)V",
       &iceberg_metadata_scanner_ctor_));
@@ -59,14 +59,24 @@ Status IcebergMetadataScanner::InitJNI() {
       "GetValueByPosition",
       "(Lorg/apache/iceberg/StructLike;ILjava/lang/Class;)Ljava/lang/Object;",
       &iceberg_metadata_scanner_get_value_by_position_));
+  RETURN_IF_ERROR(JniUtil::GetStaticMethodID(env,
+      iceberg_metadata_scanner_collection_scanner_cl_, "fromArray",
+      "(Ljava/util/List;)Lorg/apache/impala/util/"
+      "IcebergMetadataScanner$CollectionScanner;",
+      &iceberg_metadata_scanner_collection_scanner_from_array_));
+  RETURN_IF_ERROR(JniUtil::GetStaticMethodID(env,
+      iceberg_metadata_scanner_collection_scanner_cl_, "fromMap",
+      "(Ljava/util/Map;)Lorg/apache/impala/util/"
+      "IcebergMetadataScanner$CollectionScanner;",
+      &iceberg_metadata_scanner_collection_scanner_from_map_));
   RETURN_IF_ERROR(JniUtil::GetMethodID(env,
-      iceberg_metadata_scanner_array_scanner_cl_, "<init>",
-      "(Lorg/apache/impala/util/IcebergMetadataScanner;Ljava/util/List;)V",
-      &iceberg_metadata_scanner_array_scanner_ctor_));
-  RETURN_IF_ERROR(JniUtil::GetMethodID(env,
-      iceberg_metadata_scanner_array_scanner_cl_,
-      "GetNextArrayItem", "()Ljava/lang/Object;",
-      &iceberg_metadata_scanner_array_scanner_get_next_array_item_));
+      iceberg_metadata_scanner_collection_scanner_cl_,
+      "GetNextCollectionItem", "()Ljava/lang/Object;",
+      &iceberg_metadata_scanner_collection_scanner_get_next_collection_item_));
+  RETURN_IF_ERROR(JniUtil::GetMethodID(env, map_entry_cl_, "getKey",
+      "()Ljava/lang/Object;", &map_entry_get_key_));
+  RETURN_IF_ERROR(JniUtil::GetMethodID(env, map_entry_cl_, "getValue",
+      "()Ljava/lang/Object;", &map_entry_get_value_));
   return Status::OK();
 }
 
@@ -136,14 +146,35 @@ Status IcebergMetadataScanner::GetNext(JNIEnv* env, jobject* result) {
   return Status::OK();
 }
 
-Status IcebergMetadataScanner::GetNextArrayItem(JNIEnv* env, const jobject 
&scanner,
+Status IcebergMetadataScanner::GetNextArrayItem(JNIEnv* env, const jobject& 
scanner,
     jobject* result) {
+  return GetNextCollectionScannerItem(env, scanner, result);
+}
+
+Status IcebergMetadataScanner::GetNextMapKeyAndValue(JNIEnv* env, const 
jobject& scanner,
+    jobject* key, jobject* value) {
+  jobject map_entry;
+  RETURN_IF_ERROR(GetNextCollectionScannerItem(env, scanner, &map_entry));
+  DCHECK(env->IsInstanceOf(map_entry, map_entry_cl_) == JNI_TRUE);
+
+  *key = env->CallObjectMethod(map_entry, map_entry_get_key_);
+  RETURN_ERROR_IF_EXC(env);
+
+  *value = env->CallObjectMethod(map_entry, map_entry_get_value_);
+  RETURN_ERROR_IF_EXC(env);
+  env->DeleteLocalRef(map_entry);
+  return Status::OK();
+}
+
+Status IcebergMetadataScanner::GetNextCollectionScannerItem(JNIEnv* env,
+    const jobject& scanner, jobject* result) {
   *result = env->CallObjectMethod(scanner,
-      iceberg_metadata_scanner_array_scanner_get_next_array_item_);
+      iceberg_metadata_scanner_collection_scanner_get_next_collection_item_);
   RETURN_ERROR_IF_EXC(env);
   return Status::OK();
 }
 
+
 Status IcebergMetadataScanner::GetValue(JNIEnv* env, const SlotDescriptor* 
slot_desc,
     const jobject &struct_like_row, const jclass& clazz, jobject* result) {
   DCHECK(slot_desc != nullptr);
@@ -152,14 +183,13 @@ Status IcebergMetadataScanner::GetValue(JNIEnv* env, const SlotDescriptor* slot_
     // Use accessor when it is available, these are top level primitive types, 
top level
     // structs and structs inside structs.
     RETURN_IF_ERROR(GetValueByFieldId(env, struct_like_row, 
field_id_it->second, result));
-  } else if (slot_desc->parent()->isTupleOfStructSlot()) {
+  } else {
     // Accessor is not available, this must be a STRUCT inside an ARRAY.
+    DCHECK(slot_desc->parent()->isTupleOfStructSlot());
     int pos = slot_desc->col_path().back();
     RETURN_IF_ERROR(GetValueByPosition(env, struct_like_row, pos, clazz, 
result));
-  } else {
-    // Primitive inside an ARRAY, the value can be accessed directly.
-    *result = struct_like_row;
   }
+
   return Status::OK();
 }
 
@@ -180,9 +210,31 @@ Status IcebergMetadataScanner::GetValueByPosition(JNIEnv* env, const jobject &st
 }
 
 Status IcebergMetadataScanner::CreateArrayScanner(JNIEnv* env, const jobject 
&list,
-    jobject& result) {
-  result = env->NewObject(iceberg_metadata_scanner_array_scanner_cl_,
-      iceberg_metadata_scanner_array_scanner_ctor_, jmetadata_scanner_, list);
+    jobject* result) {
+  return CreateArrayOrMapScanner</*IS_ARRAY*/ true>(env, list, result);
+}
+
+Status IcebergMetadataScanner::CreateMapScanner(JNIEnv* env, const jobject 
&map,
+    jobject* result) {
+  return CreateArrayOrMapScanner</*IS_ARRAY*/ false>(env, map, result);
+}
+
+template <bool IS_ARRAY>
+Status IcebergMetadataScanner::CreateArrayOrMapScanner(JNIEnv* env,
+    const jobject &list_or_map, jobject* result) {
+  DCHECK(result != nullptr);
+
+  jmethodID* factory_method;
+  if constexpr (IS_ARRAY) {
+    DCHECK(env->IsInstanceOf(list_or_map, list_cl_) == JNI_TRUE);
+    factory_method = &iceberg_metadata_scanner_collection_scanner_from_array_;
+  } else {
+    DCHECK(env->IsInstanceOf(list_or_map, map_cl_) == JNI_TRUE);
+    factory_method = &iceberg_metadata_scanner_collection_scanner_from_map_;
+  }
+
+  *result = 
env->CallStaticObjectMethod(iceberg_metadata_scanner_collection_scanner_cl_,
+      *factory_method, list_or_map);
   RETURN_ERROR_IF_EXC(env);
   return Status::OK();
 }
@@ -210,4 +262,4 @@ string IcebergMetadataScanner::DebugString() {
   return out.str();
 }
 
-}
\ No newline at end of file
+}
diff --git a/be/src/exec/iceberg-metadata/iceberg-metadata-scanner.h b/be/src/exec/iceberg-metadata/iceberg-metadata-scanner.h
index 034cee14e..e04465083 100644
--- a/be/src/exec/iceberg-metadata/iceberg-metadata-scanner.h
+++ b/be/src/exec/iceberg-metadata/iceberg-metadata-scanner.h
@@ -41,42 +41,65 @@ class IcebergMetadataScanner {
   Status Init(JNIEnv* env) WARN_UNUSED_RESULT;
 
   /// Executes an Iceberg scan through JNI.
-  Status ScanMetadataTable(JNIEnv* env);
+  Status ScanMetadataTable(JNIEnv* env) WARN_UNUSED_RESULT;
 
   /// Gets the next row of 'org.apache.impala.util.IcebergMetadataScanner'.
-  Status GetNext(JNIEnv* env, jobject* result);
+  Status GetNext(JNIEnv* env, jobject* result) WARN_UNUSED_RESULT;
 
   /// Wrapper over value access methods, decides whether to access the value 
by accessor
   /// or by position.
   Status GetValue(JNIEnv* env, const SlotDescriptor* slot_desc,
-      const jobject &struct_like_row, const jclass &clazz, jobject* result);
-
-  /// Creates a Java ArrayScanner object that can be used to access Array 
items.
-  /// Note that it returns a GlobalRef, that has to be released explicitly.
-  Status CreateArrayScanner(JNIEnv* env, const jobject &list, jobject& result);
-
-  /// Gets the next item of 
'org.apache.impala.util.IcebergMetadataScanner.ArrayScanner'.
-  Status GetNextArrayItem(JNIEnv* env, const jobject &list, jobject* result);
+      const jobject &struct_like_row, const jclass &clazz, jobject* result)
+      WARN_UNUSED_RESULT;
+
+  /// Creates a Java CollectionScanner object that can be used to access array 
items
+  /// (elements of a Java List).
+  /// Note that it returns a LocalRef, that can only be used in the current 
thread.
+  Status CreateArrayScanner(JNIEnv* env, const jobject &list, jobject* result)
+      WARN_UNUSED_RESULT;
+
+  /// Creates a Java CollectionScanner object that can be used to access map 
entries
+  /// (Map.Entry objects).
+  /// Note that it returns a LocalRef, that can only be used in the current 
thread.
+  Status CreateMapScanner(JNIEnv* env, const jobject &map, jobject* result)
+      WARN_UNUSED_RESULT;
+
+  /// Gets the next array item in 'result' from the given
+  /// 'org.apache.impala.util.IcebergMetadataScanner.CollectionScanner'.
+  Status GetNextArrayItem(JNIEnv* env, const jobject& list, jobject* result)
+      WARN_UNUSED_RESULT;
+
+  /// Gets the next map key and value in 'key' and 'value' from the given
+  /// 'org.apache.impala.util.IcebergMetadataScanner.CollectionScanner'.
+  Status GetNextMapKeyAndValue(JNIEnv* env, const jobject& scanner,
+      jobject* key, jobject* value) WARN_UNUSED_RESULT;
 
   /// Removes global references.
   void Close(RuntimeState* state);
 
  private:
   /// Global class references created with JniUtil.
-  inline static jclass accessor_cl_ = nullptr;
   inline static jclass iceberg_metadata_scanner_cl_ = nullptr;
-  inline static jclass iceberg_metadata_scanner_array_scanner_cl_ = nullptr;
+  inline static jclass iceberg_metadata_scanner_collection_scanner_cl_ = 
nullptr;
+  inline static jclass list_cl_ = nullptr;
+  inline static jclass map_cl_ = nullptr;
+  inline static jclass map_entry_cl_ = nullptr;
 
   /// Method references created with JniUtil.
-  inline static jmethodID accessor_get_ = nullptr;
   inline static jmethodID iceberg_metadata_scanner_ctor_ = nullptr;
   inline static jmethodID iceberg_metadata_scanner_scan_metadata_table_ = 
nullptr;
   inline static jmethodID iceberg_metadata_scanner_get_next_ = nullptr;
   inline static jmethodID iceberg_metadata_scanner_get_value_by_field_id_ = 
nullptr;
   inline static jmethodID iceberg_metadata_scanner_get_value_by_position_ = 
nullptr;
-  inline static jmethodID iceberg_metadata_scanner_array_scanner_ctor_ = 
nullptr;
-  inline static jmethodID 
iceberg_metadata_scanner_array_scanner_get_next_array_item_ =
-      nullptr;
+  inline static jmethodID
+      iceberg_metadata_scanner_collection_scanner_from_array_ = nullptr;
+  inline static jmethodID
+      iceberg_metadata_scanner_collection_scanner_from_map_ = nullptr;
+  inline static jmethodID
+      iceberg_metadata_scanner_collection_scanner_get_next_collection_item_ = 
nullptr;
+
+  inline static jmethodID map_entry_get_key_ = nullptr;
+  inline static jmethodID map_entry_get_value_ = nullptr;
 
   /// The Impala FeTable object in Java, used to scan the metadata table.
   jobject jtable_;
@@ -102,7 +125,7 @@ class IcebergMetadataScanner {
   /// fields, in this case the InitSlotIdFieldIdMapForStruct can be used.
   /// Collection types cannot be accessed through accessors, so those field 
ids won't be
   /// part of this map.
-  Status InitSlotIdFieldIdMap(JNIEnv* env);
+  Status InitSlotIdFieldIdMap(JNIEnv* env) WARN_UNUSED_RESULT;
 
   /// Recursive part of the slot_id_to_field_id_map_ collection, when there is 
a struct in
   /// the tuple. Collects the field ids of the struct members. The type_ field 
inside the
@@ -110,13 +133,18 @@ class IcebergMetadataScanner {
   /// be indexed with the last element of SchemaPath col_path to obtain the 
correct field
   /// id of the struct member.
   Status InitSlotIdFieldIdMapForStruct(JNIEnv* env,
-      const SlotDescriptor* struct_slot_desc);
+      const SlotDescriptor* struct_slot_desc) WARN_UNUSED_RESULT;
 
   /// Wrappers around the Java methods.
+  Status GetNextCollectionScannerItem(JNIEnv* env, const jobject& scanner,
+      jobject* result) WARN_UNUSED_RESULT;
   Status GetValueByFieldId(JNIEnv* env, const jobject &struct_like, int 
field_id,
-      jobject* result);
+      jobject* result) WARN_UNUSED_RESULT;
   Status GetValueByPosition(JNIEnv* env, const jobject &struct_like, int pos,
-      const jclass &clazz, jobject* result);
+      const jclass &clazz, jobject* result) WARN_UNUSED_RESULT;
+  template <bool IS_ARRAY>
+  Status CreateArrayOrMapScanner(JNIEnv* env, const jobject &list_or_map, 
jobject* result)
+      WARN_UNUSED_RESULT;
 
   string DebugString();
 };
diff --git a/be/src/exec/iceberg-metadata/iceberg-row-reader.cc b/be/src/exec/iceberg-metadata/iceberg-row-reader.cc
index 5c615cad0..be6e0df9f 100644
--- a/be/src/exec/iceberg-metadata/iceberg-row-reader.cc
+++ b/be/src/exec/iceberg-metadata/iceberg-row-reader.cc
@@ -39,6 +39,7 @@ Status IcebergRowReader::InitJNI() {
 
   // Global class references:
   RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/util/List", 
&list_cl_));
+  RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/util/Map", &map_cl_));
   RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Boolean", 
&boolean_cl_));
   RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Integer", 
&integer_cl_));
   RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Long", 
&long_cl_));
@@ -46,9 +47,8 @@ Status IcebergRowReader::InitJNI() {
       &char_sequence_cl_));
 
   // Method ids:
-  RETURN_IF_ERROR(JniUtil::GetMethodID(env, list_cl_, "get", 
"(I)Ljava/lang/Object;",
-      &list_get_));
   RETURN_IF_ERROR(JniUtil::GetMethodID(env, list_cl_, "size", "()I", 
&list_size_));
+  RETURN_IF_ERROR(JniUtil::GetMethodID(env, map_cl_, "size", "()I", 
&map_size_));
   RETURN_IF_ERROR(JniUtil::GetMethodID(env, boolean_cl_, "booleanValue", "()Z",
       &boolean_value_));
   RETURN_IF_ERROR(JniUtil::GetMethodID(env, integer_cl_, "intValue", "()I",
@@ -61,7 +61,7 @@ Status IcebergRowReader::InitJNI() {
 }
 
 Status IcebergRowReader::MaterializeTuple(JNIEnv* env,
-    jobject struct_like_row, const TupleDescriptor* tuple_desc, Tuple* tuple,
+    const jobject& struct_like_row, const TupleDescriptor* tuple_desc, Tuple* 
tuple,
     MemPool* tuple_data_pool, RuntimeState* state) {
   DCHECK(env != nullptr);
   DCHECK(struct_like_row != nullptr);
@@ -73,47 +73,62 @@ Status IcebergRowReader::MaterializeTuple(JNIEnv* env,
     jobject accessed_value;
     RETURN_IF_ERROR(metadata_scanner_->GetValue(env, slot_desc, 
struct_like_row,
         JavaClassFromImpalaType(slot_desc->type()), &accessed_value));
-    if (accessed_value == nullptr) {
-      tuple->SetNull(slot_desc->null_indicator_offset());
-      continue;
-    }
-    void* slot = tuple->GetSlot(slot_desc->tuple_offset());
-    switch (slot_desc->type().type) {
-      case TYPE_BOOLEAN: { // java.lang.Boolean
-        RETURN_IF_ERROR(WriteBooleanSlot(env, accessed_value, slot));
-        break;
-      } case TYPE_INT: { // java.lang.Integer
-        RETURN_IF_ERROR(WriteIntSlot(env, accessed_value, slot));
-        break;
-      } case TYPE_BIGINT: { // java.lang.Long
-        RETURN_IF_ERROR(WriteLongSlot(env, accessed_value, slot));
-        break;
-      } case TYPE_TIMESTAMP: { // org.apache.iceberg.types.TimestampType
-        RETURN_IF_ERROR(WriteTimeStampSlot(env, accessed_value, slot));
-        break;
-      } case TYPE_STRING: { // java.lang.String
-        RETURN_IF_ERROR(WriteStringSlot(env, accessed_value, slot, 
tuple_data_pool));
-        break;
-      } case TYPE_STRUCT: { // Struct type is not used by Impala to access 
values.
-        RETURN_IF_ERROR(WriteStructSlot(env, struct_like_row, slot_desc, tuple,
-            tuple_data_pool, state));
-        break;
-      } case TYPE_ARRAY: { // java.lang.ArrayList
-        RETURN_IF_ERROR(WriteArraySlot(env, accessed_value, 
(CollectionValue*)slot,
-            slot_desc, tuple, tuple_data_pool, state));
-        break;
-      }
-      default:
-        // Skip the unsupported type and set it to NULL
-        tuple->SetNull(slot_desc->null_indicator_offset());
-        VLOG(3) << "Skipping unsupported column type: " << 
slot_desc->type().type;
-    }
+    RETURN_IF_ERROR(WriteSlot(env, &struct_like_row, accessed_value, 
slot_desc, tuple,
+          tuple_data_pool, state));
     env->DeleteLocalRef(accessed_value);
     RETURN_ERROR_IF_EXC(env);
   }
   return Status::OK();
 }
 
+Status IcebergRowReader::WriteSlot(JNIEnv* env, const jobject* struct_like_row,
+    const jobject& accessed_value, const SlotDescriptor* slot_desc, Tuple* 
tuple,
+    MemPool* tuple_data_pool, RuntimeState* state) {
+  if (accessed_value == nullptr) {
+    tuple->SetNull(slot_desc->null_indicator_offset());
+    return Status::OK();
+  }
+  void* slot = tuple->GetSlot(slot_desc->tuple_offset());
+  switch (slot_desc->type().type) {
+    case TYPE_BOOLEAN: { // java.lang.Boolean
+      RETURN_IF_ERROR(WriteBooleanSlot(env, accessed_value, slot));
+      break;
+    } case TYPE_INT: { // java.lang.Integer
+      RETURN_IF_ERROR(WriteIntSlot(env, accessed_value, slot));
+      break;
+    } case TYPE_BIGINT: { // java.lang.Long
+      RETURN_IF_ERROR(WriteLongSlot(env, accessed_value, slot));
+      break;
+    } case TYPE_TIMESTAMP: { // org.apache.iceberg.types.TimestampType
+      RETURN_IF_ERROR(WriteTimeStampSlot(env, accessed_value, slot));
+      break;
+    } case TYPE_STRING: { // java.lang.String
+      RETURN_IF_ERROR(WriteStringSlot(env, accessed_value, slot, 
tuple_data_pool));
+      break;
+    } case TYPE_STRUCT: { // Struct type is not used by Impala to access 
values.
+      DCHECK(struct_like_row != nullptr);
+      RETURN_IF_ERROR(WriteStructSlot(env, *struct_like_row, slot_desc, tuple,
+          tuple_data_pool, state));
+      break;
+    } case TYPE_ARRAY: { // java.lang.ArrayList
+      RETURN_IF_ERROR(WriteCollectionSlot</*IS_ARRAY*/ true>(env, 
accessed_value,
+          (CollectionValue*) slot, slot_desc, tuple_data_pool, state));
+      break;
+    } case TYPE_MAP: { // java.lang.Map
+      RETURN_IF_ERROR(WriteCollectionSlot</*IS_ARRAY*/ false>(env, 
accessed_value,
+          (CollectionValue*) slot, slot_desc, tuple_data_pool, state));
+      break;
+    }
+    default:
+      // Skip the unsupported type and set it to NULL
+      tuple->SetNull(slot_desc->null_indicator_offset());
+      VLOG(3) << "Skipping unsupported column type: " << 
slot_desc->type().type;
+  }
+  return Status::OK();
+}
+
+
+
 Status IcebergRowReader::WriteBooleanSlot(JNIEnv* env, const jobject 
&accessed_value,
     void* slot) {
   DCHECK(accessed_value != nullptr);
@@ -188,50 +203,116 @@ Status IcebergRowReader::WriteStructSlot(JNIEnv* env, const jobject &struct_like
   return Status::OK();
 }
 
-Status IcebergRowReader::WriteArraySlot(JNIEnv* env, const jobject 
&struct_like_row,
-    CollectionValue* slot, const SlotDescriptor* slot_desc, Tuple* tuple,
+template <bool IS_ARRAY>
+Status IcebergRowReader::WriteCollectionSlot(JNIEnv* env, const jobject 
&struct_like_row,
+    CollectionValue* slot, const SlotDescriptor* slot_desc,
     MemPool* tuple_data_pool, RuntimeState* state) {
   DCHECK(slot_desc != nullptr);
-  DCHECK(slot_desc->type().IsCollectionType());
-  DCHECK(env->IsInstanceOf(struct_like_row, list_cl_) == JNI_TRUE);
+
+  if constexpr (IS_ARRAY) {
+    DCHECK(slot_desc->type().IsArrayType());
+    DCHECK(env->IsInstanceOf(struct_like_row, list_cl_) == JNI_TRUE);
+  } else {
+    DCHECK(slot_desc->type().IsMapType());
+    DCHECK(env->IsInstanceOf(struct_like_row, map_cl_) == JNI_TRUE);
+  }
+
   const TupleDescriptor* item_tuple_desc = 
slot_desc->children_tuple_descriptor();
+  DCHECK(item_tuple_desc != nullptr);
+  DCHECK_EQ(item_tuple_desc->slots().size(), IS_ARRAY ? 1 : 2);
+
   *slot = CollectionValue();
   CollectionValueBuilder coll_value_builder(slot, *item_tuple_desc, 
tuple_data_pool,
       state);
-  jobject array_scanner;
-  RETURN_IF_ERROR(metadata_scanner_->CreateArrayScanner(env, struct_like_row,
-      array_scanner));
-  int remaining_array_size = env->CallIntMethod(struct_like_row, list_size_);
+
+  jobject collection_scanner;
+  if constexpr (IS_ARRAY) {
+    RETURN_IF_ERROR(metadata_scanner_->CreateArrayScanner(env, struct_like_row,
+        &collection_scanner));
+  } else {
+    RETURN_IF_ERROR(metadata_scanner_->CreateMapScanner(env, struct_like_row,
+        &collection_scanner));
+  }
+
+  int remaining_items = env->CallIntMethod(struct_like_row,
+      IS_ARRAY ? list_size_ : map_size_);
   RETURN_ERROR_IF_EXC(env);
-  while (!scan_node_->ReachedLimit() && remaining_array_size > 0) {
+
+  while (!scan_node_->ReachedLimit() && remaining_items > 0) {
     RETURN_IF_CANCELLED(state);
-    int num_tuples;
     MemPool* tuple_data_pool_collection = coll_value_builder.pool();
     Tuple* tuple;
+    int num_tuples;
     RETURN_IF_ERROR(coll_value_builder.GetFreeMemory(&tuple, &num_tuples));
     // 'num_tuples' can be very high if we're writing to a large 
CollectionValue. Limit
     // the number of tuples we read at one time so we don't spend too long in 
the
     // 'num_tuples' loop below before checking for cancellation or limit 
reached.
     num_tuples = std::min(num_tuples, 
scan_node_->runtime_state()->batch_size());
     int num_to_commit = 0;
-    while (num_to_commit < num_tuples && remaining_array_size > 0) {
+    while (num_to_commit < num_tuples && remaining_items > 0) {
       tuple->Init(item_tuple_desc->byte_size());
-      jobject item;
-      RETURN_IF_ERROR(metadata_scanner_->GetNextArrayItem(env, array_scanner, 
&item));
-      RETURN_IF_ERROR(MaterializeTuple(env, item, item_tuple_desc, tuple,
-          tuple_data_pool_collection, state));
+
+      if constexpr (IS_ARRAY) {
+        RETURN_IF_ERROR(WriteArrayItem(env, collection_scanner, 
item_tuple_desc, tuple,
+            tuple_data_pool_collection, state));
+      } else {
+        RETURN_IF_ERROR(WriteMapKeyAndValue(env, collection_scanner, 
item_tuple_desc,
+            tuple, tuple_data_pool_collection, state));
+      }
+
       // For filtering please see IMPALA-12853.
       tuple += item_tuple_desc->byte_size();
       ++num_to_commit;
-      --remaining_array_size;
+      --remaining_items;
     }
     coll_value_builder.CommitTuples(num_to_commit);
   }
-  env->DeleteLocalRef(array_scanner);
+  env->DeleteLocalRef(collection_scanner);
   RETURN_ERROR_IF_EXC(env);
   return Status::OK();
 }
 
+Status IcebergRowReader::WriteArrayItem(JNIEnv* env, const jobject& 
array_scanner,
+    const TupleDescriptor* item_tuple_desc, Tuple* tuple,
+    MemPool* tuple_data_pool_collection, RuntimeState* state) {
+  jobject item;
+  RETURN_IF_ERROR(metadata_scanner_->GetNextArrayItem(env, array_scanner, 
&item));
+
+  const SlotDescriptor* child_slot_desc = item_tuple_desc->slots()[0];
+  const jobject* struct_like_row = child_slot_desc->type().IsStructType()
+    ? &item : nullptr;
+
+  RETURN_IF_ERROR(WriteSlot(env, struct_like_row, item, child_slot_desc, tuple,
+      tuple_data_pool_collection, state));
+
+  env->DeleteLocalRef(item);
+  RETURN_ERROR_IF_EXC(env);
+  return Status::OK();
+}
+
+Status IcebergRowReader::WriteMapKeyAndValue(JNIEnv* env, const jobject& 
map_scanner,
+    const TupleDescriptor* item_tuple_desc, Tuple* tuple,
+    MemPool* tuple_data_pool_collection, RuntimeState* state) {
+  jobject key;
+  jobject value;
+  RETURN_IF_ERROR(metadata_scanner_->GetNextMapKeyAndValue(env, map_scanner,
+      &key, &value));
+
+  const SlotDescriptor* key_slot_desc = item_tuple_desc->slots()[0];
+  DCHECK(!key_slot_desc->type().IsStructType());
+  const jobject* key_struct_like_row = nullptr;
+  RETURN_IF_ERROR(WriteSlot(env, key_struct_like_row, key, key_slot_desc, 
tuple,
+        tuple_data_pool_collection, state));
+
+  const SlotDescriptor* value_slot_desc = item_tuple_desc->slots()[1];
+  const jobject* value_struct_like_row = value_slot_desc->type().IsStructType()
+    ? &value : nullptr;
+  RETURN_IF_ERROR(WriteSlot(env, value_struct_like_row, value, value_slot_desc,
+        tuple, tuple_data_pool_collection, state));
+
+  return Status::OK();
+}
+
 jclass IcebergRowReader::JavaClassFromImpalaType(const ColumnType type) {
   switch (type.type) {
     case TYPE_BOOLEAN: {     // java.lang.Boolean
@@ -243,8 +324,10 @@ jclass IcebergRowReader::JavaClassFromImpalaType(const ColumnType type) {
       return long_cl_;
     } case TYPE_STRING: {    // java.lang.String
       return char_sequence_cl_;
-    } case TYPE_ARRAY: {     // java.lang.util.List
+    } case TYPE_ARRAY: {     // java.util.List
       return list_cl_;
+    } case TYPE_MAP: {       // java.util.Map
+      return map_cl_;
     }
     default:
       VLOG(3) << "Skipping unsupported column type: " << type.type;
@@ -252,4 +335,4 @@ jclass IcebergRowReader::JavaClassFromImpalaType(const ColumnType type) {
   return nullptr;
 }
 
-}
\ No newline at end of file
+}
diff --git a/be/src/exec/iceberg-metadata/iceberg-row-reader.h b/be/src/exec/iceberg-metadata/iceberg-row-reader.h
index 4e7dafd9a..64de2cc86 100644
--- a/be/src/exec/iceberg-metadata/iceberg-row-reader.h
+++ b/be/src/exec/iceberg-metadata/iceberg-row-reader.h
@@ -46,21 +46,22 @@ class IcebergRowReader {
   static Status InitJNI() WARN_UNUSED_RESULT;
 
   /// Materialize the StructLike Java objects into Impala rows.
-  Status MaterializeTuple(JNIEnv* env, jobject struct_like_row,
+  Status MaterializeTuple(JNIEnv* env, const jobject& struct_like_row,
       const TupleDescriptor* tuple_desc, Tuple* tuple,  MemPool* 
tuple_data_pool,
       RuntimeState* state);
 
  private:
   /// Global class references created with JniUtil.
   inline static jclass list_cl_ = nullptr;
+  inline static jclass map_cl_ = nullptr;
   inline static jclass boolean_cl_ = nullptr;
   inline static jclass integer_cl_ = nullptr;
   inline static jclass long_cl_ = nullptr;
   inline static jclass char_sequence_cl_ = nullptr;
 
   /// Method references created with JniUtil.
-  inline static jmethodID list_get_ = nullptr;
   inline static jmethodID list_size_ = nullptr;
+  inline static jmethodID map_size_ = nullptr;
   inline static jmethodID boolean_value_ = nullptr;
   inline static jmethodID integer_value_ = nullptr;
   inline static jmethodID long_value_ = nullptr;
@@ -73,27 +74,44 @@ class IcebergRowReader {
   /// IcebergMetadataScanner class, used to get and access values inside java 
objects.
   IcebergMetadataScanner* metadata_scanner_;
 
-  /// Reads the value of a primitive from the StructLike, translates it to a 
matching
-  /// Impala type and writes it into the target tuple. The related Accessor 
objects are
-  /// stored in the jaccessors_ map and created during Prepare.
-  Status WriteBooleanSlot(JNIEnv* env, const jobject &accessed_value, void* 
slot);
-  Status WriteIntSlot(JNIEnv* env, const jobject &accessed_value, void* slot);
-  Status WriteLongSlot(JNIEnv* env, const jobject &accessed_value, void* slot);
+  // Writes a Java value into the target tuple. 'struct_like_row' is only used 
for struct
+  // types. It is needed because struct children reside directly in the parent 
tuple of
+  // the struct.
+  Status WriteSlot(JNIEnv* env, const jobject* struct_like_row,
+      const jobject& accessed_value, const SlotDescriptor* slot_desc, Tuple* 
tuple,
+      MemPool* tuple_data_pool, RuntimeState* state) WARN_UNUSED_RESULT;
+
+  /// Translates the value of a Java primitive to the matching Impala type and 
writes it
+  /// into the target slot.
+  Status WriteBooleanSlot(JNIEnv* env, const jobject &accessed_value, void* 
slot)
+      WARN_UNUSED_RESULT;
+  Status WriteIntSlot(JNIEnv* env, const jobject &accessed_value, void* slot)
+      WARN_UNUSED_RESULT;
+  Status WriteLongSlot(JNIEnv* env, const jobject &accessed_value, void* slot)
+      WARN_UNUSED_RESULT;
   /// Iceberg TimeStamp is parsed into TimestampValue.
-  Status WriteTimeStampSlot(JNIEnv* env, const jobject &accessed_value, void* 
slot);
+  Status WriteTimeStampSlot(JNIEnv* env, const jobject &accessed_value, void* 
slot)
+      WARN_UNUSED_RESULT;
   /// To obtain a character sequence from JNI the JniUtfCharGuard class is 
used. Then the
   /// data has to be copied to the tuple_data_pool, because the JVM releases 
the reference
   /// and reclaims the memory area.
   Status WriteStringSlot(JNIEnv* env, const jobject &accessed_value, void* 
slot,
-      MemPool* tuple_data_pool);
+      MemPool* tuple_data_pool) WARN_UNUSED_RESULT;
 
   /// Nested types recursively call MaterializeTuple method with their child 
tuple.
   Status WriteStructSlot(JNIEnv* env, const jobject &struct_like_row,
       const SlotDescriptor* slot_desc, Tuple* tuple, MemPool* tuple_data_pool,
-      RuntimeState* state);
-  Status WriteArraySlot(JNIEnv* env, const jobject &accessed_value, 
CollectionValue* slot,
-      const SlotDescriptor* slot_desc, Tuple* tuple, MemPool* tuple_data_pool,
-      RuntimeState* state);
+      RuntimeState* state) WARN_UNUSED_RESULT;
+  template <bool IS_ARRAY>
+  Status WriteCollectionSlot(JNIEnv* env, const jobject &accessed_value,
+      CollectionValue* slot, const SlotDescriptor* slot_desc, MemPool* 
tuple_data_pool,
+      RuntimeState* state) WARN_UNUSED_RESULT;
+  Status WriteArrayItem(JNIEnv* env, const jobject& collection_scanner,
+      const TupleDescriptor* item_tuple_desc, Tuple* tuple,
+      MemPool* tuple_data_pool_collection, RuntimeState* state) 
WARN_UNUSED_RESULT;
+  Status WriteMapKeyAndValue(JNIEnv* env, const jobject& collection_scanner,
+      const TupleDescriptor* item_tuple_desc, Tuple* tuple,
+      MemPool* tuple_data_pool_collection, RuntimeState* state) 
WARN_UNUSED_RESULT;
 
   /// Helper method that gives back the Iceberg Java class for a ColumnType. 
It is
   /// specified in this class, to avoid defining all the Java type classes in 
other
diff --git a/fe/src/main/java/org/apache/impala/util/IcebergMetadataScanner.java b/fe/src/main/java/org/apache/impala/util/IcebergMetadataScanner.java
index b514269ae..b2669976d 100644
--- a/fe/src/main/java/org/apache/impala/util/IcebergMetadataScanner.java
+++ b/fe/src/main/java/org/apache/impala/util/IcebergMetadataScanner.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.iceberg.Accessor;
 import org.apache.iceberg.DataTask;
@@ -131,20 +132,34 @@ public class IcebergMetadataScanner {
   }
 
   /**
-   * Wrapper around an array that is the result of a metadata table scan.
+   * Wrapper around an array or a map that is the result of a metadata table 
scan.
    * It is used to avoid iterating over a list through JNI.
+   * For arrays the returned objects are the elements of the array, for maps 
they are
+   * {Map.Entry} objects containing the key and value.
    */
-  public class ArrayScanner<T> {
-    private Iterator<T> iterator;
+  public static class CollectionScanner<T> {
+    private Iterator<T> iterator_;
 
-    public ArrayScanner(List<T> array) {
-      this.iterator = array.iterator();
+    private CollectionScanner(Iterator<T> iterator) {
       Preconditions.checkNotNull(iterator);
+      iterator_ = iterator;
+    }
+
+    public static <G> CollectionScanner<G> fromArray(List<G> array) {
+      CollectionScanner<G> res = new CollectionScanner<>(array.iterator());
       LOG.trace("Created metadata table array scanner, array size: " + 
array.size());
+      return res;
+    }
+
+    public static <K, V> CollectionScanner<Map.Entry<K, V>> fromMap(Map<K, V> 
map) {
+      CollectionScanner<Map.Entry<K, V>> res =
+          new CollectionScanner<>(map.entrySet().iterator());
+      LOG.trace("Created metadata table map scanner, map size: " + map.size());
+      return res;
     }
 
-    public T GetNextArrayItem() {
-      if (iterator.hasNext()) return iterator.next();
+    public T GetNextCollectionItem() {
+      if (iterator_.hasNext()) return iterator_.next();
       return null;
     }
   }
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test
 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test
index 6bfd50655..291fdc8b1 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test
@@ -704,17 +704,97 @@ INT,STRING,TIMESTAMP
 ---- QUERY
 select null_value_counts from 
functional_parquet.iceberg_query_metadata.all_files;
 ---- RESULTS
-'NULL'
-'NULL'
-'NULL'
-'NULL'
+'{2147483546:0,2147483545:0}'
+'{1:0}'
+'{1:0}'
+'{1:0}'
 ---- TYPES
 STRING
+====
+---- QUERY
+select snapshot_id, manifest_list, summary from 
functional_parquet.iceberg_partitioned.snapshots;
+---- RESULTS
+8270633197658268308,'/test-warehouse/iceberg_test/iceberg_partitioned/metadata/snap-8270633197658268308-1-af797bab-2f2c-44df-a77b-d91c7198fe53.avro','{"spark.app.id":"local-1598853475123","added-data-files":"20","added-records":"20","changed-partition-count":"3","total-records":"20","total-data-files":"20"}'
+---- TYPES
+BIGINT,STRING,STRING
+====
+---- QUERY
+select column_sizes, column_sizes from 
functional_parquet.iceberg_partitioned.all_data_files;
+---- RESULTS
+'{1:51,2:51,3:51,4:51}','{1:51,2:51,3:51,4:51}'
+'{1:51,2:51,3:55,4:51}','{1:51,2:51,3:55,4:51}'
+'{1:51,2:51,3:52,4:51}','{1:51,2:51,3:52,4:51}'
+'{1:51,2:51,3:51,4:51}','{1:51,2:51,3:51,4:51}'
+'{1:51,2:51,3:55,4:51}','{1:51,2:51,3:55,4:51}'
+'{1:51,2:51,3:51,4:51}','{1:51,2:51,3:51,4:51}'
+'{1:50,2:51,3:55,4:51}','{1:50,2:51,3:55,4:51}'
+'{1:51,2:51,3:55,4:51}','{1:51,2:51,3:55,4:51}'
+'{1:51,2:51,3:52,4:51}','{1:51,2:51,3:52,4:51}'
+'{1:51,2:51,3:52,4:51}','{1:51,2:51,3:52,4:51}'
+'{1:51,2:51,3:51,4:51}','{1:51,2:51,3:51,4:51}'
+'{1:50,2:51,3:52,4:51}','{1:50,2:51,3:52,4:51}'
+'{1:51,2:51,3:52,4:51}','{1:51,2:51,3:52,4:51}'
+'{1:51,2:51,3:55,4:51}','{1:51,2:51,3:55,4:51}'
+'{1:51,2:51,3:51,4:51}','{1:51,2:51,3:51,4:51}'
+'{1:51,2:51,3:55,4:51}','{1:51,2:51,3:55,4:51}'
+'{1:51,2:51,3:51,4:51}','{1:51,2:51,3:51,4:51}'
+'{1:51,2:51,3:52,4:51}','{1:51,2:51,3:52,4:51}'
+'{1:51,2:51,3:51,4:51}','{1:51,2:51,3:51,4:51}'
+'{1:51,2:51,3:51,4:51}','{1:51,2:51,3:51,4:51}'
+---- TYPES
+STRING,STRING
+====
+---- QUERY
+select summary from (select summary from 
functional_parquet.iceberg_partitioned.snapshots) s;
+---- RESULTS
+'{"spark.app.id":"local-1598853475123","added-data-files":"20","added-records":"20","changed-partition-count":"3","total-records":"20","total-data-files":"20"}'
+---- TYPES
+STRING
+====
+---- QUERY
+with s as (select summary from 
functional_parquet.iceberg_partitioned.snapshots)
+select summary from s;
+---- RESULTS
+'{"spark.app.id":"local-1598853475123","added-data-files":"20","added-records":"20","changed-partition-count":"3","total-records":"20","total-data-files":"20"}'
+---- TYPES
+STRING
+====
+---- QUERY
+select
+    s.operation,
+    h.is_current_ancestor,
+    s.summary
+from functional_parquet.iceberg_query_metadata.history h
+join functional_parquet.iceberg_query_metadata.snapshots s
+  on h.snapshot_id = s.snapshot_id
+order by made_current_at;
+---- RESULTS
+'append',true,'{"added-data-files":"1","added-records":"1","added-files-size":"351","changed-partition-count":"1","total-records":"1","total-files-size":"351","total-data-files":"1","total-delete-files":"0","total-position-deletes":"0","total-equality-deletes":"0"}'
+'append',true,'{"added-data-files":"1","added-records":"1","added-files-size":"351","changed-partition-count":"1","total-records":"2","total-files-size":"702","total-data-files":"2","total-delete-files":"0","total-position-deletes":"0","total-equality-deletes":"0"}'
+'append',true,'{"added-data-files":"1","added-records":"1","added-files-size":"351","changed-partition-count":"1","total-records":"3","total-files-size":"1053","total-data-files":"3","total-delete-files":"0","total-position-deletes":"0","total-equality-deletes":"0"}'
+row_regex:'overwrite',true,'{"added-position-delete-files":"1","added-delete-files":"1","added-files-size":"[1-9][0-9]*","added-position-deletes":"1","changed-partition-count":"1","total-records":"3","total-files-size":"[1-9][0-9]*","total-data-files":"3","total-delete-files":"1","total-position-deletes":"1","total-equality-deletes":"0"}'
+---- TYPES
+STRING,BOOLEAN,STRING
+====
 
 ####
-# Describe all the metadata tables once
+# Query MAPs and ARRAYs in the same query
 ####
+---- QUERY
+select column_sizes, value_counts, split_offsets, equality_ids from 
functional_parquet.iceberg_v2_delete_both_eq_and_pos.`files`;
+---- RESULTS
+'{1:40,2:62,3:40}','{1:2,2:2,3:2}','[4]','NULL'
+'{1:40,2:54,3:66}','{1:2,2:2,3:2}','[4]','NULL'
+'{2147483546:215,2147483545:51}','{2147483546:1,2147483545:1}','NULL','NULL'
+'{1:40,3:40}','{1:2,3:2}','[4]','[1,3]'
+'{1:40,3:66}','{1:2,3:2}','[4]','[1,3]'
+---- TYPES
+STRING,STRING,STRING,STRING
 ====
+
+####
+# Describe all the metadata tables once
+####
 ---- QUERY
 describe functional_parquet.iceberg_query_metadata.snapshots;
 ---- RESULTS


Reply via email to