This is an automated email from the ASF dual-hosted git repository.

laszlog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b6f7e614 IMPALA-14472: Add create/read support for ARRAY column of 
Kudu
6b6f7e614 is described below

commit 6b6f7e614d91e1b27411cb4a71ff6a79afac5b70
Author: Xuebin Su <[email protected]>
AuthorDate: Wed Sep 17 17:06:39 2025 +0800

    IMPALA-14472: Add create/read support for ARRAY column of Kudu
    
    Initial implementation of KUDU-1261 (array column type) recently merged
    in upstream Apache Kudu repository. This patch add initial Impala
    support for working with Kudu tables having array type columns.
    
    Unlike rows, the elements of a Kudu array are stored in a different
    format than Impala. Instead of per-row bit flag for NULL info, values
    and NULL bits are stored in separate arrays.
    
    The following types of queries are not supported in this patch:
    - (IMPALA-14538) Queries that reference an array column as a table, e.g.
      ```sql
      SELECT item FROM kudu_array.array_int;
      ```
    - (IMPALA-14539) Queries that create duplicate collection slots, e.g.
      ```sql
      SELECT array_int FROM kudu_array AS t, t.array_int AS unnested;
      ```
    
    Testing:
    - Add some FE tests in AnalyzeDDLTest and AnalyzeKuduDDLTest.
    - Add EE test test_kudu.py::TestKuduArray.
      Since Impala does not support inserting complex types, including
      array, the data insertion part of the test is achieved through
      custom C++ code kudu-array-inserter.cc that insert into Kudu via
      Kudu C++ client. It would be great if we could migrate it to Python so
      that it can be moved to the same file as the test (IMPALA-14537).
    - Pass core tests.
    
    Co-authored-by: Riza Suminto
    
    Change-Id: I9282aac821bd30668189f84b2ed8fff7047e7310
    Reviewed-on: http://gerrit.cloudera.org:8080/23493
    Reviewed-by: Alexey Serbin <[email protected]>
    Reviewed-by: Michael Smith <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/exec/kudu/CMakeLists.txt                    |   6 +
 be/src/exec/kudu/kudu-array-inserter.cc            | 192 ++++++++++++++++
 be/src/exec/kudu/kudu-scanner.cc                   | 221 ++++++++++++++----
 be/src/exec/kudu/kudu-scanner.h                    |  21 ++
 be/src/exec/kudu/kudu-util-ir.cc                   |  12 +
 be/src/exec/kudu/kudu-util.h                       |   4 +
 be/src/runtime/raw-value.h                         |   2 +-
 bin/jenkins/dockerized-impala-run-tests.sh         |   3 +-
 .../apache/impala/analysis/CreateTableStmt.java    |  10 +-
 .../org/apache/impala/analysis/FromClause.java     |  11 +
 .../org/apache/impala/analysis/SlotDescriptor.java |   6 +-
 .../apache/impala/analysis/TupleDescriptor.java    |  14 +-
 .../java/org/apache/impala/catalog/KuduColumn.java |   4 +-
 .../org/apache/impala/planner/KuduScanNode.java    |   3 +-
 .../impala/service/KuduCatalogOpExecutor.java      |  31 ++-
 .../main/java/org/apache/impala/util/KuduUtil.java |  62 +++--
 .../org/apache/impala/analysis/AnalyzeDDLTest.java |   8 +-
 .../apache/impala/analysis/AnalyzeKuduDDLTest.java |  16 +-
 .../queries/QueryTest/kudu_create.test             |   5 +
 tests/query_test/test_kudu.py                      | 249 +++++++++++++++++++++
 20 files changed, 784 insertions(+), 96 deletions(-)

diff --git a/be/src/exec/kudu/CMakeLists.txt b/be/src/exec/kudu/CMakeLists.txt
index 72cebf431..aff3cb343 100644
--- a/be/src/exec/kudu/CMakeLists.txt
+++ b/be/src/exec/kudu/CMakeLists.txt
@@ -37,3 +37,9 @@ add_library(ExecKudu
 )
 
 add_dependencies(ExecKudu gen-deps)
+
+# kudu-array-inserter is used in tests/query_test/test_kudu.py::TestKuduArray.
+# TODO(IMPALA-14537): Implement kudu-array-inserter using Kudu's Python API.
+add_executable(kudu-array-inserter kudu-array-inserter.cc)
+target_link_libraries(kudu-array-inserter ${IMPALA_LINK_LIBS})
+
diff --git a/be/src/exec/kudu/kudu-array-inserter.cc 
b/be/src/exec/kudu/kudu-array-inserter.cc
new file mode 100644
index 000000000..cac2b73ca
--- /dev/null
+++ b/be/src/exec/kudu/kudu-array-inserter.cc
@@ -0,0 +1,192 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <iostream>
+#include <limits>
+#include <memory>
+
+#include <kudu/client/client.h>
+#include <kudu/client/write_op.h>
+
+#include "gutil/stl_util.h"
+#include "util/kudu-status-util.h"
+
+using kudu::Slice;
+using kudu::client::KuduClient;
+using kudu::client::KuduClientBuilder;
+using kudu::client::KuduError;
+using kudu::client::KuduInsert;
+using kudu::client::KuduSession;
+using kudu::client::KuduTable;
+using kudu::client::sp::shared_ptr;
+using std::cerr;
+using std::endl;
+
+namespace impala {
+
+// Utility program to facilitate complex column insertion into Kudu table.
+// This script specifically tailored to insert into Kudu table with following 
columns:
+// (
+//   id TINYINT PRIMARY KEY,
+//   array_INT ARRAY<INT>,
+//   array_TIMESTAMP ARRAY<TIMESTAMP>,
+//   array_VARCHAR ARRAY<VARCHAR(1)>,
+//   array_DECIMAL ARRAY<DECIMAL(18,18)>,
+//   array_DOUBLE ARRAY<DOUBLE>,
+//   array_BINARY ARRAY<BINARY>,
+//   array_BOOLEAN ARRAY<BOOLEAN>
+// )
+//
+// The destination table must be empty before this program run.
+
+// Same as in tests/conftest.py
+constexpr const char* KUDU_MASTER_DEFAULT_ADDR = "localhost:7051";
+const char* KUDU_TEST_TABLE_NAME;
+
+const vector<int32_t> INT32_ARRAY = {
+    std::numeric_limits<int32_t>::lowest(), -1, 
std::numeric_limits<int32_t>::max()};
+const vector<int64_t> TIMESTAMP_ARRAY = {
+    -17987443200000000, // See MIN_DATE_AS_UNIX_TIME in 
be/src/runtime/timestamp-test.cc
+    -1L,
+    253402300799999999, // See MAX_DATE_AS_UNIX_TIME in 
be/src/runtime/timestamp-test.cc
+};
+// To test multi-byte characters.
+const vector<Slice> UTF8_ARRAY = {u8"Σ", u8"π", u8"λ"};
+const vector<int64_t> DECIMAL18_ARRAY = {
+    -999'999'999'999'999'999, // 18 digits
+    -1L,
+    999'999'999'999'999'999, // 18 digits
+};
+// See StringParser::StringToFloatInternal() for how the special values are 
generated.
+const vector<double> DOUBLE_ARRAY = {
+    -std::numeric_limits<double>::infinity(),
+    -std::numeric_limits<double>::quiet_NaN(),
+    std::numeric_limits<double>::infinity()
+};
+const vector<bool> BOOL_ARRAY = {true, false, true};
+
+// 'id' starts from 0, same as Python's range().
+int id = 0;
+
+kudu::Status KuduInsertNulls(
+    const shared_ptr<KuduSession>& session, const shared_ptr<KuduTable>& 
table) {
+  std::unique_ptr<KuduInsert> insert(table->NewInsert());
+  KUDU_RETURN_NOT_OK(insert->mutable_row()->SetInt8("id", id));
+  KUDU_RETURN_NOT_OK(insert->mutable_row()->SetNull("array_int"));
+  KUDU_RETURN_NOT_OK(insert->mutable_row()->SetNull("array_timestamp"));
+  KUDU_RETURN_NOT_OK(insert->mutable_row()->SetNull("array_varchar"));
+  KUDU_RETURN_NOT_OK(insert->mutable_row()->SetNull("array_decimal"));
+  KUDU_RETURN_NOT_OK(insert->mutable_row()->SetNull("array_double"));
+  KUDU_RETURN_NOT_OK(insert->mutable_row()->SetNull("array_binary"));
+  KUDU_RETURN_NOT_OK(insert->mutable_row()->SetNull("array_boolean"));
+  KUDU_RETURN_NOT_OK(session->Apply(insert.release()));
+  ++id;
+  return kudu::Status::OK();
+}
+
+// Generates a vector whose length is the same as 'non_null' using the data in 
'array'.
+template <typename T>
+vector<T> repeat(const vector<T>& array, const vector<bool>& non_null) {
+  vector<T> result;
+  result.reserve(non_null.size());
+  for (size_t i = 0UL; i < non_null.size(); ++i) {
+    result.push_back(array[i % array.size()]);
+  }
+  return result;
+}
+
+kudu::Status KuduInsertArrays(const shared_ptr<KuduSession>& session,
+    const shared_ptr<KuduTable>& table, const vector<bool>& non_null) {
+  std::unique_ptr<KuduInsert> insert(table->NewInsert());
+  KUDU_RETURN_NOT_OK(insert->mutable_row()->SetInt8("id", id));
+  KUDU_RETURN_NOT_OK(insert->mutable_row()->SetArrayInt32(
+      "array_int", repeat(INT32_ARRAY, non_null), non_null));
+  KUDU_RETURN_NOT_OK(insert->mutable_row()->SetArrayUnixTimeMicros(
+      "array_timestamp", repeat(TIMESTAMP_ARRAY, non_null), non_null));
+  KUDU_RETURN_NOT_OK(insert->mutable_row()->SetArrayVarchar(
+      "array_varchar", repeat(UTF8_ARRAY, non_null), non_null));
+  KUDU_RETURN_NOT_OK(insert->mutable_row()->SetArrayUnscaledDecimal(
+      "array_decimal", repeat(DECIMAL18_ARRAY, non_null), non_null));
+  KUDU_RETURN_NOT_OK(insert->mutable_row()->SetArrayDouble(
+      "array_double", repeat(DOUBLE_ARRAY, non_null), non_null));
+  KUDU_RETURN_NOT_OK(insert->mutable_row()->SetArrayBinary(
+      "array_binary", repeat(UTF8_ARRAY, non_null), non_null));
+  KUDU_RETURN_NOT_OK(insert->mutable_row()->SetArrayBool(
+      "array_boolean", repeat(BOOL_ARRAY, non_null), non_null));
+  KUDU_RETURN_NOT_OK(session->Apply(insert.release()));
+  ++id;
+  return kudu::Status::OK();
+}
+
+kudu::Status RunKuduArrayInsert() {
+  shared_ptr<KuduClient> client;
+  // Connect to the cluster.
+  KUDU_RETURN_NOT_OK(KuduClientBuilder()
+          .add_master_server_addr(KUDU_MASTER_DEFAULT_ADDR)
+          .Build(&client));
+  shared_ptr<KuduTable> table;
+  KUDU_RETURN_NOT_OK(client->OpenTable(KUDU_TEST_TABLE_NAME, &table));
+
+  shared_ptr<KuduSession> session = client->NewSession();
+  KUDU_RETURN_NOT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+
+  // The array slot is NULL.
+  KUDU_RETURN_NOT_OK(KuduInsertNulls(session, table));
+
+  // The array is not empty and no element is NULL.
+  KUDU_RETURN_NOT_OK(KuduInsertArrays(session, table, {true, true, true}));
+
+  // The array is empty.
+  KUDU_RETURN_NOT_OK(KuduInsertArrays(session, table, {}));
+
+  // Array element at the start is NULL.
+  KUDU_RETURN_NOT_OK(KuduInsertArrays(session, table, {false, true, true}));
+
+  // Array element at the middle is NULL.
+  KUDU_RETURN_NOT_OK(KuduInsertArrays(session, table, {true, false, true}));
+
+  // Array element at the end is NULL.
+  KUDU_RETURN_NOT_OK(KuduInsertArrays(session, table, {true, true, false}));
+
+  // The array is longer than those in the previous rows.
+  KUDU_RETURN_NOT_OK(KuduInsertArrays(session, table, {true, true, true, true, 
true}));
+
+  kudu::Status status = session->Flush();
+  if (status.ok()) return status;
+  vector<KuduError*> errors;
+  ElementDeleter drop(&errors);
+  bool overflowed;
+  session->GetPendingErrors(&errors, &overflowed);
+  for (const KuduError* error : errors) {
+    cerr << "Error: " << error->status().ToString() << endl;
+  }
+  return status;
+}
+} // namespace impala
+
+int main(int argc, char** argv) {
+  // Example usage:
+  //   kudu-array-inserter impala::functional_kudu.kudu_array
+  assert(argc == 2);
+  impala::KUDU_TEST_TABLE_NAME = argv[1];
+  kudu::Status status = impala::RunKuduArrayInsert();
+  if (!status.ok()) {
+    cerr << "Error: " << status.ToString() << endl;
+    return 1;
+  }
+  return 0;
+}
diff --git a/be/src/exec/kudu/kudu-scanner.cc b/be/src/exec/kudu/kudu-scanner.cc
index a7f24b872..a6e108cb7 100644
--- a/be/src/exec/kudu/kudu-scanner.cc
+++ b/be/src/exec/kudu/kudu-scanner.cc
@@ -33,9 +33,11 @@
 #include "exprs/scalar-expr.h"
 #include "exprs/slot-ref.h"
 #include "gutil/strings/substitute.h"
+#include "kudu/util/bitmap.h"
 #include "kudu/util/block_bloom_filter.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/slice.h"
+#include "runtime/collection-value.h"
 #include "runtime/mem-pool.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/raw-value.h"
@@ -52,6 +54,7 @@
 #include "util/periodic-counter-updater.h"
 #include "util/runtime-profile-counters.h"
 
+using kudu::client::KuduArrayCellView;
 using kudu::client::KuduClient;
 using kudu::client::KuduPredicate;
 using kudu::client::KuduScanBatch;
@@ -89,6 +92,23 @@ Status KuduScanner::Open() {
       varchar_slots_.push_back(slot);
     }
   }
+  // Precompute the element byte size for each array slot since it is stable 
for the
+  // whole column.
+  kudu_array_element_byte_sizes_.assign(
+      scan_node_->tuple_desc()->collection_slots().size(), 0);
+  for (int i = 0; i < scan_node_->tuple_desc()->collection_slots().size(); 
++i) {
+    auto slot = scan_node_->tuple_desc()->collection_slots()[i];
+    // Check the slot type.
+    DCHECK(slot->type().IsArrayType());
+    DCHECK_NE(slot->children_tuple_descriptor(), nullptr);
+    // If the children tuple descriptor contains no slots, we don't need to 
materialize
+    // the elements.
+    if (slot->children_tuple_descriptor()->slots().size() > 0) {
+      DCHECK_EQ(slot->children_tuple_descriptor()->slots().size(), 1);
+      SlotDescriptor* item_slot = 
slot->children_tuple_descriptor()->slots().front();
+      kudu_array_element_byte_sizes_[i] = GetKuduArrayElementSize(item_slot);
+    }
+  }
   return ScalarExprEvaluator::Clone(&obj_pool_, state_, expr_perm_pool_.get(),
       expr_results_pool_.get(), scan_node_->conjunct_evals(), 
&conjunct_evals_);
 }
@@ -370,6 +390,142 @@ Status KuduScanner::HandleEmptyProjection(RowBatch* 
row_batch) {
   return Status::OK();
 }
 
+// Kudu tuples containing TIMESTAMP columns (UNIXTIME_MICROS in Kudu, stored 
as an
+// int64) have 8 bytes of padding following the timestamp. Because this 
padding is
+// provided, Impala can convert these unixtime values to Impala's 
TimestampValue
+// format in place and copy the rows to Impala row batches.
+// TODO: avoid mem copies with a Kudu mem 'release' mechanism, attaching mem 
to the
+// batch.
+// TODO: consider codegen for this per-timestamp col fixup
+Status KuduScanner::ConvertTimestampFromKudu(
+    Tuple* kudu_tuple, const SlotDescriptor* slot) {
+  DCHECK(slot->type().type == TYPE_TIMESTAMP);
+  if (slot->is_nullable() && 
kudu_tuple->IsNull(slot->null_indicator_offset())) {
+    return Status::OK();
+  }
+  int64_t ts_micros =
+      *reinterpret_cast<int64_t*>(kudu_tuple->GetSlot(slot->tuple_offset()));
+
+  TimestampValue tv;
+  if (state_->query_options().convert_kudu_utc_timestamps) {
+    tv = TimestampValue::FromUnixTimeMicros(ts_micros, 
state_->local_time_zone());
+  } else {
+    tv = TimestampValue::UtcFromUnixTimeMicros(ts_micros);
+  }
+
+  if (tv.HasDateAndTime()) {
+    RawValue::WriteNonNull<false>(&tv, kudu_tuple, slot, nullptr, nullptr, 
nullptr);
+    return Status::OK();
+  }
+
+  kudu_tuple->SetNull(slot->null_indicator_offset());
+  return Status(ErrorMsg::Init(TErrorCode::KUDU_TIMESTAMP_OUT_OF_RANGE,
+      scan_node_->table_desc()->table_name(),
+      scanner_->GetKuduTable()->schema().Column(slot->col_pos()).name()));
+}
+
+// Kudu tuples containing VARCHAR columns use characters instead of bytes to 
limit
+// the length. In the case of ASCII values there is no difference. However, if
+// multi-byte characters are written to Kudu the length could be longer than 
allowed.
+// This checks the actual length and truncates the value length if it is too 
long.
+// TODO(IMPALA-5675): Remove this when Impala supports UTF-8 character VARCHAR 
length.
+Status KuduScanner::ConvertVarcharFromKudu(
+    Tuple* kudu_tuple, const SlotDescriptor* slot) {
+  DCHECK(slot->type().type == TYPE_VARCHAR);
+  if (slot->is_nullable() && 
kudu_tuple->IsNull(slot->null_indicator_offset())) {
+    return Status::OK();
+  }
+  StringValue* sv =
+      
reinterpret_cast<StringValue*>(kudu_tuple->GetSlot(slot->tuple_offset()));
+  int src_len = sv->Len();
+  int dst_len = slot->type().len;
+  if (src_len > dst_len) {
+    sv->SetLen(dst_len);
+  }
+  return Status::OK();
+}
+
+Status KuduScanner::ConvertArrayFromKudu(Tuple* kudu_tuple, const 
SlotDescriptor* slot,
+    MemRange& buffer, MemPool* item_tuple_mem_pool, size_t 
kudu_array_element_byte_size) {
+  // Check if the slot is NULL.
+  if (slot->is_nullable() && 
kudu_tuple->IsNull(slot->null_indicator_offset())) {
+    return Status::OK();
+  }
+  // Check the Kudu column type.
+  const auto& kudu_column = 
scanner_->GetKuduTable()->schema().Column(slot->col_pos());
+  if (UNLIKELY(kudu_column.nested_type() == nullptr
+          || kudu_column.nested_type()->array() == nullptr)) {
+    return Status(Substitute("Kudu table '$0' column '$1' is not an ARRAY 
column.",
+        scan_node_->table_desc()->table_name(), kudu_column.name()));
+  }
+  // The slot is not NULL. Get the array value.
+  auto slice = 
reinterpret_cast<kudu::Slice*>(kudu_tuple->GetSlot(slot->tuple_offset()));
+  KuduArrayCellView kudu_array(slice->data(), slice->size());
+  RETURN_IF_ERROR(FromKuduStatus(kudu_array.Init()));
+  if (UNLIKELY(kudu_array.elem_num() > INT_MAX)) {
+    return Status(
+        Substitute("Kudu array length in table '$0' column '$1' is out of 
limit.",
+            scan_node_->table_desc()->table_name(), kudu_column.name()));
+  }
+  CollectionValue result;
+  result.num_tuples = kudu_array.elem_num();
+  // The data pointer is valid only when the array is not empty.
+  // If the children tuple descriptor contains no slots, we don't need to 
materialize
+  // the elements.
+  if (kudu_array.elem_num() > 0
+      && slot->children_tuple_descriptor()->slots().size() > 0) {
+    int64_t total_tuple_byte_size =
+        slot->children_tuple_descriptor()->byte_size() * kudu_array.elem_num();
+    // buffer.len() is 0 initially before the buffer is allocated.
+    if (UNLIKELY(buffer.len() < total_tuple_byte_size)) {
+      buffer = MemRange(
+          item_tuple_mem_pool->TryAllocate(total_tuple_byte_size), 
total_tuple_byte_size);
+    }
+    if (UNLIKELY(buffer.data() == nullptr)) {
+      return Status(Substitute(
+          "Could not allocate memory when reading Kudu ARRAY in table '$0' 
column '$1'",
+          scan_node_->table_desc()->table_name(), kudu_column.name()));
+    }
+    memset(buffer.data(), 0, total_tuple_byte_size);
+    result.ptr = buffer.data();
+
+    // Check the element type.
+    DCHECK_EQ(slot->children_tuple_descriptor()->slots().size(), 1);
+    const SlotDescriptor* item_slot = 
slot->children_tuple_descriptor()->slots().front();
+    DCHECK_NE(item_slot, nullptr);
+    DCHECK(!item_slot->type().IsComplexType());
+    const auto kudu_elem_type = kudu_column.nested_type()->array()->type();
+    DCHECK_EQ(KuduDataTypeToColumnType(kudu_elem_type, 
kudu_column.type_attributes()),
+        item_slot->type());
+    // Get the data pointer to access the elements.
+    auto kudu_array_data = reinterpret_cast<const uint8_t*>(
+        kudu_array.data(kudu_elem_type, kudu_column.type_attributes()));
+    DCHECK_NE(kudu_array_data, nullptr);
+    DCHECK_GT(kudu_array_element_byte_size, 0);
+    const bool no_null_element = !kudu_array.has_nulls();
+    for (int i = 0; i < result.num_tuples; ++i) {
+      Tuple* item_tuple = reinterpret_cast<Tuple*>(
+          result.ptr + i * slot->children_tuple_descriptor()->byte_size());
+      // The 'not_null_bitmap()' is valid only when 'has_nulls()' returns true.
+      if (no_null_element || kudu::BitmapTest(kudu_array.not_null_bitmap(), 
i)) {
+        memcpy(item_tuple, kudu_array_data + i * kudu_array_element_byte_size,
+            kudu_array_element_byte_size);
+        if (item_slot->type().type == TYPE_TIMESTAMP) {
+          RETURN_IF_ERROR(ConvertTimestampFromKudu(item_tuple, item_slot));
+        } else if (item_slot->type().type == TYPE_VARCHAR) {
+          RETURN_IF_ERROR(ConvertVarcharFromKudu(item_tuple, item_slot));
+        }
+      } else {
+        item_tuple->SetNull(item_slot->null_indicator_offset());
+      }
+    }
+  }
+  // Copy the result CollectionValue to the slot.
+  slice->clear();
+  *reinterpret_cast<CollectionValue*>(slice) = result;
+  return Status::OK();
+}
+
 Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* row_batch, Tuple** 
tuple_mem) {
   SCOPED_TIMER(scan_node_->materialize_tuple_timer());
   // Short-circuit for empty projection cases.
@@ -382,61 +538,39 @@ Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* 
row_batch, Tuple** tuple_me
   bool has_conjuncts = !conjunct_evals_.empty();
   int num_rows = cur_kudu_batch_.NumRows();
 
+  // MemPool for the item tuples if the result tuple contains collection slots.
+  MemPool item_tuple_mem_pool(scan_node_->mem_tracker());
+  // Buffer to hold the item tuples for each collection slot.
+  vector<MemRange> item_tuple_buffers(
+      scan_node_->tuple_desc()->collection_slots().size(), MemRange::null());
+
   for (int krow_idx = cur_kudu_batch_num_read_; krow_idx < num_rows; 
++krow_idx) {
     Tuple* kudu_tuple = const_cast<Tuple*>(
         reinterpret_cast<const Tuple*>(cur_kudu_batch_.direct_data().data()
             + (krow_idx * scan_node_->row_desc()->GetRowSize())));
     ++cur_kudu_batch_num_read_;
 
-    // Kudu tuples containing TIMESTAMP columns (UNIXTIME_MICROS in Kudu, 
stored as an
-    // int64) have 8 bytes of padding following the timestamp. Because this 
padding is
-    // provided, Impala can convert these unixtime values to Impala's 
TimestampValue
-    // format in place and copy the rows to Impala row batches.
-    // TODO: avoid mem copies with a Kudu mem 'release' mechanism, attaching 
mem to the
-    // batch.
-    // TODO: consider codegen for this per-timestamp col fixup
     for (const SlotDescriptor* slot : timestamp_slots_) {
-      DCHECK(slot->type().type == TYPE_TIMESTAMP);
-      if (slot->is_nullable() && 
kudu_tuple->IsNull(slot->null_indicator_offset())) {
-        continue;
-      }
-      int64_t ts_micros = *reinterpret_cast<int64_t*>(
-          kudu_tuple->GetSlot(slot->tuple_offset()));
-
-      TimestampValue tv;
-      if (state_->query_options().convert_kudu_utc_timestamps) {
-        tv = TimestampValue::FromUnixTimeMicros(ts_micros, 
state_->local_time_zone());
-      } else {
-        tv = TimestampValue::UtcFromUnixTimeMicros(ts_micros);
-      }
-
-      if (tv.HasDateAndTime()) {
-        RawValue::Write(&tv, kudu_tuple, slot, nullptr);
-      } else {
-        kudu_tuple->SetNull(slot->null_indicator_offset());
-        RETURN_IF_ERROR(state_->LogOrReturnError(
-            ErrorMsg::Init(TErrorCode::KUDU_TIMESTAMP_OUT_OF_RANGE,
-              scan_node_->table_desc()->table_name(),
-              
scanner_->GetKuduTable()->schema().Column(slot->col_pos()).name())));
+      Status status = ConvertTimestampFromKudu(kudu_tuple, slot);
+      if (UNLIKELY(!status.ok())) {
+        RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
       }
     }
 
-    // Kudu tuples containing VARCHAR columns use characters instead of bytes 
to limit
-    // the length. In the case of ASCII values there is no difference. 
However, if
-    // multi-byte characters are written to Kudu the length could be longer 
than allowed.
-    // This checks the actual length and truncates the value length if it is 
too long.
-    // TODO(IMPALA-5675): Remove this when Impala supports UTF-8 character 
VARCHAR length.
     for (const SlotDescriptor* slot : varchar_slots_) {
-      DCHECK(slot->type().type == TYPE_VARCHAR);
-      if (slot->is_nullable() && 
kudu_tuple->IsNull(slot->null_indicator_offset())) {
-        continue;
+      Status status = ConvertVarcharFromKudu(kudu_tuple, slot);
+      if (UNLIKELY(!status.ok())) {
+        RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
       }
-      StringValue* sv = reinterpret_cast<StringValue*>(
-          kudu_tuple->GetSlot(slot->tuple_offset()));
-      int src_len = sv->Len();
-      int dst_len = slot->type().len;
-      if (src_len > dst_len) {
-        sv->SetLen(dst_len);
+    }
+
+    item_tuple_mem_pool.Clear();
+    for (int i = 0; i < scan_node_->tuple_desc()->collection_slots().size(); 
++i) {
+      auto slot = scan_node_->tuple_desc()->collection_slots()[i];
+      Status status = ConvertArrayFromKudu(kudu_tuple, slot, 
item_tuple_buffers[i],
+          &item_tuple_mem_pool, kudu_array_element_byte_sizes_[i]);
+      if (!status.ok()) {
+        RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
       }
     }
 
@@ -459,6 +593,7 @@ Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* 
row_batch, Tuple** tuple_me
     *tuple_mem = next_tuple(*tuple_mem);
   }
   expr_results_pool_->Clear();
+  item_tuple_mem_pool.FreeAll();
 
   // Check the status in case an error status was set during conjunct 
evaluation.
   return state_->GetQueryStatus();
diff --git a/be/src/exec/kudu/kudu-scanner.h b/be/src/exec/kudu/kudu-scanner.h
index 48ef4134f..59ba8e491 100644
--- a/be/src/exec/kudu/kudu-scanner.h
+++ b/be/src/exec/kudu/kudu-scanner.h
@@ -24,6 +24,7 @@
 #include "common/object-pool.h"
 #include "exec/kudu/kudu-scan-node-base.h"
 #include "runtime/descriptors.h"
+#include "util/mem-range.h"
 
 namespace impala {
 
@@ -144,6 +145,26 @@ class KuduScanner {
   /// Varchar slots in the tuple descriptor of the scan node. Used to resize 
Kudu
   /// VARCHAR values inline.
   vector<const SlotDescriptor*> varchar_slots_;
+
+  // Converts the UNIXTIME_MICROS value in the 'slot' of the 'kudu_tuple' to a
+  // TimestampValue in place.
+  Status ConvertTimestampFromKudu(Tuple* kudu_tuple, const SlotDescriptor* 
slot);
+
+  // Converts the VARCHAR value in the 'slot' of the 'kudu_table' to a 
StringValue
+  // with length limit in place.
+  Status ConvertVarcharFromKudu(Tuple* kudu_tuple, const SlotDescriptor* slot);
+
+  // The byte size of an element in the Kudu array for each collection slot in 
the tuple.
+  vector<size_t> kudu_array_element_byte_sizes_;
+
+  // Converts the ARRAY value in the 'slot' of the 'kudu_tuple' to a 
CollectionValue in
+  // place, using
+  // - 'buffer' to hold the item tuples, and
+  // - 'item_tuple_mem_pool' to allocate memory if 'buffer' is too small.
+  // 'kudu_array_element_byte_size' is the byte size of an element in the Kudu 
array
+  Status ConvertArrayFromKudu(Tuple* kudu_tuple, const SlotDescriptor* slot,
+      MemRange& buffer, MemPool* item_tuple_mem_pool,
+      size_t kudu_array_element_byte_size);
 };
 
 } /// namespace impala
diff --git a/be/src/exec/kudu/kudu-util-ir.cc b/be/src/exec/kudu/kudu-util-ir.cc
index 2bd28fa52..c2aa98159 100644
--- a/be/src/exec/kudu/kudu-util-ir.cc
+++ b/be/src/exec/kudu/kudu-util-ir.cc
@@ -161,4 +161,16 @@ Status WriteKuduValue(int col, const ColumnType& col_type, 
const void* value,
   return Status::OK();
 }
 
+size_t GetKuduArrayElementSize(const SlotDescriptor* slot) {
+  // KuduScanner::PAD_UNIXTIME_MICROS_TO_16_BYTES does not apply to array 
elements.
+  if (slot->type().IsTimestampType()) {
+    return sizeof(int64_t);
+  }
+  // Same as TupleDescriptor#getSlotSize()
+  constexpr auto KUDU_SLICE_PADDING = 4U;
+  if (slot->type().IsStringType()) {
+    return slot->slot_size() + KUDU_SLICE_PADDING;
+  }
+  return slot->slot_size();
+}
 }
diff --git a/be/src/exec/kudu/kudu-util.h b/be/src/exec/kudu/kudu-util.h
index 1ad8b8d27..125775d22 100644
--- a/be/src/exec/kudu/kudu-util.h
+++ b/be/src/exec/kudu/kudu-util.h
@@ -21,6 +21,7 @@
 // TODO: Remove when toolchain callbacks.h properly defines ::tm.
 struct tm;
 
+#include <kudu/client/array_cell.h>
 #include <kudu/client/callbacks.h>
 #include <kudu/client/client.h>
 #include <kudu/client/value.h>
@@ -28,6 +29,7 @@ struct tm;
 #include "util/kudu-status-util.h"
 #include "runtime/string-value.h"
 #include "runtime/types.h"
+#include "runtime/tuple.h"
 
 namespace impala {
 
@@ -90,5 +92,7 @@ Status ConvertTimestampValueToKudu(const TimestampValue* tv, 
int64_t* ts_micros)
 // Converts a DateValue to Kudu's representation which is returned in 'days'.
 Status ConvertDateValueToKudu(const DateValue* dv, int32_t* days);
 
+size_t IR_ALWAYS_INLINE GetKuduArrayElementSize(const SlotDescriptor* slot);
+
 } /// namespace impala
 #endif
diff --git a/be/src/runtime/raw-value.h b/be/src/runtime/raw-value.h
index 0f66d0498..a78ff362c 100644
--- a/be/src/runtime/raw-value.h
+++ b/be/src/runtime/raw-value.h
@@ -172,7 +172,6 @@ class RawValue {
     return top_level ? "NULL" : "null";
   }
 
-private:
   /// Like Write() but 'value' must be non-NULL.
   template <bool COLLECT_VAR_LEN_VALS>
   static void WriteNonNull(const void* value, Tuple* tuple,
@@ -180,6 +179,7 @@ private:
       std::vector<StringValue*>* string_values,
       std::vector<std::pair<CollectionValue*, int64_t>>* collection_values);
 
+private:
   /// Recursive helper function for Write() to handle struct slots.
   template <bool COLLECT_VAR_LEN_VALS>
   static void WriteStruct(const void* value, Tuple* tuple,
diff --git a/bin/jenkins/dockerized-impala-run-tests.sh 
b/bin/jenkins/dockerized-impala-run-tests.sh
index c2ed71a35..4c06efe50 100755
--- a/bin/jenkins/dockerized-impala-run-tests.sh
+++ b/bin/jenkins/dockerized-impala-run-tests.sh
@@ -91,7 +91,8 @@ case ${IMPALA_DOCKER_JAVA:-8} in
   *)
     ;;
 esac
-make -j ${IMPALA_BUILD_THREADS} ${IMAGE_TYPE}_images parquet-reader 
impala-profile-tool
+make -j ${IMPALA_BUILD_THREADS} \
+    ${IMAGE_TYPE}_images parquet-reader impala-profile-tool kudu-array-inserter
 
 source_impala_config
 
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java 
b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
index bef7b11f9..c80e68d06 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
@@ -491,11 +491,11 @@ public class CreateTableStmt extends StatementBase 
implements SingleTableStmt {
 
     // Check column types are valid Kudu types
     for (ColumnDef col: getColumnDefs()) {
-      try {
-        KuduUtil.fromImpalaType(col.getType());
-      } catch (ImpalaRuntimeException e) {
-        throw new AnalysisException(String.format(
-            "Cannot create table '%s': %s", getTbl(), e.getMessage()));
+      if (KuduUtil.fromImpalaType(col.getType()) == null) {
+        String error_msg =
+            String.format("Cannot create table '%s': Type %s is not supported 
in Kudu",
+                getTbl(), col.getType().toSql());
+        throw new AnalysisException(error_msg);
       }
     }
     
AnalysisUtils.throwIfNotNull(getTblProperties().get(KuduTable.KEY_KEY_COLUMNS),
diff --git a/fe/src/main/java/org/apache/impala/analysis/FromClause.java 
b/fe/src/main/java/org/apache/impala/analysis/FromClause.java
index ca731f907..726fdca36 100644
--- a/fe/src/main/java/org/apache/impala/analysis/FromClause.java
+++ b/fe/src/main/java/org/apache/impala/analysis/FromClause.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.impala.analysis.TableRef.ZippingUnnestType;
+import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.catalog.iceberg.IcebergMetadataTable;
 import org.apache.impala.common.AnalysisException;
@@ -90,6 +91,7 @@ public class FromClause extends StmtNode implements 
Iterable<TableRef> {
       tblRef.analyze(analyzer);
       leftTblRef = tblRef;
       if (tblRef instanceof CollectionTableRef) {
+        checkKuduCollectionSupport((CollectionTableRef)tblRef);
         checkIcebergCollectionSupport((CollectionTableRef)tblRef);
         checkTopLevelComplexAcidScan(analyzer, (CollectionTableRef)tblRef);
         if (firstZippingUnnestRef != null && tblRef.isZippingUnnest() &&
@@ -172,6 +174,15 @@ public class FromClause extends StmtNode implements 
Iterable<TableRef> {
     }
   }
 
+  private void checkKuduCollectionSupport(CollectionTableRef tblRef)
+      throws AnalysisException {
+    // TODO(IMPALA-14538): Support referencing a Kudu collection column as a 
table.
+    if (tblRef.getTable() instanceof FeKuduTable && !tblRef.isRelative()) {
+      throw new AnalysisException(
+          "Referencing a Kudu collection column as a table is not supported.");
+    }
+  }
+
   @Override
   public FromClause clone() {
     List<TableRef> clone = new ArrayList<>();
diff --git a/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java 
b/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
index 7b7b10c7c..a9cd8590b 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
@@ -384,12 +384,12 @@ public class SlotDescriptor {
   }
 
   /**
-   * Returns true if this slot is of STRING type in a kudu table.
+   * Returns true if this slot is for a Kudu Slice.
    */
-  public boolean isKuduStringSlot() {
+  public boolean isKuduSliceSlot() {
     if (getParent() == null) return false;
     if (!(getParent().getTable() instanceof FeKuduTable)) return false;
-    return getType().isStringType();
+    return getType().isStringType() || getType().isArrayType();
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java 
b/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
index 37e8b1449..41fe67452 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
@@ -75,8 +75,8 @@ import com.google.common.base.Preconditions;
  * Offsets: 0          12      16           18       19
  */
 public class TupleDescriptor {
-  // Padding size in bytes for Kudu string slots.
-  private static final int KUDU_STRING_PADDING = 4;
+  // Padding size in bytes for Kudu Slice slots.
+  private static final int KUDU_SLICE_PADDING = 4;
 
   private final TupleId id_;
   private final String debugName_;  // debug-only
@@ -466,9 +466,7 @@ public class TupleDescriptor {
   private int getSlotSize(SlotDescriptor slotDesc) {
     int slotSize = slotDesc.getMaterializedSlotSize();
     // Add padding for a KUDU string slot.
-    if (slotDesc.isKuduStringSlot()) {
-      slotSize += KUDU_STRING_PADDING;
-    }
+    if (slotDesc.isKuduSliceSlot()) { slotSize += KUDU_SLICE_PADDING; }
     return slotSize;
   }
 
@@ -489,9 +487,9 @@ public class TupleDescriptor {
       avgSerializedSize_ += slotDesc.getMaterializedSlotSize();
     }
     // Add padding for a KUDU string slot.
-    if (slotDesc.isKuduStringSlot()) {
-      avgSerializedSize_ += KUDU_STRING_PADDING;
-      serializedPadSize_ += KUDU_STRING_PADDING;
+    if (slotDesc.isKuduSliceSlot()) {
+      avgSerializedSize_ += KUDU_SLICE_PADDING;
+      serializedPadSize_ += KUDU_SLICE_PADDING;
     }
   }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduColumn.java 
b/fe/src/main/java/org/apache/impala/catalog/KuduColumn.java
index a39cf613e..3d41d1248 100644
--- a/fe/src/main/java/org/apache/impala/catalog/KuduColumn.java
+++ b/fe/src/main/java/org/apache/impala/catalog/KuduColumn.java
@@ -86,7 +86,7 @@ public class KuduColumn extends Column {
 
   public static KuduColumn fromColumnSchema(ColumnSchema colSchema, int 
position)
       throws ImpalaRuntimeException {
-    Type type = KuduUtil.toImpalaType(colSchema.getType(), 
colSchema.getTypeAttributes());
+    Type type = KuduUtil.toImpalaType(colSchema);
     Object defaultValue = colSchema.getDefaultValue();
     LiteralExpr defaultValueExpr = null;
     if (defaultValue != null) {
@@ -144,7 +144,7 @@ public class KuduColumn extends Column {
     org.apache.kudu.Type kuduType = Schema.getAutoIncrementingColumnType();
     Preconditions.checkArgument(kuduType != org.apache.kudu.Type.DECIMAL &&
         kuduType != org.apache.kudu.Type.VARCHAR);
-    Type type = KuduUtil.toImpalaType(kuduType, null);
+    Type type = KuduUtil.toImpalaScalarType(kuduType, null);
     return new KuduColumn(Schema.getAutoIncrementingColumnName(), type,
         /* isKey */true, /* isPrimaryKeyUnique */false, /* isNullable */false,
         /* isAutoIncrementing */true, /* encoding */null, /* compression 
*/null,
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index 35d6c8f34..a0ffb730d 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -204,8 +204,7 @@ public class KuduScanNode extends ScanNode {
             "outdated and need to be refreshed.");
       }
 
-      Type kuduColType =
-          KuduUtil.toImpalaType(kuduCol.getType(), 
kuduCol.getTypeAttributes());
+      Type kuduColType = KuduUtil.toImpalaType(kuduCol);
       if (!colType.equals(kuduColType)) {
         throw new ImpalaRuntimeException("Column '" + colName + "' is type " +
             kuduColType.toSql() + " but Impala expected " + colType.toSql() +
diff --git 
a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
index 4f645e8bc..1bbb25be9 100644
--- a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.impala.catalog.ArrayType;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.KuduColumn;
 import org.apache.impala.catalog.KuduTable;
@@ -37,8 +38,8 @@ import org.apache.impala.common.PrintUtils;
 import org.apache.impala.thrift.TAlterTableAddDropRangePartitionParams;
 import org.apache.impala.thrift.TColumn;
 import org.apache.impala.thrift.TCreateTableParams;
-import org.apache.impala.thrift.TKuduPartitionParam;
 import org.apache.impala.thrift.TKuduPartitionByHashParam;
+import org.apache.impala.thrift.TKuduPartitionParam;
 import org.apache.impala.thrift.TRangePartition;
 import org.apache.impala.thrift.TRangePartitionOperationType;
 import org.apache.impala.util.EventSequence;
@@ -163,9 +164,18 @@ public class KuduCatalogOpExecutor {
       boolean isKeyUnique) throws ImpalaRuntimeException {
     Type type = Type.fromThrift(column.getColumnType());
     Preconditions.checkState(type != null);
+    // If the type is an ARRAY type, we need to
+    // 1. Create a ColumnSchemaBuilder for its item type first, and
+    // 2. Make it an ARRAY column by calling ColumnSchemaBuilder#array().
+    // Otherwise we need to create a ColumnSchemaBuilder for the type itself.
     org.apache.kudu.Type kuduType = KuduUtil.fromImpalaType(type);
-
+    if (kuduType == null) {
+      throw new ImpalaRuntimeException(
+          String.format("Type %s is not supported in Kudu", type.toSql()));
+    }
     ColumnSchemaBuilder csb = new ColumnSchemaBuilder(column.getColumnName(), 
kuduType);
+    Type typeOrItemType = type;
+    if (type.isArrayType()) { typeOrItemType = ((ArrayType) 
type).getItemType(); }
     if (isKey && !isKeyUnique) {
       csb.nonUniqueKey(true);
     } else {
@@ -183,7 +193,7 @@ public class KuduCatalogOpExecutor {
     }
     if (column.isSetDefault_value()) {
       csb.defaultValue(KuduUtil.getKuduDefaultValue(
-          column.getDefault_value(), type, column.getColumnName()));
+          column.getDefault_value(), typeOrItemType, column.getColumnName()));
     }
     if (column.isSetBlock_size()) csb.desiredBlockSize(column.getBlock_size());
     if (column.isSetEncoding()) {
@@ -192,17 +202,17 @@ public class KuduCatalogOpExecutor {
     if (column.isSetCompression()) {
       csb.compressionAlgorithm(KuduUtil.fromThrift(column.getCompression()));
     }
-    if (type.isDecimal()) {
-      csb.typeAttributes(
-          DecimalUtil.typeAttributes(type.getPrecision(), 
type.getDecimalDigits()));
+    if (typeOrItemType.isDecimal()) {
+      csb.typeAttributes(DecimalUtil.typeAttributes(
+          typeOrItemType.getPrecision(), typeOrItemType.getDecimalDigits()));
     }
-    if (kuduType == org.apache.kudu.Type.VARCHAR) {
-      csb.typeAttributes(
-          CharUtil.typeAttributes(type.getColumnSize()));
+    if (typeOrItemType.isVarchar()) {
+      
csb.typeAttributes(CharUtil.typeAttributes(typeOrItemType.getColumnSize()));
     }
     if (column.isSetComment() && !column.getComment().isEmpty()) {
       csb.comment(column.getComment());
     }
+    if (type.isArrayType()) { csb.array(true); }
     return csb.build();
   }
 
@@ -385,8 +395,7 @@ public class KuduCatalogOpExecutor {
               "Error loading Kudu table: Impala does not support column names 
that " +
               "differ only in casing '%s'", colSchema.getName()));
         }
-        Type type =
-            KuduUtil.toImpalaType(colSchema.getType(), 
colSchema.getTypeAttributes());
+        Type type = KuduUtil.toImpalaType(colSchema);
         String comment =
             !colSchema.getComment().isEmpty() ? colSchema.getComment() : null;
         cols.add(new FieldSchema(colSchema.getName(), 
type.toSql().toLowerCase(),
diff --git a/fe/src/main/java/org/apache/impala/util/KuduUtil.java 
b/fe/src/main/java/org/apache/impala/util/KuduUtil.java
index 137fadabc..acce6b53f 100644
--- a/fe/src/main/java/org/apache/impala/util/KuduUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/KuduUtil.java
@@ -28,30 +28,28 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import javax.annotation.Nullable;
+
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.DescriptorTable;
 import org.apache.impala.analysis.Expr;
-import org.apache.impala.analysis.FunctionCallExpr;
 import org.apache.impala.analysis.InsertStmt;
 import org.apache.impala.analysis.KuduPartitionExpr;
 import org.apache.impala.analysis.LiteralExpr;
+import org.apache.impala.catalog.ArrayType;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.ScalarType;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.ImpalaRuntimeException;
-import org.apache.impala.common.InternalException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.service.BackendConfig;
-import org.apache.impala.service.FeSupport;
 import org.apache.impala.thrift.TColumn;
 import org.apache.impala.thrift.TColumnEncoding;
-import org.apache.impala.thrift.TColumnValue;
 import org.apache.impala.thrift.TExpr;
 import org.apache.impala.thrift.TExprNode;
 import org.apache.impala.thrift.TExprNodeType;
 import org.apache.impala.thrift.THdfsCompression;
-import org.apache.impala.util.StringUtils;
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.ColumnSchema.CompressionAlgorithm;
 import org.apache.kudu.ColumnSchema.Encoding;
@@ -226,6 +224,10 @@ public class KuduUtil {
     TExprNode literal = defaultValue.getNodes().get(0);
     if (literal.getNode_type() == TExprNodeType.NULL_LITERAL) return null;
     org.apache.kudu.Type type = KuduUtil.fromImpalaType(impalaType);
+    if (type == null) {
+      throw new ImpalaRuntimeException(
+          String.format("Type %s is not supported in Kudu", 
impalaType.toSql()));
+    }
     switch (type) {
       case INT8:
         checkCorrectType(literal.isSetInt_literal(), type, colName, literal);
@@ -423,15 +425,24 @@ public class KuduUtil {
   }
 
   /**
-   * Converts a given Impala catalog type to the Kudu type. Throws an 
exception if the
-   * type cannot be converted.
+   * Converts a given Impala catalog type or its item type to the Kudu type.
+   * Returns null if the type cannot be converted instead of throwing an 
exception so
+   * that the caller can report the full type in the error message. Since this 
function
+   * contains recursion, only the outer-most caller has the full type info.
    */
-  public static org.apache.kudu.Type fromImpalaType(Type t)
-      throws ImpalaRuntimeException {
+  public static @Nullable org.apache.kudu.Type fromImpalaType(Type t) {
     if (!t.isScalarType()) {
-      throw new ImpalaRuntimeException(format(
-          "Type %s is not supported in Kudu", t.toSql()));
+      // Kudu does not support complex types other than ARRAY.
+      if (!t.isArrayType()) { return null; }
+      Type itemType = ((ArrayType) t).getItemType();
+      // Kudu does not support array of non-scalar types or 16-byte DECIMAL.
+      if (!itemType.isScalarType()
+          || ((ScalarType) itemType).storageBytesForDecimal() == 16) {
+        return null;
+      }
+      return KuduUtil.fromImpalaType(itemType);
     }
+
     ScalarType s = (ScalarType) t;
     switch (s.getPrimitiveType()) {
       case TINYINT: return org.apache.kudu.Type.INT8;
@@ -452,14 +463,16 @@ public class KuduUtil {
       case NULL_TYPE:
       case DATETIME:
       case CHAR:
-      default:
-        throw new ImpalaRuntimeException(format(
-            "Type %s is not supported in Kudu", s.toSql()));
+      default: return null;
     }
   }
 
-  public static Type toImpalaType(org.apache.kudu.Type t,
+  /**
+   * Converts a given Kudu scalar type to its matching Impala scalar type.
+   */
+  public static ScalarType toImpalaScalarType(org.apache.kudu.Type t,
       ColumnTypeAttributes typeAttributes) throws ImpalaRuntimeException {
+    Preconditions.checkState(t != org.apache.kudu.Type.NESTED);
     switch (t) {
       case BOOL: return Type.BOOLEAN;
       case DOUBLE: return Type.DOUBLE;
@@ -482,6 +495,25 @@ public class KuduUtil {
     }
   }
 
+  /**
+   * Converts a given Kudu colSchema to its matching Impala type. If the Kudu 
type is
+   * NESTED, it must be an array and the element type is converted to the
+   * corresponding Impala type.
+   */
+  public static Type toImpalaType(org.apache.kudu.ColumnSchema colSchema)
+      throws ImpalaRuntimeException {
+    org.apache.kudu.Type t = colSchema.getType();
+    ColumnTypeAttributes typeAttributes = colSchema.getTypeAttributes();
+    if (t != org.apache.kudu.Type.NESTED) {
+      return toImpalaScalarType(t, typeAttributes);
+    }
+    Preconditions.checkState(colSchema.getNestedTypeDescriptor() != null);
+    Preconditions.checkState(colSchema.getNestedTypeDescriptor().isArray());
+    org.apache.kudu.Type kuduElementType =
+        colSchema.getNestedTypeDescriptor().getArrayDescriptor().getElemType();
+    return new ArrayType(toImpalaScalarType(kuduElementType, typeAttributes));
+  }
+
   /**
    * Creates and returns an Expr that takes rows being inserted by 
'insertStmt' and
    * returns the partition number for each row.
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java 
b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
index 2bd046854..bcbe96991 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -2683,9 +2683,11 @@ public class AnalyzeDDLTest extends FrontendTestBase {
     AnalysisError("create table t primary key (id) partition by hash 
partitions 3" +
         " stored as kudu as select id, m from 
functional.complextypes_fileformat",
         "Cannot create table 't': Type MAP<STRING,BIGINT> is not supported in 
Kudu");
-    AnalysisError("create table t primary key (id) partition by hash 
partitions 3" +
-        " stored as kudu as select id, a from 
functional.complextypes_fileformat",
-        "Cannot create table 't': Type ARRAY<INT> is not supported in Kudu");
+    AnalysisError("create table t primary key (id) partition by hash 
partitions 3"
+            + " stored as kudu as select id, a from 
functional.complextypes_fileformat",
+        "Unable to INSERT into target table (default.t) because the column 'a' 
has "
+            + "a complex type 'ARRAY<INT>' and Impala doesn't support 
inserting into "
+            + "tables containing complex type columns");
 
     // IMPALA-6454: CTAS into Kudu tables with primary key specified in upper 
case.
     AnalyzesOk("create table part_kudu_tbl primary key(INT_COL, SMALLINT_COL, 
ID)" +
diff --git 
a/fe/src/test/java/org/apache/impala/analysis/AnalyzeKuduDDLTest.java 
b/fe/src/test/java/org/apache/impala/analysis/AnalyzeKuduDDLTest.java
index 276e09acc..1a1d02b07 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeKuduDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeKuduDDLTest.java
@@ -390,10 +390,18 @@ public class AnalyzeKuduDDLTest extends FrontendTestBase {
         "partitioning columns: (1 vs 2). Range partition: 'PARTITION 0 < 
VALUES <= 1'",
         isExternalPurgeTbl);
 
+    // Test supported Kudu complex types
+    AnalyzesOk("create table tab (x int primary key, y ARRAY<INT>) "
+            + "partition by hash(x) partitions 3 stored as kudu",
+        isExternalPurgeTbl);
 
     // Test unsupported Kudu types
-    List<String> unsupportedTypes = Lists.newArrayList("CHAR(20)",
-        "STRUCT<f1:INT,f2:STRING>", "ARRAY<INT>", "MAP<STRING,STRING>");
+    List<String> unsupportedTypes = Lists.newArrayList(
+        "CHAR(20)", "STRUCT<f1:INT,f2:STRING>", "MAP<STRING,STRING>",
+        // ARRAY of any complex type or 16-byte DECIMAL is not supported yet.
+        "ARRAY<ARRAY<INT>>", "ARRAY<MAP<INT,INT>>", 
"ARRAY<STRUCT<a:INT,b:INT>>",
+        "ARRAY<DECIMAL(19,19)>"
+    );
     for (String t: unsupportedTypes) {
       String expectedError = String.format(
           "Cannot create table 'tab': Type %s is not supported in Kudu", t);
@@ -533,6 +541,10 @@ public class AnalyzeKuduDDLTest extends FrontendTestBase {
         "default isnull(null, null)) partition by hash (i) partitions 3 " +
         "stored as kudu", "Default value of NULL not allowed on non-nullable 
column: " +
         "'x'", isExternalPurgeTbl);
+    AnalysisError("create table tab (i int primary key, a array<int> default 
null) " +
+        "partition by hash (i) partitions 3 stored as kudu",
+        "Default value NULL (type: NULL_TYPE) is not compatible with column " +
+        "'a' (type: ARRAY<INT>).", isExternalPurgeTbl);
     // Invalid block_size values
     AnalysisError("create table tab (i int primary key block_size 1.1) " +
         "partition by hash (i) partitions 3 stored as kudu", "Invalid value " +
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/kudu_create.test 
b/testdata/workloads/functional-query/queries/QueryTest/kudu_create.test
index d536578c1..5cfff8820 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu_create.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_create.test
@@ -747,3 +747,8 @@ stored as kudu
 ---- CATCH
 AnalysisException: BOOLEAN type is not allowed to be part of a PRIMARY KEY 
therefore not allowed for range-partitioning.
 ====
+---- QUERY
+# Array primary key column
+create table tab_array_primary_key (array_int array<int> primary key) stored 
as kudu
+---- CATCH
+IllegalArgumentException: Array type column: array_int cannot be a key column
diff --git a/tests/query_test/test_kudu.py b/tests/query_test/test_kudu.py
index 1fb09871b..2b659f127 100644
--- a/tests/query_test/test_kudu.py
+++ b/tests/query_test/test_kudu.py
@@ -19,6 +19,7 @@ from __future__ import absolute_import, division, 
print_function
 from builtins import range
 from copy import deepcopy
 
+from base64 import b64decode
 from kudu.schema import (
     BOOL,
     DOUBLE,
@@ -34,10 +35,13 @@ from kudu.schema import (
     DATE)
 from kudu.client import Partitioning
 from kudu.util import to_unixtime_micros
+import json
 import logging
 import pytest
+import os
 import random
 import re
+import subprocess
 import textwrap
 import threading
 import time
@@ -1844,3 +1848,248 @@ class 
TestKuduInsertWithBufferedTupleDesc(KuduTestSuite):
     except Exception as e:
       # Not expect to throw exception like "IllegalStateException: null"
       assert False, str(e)
+
+
+class TestKuduArray(KuduTestSuite):
+  """
+  Tests Kudu 1-D array suppport.
+  """
+
+  def _get_name_from_type(self, data_type):
+    return re.split("[(<)]", data_type)[0]
+
+  def _insert_arrays_into_kudu(self, kudu_table_name):
+    exec_path = os.environ["IMPALA_HOME"] + \
+        "/be/build/latest/exec/kudu/kudu-array-inserter"
+    self.client.log_client(str([exec_path, kudu_table_name]))
+    subprocess.check_call([exec_path, kudu_table_name])
+
+  def _check_table_schema(self, db, table_name, types):
+    result = self.execute_query("DESCRIBE {0}.{1}".format(db, table_name))
+    assert ("id", "tinyint") == result.tuples()[0][:2]
+    for i in range(1, len(result.tuples())):
+      (col_name, col_type) = result.tuples()[i][:2]
+      assert (col_type == "array<{0}>".format(types[i - 1].lower())
+              and col_name == "array_" + self._get_name_from_type(types[i - 
1]).lower())
+
+  # See be/src/exec/kudu/kudu-array-inserter.cc for the test data
+  EXPECTED_COLUMNS = {
+      "INT": (
+          None,
+          '[-2147483648,-1,2147483647]',
+          '[]',
+          '[null,-1,2147483647]',
+          '[-2147483648,null,2147483647]',
+          '[-2147483648,-1,null]',
+          '[-2147483648,-1,2147483647,-2147483648,-1]',
+      ),
+      "TIMESTAMP": (
+          None,
+          ('["1400-01-01 00:00:00",'
+            '"1969-12-31 23:59:59.999999000",'
+            '"9999-12-31 23:59:59.999999000"]'),
+          '[]',
+          '[null,"1969-12-31 23:59:59.999999000","9999-12-31 
23:59:59.999999000"]',
+          '["1400-01-01 00:00:00",null,"9999-12-31 23:59:59.999999000"]',
+          '["1400-01-01 00:00:00","1969-12-31 23:59:59.999999000",null]',
+          ('["1400-01-01 00:00:00",'
+            '"1969-12-31 23:59:59.999999000",'
+            '"9999-12-31 23:59:59.999999000",'
+            '"1400-01-01 00:00:00",'
+            '"1969-12-31 23:59:59.999999000"]'),
+      ),
+      # The output of the ARRAY<VARCHAR(1)> data are NOT valid UTF-8 strings.
+      "VARCHAR(1)": (
+          None,
+          b'["\xce","\xcf","\xce"]',
+          '[]',
+          b'[null,"\xcf","\xce"]',
+          b'["\xce",null,"\xce"]',
+          b'["\xce","\xcf",null]',
+          b'["\xce","\xcf","\xce","\xce","\xcf"]',
+      ),
+      "DECIMAL(18,18)": (
+          None,
+          '[-0.999999999999999999,-0.000000000000000001,0.999999999999999999]',
+          '[]',
+          '[null,-0.000000000000000001,0.999999999999999999]',
+          '[-0.999999999999999999,null,0.999999999999999999]',
+          '[-0.999999999999999999,-0.000000000000000001,null]',
+          ('[-0.999999999999999999,-0.000000000000000001,0.999999999999999999,'
+            '-0.999999999999999999,-0.000000000000000001]'),
+      ),
+      "DOUBLE": (
+          None,
+          '[-Infinity,NaN,Infinity]',
+          '[]',
+          '[null,NaN,Infinity]',
+          '[-Infinity,null,Infinity]',
+          '[-Infinity,NaN,null]',
+          '[-Infinity,NaN,Infinity,-Infinity,NaN]',
+      ),
+      # The output of each element in an ARRAY<BINARY> is Base64 encoded.
+      "BINARY": (
+          None,
+          '["zqM=","z4A=","zrs="]',
+          '[]',
+          '[null,"z4A=","zrs="]',
+          '["zqM=",null,"zrs="]',
+          '["zqM=","z4A=",null]',
+          '["zqM=","z4A=","zrs=","zqM=","z4A="]',
+      ),
+      "BOOLEAN": (
+          None,
+          '[true,false,true]',
+          '[]',
+          '[null,false,true]',
+          '[true,null,true]',
+          '[true,false,null]',
+          '[true,false,true,true,false]',
+      ),
+  }
+
+  def _check_table_data(self, db, table_name, types, query_options):
+    columns = ", ".join([
+        "array_{0}".format(self._get_name_from_type(item_type))
+        for item_type in types
+    ])
+    result = self.execute_query("SELECT id, {0} FROM {1}.{2}".format(
+        columns, db, table_name), query_options=query_options)
+    for i, result_column in enumerate(zip(*result.tuples())):
+      if i == 0:
+        assert result_column == tuple(range(len(result.tuples())))
+      else:
+        assert result_column == self.EXPECTED_COLUMNS[types[i - 1]]
+
+  def _unnest_expected_column(self, item_type):
+    if item_type == "VARCHAR(1)":
+      return (
+          b'\xce', b'\xcf', b'\xce',
+          None, b'\xcf', b'\xce',
+          b'\xce', None, b'\xce',
+          b'\xce', b'\xcf', None,
+          b'\xce', b'\xcf', b'\xce', b'\xce', b'\xcf',
+      )
+    result = []
+    for serialized_array in self.EXPECTED_COLUMNS[item_type]:
+      if serialized_array is not None:
+        array = json.loads(
+            serialized_array,
+            parse_float=(lambda s: s) if item_type.startswith("DECIMAL") else 
None)
+        if item_type == "BINARY":
+          result += [b64decode(elem) if elem is not None else None for elem in 
array]
+        else:
+          result += array
+    return tuple(result)
+
+  EXPECTED_ID_UNNESTED = (
+      1, 1, 1,
+      3, 3, 3,
+      4, 4, 4,
+      5, 5, 5,
+      6, 6, 6, 6, 6
+  )
+
+  def _check_unnest(self, db, table_name, types, query_options, 
in_select_list):
+    if in_select_list:
+      columns = ", ".join([
+          "UNNEST(array_{0})".format(self._get_name_from_type(item_type))
+          for item_type in types
+      ])
+      result = self.execute_query("SELECT id, {0} FROM {1}.{2}".format(
+          columns, db, table_name), query_options=query_options)
+    else:
+      columns = ", ".join([
+          "{0}.array_{1}".format(table_name, 
self._get_name_from_type(item_type))
+          for item_type in types
+      ])
+      result = self.execute_query("SELECT * FROM {1}.{2}, UNNEST({0})".format(
+          columns, db, table_name), query_options=query_options)
+    for i, result_column in enumerate(zip(*result.tuples())):
+      if i == 0:
+        assert result_column == self.EXPECTED_ID_UNNESTED
+      elif types[i - 1] == "DOUBLE":
+        # NaN cannot be compared directly.
+        assert str(result_column) == str(self._unnest_expected_column(types[i 
- 1]))
+      else:
+        assert result_column == self._unnest_expected_column(types[i - 1])
+
+  def _check_non_materialzied_elements(self, db, table_name, item_type, 
options):
+    result = self.execute_query("SELECT id FROM {0}.{1} AS t, 
t.array_{2}".format(
+        db, table_name, self._get_name_from_type(item_type)), 
query_options=options)
+    for result_column in zip(*result.tuples()):
+      assert result_column == self.EXPECTED_ID_UNNESTED
+
+  def test_supported_types(self, unique_database, vector):
+    """
+    Test array column support for kudu against [unique_database].kudu_array
+    and external table [unique_database].kudu_array_external.
+    """
+    db = unique_database
+    options = vector.get_value('exec_option')
+    SUPPORTED_ITEM_TYPES = [
+        "INT",
+        "TIMESTAMP",
+        "VARCHAR(1)",
+        "DECIMAL(18,18)",
+        "DOUBLE",
+        "BINARY",
+        "BOOLEAN",
+    ]
+    TEST_TABLE, TEST_EXTERNAL_TABLE = "kudu_array", "kudu_array_external"
+    column_defs = ", ".join([
+        "array_{0} ARRAY<{1}>".format(self._get_name_from_type(item_type), 
item_type)
+        for item_type in SUPPORTED_ITEM_TYPES
+    ])
+    # The table is unpartitioned to ensure the rows are read in the insertion 
order.
+    create_table_sql = (
+        "CREATE TABLE {0}.{1} (id TINYINT PRIMARY KEY, {2}) "
+        "STORED AS KUDU"
+    )
+
+    # Create table [unique_database].kudu_array
+    self.execute_query(create_table_sql.format(db, TEST_TABLE, column_defs))
+    self._check_table_schema(db, TEST_TABLE, SUPPORTED_ITEM_TYPES)
+
+    # Create external table [unique_database].kudu_array_external pointing to 
the same
+    # table as [unique_database].kudu_array and check the schema as well.
+    kudu_table_name = "impala::{0}.{1}".format(db, TEST_TABLE)
+    self.execute_query(create_external_kudu_query(
+        db, TEST_EXTERNAL_TABLE, kudu_table_name))
+    self._check_table_schema(
+        db, TEST_EXTERNAL_TABLE, SUPPORTED_ITEM_TYPES)
+
+    # Insert some rows using kudu-array-inserter and read the data back
+    # through both table.
+    self._insert_arrays_into_kudu(kudu_table_name)
+    self._check_table_data(
+      db, TEST_TABLE, SUPPORTED_ITEM_TYPES, options)
+    self._check_table_data(
+      db, TEST_EXTERNAL_TABLE, SUPPORTED_ITEM_TYPES, options)
+
+    # Check the result of UNNEST().
+    self._check_unnest(
+        db, TEST_TABLE, SUPPORTED_ITEM_TYPES, options, in_select_list=True)
+    self._check_unnest(
+        db, TEST_TABLE, SUPPORTED_ITEM_TYPES, options, in_select_list=False)
+
+    # Check the result when the array elements are not materialized.
+    self._check_non_materialzied_elements(db, TEST_TABLE, "BINARY", options)
+
+    # TODO(IMPALA-14539): Support duplicate collection slots.
+    sql = "SELECT array_{2} FROM {0}.{1}, {1}.array_{2} AS unnested"
+    exc = str(self.execute_query_expect_failure(self.client, sql.format(
+        db, TEST_TABLE, "INT")))
+    assert (
+        "Unable to deserialize scan token for node with id '0' for Kudu table "
+        "'impala::{0}.{1}': Invalid argument: Duplicate column name: array_{2}"
+    ).format(db, TEST_TABLE, "int") in exc
+
+    # TODO(IMPALA-14538): Support referencing a Kudu collection column as a 
table.
+    sql = "SELECT pos, item FROM {0}.{1}.array_{2}"
+    exc = str(self.execute_query_expect_failure(self.client, sql.format(
+        db, TEST_TABLE, "INT")))
+    assert (
+        "AnalysisException: "
+        "Referencing a Kudu collection column as a table is not supported."
+    ) in exc

Reply via email to