This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch array-type
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 61dcbac02a8c960e10d13309645225e28c05a8a2
Author: camby <[email protected]>
AuthorDate: Tue Mar 8 20:26:51 2022 +0800

    [feature-wip](array-type) support selecting ARRAY data type on vectorized engine (#8217)
    
    Usage Example:
    1. create a table for testing;
    `CREATE TABLE `array_test` (
      `k1` tinyint(4) NOT NULL COMMENT "",
      `k2` smallint(6) NULL COMMENT "",
      `k3` ARRAY<int(11)> NULL COMMENT ""
    ) ENGINE=OLAP
    DUPLICATE KEY(`k1`)
    COMMENT "OLAP"
    DISTRIBUTED BY HASH(`k1`) BUCKETS 5
    PROPERTIES (
    "replication_allocation" = "tag.location.default: 1",
    "in_memory" = "false",
    "storage_format" = "V2"
    );`
    
    2. insert some data
    `insert into array_test values(1, 2, [1, 2]);`
    `insert into array_test values(2, 3, null);`
    `insert into array_test values(3, null, null);`
    `insert into array_test values(4, null, []);`
    
    3. enable the vectorized engine
    `set enable_vectorized_engine=true;`
    
    4. query array data
    `select * from array_test;`
    +------+------+--------+
    | k1   | k2   | k3     |
    +------+------+--------+
    |    4 | NULL | []     |
    |    2 |    3 | NULL   |
    |    1 |    2 | [1, 2] |
    |    3 | NULL | NULL   |
    +------+------+--------+
    4 rows in set (0.061 sec)
    
    Code changes include:
    1. add column_array and data_type_array code;
    2. move the code that creates data_types from Field, TabletColumn, TypeDescriptor and PColumnMeta into DataTypeFactory;
    3. support creating a data_type for the ARRAY data type;
    4. RowBlockV2::convert_to_vec_block supports the ARRAY data type;
    5. VMysqlResultWriter::append_block supports the ARRAY data type;
    6. vectorized::Block serialization and deserialization support the ARRAY data type;
---
 be/src/olap/column_vector.h                        |   2 +-
 be/src/olap/field.h                                |   3 +-
 be/src/olap/row_block2.cpp                         | 256 ++++++++
 be/src/olap/row_block2.h                           |   3 +
 be/src/olap/rowset/segment_v2/segment_iterator.cpp |  21 +-
 be/src/olap/schema.cpp                             |  51 +-
 be/src/olap/schema.h                               |   2 +-
 be/src/olap/tablet_schema.cpp                      |  11 +-
 be/src/runtime/descriptors.cpp                     |  15 +-
 be/src/runtime/descriptors.h                       |   3 +-
 be/src/runtime/types.h                             |  54 +-
 be/src/vec/CMakeLists.txt                          |   3 +
 be/src/vec/columns/column_array.cpp                | 704 +++++++++++++++++++++
 be/src/vec/columns/column_array.h                  | 185 ++++++
 be/src/vec/core/block.cpp                          |  84 +--
 be/src/vec/core/types.h                            |   1 +
 be/src/vec/data_types/data_type.cpp                | 135 +---
 be/src/vec/data_types/data_type.h                  |   2 -
 be/src/vec/data_types/data_type_array.cpp          |  97 +++
 be/src/vec/data_types/data_type_array.h            |  77 +++
 be/src/vec/data_types/data_type_factory.cpp        | 254 ++++++++
 be/src/vec/data_types/data_type_factory.hpp        |  16 +
 be/src/vec/data_types/data_type_number_base.h      |   1 +
 be/src/vec/exprs/vectorized_agg_fn.cpp             |   7 +-
 be/src/vec/exprs/vexpr.cpp                         |  11 +-
 be/src/vec/olap/vgeneric_iterators.cpp             |   2 +-
 be/src/vec/sink/mysql_result_writer.cpp            | 106 +++-
 be/src/vec/sink/mysql_result_writer.h              |   3 +-
 be/test/vec/core/CMakeLists.txt                    |   1 +
 be/test/vec/core/column_array_test.cpp             |  85 +++
 be/test/vec/exec/vgeneric_iterators_test.cpp       |   2 +-
 31 files changed, 1836 insertions(+), 361 deletions(-)

diff --git a/be/src/olap/column_vector.h b/be/src/olap/column_vector.h
index eeeeceb..302773e 100644
--- a/be/src/olap/column_vector.h
+++ b/be/src/olap/column_vector.h
@@ -69,7 +69,7 @@ public:
 
     bool is_nullable() const { return _nullable; }
 
-    bool is_null_at(size_t row_idx) { return _nullable && 
_null_signs[row_idx]; }
+    bool is_null_at(size_t row_idx) const { return _nullable && 
_null_signs[row_idx]; }
 
     void set_is_null(size_t idx, bool is_null) {
         if (_nullable) {
diff --git a/be/src/olap/field.h b/be/src/olap/field.h
index 770ef82..0565955 100644
--- a/be/src/olap/field.h
+++ b/be/src/olap/field.h
@@ -298,7 +298,8 @@ public:
     void add_sub_field(std::unique_ptr<Field> sub_field) {
         _sub_fields.emplace_back(std::move(sub_field));
     }
-    Field* get_sub_field(int i) { return _sub_fields[i].get(); }
+    Field* get_sub_field(int i) const { return _sub_fields[i].get(); }
+    size_t get_sub_field_count() const { return _sub_fields.size(); }
 
 protected:
     std::shared_ptr<const TypeInfo> _type_info;
diff --git a/be/src/olap/row_block2.cpp b/be/src/olap/row_block2.cpp
index 73aca94..b6a2a0b 100644
--- a/be/src/olap/row_block2.cpp
+++ b/be/src/olap/row_block2.cpp
@@ -23,6 +23,7 @@
 #include "gutil/strings/substitute.h"
 #include "olap/row_cursor.h"
 #include "util/bitmap.h"
+#include "vec/columns/column_array.h"
 #include "vec/columns/column_complex.h"
 #include "vec/columns/column_vector.h"
 #include "vec/core/block.h"
@@ -277,6 +278,25 @@ Status RowBlockV2::_copy_data_to_column(int cid, 
doris::vectorized::MutableColum
         }
         break;
     }
+    case OLAP_FIELD_TYPE_ARRAY: {
+        auto column_array = assert_cast<vectorized::ColumnArray*>(column);
+        auto nested_col = (*column_array->get_data_ptr()).assume_mutable();
+        auto src_col = 
static_cast<ArrayColumnVectorBatch*>(_column_vector_batches[cid].get());
+
+        auto& offsets_col = column_array->get_offsets();
+        offsets_col.reserve(_selected_size);
+        uint32_t offset = 0;
+        for (uint16_t j = 0; j < _selected_size; ++j) {
+            uint16_t row_idx = _selection_vector[j];
+            auto cv = reinterpret_cast<const 
CollectionValue*>(column_block(cid).cell_ptr(row_idx));
+            if (!nullable_mark_array[j]) {
+                offset += cv->length();
+                _append_data_to_column(src_col->elements(), 
src_col->item_offset(row_idx), cv->length(), nested_col);
+            }
+            offsets_col.emplace_back(offset);
+        }
+        break;
+    }
     case OLAP_FIELD_TYPE_INT: {
         auto column_int = 
assert_cast<vectorized::ColumnVector<vectorized::Int32>*>(column);
         insert_data_directly(cid, column_int);
@@ -325,6 +345,242 @@ Status RowBlockV2::_copy_data_to_column(int cid, 
doris::vectorized::MutableColum
     return Status::OK();
 }
 
+Status RowBlockV2::_append_data_to_column(const ColumnVectorBatch* batch, 
uint16_t off, uint16_t len, doris::vectorized::MutableColumnPtr& origin_column) 
{
+    constexpr auto MAX_SIZE_OF_VEC_STRING = 1024l * 1024;
+
+    auto* column = origin_column.get();
+    uint16_t selected_size = len;
+    bool nullable_mark_array[selected_size];
+
+    bool column_nullable = origin_column->is_nullable();
+    bool origin_nullable = batch->is_nullable();
+    if (column_nullable) {
+        auto nullable_column = 
assert_cast<vectorized::ColumnNullable*>(origin_column.get());
+        auto& null_map = nullable_column->get_null_map_data();
+        column = nullable_column->get_nested_column_ptr().get();
+
+        if (origin_nullable) {
+            for (uint16_t i = 0; i < selected_size; ++i) {
+                uint16_t row_idx = i + off;
+                null_map.push_back(batch->is_null_at(row_idx));
+                nullable_mark_array[i] = null_map.back();
+            }
+        } else {
+            null_map.resize_fill(null_map.size() + selected_size, 0);
+            memset(nullable_mark_array, false, selected_size * sizeof(bool));
+        }
+    } else {
+        memset(nullable_mark_array, false, selected_size * sizeof(bool));
+    }
+
+    auto insert_data_directly = [&nullable_mark_array](auto& batch, auto& 
column, auto& off, auto& len) {
+        for (uint16_t j = 0; j < len; ++j) {
+            if (!nullable_mark_array[j]) {
+                uint16_t row_idx = j + off;
+                column->insert_data(
+                        reinterpret_cast<const 
char*>(batch->cell_ptr(row_idx)), 0);
+            } else {
+                column->insert_default();
+            }
+        }
+    };
+
+    switch (batch->type_info()->type()) {
+    case OLAP_FIELD_TYPE_OBJECT: {
+        auto column_bitmap = assert_cast<vectorized::ColumnBitmap*>(column);
+        for (uint16_t j = 0; j < selected_size; ++j) {
+            column_bitmap->insert_default();
+            if (!nullable_mark_array[j]) {
+                uint16_t row_idx = j + off;
+                auto slice = reinterpret_cast<const 
Slice*>(batch->cell_ptr(row_idx));
+
+                BitmapValue* pvalue = 
&column_bitmap->get_element(column_bitmap->size() - 1);
+
+                if (slice->size != 0) {
+                    BitmapValue value;
+                    value.deserialize(slice->data);
+                    *pvalue = std::move(value);
+                } else {
+                    *pvalue = 
std::move(*reinterpret_cast<BitmapValue*>(slice->data));
+                }
+            }
+        }
+        break;
+    }
+    case OLAP_FIELD_TYPE_HLL: {
+        auto column_hll = assert_cast<vectorized::ColumnHLL*>(column);
+        for (uint16_t j = 0; j < selected_size; ++j) {
+            column_hll->insert_default();
+            if (!nullable_mark_array[j]) {
+                uint16_t row_idx = j + off;
+                auto slice = reinterpret_cast<const 
Slice*>(batch->cell_ptr(row_idx));
+
+                HyperLogLog* pvalue = 
&column_hll->get_element(column_hll->size() - 1);
+
+                if (slice->size != 0) {
+                    HyperLogLog value;
+                    value.deserialize(*slice);
+                    *pvalue = std::move(value);
+                } else {
+                    *pvalue = 
std::move(*reinterpret_cast<HyperLogLog*>(slice->data));
+                }
+            }
+        }
+        break;
+    }
+    case OLAP_FIELD_TYPE_MAP:
+    case OLAP_FIELD_TYPE_VARCHAR: {
+        auto column_string = assert_cast<vectorized::ColumnString*>(column);
+
+        for (uint16_t j = 0; j < selected_size; ++j) {
+            if (!nullable_mark_array[j]) {
+                uint16_t row_idx = j + off;
+                auto slice = reinterpret_cast<const 
Slice*>(batch->cell_ptr(row_idx));
+                column_string->insert_data(slice->data, slice->size);
+            } else {
+                column_string->insert_default();
+            }
+        }
+        break;
+    }
+    case OLAP_FIELD_TYPE_STRING: {
+        auto column_string = assert_cast<vectorized::ColumnString*>(column);
+
+        for (uint16_t j = 0; j < selected_size; ++j) {
+            if (!nullable_mark_array[j]) {
+                uint16_t row_idx = j + off;
+                auto slice = reinterpret_cast<const 
Slice*>(batch->cell_ptr(row_idx));
+                if (LIKELY(slice->size <= MAX_SIZE_OF_VEC_STRING)) {
+                    column_string->insert_data(slice->data, slice->size);
+                } else {
+                    return Status::NotSupported("Not support string len over 
than 1MB in vec engine.");
+                }
+            } else {
+                column_string->insert_default();
+            }
+        }
+        break;
+    }
+    case OLAP_FIELD_TYPE_CHAR: {
+        auto column_string = assert_cast<vectorized::ColumnString*>(column);
+
+        for (uint16_t j = 0; j < selected_size; ++j) {
+            if (!nullable_mark_array[j]) {
+                uint16_t row_idx = j + off;
+                auto slice = reinterpret_cast<const 
Slice*>(batch->cell_ptr(row_idx));
+                column_string->insert_data(slice->data, strnlen(slice->data, 
slice->size));
+            } else {
+                column_string->insert_default();
+            }
+        }
+        break;
+    }
+    case OLAP_FIELD_TYPE_DATE: {
+        auto column_int = 
assert_cast<vectorized::ColumnVector<vectorized::Int64>*>(column);
+
+        for (uint16_t j = 0; j < selected_size; ++j) {
+            if (!nullable_mark_array[j]) {
+                uint16_t row_idx = j + off;
+                auto ptr = reinterpret_cast<const 
char*>(batch->cell_ptr(row_idx));
+
+                uint64_t value = 0;
+                value = *(unsigned char*)(ptr + 2);
+                value <<= 8;
+                value |= *(unsigned char*)(ptr + 1);
+                value <<= 8;
+                value |= *(unsigned char*)(ptr);
+                vectorized::VecDateTimeValue date;
+                date.from_olap_date(value);
+                (column_int)->insert_data(reinterpret_cast<char*>(&date), 0);
+            } else
+                column_int->insert_default();
+        }
+        break;
+    }
+    case OLAP_FIELD_TYPE_DATETIME: {
+        auto column_int = 
assert_cast<vectorized::ColumnVector<vectorized::Int64>*>(column);
+
+        for (uint16_t j = 0; j < selected_size; ++j) {
+            if (!nullable_mark_array[j]) {
+                uint16_t row_idx = j + off;
+                auto ptr = reinterpret_cast<const 
char*>(batch->cell_ptr(row_idx));
+
+                uint64_t value = *reinterpret_cast<const uint64_t*>(ptr);
+                vectorized::VecDateTimeValue data(value);
+                (column_int)->insert_data(reinterpret_cast<char*>(&data), 0);
+            } else {
+                column_int->insert_default();
+            }
+        }
+        break;
+    }
+    case OLAP_FIELD_TYPE_DECIMAL: {
+        auto column_decimal =
+                
assert_cast<vectorized::ColumnDecimal<vectorized::Decimal128>*>(column);
+
+        for (uint16_t j = 0; j < selected_size; ++j) {
+            if (!nullable_mark_array[j]) {
+                uint16_t row_idx = j + off;
+                auto ptr = reinterpret_cast<const 
char*>(batch->cell_ptr(row_idx));
+
+                int64_t int_value = *(int64_t*)(ptr);
+                int32_t frac_value = *(int32_t*)(ptr + sizeof(int64_t));
+                DecimalV2Value data(int_value, frac_value);
+                column_decimal->insert_data(reinterpret_cast<char*>(&data), 0);
+            } else {
+                column_decimal->insert_default();
+            }
+        }
+        break;
+    }
+    case OLAP_FIELD_TYPE_INT: {
+        auto column_int = 
assert_cast<vectorized::ColumnVector<vectorized::Int32>*>(column);
+        insert_data_directly(batch, column_int, off, len);
+        break;
+    }
+    case OLAP_FIELD_TYPE_BOOL: {
+        auto column_int = 
assert_cast<vectorized::ColumnVector<vectorized::UInt8>*>(column);
+        insert_data_directly(batch, column_int, off, len);
+        break;
+    }
+    case OLAP_FIELD_TYPE_TINYINT: {
+        auto column_int = 
assert_cast<vectorized::ColumnVector<vectorized::Int8>*>(column);
+        insert_data_directly(batch, column_int, off, len);
+        break;
+    }
+    case OLAP_FIELD_TYPE_SMALLINT: {
+        auto column_int = 
assert_cast<vectorized::ColumnVector<vectorized::Int16>*>(column);
+        insert_data_directly(batch, column_int, off, len);
+        break;
+    }
+    case OLAP_FIELD_TYPE_BIGINT: {
+        auto column_int = 
assert_cast<vectorized::ColumnVector<vectorized::Int64>*>(column);
+        insert_data_directly(batch, column_int, off, len);
+        break;
+    }
+    case OLAP_FIELD_TYPE_LARGEINT: {
+        auto column_int = 
assert_cast<vectorized::ColumnVector<vectorized::Int128>*>(column);
+        insert_data_directly(batch, column_int, off, len);
+        break;
+    }
+    case OLAP_FIELD_TYPE_FLOAT: {
+        auto column_float = 
assert_cast<vectorized::ColumnVector<vectorized::Float32>*>(column);
+        insert_data_directly(batch, column_float, off, len);
+        break;
+    }
+    case OLAP_FIELD_TYPE_DOUBLE: {
+        auto column_float = 
assert_cast<vectorized::ColumnVector<vectorized::Float64>*>(column);
+        insert_data_directly(batch, column_float, off, len);
+        break;
+    }
+    default: {
+        DCHECK(false) << "Invalid type in RowBlockV2:" << 
batch->type_info()->type();
+    }
+    }
+
+    return Status::OK();
+}
+
 Status RowBlockV2::convert_to_vec_block(vectorized::Block* block) {
     DCHECK_LE(block->columns(), _schema.column_ids().size());
     for (int i = 0; i < block->columns(); ++i) {
diff --git a/be/src/olap/row_block2.h b/be/src/olap/row_block2.h
index 35c4a48..f2e9e5b 100644
--- a/be/src/olap/row_block2.h
+++ b/be/src/olap/row_block2.h
@@ -28,6 +28,8 @@
 #include "olap/selection_vector.h"
 #include "olap/types.h"
 #include "runtime/mem_pool.h"
+#include "runtime/mem_tracker.h"
+#include "vec/columns/column.h"
 
 namespace doris {
 
@@ -108,6 +110,7 @@ public:
 
 private:
     Status _copy_data_to_column(int cid, vectorized::MutableColumnPtr& 
mutable_column_ptr);
+    Status _append_data_to_column(const ColumnVectorBatch* batch, uint16_t 
off, uint16_t len, vectorized::MutableColumnPtr& mutable_column_ptr);
 
     const Schema& _schema;
     size_t _capacity;
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp 
b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index 560f694..20a89cf 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -715,14 +715,8 @@ void SegmentIterator::_init_current_block(
         for (size_t i = 0; i < _schema.num_column_ids(); i++) {
             auto cid = _schema.column_id(i);
             auto column_desc = _schema.column(cid);
-            auto data_type = Schema::get_data_type_ptr(column_desc->type());
-            if (column_desc->is_nullable()) {
-                block->insert({nullptr,
-                               
std::make_shared<vectorized::DataTypeNullable>(std::move(data_type)),
-                               column_desc->name()});
-            } else {
-                block->insert({nullptr, std::move(data_type), 
column_desc->name()});
-            }
+            auto data_type = Schema::get_data_type_ptr(*column_desc);
+            block->insert({nullptr, std::move(data_type), 
column_desc->name()});
         }
     }
 
@@ -735,13 +729,8 @@ void SegmentIterator::_init_current_block(
             if (is_block_mem_reuse) {
                 current_columns[cid] = 
std::move(*block->get_by_position(i).column).mutate();
             } else {
-                auto data_type = 
Schema::get_data_type_ptr(column_desc->type());
-                if (column_desc->is_nullable()) {
-                    current_columns[cid] = 
doris::vectorized::ColumnNullable::create(
-                            data_type->create_column(), 
doris::vectorized::ColumnUInt8::create());
-                } else {
-                    current_columns[cid] = data_type->create_column();
-                }
+                auto data_type = Schema::get_data_type_ptr(*column_desc);
+                current_columns[cid] = data_type->create_column();
             }
             if (column_desc->type() == OLAP_FIELD_TYPE_DATE) {
                 current_columns[cid]->set_date_type();
@@ -929,7 +918,7 @@ Status SegmentIterator::next_batch(vectorized::Block* 
block) {
             } else { // predicate
                 if (!is_mem_reuse) {
                     auto column_desc = _schema.column(cid);
-                    auto data_type = 
Schema::get_data_type_ptr(column_desc->type());
+                    auto data_type = Schema::get_data_type_ptr(*column_desc);
                     block->replace_by_position(i, data_type->create_column());
                 }
             }
diff --git a/be/src/olap/schema.cpp b/be/src/olap/schema.cpp
index 77b31af..70bff3a 100644
--- a/be/src/olap/schema.cpp
+++ b/be/src/olap/schema.cpp
@@ -24,6 +24,7 @@
 #include "vec/columns/column_nullable.h"
 #include "vec/columns/predicate_column.h"
 #include "vec/core/types.h"
+#include "vec/data_types/data_type_factory.hpp"
 
 namespace doris {
 
@@ -109,54 +110,8 @@ Schema::~Schema() {
     }
 }
 
-vectorized::DataTypePtr Schema::get_data_type_ptr(FieldType type) {
-    switch (type) {
-    case OLAP_FIELD_TYPE_BOOL:
-        return std::make_shared<vectorized::DataTypeUInt8>();
-
-    case OLAP_FIELD_TYPE_TINYINT:
-        return std::make_shared<vectorized::DataTypeInt8>();
-
-    case OLAP_FIELD_TYPE_SMALLINT:
-        return std::make_shared<vectorized::DataTypeInt16>();
-
-    case OLAP_FIELD_TYPE_INT:
-        return std::make_shared<vectorized::DataTypeInt32>();
-
-    case OLAP_FIELD_TYPE_FLOAT:
-        return std::make_shared<vectorized::DataTypeFloat32>();
-
-    case OLAP_FIELD_TYPE_BIGINT:
-        return std::make_shared<vectorized::DataTypeInt64>();
-
-    case OLAP_FIELD_TYPE_LARGEINT:
-        return std::make_shared<vectorized::DataTypeInt128>();
-
-    case OLAP_FIELD_TYPE_DATE:
-        return std::make_shared<vectorized::DataTypeDate>();
-
-    case OLAP_FIELD_TYPE_DATETIME:
-        return std::make_shared<vectorized::DataTypeDateTime>();
-
-    case OLAP_FIELD_TYPE_DOUBLE:
-        return std::make_shared<vectorized::DataTypeFloat64>();
-
-    case OLAP_FIELD_TYPE_CHAR:
-    case OLAP_FIELD_TYPE_VARCHAR:
-    case OLAP_FIELD_TYPE_STRING:
-        return std::make_shared<vectorized::DataTypeString>();
-    case OLAP_FIELD_TYPE_HLL:
-        return std::make_shared<vectorized::DataTypeHLL>();
-    case OLAP_FIELD_TYPE_OBJECT:
-        return std::make_shared<vectorized::DataTypeBitMap>();
-
-    case OLAP_FIELD_TYPE_DECIMAL:
-        return 
std::make_shared<vectorized::DataTypeDecimal<vectorized::Decimal128>>(27, 9);
-
-    default:
-        DCHECK(false);
-        return nullptr;
-    }
+vectorized::DataTypePtr Schema::get_data_type_ptr(const Field& field) {
+    return vectorized::DataTypeFactory::instance().create_data_type(field);
 }
 
 vectorized::IColumn::MutablePtr 
Schema::get_predicate_column_nullable_ptr(FieldType type,
diff --git a/be/src/olap/schema.h b/be/src/olap/schema.h
index 89ebf4a..ce32620 100644
--- a/be/src/olap/schema.h
+++ b/be/src/olap/schema.h
@@ -100,7 +100,7 @@ public:
 
     ~Schema();
 
-    static vectorized::DataTypePtr get_data_type_ptr(FieldType type);
+    static vectorized::DataTypePtr get_data_type_ptr(const Field& field);
 
     static vectorized::IColumn::MutablePtr get_predicate_column_ptr(FieldType 
type);
 
diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp
index afafd25..45b33f7 100644
--- a/be/src/olap/tablet_schema.cpp
+++ b/be/src/olap/tablet_schema.cpp
@@ -20,6 +20,7 @@
 #include "tablet_meta.h"
 #include "vec/core/block.h"
 #include "vec/data_types/data_type.h"
+#include "vec/data_types/data_type_factory.hpp"
 
 namespace doris {
 
@@ -497,9 +498,12 @@ vectorized::Block TabletSchema::create_block(const 
std::vector<uint32_t>& return
     vectorized::Block block;
     for (int i = 0; i < return_columns.size(); ++i) {
         const auto& col = _cols[return_columns[i]];
-        auto data_type = vectorized::IDataType::from_olap_engine(col.type(),
-                col.is_nullable() || (tablet_columns_need_convert_null != 
nullptr &&
-                tablet_columns_need_convert_null->find(return_columns[i]) != 
tablet_columns_need_convert_null->end()));
+/* TODO:
+-       auto data_type = vectorized::IDataType::from_olap_engine(col.type(),
+-                col.is_nullable() || (tablet_columns_need_convert_null != 
nullptr &&
+-                tablet_columns_need_convert_null->find(return_columns[i]) != 
tablet_columns_need_convert_null->end()));
+*/
+        auto data_type = 
vectorized::DataTypeFactory::instance().create_data_type(col);
         auto column = data_type->create_column();
         block.insert({std::move(column), data_type, col.name()});
     }
@@ -564,5 +568,4 @@ bool operator!=(const TabletSchema& a, const TabletSchema& 
b) {
     return !(a == b);
 }
 
-
 } // namespace doris
diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp
index 4225a75..29ad861 100644
--- a/be/src/runtime/descriptors.cpp
+++ b/be/src/runtime/descriptors.cpp
@@ -26,6 +26,7 @@
 #include "gen_cpp/descriptors.pb.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/core/columns_with_type_and_name.h"
+#include "vec/data_types/data_type_factory.hpp"
 #include "vec/data_types/data_type_nullable.h"
 
 namespace doris {
@@ -84,19 +85,15 @@ void SlotDescriptor::to_protobuf(PSlotDescriptor* pslot) 
const {
 }
 
 vectorized::MutableColumnPtr SlotDescriptor::get_empty_mutable_column() const {
-    auto data_column = type().get_data_type_ptr()->create_column();
-    if (is_nullable()) {
-        return 
doris::vectorized::ColumnNullable::create(std::move(data_column),
-                                                         
doris::vectorized::ColumnUInt8::create());
+    auto data_type = get_data_type_ptr();
+    if (data_type) {
+        return data_type->create_column();
     }
-    return data_column;
+    return nullptr;
 }
 
 vectorized::DataTypePtr SlotDescriptor::get_data_type_ptr() const {
-    if (is_nullable()) {
-        return 
std::make_shared<vectorized::DataTypeNullable>(type().get_data_type_ptr());
-    }
-    return type().get_data_type_ptr();
+    return vectorized::DataTypeFactory::instance().create_data_type(type(), 
is_nullable());
 }
 
 std::string SlotDescriptor::debug_string() const {
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index cdbc3d7..1801891 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -31,6 +31,7 @@
 #include "gen_cpp/FrontendService_types.h" // for TTupleId
 #include "gen_cpp/Types_types.h"
 #include "runtime/types.h"
+#include "vec/data_types/data_type.h"
 
 namespace doris::vectorized {
 struct ColumnWithTypeAndName;
@@ -103,7 +104,7 @@ public:
 
     std::string debug_string() const;
 
-    doris::vectorized::MutableColumnPtr get_empty_mutable_column() const;
+    vectorized::MutableColumnPtr get_empty_mutable_column() const;
 
     doris::vectorized::DataTypePtr get_data_type_ptr() const;
 
diff --git a/be/src/runtime/types.h b/be/src/runtime/types.h
index 9235e6c..152e87e 100644
--- a/be/src/runtime/types.h
+++ b/be/src/runtime/types.h
@@ -28,6 +28,7 @@
 #include "runtime/collection_value.h"
 #include "runtime/primitive_type.h"
 #include "thrift/protocol/TDebugProtocol.h"
+#include "vec/data_types/data_type_array.h"
 #include "vec/data_types/data_type_bitmap.h"
 #include "vec/data_types/data_type_date.h"
 #include "vec/data_types/data_type_date_time.h"
@@ -273,59 +274,6 @@ struct TypeDescriptor {
         return -1;
     }
 
-    inline doris::vectorized::DataTypePtr get_data_type_ptr() const {
-        switch (type) {
-        case TYPE_BOOLEAN:
-            return std::make_shared<vectorized::DataTypeUInt8>();
-
-        case TYPE_TINYINT:
-            return std::make_shared<vectorized::DataTypeInt8>();
-
-        case TYPE_SMALLINT:
-            return std::make_shared<vectorized::DataTypeInt16>();
-
-        case TYPE_INT:
-            return std::make_shared<vectorized::DataTypeInt32>();
-
-        case TYPE_FLOAT:
-            return std::make_shared<vectorized::DataTypeFloat32>();
-
-        case TYPE_BIGINT:
-            return std::make_shared<vectorized::DataTypeInt64>();
-
-        case TYPE_LARGEINT:
-            return std::make_shared<vectorized::DataTypeInt128>();
-        case TYPE_DATE:
-            return std::make_shared<vectorized::DataTypeDate>();
-        case TYPE_DATETIME:
-            return std::make_shared<vectorized::DataTypeDateTime>();
-        case TYPE_TIME:
-        case TYPE_DOUBLE:
-            return std::make_shared<vectorized::DataTypeFloat64>();
-
-        case TYPE_STRING:
-        case TYPE_CHAR:
-        case TYPE_VARCHAR:
-            return std::make_shared<vectorized::DataTypeString>();
-        case TYPE_HLL:
-            return std::make_shared<vectorized::DataTypeHLL>();
-        case TYPE_OBJECT:
-            return std::make_shared<vectorized::DataTypeBitMap>();
-
-        case TYPE_DECIMALV2:
-            return 
std::make_shared<vectorized::DataTypeDecimal<vectorized::Decimal128>>(27, 9);
-        // Just Mock A NULL Type in Vec Exec Engine
-        case TYPE_NULL:
-            return std::make_shared<vectorized::DataTypeUInt8>();
-
-        case INVALID_TYPE:
-        default:
-            DCHECK(false);
-        }
-        // For llvm complain
-        return nullptr;
-    }
-
     static inline int get_decimal_byte_size(int precision) {
         DCHECK_GT(precision, 0);
         if (precision <= MAX_DECIMAL4_PRECISION) {
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 9a58191..3a725ee 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -39,6 +39,7 @@ set(VEC_FILES
   aggregate_functions/aggregate_function_simple_factory.cpp
   columns/collator.cpp
   columns/column.cpp
+  columns/column_array.cpp
   columns/column_const.cpp
   columns/column_decimal.cpp
   columns/column_nullable.cpp
@@ -57,7 +58,9 @@ set(VEC_FILES
   core/sort_block.cpp
   core/materialize_block.cpp
   data_types/data_type.cpp
+  data_types/data_type_array.cpp
   data_types/data_type_bitmap.cpp
+  data_types/data_type_factory.cpp
   data_types/data_type_hll.cpp
   data_types/data_type_nothing.cpp
   data_types/data_type_nothing.cpp
diff --git a/be/src/vec/columns/column_array.cpp 
b/be/src/vec/columns/column_array.cpp
new file mode 100644
index 0000000..be611af
--- /dev/null
+++ b/be/src/vec/columns/column_array.cpp
@@ -0,0 +1,704 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// 
https://github.com/ClickHouse/ClickHouse/blob/master/src/Columns/ColumnArray.cpp
+// and modified by Doris
+
+#include <string.h> // memcpy
+
+#include "vec/common/assert_cast.h"
+#include "vec/columns/collator.h"
+#include "vec/columns/column_array.h"
+#include "vec/columns/column_const.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_string.h"
+#include "vec/columns/columns_common.h"
+#include "vec/columns/columns_number.h"
+
+namespace doris::vectorized {
+
+// NOTE(review): none of these declarations is referenced anywhere in this file;
+// errors here are reported through LOG(FATAL) instead. They look like leftovers
+// from the upstream ClickHouse source and are candidates for removal.
+namespace ErrorCodes {
+    extern const int NOT_IMPLEMENTED;
+    extern const int BAD_ARGUMENTS;
+    extern const int PARAMETER_OUT_OF_BOUND;
+    extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
+    extern const int LOGICAL_ERROR;
+    extern const int TOO_LARGE_ARRAY_SIZE;
+}
+
+/** Obtaining an array as a Field can be slow for large arrays and consume a vast amount of memory.
+  * Just don't allow it: ColumnArray::operator[] and ColumnArray::get abort (LOG(FATAL))
+  * for arrays with more elements than this limit.
+  * You can increase the limit if the following query:
+  *  SELECT range(10000000)
+  * takes less than 500ms on your machine.
+  */
+static constexpr size_t max_array_size_as_field = 1000000;
+
+/** Create an array column from nested values and their offsets.
+  * `offsets_column` must be a ColumnOffsets; when it is non-empty, its last offset
+  * must equal the number of values in the nested column. Violations abort via LOG(FATAL).
+  */
+ColumnArray::ColumnArray(MutableColumnPtr && nested_column, MutableColumnPtr && offsets_column)
+    : data(std::move(nested_column)), offsets(std::move(offsets_column)) {
+    const ColumnOffsets * offsets_concrete = typeid_cast<const ColumnOffsets *>(offsets.get());
+
+    if (!offsets_concrete) {
+        LOG(FATAL) << "offsets_column must be a ColumnUInt64";
+    }
+
+    /// `nested_column` was already moved into `data` by the member initializer list,
+    /// so it is null here; the consistency check has to look at `data`.
+    if (!offsets_concrete->empty() && data) {
+        Offset last_offset = offsets_concrete->get_data().back();
+
+        /// This will also prevent possible overflow in offset.
+        if (data->size() != last_offset) {
+            LOG(FATAL) << "offsets_column has data inconsistent with nested_column";
+        }
+    }
+
+    /** NOTE
+      * Arrays with constant value are possible and used in implementation of higher order functions (see FunctionReplicate).
+      * But in most cases, arrays with constant value are unexpected and code will work wrong. Use with caution.
+      */
+}
+
+/** Create an empty array column whose element type is taken from `nested_column`.
+  * Since no offsets are supplied, the nested column itself must be empty.
+  */
+ColumnArray::ColumnArray(MutableColumnPtr && nested_column)
+    : data(std::move(nested_column)) {
+    const bool has_values = !data->empty();
+    if (has_values) {
+        LOG(FATAL) << "Not empty data passed to ColumnArray, but no offsets passed";
+    }
+
+    offsets = ColumnOffsets::create();
+}
+
+/// Column name of the form "Array(<nested column name>)".
+std::string ColumnArray::get_name() const {
+    std::string name = "Array(";
+    name += get_data().get_name();
+    name += ")";
+    return name;
+}
+
+/// Return a copy of this column with exactly `to_size` rows: surplus rows are cut off,
+/// and missing rows are filled with empty arrays.
+MutableColumnPtr ColumnArray::clone_resized(size_t to_size) const {
+    auto res = ColumnArray::create(get_data().clone_empty());
+    if (to_size == 0) {
+        return res;
+    }
+
+    const size_t from_size = size();
+    auto & res_offsets = res->get_offsets();
+
+    if (to_size <= from_size) {
+        /// Shrinking: keep the first `to_size` arrays and exactly the values they cover.
+        res_offsets.assign(get_offsets().begin(), get_offsets().begin() + to_size);
+        res->get_data().insert_range_from(get_data(), 0, get_offsets()[to_size - 1]);
+        return res;
+    }
+
+    /// Growing: copy everything, then pad by repeating the last end offset (empty arrays).
+    Offset last_offset = 0;
+    if (from_size > 0) {
+        res_offsets.assign(get_offsets().begin(), get_offsets().end());
+        res->get_data().insert_range_from(get_data(), 0, get_data().size());
+        last_offset = get_offsets().back();
+    }
+
+    res_offsets.resize(to_size);
+    for (size_t row = from_size; row < to_size; ++row) {
+        res_offsets[row] = last_offset;
+    }
+
+    return res;
+}
+
+/// Number of arrays (rows) in the column, which equals the number of offsets.
+size_t ColumnArray::size() const {
+    const Offsets & row_offsets = get_offsets();
+    return row_offsets.size();
+}
+
+/// Materialize the n-th array as a Field holding an `Array` of element Fields.
+/// Aborts via LOG(FATAL) when the array has more than max_array_size_as_field elements.
+Field ColumnArray::operator[](size_t n) const {
+    size_t offset = offset_at(n);
+    size_t size = size_at(n);
+
+    /// Leading space in the second fragment keeps the message readable
+    /// ("field, maximum size") and identical to the one in get().
+    if (size > max_array_size_as_field)
+        LOG(FATAL) << "Array of size " << size << " is too large to be manipulated as single field,"
+                   << " maximum size " << max_array_size_as_field;
+
+    Array res(size);
+
+    for (size_t i = 0; i < size; ++i)
+        res[i] = get_data()[offset + i];
+
+    return res;
+}
+
+/// Store the n-th array into `res` as an `Array` Field, filling each element in place.
+/// Aborts via LOG(FATAL) when the array has more than max_array_size_as_field elements.
+void ColumnArray::get(size_t n, Field & res) const {
+    const size_t begin = offset_at(n);
+    const size_t count = size_at(n);
+
+    if (count > max_array_size_as_field) {
+        LOG(FATAL) << "Array of size " << count << " is too large to be manipulated as single field,"
+                   << " maximum size " << max_array_size_as_field;
+    }
+
+    res = Array(count);
+    auto & elements = doris::vectorized::get<Array &>(res);
+    for (size_t idx = 0; idx < count; ++idx) {
+        get_data().get(begin + idx, elements[idx]);
+    }
+}
+
+StringRef ColumnArray::get_data_at(size_t n) const {
+    /** Returns the memory range spanning every element of the n-th array.
+      * Only meaningful for arrays of fixed-length values: for arrays of strings or
+      * arrays of arrays the range holds the element payloads laid out back to back
+      * (without their offsets), so it does not map one-to-one onto the elements.
+      */
+    const StringRef first = get_data().get_data_at_with_terminating_zero(offset_at(n));
+
+    if (size_at(n) == 0) {
+        return StringRef(first.data, 0);
+    }
+
+    const StringRef last = get_data().get_data_at_with_terminating_zero(get_offsets()[n] - 1);
+    return StringRef(first.data, last.data + last.size - first.data);
+}
+
+/// A row holds the default value when it is an empty array, i.e. its end offset equals
+/// the previous row's end offset (index -1 reads the zero element before the PODArray data).
+bool ColumnArray::is_default_at(size_t n) const {
+    const Offsets & row_offsets = get_offsets();
+    const ssize_t row = static_cast<ssize_t>(n);
+    return row_offsets[row - 1] == row_offsets[row];
+}
+
+void ColumnArray::insert_data(const char * pos, size_t length) {
+    /** Append one array whose elements are packed back to back in [pos, pos + length).
+      * Like get_data_at, this only works for arrays of fixed-length values.
+      */
+    if (!data->is_fixed_and_contiguous())
+        LOG(FATAL) << "Method insert_data is not supported for " << get_name();
+
+    const size_t field_size = data->size_of_value_if_fixed();
+    size_t elems = 0;
+
+    if (length != 0) {
+        const char * const end = pos + length;
+        while (pos + field_size <= end) {
+            data->insert_data(pos, field_size);
+            pos += field_size;
+            ++elems;
+        }
+
+        /// `length` must be an exact multiple of the element size.
+        if (pos != end)
+            LOG(FATAL) << "Incorrect length argument for method ColumnArray::insert_data";
+    }
+
+    get_offsets().push_back(get_offsets().back() + elems);
+}
+
+StringRef ColumnArray::serialize_value_into_arena(size_t n, Arena & arena, 
char const *& begin) const {
+    size_t array_size = size_at(n);
+    size_t offset = offset_at(n);
+
+    char * pos = arena.alloc_continue(sizeof(array_size), begin);
+    memcpy(pos, &array_size, sizeof(array_size));
+
+    StringRef res(pos, sizeof(array_size));
+
+    for (size_t i = 0; i < array_size; ++i) {
+        auto value_ref = get_data().serialize_value_into_arena(offset + i, 
arena, begin);
+        res.data = value_ref.data - res.size;
+        res.size += value_ref.size;
+    }
+
+    return res;
+}
+
+/// Inverse of serialize_value_into_arena: read the element count, then each element,
+/// and append the resulting array. Returns the position just past the consumed bytes.
+const char * ColumnArray::deserialize_and_insert_from_arena(const char * pos) {
+    const size_t array_size = unaligned_load<size_t>(pos);
+    pos += sizeof(array_size);
+
+    IColumn & nested = get_data();
+    for (size_t remaining = array_size; remaining > 0; --remaining) {
+        pos = nested.deserialize_and_insert_from_arena(pos);
+    }
+
+    get_offsets().push_back(get_offsets().back() + array_size);
+    return pos;
+}
+
+/// Feed the n-th array into `hash`: its length first, then every element in order.
+void ColumnArray::update_hash_with_value(size_t n, SipHash & hash) const {
+    const size_t begin = offset_at(n);
+    const size_t count = size_at(n);
+
+    hash.update(count);
+    for (size_t idx = begin; idx < begin + count; ++idx)
+        get_data().update_hash_with_value(idx, hash);
+}
+
+/// Append an array supplied as a Field that holds an `Array`.
+void ColumnArray::insert(const Field & x) {
+    const auto & values = doris::vectorized::get<const Array &>(x);
+    for (const auto & value : values)
+        get_data().insert(value);
+    get_offsets().push_back(get_offsets().back() + values.size());
+}
+
+/// Append a copy of row `n` of `src_`, which must be a ColumnArray.
+void ColumnArray::insert_from(const IColumn & src_, size_t n) {
+    const auto & src = assert_cast<const ColumnArray &>(src_);
+    const size_t begin = src.offset_at(n);
+    const size_t count = src.size_at(n);
+
+    get_data().insert_range_from(src.get_data(), begin, count);
+    get_offsets().push_back(get_offsets().back() + count);
+}
+
+/// Append an empty array: repeat the last end offset without adding nested values.
+void ColumnArray::insert_default() {
+    /// NOTE 1: We can use back() even if the array is empty (due to the zero -1th element in PODArray).
+    /// NOTE 2: We cannot pass a reference to push_back, because the reference gets invalidated if the array is reallocated.
+    auto last_offset = get_offsets().back();
+    get_offsets().push_back(last_offset);
+}
+
+/// Remove the last `n` arrays together with the nested values they cover.
+void ColumnArray::pop_back(size_t n) {
+    auto & row_offsets = get_offsets();
+    DCHECK(n <= row_offsets.size());
+
+    const size_t new_size = row_offsets.size() - n;
+    const size_t nested_to_drop = row_offsets.back() - offset_at(new_size);
+    if (nested_to_drop != 0)
+        get_data().pop_back(nested_to_drop);
+
+    row_offsets.resize_assume_reserved(new_size);
+}
+
+/// Reserve room for `n` arrays; the nested column is reserved for `n` values too.
+void ColumnArray::reserve(size_t n) {
+    get_offsets().reserve(n);
+    get_data().reserve(n); /// The average size of arrays is not taken into account here. Or it is considered to be no more than 1.
+}
+
+/// Bytes used by the nested values plus the offsets array.
+size_t ColumnArray::byte_size() const {
+    const Offsets & row_offsets = get_offsets();
+    const size_t offsets_bytes = row_offsets.size() * sizeof(row_offsets[0]);
+    return get_data().byte_size() + offsets_bytes;
+}
+
+/// Bytes allocated by the nested column plus those allocated by the offsets array.
+size_t ColumnArray::allocated_bytes() const {
+    const size_t nested_bytes = get_data().allocated_bytes();
+    const size_t offsets_bytes = get_offsets().allocated_bytes();
+    return nested_bytes + offsets_bytes;
+}
+
+/// Delegate protect() to the nested column and then to the offsets array.
+void ColumnArray::protect() {
+    IColumn & nested = get_data();
+    nested.protect();
+    get_offsets().protect();
+}
+
+/// Return an array column whose nested data is materialized (non-constant),
+/// sharing the existing offsets column.
+ColumnPtr ColumnArray::convert_to_full_column_if_const() const {
+    /// It is possible to have an array with constant data and non-constant offsets.
+    /// Example is the result of expression: replicate('hello', [1])
+    return ColumnArray::create(data->convert_to_full_column_if_const(), offsets);
+}
+
+/// Append rows [start, start + length) of `src` (a ColumnArray): copy the nested values
+/// they cover, then append their offsets rebased onto this column's last offset.
+/// Aborts via LOG(FATAL) if the range runs past the end of `src`.
+void ColumnArray::insert_range_from(const IColumn & src, size_t start, size_t length) {
+    if (length == 0)
+        return;
+
+    const ColumnArray & src_concrete = assert_cast<const ColumnArray &>(src);
+
+    if (start + length > src_concrete.get_offsets().size())
+        LOG(FATAL) << "Parameter out of bound in ColumnArray::insert_range_from method. [start("
+                   << std::to_string(start) << ") + length(" << std::to_string(length)
+                   << ") > offsets.size(" << std::to_string(src_concrete.get_offsets().size()) << ")]";
+
+    /// Range of nested values covered by the copied rows.
+    size_t nested_offset = src_concrete.offset_at(start);
+    size_t nested_length = src_concrete.get_offsets()[start + length - 1] - nested_offset;
+
+    get_data().insert_range_from(src_concrete.get_data(), nested_offset, nested_length);
+
+    Offsets & cur_offsets = get_offsets();
+    const Offsets & src_offsets = src_concrete.get_offsets();
+
+    if (start == 0 && cur_offsets.empty()) {
+        /// Copying a prefix into an empty column: nested_offset is 0 and there is no
+        /// previous end offset, so the source offsets can be copied verbatim.
+        cur_offsets.assign(src_offsets.begin(), src_offsets.begin() + length);
+    } else {
+        /// Shift each source end offset from the source's numbering to ours.
+        size_t old_size = cur_offsets.size();
+        size_t prev_max_offset = old_size ? cur_offsets.back() : 0;
+        cur_offsets.resize(old_size + length);
+
+        for (size_t i = 0; i < length; ++i)
+            cur_offsets[old_size + i] = src_offsets[start + i] - nested_offset + prev_max_offset;
+    }
+}
+
+/// Return a new column that keeps only the rows whose `filt` byte is non-zero.
+/// A specialised implementation is chosen from the concrete nested column type;
+/// there is no tuple specialisation, so other types go through filter_generic.
+ColumnPtr ColumnArray::filter(const Filter & filt, ssize_t result_size_hint) const {
+    const IColumn * nested = data.get();
+
+    if (typeid_cast<const ColumnUInt8 *>(nested)) return filter_number<UInt8>(filt, result_size_hint);
+    if (typeid_cast<const ColumnUInt16 *>(nested)) return filter_number<UInt16>(filt, result_size_hint);
+    if (typeid_cast<const ColumnUInt32 *>(nested)) return filter_number<UInt32>(filt, result_size_hint);
+    if (typeid_cast<const ColumnUInt64 *>(nested)) return filter_number<UInt64>(filt, result_size_hint);
+    if (typeid_cast<const ColumnInt8 *>(nested)) return filter_number<Int8>(filt, result_size_hint);
+    if (typeid_cast<const ColumnInt16 *>(nested)) return filter_number<Int16>(filt, result_size_hint);
+    if (typeid_cast<const ColumnInt32 *>(nested)) return filter_number<Int32>(filt, result_size_hint);
+    if (typeid_cast<const ColumnInt64 *>(nested)) return filter_number<Int64>(filt, result_size_hint);
+    if (typeid_cast<const ColumnFloat32 *>(nested)) return filter_number<Float32>(filt, result_size_hint);
+    if (typeid_cast<const ColumnFloat64 *>(nested)) return filter_number<Float64>(filt, result_size_hint);
+    if (typeid_cast<const ColumnString *>(nested)) return filter_string(filt, result_size_hint);
+    if (typeid_cast<const ColumnNullable *>(nested)) return filter_nullable(filt, result_size_hint);
+
+    return filter_generic(filt, result_size_hint);
+}
+
+/// filter() specialisation for arrays of numbers stored in ColumnVector<T>;
+/// the per-row work is delegated to filter_arrays_impl.
+template <typename T>
+ColumnPtr ColumnArray::filter_number(const Filter & filt, ssize_t result_size_hint) const {
+    /// No rows: return a column sharing the current nested column.
+    if (get_offsets().empty())
+        return ColumnArray::create(data);
+
+    auto res = ColumnArray::create(data->clone_empty());
+
+    auto & res_elems = assert_cast<ColumnVector<T> &>(res->get_data()).get_data();
+    Offsets & res_offsets = res->get_offsets();
+    const auto & src_elems = assert_cast<const ColumnVector<T> &>(*data).get_data();
+
+    filter_arrays_impl<T>(src_elems, get_offsets(), res_elems, res_offsets, filt,
+                          result_size_hint);
+    return res;
+}
+
+/// filter() specialisation for arrays of strings: for every kept row, copy the string
+/// characters in one memcpy and append the string offsets and the array end offset,
+/// both rebased onto the result column. Aborts via LOG(FATAL) on a size mismatch.
+ColumnPtr ColumnArray::filter_string(const Filter & filt, ssize_t result_size_hint) const {
+    size_t col_size = get_offsets().size();
+    if (col_size != filt.size())
+        LOG(FATAL) << "Size of filter doesn't match size of column.";
+
+    if (0 == col_size)
+        return ColumnArray::create(data);
+
+    auto res = ColumnArray::create(data->clone_empty());
+
+    const ColumnString & src_string = typeid_cast<const ColumnString &>(*data);
+    const ColumnString::Chars & src_chars = src_string.get_chars();
+    const Offsets & src_string_offsets = src_string.get_offsets();
+    const Offsets & src_offsets = get_offsets();
+
+    ColumnString::Chars & res_chars = typeid_cast<ColumnString &>(res->get_data()).get_chars();
+    Offsets & res_string_offsets = typeid_cast<ColumnString &>(res->get_data()).get_offsets();
+    Offsets & res_offsets = res->get_offsets();
+
+    /// A negative hint reserves for the case where every row is kept.
+    if (result_size_hint < 0) {
+        res_chars.reserve(src_chars.size());
+        res_string_offsets.reserve(src_string_offsets.size());
+        res_offsets.reserve(col_size);
+    }
+
+    /// prev_src_*: end of the previous source row (string index / char offset).
+    /// prev_res_*: the same positions in the result column.
+    Offset prev_src_offset = 0;
+    Offset prev_src_string_offset = 0;
+
+    Offset prev_res_offset = 0;
+    Offset prev_res_string_offset = 0;
+
+    for (size_t i = 0; i < col_size; ++i) {
+        /// Number of strings in the i-th array.
+        size_t array_size = src_offsets[i] - prev_src_offset;
+
+        if (filt[i]) {
+            /// If the array is not empty - copy content.
+            if (array_size) {
+                size_t chars_to_copy = src_string_offsets[array_size + prev_src_offset - 1] - prev_src_string_offset;
+                size_t res_chars_prev_size = res_chars.size();
+                res_chars.resize(res_chars_prev_size + chars_to_copy);
+                memcpy(&res_chars[res_chars_prev_size], &src_chars[prev_src_string_offset], chars_to_copy);
+
+                for (size_t j = 0; j < array_size; ++j)
+                    res_string_offsets.push_back(src_string_offsets[j + prev_src_offset] + prev_res_string_offset - prev_src_string_offset);
+
+                prev_res_string_offset = res_string_offsets.back();
+            }
+
+            prev_res_offset += array_size;
+            res_offsets.push_back(prev_res_offset);
+        }
+
+        /// Advance the source cursors whether or not the row was kept.
+        if (array_size) {
+            prev_src_offset += array_size;
+            prev_src_string_offset = src_string_offsets[prev_src_offset - 1];
+        }
+    }
+
+    return res;
+}
+
+/// Fallback filter() for any nested column type: expand the row filter into a
+/// per-element filter, filter the nested column with it, then rebuild the offsets
+/// from the sizes of the kept rows. Aborts via LOG(FATAL) on a size mismatch.
+ColumnPtr ColumnArray::filter_generic(const Filter & filt, ssize_t result_size_hint) const {
+    size_t size = get_offsets().size();
+    if (size != filt.size())
+        LOG(FATAL) << "Size of filter doesn't match size of column.";
+
+    if (size == 0)
+        return ColumnArray::create(data);
+
+    /// One filter byte per nested element, copied from its row's filter byte.
+    Filter nested_filt(get_offsets().back());
+    for (size_t i = 0; i < size; ++i) {
+        if (filt[i])
+            memset(&nested_filt[offset_at(i)], 1, size_at(i));
+        else
+            memset(&nested_filt[offset_at(i)], 0, size_at(i));
+    }
+
+    auto res = ColumnArray::create(data->clone_empty());
+
+    /// Negative hints pass through unchanged; a positive row hint is scaled by the
+    /// average number of elements per row.
+    ssize_t nested_result_size_hint = 0;
+    if (result_size_hint < 0)
+        nested_result_size_hint = result_size_hint;
+    else if (result_size_hint && result_size_hint < 1000000000 && data->size() < 1000000000)    /// Avoid overflow.
+         nested_result_size_hint = result_size_hint * data->size() / size;
+
+    res->data = data->filter(nested_filt, nested_result_size_hint);
+
+    Offsets & res_offsets = res->get_offsets();
+    if (result_size_hint)
+        res_offsets.reserve(result_size_hint > 0 ? result_size_hint : size);
+
+    size_t current_offset = 0;
+    for (size_t i = 0; i < size; ++i) {
+        if (filt[i])
+        {
+            current_offset += size_at(i);
+            res_offsets.push_back(current_offset);
+        }
+    }
+
+    return res;
+}
+
+/// filter() specialisation for arrays of Nullable elements: filter the nested values
+/// (as a temporary array sharing our offsets) and the null map separately, then
+/// reassemble them into a ColumnArray of ColumnNullable.
+ColumnPtr ColumnArray::filter_nullable(const Filter & filt, ssize_t result_size_hint) const {
+    if (get_offsets().empty())
+        return ColumnArray::create(data);
+
+    const ColumnNullable & nullable_elems = assert_cast<const ColumnNullable &>(*data);
+
+    auto array_of_nested = ColumnArray::create(nullable_elems.get_nested_column_ptr(), offsets);
+    auto filtered_array_of_nested_owner = array_of_nested->filter(filt, result_size_hint);
+    const auto & filtered_array_of_nested = assert_cast<const ColumnArray &>(*filtered_array_of_nested_owner);
+    const auto & filtered_offsets = filtered_array_of_nested.get_offsets_ptr();
+
+    auto res_null_map = ColumnUInt8::create();
+
+    /// The null map is filtered with the same row filter and the original offsets,
+    /// so it stays aligned with the filtered nested values.
+    filter_arrays_impl_only_data(nullable_elems.get_null_map_data(), get_offsets(), res_null_map->get_data(), filt, result_size_hint);
+
+    return ColumnArray::create(
+        ColumnNullable::create(
+            filtered_array_of_nested.get_data_ptr(),
+            std::move(res_null_map)),
+        filtered_offsets);
+}
+
+/// Append the rows of `src` selected by the indices in [indices_begin, indices_end);
+/// an index of -1 appends an empty array instead.
+void ColumnArray::insert_indices_from(const IColumn& src, const int* indices_begin, const int* indices_end) {
+    for (const int* it = indices_begin; it != indices_end; ++it) {
+        const int index = *it;
+        if (index == -1)
+            ColumnArray::insert_default();
+        else
+            ColumnArray::insert_from(src, index);
+    }
+}
+
+/// Repeat row i of this column (replicate_offsets[i] - replicate_offsets[i - 1]) times.
+/// A specialised implementation is chosen from the concrete nested column type;
+/// there is no tuple specialisation, so other types go through replicate_generic.
+ColumnPtr ColumnArray::replicate(const Offsets & replicate_offsets) const {
+    if (replicate_offsets.empty())
+        return clone_empty();
+
+    const IColumn * nested = data.get();
+
+    if (typeid_cast<const ColumnUInt8 *>(nested)) return replicate_number<UInt8>(replicate_offsets);
+    if (typeid_cast<const ColumnUInt16 *>(nested)) return replicate_number<UInt16>(replicate_offsets);
+    if (typeid_cast<const ColumnUInt32 *>(nested)) return replicate_number<UInt32>(replicate_offsets);
+    if (typeid_cast<const ColumnUInt64 *>(nested)) return replicate_number<UInt64>(replicate_offsets);
+    if (typeid_cast<const ColumnInt8 *>(nested)) return replicate_number<Int8>(replicate_offsets);
+    if (typeid_cast<const ColumnInt16 *>(nested)) return replicate_number<Int16>(replicate_offsets);
+    if (typeid_cast<const ColumnInt32 *>(nested)) return replicate_number<Int32>(replicate_offsets);
+    if (typeid_cast<const ColumnInt64 *>(nested)) return replicate_number<Int64>(replicate_offsets);
+    if (typeid_cast<const ColumnFloat32 *>(nested)) return replicate_number<Float32>(replicate_offsets);
+    if (typeid_cast<const ColumnFloat64 *>(nested)) return replicate_number<Float64>(replicate_offsets);
+    if (typeid_cast<const ColumnString *>(nested)) return replicate_string(replicate_offsets);
+    if (typeid_cast<const ColumnConst *>(nested)) return replicate_const(replicate_offsets);
+    if (typeid_cast<const ColumnNullable *>(nested)) return replicate_nullable(replicate_offsets);
+
+    return replicate_generic(replicate_offsets);
+}
+
+/// replicate() specialisation for arrays of numbers stored in ColumnVector<T>: row i is
+/// emitted (replicate_offsets[i] - replicate_offsets[i - 1]) times, copying its element
+/// bytes with memcpy. Aborts via LOG(FATAL) when the offsets and column differ in size.
+template <typename T>
+ColumnPtr ColumnArray::replicate_number(const Offsets & replicate_offsets) const {
+    size_t col_size = size();
+    if (col_size != replicate_offsets.size())
+        LOG(FATAL) << "Size of offsets doesn't match size of column.";
+
+    MutableColumnPtr res = clone_empty();
+
+    if (0 == col_size)
+        return res;
+
+    ColumnArray & res_arr = typeid_cast<ColumnArray &>(*res);
+
+    const typename ColumnVector<T>::Container & src_data = typeid_cast<const ColumnVector<T> &>(*data).get_data();
+    const Offsets & src_offsets = get_offsets();
+
+    typename ColumnVector<T>::Container & res_data = typeid_cast<ColumnVector<T> &>(res_arr.get_data()).get_data();
+    Offsets & res_offsets = res_arr.get_offsets();
+
+    /// Estimate: average elements per row times the number of output rows.
+    res_data.reserve(data->size() / col_size * replicate_offsets.back());
+    res_offsets.reserve(replicate_offsets.back());
+
+    Offset prev_replicate_offset = 0;
+    Offset prev_data_offset = 0;
+    Offset current_new_offset = 0;
+
+    for (size_t i = 0; i < col_size; ++i) {
+        size_t size_to_replicate = replicate_offsets[i] - prev_replicate_offset;
+        size_t value_size = src_offsets[i] - prev_data_offset;
+
+        for (size_t j = 0; j < size_to_replicate; ++j) {
+            current_new_offset += value_size;
+            res_offsets.push_back(current_new_offset);
+
+            if (value_size) {
+                res_data.resize(res_data.size() + value_size);
+                memcpy(&res_data[res_data.size() - value_size], &src_data[prev_data_offset], value_size * sizeof(T));
+            }
+        }
+
+        prev_replicate_offset = replicate_offsets[i];
+        prev_data_offset = src_offsets[i];
+    }
+
+    return res;
+}
+
+/// replicate() specialisation for arrays of strings: each copy of row i appends the
+/// array end offset, one rebased string offset per string, and the row's characters
+/// in a single copy. Aborts via LOG(FATAL) when the offsets and column differ in size.
+ColumnPtr ColumnArray::replicate_string(const Offsets & replicate_offsets) const {
+    size_t col_size = size();
+    if (col_size != replicate_offsets.size())
+        LOG(FATAL) << "Size of offsets doesn't match size of column.";
+
+    MutableColumnPtr res = clone_empty();
+
+    if (0 == col_size)
+        return res;
+
+    ColumnArray & res_arr = assert_cast<ColumnArray &>(*res);
+
+    const ColumnString & src_string = typeid_cast<const ColumnString &>(*data);
+    const ColumnString::Chars & src_chars = src_string.get_chars();
+    const Offsets & src_string_offsets = src_string.get_offsets();
+    const Offsets & src_offsets = get_offsets();
+
+    ColumnString::Chars & res_chars = typeid_cast<ColumnString &>(res_arr.get_data()).get_chars();
+    Offsets & res_string_offsets = typeid_cast<ColumnString &>(res_arr.get_data()).get_offsets();
+    Offsets & res_offsets = res_arr.get_offsets();
+
+    /// Estimates based on the average size per source row.
+    res_chars.reserve(src_chars.size() / col_size * replicate_offsets.back());
+    res_string_offsets.reserve(src_string_offsets.size() / col_size * replicate_offsets.back());
+    res_offsets.reserve(replicate_offsets.back());
+
+    Offset prev_replicate_offset = 0;
+
+    Offset prev_src_offset = 0;
+    Offset prev_src_string_offset = 0;
+
+    Offset current_res_offset = 0;
+    Offset current_res_string_offset = 0;
+
+    for (size_t i = 0; i < col_size; ++i) {
+        /// How many times to replicate the array.
+        size_t size_to_replicate = replicate_offsets[i] - prev_replicate_offset;
+        /// The number of strings in the array.
+        size_t value_size = src_offsets[i] - prev_src_offset;
+        /// Number of characters in strings of the array, including zero bytes.
+        size_t sum_chars_size = src_string_offsets[prev_src_offset + value_size - 1] - prev_src_string_offset;  /// -1th index is Ok, see PaddedPODArray.
+
+        for (size_t j = 0; j < size_to_replicate; ++j) {
+            current_res_offset += value_size;
+            res_offsets.push_back(current_res_offset);
+
+            size_t prev_src_string_offset_local = prev_src_string_offset;
+            for (size_t k = 0; k < value_size; ++k) {
+                /// Size of single string.
+                size_t chars_size = src_string_offsets[k + prev_src_offset] - prev_src_string_offset_local;
+
+                current_res_string_offset += chars_size;
+                res_string_offsets.push_back(current_res_string_offset);
+
+                prev_src_string_offset_local += chars_size;
+            }
+
+            if (sum_chars_size) {
+                /// Copies the characters of the array of strings.
+                res_chars.resize(res_chars.size() + sum_chars_size);
+                memcpy_small_allow_read_write_overflow15(
+                    &res_chars[res_chars.size() - sum_chars_size], &src_chars[prev_src_string_offset], sum_chars_size);
+            }
+        }
+
+        prev_replicate_offset = replicate_offsets[i];
+        prev_src_offset = src_offsets[i];
+        prev_src_string_offset += sum_chars_size;
+    }
+
+    return res;
+}
+
+/// replicate() specialisation for a constant nested column: only the offsets are
+/// replicated element by element; the nested data is produced with clone_resized to
+/// the new total element count. Aborts via LOG(FATAL) on a size mismatch.
+ColumnPtr ColumnArray::replicate_const(const Offsets & replicate_offsets) const {
+    size_t col_size = size();
+    if (col_size != replicate_offsets.size())
+        LOG(FATAL) << "Size of offsets doesn't match size of column.";
+
+    if (0 == col_size)
+        return clone_empty();
+
+    const Offsets & src_offsets = get_offsets();
+
+    auto res_column_offsets = ColumnOffsets::create();
+    Offsets & res_offsets = res_column_offsets->get_data();
+    res_offsets.reserve(replicate_offsets.back());
+
+    Offset prev_replicate_offset = 0;
+    Offset prev_data_offset = 0;
+    Offset current_new_offset = 0;
+
+    for (size_t i = 0; i < col_size; ++i) {
+        size_t size_to_replicate = replicate_offsets[i] - prev_replicate_offset;
+        size_t value_size = src_offsets[i] - prev_data_offset;
+
+        for (size_t j = 0; j < size_to_replicate; ++j) {
+            current_new_offset += value_size;
+            res_offsets.push_back(current_new_offset);
+        }
+
+        prev_replicate_offset = replicate_offsets[i];
+        prev_data_offset = src_offsets[i];
+    }
+
+    return ColumnArray::create(get_data().clone_resized(current_new_offset), std::move(res_column_offsets));
+}
+
+/// Fallback replicate() for any nested column type: row i is appended
+/// (replicate_offsets[i] - replicate_offsets[i - 1]) times through insert_from.
+/// Aborts via LOG(FATAL) when the offsets and the column differ in size.
+ColumnPtr ColumnArray::replicate_generic(const Offsets & replicate_offsets) const {
+    const size_t col_size = size();
+    if (col_size != replicate_offsets.size())
+        LOG(FATAL) << "Size of offsets doesn't match size of column.";
+
+    MutableColumnPtr res = clone_empty();
+    auto & res_concrete = assert_cast<ColumnArray &>(*res);
+
+    if (col_size == 0)
+        return res;
+
+    IColumn::Offset prev_end = 0;
+    for (size_t row = 0; row < col_size; ++row) {
+        const size_t times = replicate_offsets[row] - prev_end;
+        prev_end = replicate_offsets[row];
+
+        for (size_t copy = 0; copy < times; ++copy)
+            res_concrete.insert_from(*this, row);
+    }
+
+    return res;
+}
+
+/// replicate() specialisation for arrays of Nullable elements: the nested values and the
+/// null map are each wrapped in a temporary ColumnArray (sharing our offsets), replicated,
+/// and recombined into a ColumnArray of ColumnNullable.
+ColumnPtr ColumnArray::replicate_nullable(const Offsets & replicate_offsets) const {
+    const ColumnNullable & nullable = assert_cast<const ColumnNullable &>(*data);
+
+    /// Make temporary arrays for each component of the Nullable. Then replicate them independently and collect them back into the result.
+    /// NOTE Offsets are calculated twice and it is redundant.
+
+    auto array_of_nested = ColumnArray(nullable.get_nested_column_ptr()->assume_mutable(), get_offsets_ptr()->assume_mutable())
+            .replicate(replicate_offsets);
+    auto array_of_null_map = ColumnArray(nullable.get_null_map_column_ptr()->assume_mutable(), get_offsets_ptr()->assume_mutable())
+            .replicate(replicate_offsets);
+
+    return ColumnArray::create(
+        ColumnNullable::create(
+            assert_cast<const ColumnArray &>(*array_of_nested).get_data_ptr(),
+            assert_cast<const ColumnArray &>(*array_of_null_map).get_data_ptr()),
+        assert_cast<const ColumnArray &>(*array_of_nested).get_offsets_ptr());
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/columns/column_array.h 
b/be/src/vec/columns/column_array.h
new file mode 100644
index 0000000..f3b8b55
--- /dev/null
+++ b/be/src/vec/columns/column_array.h
@@ -0,0 +1,185 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// 
https://github.com/ClickHouse/ClickHouse/blob/master/src/Columns/ColumnArray.h
+// and modified by Doris
+
+#pragma once
+
+#include "vec/common/arena.h"
+#include "vec/common/assert_cast.h"
+#include "vec/columns/column.h"
+#include "vec/columns/column_impl.h"
+#include "vec/columns/column_vector.h"
+#include "vec/core/types.h"
+
+namespace doris::vectorized {
+
+/** A column of array values.
+  * In memory, it is represented as one column of a nested type, whose size is 
equal to the sum of the sizes of all arrays,
+  *  and as an array of offsets in it, which allows you to get each element.
+  */
+class ColumnArray final : public COWHelper<IColumn, ColumnArray> {
+private:
+    friend class COWHelper<IColumn, ColumnArray>;
+
+    /** Create an array column with specified values and offsets. */
+    ColumnArray(MutableColumnPtr && nested_column, MutableColumnPtr && 
offsets_column);
+
+    /** Create an empty column of arrays with the type of values as in the 
column `nested_column` */
+    explicit ColumnArray(MutableColumnPtr && nested_column);
+
+    ColumnArray(const ColumnArray &) = default;
+
+public:
+    /** Create immutable column using immutable arguments. This arguments may 
be shared with other columns.
+      * Use IColumn::mutate in order to make mutable column and mutate shared 
nested columns.
+      */
+    using Base = COWHelper<IColumn, ColumnArray>;
+
+    static Ptr create(const ColumnPtr & nested_column, const ColumnPtr & 
offsets_column) {
+        return ColumnArray::create(nested_column->assume_mutable(), 
offsets_column->assume_mutable());
+    }
+
+    static Ptr create(const ColumnPtr & nested_column) {
+        return ColumnArray::create(nested_column->assume_mutable());
+    }
+
+    template <typename ... Args, typename = typename 
std::enable_if<IsMutableColumns<Args ...>::value>::type>
+    static MutablePtr create(Args &&... args) { return 
Base::create(std::forward<Args>(args)...); }
+
+    /** On the index i there is an offset to the beginning of the i + 1 -th 
element. */
+    using ColumnOffsets = ColumnVector<Offset>;
+
+    /// IColumn interface. Overrides without an inline body are defined out of line.
+    std::string get_name() const override;
+    const char * get_family_name() const override { return "Array"; }
+    bool can_be_inside_nullable() const override { return true; }
+    TypeIndex get_data_type() const { return TypeIndex::Array; }
+    MutableColumnPtr clone_resized(size_t size) const override;
+    size_t size() const override;
+    Field operator[](size_t n) const override;
+    void get(size_t n, Field & res) const override;
+    StringRef get_data_at(size_t n) const override;
+    bool is_default_at(size_t n) const override;
+    void insert_data(const char * pos, size_t length) override;
+    StringRef serialize_value_into_arena(size_t n, Arena & arena, char const 
*& begin) const override;
+    const char * deserialize_and_insert_from_arena(const char * pos) override;
+    void update_hash_with_value(size_t n, SipHash & hash) const override;
+    void insert_range_from(const IColumn & src, size_t start, size_t length) 
override;
+    void insert(const Field & x) override;
+    void insert_from(const IColumn & src_, size_t n) override;
+    void insert_default() override;
+    void pop_back(size_t n) override;
+    ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const 
override;
+    /// Not implemented yet: permute(), compare_at() and get_permutation() abort the
+    /// process through LOG(FATAL), so callers must not sort or compare ARRAY columns.
+    [[noreturn]] ColumnPtr permute(const Permutation & perm, size_t limit) 
const override {
+        LOG(FATAL) << "permute not implemented";
+    }
+    // index() / index_impl() are currently disabled (commented out).
+    //ColumnPtr index(const IColumn & indexes, size_t limit) const;
+    //template <typename Type> ColumnPtr index_impl(const PaddedPODArray<Type> 
& indexes, size_t limit) const;
+    [[noreturn]] int compare_at(size_t n, size_t m, const IColumn & rhs_, int 
nan_direction_hint) const override {
+        LOG(FATAL) << "compare_at not implemented";
+    }
+    [[noreturn]] void get_permutation(bool reverse, size_t limit, int 
nan_direction_hint, Permutation & res) const override {
+        LOG(FATAL) << "get_permutation not implemented";
+    }
+    void reserve(size_t n) override;
+    size_t byte_size() const override;
+    size_t allocated_bytes() const override;
+    void protect() override;
+    ColumnPtr replicate(const Offsets & replicate_offsets) const override;
+    ColumnPtr convert_to_full_column_if_const() const override;
+    /// Not implemented yet; aborts via LOG(FATAL).
+    void get_extremes(Field & min, Field & max) const override {
+        LOG(FATAL) << "get_extremes not implemented";
+    }
+
+    /// Direct access to the nested (element) column.
+    IColumn & get_data() { return *data; }
+    const IColumn & get_data() const { return *data; }
+
+    /// Direct access to the offsets column as a generic IColumn.
+    IColumn & get_offsets_column() { return *offsets; }
+    const IColumn & get_offsets_column() const { return *offsets; }
+
+    /// Typed access to the raw offsets array. assert_cast presumably verifies (at least
+    /// in debug builds, TODO confirm) that `offsets` really is a ColumnOffsets.
+    Offsets & ALWAYS_INLINE get_offsets() {
+        return assert_cast<ColumnOffsets &>(*offsets).get_data();
+    }
+
+    const Offsets & ALWAYS_INLINE get_offsets() const {
+        return assert_cast<const ColumnOffsets &>(*offsets).get_data();
+    }
+
+    /// Pointer access to the sub-columns; the non-const overloads let callers replace them.
+    const ColumnPtr & get_data_ptr() const { return data; }
+    ColumnPtr & get_data_ptr() { return data; }
+
+    const ColumnPtr & get_offsets_ptr() const { return offsets; }
+    ColumnPtr & get_offsets_ptr() { return offsets; }
+
+    /// Delegates to the shared scatter_impl<ColumnArray>() helper.
+    MutableColumns scatter(ColumnIndex num_columns, const Selector & selector) 
const override {
+        return scatter_impl<ColumnArray>(num_columns, selector);
+    }
+
+    /// Invokes callback on the offsets sub-column first, then on the data sub-column.
+    void for_each_subcolumn(ColumnCallback callback) override {
+        callback(offsets);
+        callback(data);
+    }
+
+    void insert_indices_from(const IColumn& src, const int* indices_begin, 
const int* indices_end) override;
+
+    /// Not implemented yet; both overrides abort via LOG(FATAL).
+    void replace_column_data(const IColumn&, size_t row, size_t self_row = 0) 
override {
+        LOG(FATAL) << "replace_column_data not implemented";
+    }
+    void replace_column_data_default(size_t self_row = 0) override {
+        LOG(FATAL) << "replace_column_data_default not implemented";
+    }
+
+private:
+    /// Nested column holding the elements of every row, concatenated.
+    WrappedPtr data;
+    /// Offsets column (ColumnOffsets): end position of each row inside `data`.
+    WrappedPtr offsets;
+
+    /// offset_at(i): start position of row i inside `data`; size_at(i): element count of row i.
+    /// Both read index i - 1, including for i == 0. NOTE(review): this presumably relies on
+    /// the offsets array exposing a zero value at index -1 (PaddedPODArray left padding) - confirm.
+    size_t ALWAYS_INLINE offset_at(ssize_t i) const { return get_offsets()[i - 
1]; }
+    size_t ALWAYS_INLINE size_at(ssize_t i) const { return get_offsets()[i] - 
get_offsets()[i - 1]; }
+
+
+    /// replicate() helper for when the nested column is ColumnVector<T>.
+    template <typename T>
+    ColumnPtr replicate_number(const Offsets & replicate_offsets) const;
+
+    /// replicate() helper specialised for a ColumnString nested column.
+    ColumnPtr replicate_string(const Offsets & replicate_offsets) const;
+
+    /// replicate() helper for arrays whose nested column is a ColumnConst. Such columns are
+    /// rare: most functions neither accept nor produce them. Per the ClickHouse-derived
+    /// rationale, it exists for the `replicate` function used to implement lambda functions.
+    /// NOTE(review): confirm whether anything in Doris actually produces these columns.
+    ColumnPtr replicate_const(const Offsets & replicate_offsets) const;
+
+    /// replicate() helpers that work by simply replicating the nested column itself.
+    ColumnPtr replicate_nullable(const Offsets & replicate_offsets) const;
+    ColumnPtr replicate_generic(const Offsets & replicate_offsets) const;
+
+
+    /// filter() helpers specialised by nested column kind (number, string, nullable),
+    /// plus a generic fallback.
+    template <typename T>
+    ColumnPtr filter_number(const Filter & filt, ssize_t result_size_hint) 
const;
+
+    ColumnPtr filter_string(const Filter & filt, ssize_t result_size_hint) 
const;
+    ColumnPtr filter_nullable(const Filter & filt, ssize_t result_size_hint) 
const;
+    ColumnPtr filter_generic(const Filter & filt, ssize_t result_size_hint) 
const;
+};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 23022d3..831e9f7 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -44,6 +44,7 @@
 #include "vec/data_types/data_type_date.h"
 #include "vec/data_types/data_type_date_time.h"
 #include "vec/data_types/data_type_decimal.h"
+#include "vec/data_types/data_type_factory.hpp"
 #include "vec/data_types/data_type_hll.h"
 #include "vec/data_types/data_type_nullable.h"
 #include "vec/data_types/data_type_number.h"
@@ -51,79 +52,6 @@
 
 namespace doris::vectorized {
 
-inline DataTypePtr create_data_type(const PColumnMeta& pcolumn_meta) {
-    switch (pcolumn_meta.type()) {
-    case PGenericType::UINT8: {
-        return std::make_shared<DataTypeUInt8>();
-    }
-    case PGenericType::UINT16: {
-        return std::make_shared<DataTypeUInt16>();
-    }
-    case PGenericType::UINT32: {
-        return std::make_shared<DataTypeUInt32>();
-    }
-    case PGenericType::UINT64: {
-        return std::make_shared<DataTypeUInt64>();
-    }
-    case PGenericType::UINT128: {
-        return std::make_shared<DataTypeUInt128>();
-    }
-    case PGenericType::INT8: {
-        return std::make_shared<DataTypeInt8>();
-    }
-    case PGenericType::INT16: {
-        return std::make_shared<DataTypeInt16>();
-    }
-    case PGenericType::INT32: {
-        return std::make_shared<DataTypeInt32>();
-    }
-    case PGenericType::INT64: {
-        return std::make_shared<DataTypeInt64>();
-    }
-    case PGenericType::INT128: {
-        return std::make_shared<DataTypeInt128>();
-    }
-    case PGenericType::FLOAT: {
-        return std::make_shared<DataTypeFloat32>();
-    }
-    case PGenericType::DOUBLE: {
-        return std::make_shared<DataTypeFloat64>();
-    }
-    case PGenericType::STRING: {
-        return std::make_shared<DataTypeString>();
-    }
-    case PGenericType::DATE: {
-        return std::make_shared<DataTypeDate>();
-    }
-    case PGenericType::DATETIME: {
-        return std::make_shared<DataTypeDateTime>();
-    }
-    case PGenericType::DECIMAL32: {
-        return std::make_shared<DataTypeDecimal<Decimal32>>(
-                pcolumn_meta.decimal_param().precision(), 
pcolumn_meta.decimal_param().scale());
-    }
-    case PGenericType::DECIMAL64: {
-        return std::make_shared<DataTypeDecimal<Decimal64>>(
-                pcolumn_meta.decimal_param().precision(), 
pcolumn_meta.decimal_param().scale());
-    }
-    case PGenericType::DECIMAL128: {
-        return std::make_shared<DataTypeDecimal<Decimal128>>(
-                pcolumn_meta.decimal_param().precision(), 
pcolumn_meta.decimal_param().scale());
-    }
-    case PGenericType::BITMAP: {
-        return std::make_shared<DataTypeBitMap>();
-    }
-    case PGenericType::HLL: {
-        return std::make_shared<DataTypeHLL>();
-    }
-    default: {
-        LOG(FATAL) << fmt::format("Unknown data type: {}, data type name: {}", 
pcolumn_meta.type(),
-                                  
PGenericType_TypeId_Name(pcolumn_meta.type()));
-        return nullptr;
-    }
-    }
-}
-
 Block::Block(std::initializer_list<ColumnWithTypeAndName> il) : data {il} {
     initialize_index_by_name();
 }
@@ -153,14 +81,8 @@ Block::Block(const PBlock& pblock) {
     }
 
     for (const auto& pcol_meta : pblock.column_metas()) {
-        DataTypePtr type = create_data_type(pcol_meta);
-        MutableColumnPtr data_column;
-        if (pcol_meta.is_nullable()) {
-            data_column = ColumnNullable::create(type->create_column(), 
ColumnUInt8::create());
-            type = make_nullable(type);
-        } else {
-            data_column = type->create_column();
-        }
+        DataTypePtr type = 
DataTypeFactory::instance().create_data_type(pcol_meta);
+        MutableColumnPtr data_column = type->create_column();
         buf = type->deserialize(buf, data_column.get());
         data.emplace_back(data_column->get_ptr(), type, pcol_meta.name());
     }
diff --git a/be/src/vec/core/types.h b/be/src/vec/core/types.h
index a5a39b6..ddabef8 100644
--- a/be/src/vec/core/types.h
+++ b/be/src/vec/core/types.h
@@ -25,6 +25,7 @@
 #include <string>
 #include <vector>
 
+#include "gen_cpp/data.pb.h"
 #include "util/binary_cast.hpp"
 #include "util/bitmap_value.h"
 #include "olap/hll.h"
diff --git a/be/src/vec/data_types/data_type.cpp 
b/be/src/vec/data_types/data_type.cpp
index 0af43a1..d573729 100644
--- a/be/src/vec/data_types/data_type.cpp
+++ b/be/src/vec/data_types/data_type.cpp
@@ -23,18 +23,8 @@
 #include <fmt/format.h>
 
 #include "common/logging.h"
-#include "olap/olap_common.h"
 #include "vec/columns/column.h"
 #include "vec/columns/column_const.h"
-#include "vec/data_types/data_type_bitmap.h"
-#include "vec/data_types/data_type_date.h"
-#include "vec/data_types/data_type_date_time.h"
-#include "vec/data_types/data_type_decimal.h"
-#include "vec/data_types/data_type_nothing.h"
-#include "vec/data_types/data_type_number.h"
-#include "vec/data_types/data_type_string.h"
-#include "vec/data_types/data_type_nullable.h"
-#include "vec/data_types/data_type_hll.h"
 
 namespace doris::vectorized {
 
@@ -143,132 +133,11 @@ PGenericType_TypeId IDataType::get_pdata_type(const 
IDataType* data_type) {
         return PGenericType::BITMAP;
     case TypeIndex::HLL:
         return PGenericType::HLL;
+    case TypeIndex::Array:
+        return PGenericType::LIST;
     default:
         return PGenericType::UNKNOWN;
     }
 }
 
-DataTypePtr IDataType::from_thrift(const doris::PrimitiveType& type, const 
bool is_nullable){
-    DataTypePtr result;
-    switch (type) {
-        case TYPE_BOOLEAN:
-            result = std::make_shared<DataTypeUInt8>();
-            break;
-        case TYPE_TINYINT:
-            result = std::make_shared<DataTypeInt8>();
-            break;
-        case TYPE_SMALLINT:
-            result = std::make_shared<DataTypeInt16>();
-            break;
-        case TYPE_INT:
-            result = std::make_shared<DataTypeInt32>();
-            break;
-        case TYPE_FLOAT:
-            result = std::make_shared<DataTypeFloat32>();
-            break;
-        case TYPE_BIGINT:
-            result = std::make_shared<DataTypeInt64>();
-            break;
-        case TYPE_LARGEINT:
-            result = std::make_shared<DataTypeInt128>();
-            break;
-        case TYPE_DATE:
-            result = std::make_shared<DataTypeDate>();
-            break;
-        case TYPE_DATETIME:
-            result = std::make_shared<DataTypeDateTime>();
-            break;
-        case TYPE_TIME:
-        case TYPE_DOUBLE:
-            result = std::make_shared<DataTypeFloat64>();
-            break;
-        case TYPE_CHAR:
-        case TYPE_VARCHAR:
-        case TYPE_STRING:
-            result = std::make_shared<DataTypeString>();
-            break;
-        case TYPE_HLL:
-            result = std::make_shared<DataTypeHLL>();
-            break;        
-        case TYPE_OBJECT:
-            result = std::make_shared<DataTypeBitMap>();
-            break;
-        case TYPE_DECIMALV2:
-            result = std::make_shared<DataTypeDecimal<Decimal128>>(27, 9);
-            break;
-        case TYPE_NULL:
-            result = std::make_shared<DataTypeNothing>();
-            break;
-        case INVALID_TYPE:
-        default:
-            DCHECK(false);
-            result = nullptr;
-            break;
-    }
-    if (is_nullable) {
-        result = std::make_shared<DataTypeNullable>(result);
-    }
-
-    return result;
-}
-
-DataTypePtr IDataType::from_olap_engine(const doris::FieldType & type, const 
_Bool is_nullable) {
-    DataTypePtr result;
-    switch (type) {
-        case OLAP_FIELD_TYPE_BOOL:
-            result = std::make_shared<DataTypeUInt8>();
-            break;
-        case OLAP_FIELD_TYPE_TINYINT:
-            result = std::make_shared<DataTypeInt8>();
-            break;
-        case OLAP_FIELD_TYPE_SMALLINT:
-            result = std::make_shared<DataTypeInt16>();
-            break;
-        case OLAP_FIELD_TYPE_INT:
-            result = std::make_shared<DataTypeInt32>();
-            break;
-        case OLAP_FIELD_TYPE_FLOAT:
-            result = std::make_shared<DataTypeFloat32>();
-            break;
-        case OLAP_FIELD_TYPE_BIGINT:
-            result = std::make_shared<DataTypeInt64>();
-            break;
-        case OLAP_FIELD_TYPE_LARGEINT:
-            result = std::make_shared<DataTypeInt128>();
-            break;
-        case OLAP_FIELD_TYPE_DATE:
-            result = std::make_shared<DataTypeDate>();
-            break;
-        case OLAP_FIELD_TYPE_DATETIME:
-            result = std::make_shared<DataTypeDateTime>();
-            break;
-        case OLAP_FIELD_TYPE_DOUBLE:
-            result = std::make_shared<DataTypeFloat64>();
-            break;
-        case OLAP_FIELD_TYPE_CHAR:
-        case OLAP_FIELD_TYPE_VARCHAR:
-        case OLAP_FIELD_TYPE_STRING:
-            result = std::make_shared<DataTypeString>();
-            break;
-        case OLAP_FIELD_TYPE_HLL:
-            result = std::make_shared<DataTypeHLL>();
-            break;        
-        case OLAP_FIELD_TYPE_OBJECT:
-            result = std::make_shared<DataTypeBitMap>();
-            break;
-        case OLAP_FIELD_TYPE_DECIMAL:
-            result = std::make_shared<DataTypeDecimal<Decimal128>>(27, 9);
-            break;
-
-        default:
-            DCHECK(false) << "Invalid olap engine type";
-            result = nullptr;
-            break;
-    }
-    if (is_nullable) {
-        result = std::make_shared<DataTypeNullable>(result);
-    }
-
-    return result;
-}
 } // namespace doris::vectorized
diff --git a/be/src/vec/data_types/data_type.h 
b/be/src/vec/data_types/data_type.h
index dd8cb35..5e49fa9 100644
--- a/be/src/vec/data_types/data_type.h
+++ b/be/src/vec/data_types/data_type.h
@@ -241,8 +241,6 @@ public:
     virtual void to_pb_column_meta(PColumnMeta* col_meta) const;
 
     static PGenericType_TypeId get_pdata_type(const IDataType* data_type);
-    static DataTypePtr from_thrift(const doris::PrimitiveType& type, const 
bool is_nullable = true);
-    static DataTypePtr from_olap_engine(const doris::FieldType& type, const 
bool is_nullable = true);
 
 private:
     friend class DataTypeFactory;
diff --git a/be/src/vec/data_types/data_type_array.cpp 
b/be/src/vec/data_types/data_type_array.cpp
new file mode 100644
index 0000000..b10c5ca
--- /dev/null
+++ b/be/src/vec/data_types/data_type_array.cpp
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// 
https://github.com/ClickHouse/ClickHouse/blob/master/src/DataTypes/DataTypeArray.h
+// and modified by Doris
+
+#include "vec/data_types/data_type_array.h"
+
+#include "gen_cpp/data.pb.h"
+#include "vec/io/io_helper.h"
+
+namespace doris::vectorized {
+
+/// Creates an array type whose elements have type `nested_`.
+DataTypeArray::DataTypeArray(const DataTypePtr& nested_) : nested {nested_} {}
+
+MutableColumnPtr DataTypeArray::create_column() const {
+    // An empty array column: an empty element column plus an empty offsets column.
+    auto element_column = nested->create_column();
+    auto offsets_column = ColumnArray::ColumnOffsets::create();
+    return ColumnArray::create(std::move(element_column), std::move(offsets_column));
+}
+
+Field DataTypeArray::get_default() const {
+    // The default value of any array type is the empty array.
+    return Field(Array {});
+}
+
+bool DataTypeArray::equals(const IDataType& rhs) const {
+    // Equal only when rhs is also an array type with an equal element type.
+    if (typeid(rhs) != typeid(*this)) {
+        return false;
+    }
+    const auto& other = static_cast<const DataTypeArray&>(rhs);
+    return nested->equals(*other.nested);
+}
+
+size_t DataTypeArray::get_number_of_dimensions() const {
+    // Walk down the chain of directly nested array types, counting one level per array:
+    // 1 for a plain array, 2 for an array of arrays, and so on.
+    size_t dimensions = 1;
+    const IDataType* element = nested.get();
+    while (const auto* inner = typeid_cast<const DataTypeArray*>(element)) {
+        ++dimensions;
+        element = inner->get_nested_type().get();
+    }
+    return dimensions;
+}
+
+int64_t DataTypeArray::get_uncompressed_serialized_bytes(const IColumn& column) const {
+    // Matches serialize(): one Offset-sized row-count slot, one Offset per row,
+    // then whatever the element type needs for the nested column.
+    const auto full_column = column.convert_to_full_column_if_const();
+    const auto& array_column = assert_cast<const ColumnArray&>(*full_column);
+    const size_t header_and_offsets_bytes = sizeof(IColumn::Offset) * (column.size() + 1);
+    return header_and_offsets_bytes +
+           get_nested_type()->get_uncompressed_serialized_bytes(array_column.get_data());
+}
+
+/// Serializes an array column into buf and returns the position just past the written bytes.
+/// Layout: [row count, IColumn::Offset-sized][one IColumn::Offset per row][nested column data].
+char* DataTypeArray::serialize(const IColumn& column, char* buf) const {
+    auto ptr = column.convert_to_full_column_if_const();
+    const auto& data_column = assert_cast<const ColumnArray&>(*ptr.get());
+
+    // Row count. The reader consumes sizeof(IColumn::Offset) bytes here, so fill the whole
+    // slot (the old 32-bit store left 4 bytes uninitialized); memcpy avoids a misaligned,
+    // aliasing-violating store into the char buffer.
+    const IColumn::Offset row_num = column.size();
+    memcpy(buf, &row_num, sizeof(IColumn::Offset));
+    buf += sizeof(IColumn::Offset);
+    // Offsets: the end position of each row inside the nested column.
+    memcpy(buf, data_column.get_offsets().data(), row_num * sizeof(IColumn::Offset));
+    buf += row_num * sizeof(IColumn::Offset);
+    // Nested (element) data, serialized by the element type.
+    return get_nested_type()->serialize(data_column.get_data(), buf);
+}
+
+/// Reads the layout produced by serialize() (row count, offsets, nested data) into `column`,
+/// which must be a ColumnArray, and returns the position just past the consumed bytes.
+/// NOTE(review): offsets are resized to the row count, so an empty target column is assumed.
+const char* DataTypeArray::deserialize(const char* buf, IColumn* column) const {
+    auto* data_column = assert_cast<ColumnArray*>(column);
+    auto& offsets = data_column->get_offsets();
+
+    // Row count occupies an IColumn::Offset-sized slot; memcpy avoids a misaligned load.
+    // It is narrowed to 32 bits exactly as before, which keeps decoding correct for writers
+    // that only filled the low 4 bytes of the slot (on little-endian hosts).
+    IColumn::Offset header = 0;
+    memcpy(&header, buf, sizeof(IColumn::Offset));
+    const uint32_t row_num = static_cast<uint32_t>(header);
+    buf += sizeof(IColumn::Offset);
+    // Offsets
+    offsets.resize(row_num);
+    memcpy(offsets.data(), buf, sizeof(IColumn::Offset) * row_num);
+    buf += sizeof(IColumn::Offset) * row_num;
+    // Nested (element) data, written straight into the nested column.
+    return get_nested_type()->deserialize(buf, &data_column->get_data());
+}
+
+void DataTypeArray::to_pb_column_meta(PColumnMeta* col_meta) const {
+    // Describe the array itself, then the element type as its single child meta.
+    IDataType::to_pb_column_meta(col_meta);
+    PColumnMeta* element_meta = col_meta->add_children();
+    get_nested_type()->to_pb_column_meta(element_meta);
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/data_types/data_type_array.h 
b/be/src/vec/data_types/data_type_array.h
new file mode 100644
index 0000000..a389bda
--- /dev/null
+++ b/be/src/vec/data_types/data_type_array.h
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// 
https://github.com/ClickHouse/ClickHouse/blob/master/src/DataTypes/DataTypeArray.h
+// and modified by Doris
+
+#pragma once
+
+#include "vec/data_types/data_type.h"
+#include "vec/columns/column_array.h"
+
+namespace doris::vectorized {
+
+/// Vectorized data type for ARRAY values. Columns of this type are ColumnArray instances made
+/// of a nested column (the elements of all rows) and an offsets column (the end of each row).
+class DataTypeArray final : public IDataType {
+private:
+    /// The type of array elements.
+    DataTypePtr nested;
+
+public:
+    static constexpr bool is_parametric = true;
+
+    /// `explicit` so an element type never silently converts into an array type.
+    explicit DataTypeArray(const DataTypePtr& nested_);
+
+    TypeIndex get_type_id() const override { return TypeIndex::Array; }
+
+    /// "Array(<element type name>)".
+    std::string do_get_name() const override { return "Array(" + nested->get_name() + ")"; }
+
+    const char* get_family_name() const override { return "Array"; }
+
+    bool can_be_inside_nullable() const override { return true; }
+
+    MutableColumnPtr create_column() const override;
+
+    Field get_default() const override;
+
+    bool equals(const IDataType& rhs) const override;
+
+    bool get_is_parametric() const override { return true; }
+    bool have_subtypes() const override { return true; }
+
+    // The following properties are inherited from the element type.
+    bool cannot_be_stored_in_tables() const override {
+        return nested->cannot_be_stored_in_tables();
+    }
+    bool text_can_contain_only_valid_utf8() const override {
+        return nested->text_can_contain_only_valid_utf8();
+    }
+    bool is_comparable() const override { return nested->is_comparable(); }
+    bool can_be_compared_with_collation() const override {
+        return nested->can_be_compared_with_collation();
+    }
+    bool is_value_unambiguously_represented_in_contiguous_memory_region() const override {
+        return nested->is_value_unambiguously_represented_in_contiguous_memory_region();
+    }
+
+    /// The element type.
+    const DataTypePtr& get_nested_type() const { return nested; }
+
+    /// 1 for plain array, 2 for array of arrays and so on.
+    size_t get_number_of_dimensions() const;
+
+    int64_t get_uncompressed_serialized_bytes(const IColumn& column) const override;
+    char* serialize(const IColumn& column, char* buf) const override;
+    const char* deserialize(const char* buf, IColumn* column) const override;
+
+    /// Emits this type's meta plus one child meta describing the element type.
+    void to_pb_column_meta(PColumnMeta* col_meta) const override;
+};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/data_types/data_type_factory.cpp 
b/be/src/vec/data_types/data_type_factory.cpp
new file mode 100644
index 0000000..44e9e78
--- /dev/null
+++ b/be/src/vec/data_types/data_type_factory.cpp
@@ -0,0 +1,254 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// 
https://github.com/ClickHouse/ClickHouse/blob/master/src/DataTypes/DataTypeFactory.cpp
+// and modified by Doris
+
+#include "vec/data_types/data_type_factory.hpp"
+
+#include "vec/data_types/data_type_hll.h"
+
+namespace doris::vectorized {
+
+DataTypePtr DataTypeFactory::create_data_type(const doris::Field& col_desc) {
+    // ARRAY fields recurse into their single sub-field; everything else is a scalar type.
+    DataTypePtr type;
+    if (col_desc.type() != OLAP_FIELD_TYPE_ARRAY) {
+        type = _create_primitive_data_type(col_desc.type());
+    } else {
+        DCHECK(col_desc.get_sub_field_count() == 1);
+        type = std::make_shared<DataTypeArray>(create_data_type(*col_desc.get_sub_field(0)));
+    }
+
+    // Nullability wraps the whole type; a failed lookup (nullptr) is passed through as-is.
+    if (type == nullptr || !col_desc.is_nullable()) {
+        return type;
+    }
+    return std::make_shared<DataTypeNullable>(std::move(type));
+}
+
+DataTypePtr DataTypeFactory::create_data_type(const TabletColumn& col_desc) {
+    // ARRAY columns recurse into their single sub-column; everything else is a scalar type.
+    DataTypePtr type;
+    if (col_desc.type() != OLAP_FIELD_TYPE_ARRAY) {
+        type = _create_primitive_data_type(col_desc.type());
+    } else {
+        DCHECK(col_desc.get_subtype_count() == 1);
+        type = std::make_shared<DataTypeArray>(create_data_type(col_desc.get_sub_column(0)));
+    }
+
+    // Nullability wraps the whole type; a failed lookup (nullptr) is passed through as-is.
+    if (type == nullptr || !col_desc.is_nullable()) {
+        return type;
+    }
+    return std::make_shared<DataTypeNullable>(std::move(type));
+}
+
+DataTypePtr DataTypeFactory::create_data_type(const TypeDescriptor& col_desc, bool is_nullable) {
+    // Resolve the non-nullable type first; nullability is applied once at the end.
+    DataTypePtr resolved = [&]() -> DataTypePtr {
+        switch (col_desc.type) {
+        case TYPE_BOOLEAN:
+            return std::make_shared<vectorized::DataTypeUInt8>();
+        case TYPE_TINYINT:
+            return std::make_shared<vectorized::DataTypeInt8>();
+        case TYPE_SMALLINT:
+            return std::make_shared<vectorized::DataTypeInt16>();
+        case TYPE_INT:
+            return std::make_shared<vectorized::DataTypeInt32>();
+        case TYPE_FLOAT:
+            return std::make_shared<vectorized::DataTypeFloat32>();
+        case TYPE_BIGINT:
+            return std::make_shared<vectorized::DataTypeInt64>();
+        case TYPE_LARGEINT:
+            return std::make_shared<vectorized::DataTypeInt128>();
+        case TYPE_DATE:
+            return std::make_shared<vectorized::DataTypeDate>();
+        case TYPE_DATETIME:
+            return std::make_shared<vectorized::DataTypeDateTime>();
+        case TYPE_TIME:
+        case TYPE_DOUBLE:
+            return std::make_shared<vectorized::DataTypeFloat64>();
+        case TYPE_STRING:
+        case TYPE_CHAR:
+        case TYPE_VARCHAR:
+        case TYPE_HLL:
+            return std::make_shared<vectorized::DataTypeString>();
+        case TYPE_OBJECT:
+            return std::make_shared<vectorized::DataTypeBitMap>();
+        case TYPE_DECIMALV2:
+            return std::make_shared<vectorized::DataTypeDecimal<vectorized::Decimal128>>(27, 9);
+        case TYPE_NULL:
+            // Just Mock A NULL Type in Vec Exec Engine
+            return std::make_shared<vectorized::DataTypeUInt8>();
+        case TYPE_ARRAY:
+            // Array element types are always built non-nullable here.
+            DCHECK(col_desc.children.size() == 1);
+            return std::make_shared<vectorized::DataTypeArray>(
+                    create_data_type(col_desc.children[0], false));
+        case INVALID_TYPE:
+        default:
+            DCHECK(false) << "invalid PrimitiveType:" << static_cast<int>(col_desc.type);
+            return nullptr;
+        }
+    }();
+
+    if (resolved == nullptr || !is_nullable) {
+        return resolved;
+    }
+    return std::make_shared<vectorized::DataTypeNullable>(std::move(resolved));
+}
+
+// Maps a scalar (non-ARRAY) storage-layer FieldType onto the matching vectorized type.
+// Unsupported types trip a DCHECK and yield nullptr.
+DataTypePtr DataTypeFactory::_create_primitive_data_type(const FieldType& type) const {
+    switch (type) {
+    case OLAP_FIELD_TYPE_BOOL:
+        return std::make_shared<vectorized::DataTypeUInt8>();
+    case OLAP_FIELD_TYPE_TINYINT:
+        return std::make_shared<vectorized::DataTypeInt8>();
+    case OLAP_FIELD_TYPE_SMALLINT:
+        return std::make_shared<vectorized::DataTypeInt16>();
+    case OLAP_FIELD_TYPE_INT:
+        return std::make_shared<vectorized::DataTypeInt32>();
+    case OLAP_FIELD_TYPE_FLOAT:
+        return std::make_shared<vectorized::DataTypeFloat32>();
+    case OLAP_FIELD_TYPE_BIGINT:
+        return std::make_shared<vectorized::DataTypeInt64>();
+    case OLAP_FIELD_TYPE_LARGEINT:
+        return std::make_shared<vectorized::DataTypeInt128>();
+    case OLAP_FIELD_TYPE_DATE:
+        return std::make_shared<vectorized::DataTypeDate>();
+    case OLAP_FIELD_TYPE_DATETIME:
+        return std::make_shared<vectorized::DataTypeDateTime>();
+    case OLAP_FIELD_TYPE_DOUBLE:
+        return std::make_shared<vectorized::DataTypeFloat64>();
+    case OLAP_FIELD_TYPE_CHAR:
+    case OLAP_FIELD_TYPE_VARCHAR:
+    case OLAP_FIELD_TYPE_HLL:
+    case OLAP_FIELD_TYPE_STRING:
+        // HLL values are carried as plain strings on this path.
+        return std::make_shared<vectorized::DataTypeString>();
+    case OLAP_FIELD_TYPE_OBJECT:
+        return std::make_shared<vectorized::DataTypeBitMap>();
+    case OLAP_FIELD_TYPE_DECIMAL:
+        // Fixed precision 27, scale 9.
+        return std::make_shared<vectorized::DataTypeDecimal<vectorized::Decimal128>>(27, 9);
+    default:
+        DCHECK(false) << "Invalid FieldType:" << static_cast<int>(type);
+        return nullptr;
+    }
+}
+
+/// Rebuilds a vectorized DataType from a protobuf column meta (as carried by a PBlock).
+/// LIST metas recurse into their single child, which describes the element type. The result
+/// is wrapped in DataTypeNullable when the meta is nullable. Unknown types abort via LOG(FATAL).
+DataTypePtr DataTypeFactory::create_data_type(const PColumnMeta& pcolumn) {
+    DataTypePtr nested = nullptr;
+    switch (pcolumn.type()) {
+    case PGenericType::UINT8:
+        nested = std::make_shared<DataTypeUInt8>();
+        break;
+    case PGenericType::UINT16:
+        nested = std::make_shared<DataTypeUInt16>();
+        break;
+    case PGenericType::UINT32:
+        nested = std::make_shared<DataTypeUInt32>();
+        break;
+    case PGenericType::UINT64:
+        nested = std::make_shared<DataTypeUInt64>();
+        break;
+    case PGenericType::UINT128:
+        nested = std::make_shared<DataTypeUInt128>();
+        break;
+    case PGenericType::INT8:
+        nested = std::make_shared<DataTypeInt8>();
+        break;
+    case PGenericType::INT16:
+        nested = std::make_shared<DataTypeInt16>();
+        break;
+    case PGenericType::INT32:
+        nested = std::make_shared<DataTypeInt32>();
+        break;
+    case PGenericType::INT64:
+        nested = std::make_shared<DataTypeInt64>();
+        break;
+    case PGenericType::INT128:
+        nested = std::make_shared<DataTypeInt128>();
+        break;
+    case PGenericType::FLOAT:
+        nested = std::make_shared<DataTypeFloat32>();
+        break;
+    case PGenericType::DOUBLE:
+        nested = std::make_shared<DataTypeFloat64>();
+        break;
+    case PGenericType::STRING:
+        nested = std::make_shared<DataTypeString>();
+        break;
+    case PGenericType::DATE:
+        nested = std::make_shared<DataTypeDate>();
+        break;
+    case PGenericType::DATETIME:
+        nested = std::make_shared<DataTypeDateTime>();
+        break;
+    case PGenericType::DECIMAL32:
+        nested = std::make_shared<DataTypeDecimal<Decimal32>>(pcolumn.decimal_param().precision(),
+                                                              pcolumn.decimal_param().scale());
+        break;
+    case PGenericType::DECIMAL64:
+        nested = std::make_shared<DataTypeDecimal<Decimal64>>(pcolumn.decimal_param().precision(),
+                                                              pcolumn.decimal_param().scale());
+        break;
+    case PGenericType::DECIMAL128:
+        nested = std::make_shared<DataTypeDecimal<Decimal128>>(pcolumn.decimal_param().precision(),
+                                                               pcolumn.decimal_param().scale());
+        break;
+    case PGenericType::BITMAP:
+        nested = std::make_shared<DataTypeBitMap>();
+        break;
+    case PGenericType::HLL:
+        // IDataType::get_pdata_type() still encodes HLL columns as PGenericType::HLL, so the
+        // decoder must accept it; otherwise such blocks fall into the LOG(FATAL) below.
+        nested = std::make_shared<DataTypeHLL>();
+        break;
+    case PGenericType::LIST:
+        DCHECK(pcolumn.children_size() == 1);
+        nested = std::make_shared<DataTypeArray>(create_data_type(pcolumn.children(0)));
+        break;
+    default: {
+        LOG(FATAL) << fmt::format("Unknown data type: {}, data type name: {}", pcolumn.type(),
+                                  PGenericType_TypeId_Name(pcolumn.type()));
+        return nullptr;
+    }
+    }
+
+    if (nested && pcolumn.is_nullable()) {
+        return std::make_shared<vectorized::DataTypeNullable>(std::move(nested));
+    }
+    return nested;
+}
+
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/data_types/data_type_factory.hpp 
b/be/src/vec/data_types/data_type_factory.hpp
index e06a962..7834b12 100644
--- a/be/src/vec/data_types/data_type_factory.hpp
+++ b/be/src/vec/data_types/data_type_factory.hpp
@@ -22,7 +22,14 @@
 #include <mutex>
 #include <string>
 
+#include "gen_cpp/data.pb.h"
+#include "olap/field.h"
+#include "olap/tablet_schema.h"
+#include "runtime/types.h"
+
 #include "vec/data_types/data_type.h"
+#include "vec/data_types/data_type_array.h"
+#include "vec/data_types/data_type_bitmap.h"
 #include "vec/data_types/data_type_date.h"
 #include "vec/data_types/data_type_date_time.h"
 #include "vec/data_types/data_type_decimal.h"
@@ -74,7 +81,16 @@ public:
         return _empty_string;
     }
 
+    DataTypePtr create_data_type(const doris::Field& col_desc);
+    DataTypePtr create_data_type(const TabletColumn& col_desc);
+
+    DataTypePtr create_data_type(const TypeDescriptor& col_desc, bool 
is_nullable = true);
+
+    DataTypePtr create_data_type(const PColumnMeta& pcolumn);
+
 private:
+    DataTypePtr _create_primitive_data_type(const FieldType& type) const;
+
     void regist_data_type(const std::string& name, const DataTypePtr& 
data_type) {
         _data_type_map.emplace(name, data_type);
         _invert_data_type_map.emplace_back(data_type, name);
diff --git a/be/src/vec/data_types/data_type_number_base.h 
b/be/src/vec/data_types/data_type_number_base.h
index 1ffdfb5..7b6266e 100644
--- a/be/src/vec/data_types/data_type_number_base.h
+++ b/be/src/vec/data_types/data_type_number_base.h
@@ -20,6 +20,7 @@
 
 #pragma once
 
+#include "vec/columns/column_vector.h"
 #include "vec/common/assert_cast.h"
 #include "vec/common/string_ref.h"
 #include "vec/core/types.h"
diff --git a/be/src/vec/exprs/vectorized_agg_fn.cpp 
b/be/src/vec/exprs/vectorized_agg_fn.cpp
index 0987190..c7e6731 100644
--- a/be/src/vec/exprs/vectorized_agg_fn.cpp
+++ b/be/src/vec/exprs/vectorized_agg_fn.cpp
@@ -23,6 +23,7 @@
 #include "vec/aggregate_functions/aggregate_function_simple_factory.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/core/materialize_block.h"
+#include "vec/data_types/data_type_factory.hpp"
 #include "vec/data_types/data_type_nullable.h"
 #include "vec/exprs/vexpr.h"
 
@@ -38,11 +39,11 @@ AggFnEvaluator::AggFnEvaluator(const TExprNode& desc)
           _exec_timer(nullptr),
           _merge_timer(nullptr),
           _expr_timer(nullptr) {
+        bool nullable = true;
         if (desc.__isset.is_nullable) {
-          _data_type = IDataType::from_thrift(_return_type.type, 
desc.is_nullable);
-        } else {
-          _data_type = IDataType::from_thrift(_return_type.type);
+            nullable = desc.is_nullable;
         }
+        _data_type = 
DataTypeFactory::instance().create_data_type(_return_type, nullable);
     }
 
 Status AggFnEvaluator::create(ObjectPool* pool, const TExpr& desc, 
AggFnEvaluator** result) {
diff --git a/be/src/vec/exprs/vexpr.cpp b/be/src/vec/exprs/vexpr.cpp
index 066808c..6757b74 100644
--- a/be/src/vec/exprs/vexpr.cpp
+++ b/be/src/vec/exprs/vexpr.cpp
@@ -23,6 +23,7 @@
 
 #include "exprs/anyval_util.h"
 #include "gen_cpp/Exprs_types.h"
+#include "vec/data_types/data_type_factory.hpp"
 #include "vec/exprs/vcase_expr.h"
 #include "vec/exprs/vcast_expr.h"
 #include "vec/exprs/vcompound_pred.h"
@@ -46,11 +47,12 @@ VExpr::VExpr(const doris::TExprNode& node)
     if (node.__isset.fn) {
         _fn = node.fn;
     }
+
+    bool is_nullable = true;
     if (node.__isset.is_nullable) {
-        _data_type = IDataType::from_thrift(_type.type, node.is_nullable);
-    } else {
-        _data_type = IDataType::from_thrift(_type.type);
+        is_nullable = node.is_nullable;
     }
+    _data_type = DataTypeFactory::instance().create_data_type(_type, 
is_nullable);
 }
 
 VExpr::VExpr(const TypeDescriptor& type, bool is_slotref, bool is_nullable)
@@ -58,7 +60,8 @@ VExpr::VExpr(const TypeDescriptor& type, bool is_slotref, 
bool is_nullable)
     if (is_slotref) {
         _node_type = TExprNodeType::SLOT_REF;
     }
-    _data_type = IDataType::from_thrift(_type.type, is_nullable);
+
+    _data_type = DataTypeFactory::instance().create_data_type(_type, 
is_nullable);
 }
 
 Status VExpr::prepare(RuntimeState* state, const RowDescriptor& row_desc, 
VExprContext* context) {
diff --git a/be/src/vec/olap/vgeneric_iterators.cpp 
b/be/src/vec/olap/vgeneric_iterators.cpp
index f0c4081..e99d0f5 100644
--- a/be/src/vec/olap/vgeneric_iterators.cpp
+++ b/be/src/vec/olap/vgeneric_iterators.cpp
@@ -139,7 +139,7 @@ public:
             const auto& column_ids = schema.column_ids();
             for (size_t i = 0; i < schema.num_column_ids(); ++i) {
                 auto column_desc = schema.column(column_ids[i]);
-                auto data_type = 
Schema::get_data_type_ptr(column_desc->type());
+                auto data_type = Schema::get_data_type_ptr(*column_desc);
                 if (data_type == nullptr) {
                     return Status::RuntimeError("invalid data type");
                 }
diff --git a/be/src/vec/sink/mysql_result_writer.cpp 
b/be/src/vec/sink/mysql_result_writer.cpp
index 4a8f72d..e4fc56b 100644
--- a/be/src/vec/sink/mysql_result_writer.cpp
+++ b/be/src/vec/sink/mysql_result_writer.cpp
@@ -54,7 +54,8 @@ void VMysqlResultWriter::_init_profile() {
 
 template <PrimitiveType type, bool is_nullable>
 Status VMysqlResultWriter::_add_one_column(const ColumnPtr& column_ptr,
-                                           std::unique_ptr<TFetchDataResult>& 
result) {
+                                           std::unique_ptr<TFetchDataResult>& 
result,
+                                           const DataTypePtr& nested_type_ptr) 
{
     SCOPED_TIMER(_convert_tuple_timer);
 
     const auto column_size = column_ptr->size();
@@ -105,6 +106,37 @@ Status VMysqlResultWriter::_add_one_column(const 
ColumnPtr& column_ptr,
 
             result->result_batch.rows[i].append(_buffer.buf(), 
_buffer.length());
         }
+    } else if constexpr (type == TYPE_ARRAY) {
+        auto& array_column = assert_cast<const ColumnArray&>(*column);
+        auto& offsets = array_column.get_offsets();
+        for (int i = 0; i < column_size; ++i) {
+            if (0 != buf_ret) {
+                return Status::InternalError("pack mysql buffer failed.");
+            }
+            _buffer.reset();
+
+            if constexpr (is_nullable) {
+                if (column_ptr->is_null_at(i)) {
+                    buf_ret = _buffer.push_null();
+                    result->result_batch.rows[i].append(_buffer.buf(), 
_buffer.length());
+                    continue;
+                }
+            }
+
+            _buffer.open_dynamic_mode();
+            buf_ret = _buffer.push_string("[", 1);
+            bool begin = true;
+            for (int j = offsets[i - 1]; j < offsets[i]; ++j) {
+                if (!begin) {
+                    buf_ret = _buffer.push_string(", ", 2);
+                }
+                buf_ret = _add_one_cell(array_column.get_data_ptr(), j, 
nested_type_ptr, _buffer);
+                begin = false;
+            }
+            buf_ret = _buffer.push_string("]", 1);
+            _buffer.close_dynamic_mode();
+            result->result_batch.rows[i].append(_buffer.buf(), 
_buffer.length());
+        }
     } else {
         using ColumnType = typename PrimitiveTypeTraits<type>::ColumnType;
         auto& data = assert_cast<const ColumnType&>(*column).get_data();
@@ -178,6 +210,67 @@ Status VMysqlResultWriter::_add_one_column(const 
ColumnPtr& column_ptr,
     return Status::OK();
 }
 
+int VMysqlResultWriter::_add_one_cell(const ColumnPtr& column_ptr, size_t 
row_idx,
+                                         const DataTypePtr& type, 
MysqlRowBuffer& buffer) {
+    WhichDataType which(type->get_type_id());
+    if (which.is_nullable() && column_ptr->is_null_at(row_idx)) {
+        return buffer.push_null();
+    }
+
+    ColumnPtr column;
+    if (which.is_nullable()) {
+        column = assert_cast<const 
ColumnNullable&>(*column_ptr).get_nested_column_ptr();
+        which = WhichDataType(assert_cast<const 
DataTypeNullable&>(*type).get_nested_type());
+    } else {
+        column = column_ptr;
+    }
+
+    if (which.is_uint8()) {
+        auto& data = assert_cast<const ColumnUInt8&>(*column).get_data();
+        return buffer.push_tinyint(data[row_idx]);
+    } else if (which.is_int8()) {
+        auto& data = assert_cast<const ColumnInt8&>(*column).get_data();
+        return buffer.push_tinyint(data[row_idx]);
+    } else if (which.is_int16()) {
+        auto& data = assert_cast<const ColumnInt16&>(*column).get_data();
+        return buffer.push_smallint(data[row_idx]);
+    } else if (which.is_int32()) {
+        auto& data = assert_cast<const ColumnInt32&>(*column).get_data();
+        return buffer.push_int(data[row_idx]);
+    } else if (which.is_int64()) {
+        auto& data = assert_cast<const ColumnInt64&>(*column).get_data();
+        return buffer.push_bigint(data[row_idx]);
+    } else if (which.is_int128()) {
+        auto& data = assert_cast<const ColumnInt128&>(*column).get_data();
+        auto v = LargeIntValue::to_string(data[row_idx]);
+        return buffer.push_string(v.c_str(), v.size());
+    } else if (which.is_float32()) {
+        auto& data = assert_cast<const ColumnFloat32&>(*column).get_data();
+        return buffer.push_float(data[row_idx]);
+    } else if (which.is_float64()) {
+        auto& data = assert_cast<const ColumnFloat64&>(*column).get_data();
+        return buffer.push_double(data[row_idx]);
+    } else if (which.is_string()) {
+        int buf_ret = 0;
+        const auto string_val = column->get_data_at(row_idx);
+        if (string_val.data == nullptr) {
+            if (string_val.size == 0) {
+                // 0x01 is a magic num, not useful actually, just for present 
""
+                char* tmp_val = reinterpret_cast<char*>(0x01);
+                buf_ret = buffer.push_string(tmp_val, string_val.size);
+            } else {
+                buf_ret = buffer.push_null();
+            }
+        } else {
+            buf_ret = buffer.push_string(string_val.data, string_val.size);
+        }
+        return buf_ret;
+    } else {
+        LOG(WARNING) << "sub TypeIndex(" << (int)which.idx << "not supported 
yet";
+        return -1;
+    }
+}
+
 Status VMysqlResultWriter::append_row_batch(const RowBatch* batch) {
     return Status::RuntimeError("Not Implemented 
MysqlResultWriter::append_row_batch scalar");
 }
@@ -313,6 +406,17 @@ Status VMysqlResultWriter::append_block(Block& 
input_block) {
             }
             break;
         }
+        case TYPE_ARRAY: {
+            if (type_ptr->is_nullable()) {
+                auto& nested_type = assert_cast<const 
DataTypeNullable&>(*type_ptr).get_nested_type();
+                auto& sub_type = assert_cast<const 
DataTypeArray&>(*nested_type).get_nested_type();
+                status = _add_one_column<PrimitiveType::TYPE_ARRAY, 
true>(column_ptr, result, sub_type);
+            } else {
+                auto& sub_type = assert_cast<const 
DataTypeArray&>(*type_ptr).get_nested_type();
+                status = _add_one_column<PrimitiveType::TYPE_ARRAY, 
false>(column_ptr, result, sub_type);
+            }
+            break;
+        }
         default: {
             LOG(WARNING) << "can't convert this type to mysql type. type = "
                          << _output_vexpr_ctxs[i]->root()->type();
diff --git a/be/src/vec/sink/mysql_result_writer.h 
b/be/src/vec/sink/mysql_result_writer.h
index bbd7425..5a4d490 100644
--- a/be/src/vec/sink/mysql_result_writer.h
+++ b/be/src/vec/sink/mysql_result_writer.h
@@ -49,7 +49,8 @@ private:
     void _init_profile();
 
     template <PrimitiveType type, bool is_nullable>
-    Status _add_one_column(const ColumnPtr& column_ptr, 
std::unique_ptr<TFetchDataResult>& result);
+    Status _add_one_column(const ColumnPtr& column_ptr, 
std::unique_ptr<TFetchDataResult>& result, const DataTypePtr& nested_type_ptr = 
nullptr);
+    int _add_one_cell(const ColumnPtr& column_ptr, size_t row_idx, const 
DataTypePtr& type, MysqlRowBuffer& buffer);
 
 private:
     BufferControlBlock* _sinker;
diff --git a/be/test/vec/core/CMakeLists.txt b/be/test/vec/core/CMakeLists.txt
index cdcf5c2..8df7678 100644
--- a/be/test/vec/core/CMakeLists.txt
+++ b/be/test/vec/core/CMakeLists.txt
@@ -19,6 +19,7 @@
 set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/test/vec/core")
 
 ADD_BE_TEST(block_test)
+ADD_BE_TEST(column_array_test)
 ADD_BE_TEST(column_complex_test)
 ADD_BE_TEST(column_nullable_test)
 
diff --git a/be/test/vec/core/column_array_test.cpp 
b/be/test/vec/core/column_array_test.cpp
new file mode 100644
index 0000000..251f769
--- /dev/null
+++ b/be/test/vec/core/column_array_test.cpp
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/columns/column.h"
+#include "vec/columns/column_array.h"
+#include "vec/columns/column_string.h"
+#include "vec/columns/column_vector.h"
+
+#include <gtest/gtest.h>
+
+#include <memory>
+#include <string>
+
+namespace doris::vectorized {
+
+TEST(ColumnArrayTest, IntArrayTest) {
+    auto off_column = ColumnVector<IColumn::Offset>::create();
+    auto data_column = ColumnVector<int32_t>::create();
+    // init column array with [[1,2,3],[],[4]]
+    std::vector<IColumn::Offset> offs = {0, 3, 3, 4};
+    std::vector<int32_t> vals = {1, 2, 3, 4};
+    for (size_t i = 1; i < offs.size(); ++i) {
+        off_column->insert_data((const char*)(&offs[i]), 0);
+    }
+    for (auto& v : vals) {
+        data_column->insert_data((const char*)(&v), 0);
+    }
+
+    // check column array result
+    ColumnArray array_column(std::move(data_column), std::move(off_column));
+    ASSERT_EQ(array_column.size(), offs.size() - 1);
+    for (size_t i = 0; i < array_column.size(); ++i) {
+        auto v = get<Array>(array_column[i]);
+        ASSERT_EQ(v.size(), offs[i + 1] - offs[i]);
+        for (size_t j = 0; j < v.size(); ++j) {
+            ASSERT_EQ(vals[offs[i] + j], get<int32_t>(v[j]));
+        }
+    }
+}
+
+TEST(ColumnArrayTest, StringArrayTest) {
+    auto off_column = ColumnVector<IColumn::Offset>::create();
+    auto data_column = ColumnString::create();
+    // init column array with [["abc","d"],["ef"],[], [""]];
+    std::vector<IColumn::Offset> offs = {0, 2, 3, 3, 4};
+    std::vector<std::string> vals = {"abc", "d", "ef", ""};
+    for (size_t i = 1; i < offs.size(); ++i) {
+        off_column->insert_data((const char*)(&offs[i]), 0);
+    }
+    for (auto& v : vals) {
+        data_column->insert_data(v.data(), v.size());
+    }
+
+    // check column array result
+    ColumnArray array_column(std::move(data_column), std::move(off_column));
+    ASSERT_EQ(array_column.size(), offs.size() - 1);
+    for (size_t i = 0; i < array_column.size(); ++i) {
+        auto v = get<Array>(array_column[i]);
+        ASSERT_EQ(v.size(), offs[i + 1] - offs[i]);
+        for (size_t j = 0; j < v.size(); ++j) {
+            ASSERT_EQ(vals[offs[i] + j], get<std::string>(v[j]));
+        }
+    }
+}
+
+} // namespace doris::vectorized
+
+int main(int argc, char** argv) {
+    ::testing::InitGoogleTest(&argc, argv);
+    return RUN_ALL_TESTS();
+}
diff --git a/be/test/vec/exec/vgeneric_iterators_test.cpp 
b/be/test/vec/exec/vgeneric_iterators_test.cpp
index 20fcb0b..a257ff7 100644
--- a/be/test/vec/exec/vgeneric_iterators_test.cpp
+++ b/be/test/vec/exec/vgeneric_iterators_test.cpp
@@ -53,7 +53,7 @@ static void create_block(Schema& schema, vectorized::Block& 
block)
 {
     for (auto &column_desc : schema.columns()) {
         ASSERT_TRUE(column_desc);
-        auto data_type = Schema::get_data_type_ptr(column_desc->type());
+        auto data_type = Schema::get_data_type_ptr(*column_desc);
         ASSERT_NE(data_type, nullptr);
         if (column_desc->is_nullable()) {
             data_type = 
std::make_shared<vectorized::DataTypeNullable>(std::move(data_type));

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to