This is an automated email from the ASF dual-hosted git repository.

dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit ff3d0c79841b78f99fa78d81b1dca4371cd7bc99
Author: Daniel Becker <[email protected]>
AuthorDate: Tue Mar 7 16:53:46 2023 +0100

    IMPALA-12019: Support ORDER BY for arrays of fixed length types in select list
    
    As a first stage of IMPALA-10939, this change implements support for
    including in the sorting tuple top-level collections that only contain
    fixed length types (including fixed length structs). For these types the
    implementation is almost the same as the existing handling of strings.
    
    Another limitation is that structs that contain any type of collection
    are not yet allowed in the sorting tuple.
    
    Also refactored the RawValue::Write*() functions to have a clearer
    interface.
    
    Testing:
     - Added a new test table that contains many rows with arrays. This is
       queried in a new test added in test_sort.py, to ensure that we handle
       spilling correctly.
     - Added tests that have arrays and/or maps in the sorting tuple in
       test_queries.py::TestQueries::{test_sort,
           test_top_n,test_partitioned_top_n}.
    
    Change-Id: Ic7974ef392c1412e8c60231e3420367bd189677a
    Reviewed-on: http://gerrit.cloudera.org:8080/19660
    Reviewed-by: Csaba Ringhofer <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/exec/hash-table.cc                          |   2 +-
 be/src/runtime/collection-value.cc                 |   1 +
 be/src/runtime/collection-value.h                  |  18 ++
 be/src/runtime/descriptors.cc                      |  30 ++-
 be/src/runtime/descriptors.h                       |  18 +-
 be/src/runtime/raw-value.cc                        | 176 ++++++++++++------
 be/src/runtime/raw-value.h                         |  68 ++++---
 be/src/runtime/sorter-internal.h                   |  40 ++--
 be/src/runtime/sorter.cc                           | 179 ++++++++++++++----
 be/src/runtime/tuple.cc                            | 134 ++++++++------
 be/src/runtime/tuple.h                             |  73 +++++---
 be/src/runtime/types.h                             |   4 -
 .../java/org/apache/impala/analysis/Analyzer.java  |   2 +-
 .../java/org/apache/impala/analysis/QueryStmt.java |  19 +-
 .../java/org/apache/impala/analysis/SortInfo.java  |  77 +++++++-
 .../apache/impala/analysis/TupleDescriptor.java    |   8 +-
 .../main/java/org/apache/impala/catalog/Type.java  |   2 +
 .../org/apache/impala/planner/AnalyticPlanner.java |  28 +--
 testdata/ComplexTypesTbl/simple_arrays_big.parq    | Bin 0 -> 10252963 bytes
 testdata/data/README                               |  39 ++++
 .../functional/functional_schema_template.sql      |  38 ++++
 .../datasets/functional/schema_constraints.csv     |   6 +
 .../QueryTest/mixed-collections-and-structs.test   |  14 --
 .../QueryTest/nested-array-in-select-list.test     |   6 -
 .../QueryTest/nested-map-in-select-list.test       |   6 -
 .../QueryTest/partitioned-top-n-complex.test       |  50 +++++
 .../queries/QueryTest/sort-complex.test            | 205 +++++++++++++++++++++
 .../queries/QueryTest/top-n-complex.test           |  66 +++++++
 tests/query_test/test_nested_types.py              |  12 +-
 tests/query_test/test_queries.py                   |  15 +-
 tests/query_test/test_sort.py                      |  46 ++++-
 31 files changed, 1100 insertions(+), 282 deletions(-)

diff --git a/be/src/exec/hash-table.cc b/be/src/exec/hash-table.cc
index 2f7b72397..da652c6c4 100644
--- a/be/src/exec/hash-table.cc
+++ b/be/src/exec/hash-table.cc
@@ -275,7 +275,7 @@ bool HashTableCtx::EvalRow(const TupleRow* row,
     const ColumnType& expr_type = build_exprs_[i]->type();
     DCHECK_LE(expr_type.GetSlotSize(), sizeof(NULL_VALUE));
     val = RawValue::CanonicalValue(val, expr_type);
-    RawValue::Write(val, loc, expr_type, NULL);
+    RawValue::WriteNonNullPrimitive(val, loc, expr_type, NULL);
   }
   return has_null;
 }
diff --git a/be/src/runtime/collection-value.cc 
b/be/src/runtime/collection-value.cc
index 4e6e8ed8e..1f3911acc 100644
--- a/be/src/runtime/collection-value.cc
+++ b/be/src/runtime/collection-value.cc
@@ -20,5 +20,6 @@
 namespace impala {
 
 const char* CollectionValue::LLVM_CLASS_NAME = 
"struct.impala::CollectionValue";
+const char* CollValueAndSize::LLVM_CLASS_NAME = 
"struct.impala::CollValueAndSize";
 
 }
diff --git a/be/src/runtime/collection-value.h 
b/be/src/runtime/collection-value.h
index ff24205ad..f2603d254 100644
--- a/be/src/runtime/collection-value.h
+++ b/be/src/runtime/collection-value.h
@@ -51,6 +51,24 @@ struct __attribute__((__packed__)) CollectionValue {
   static const char* LLVM_CLASS_NAME;
 };
 
+// A struct that contains a pointer to a CollectionValue and its byte size. 
Used instead
+// of std::pair because of codegen, because
+//   - the std::pair type is difficult to name in codegen and
+//   - we are not in control of the layout of std::pair.
+struct CollValueAndSize {
+  CollectionValue* coll_value;
+  // In most (maybe all) cases a 32 bit int should be enough but
+  // 'CollectionValue::ByteSize()' returns int64_t so we use that.
+  int64_t byte_size;
+
+  CollValueAndSize(): CollValueAndSize(nullptr, 0) {}
+  CollValueAndSize(CollectionValue* cv, int64_t size)
+    : coll_value(cv), byte_size(size) {}
+
+  /// For C++/IR interop, we need to be able to look up types by name.
+  static const char* LLVM_CLASS_NAME;
+};
+
 }
 
 #endif
diff --git a/be/src/runtime/descriptors.cc b/be/src/runtime/descriptors.cc
index 54bfe6cd7..872cd7be2 100644
--- a/be/src/runtime/descriptors.cc
+++ b/be/src/runtime/descriptors.cc
@@ -916,7 +916,7 @@ void SlotDescriptor::CodegenWriteToSlotHelper(
     CodegenStoreStructToNativePtr(read_write_info, main_tuple_llvm_struct_ptr,
         slot, pool_val, insert_before);
   } else {
-    CodegenStoreNonNullAnyVal(read_write_info, slot, pool_val);
+    CodegenStoreNonNullAnyVal(read_write_info, slot, pool_val, this);
 
     // We only need this branch if we are not a struct, because for structs, 
the last leaf
     // (non-struct) field will add this branch.
@@ -1009,14 +1009,15 @@ CodegenAnyValReadWriteInfo 
CodegenAnyValToReadWriteInfo(CodegenAnyVal& any_val,
 }
 
 void SlotDescriptor::CodegenStoreNonNullAnyVal(CodegenAnyVal& any_val,
-      llvm::Value* raw_val_ptr, llvm::Value* pool_val) {
+      llvm::Value* raw_val_ptr, llvm::Value* pool_val,
+      const SlotDescriptor* slot_desc) {
   CodegenAnyValReadWriteInfo rwi = CodegenAnyValToReadWriteInfo(any_val, 
pool_val);
-  CodegenStoreNonNullAnyVal(rwi, raw_val_ptr, pool_val);
+  CodegenStoreNonNullAnyVal(rwi, raw_val_ptr, pool_val, slot_desc);
 }
 
 void SlotDescriptor::CodegenStoreNonNullAnyVal(
     const CodegenAnyValReadWriteInfo& read_write_info, llvm::Value* 
raw_val_ptr,
-    llvm::Value* pool_val) {
+    llvm::Value* pool_val, const SlotDescriptor* slot_desc) {
   LlvmBuilder* builder = read_write_info.builder();
   const ColumnType& type = read_write_info.type();
   switch (type.type) {
@@ -1024,7 +1025,8 @@ void SlotDescriptor::CodegenStoreNonNullAnyVal(
     case TYPE_VARCHAR:
     case TYPE_ARRAY: // CollectionVal has same memory layout as StringVal.
     case TYPE_MAP: { // CollectionVal has same memory layout as StringVal.
-      CodegenWriteStringOrCollectionToSlot(read_write_info, raw_val_ptr, 
pool_val);
+      CodegenWriteStringOrCollectionToSlot(read_write_info, raw_val_ptr,
+          pool_val, slot_desc);
       break;
     }
     case TYPE_CHAR:
@@ -1097,7 +1099,7 @@ void SlotDescriptor::CodegenSetToNull(const 
CodegenAnyValReadWriteInfo& read_wri
 
 void SlotDescriptor::CodegenWriteStringOrCollectionToSlot(
     const CodegenAnyValReadWriteInfo& read_write_info,
-    llvm::Value* slot_ptr, llvm::Value* pool_val) {
+    llvm::Value* slot_ptr, llvm::Value* pool_val, const SlotDescriptor* 
slot_desc) {
   LlvmCodeGen* codegen = read_write_info.codegen();
   LlvmBuilder* builder = read_write_info.builder();
   const ColumnType& type = read_write_info.type();
@@ -1109,12 +1111,22 @@ void 
SlotDescriptor::CodegenWriteStringOrCollectionToSlot(
   str_or_coll_value = builder->CreateInsertValue(
       str_or_coll_value, read_write_info.GetPtrAndLen().len, 1);
   if (pool_val != nullptr) {
+    llvm::Value* len = read_write_info.GetPtrAndLen().len;
+    if (type.IsCollectionType()) {
+      DCHECK(slot_desc != nullptr) << "SlotDescriptor needed to calculate the 
size of "
+          << "the collection for copying.";
+      // For a 'CollectionValue', 'len' is not the byte size of the whole data 
but the
+      // number of items, so we have to multiply it with the byte size of the 
item tuple
+      // to get the data size.
+      int item_tuple_byte_size = 
slot_desc->children_tuple_descriptor()->byte_size();
+      len = builder->CreateMul(len, 
codegen->GetI32Constant(item_tuple_byte_size));
+    }
+
     // Allocate a 'new_ptr' from 'pool_val' and copy the data from
     // 'read_write_info->ptr'
     llvm::Value* new_ptr = codegen->CodegenMemPoolAllocate(
-        builder, pool_val, read_write_info.GetPtrAndLen().len, "new_ptr");
-    codegen->CodegenMemcpy(builder, new_ptr, 
read_write_info.GetPtrAndLen().ptr,
-        read_write_info.GetPtrAndLen().len);
+        builder, pool_val, len, "new_ptr");
+    codegen->CodegenMemcpy(builder, new_ptr, 
read_write_info.GetPtrAndLen().ptr, len);
     str_or_coll_value = builder->CreateInsertValue(str_or_coll_value, new_ptr, 
0);
   } else {
     str_or_coll_value = builder->CreateInsertValue(
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index 4123ceafa..64879c31e 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -192,13 +192,19 @@ class SlotDescriptor {
   ///
   /// If 'pool_val' is non-NULL, var-len data will be copied into 'pool_val'.
   /// 'pool_val' has to be of type MemPool*.
+  ///
+  /// 'slot_desc' is needed when 'pool_val' is non-NULL and the value is a 
collection. In
+  /// this case the collection is copied and the slot desc is needed to 
calculate its byte
+  /// size.
   static void CodegenStoreNonNullAnyVal(CodegenAnyVal& any_val,
-      llvm::Value* raw_val_ptr, llvm::Value* pool_val = nullptr);
+      llvm::Value* raw_val_ptr, llvm::Value* pool_val = nullptr,
+      const SlotDescriptor* slot_desc = nullptr);
 
   /// Like the above, but takes a 'CodegenAnyValReadWriteInfo' instead of a
   /// 'CodegenAnyVal'.
   static void CodegenStoreNonNullAnyVal(const CodegenAnyValReadWriteInfo& 
read_write_info,
-      llvm::Value* raw_val_ptr, llvm::Value* pool_val = nullptr);
+      llvm::Value* raw_val_ptr, llvm::Value* pool_val = nullptr,
+      const SlotDescriptor* slot_desc = nullptr);
 
   /// Like 'CodegenStoreNonNullAnyVal' but stores the value into a new alloca()
   /// allocation. Returns a pointer to the stored value.
@@ -266,11 +272,13 @@ class SlotDescriptor {
       llvm::Value* tuple) const;
 
   /// Codegens writing a string or a collection to the address pointed to by 
'slot_ptr'.
-  /// If 'pool_val' is non-NULL, the data will be copied into 'pool_val'.  
'pool_val' has
-  /// to be of type MemPool*.
+  /// If 'pool_val' is non-NULL, the data will be copied into 'pool_val'. 
'pool_val' has
+  /// to be of type MemPool*. 'slot_desc' is needed when 'pool_val' is 
non-NULL and the
+  /// value is a collection. In this case the collection is copied and the 
slot desc is
+  /// needed to calculate its byte size.
   static void CodegenWriteStringOrCollectionToSlot(
       const CodegenAnyValReadWriteInfo& read_write_info,
-      llvm::Value* slot_ptr, llvm::Value* pool_val);
+      llvm::Value* slot_ptr, llvm::Value* pool_val, const SlotDescriptor* 
slot_desc);
 
   static llvm::Value* CodegenToTimestampValue(
       const CodegenAnyValReadWriteInfo& read_write_info);
diff --git a/be/src/runtime/raw-value.cc b/be/src/runtime/raw-value.cc
index 928d36016..4a32027a3 100644
--- a/be/src/runtime/raw-value.cc
+++ b/be/src/runtime/raw-value.cc
@@ -137,7 +137,7 @@ void RawValue::PrintValue(const void* value, const 
ColumnType& type, int scale,
   *str = out.str();
 }
 
-void RawValue::Write(const void* value, void* dst, const ColumnType& type,
+void RawValue::WriteNonNullPrimitive(const void* value, void* dst, const 
ColumnType& type,
     MemPool* pool) {
   DCHECK(value != NULL);
   switch (type.type) {
@@ -182,9 +182,9 @@ void RawValue::Write(const void* value, void* dst, const 
ColumnType& type,
       dest->len = src->len;
       if (type.type == TYPE_VARCHAR) DCHECK_LE(dest->len, type.len);
       if (pool != NULL) {
-        // Note: if this changes to TryAllocate(), 
CodegenAnyVal::WriteToSlot() will need
-        // to reflect this change as well (the codegen'd Allocate() call is 
actually
-        // generated in CodegenAnyVal::StoreToNativePtr()).
+        // Note: if this changes to TryAllocate(), 
SlotDescriptor::CodegenWriteToSlot()
+        // will need to reflect this change as well (the codegen'd Allocate() 
call is
+        // actually generated in 
SlotDescriptor::CodegenWriteStringOrCollectionToSlot()).
         dest->ptr = reinterpret_cast<char*>(pool->Allocate(dest->len));
         Ubsan::MemCpy(dest->ptr, src->ptr, dest->len);
       } else {
@@ -201,11 +201,8 @@ void RawValue::Write(const void* value, void* dst, const 
ColumnType& type,
       break;
     case TYPE_ARRAY:
     case TYPE_MAP: {
-      DCHECK(pool == NULL) << "RawValue::Write(): deep copy of 
CollectionValues NYI";
-      const CollectionValue* src = reinterpret_cast<const 
CollectionValue*>(value);
-      CollectionValue* dest = reinterpret_cast<CollectionValue*>(dst);
-      dest->num_tuples = src->num_tuples;
-      dest->ptr = src->ptr;
+      // Collections should be handled by a different Write() function within 
this class.
+      DCHECK(false);
       break;
     }
     case TYPE_STRUCT: {
@@ -213,39 +210,62 @@ void RawValue::Write(const void* value, void* dst, const 
ColumnType& type,
       DCHECK(false);
     }
     default:
-      DCHECK(false) << "RawValue::Write(): bad type: " << type.DebugString();
+      DCHECK(false) << "RawValue::WriteNonNullPrimitive(): bad type: "
+          << type.DebugString();
   }
 }
 
 void RawValue::Write(const void* value, Tuple* tuple, const SlotDescriptor* 
slot_desc,
-                     MemPool* pool) {
-  if (value == NULL) {
-    tuple->SetNull(slot_desc->null_indicator_offset());
+    MemPool* pool) {
+  RawValue::Write<false>(value, tuple, slot_desc, pool, nullptr, nullptr);
+}
+
+template <bool COLLECT_VAR_LEN_VALS>
+void RawValue::Write(const void* value, Tuple* tuple, const SlotDescriptor* 
slot_desc,
+    MemPool* pool, std::vector<StringValue*>* string_values,
+    std::vector<std::pair<CollectionValue*, int64_t>>* collection_values) {
+  if (value == nullptr) {
+    if (slot_desc->type().IsStructType()) {
+      tuple->SetStructToNull(slot_desc);
+    } else {
+      tuple->SetNull(slot_desc->null_indicator_offset());
+    }
   } else {
-    void* slot = tuple->GetSlot(slot_desc->tuple_offset());
-    RawValue::Write(value, slot, slot_desc->type(), pool);
+    RawValue::WriteNonNull<COLLECT_VAR_LEN_VALS>(value, tuple, slot_desc, pool,
+        string_values, collection_values);
   }
 }
 
-template <bool COLLECT_STRING_VALS>
-void RawValue::Write(const void* value, Tuple* tuple,
+template <bool COLLECT_VAR_LEN_VALS>
+void RawValue::WriteNonNull(const void* value, Tuple* tuple,
     const SlotDescriptor* slot_desc, MemPool* pool,
-    vector<StringValue*>* string_values) {
-  DCHECK(value != nullptr && tuple != nullptr && slot_desc != nullptr &&
-      string_values != nullptr);
-  DCHECK(string_values->size() == 0);
+    vector<StringValue*>* string_values,
+    vector<pair<CollectionValue*, int64_t>>* collection_values) {
+  DCHECK(value != nullptr && tuple != nullptr && slot_desc != nullptr);
+
+  if (COLLECT_VAR_LEN_VALS) {
+    DCHECK(string_values != nullptr);
+    DCHECK(string_values->size() == 0);
+    DCHECK(collection_values != nullptr);
+    DCHECK(collection_values->size() == 0);
+  }
 
   if (slot_desc->type().IsStructType()) {
-    WriteStruct<COLLECT_STRING_VALS>(value, tuple, slot_desc, pool, 
string_values);
+    WriteStruct<COLLECT_VAR_LEN_VALS>(value, tuple, slot_desc, pool,
+        string_values, collection_values);
+  } else if (slot_desc->type().IsCollectionType()) {
+    WriteCollection<COLLECT_VAR_LEN_VALS>(value, tuple, slot_desc, pool,
+        string_values, collection_values);
   } else {
-    WritePrimitive<COLLECT_STRING_VALS>(value, tuple, slot_desc, pool, 
string_values);
+    WritePrimitiveCollectVarlen<COLLECT_VAR_LEN_VALS>(value, tuple, slot_desc, 
pool,
+        string_values, collection_values);
   }
 }
 
-template <bool COLLECT_STRING_VALS>
+template <bool COLLECT_VAR_LEN_VALS>
 void RawValue::WriteStruct(const void* value, Tuple* tuple,
-    const SlotDescriptor* slot_desc, MemPool* pool,
-    vector<StringValue*>* string_values) {
+    const SlotDescriptor* slot_desc, MemPool* pool, vector<StringValue*>* 
string_values,
+    vector<pair<CollectionValue*, int64_t>>* collection_values) {
   DCHECK(tuple != nullptr);
   DCHECK(slot_desc->type().IsStructType());
   DCHECK(slot_desc->children_tuple_descriptor() != nullptr);
@@ -262,30 +282,66 @@ void RawValue::WriteStruct(const void* value, Tuple* 
tuple,
     uint8_t* src_child = src->ptr[i];
     if (child_slot->type().IsStructType()) {
       // Recursive call in case of nested structs.
-      WriteStruct<COLLECT_STRING_VALS>(src_child, tuple, child_slot, pool,
-          string_values);
+      WriteStruct<COLLECT_VAR_LEN_VALS>(src_child, tuple, child_slot, pool,
+          string_values, collection_values);
       continue;
     }
     if (src_child == nullptr) {
       tuple->SetNull(child_slot->null_indicator_offset());
     } else {
-      WritePrimitive<COLLECT_STRING_VALS>(src_child, tuple, child_slot, pool,
-          string_values);
+      WritePrimitiveCollectVarlen<COLLECT_VAR_LEN_VALS>(src_child, tuple, 
child_slot,
+          pool, string_values, collection_values);
     }
   }
 }
 
-template <bool COLLECT_STRING_VALS>
-void RawValue::WritePrimitive(const void* value, Tuple* tuple,
-    const SlotDescriptor* slot_desc, MemPool* pool,
-    vector<StringValue*>* string_values) {
-  DCHECK(value != nullptr && tuple != nullptr && slot_desc != nullptr &&
-      string_values != nullptr);
+template <bool COLLECT_VAR_LEN_VALS>
+void RawValue::WriteCollection(const void* value, Tuple* tuple,
+    const SlotDescriptor* slot_desc, MemPool* pool, vector<StringValue*>* 
string_values,
+    vector<pair<CollectionValue*, int64_t>>* collection_values) {
+  DCHECK(slot_desc->type().IsCollectionType());
 
   void* dst = tuple->GetSlot(slot_desc->tuple_offset());
-  Write(value, dst, slot_desc->type(), pool);
-  if (COLLECT_STRING_VALS && slot_desc->type().IsVarLenStringType()) {
-    string_values->push_back(reinterpret_cast<StringValue*>(dst));
+
+  // TODO IMPALA-10939: Enable recursive collections.
+  const CollectionValue* src = reinterpret_cast<const CollectionValue*>(value);
+  CollectionValue* dest = reinterpret_cast<CollectionValue*>(dst);
+  dest->num_tuples = src->num_tuples;
+
+  int64_t byte_size = dest->ByteSize(*slot_desc->children_tuple_descriptor());
+  if (pool != nullptr) {
+    // Note: if this changes to TryAllocate(), 
SlotDescriptor::CodegenWriteToSlot() will
+    // need to reflect this change as well (the codegen'd Allocate() call is 
actually
+    // generated in SlotDescriptor::CodegenWriteStringOrCollectionToSlot()).
+    dest->ptr = reinterpret_cast<uint8_t*>(pool->Allocate(byte_size));
+    Ubsan::MemCpy(dest->ptr, src->ptr, byte_size);
+  } else {
+    dest->ptr = src->ptr;
+  }
+
+  if (COLLECT_VAR_LEN_VALS) {
+    DCHECK(string_values != nullptr);
+    DCHECK(collection_values != nullptr);
+    collection_values->push_back(std::make_pair(dest, byte_size));
+  }
+}
+
+template <bool COLLECT_VAR_LEN_VALS>
+void RawValue::WritePrimitiveCollectVarlen(const void* value, Tuple* tuple,
+    const SlotDescriptor* slot_desc, MemPool* pool, vector<StringValue*>* 
string_values,
+    vector<pair<CollectionValue*, int64_t>>* collection_values) {
+  DCHECK(value != nullptr && tuple != nullptr && slot_desc != nullptr);
+
+  void* dst = tuple->GetSlot(slot_desc->tuple_offset());
+  WriteNonNullPrimitive(value, dst, slot_desc->type(), pool);
+  if (COLLECT_VAR_LEN_VALS) {
+    DCHECK(string_values != nullptr);
+    DCHECK(collection_values != nullptr);
+    if (slot_desc->type().IsVarLenStringType()) {
+      string_values->push_back(reinterpret_cast<StringValue*>(dst));
+    } else if (slot_desc->type().IsCollectionType()) {
+      DCHECK(false) << "Collections should be handled in WriteCollection.";
+    }
   }
 }
 
@@ -402,22 +458,38 @@ void RawValue::PrintValue(
 
 template void RawValue::Write<true>(const void* value, Tuple* tuple,
     const SlotDescriptor* slot_desc, MemPool* pool,
-    std::vector<StringValue*>* string_values);
+    std::vector<StringValue*>* string_values,
+    std::vector<std::pair<CollectionValue*, int64_t>>* collection_values);
 template void RawValue::Write<false>(const void* value, Tuple* tuple,
     const SlotDescriptor* slot_desc, MemPool* pool,
-    std::vector<StringValue*>* string_values);
+    std::vector<StringValue*>* string_values,
+    std::vector<std::pair<CollectionValue*, int64_t>>* collection_values);
+
+template void RawValue::WriteNonNull<true>(const void* value, Tuple* tuple,
+    const SlotDescriptor* slot_desc, MemPool* pool,
+    std::vector<StringValue*>* string_values,
+    std::vector<std::pair<CollectionValue*, int64_t>>* collection_values);
+template void RawValue::WriteNonNull<false>(const void* value, Tuple* tuple,
+    const SlotDescriptor* slot_desc, MemPool* pool,
+    std::vector<StringValue*>* string_values,
+    std::vector<std::pair<CollectionValue*, int64_t>>* collection_values);
 
 template void RawValue::WriteStruct<true>(const void* value, Tuple* tuple,
-      const SlotDescriptor* slot_desc, MemPool* pool,
-      std::vector<StringValue*>* string_values);
+    const SlotDescriptor* slot_desc, MemPool* pool,
+    std::vector<StringValue*>* string_values,
+    std::vector<std::pair<CollectionValue*, int64_t>>* collection_values);
 template void RawValue::WriteStruct<false>(const void* value, Tuple* tuple,
-      const SlotDescriptor* slot_desc, MemPool* pool,
-      std::vector<StringValue*>* string_values);
-
-template void RawValue::WritePrimitive<true>(const void* value, Tuple* tuple,
-      const SlotDescriptor* slot_desc, MemPool* pool,
-      std::vector<StringValue*>* string_values);
-template void RawValue::WritePrimitive<false>(const void* value, Tuple* tuple,
-      const SlotDescriptor* slot_desc, MemPool* pool,
-      std::vector<StringValue*>* string_values);
+    const SlotDescriptor* slot_desc, MemPool* pool,
+    std::vector<StringValue*>* string_values,
+    std::vector<std::pair<CollectionValue*, int64_t>>* collection_values);
+
+template void RawValue::WritePrimitiveCollectVarlen<true>(const void* value,
+    Tuple* tuple, const SlotDescriptor* slot_desc, MemPool* pool,
+    std::vector<StringValue*>* string_values,
+    std::vector<std::pair<CollectionValue*, int64_t>>* collection_values);
+template void RawValue::WritePrimitiveCollectVarlen<false>(const void* value,
+    Tuple* tuple,
+    const SlotDescriptor* slot_desc, MemPool* pool,
+    std::vector<StringValue*>* string_values,
+    std::vector<std::pair<CollectionValue*, int64_t>>* collection_values);
 }
diff --git a/be/src/runtime/raw-value.h b/be/src/runtime/raw-value.h
index b3edd9033..b52532633 100644
--- a/be/src/runtime/raw-value.h
+++ b/be/src/runtime/raw-value.h
@@ -117,23 +117,28 @@ class RawValue {
   static int IR_ALWAYS_INLINE Compare(
       const void* v1, const void* v2, const ColumnType& type) noexcept;
 
-  /// Writes the bytes of a given value into the slot of a tuple.
-  /// For string values, the string data is copied into memory allocated from 
'pool'
-  /// only if pool is non-NULL.
+  /// Writes the bytes of a given value into the slot of a tuple. Supports 
primitive and
+  /// complex types. 'value' is allowed to be NULL. For string and collection 
values, the
+  /// data is copied into memory allocated from 'pool' if pool is non-NULL, 
otherwise the
+  /// data is not copied.
+  /// If COLLECT_VAR_LEN_VALS is true, gathers the string slots of the slot 
tree into
+  /// 'string_values' and the collection slots along with their byte sizes into
+  /// 'collection_values'. In this case, 'string_values' and 
'collection_values' must be
+  /// non-NULL.
+  template <bool COLLECT_VAR_LEN_VALS>
   static void Write(const void* value, Tuple* tuple, const SlotDescriptor* 
slot_desc,
-                    MemPool* pool);
+      MemPool* pool, std::vector<StringValue*>* string_values,
+      std::vector<std::pair<CollectionValue*, int64_t>>* collection_values);
 
-  /// Writes 'src' into 'dst' for type.
-  /// For string values, the string data is copied into 'pool' if pool is 
non-NULL.
-  /// src must be non-NULL.
-  static void Write(const void* src, void* dst, const ColumnType& type, 
MemPool* pool);
+  /// Convenience wrapper for the templated version with 
COLLECT_VAR_LEN_VALS=false.
+  static void Write(const void* value, Tuple* tuple, const SlotDescriptor* 
slot_desc,
+      MemPool* pool);
 
-  /// Wrapper function for Write() to handle struct slots and its children. 
Additionally,
-  /// gathers the string slots of the slot tree into 'string_values'.
-  template <bool COLLECT_STRING_VALS>
-  static void Write(const void* value, Tuple* tuple,
-      const SlotDescriptor* slot_desc, MemPool* pool,
-    std::vector<StringValue*>* string_values);
+  /// Writes 'src' into 'dst' for the given primitive type. Does not support 
complex
+  /// types. 'src' must be non-NULL. For string values, the string data is 
copied into
+  /// 'pool' if pool is non-NULL.
+  static void WriteNonNullPrimitive(const void* src, void* dst, const 
ColumnType& type,
+      MemPool* pool);
 
   /// Returns true if v1 == v2.
   /// This is more performant than Compare() == 0 for string equality, mostly 
because of
@@ -164,18 +169,35 @@ class RawValue {
   }
 
 private:
+  /// Like Write() but 'value' must be non-NULL.
+  template <bool COLLECT_VAR_LEN_VALS>
+  static void WriteNonNull(const void* value, Tuple* tuple,
+      const SlotDescriptor* slot_desc, MemPool* pool,
+      std::vector<StringValue*>* string_values,
+      std::vector<std::pair<CollectionValue*, int64_t>>* collection_values);
+
   /// Recursive helper function for Write() to handle struct slots.
-  template <bool COLLECT_STRING_VALS>
+  template <bool COLLECT_VAR_LEN_VALS>
   static void WriteStruct(const void* value, Tuple* tuple,
       const SlotDescriptor* slot_desc, MemPool* pool,
-      std::vector<StringValue*>* string_values);
-
-  /// Gets the destination slot from 'tuple' and 'slot_desc', writes value to 
this slot
-  /// using Write(). Collects pointer of the string slots to 'string_values'. 
'slot_desc'
-  /// has to be primitive type.
-  template <bool COLLECT_STRING_VALS>
-  static void WritePrimitive(const void* value, Tuple* tuple,
+      std::vector<StringValue*>* string_values,
+      std::vector<std::pair<CollectionValue*, int64_t>>* collection_values);
+
+  /// Recursive helper function for Write() to handle collection slots.
+  template <bool COLLECT_VAR_LEN_VALS>
+  static void WriteCollection(const void* value, Tuple* tuple,
+      const SlotDescriptor* slot_desc, MemPool* pool, vector<StringValue*>* 
string_values,
+      vector<pair<CollectionValue*, int64_t>>* collection_values);
+
+  /// Gets the destination slot from 'tuple' and 'slot_desc' and writes 
'value' to this
+  /// slot. 'value' must be non-NULL. If COLLECT_VAR_LEN_VALS is true, 
collects the
+  /// pointers of the string slots to 'string_values' and the pointers of the 
collection
+  /// slots along with their byte sizes to 'collection_values'. 'slot_desc' 
has to be a
+  /// primitive type.
+  template <bool COLLECT_VAR_LEN_VALS>
+  static void WritePrimitiveCollectVarlen(const void* value, Tuple* tuple,
       const SlotDescriptor* slot_desc, MemPool* pool,
-      std::vector<StringValue*>* string_values);
+      std::vector<StringValue*>* string_values,
+      std::vector<std::pair<CollectionValue*, int64_t>>* collection_values);
 };
 }
diff --git a/be/src/runtime/sorter-internal.h b/be/src/runtime/sorter-internal.h
index 931ee20b6..7661051ba 100644
--- a/be/src/runtime/sorter-internal.h
+++ b/be/src/runtime/sorter-internal.h
@@ -222,10 +222,16 @@ class Sorter::Run {
   /// if the run is unpinned.
   Status FinalizePages(vector<Page>* pages);
 
-  /// Collect the non-null var-len (e.g. STRING) slots from 'src' in 
'var_len_values' and
-  /// return the total length of all var-len values in 'total_var_len'.
+  void CheckTypeForVarLenCollectionSorting();
+
+  /// Collects the non-null var-len slots (strings and collections) from 
'src'. Strings
+  /// are returned in 'string_values' and collections are returned, along with 
their byte
+  /// size, in 'collection_values'. The total length of all var-len values is 
returned in
+  /// 'total_var_len'.
   void CollectNonNullVarSlots(
-      Tuple* src, vector<StringValue*>* var_len_values, int* total_var_len);
+      Tuple* src, vector<StringValue*>* string_values,
+      std::vector<CollValueAndSize>* collection_values,
+      int* total_var_len);
 
   enum AddPageMode { KEEP_PREV_PINNED, UNPIN_PREV };
 
@@ -251,15 +257,18 @@ class Sorter::Run {
   /// this function will pin the page at 'page_index' + 1 in 'pages'.
   Status PinNextReadPage(vector<Page>* pages, int page_index);
 
-  /// Copy the StringValues in 'var_values' to 'dest' in order and update the 
StringValue
-  /// ptrs in 'dest' to point to the copied data.
-  void CopyVarLenData(const vector<StringValue*>& var_values, uint8_t* dest);
+  /// Copy the var len data in 'string_values' and 
'collection_values_and_sizes' to 'dest'
+  /// in order and update the pointers to point to the copied data.
+  void CopyVarLenData(const vector<StringValue*>& string_values,
+      const vector<CollValueAndSize>& collection_values_and_sizes, uint8_t* 
dest);
 
-  /// Copy the StringValues in 'var_values' to 'dest' in order. Update the 
StringValue
-  /// ptrs in 'dest' to contain a packed offset for the copied data comprising
-  /// page_index and the offset relative to page_start.
-  void CopyVarLenDataConvertOffset(const vector<StringValue*>& var_values, int 
page_index,
-      const uint8_t* page_start, uint8_t* dest);
+  /// Copy the StringValues in 'var_values' and the CollectionValues 
referenced in
+  /// 'collection_values_and_sizes' to 'dest' in order. Update the StringValue 
ptrs in
+  /// 'dest' to contain a packed offset for the copied data comprising 
page_index and the
+  /// offset relative to page_start.
+  void CopyVarLenDataConvertOffset(const vector<StringValue*>& var_values,
+      const std::vector<CollValueAndSize>& collection_values_and_sizes,
+      int page_index, const uint8_t* page_start, uint8_t* dest);
 
   /// Convert encoded offsets to valid pointers in tuple with layout 
'sort_tuple_desc_'.
   /// 'tuple' is modified in-place. Returns true if the pointers refer to the 
page at
@@ -267,6 +276,13 @@ class Sorter::Run {
   /// data is in the next page, in which case 'tuple' is unmodified.
   bool ConvertOffsetsToPtrs(Tuple* tuple);
 
+  template <class ValueType>
+  bool ConvertValueOffsetsToPtrs(Tuple* tuple, uint8_t* page_start,
+      const vector<SlotDescriptor*>& slots);
+
+  bool ConvertStringValueOffsetsToPtrs(Tuple* tuple, uint8_t* page_start);
+  bool ConvertCollectionValueOffsetsToPtrs(Tuple* tuple, uint8_t* page_start);
+
   int NumOpenPages(const vector<Page>& pages);
 
   /// Close all open pages and clear vector.
@@ -289,7 +305,7 @@ class Sorter::Run {
 
   const bool has_var_len_slots_;
 
-  /// True if this is an initial run. False implies this is an sorted 
intermediate run
+  /// True if this is an initial run. False implies this is a sorted 
intermediate run
   /// resulting from merging other runs.
   const bool initial_run_;
 
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index 5f853d50b..ae885252b 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -146,6 +146,9 @@ Status Sorter::Run::Init() {
   } else {
     sorter_->spilled_runs_counter_->Add(1);
   }
+
+  CheckTypeForVarLenCollectionSorting();
+
   return Status::OK();
 }
 
@@ -180,6 +183,7 @@ Status Sorter::Run::AddBatchInternal(
   // processed.
   int cur_input_index = start_index;
   vector<StringValue*> string_values;
+  vector<CollValueAndSize> coll_values_and_sizes;
   string_values.reserve(sort_tuple_desc_->string_slots().size());
   while (cur_input_index < batch->num_rows()) {
     // tuples_remaining is the number of tuples to copy/materialize into
@@ -195,7 +199,7 @@ Status Sorter::Run::AddBatchInternal(
       if (INITIAL_RUN) {
         new_tuple->MaterializeExprs<HAS_VAR_LEN_SLOTS, true>(input_row,
             *sort_tuple_desc_, sorter_->sort_tuple_expr_evals_, nullptr,
-            &string_values, &total_var_len);
+            &string_values, &coll_values_and_sizes, &total_var_len);
         if (total_var_len > sorter_->page_len_) {
           int64_t max_row_size = sorter_->state_->query_options().max_row_size;
           return Status(TErrorCode::MAX_ROW_SIZE,
@@ -205,7 +209,8 @@ Status Sorter::Run::AddBatchInternal(
       } else {
         memcpy(new_tuple, input_row->GetTuple(0), sort_tuple_size_);
         if (HAS_VAR_LEN_SLOTS) {
-          CollectNonNullVarSlots(new_tuple, &string_values, &total_var_len);
+          CollectNonNullVarSlots(new_tuple, &string_values,
+              &coll_values_and_sizes, &total_var_len);
         }
       }
 
@@ -225,19 +230,13 @@ Status Sorter::Run::AddBatchInternal(
           }
         }
 
-        // Sorting of tuples containing array values is not implemented. The 
planner
-        // combined with projection should guarantee that none are in each 
tuple.
-        for (const SlotDescriptor* coll_slot: 
sort_tuple_desc_->collection_slots()) {
-          DCHECK(new_tuple->IsNull(coll_slot->null_indicator_offset()));
-        }
-
+        DCHECK_EQ(&var_len_pages_.back(), cur_var_len_page);
         uint8_t* var_data_ptr = cur_var_len_page->AllocateBytes(total_var_len);
         if (INITIAL_RUN) {
-          CopyVarLenData(string_values, var_data_ptr);
+          CopyVarLenData(string_values, coll_values_and_sizes, var_data_ptr);
         } else {
-          DCHECK_EQ(&var_len_pages_.back(), cur_var_len_page);
-          CopyVarLenDataConvertOffset(string_values, var_len_pages_.size() - 1,
-              cur_var_len_page->data(), var_data_ptr);
+          CopyVarLenDataConvertOffset(string_values, coll_values_and_sizes,
+              var_len_pages_.size() - 1, cur_var_len_page->data(), 
var_data_ptr);
         }
       }
       ++num_tuples_;
@@ -257,6 +256,31 @@ Status Sorter::Run::AddBatchInternal(
   return Status::OK();
 }
 
+bool is_allowed_collection_item_type(const ColumnType& type) {
+  if (type.IsStructType()) {
+    for (const ColumnType& child_type : type.children) {
+      if (!is_allowed_collection_item_type(child_type)) return false;
+    }
+    return true;
+  }
+
+  return !type.IsComplexType() && !type.IsVarLenStringType();
+}
+
+void Sorter::Run::CheckTypeForVarLenCollectionSorting() {
+  // Sorting of tuples containing collection values is only implemented if the 
items are
+  // fixed length types. The planner combined with projection should guarantee 
that only
+  // such values are in each tuple.
+  for (const SlotDescriptor* coll_slot: sort_tuple_desc_->collection_slots()) {
+    for (SlotDescriptor* child_slot :
+        coll_slot->children_tuple_descriptor()->slots()) {
+      DCHECK(is_allowed_collection_item_type(child_slot->type()))
+          << "Type not allowed in collection in sorting tuple: "
+          << child_slot->type() << ".";
+    }
+  }
+}
+
 Status Sorter::Run::FinalizeInput() {
   DCHECK(!is_finalized_);
 
@@ -306,6 +330,7 @@ Status Sorter::Run::UnpinAllPages() {
   sorted_var_len_pages.reserve(var_len_pages_.size());
 
   vector<StringValue*> string_values;
+  vector<CollValueAndSize> collection_values_and_sizes;
   int total_var_len;
   string_values.reserve(sort_tuple_desc_->string_slots().size());
   Page* cur_sorted_var_len_page = nullptr;
@@ -331,7 +356,8 @@ Status Sorter::Run::UnpinAllPages() {
       for (int page_offset = 0; page_offset < cur_fixed_page->valid_data_len();
            page_offset += sort_tuple_size_) {
         Tuple* cur_tuple = reinterpret_cast<Tuple*>(cur_fixed_page->data() + 
page_offset);
-        CollectNonNullVarSlots(cur_tuple, &string_values, &total_var_len);
+        CollectNonNullVarSlots(cur_tuple, &string_values, 
&collection_values_and_sizes,
+            &total_var_len);
         DCHECK(cur_sorted_var_len_page->is_open());
         if (cur_sorted_var_len_page->BytesRemaining() < total_var_len) {
           bool added;
@@ -342,8 +368,9 @@ Status Sorter::Run::UnpinAllPages() {
         }
         uint8_t* var_data_ptr = 
cur_sorted_var_len_page->AllocateBytes(total_var_len);
         DCHECK_EQ(&sorted_var_len_pages.back(), cur_sorted_var_len_page);
-        CopyVarLenDataConvertOffset(string_values, sorted_var_len_pages.size() 
- 1,
-            cur_sorted_var_len_page->data(), var_data_ptr);
+        CopyVarLenDataConvertOffset(string_values, collection_values_and_sizes,
+            sorted_var_len_pages.size() - 1, cur_sorted_var_len_page->data(),
+            var_data_ptr);
       }
     }
     cur_fixed_page->Unpin(sorter_->buffer_pool_client_);
@@ -534,9 +561,12 @@ Status Sorter::Run::PinNextReadPage(vector<Page>* pages, 
int page_index) {
 }
 
 void Sorter::Run::CollectNonNullVarSlots(Tuple* src,
-    vector<StringValue*>* string_values, int* total_var_len) {
+    vector<StringValue*>* string_values,
+    vector<CollValueAndSize>* collection_values, int* total_var_len) {
   string_values->clear();
+  collection_values->clear();
   *total_var_len = 0;
+
   for (const SlotDescriptor* string_slot: sort_tuple_desc_->string_slots()) {
     if (!src->IsNull(string_slot->null_indicator_offset())) {
       StringValue* string_val =
@@ -545,6 +575,17 @@ void Sorter::Run::CollectNonNullVarSlots(Tuple* src,
       *total_var_len += string_val->len;
     }
   }
+
+  for (const SlotDescriptor* collection_slot: 
sort_tuple_desc_->collection_slots()) {
+    if (!src->IsNull(collection_slot->null_indicator_offset())) {
+      CollectionValue* collection_value = reinterpret_cast<CollectionValue*>(
+          src->GetSlot(collection_slot->tuple_offset()));
+      int64_t byte_size = collection_value->ByteSize(
+          *collection_slot->children_tuple_descriptor());
+      collection_values->push_back(CollValueAndSize(collection_value, 
byte_size));
+      *total_var_len += byte_size;
+    }
+  }
 }
 
 Status Sorter::Run::TryAddPage(
@@ -574,15 +615,34 @@ Status Sorter::Run::AddPage(vector<Page>* page_sequence) {
 }
 
 void Sorter::Run::CopyVarLenData(const vector<StringValue*>& string_values,
-    uint8_t* dest) {
+    const vector<CollValueAndSize>& collection_values_and_sizes, uint8_t* 
dest) {
   for (StringValue* string_val: string_values) {
     Ubsan::MemCpy(dest, string_val->ptr, string_val->len);
     string_val->ptr = reinterpret_cast<char*>(dest);
     dest += string_val->len;
   }
+
+  // TODO IMPALA-10939: Check embedded varlen types recursively.
+  for (const CollValueAndSize& coll_val_and_size : 
collection_values_and_sizes) {
+    CollectionValue* coll_val = coll_val_and_size.coll_value;
+    int64_t byte_size = coll_val_and_size.byte_size;
+    Ubsan::MemCpy(dest, coll_val->ptr, byte_size);
+    coll_val->ptr = dest;
+    dest += byte_size;
+  }
+}
+
+uint64_t PackOffset(uint64_t page_index, uint32_t page_offset) {
+  return (page_index << 32) | page_offset;
+}
+
+void UnpackOffset(uint64_t packed_offset, uint32_t* page_index, uint32_t* 
page_offset) {
+    *page_index = packed_offset >> 32;
+    *page_offset = packed_offset & 0xFFFFFFFF;
 }
 
 void Sorter::Run::CopyVarLenDataConvertOffset(const vector<StringValue*>& 
string_values,
+    const vector<CollValueAndSize>& collection_values_and_sizes,
     int page_index, const uint8_t* page_start, uint8_t* dest) {
   DCHECK_GE(page_index, 0);
   DCHECK_GE(dest - page_start, 0);
@@ -592,10 +652,23 @@ void Sorter::Run::CopyVarLenDataConvertOffset(const 
vector<StringValue*>& string
     DCHECK_LE(dest - page_start, sorter_->page_len_);
     DCHECK_LE(dest - page_start, numeric_limits<uint32_t>::max());
     uint32_t page_offset = dest - page_start;
-    uint64_t packed_offset = (static_cast<uint64_t>(page_index) << 32) | 
page_offset;
+    uint64_t packed_offset = PackOffset(page_index, page_offset);
     string_val->ptr = reinterpret_cast<char*>(packed_offset);
     dest += string_val->len;
   }
+
+  // TODO IMPALA-10939: Check embedded varlen types recursively.
+  for (const CollValueAndSize&  coll_val_and_size : 
collection_values_and_sizes) {
+    CollectionValue* coll_value = coll_val_and_size.coll_value;
+    int64_t byte_size = coll_val_and_size.byte_size;
+    memcpy(dest, coll_value->ptr, byte_size);
+    DCHECK_LE(dest - page_start, sorter_->page_len_);
+    DCHECK_LE(dest - page_start, numeric_limits<uint32_t>::max());
+    uint32_t page_offset = dest - page_start;
+    uint64_t packed_offset = PackOffset(page_index, page_offset);
+    coll_value->ptr = reinterpret_cast<uint8_t*>(packed_offset);
+    dest += byte_size;
+  }
 }
 
 bool Sorter::Run::ConvertOffsetsToPtrs(Tuple* tuple) {
@@ -604,20 +677,46 @@ bool Sorter::Run::ConvertOffsetsToPtrs(Tuple* tuple) {
   uint8_t* page_start =
       var_len_pages_.empty() ? nullptr : 
var_len_pages_[var_len_pages_index_].data();
 
-  const vector<SlotDescriptor*>& string_slots = 
sort_tuple_desc_->string_slots();
-  int num_non_null_string_slots = 0;
-  for (auto slot_desc : string_slots) {
+  bool strings_converted = ConvertStringValueOffsetsToPtrs(tuple, page_start);
+  if (!strings_converted) return false;
+  return ConvertCollectionValueOffsetsToPtrs(tuple, page_start);
+}
+
+// Helpers for Sorter::Run::ConvertValueOffsetsToPtrs() to get the byte size based on
+// the type.
+int64_t GetByteSize(const StringValue& string_value, const SlotDescriptor& 
slot_desc) {
+  return string_value.len;
+}
+
+int64_t GetByteSize(const CollectionValue& collection_value,
+    const SlotDescriptor& slot_desc) {
+ return collection_value.ByteSize(*slot_desc.children_tuple_descriptor());
+}
+
+template <class ValueType>
+bool Sorter::Run::ConvertValueOffsetsToPtrs(Tuple* tuple, uint8_t* page_start,
+    const vector<SlotDescriptor*>& slots) {
+  static_assert(std::is_same_v<ValueType, StringValue>
+      || std::is_same_v<ValueType, CollectionValue>);
+  int num_non_null_slots = 0;
+  for (auto slot_desc : slots) {
     if (tuple->IsNull(slot_desc->null_indicator_offset())) continue;
-    ++num_non_null_string_slots;
+    ++num_non_null_slots;
 
-    DCHECK(slot_desc->type().IsVarLenStringType());
-    StringValue* value = reinterpret_cast<StringValue*>(
+    if (std::is_same_v<ValueType, StringValue>) {
+      DCHECK(slot_desc->type().IsVarLenStringType());
+    } else {
+      DCHECK(slot_desc->type().IsCollectionType());
+    }
+
+    ValueType* value = reinterpret_cast<ValueType*>(
         tuple->GetSlot(slot_desc->tuple_offset()));
     // packed_offset includes the page index in the upper 32 bits and the page
     // offset in the lower 32 bits. See CopyVarLenDataConvertOffset().
     uint64_t packed_offset = reinterpret_cast<uint64_t>(value->ptr);
-    uint32_t page_index = packed_offset >> 32;
-    uint32_t page_offset = packed_offset & 0xFFFFFFFF;
+    uint32_t page_index;
+    uint32_t page_offset;
+    UnpackOffset(packed_offset, &page_index, &page_offset);
 
     if (page_index > var_len_pages_index_) {
       // We've reached the page boundary for the current var-len page.
@@ -626,26 +725,42 @@ bool Sorter::Run::ConvertOffsetsToPtrs(Tuple* tuple) {
       DCHECK_LE(page_index, var_len_pages_.size());
       DCHECK_EQ(page_index, var_len_pages_index_ + 1);
       DCHECK_EQ(page_offset, 0); // The data is the first thing in the next 
page.
-      // This must be the first slot with var len data for the tuple. Var len 
data
-      // for tuple shouldn't be split across blocks.
-      DCHECK_EQ(num_non_null_string_slots, 1);
+      // This must be the first slot with var len data for the tuple.
+      // Var len data for a single tuple shouldn't be split across
+      // blocks.
+      DCHECK_EQ(num_non_null_slots, 1);
       return false;
     }
 
     DCHECK_EQ(page_index, var_len_pages_index_);
+
+    const int64_t byte_size = GetByteSize(*value, *slot_desc);
+
     if (var_len_pages_.empty()) {
-      DCHECK_EQ(value->len, 0);
+      DCHECK_EQ(byte_size, 0);
     } else {
-      DCHECK_LE(page_offset + value->len, 
var_len_pages_[page_index].valid_data_len());
+      DCHECK_LE(page_offset + byte_size, 
var_len_pages_[page_index].valid_data_len());
     }
     // Calculate the address implied by the offset and assign it. May be 
nullptr for
     // zero-length strings if there are no pages in the run since page_start 
is nullptr.
     DCHECK(page_start != nullptr || page_offset == 0);
-    value->ptr = reinterpret_cast<char*>(page_start + page_offset);
+    using ptr_type = decltype(value->ptr);
+    value->ptr = reinterpret_cast<ptr_type>(page_start + page_offset);
   }
   return true;
 }
 
+bool Sorter::Run::ConvertStringValueOffsetsToPtrs(Tuple* tuple, uint8_t* 
page_start) {
+  const vector<SlotDescriptor*>& string_slots = 
sort_tuple_desc_->string_slots();
+  return ConvertValueOffsetsToPtrs<StringValue>(tuple, page_start, 
string_slots);
+}
+
+bool Sorter::Run::ConvertCollectionValueOffsetsToPtrs(Tuple* tuple, uint8_t* 
page_start) {
+  const vector<SlotDescriptor*>& collection_slots = 
sort_tuple_desc_->collection_slots();
+  return ConvertValueOffsetsToPtrs<CollectionValue>(tuple, page_start, 
collection_slots);
+
+}
+
 int64_t Sorter::Run::TotalBytes() const {
   int64_t total_bytes = 0;
   for (const Page& page : fixed_len_pages_) {
diff --git a/be/src/runtime/tuple.cc b/be/src/runtime/tuple.cc
index 3136ee181..12cc50dc4 100644
--- a/be/src/runtime/tuple.cc
+++ b/be/src/runtime/tuple.cc
@@ -217,11 +217,12 @@ void Tuple::SetNullIndicators(NullIndicatorOffset offset, 
int64_t num_tuples,
   }
 }
 
-template <bool COLLECT_STRING_VALS, bool NO_POOL>
+template <bool COLLECT_VAR_LEN_VALS, bool NO_POOL>
 void Tuple::MaterializeExprs(TupleRow* row, const TupleDescriptor& desc,
     ScalarExprEvaluator* const* evals, MemPool* pool,
-    StringValue** non_null_string_values, int* total_string_lengths,
-    int* num_non_null_string_values) {
+    StringValue** non_null_string_values, CollValueAndSize* 
non_null_collection_values,
+    int* total_varlen_lengths,
+    int* num_non_null_string_values, int* num_non_null_collection_values) {
   ClearNullBits(desc);
   // Evaluate the materialize_expr_evals and place the results in the tuple.
   for (int i = 0; i < desc.slots().size(); ++i) {
@@ -232,22 +233,27 @@ void Tuple::MaterializeExprs(TupleRow* row, const 
TupleDescriptor& desc,
     DCHECK(slot_desc->type().type == TYPE_NULL ||
         slot_desc->type() == evals[i]->root().type());
     void* src = evals[i]->GetValue(row);
-    if (src != NULL) {
-      vector<StringValue*> string_values;
-      RawValue::Write<COLLECT_STRING_VALS>(src, this, slot_desc, pool, 
&string_values);
-      if (string_values.size() > 0) {
-        for (StringValue* string_val : string_values) {
-          *(non_null_string_values++) = string_val;
-          *total_string_lengths += string_val->len;
-        }
-        (*num_non_null_string_values) += string_values.size();
+
+    vector<StringValue*> string_values;
+    vector<pair<CollectionValue*, int64_t>> collection_values;
+    RawValue::Write<COLLECT_VAR_LEN_VALS>(src, this, slot_desc, pool,
+        &string_values, &collection_values);
+    if (string_values.size() > 0) {
+      for (StringValue* string_val : string_values) {
+        *(non_null_string_values++) = string_val;
+        *total_varlen_lengths += string_val->len;
       }
-    } else {
-      if (slot_desc->type().IsStructType()) {
-        SetStructToNull(slot_desc);
-      } else {
-        SetNull(slot_desc->null_indicator_offset());
+      (*num_non_null_string_values) += string_values.size();
+    }
+
+    if (collection_values.size() > 0) {
+      for (const pair<CollectionValue*, int64_t>& collection_val_pair
+          : collection_values) {
+        CollValueAndSize cvs(collection_val_pair.first, 
collection_val_pair.second);
+        *(non_null_collection_values++) = cvs;
+        *total_varlen_lengths += collection_val_pair.second;
       }
+      (*num_non_null_collection_values) += collection_values.size();
     }
   }
 }
@@ -277,20 +283,21 @@ char* Tuple::AllocateStrings(const char* err_ctx, 
RuntimeState* state,
 // Codegens an unrolled version of MaterializeExprs(). Uses codegen'd exprs 
and slot
 // writes. If 'pool' is non-NULL, string data is copied into it.
 //
-// Example IR for materializing a string column with non-NULL 'pool':
+// Example IR for materializing a string column with non-NULL 'pool', produced 
by the
+// following query:
+//   select l_comment
+//   from tpch.lineitem
+//   order by l_comment
+//   limit 10;
 //
-// Produced for the following query:
-//   select string_col from functional_orc_def.alltypes order by string_col 
limit 2;
-//
-// define void @MaterializeExprs(
-//     %"class.impala::Tuple"* %opaque_tuple,
-//     %"class.impala::TupleRow"* %row,
-//     %"class.impala::TupleDescriptor"* %desc,
-//     %"class.impala::ScalarExprEvaluator"** %slot_materialize_exprs,
+// define void @MaterializeExprs(%"class.impala::Tuple"* %opaque_tuple,
+//     %"class.impala::TupleRow"* %row, %"class.impala::TupleDescriptor"* 
%desc,
+//     %"class.impala::ScalarExprEvaluator"** %slot_materialize_expr_evals,
 //     %"class.impala::MemPool"* %pool,
 //     %"struct.impala::StringValue"** %non_null_string_values,
-//     i32* %total_string_lengths,
-//     i32* %num_non_null_string_values) #48 {
+//     %"struct.impala::CollValueAndSize"* %non_null_collection_values,
+//     i32* %total_varlen_lengths, i32* %num_non_null_string_values,
+//     i32* %num_non_null_collection_values) #50 {
 // entry:
 //   %tuple = bitcast %"class.impala::Tuple"* %opaque_tuple
 //       to <{ %"struct.impala::StringValue", i8 }>*
@@ -298,11 +305,10 @@ char* Tuple::AllocateStrings(const char* err_ctx, 
RuntimeState* state,
 //   %null_bytes_ptr = getelementptr inbounds i8, i8* %int8_ptr, i32 12
 //   call void @llvm.memset.p0i8.i64(i8* %null_bytes_ptr, i8 0, i64 1, i32 0, 
i1 false)
 //   %0 = getelementptr %"class.impala::ScalarExprEvaluator"*,
-//                      %"class.impala::ScalarExprEvaluator"** 
%slot_materialize_exprs,
-//                      i32 0
+//       %"class.impala::ScalarExprEvaluator"** %slot_materialize_expr_evals, 
i32 0
 //   %expr_eval = load %"class.impala::ScalarExprEvaluator"*,
-//                     %"class.impala::ScalarExprEvaluator"** %0
-//   %src = call { i64, i8* } @GetSlotRef.3(
+//       %"class.impala::ScalarExprEvaluator"** %0
+//   %src = call { i64, i8* } @GetSlotRef.4(
 //       %"class.impala::ScalarExprEvaluator"* %expr_eval,
 //       %"class.impala::TupleRow"* %row)
 //   ; -- generated by CodegenAnyVal::ToReadWriteInfo() and 
SlotDescriptor::WriteToSlot()
@@ -314,25 +320,17 @@ char* Tuple::AllocateStrings(const char* err_ctx, 
RuntimeState* state,
 //   br i1 %is_null, label %null, label %non_null
 //
 // non_null:                                         ; preds = %entry1
-//   %src2 = extractvalue { i64, i8* } %src, 1
 //   %2 = extractvalue { i64, i8* } %src, 0
 //   %3 = ashr i64 %2, 32
 //   %4 = trunc i64 %3 to i32
+//   %src2 = extractvalue { i64, i8* } %src, 1
 //   %slot = getelementptr inbounds <{ %"struct.impala::StringValue", i8 }>,
-//                                  <{ %"struct.impala::StringValue", i8 }>* 
%tuple,
-//                                  i32 0,
-//                                  i32 0
+//       <{ %"struct.impala::StringValue", i8 }>* %tuple, i32 0, i32 0
 //   %5 = insertvalue %"struct.impala::StringValue" zeroinitializer, i32 %4, 1
 //   %6 = sext i32 %4 to i64
 //   %new_ptr = call i8* @_ZN6impala7MemPool8AllocateILb0EEEPhli(
-//       %"class.impala::MemPool"* %pool,
-//       i64 %6,
-//       i32 8)
-//   call void @llvm.memcpy.p0i8.p0i8.i32(
-//       i8* %new_ptr,
-//       i8* %src2,
-//       i32 %4,
-//       i32 0,
+//       %"class.impala::MemPool"* %pool, i64 %6, i32 8)
+//   call void @llvm.memcpy.p0i8.p0i8.i32(i8* %new_ptr, i8* %src2, i32 %4, i32 
0,
 //       i1 false)
 //   %7 = insertvalue %"struct.impala::StringValue" %5, i8* %new_ptr, 0
 //   store %"struct.impala::StringValue" %7, %"struct.impala::StringValue"* 
%slot
@@ -350,12 +348,13 @@ char* Tuple::AllocateStrings(const char* err_ctx, 
RuntimeState* state,
 //   ; -- end CodegenAnyVal::ToReadWriteInfo() and 
SlotDescriptor::WriteToSlot() --------
 //   ret void
 // }
-Status Tuple::CodegenMaterializeExprs(LlvmCodeGen* codegen, bool 
collect_string_vals,
+Status Tuple::CodegenMaterializeExprs(LlvmCodeGen* codegen, bool 
collect_varlen_vals,
     const TupleDescriptor& desc, const vector<ScalarExpr*>& 
slot_materialize_exprs,
     bool use_mem_pool, llvm::Function** fn) {
-  // Only support 'collect_string_vals' == false for now.
-  if (collect_string_vals) {
-    return Status("CodegenMaterializeExprs() collect_string_vals == true NYI");
+  // Only support 'collect_varlen_vals' == false for now.
+  // TODO IMPALA-12068: implement it for 'collect_varlen_vals' == true too.
+  if (collect_varlen_vals) {
+    return Status("CodegenMaterializeExprs() collect_varlen_vals == true NYI");
   }
   llvm::LLVMContext& context = codegen->context();
 
@@ -372,9 +371,12 @@ Status Tuple::CodegenMaterializeExprs(LlvmCodeGen* 
codegen, bool collect_string_
 
   // Construct function signature (this must exactly match the actual 
signature since it's
   // used in xcompiled IR). With 'pool':
-  // void MaterializeExprs(Tuple* tuple, TupleRow* row, TupleDescriptor* desc,
-  //     ScalarExprEvaluator** slot_materialize_exprs, MemPool* pool,
-  //     StringValue** non_null_string_values, int* total_string_lengths)
+  // void MaterializeExprs(Tuple* opaque_tuple, TupleRow* row,
+  //     const TupleDescriptor& desc, ScalarExprEvaluator* const* evals, 
MemPool* pool,
+  //     StringValue** non_null_string_values,
+  //     CollValueAndSize* non_null_collection_values,
+  //     int* total_varlen_lengths, int* num_non_null_string_values,
+  //     int* num_non_null_collection_values);
   llvm::PointerType* opaque_tuple_type = codegen->GetStructPtrType<Tuple>();
   llvm::PointerType* row_type = codegen->GetStructPtrType<TupleRow>();
   llvm::PointerType* desc_type = codegen->GetStructPtrType<TupleDescriptor>();
@@ -382,29 +384,35 @@ Status Tuple::CodegenMaterializeExprs(LlvmCodeGen* 
codegen, bool collect_string_
       codegen->GetStructPtrPtrType<ScalarExprEvaluator>();
   llvm::PointerType* pool_type = codegen->GetStructPtrType<MemPool>();
   llvm::PointerType* string_values_type = 
codegen->GetStructPtrPtrType<StringValue>();
+  llvm::PointerType* coll_values_and_sizes_type =
+      codegen->GetStructPtrType<CollValueAndSize>();
   llvm::PointerType* int_ptr_type = codegen->i32_ptr_type();
   LlvmCodeGen::FnPrototype prototype(codegen, "MaterializeExprs", 
codegen->void_type());
   prototype.AddArgument("opaque_tuple", opaque_tuple_type);
   prototype.AddArgument("row", row_type);
   prototype.AddArgument("desc", desc_type);
-  prototype.AddArgument("slot_materialize_exprs", expr_evals_type);
+  prototype.AddArgument("slot_materialize_expr_evals", expr_evals_type);
   prototype.AddArgument("pool", pool_type);
   prototype.AddArgument("non_null_string_values", string_values_type);
-  prototype.AddArgument("total_string_lengths", int_ptr_type);
+  prototype.AddArgument("non_null_collection_values", 
coll_values_and_sizes_type);
+  prototype.AddArgument("total_varlen_lengths", int_ptr_type);
   prototype.AddArgument("num_non_null_string_values", int_ptr_type);
+  prototype.AddArgument("num_non_null_collection_values", int_ptr_type);
 
   LlvmBuilder builder(context);
-  llvm::Value* args[8];
+  llvm::Value* args[10];
   *fn = prototype.GeneratePrototype(&builder, args);
   llvm::Value* opaque_tuple_arg = args[0];
   llvm::Value* row_arg = args[1];
   // llvm::Value* desc_arg = args[2]; // unused
   llvm::Value* expr_evals_arg = args[3];
   llvm::Value* pool_arg = args[4];
-  // The followings arguments are unused as 'collect_string_vals' is false.
+  // The following arguments are unused as 'collect_varlen_vals' is false.
   // llvm::Value* non_null_string_values_arg = args[5]; // unused
-  // llvm::Value* total_string_lengths_arg = args[6]; // unused
-  // llvm::Value* num_non_null_string_values_arg = args[7]; // unused
+  // llvm::Value* non_null_collection_values_arg = args[6]; // unused
+  // llvm::Value* total_varlen_lengths_arg = args[7]; // unused
+  // llvm::Value* num_non_null_string_values_arg = args[8]; // unused
+  // llvm::Value* num_non_null_collection_values_arg = args[9]; // unused
 
   // Cast the opaque Tuple* argument to the generated struct type
   llvm::Type* tuple_struct_type = desc.GetLlvmStruct(codegen);
@@ -509,11 +517,15 @@ llvm::Constant* SlotOffsets::ToIR(LlvmCodeGen* codegen) 
const {
 }
 
 template void Tuple::MaterializeExprs<false, false>(TupleRow*, const 
TupleDescriptor&,
-    ScalarExprEvaluator* const*, MemPool*, StringValue**, int*, int*);
+    ScalarExprEvaluator* const*, MemPool*, StringValue**, CollValueAndSize*,
+    int*, int*, int*);
 template void Tuple::MaterializeExprs<false, true>(TupleRow*, const 
TupleDescriptor&,
-    ScalarExprEvaluator* const*, MemPool*, StringValue**, int*, int*);
+    ScalarExprEvaluator* const*, MemPool*, StringValue**, CollValueAndSize*,
+    int*, int*, int*);
 template void Tuple::MaterializeExprs<true, false>(TupleRow*, const 
TupleDescriptor&,
-    ScalarExprEvaluator* const*, MemPool*, StringValue**, int*, int*);
+    ScalarExprEvaluator* const*, MemPool*, StringValue**, CollValueAndSize*,
+    int*, int*, int*);
 template void Tuple::MaterializeExprs<true, true>(TupleRow*, const 
TupleDescriptor&,
-    ScalarExprEvaluator* const*, MemPool*, StringValue**, int*, int*);
+    ScalarExprEvaluator* const*, MemPool*, StringValue**, CollValueAndSize*,
+    int*, int*, int*);
 }
diff --git a/be/src/runtime/tuple.h b/be/src/runtime/tuple.h
index 82c78f0aa..85cded16b 100644
--- a/be/src/runtime/tuple.h
+++ b/be/src/runtime/tuple.h
@@ -22,6 +22,7 @@
 #include "codegen/impala-ir.h"
 #include "common/logging.h"
 #include "gutil/macros.h"
+#include "runtime/collection-value.h"
 #include "runtime/descriptors.h"
 #include "runtime/mem-pool.h"
 #include "util/ubsan.h"
@@ -163,35 +164,53 @@ class Tuple {
   /// Callers of CodegenMaterializeExprs must set 'use_mem_pool' to true to 
generate the
   /// IR function for the case 'pool' is non-NULL and false for the NULL pool 
case.
   ///
-  /// If 'COLLECT_STRING_VALS' is true, the materialized non-NULL string value 
slots and
-  /// the total length of the string slots are returned in 
'non_null_string_values' and
-  /// 'total_string_lengths'. 'non_null_string_values' and 
'total_string_lengths' must be
-  /// non-NULL in this case. 'non_null_string_values' does not need to be 
empty; its
-  /// original contents will be overwritten.
-  /// TODO: this function does not collect other var-len types such as 
collections.
-  template <bool COLLECT_STRING_VALS, bool NULL_POOL>
+  /// If 'COLLECT_VAR_LEN_VALS' is true
+  ///  - the materialized non-NULL string value slots are returned in
+  ///    'non_null_string_values',
+  ///  - the materialized non-NULL collection value slots, along with their 
byte sizes,
+  ///    are returned in 'non_null_collection_values' and
+  ///  - the total length of the string and collection slots is returned in
+  ///    'total_varlen_lengths'.
+  /// 'non_null_string_values', 'non_null_collection_values' and 
'total_varlen_lengths'
+  /// must be non-NULL in this case. 'non_null_string_values' and
+  /// 'non_null_collection_values' do not need to be empty; their original 
contents will
+  /// be overwritten.
+  template <bool COLLECT_VAR_LEN_VALS, bool NULL_POOL>
   inline void IR_ALWAYS_INLINE MaterializeExprs(TupleRow* row,
       const TupleDescriptor& desc, const std::vector<ScalarExprEvaluator*>& 
evals,
-      MemPool* pool, std::vector<StringValue*>* non_null_string_values = NULL,
-      int* total_string_lengths = NULL) {
-    DCHECK_EQ(NULL_POOL, pool == NULL);
+      MemPool* pool, std::vector<StringValue*>* non_null_string_values = 
nullptr,
+      std::vector<CollValueAndSize>* non_null_collection_values = nullptr,
+      int* total_varlen_lengths = nullptr) {
+    DCHECK_EQ(NULL_POOL, pool == nullptr);
     DCHECK_EQ(evals.size(), desc.slots().size());
-    StringValue** non_null_string_values_array = NULL;
+
+    StringValue** non_null_string_values_array = nullptr;
+    CollValueAndSize* non_null_coll_vals_and_sizes_array = nullptr;
     int num_non_null_string_values = 0;
-    if (COLLECT_STRING_VALS) {
-      DCHECK(non_null_string_values != NULL);
-      DCHECK(total_string_lengths != NULL);
-      // string::resize() will zero-initialize any new values, so we resize to 
the largest
+    int num_non_null_collection_values = 0;
+    if (COLLECT_VAR_LEN_VALS) {
+      DCHECK(non_null_string_values != nullptr);
+      DCHECK(non_null_collection_values != nullptr);
+      DCHECK(total_varlen_lengths != nullptr);
+      // vector::resize() will zero-initialize any new values, so we resize to 
the largest
       // possible size here, then truncate the vector below once we know the 
actual size
       // (which preserves already-written values).
       non_null_string_values->resize(desc.string_slots().size());
+      non_null_collection_values->resize(desc.collection_slots().size());
+
       non_null_string_values_array = non_null_string_values->data();
-      *total_string_lengths = 0;
+      non_null_coll_vals_and_sizes_array = non_null_collection_values->data();
+
+      *total_varlen_lengths = 0;
     }
-    MaterializeExprs<COLLECT_STRING_VALS, NULL_POOL>(row, desc,
+    MaterializeExprs<COLLECT_VAR_LEN_VALS, NULL_POOL>(row, desc,
         evals.data(), pool, non_null_string_values_array,
-        total_string_lengths, &num_non_null_string_values);
-    if (COLLECT_STRING_VALS) 
non_null_string_values->resize(num_non_null_string_values);
+        non_null_coll_vals_and_sizes_array, total_varlen_lengths,
+        &num_non_null_string_values, &num_non_null_collection_values);
+    if (COLLECT_VAR_LEN_VALS) {
+      non_null_string_values->resize(num_non_null_string_values);
+      non_null_collection_values->resize(num_non_null_collection_values);
+    }
   }
 
   /// Copy the var-len string data in this tuple into the provided memory pool 
and update
@@ -211,7 +230,7 @@ class Tuple {
   static const char* MATERIALIZE_EXPRS_NULL_POOL_SYMBOL;
 
   /// Generates an IR version of MaterializeExprs(), returned in 'fn'. 
Currently only
-  /// 'collect_string_vals' = false is implemented and some arguments passed 
to the IR
+  /// 'collect_varlen_vals' = false is implemented and some arguments passed 
to the IR
   /// function are unused.
   ///
   /// If 'use_mem_pool' is true, any varlen data will be copied into the 
MemPool specified
@@ -219,7 +238,7 @@ class Tuple {
   /// be copied. There are two different MaterializeExprs symbols to 
differentiate between
   /// these cases when we replace the function calls during codegen. Please 
see comment
   /// of MaterializeExprs() for details.
-  static Status CodegenMaterializeExprs(LlvmCodeGen* codegen, bool 
collect_string_vals,
+  static Status CodegenMaterializeExprs(LlvmCodeGen* codegen, bool 
collect_varlen_vals,
       const TupleDescriptor& desc, const vector<ScalarExpr*>& 
slot_materialize_exprs,
       bool use_mem_pool, llvm::Function** fn);
 
@@ -321,13 +340,15 @@ class Tuple {
   void DeepCopyVarlenData(const TupleDescriptor& desc, char** data, int* 
offset,
       bool convert_ptrs);
 
-  /// Implementation of MaterializedExprs(). This function is replaced during
-  /// codegen. 'num_non_null_string_values' must be initialized by the caller.
-  template <bool COLLECT_STRING_VALS, bool NULL_POOL>
+  /// Implementation of MaterializeExprs(). This function is replaced during 
codegen.
+  /// 'num_non_null_string_values' and 'num_non_null_collection_values' must be
+  /// initialized by the caller.
+  template <bool COLLECT_VAR_LEN_VALS, bool NULL_POOL>
   void IR_NO_INLINE MaterializeExprs(TupleRow* row, const TupleDescriptor& 
desc,
       ScalarExprEvaluator* const* evals, MemPool* pool,
-      StringValue** non_null_string_values, int* total_string_lengths,
-      int* num_non_null_string_values);
+      StringValue** non_null_string_values, CollValueAndSize* 
non_null_collection_values,
+      int* total_varlen_lengths, int* num_non_null_string_values,
+      int* num_non_null_collection_values);
 
   /// Helper for CopyStrings() to allocate 'bytes' of memory. Returns a 
pointer to the
   /// allocated buffer on success. Otherwise an error was encountered, in 
which case NULL
diff --git a/be/src/runtime/types.h b/be/src/runtime/types.h
index 3a4e95e86..63a8600bb 100644
--- a/be/src/runtime/types.h
+++ b/be/src/runtime/types.h
@@ -249,10 +249,6 @@ struct ColumnType {
   inline bool IsArrayType() const { return type == TYPE_ARRAY; }
   inline bool IsMapType() const { return type == TYPE_MAP; }
 
-  inline bool IsVarLenType() const {
-    return IsVarLenStringType() || IsCollectionType();
-  }
-
   /// Returns the byte size of this type.  Returns 0 for variable length types.
   inline int GetByteSize() const { return GetByteSize(*this); }
 
diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java 
b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index fd3dce68e..edcb9c003 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -1667,7 +1667,7 @@ public class Analyzer {
         // this is the sorting tuple.
         // TODO: IMPALA-10939: When we enable collections in sorting tuples we 
need to
         // revisit this. Currently collection SlotDescriptors cannot be 
created without a
-        // path.
+        // path. Maybe descriptors should have a path even in the sorting 
tuple.
         if (slotPath == null) {
           createStructTuplesAndSlotDescsWithoutPath(slotPath, structField);
         } else {
diff --git a/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java 
b/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java
index 1893f3021..3ac0142a5 100644
--- a/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java
@@ -20,12 +20,11 @@ package org.apache.impala.analysis;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 
 import com.google.common.collect.Sets;
 import org.apache.impala.catalog.FeView;
-import org.apache.impala.catalog.StructField;
-import org.apache.impala.catalog.StructType;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.catalog.View;
 import org.apache.impala.common.AnalysisException;
@@ -322,10 +321,7 @@ public abstract class QueryStmt extends StatementBase {
       }
     }
 
-    for (Expr expr: resultExprs_) {
-      Preconditions.checkState(!expr.getType().containsCollection(),
-          "Sorting is not supported if the select list contains collection 
columns.");
-    }
+    checkForVarLenCollectionSorting(analyzer);
 
     sortInfo_.createSortTupleInfo(resultExprs_, analyzer);
 
@@ -348,6 +344,17 @@ public abstract class QueryStmt extends StatementBase {
     substituteResultExprs(smap, analyzer);
   }
 
+  /**
+   * Verifies that the type of every result expression is allowed in the
+   * sorting tuple. For the first expression whose type is rejected by
+   * SortInfo.checkTypeForVarLenCollection(), throws an AnalysisException
+   * carrying the returned error message.
+   */
+  private void checkForVarLenCollectionSorting(Analyzer analyzer)
+      throws AnalysisException {
+    for (Expr resultExpr : getResultExprs()) {
+      Optional<String> errorMsg =
+          SortInfo.checkTypeForVarLenCollection(resultExpr.getType());
+      // An empty 'Optional' means the type is allowed.
+      if (!errorMsg.isPresent()) continue;
+      throw new AnalysisException(errorMsg.get());
+    }
+  }
+
   /**
    * Substitutes top-level ordinals and aliases. Does not substitute ordinals 
and
    * aliases in subexpressions.
diff --git a/fe/src/main/java/org/apache/impala/analysis/SortInfo.java 
b/fe/src/main/java/org/apache/impala/analysis/SortInfo.java
index e6d802ede..95e65c8d6 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SortInfo.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SortInfo.java
@@ -21,8 +21,15 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 
+import org.apache.impala.catalog.ArrayType;
+import org.apache.impala.catalog.MapType;
+import org.apache.impala.catalog.PrimitiveType;
+import org.apache.impala.catalog.StructField;
+import org.apache.impala.catalog.StructType;
+import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.TreeNode;
 import org.apache.impala.planner.PlanNode;
@@ -258,7 +265,8 @@ public class SortInfo {
       }
       dstSlotDesc.setSourceExpr(srcExpr);
       SlotRef dstExpr = new SlotRef(dstSlotDesc);
-      if (dstSlotDesc.getType().isStructType() &&
+      Type dstType = dstSlotDesc.getType();
+      if (dstType.isStructType() &&
           dstSlotDesc.getItemTupleDesc() != null) {
         try {
           dstExpr.reExpandStruct(analyzer);
@@ -267,6 +275,8 @@ public class SortInfo {
           // analysed.
           Preconditions.checkNotNull(null);
         }
+      } else if (dstType.isCollectionType()) {
+        dstSlotDesc.setIsMaterializedRecursively(true);
       }
       outputSmap_.put(srcExpr.clone(), dstExpr);
       materializedExprs_.add(srcExpr);
@@ -321,4 +331,69 @@ public class SortInfo {
 
     return ProcessingCost.basicCost(label, inputCardinality, weight);
   }
+
+  // Decides whether a value of type 'type' may be part of the sorting tuple.
+  // Collections holding variable length data and collections nested in
+  // structs are not allowed yet (see IMPALA-12019 and IMPALA-10939). Returns
+  // an empty 'Optional' if 'type' is allowed, otherwise an 'Optional' holding
+  // the error message.
+  public static Optional<String> checkTypeForVarLenCollection(Type type) {
+    final String errorMsg =
+        "Sorting is not supported if the select list contains "
+        + "(possibly nested) collections with variable length data types.";
+
+    if (!type.isCollectionType()) {
+      // Structs are checked field by field; every other type is allowed.
+      if (type.isStructType()) {
+        return checkStructTypeForVarLenCollection((StructType) type);
+      }
+      return Optional.empty();
+    }
+
+    boolean contentsAllowed;
+    if (type instanceof ArrayType) {
+      Type itemType = ((ArrayType) type).getItemType();
+      contentsAllowed = isAllowedCollectionItemForSorting(itemType);
+    } else {
+      Preconditions.checkState(type instanceof MapType);
+      MapType mapType = (MapType) type;
+      // Both the key and the value type have to be allowed; the value type is
+      // only examined if the key type passed.
+      contentsAllowed = isAllowedCollectionItemForSorting(mapType.getKeyType())
+          && isAllowedCollectionItemForSorting(mapType.getValueType());
+    }
+
+    return contentsAllowed ? Optional.empty() : Optional.of(errorMsg);
+  }
+
+  // Helper for checkTypeForVarLenCollection(), see more there. Inspects every
+  // field of 'structType', descending into nested structs. Returns an
+  // 'Optional' with an error message if a collection is found at any nesting
+  // level, or an empty 'Optional' if no field contains a collection.
+  private static Optional<String> checkStructTypeForVarLenCollection(
+      StructType structType) {
+    for (StructField field : structType.getFields()) {
+      Type fieldType = field.getType();
+      if (fieldType.isStructType()) {
+        // Only stop on an error: returning the nested result unconditionally
+        // would skip the remaining fields of this struct whenever the nested
+        // struct itself is fine.
+        Optional<String> nestedErr =
+            checkStructTypeForVarLenCollection((StructType) fieldType);
+        if (nestedErr.isPresent()) return nestedErr;
+      } else if (fieldType.isCollectionType()) {
+        // TODO IMPALA-10939: Once we allow sorting collections in structs,
+        // test that collections containing var-len types are handled
+        // correctly.
+        String error = "Sorting is not supported if the select list "
+            + "contains collection(s) nested in struct(s).";
+        return Optional.of(error);
+      }
+    }
+
+    return Optional.empty();
+  }
+
+  // Returns true if 'itemType' is acceptable as the item, key or value type
+  // of a collection in the sorting tuple. A struct is acceptable if all of
+  // its fields are (checked recursively); any other type is acceptable if it
+  // is neither a complex type nor a variable length string type.
+  private static boolean isAllowedCollectionItemForSorting(Type itemType) {
+    if (!itemType.isStructType()) {
+      return !(itemType.isComplexType() || itemType.isVarLenStringType());
+    }
+
+    for (StructField field : ((StructType) itemType).getFields()) {
+      if (!isAllowedCollectionItemForSorting(field.getType())) return false;
+    }
+    return true;
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java 
b/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
index 04f522e5a..e0a24a161 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
@@ -30,6 +30,7 @@ import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.StructType;
+import org.apache.impala.catalog.Type;
 import org.apache.impala.common.Pair;
 import org.apache.impala.thrift.TTupleDescriptor;
 
@@ -247,9 +248,10 @@ public class TupleDescriptor {
   }
 
   public void setParentSlotDesc(SlotDescriptor parent) {
-    Preconditions.checkState(parent.getType().isStructType(),
-        "Parent for a TupleDescriptor should be a STRUCT. Actual type is " +
-        parent.getType() + " Tuple ID: " + getId());
+    Type parentType = parent.getType();
+    Preconditions.checkState(parentType.isStructType() || 
parentType.isCollectionType(),
+        "Parent for a TupleDescriptor should be a STRUCT or a COLLECTION. " +
+        "Actual type is " + parentType + " Tuple ID: " + getId());
     parentStructSlot_ = parent;
   }
   public SlotDescriptor getParentSlotDesc() { return parentStructSlot_; }
diff --git a/fe/src/main/java/org/apache/impala/catalog/Type.java 
b/fe/src/main/java/org/apache/impala/catalog/Type.java
index ac418714e..c3935efd5 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Type.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Type.java
@@ -189,7 +189,9 @@ public abstract class Type {
   public boolean isDecimal() { return isScalarType(PrimitiveType.DECIMAL); }
   public boolean isFullySpecifiedDecimal() { return false; }
   public boolean isVarchar() { return isScalarType(PrimitiveType.VARCHAR); }
+  // Checks for PrimitiveType.STRING only; VARCHAR and BINARY have their own
+  // predicates (isVarchar(), isBinary()).
+  public boolean isString() { return isScalarType(PrimitiveType.STRING); }
   public boolean isBinary() { return isScalarType(PrimitiveType.BINARY); }
+  public boolean isVarLenStringType() { return isVarchar() || isString() || 
isBinary(); }
   public boolean isWildcardDecimal() { return false; }
   public boolean isWildcardVarchar() { return false; }
   public boolean isWildcardChar() { return false; }
diff --git a/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java 
b/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java
index 17b2d4e7f..77e0b8c7a 100644
--- a/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Optional;
 
 import org.apache.impala.analysis.AggregateInfoBase;
 import org.apache.impala.analysis.AnalyticExpr;
@@ -325,21 +326,22 @@ public class AnalyticPlanner {
     for (TupleId tid: input.getTupleIds()) {
       TupleDescriptor tupleDesc = analyzer_.getTupleDesc(tid);
       for (SlotDescriptor inputSlotDesc: tupleDesc.getSlots()) {
-        if (!inputSlotDesc.isMaterialized()) continue;
-        if (inputSlotDesc.getType().isComplexType()) {
-          // Project out collection slots since they won't be used anymore and 
may cause
-          // troubles like IMPALA-8718. They won't be used since outputs of 
the analytic
-          // node must be in the select list of the block with the analytic, 
and we don't
-          // allow collection types to be returned from a select block, and 
also don't
-          // support any builtin or UDF functions that take collection types 
as an
-          // argument.
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("Project out collection slot in sort tuple of analytic: 
slot={}",
-                inputSlotDesc.debugString());
+        if (inputSlotDesc.isMaterialized()) {
+          // Project out collection slots that are not supported in the 
sorting tuple
+          // (collections containing var-len types).
+          Optional<String> err = SortInfo.checkTypeForVarLenCollection(
+              inputSlotDesc.getType());
+          // An empty 'Optional' result means there is no error so the type 
can be put
+          // into the sorting tuple.
+          if (!err.isPresent()) {
+            inputSlotRefs.add(new SlotRef(inputSlotDesc));
+          } else {
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("Project out unsupported collection slot in " +
+                  "sort tuple of analytic: slot={}", 
inputSlotDesc.debugString());
+            }
           }
-          continue;
         }
-        inputSlotRefs.add(new SlotRef(inputSlotDesc));
       }
     }
 
diff --git a/testdata/ComplexTypesTbl/simple_arrays_big.parq 
b/testdata/ComplexTypesTbl/simple_arrays_big.parq
new file mode 100644
index 000000000..5cb15885a
Binary files /dev/null and b/testdata/ComplexTypesTbl/simple_arrays_big.parq 
differ
diff --git a/testdata/data/README b/testdata/data/README
index ac22c7cf0..0d4940d10 100644
--- a/testdata/data/README
+++ b/testdata/data/README
@@ -940,3 +940,42 @@ create_table_like_parquet_test.parquet:
 Generated by Hive
 create table iceberg_create_table_like_parquet_test (col_int int, col_float 
float, col_double double, col_string string, col_struct struct<col_int:int, 
col_float:float>, col_array array<string>, col_map map<string,array<int>>) 
stored as parquet;
 insert into iceberg_create_table_like_parquet_test values (0, 1.0, 2.0, "3", 
named_struct("col_int", 4, "col_float", cast(5.0 as float)), 
array("6","7","8"), map("A", array(11,12), "B", array(21,22)));
+
+simple_arrays_big.parq:
+Generated with RandomNestedDataGenerator.java from the following schema:
+{
+  "fields": [
+    {
+      "name": "int_col",
+      "type": "int"
+    },
+    {
+      "name": "string_col",
+      "type": [
+        "null",
+        "string"
+      ]
+    },
+    {
+      "name": "int_array",
+      "type": {
+        "type": "array",
+        "items": "int"
+      }
+    },
+    {
+      "name": "double_array",
+      "type": {
+        "type": "array",
+        "items": "double"
+      }
+    }
+  ],
+  "name": "table_0",
+  "namespace": "org.apache.impala",
+  "type": "record"
+}
+The following command was used:
+mvn -f "${IMPALA_HOME}/java/datagenerator/pom.xml" exec:java
+ -Dexec.mainClass="org.apache.impala.datagenerator.RandomNestedDataGenerator"
+ -Dexec.args="${input_table_schema}.avsc 1500000 15 '${output_file}.parquet'";
diff --git a/testdata/datasets/functional/functional_schema_template.sql 
b/testdata/datasets/functional/functional_schema_template.sql
index 87f0c9b39..5c5591ae5 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -3849,6 +3849,28 @@ INSERT OVERWRITE {db_name}{db_suffix}.{table_name} VALUES
 ---- DATASET
 functional
 ---- BASE_TABLE_NAME
+map_non_varlen
+---- COLUMNS
+id INT
+map_int_int MAP<INT,INT>
+map_char3_char5 MAP<CHAR(3),CHAR(5)>
+---- DEPENDENT_LOAD_HIVE
+INSERT OVERWRITE {db_name}{db_suffix}.{table_name} VALUES
+  (1, map(10, 100, 11, 110, 12, 120), map(cast("aaa" as char(3)), cast("aaaaa" 
as char(5)))),
+  (2, map(20, 200, 21, 210, 22, 220), map(cast("aab" as char(3)), cast("aaaab" 
as char(5)))),
+  (3, map(30, 300, 31, 310, 32, 320), map(cast("aac" as char(3)), cast("aaaac" 
as char(5)))),
+  (4, map(40, 400, 41, 410, 42, 420), map(cast("aad" as char(3)), cast("aaaad" 
as char(5)))),
+  (5, map(50, 500, 51, 510, 52, 520), map(cast("aae" as char(3)), cast("aaaae" 
as char(5)))),
+  (6, map(60, 600, 61, 610, 62, 620), map(cast("aaf" as char(3)), cast("aaaaf" 
as char(5)))),
+  (7, map(70, 700, 71, 710, 72, 720), map(cast("aag" as char(3)), cast("aaaag" 
as char(5)))),
+  (8, map(80, 800, 81, 810, 82, 820), map(cast("aah" as char(3)), cast("aaaah" 
as char(5)))),
+  (9, map(90, 900, 91, 910, 92, 920), map(cast("aai" as char(3)), cast("aaaai" 
as char(5)))),
+  (10, map(100, 1000, 101, 1010, 102, 1020), map(cast("aaj" as char(3)), 
cast("aaaaj" as char(5))));
+---- LOAD
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
 collection_struct_mix
 ---- COLUMNS
 id INT
@@ -3975,6 +3997,22 @@ AS SELECT id, arr_contains_struct, 
arr_contains_nested_struct, struct_contains_n
 ---- DATASET
 functional
 ---- BASE_TABLE_NAME
+simple_arrays_big
+---- COLUMNS
+int_col INT
+string_col STRING
+int_array ARRAY<INT>
+double_array ARRAY<DOUBLE>
+---- DEPENDENT_LOAD
+`hadoop fs -mkdir -p /test-warehouse/simple_arrays_big_parquet && \
+hadoop fs -put -f 
${IMPALA_HOME}/testdata/ComplexTypesTbl/simple_arrays_big.parq \
+/test-warehouse/simple_arrays_big_parquet/
+---- DEPENDENT_LOAD_ACID
+INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} SELECT * FROM 
functional_parquet.simple_arrays_big;
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
 binary_tbl
 ---- COLUMNS
 id INT
diff --git a/testdata/datasets/functional/schema_constraints.csv 
b/testdata/datasets/functional/schema_constraints.csv
index 7f2af5b6d..63053e67b 100644
--- a/testdata/datasets/functional/schema_constraints.csv
+++ b/testdata/datasets/functional/schema_constraints.csv
@@ -355,12 +355,18 @@ table_name:collection_tbl, constraint:restrict_to, 
table_format:orc/def/block
 # In parquet we can't have NULL map keys but in ORC we can.
 table_name:map_null_keys, constraint:restrict_to, table_format:orc/def/block
 
+table_name:map_non_varlen, constraint:restrict_to, 
table_format:parquet/none/none
+table_name:map_non_varlen, constraint:restrict_to, table_format:orc/def/block
+
 table_name:collection_struct_mix, constraint:restrict_to, 
table_format:parquet/none/none
 table_name:collection_struct_mix, constraint:restrict_to, 
table_format:orc/def/block
 
 table_name:collection_struct_mix_view, constraint:restrict_to, 
table_format:parquet/none/none
 table_name:collection_struct_mix_view, constraint:restrict_to, 
table_format:orc/def/block
 
+table_name:simple_arrays_big, constraint:restrict_to, 
table_format:parquet/none/none
+table_name:simple_arrays_big, constraint:restrict_to, 
table_format:orc/def/block
+
 table_name:complextypes_maps_view, constraint:restrict_to, 
table_format:parquet/none/none
 table_name:complextypes_maps_view, constraint:restrict_to, 
table_format:orc/def/block
 
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/mixed-collections-and-structs.test
 
b/testdata/workloads/functional-query/queries/QueryTest/mixed-collections-and-structs.test
index ff6c50011..56ba1c5d1 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/mixed-collections-and-structs.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/mixed-collections-and-structs.test
@@ -266,20 +266,6 @@ select id, a.item, a.item.inner_struct, a.item.small from 
sub,
 INT,STRING,STRING,SMALLINT
 ====
 ---- QUERY
-# Sorting is not supported yet for collections: IMPALA-10939. Test with a 
struct that
-# contains an array.
-select id, struct_contains_arr from collection_struct_mix order by id
----- CATCH
-IllegalStateException: Sorting is not supported if the select list contains 
collection columns.
-====
----- QUERY
-# Sorting is not supported yet for collections: IMPALA-10939. Test with a 
struct that
-# contains a map.
-select id, struct_contains_map from collection_struct_mix order by id;
----- CATCH
-IllegalStateException: Sorting is not supported if the select list contains 
collection columns.
-====
----- QUERY
 # Zipping unnest an array that contains a struct.
 select unnest(arr_contains_struct) from collection_struct_mix;
 ---- CATCH
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/nested-array-in-select-list.test
 
b/testdata/workloads/functional-query/queries/QueryTest/nested-array-in-select-list.test
index 29d58b83e..c8db66cb9 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/nested-array-in-select-list.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/nested-array-in-select-list.test
@@ -35,12 +35,6 @@ select id, int_array, int_array_array from complextypestbl
 bigint,string,string
 ====
 ---- QUERY
-# Sorting is not supported yet for arrays: IMPALA-10939
-select id, int_array_array, int_array from complextypestbl order by id
----- CATCH
-IllegalStateException: Sorting is not supported if the select list contains 
collection columns.
-====
----- QUERY
 # Same collection used twice in a select list.
 select id, int_array, int_array from complextypestbl
 ---- RESULTS
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/nested-map-in-select-list.test
 
b/testdata/workloads/functional-query/queries/QueryTest/nested-map-in-select-list.test
index d834633c9..7e4312a27 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/nested-map-in-select-list.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/nested-map-in-select-list.test
@@ -35,12 +35,6 @@ select id, int_map, int_map_array from complextypestbl
 bigint,string,string
 ====
 ---- QUERY
-# Sorting is not supported yet for collections: IMPALA-10939
-select id, int_map_array, int_map from complextypestbl order by id
----- CATCH
-IllegalStateException: Sorting is not supported if the select list contains 
collection columns.
-====
----- QUERY
 # Same collection used twice in a select list.
 select id, int_map, int_map from complextypestbl
 ---- RESULTS
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/partitioned-top-n-complex.test
 
b/testdata/workloads/functional-query/queries/QueryTest/partitioned-top-n-complex.test
new file mode 100644
index 000000000..9f08bfb7c
--- /dev/null
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/partitioned-top-n-complex.test
@@ -0,0 +1,50 @@
+====
+---- QUERY
+# In-memory partitioned top-N containing a collection, with some partitions 
that hit
+# limit.
+with joined as (
+  select a.*, b.int_array from alltypesagg a left join complextypestbl b on 
a.tinyint_col = b.id
+),
+v as (
+  select tinyint_col, id, int_array, row_number() over (partition by 
tinyint_col order by id) as rn
+  from joined where id % 777 = 0 or id % 10 = 7)
+select tinyint_col, id, int_array, rn from v
+where rn <= 5
+order by tinyint_col, rn
+---- RESULTS
+1,2331,'[1,2,3]',1
+2,4662,'[null,1,2,null,3,null]',1
+3,6993,'[]',1
+4,1554,'NULL',1
+4,9324,'NULL',2
+5,3885,'NULL',1
+6,6216,'NULL',1
+7,7,'NULL',1
+7,17,'NULL',2
+7,27,'NULL',3
+7,37,'NULL',4
+7,47,'NULL',5
+8,3108,'[-1]',1
+9,5439,'NULL',1
+NULL,0,'NULL',1
+NULL,0,'NULL',2
+NULL,7770,'NULL',3
+NULL,7770,'NULL',4
+---- TYPES
+TINYINT, INT, STRING, BIGINT
+====
+---- QUERY
+# Sorting is not supported yet when the sorting tuple contains collections 
containing
+# varlen types: IMPALA-10939.
+with joined as (
+  select a.*, b.int_array, b.int_array_array from alltypesagg a left join 
complextypestbl b on a.tinyint_col = b.id
+),
+v as (
+  select tinyint_col, id, int_array, int_array_array, row_number() over 
(partition by tinyint_col order by id) as rn
+  from joined where id % 777 = 0 or id % 10 = 7)
+select tinyint_col, id, int_array, int_array_array, rn from v
+where rn <= 5
+order by tinyint_col, rn
+---- CATCH
+AnalysisException: Sorting is not supported if the select list contains 
(possibly nested) collections with variable length data types.
+====
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/sort-complex.test 
b/testdata/workloads/functional-query/queries/QueryTest/sort-complex.test
new file mode 100644
index 000000000..634d6b85e
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/sort-complex.test
@@ -0,0 +1,205 @@
+====
+---- QUERY
+# Sort a collection.
+select id, int_array from complextypestbl order by id
+---- RESULTS
+1,'[1,2,3]'
+2,'[null,1,2,null,3,null]'
+3,'[]'
+4,'NULL'
+5,'NULL'
+6,'NULL'
+7,'NULL'
+8,'[-1]'
+---- TYPES
+bigint,string
+====
+---- QUERY
+# Sort collection from HMS view.
+select id, int_array from complextypes_arrays_only_view order by id
+---- RESULTS
+1,'[1,2,3]'
+2,'[null,1,2,null,3,null]'
+3,'[]'
+4,'NULL'
+5,'NULL'
+6,'NULL'
+7,'NULL'
+8,'[-1]'
+---- TYPES
+bigint,string
+====
+---- QUERY
+# Sort collection from WITH-clause inline view.
+with v as (
+  select id, int_array from complextypestbl
+)
+select id, int_array from v order by id
+---- RESULTS
+1,'[1,2,3]'
+2,'[null,1,2,null,3,null]'
+3,'[]'
+4,'NULL'
+5,'NULL'
+6,'NULL'
+7,'NULL'
+8,'[-1]'
+---- TYPES
+bigint,string
+====
+---- QUERY
+# Sort collection from nested query inline view.
+select id, int_array
+from (select id, int_array from complextypestbl) v
+order by id
+---- RESULTS
+1,'[1,2,3]'
+2,'[null,1,2,null,3,null]'
+3,'[]'
+4,'NULL'
+5,'NULL'
+6,'NULL'
+7,'NULL'
+8,'[-1]'
+---- TYPES
+bigint,string
+====
+---- QUERY
+# Sort a collection that is join-unnested.
+select id, a.item
+from complextypestbl t, t.int_array_array a
+order by id;
+---- RESULTS
+1,'[1,2]'
+1,'[3,4]'
+2,'[null,1,2,null]'
+2,'[3,null,4]'
+2,'[]'
+2,'NULL'
+3,'NULL'
+7,'NULL'
+7,'[5,6]'
+8,'[-1,-2]'
+8,'[]'
+---- TYPES
+bigint,string
+====
+---- QUERY
+# Sort a collection that is join-unnested in a WITH-clause inline view.
+with v as (
+  select id, a.item arr
+  from complextypestbl t, t.int_array_array a
+)
+select id, arr from v order by id;
+---- RESULTS
+1,'[1,2]'
+1,'[3,4]'
+2,'[null,1,2,null]'
+2,'[3,null,4]'
+2,'[]'
+2,'NULL'
+3,'NULL'
+7,'NULL'
+7,'[5,6]'
+8,'[-1,-2]'
+8,'[]'
+---- TYPES
+bigint,string
+====
+---- QUERY
+# Sort a collection that is join-unnested in a nested query inline view.
+select id, arr
+from (
+    select id, a.item arr
+    from complextypestbl t, t.int_array_array a
+  ) v
+order by id;
+---- RESULTS
+1,'[1,2]'
+1,'[3,4]'
+2,'[null,1,2,null]'
+2,'[3,null,4]'
+2,'[]'
+2,'NULL'
+3,'NULL'
+7,'NULL'
+7,'[5,6]'
+8,'[-1,-2]'
+8,'[]'
+---- TYPES
+bigint,string
+====
+---- QUERY
+# Sorting is not supported yet for arrays containing varlen types: IMPALA-10939
+select id, arr_string_1d from collection_tbl order by id
+---- CATCH
+AnalysisException: Sorting is not supported if the select list contains 
(possibly nested) collections with variable length data types.
+====
+---- QUERY
+# Sorting is not supported yet for arrays containing varlen types: IMPALA-10939
+select id, int_array_array from complextypestbl order by id
+---- CATCH
+AnalysisException: Sorting is not supported if the select list contains 
(possibly nested) collections with variable length data types.
+====
+---- QUERY
+# Being in the sorting tuple is supported for maps containing only fixed 
length types:
+# IMPALA-10939
+select id, map_int_int, map_char3_char5 from map_non_varlen order by id desc
+---- RESULTS
+10,'{100:1000,101:1010,102:1020}','{"aaj":"aaaaj"}'
+9,'{90:900,91:910,92:920}','{"aai":"aaaai"}'
+8,'{80:800,81:810,82:820}','{"aah":"aaaah"}'
+7,'{70:700,71:710,72:720}','{"aag":"aaaag"}'
+6,'{60:600,61:610,62:620}','{"aaf":"aaaaf"}'
+5,'{50:500,51:510,52:520}','{"aae":"aaaae"}'
+4,'{40:400,41:410,42:420}','{"aad":"aaaad"}'
+3,'{30:300,31:310,32:320}','{"aac":"aaaac"}'
+2,'{20:200,21:210,22:220}','{"aab":"aaaab"}'
+1,'{10:100,11:110,12:120}','{"aaa":"aaaaa"}'
+---- TYPES
+int,string,string
+====
+---- QUERY
+# Sorting is not supported yet for collections containing varlen types: 
IMPALA-10939
+select id, int_map_array, int_map from complextypestbl order by id
+---- CATCH
+AnalysisException: Sorting is not supported if the select list contains 
(possibly nested) collections with variable length data types.
+====
+---- QUERY
+# Sorting is not supported yet for collections within structs: IMPALA-10939. 
Test with a
+# struct that contains an array.
+select id, struct_contains_arr from collection_struct_mix order by id
+---- CATCH
+AnalysisException: Sorting is not supported if the select list contains 
collection(s) nested in struct(s).
+====
+---- QUERY
+# Sorting is not supported yet for collections within structs: IMPALA-10939. 
Test with a
+# struct that contains a map.
+select id, struct_contains_map from collection_struct_mix order by id;
+---- CATCH
+AnalysisException: Sorting is not supported if the select list contains 
collection(s) nested in struct(s).
+====
+---- QUERY
+# Sort collection selected from within a struct.
+select id, struct_contains_arr.arr from collection_struct_mix order by id desc;
+---- RESULTS
+2,'NULL'
+1,'[1,2,3,4,null,null,5]'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+# Sort collection containing a struct.
+select id, arr_contains_struct from collection_struct_mix order by id desc;
+---- RESULTS
+2,'[{"i":100},{"i":8},{"i":35},{"i":45},null,{"i":193},{"i":null}]'
+1,'[{"i":1},{"i":2},{"i":3},{"i":4},null,{"i":5},{"i":null}]'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+# Sorting a collection containing a struct that contains var-len data is not 
supported.
+select id, arr_contains_nested_struct from collection_struct_mix order by id 
desc;
+---- CATCH
+AnalysisException: Sorting is not supported if the select list contains 
(possibly nested) collections with variable length data types.
+====
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/top-n-complex.test 
b/testdata/workloads/functional-query/queries/QueryTest/top-n-complex.test
new file mode 100644
index 000000000..556e37859
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/top-n-complex.test
@@ -0,0 +1,66 @@
+====
+---- QUERY
+# Sort a collection with limit.
+select id, int_array from complextypestbl order by id limit 5
+---- RESULTS
+1,'[1,2,3]'
+2,'[null,1,2,null,3,null]'
+3,'[]'
+4,'NULL'
+5,'NULL'
+---- TYPES
+bigint,string
+====
+---- QUERY
+# Sort collection from nested query inline view with limit.
+select id, int_array
+from (select id, int_array from complextypestbl) v
+order by id limit 5
+---- RESULTS
+1,'[1,2,3]'
+2,'[null,1,2,null,3,null]'
+3,'[]'
+4,'NULL'
+5,'NULL'
+---- TYPES
+bigint,string
+====
+---- QUERY
+# Sort a collection that is join-unnested in a nested query inline view with 
limit.
+select id, arr
+from (
+    select id, a.item arr
+    from complextypestbl t, t.int_array_array a
+  ) v
+order by id limit 2;
+---- RESULTS
+1,'[1,2]'
+1,'[3,4]'
+---- TYPES
+bigint,string
+====
+---- QUERY
+# Sorting with limit is not supported yet for arrays containing varlen types: 
IMPALA-10939
+select id, arr_string_1d from collection_tbl order by id limit 2;
+---- CATCH
+AnalysisException: Sorting is not supported if the select list contains 
(possibly nested) collections with variable length data types.
+====
+---- QUERY
+# Sorting with limit is not supported yet for arrays containing varlen types: 
IMPALA-10939
+select id, int_array_array from complextypestbl order by id limit 2;
+---- CATCH
+AnalysisException: Sorting is not supported if the select list contains 
(possibly nested) collections with variable length data types.
+====
+---- QUERY
+# Being in the sorting tuple is supported for maps containing only fixed 
length types,
+# also with limit:
+# IMPALA-10939
+select id, map_int_int, map_char3_char5 from map_non_varlen order by id desc 
limit 4
+---- RESULTS
+10,'{100:1000,101:1010,102:1020}','{"aaj":"aaaaj"}'
+9,'{90:900,91:910,92:920}','{"aai":"aaaai"}'
+8,'{80:800,81:810,82:820}','{"aah":"aaaah"}'
+7,'{70:700,71:710,72:720}','{"aag":"aaaag"}'
+---- TYPES
+int,string,string
+====
diff --git a/tests/query_test/test_nested_types.py 
b/tests/query_test/test_nested_types.py
index b2dca14a9..17d924f48 100644
--- a/tests/query_test/test_nested_types.py
+++ b/tests/query_test/test_nested_types.py
@@ -168,7 +168,11 @@ class TestNestedCollectionsInSelectList(ImpalaTestSuite):
         ImpalaTestDimension('mt_dop', 0, 2))
     cls.ImpalaTestMatrix.add_dimension(
         create_exec_option_dimension_from_dict({
-            'disable_codegen': ['False', 'True']}))
+            'disable_codegen': ['False', 'True'],
+            # The below two options are set to prevent the planner from disabling codegen
+            # because of the small data size even when 'disable_codegen' is False.
+            'disable_codegen_rows_threshold': [0],
+            'exec_single_node_rows_threshold': [0]}))
     cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
     cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('orc_schema_resolution', 0, 1))
     cls.ImpalaTestMatrix.add_constraint(orc_schema_resolution_constraint)
@@ -204,7 +208,11 @@ class TestMixedCollectionsAndStructsInSelectList(ImpalaTestSuite):
         ImpalaTestDimension('mt_dop', 0, 2))
     cls.ImpalaTestMatrix.add_dimension(
         create_exec_option_dimension_from_dict({
-            'disable_codegen': ['False', 'True']}))
+            'disable_codegen': ['False', 'True'],
+            # The below two options are set to prevent the planner from disabling codegen
+            # because of the small data size even when 'disable_codegen' is False.
+            'disable_codegen_rows_threshold': [0],
+            'exec_single_node_rows_threshold': [0]}))
     cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
     cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('orc_schema_resolution', 0, 1))
     cls.ImpalaTestMatrix.add_constraint(orc_schema_resolution_constraint)
diff --git a/tests/query_test/test_queries.py b/tests/query_test/test_queries.py
index 2b9ee01bf..8e7a1556c 100644
--- a/tests/query_test/test_queries.py
+++ b/tests/query_test/test_queries.py
@@ -105,11 +105,15 @@ class TestQueries(ImpalaTestSuite):
     self.run_test_case('QueryTest/limit', vector)
 
   def test_top_n(self, vector):
-    if vector.get_value('table_format').file_format == 'hbase':
+    file_format = vector.get_value('table_format').file_format
+    if file_format == 'hbase':
       pytest.xfail(reason="IMPALA-283 - select count(*) produces inconsistent results")
     # QueryTest/top-n is also run in test_sort with disable_outermost_topn = 1
     self.run_test_case('QueryTest/top-n', vector)
 
+    if file_format in ['parquet', 'orc']:
+      self.run_test_case('QueryTest/top-n-complex', vector)
+
   def test_union(self, vector):
     self.run_test_case('QueryTest/union', vector)
     # IMPALA-3586: The passthrough and materialized children are interleaved. The batch
@@ -133,7 +137,8 @@ class TestQueries(ImpalaTestSuite):
     self.run_test_case('QueryTest/except', vector)
 
   def test_sort(self, vector):
-    if vector.get_value('table_format').file_format == 'hbase':
+    file_format = vector.get_value('table_format').file_format
+    if file_format == 'hbase':
       pytest.xfail(reason="IMPALA-283 - select count(*) produces inconsistent results")
     vector.get_value('exec_option')['disable_outermost_topn'] = 1
     vector.get_value('exec_option')['analytic_rank_pushdown_threshold'] = 0
@@ -141,9 +146,15 @@ class TestQueries(ImpalaTestSuite):
     # We can get the sort tests for free from the top-n file
     self.run_test_case('QueryTest/top-n', vector)
 
+    if file_format in ['parquet', 'orc']:
+      self.run_test_case('QueryTest/sort-complex', vector)
+      self.run_test_case('QueryTest/top-n-complex', vector)
+
   def test_partitioned_top_n(self, vector):
     """Test partitioned Top-N operator."""
     self.run_test_case('QueryTest/partitioned-top-n', vector)
+    if vector.get_value('table_format').file_format in ['parquet', 'orc']:
+      self.run_test_case('QueryTest/partitioned-top-n-complex', vector)
 
   def test_inline_view(self, vector):
     if vector.get_value('table_format').file_format == 'hbase':
diff --git a/tests/query_test/test_sort.py b/tests/query_test/test_sort.py
index dca3ec35a..e536b0134 100644
--- a/tests/query_test/test_sort.py
+++ b/tests/query_test/test_sort.py
@@ -23,15 +23,19 @@ from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.skip import SkipIfNotHdfsMinicluster
 
 
+def split_result_rows(result):
+  """Split result rows by tab to produce a list of lists. i.e.
+     [[a1,a2], [b1, b2], [c1, c2]]"""
+  return [row.split('\t') for row in result]
+
+
 def transpose_results(result, map_fn=lambda x: x):
   """Given a query result (list of strings, each string represents a row), return a list
      of columns, where each column is a list of strings. Optionally, map_fn can be
      provided to be applied to every value, eg. to convert the strings to their
      underlying types."""
 
-  # Split result rows by tab to produce a list of lists. i.e.
-  # [[a1,a2], [b1, b2], [c1, c2]]
-  split_result = [row.split('\t') for row in result]
+  split_result = split_result_rows(result)
   column_result = []
   for col in zip(*split_result):
     # col is the transposed result, i.e. a1, b1, c1
@@ -276,3 +280,39 @@ class TestPartialSort(ImpalaTestSuite):
     result = self.execute_query(
         "insert into %s select string_col from functional.alltypessmall" % table_name)
     assert "PARTIAL SORT" in result.runtime_profile, result.runtime_profile
+
+
+class TestArraySort(ImpalaTestSuite):
+  """Tests where there are arrays in the sorting tuple."""
+
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestArraySort, cls).add_test_dimensions()
+
+    # The table we use is a parquet table.
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+        v.get_value('table_format').file_format == 'parquet')
+
+  def test_simple_arrays(self, vector):
+    """Test arrays that do not contain var-len data."""
+    query = """select string_col, int_array, double_array
+         from functional_parquet.simple_arrays_big order by string_col;"""
+
+    exec_option = copy(vector.get_value('exec_option'))
+    exec_option['disable_outermost_topn'] = 1
+    exec_option['num_nodes'] = 1
+    exec_option['buffer_pool_limit'] = '28m'
+    table_format = vector.get_value('table_format')
+
+    query_result = self.execute_query(query, exec_option, table_format=table_format)
+    assert "SpilledRuns: 2" in query_result.runtime_profile
+
+    # Split result rows (strings) into columns.
+    result = split_result_rows(query_result.data)
+    # Sort the result rows according to the first column.
+    sorted_result = sorted(result, key=lambda row: row[0])
+    assert(result == sorted_result)

Reply via email to