xy720 commented on code in PR #17330:
URL: https://github.com/apache/doris/pull/17330#discussion_r1122983062


##########
be/src/vec/columns/column_map.cpp:
##########
@@ -25,50 +25,108 @@ namespace doris::vectorized {
 /** A column of map values.
   */
 std::string ColumnMap::get_name() const {
-    return "Map(" + keys->get_name() + ", " + values->get_name() + ")";
+    return "Map(" + keys_column->get_name() + ", " + values_column->get_name() 
+ ")";
 }
 
-ColumnMap::ColumnMap(MutableColumnPtr&& keys, MutableColumnPtr&& values)
-        : keys(std::move(keys)), values(std::move(values)) {
-    check_size();
-}
+ColumnMap::ColumnMap(MutableColumnPtr&& keys, MutableColumnPtr&& values, 
MutableColumnPtr&& offsets)
+        : keys_column(std::move(keys)),
+          values_column(std::move(values)),
+          offsets_column(std::move(offsets)) {
+    const COffsets* offsets_concrete = typeid_cast<const 
COffsets*>(offsets_column.get());
 
-ColumnArray::Offsets64& ColumnMap::get_offsets() const {
-    const ColumnArray& column_keys = assert_cast<const 
ColumnArray&>(get_keys());
-    // todo . did here check size ?
-    return const_cast<Offsets64&>(column_keys.get_offsets());
-}
+    if (!offsets_concrete) {
+        LOG(FATAL) << "offsets_column must be a ColumnUInt64";
+    }
 
-void ColumnMap::check_size() const {
-    const auto* key_array = typeid_cast<const ColumnArray*>(keys.get());
-    const auto* value_array = typeid_cast<const ColumnArray*>(values.get());
-    CHECK(key_array) << "ColumnMap keys can be created only from array";
-    CHECK(value_array) << "ColumnMap values can be created only from array";
-    CHECK_EQ(get_keys_ptr()->size(), get_values_ptr()->size());
+    if (!offsets_concrete->empty() && keys && values) {
+        auto last_offset = offsets_concrete->get_data().back();
+
+        /// This will also prevent possible overflow in offset.
+        if (keys_column->size() != last_offset) {
+            LOG(FATAL) << "offsets_column has data inconsistent with 
key_column";
+        }
+        if (values_column->size() != last_offset) {
+            LOG(FATAL) << "offsets_column has data inconsistent with 
value_column";
+        }
+    }
 }
 
 // todo. here to resize every row map
 MutableColumnPtr ColumnMap::clone_resized(size_t to_size) const {
-    auto res = ColumnMap::create(keys->clone_resized(to_size), 
values->clone_resized(to_size));
+    auto res = ColumnMap::create(get_keys().clone_empty(), 
get_values().clone_empty(),
+                                 COffsets::create());
+    if (to_size == 0) {
+        return res;
+    }
+
+    size_t from_size = size();
+
+    if (to_size <= from_size) {
+        res->get_offsets().assign(get_offsets().begin(), get_offsets().begin() 
+ to_size);
+        res->get_keys().insert_range_from(get_keys(), 0, get_offsets()[to_size 
- 1]);
+        res->get_values().insert_range_from(get_values(), 0, 
get_offsets()[to_size - 1]);
+    } else {
+        /// Copy column and append empty arrays for extra elements.
+        Offset64 offset = 0;
+        if (from_size > 0) {
+            res->get_offsets().assign(get_offsets().begin(), 
get_offsets().end());
+            res->get_keys().insert_range_from(get_keys(), 0, 
get_keys().size());
+            res->get_values().insert_range_from(get_values(), 0, 
get_values().size());
+            offset = get_offsets().back();
+        }
+        res->get_offsets().resize(to_size);
+        for (size_t i = from_size; i < to_size; ++i) {
+            res->get_offsets()[i] = offset;
+        }
+    }
     return res;
 }
 
 // to support field functions
 Field ColumnMap::operator[](size_t n) const {
-    // Map is FieldVector , see in field.h
-    Map res(2);
-    keys->get(n, res[0]);
-    values->get(n, res[1]);
+    // Map is FieldVector, now we keep key value in seperate  , see in field.h
+    Map m(2);
+    size_t start_offset = offset_at(n);
+    size_t element_size = size_at(n);
+
+    if (element_size > max_array_size_as_field) {
+        LOG(FATAL) << "element size " << start_offset
+                   << " is too large to be manipulated as single map field,"
+                   << "maximum size " << max_array_size_as_field;
+    }
 
-    return res;
+    Array k(element_size), v(element_size);
+
+    for (size_t i = 0; i < element_size; ++i) {
+        k[i] = get_keys()[start_offset + i];
+        v[i] = get_values()[start_offset + i];
+    }
+
+    m.push_back(k);
+    m.push_back(v);
+    return m;
 }
 
 // here to compare to below
 void ColumnMap::get(size_t n, Field& res) const {
     Map map(2);
-    keys->get(n, map[0]);
-    values->get(n, map[1]);
+    size_t start_offset = offset_at(n);
+    size_t element_size = size_at(n);
+
+    if (element_size > max_array_size_as_field) {
+        LOG(FATAL) << "element size " << start_offset
+                   << " is too large to be manipulated as single map field,"
+                   << "maximum size " << max_array_size_as_field;
+    }
 
+    Array k(element_size), v(element_size);

Review Comment:
   same as above.
   The structure should be a flat array like [key1, value1, key2, value2, ...]



##########
be/src/vec/columns/column_array.h:
##########
@@ -29,6 +29,13 @@
 
 namespace doris::vectorized {
 
+/** Obtaining array as Field can be slow for large arrays and consume vast 
amount of memory.
+  * Just don't allow to do it.
+  * You can increase the limit if the following query:
+  *  SELECT range(10000000)

Review Comment:
   Do you mean array_range?



##########
be/src/vec/columns/column_map.cpp:
##########
@@ -135,71 +202,166 @@ void ColumnMap::insert_indices_from(const IColumn& src, 
const int* indices_begin
     }
 }
 
+StringRef ColumnMap::serialize_value_into_arena(size_t n, Arena& arena, char 
const*& begin) const {
+    StringRef res(begin, 0);
+    auto keys_ref = keys_column->serialize_value_into_arena(n, arena, begin);
+    res.data = keys_ref.data - res.size;
+    res.size += keys_ref.size;
+    auto value_ref = values_column->serialize_value_into_arena(n, arena, 
begin);
+    res.data = value_ref.data - res.size;
+    res.size += value_ref.size;
+
+    return res;
+}
+
 const char* ColumnMap::deserialize_and_insert_from_arena(const char* pos) {
-    pos = keys->deserialize_and_insert_from_arena(pos);
-    pos = values->deserialize_and_insert_from_arena(pos);
+    pos = keys_column->deserialize_and_insert_from_arena(pos);
+    pos = values_column->deserialize_and_insert_from_arena(pos);
 
     return pos;
 }
 
 void ColumnMap::update_hash_with_value(size_t n, SipHash& hash) const {
-    keys->update_hash_with_value(n, hash);
-    values->update_hash_with_value(n, hash);
+    keys_column->update_hash_with_value(n, hash);

Review Comment:
   calculate `n` by offset.



##########
be/src/vec/columns/column_map.cpp:
##########
@@ -135,71 +202,166 @@ void ColumnMap::insert_indices_from(const IColumn& src, 
const int* indices_begin
     }
 }
 
+StringRef ColumnMap::serialize_value_into_arena(size_t n, Arena& arena, char 
const*& begin) const {
+    StringRef res(begin, 0);
+    auto keys_ref = keys_column->serialize_value_into_arena(n, arena, begin);
+    res.data = keys_ref.data - res.size;
+    res.size += keys_ref.size;
+    auto value_ref = values_column->serialize_value_into_arena(n, arena, 
begin);
+    res.data = value_ref.data - res.size;
+    res.size += value_ref.size;
+
+    return res;
+}
+
 const char* ColumnMap::deserialize_and_insert_from_arena(const char* pos) {
-    pos = keys->deserialize_and_insert_from_arena(pos);
-    pos = values->deserialize_and_insert_from_arena(pos);
+    pos = keys_column->deserialize_and_insert_from_arena(pos);

Review Comment:
   Same as above.
   Should correspond to the `serialize_value_into_arena` method.



##########
regression-test/data/delete_p0/test_map_column_delete.out:
##########
@@ -2,3 +2,4 @@
 -- !sql --
 1      {1:'a', 2:'doris'}
 2      {}
+

Review Comment:
   empty line



##########
be/src/vec/columns/column_map.cpp:
##########
@@ -135,71 +202,166 @@ void ColumnMap::insert_indices_from(const IColumn& src, 
const int* indices_begin
     }
 }
 
+StringRef ColumnMap::serialize_value_into_arena(size_t n, Arena& arena, char 
const*& begin) const {
+    StringRef res(begin, 0);
+    auto keys_ref = keys_column->serialize_value_into_arena(n, arena, begin);
+    res.data = keys_ref.data - res.size;
+    res.size += keys_ref.size;
+    auto value_ref = values_column->serialize_value_into_arena(n, arena, 
begin);
+    res.data = value_ref.data - res.size;
+    res.size += value_ref.size;
+
+    return res;
+}
+
 const char* ColumnMap::deserialize_and_insert_from_arena(const char* pos) {
-    pos = keys->deserialize_and_insert_from_arena(pos);
-    pos = values->deserialize_and_insert_from_arena(pos);
+    pos = keys_column->deserialize_and_insert_from_arena(pos);
+    pos = values_column->deserialize_and_insert_from_arena(pos);
 
     return pos;
 }
 
 void ColumnMap::update_hash_with_value(size_t n, SipHash& hash) const {
-    keys->update_hash_with_value(n, hash);
-    values->update_hash_with_value(n, hash);
+    keys_column->update_hash_with_value(n, hash);
+    values_column->update_hash_with_value(n, hash);
 }
 
 void ColumnMap::insert_range_from(const IColumn& src, size_t start, size_t 
length) {
-    keys->insert_range_from(*assert_cast<const ColumnMap&>(src).keys, start, 
length);
-    values->insert_range_from(*assert_cast<const ColumnMap&>(src).values, 
start, length);
+    if (length == 0) {
+        return;
+    }
+
+    const ColumnMap& src_concrete = assert_cast<const ColumnMap&>(src);
+
+    if (start + length > src_concrete.get_offsets().size()) {
+        LOG(FATAL) << "Parameter out of bound in ColumnMap::insert_range_from 
method. [start("
+                   << std::to_string(start) << ") + length(" << 
std::to_string(length)
+                   << ") > offsets.size(" << 
std::to_string(src_concrete.get_offsets().size())
+                   << ")]";
+    }
+
+    size_t nested_offset = src_concrete.offset_at(start);
+    size_t nested_length = src_concrete.get_offsets()[start + length - 1] - 
nested_offset;
+
+    keys_column->insert_range_from(src_concrete.get_keys(), nested_offset, 
nested_length);
+    values_column->insert_range_from(src_concrete.get_values(), nested_offset, 
nested_length);
+
+    auto& cur_offsets = get_offsets();
+    const auto& src_offsets = src_concrete.get_offsets();
+
+    if (start == 0 && cur_offsets.empty()) {
+        cur_offsets.assign(src_offsets.begin(), src_offsets.begin() + length);
+    } else {
+        size_t old_size = cur_offsets.size();
+        // -1 is ok, because PaddedPODArray pads zeros on the left.
+        size_t prev_max_offset = cur_offsets.back();
+        cur_offsets.resize(old_size + length);
+
+        for (size_t i = 0; i < length; ++i) {
+            cur_offsets[old_size + i] = src_offsets[start + i] - nested_offset 
+ prev_max_offset;
+        }
+    }
 }
 
 ColumnPtr ColumnMap::filter(const Filter& filt, ssize_t result_size_hint) 
const {
-    return ColumnMap::create(keys->filter(filt, result_size_hint),
-                             values->filter(filt, result_size_hint));
+    auto k_arr =
+            ColumnArray::create(keys_column->assume_mutable(), 
offsets_column->assume_mutable())
+                    ->filter(filt, result_size_hint);
+    auto v_arr =
+            ColumnArray::create(values_column->assume_mutable(), 
offsets_column->assume_mutable())
+                    ->filter(filt, result_size_hint);
+    return ColumnMap::create(assert_cast<const 
ColumnArray&>(*k_arr).get_data_ptr(),
+                             assert_cast<const 
ColumnArray&>(*v_arr).get_data_ptr(),
+                             assert_cast<const 
ColumnArray&>(*k_arr).get_offsets_ptr());
 }
 
 size_t ColumnMap::filter(const Filter& filter) {
-    const auto key_result_size = keys->filter(filter);
-    const auto value_result_size = values->filter(filter);
-    CHECK_EQ(key_result_size, value_result_size);
-    return value_result_size;
+    return this->filter(filter, 0);
 }
 
 Status ColumnMap::filter_by_selector(const uint16_t* sel, size_t sel_size, 
IColumn* col_ptr) {
     auto to = reinterpret_cast<vectorized::ColumnMap*>(col_ptr);
 
-    auto& array_keys = assert_cast<vectorized::ColumnArray&>(*keys);
-    array_keys.filter_by_selector(sel, sel_size, &to->get_keys());
+    auto& to_offsets = to->get_offsets();
+
+    size_t element_size = 0;
+    size_t max_offset = 0;
+    for (size_t i = 0; i < sel_size; ++i) {
+        element_size += size_at(sel[i]);
+        max_offset = std::max(max_offset, offset_at(sel[i]));
+    }
+    if (max_offset > std::numeric_limits<uint16_t>::max()) {
+        return Status::IOError("map elements too large than uint16_t::max");
+    }
 
-    auto& array_values = assert_cast<vectorized::ColumnArray&>(*values);
-    array_values.filter_by_selector(sel, sel_size, &to->get_values());
+    to_offsets.reserve(to_offsets.size() + sel_size);
+    auto nested_sel = std::make_unique<uint16_t[]>(element_size);
+    size_t nested_sel_size = 0;
+    for (size_t i = 0; i < sel_size; ++i) {
+        auto row_off = offset_at(sel[i]);
+        auto row_size = size_at(sel[i]);
+        to_offsets.push_back(to_offsets.back() + row_size);
+        for (auto j = 0; j < row_size; ++j) {
+            nested_sel[nested_sel_size++] = row_off + j;
+        }
+    }
 
+    if (nested_sel_size > 0) {
+        keys_column->filter_by_selector(nested_sel.get(), nested_sel_size, 
&to->get_keys());
+        values_column->filter_by_selector(nested_sel.get(), nested_sel_size, 
&to->get_values());
+    }
     return Status::OK();
 }
 
 ColumnPtr ColumnMap::permute(const Permutation& perm, size_t limit) const {
-    return ColumnMap::create(keys->permute(perm, limit), values->permute(perm, 
limit));
+    // Make a temp column array
+    auto k_arr =

Review Comment:
   need refactor later



##########
be/src/vec/olap/olap_data_convertor.cpp:
##########
@@ -810,30 +808,52 @@ Status 
OlapBlockDataConvertor::OlapColumnDataConvertorMap::convert_to_olap(
         const ColumnMap* column_map, const DataTypeMap* data_type_map) {
     ColumnPtr key_data = column_map->get_keys_ptr();
     ColumnPtr value_data = column_map->get_values_ptr();
+    const UInt8* key_null_map = nullptr;
+    const UInt8* val_null_map = nullptr;
     if (column_map->get_keys().is_nullable()) {
         const auto& key_nullable_column =
                 assert_cast<const ColumnNullable&>(column_map->get_keys());
         key_data = key_nullable_column.get_nested_column_ptr();
+        key_null_map = key_nullable_column.get_null_map_data().data();
     }
 
     if (column_map->get_values().is_nullable()) {
         const auto& val_nullable_column =
                 assert_cast<const ColumnNullable&>(column_map->get_values());
         value_data = val_nullable_column.get_nested_column_ptr();
+        val_null_map = val_nullable_column.get_null_map_data().data();
     }
 
-    ColumnWithTypeAndName key_typed_column = {key_data, 
remove_nullable(data_type_map->get_keys()),
-                                              "map.key"};
-    _key_convertor->set_source_column(key_typed_column, _row_pos, _num_rows);
+    // offsets data
+    auto& offsets = column_map->get_offsets();
+    // make first offset
+    auto offsets_col = ColumnArray::ColumnOffsets::create();
+
+    _offsets.reserve(offsets.size() + 1);
+    _offsets.push_back(_row_pos);
+    _offsets.insert_assume_reserved(offsets.begin(), offsets.end());
+
+    int64_t start_index = _row_pos - 1;
+    int64_t end_index = _row_pos + _num_rows - 1;
+    auto start = offsets[start_index];
+    auto size = offsets[end_index] - start;
+
+    ColumnWithTypeAndName key_typed_column = {
+            key_data, remove_nullable(data_type_map->get_key_type()), 
"map.key"};

Review Comment:
   Is `remove_nullable` necessary?



##########
be/src/vec/columns/column_map.cpp:
##########
@@ -135,71 +202,166 @@ void ColumnMap::insert_indices_from(const IColumn& src, 
const int* indices_begin
     }
 }
 
+StringRef ColumnMap::serialize_value_into_arena(size_t n, Arena& arena, char 
const*& begin) const {
+    StringRef res(begin, 0);
+    auto keys_ref = keys_column->serialize_value_into_arena(n, arena, begin);

Review Comment:
   The `n` in this line should be calculated by offset.
   Just like column array.



##########
be/src/vec/columns/column_map.h:
##########
@@ -116,38 +120,52 @@ class ColumnMap final : public COWHelper<IColumn, 
ColumnMap> {
     void replace_column_data_default(size_t self_row = 0) override {
         LOG(FATAL) << "replace_column_data_default not implemented";
     }
-    void check_size() const;
-    ColumnArray::Offsets64& get_offsets() const;
+
+    ColumnArray::Offsets64& get_offsets() {
+        return assert_cast<COffsets&>(*offsets_column).get_data();
+    }
+    const ColumnArray::Offsets64& get_offsets() const {
+        return assert_cast<const COffsets&>(*offsets_column).get_data();
+    }
+    IColumn& get_offsets_column() { return *offsets_column; }
+    const IColumn& get_offsets_column() const { return *offsets_column; }
+
+    const ColumnPtr& get_offsets_ptr() const { return offsets_column; }
+    ColumnPtr& get_offsets_ptr() { return offsets_column; }
+
+    size_t size() const override { return get_offsets().size(); }
     void reserve(size_t n) override;
     size_t byte_size() const override;
     size_t allocated_bytes() const override;
     void protect() override;
 
     /******************** keys and values ***************/
-    const ColumnPtr& get_keys_ptr() const { return keys; }
-    ColumnPtr& get_keys_ptr() { return keys; }
+    const ColumnPtr& get_keys_ptr() const { return keys_column; }
+    ColumnPtr& get_keys_ptr() { return keys_column; }
 
-    const IColumn& get_keys() const { return *keys; }
-    IColumn& get_keys() { return *keys; }
+    const IColumn& get_keys() const { return *keys_column; }
+    IColumn& get_keys() { return *keys_column; }
 
-    const ColumnPtr& get_values_ptr() const { return values; }
-    ColumnPtr& get_values_ptr() { return values; }
+    const ColumnPtr& get_values_ptr() const { return values_column; }
+    ColumnPtr& get_values_ptr() { return values_column; }
 
-    const IColumn& get_values() const { return *values; }
-    IColumn& get_values() { return *values; }
+    const IColumn& get_values() const { return *values_column; }
+    IColumn& get_values() { return *values_column; }
 
 private:
     friend class COWHelper<IColumn, ColumnMap>;
 
-    WrappedPtr keys;   // nullable
-    WrappedPtr values; // nullable
+    WrappedPtr keys_column;    // nullable
+    WrappedPtr values_column;  // nullable
+    WrappedPtr offsets_column; // offset
 
     size_t ALWAYS_INLINE offset_at(ssize_t i) const { return get_offsets()[i - 
1]; }
     size_t ALWAYS_INLINE size_at(ssize_t i) const {
         return get_offsets()[i] - get_offsets()[i - 1];
     }
 
-    ColumnMap(MutableColumnPtr&& keys, MutableColumnPtr&& values);
+    explicit ColumnMap(MutableColumnPtr&& keys, MutableColumnPtr&& values,

Review Comment:
   why use explicit?



##########
be/src/vec/columns/column_map.cpp:
##########
@@ -83,34 +141,39 @@ void ColumnMap::insert_data(const char*, size_t) {
 void ColumnMap::insert(const Field& x) {
     const auto& map = doris::vectorized::get<const Map&>(x);
     CHECK_EQ(map.size(), 2);
-    keys->insert(map[0]);
-    values->insert(map[1]);
+    const auto& k_f = doris::vectorized::get<const Array&>(map[0]);

Review Comment:
   Same as above.
   The structure should be a flat array like [key1, value1, key2, value2, ...]



##########
be/src/vec/columns/column_map.cpp:
##########
@@ -25,50 +25,108 @@ namespace doris::vectorized {
 /** A column of map values.
   */
 std::string ColumnMap::get_name() const {
-    return "Map(" + keys->get_name() + ", " + values->get_name() + ")";
+    return "Map(" + keys_column->get_name() + ", " + values_column->get_name() 
+ ")";
 }
 
-ColumnMap::ColumnMap(MutableColumnPtr&& keys, MutableColumnPtr&& values)
-        : keys(std::move(keys)), values(std::move(values)) {
-    check_size();
-}
+ColumnMap::ColumnMap(MutableColumnPtr&& keys, MutableColumnPtr&& values, 
MutableColumnPtr&& offsets)
+        : keys_column(std::move(keys)),
+          values_column(std::move(values)),
+          offsets_column(std::move(offsets)) {
+    const COffsets* offsets_concrete = typeid_cast<const 
COffsets*>(offsets_column.get());
 
-ColumnArray::Offsets64& ColumnMap::get_offsets() const {
-    const ColumnArray& column_keys = assert_cast<const 
ColumnArray&>(get_keys());
-    // todo . did here check size ?
-    return const_cast<Offsets64&>(column_keys.get_offsets());
-}
+    if (!offsets_concrete) {
+        LOG(FATAL) << "offsets_column must be a ColumnUInt64";
+    }
 
-void ColumnMap::check_size() const {
-    const auto* key_array = typeid_cast<const ColumnArray*>(keys.get());
-    const auto* value_array = typeid_cast<const ColumnArray*>(values.get());
-    CHECK(key_array) << "ColumnMap keys can be created only from array";
-    CHECK(value_array) << "ColumnMap values can be created only from array";
-    CHECK_EQ(get_keys_ptr()->size(), get_values_ptr()->size());
+    if (!offsets_concrete->empty() && keys && values) {
+        auto last_offset = offsets_concrete->get_data().back();
+
+        /// This will also prevent possible overflow in offset.
+        if (keys_column->size() != last_offset) {
+            LOG(FATAL) << "offsets_column has data inconsistent with 
key_column";
+        }
+        if (values_column->size() != last_offset) {
+            LOG(FATAL) << "offsets_column has data inconsistent with 
value_column";
+        }
+    }
 }
 
 // todo. here to resize every row map
 MutableColumnPtr ColumnMap::clone_resized(size_t to_size) const {
-    auto res = ColumnMap::create(keys->clone_resized(to_size), 
values->clone_resized(to_size));
+    auto res = ColumnMap::create(get_keys().clone_empty(), 
get_values().clone_empty(),
+                                 COffsets::create());
+    if (to_size == 0) {
+        return res;
+    }
+
+    size_t from_size = size();
+
+    if (to_size <= from_size) {
+        res->get_offsets().assign(get_offsets().begin(), get_offsets().begin() 
+ to_size);
+        res->get_keys().insert_range_from(get_keys(), 0, get_offsets()[to_size 
- 1]);
+        res->get_values().insert_range_from(get_values(), 0, 
get_offsets()[to_size - 1]);
+    } else {
+        /// Copy column and append empty arrays for extra elements.
+        Offset64 offset = 0;
+        if (from_size > 0) {
+            res->get_offsets().assign(get_offsets().begin(), 
get_offsets().end());
+            res->get_keys().insert_range_from(get_keys(), 0, 
get_keys().size());
+            res->get_values().insert_range_from(get_values(), 0, 
get_values().size());
+            offset = get_offsets().back();
+        }
+        res->get_offsets().resize(to_size);
+        for (size_t i = from_size; i < to_size; ++i) {
+            res->get_offsets()[i] = offset;
+        }
+    }
     return res;
 }
 
 // to support field functions
 Field ColumnMap::operator[](size_t n) const {
-    // Map is FieldVector , see in field.h
-    Map res(2);
-    keys->get(n, res[0]);
-    values->get(n, res[1]);
+    // Map is FieldVector, now we keep key value in seperate  , see in field.h
+    Map m(2);
+    size_t start_offset = offset_at(n);
+    size_t element_size = size_at(n);
+
+    if (element_size > max_array_size_as_field) {
+        LOG(FATAL) << "element size " << start_offset
+                   << " is too large to be manipulated as single map field,"
+                   << "maximum size " << max_array_size_as_field;
+    }
 
-    return res;
+    Array k(element_size), v(element_size);

Review Comment:
   This logic is not the same as the design in vmap_literal.h and field.h.
   The structure should be a flat array like [key1, value1, key2, value2, ...]



##########
be/src/vec/columns/column_map.cpp:
##########
@@ -135,71 +202,166 @@ void ColumnMap::insert_indices_from(const IColumn& src, 
const int* indices_begin
     }
 }
 
+StringRef ColumnMap::serialize_value_into_arena(size_t n, Arena& arena, char 
const*& begin) const {
+    StringRef res(begin, 0);
+    auto keys_ref = keys_column->serialize_value_into_arena(n, arena, begin);
+    res.data = keys_ref.data - res.size;
+    res.size += keys_ref.size;
+    auto value_ref = values_column->serialize_value_into_arena(n, arena, 
begin);
+    res.data = value_ref.data - res.size;
+    res.size += value_ref.size;
+
+    return res;
+}
+
 const char* ColumnMap::deserialize_and_insert_from_arena(const char* pos) {
-    pos = keys->deserialize_and_insert_from_arena(pos);
-    pos = values->deserialize_and_insert_from_arena(pos);
+    pos = keys_column->deserialize_and_insert_from_arena(pos);
+    pos = values_column->deserialize_and_insert_from_arena(pos);
 
     return pos;
 }
 
 void ColumnMap::update_hash_with_value(size_t n, SipHash& hash) const {
-    keys->update_hash_with_value(n, hash);
-    values->update_hash_with_value(n, hash);
+    keys_column->update_hash_with_value(n, hash);
+    values_column->update_hash_with_value(n, hash);
 }
 
 void ColumnMap::insert_range_from(const IColumn& src, size_t start, size_t 
length) {
-    keys->insert_range_from(*assert_cast<const ColumnMap&>(src).keys, start, 
length);
-    values->insert_range_from(*assert_cast<const ColumnMap&>(src).values, 
start, length);
+    if (length == 0) {
+        return;
+    }
+
+    const ColumnMap& src_concrete = assert_cast<const ColumnMap&>(src);
+
+    if (start + length > src_concrete.get_offsets().size()) {
+        LOG(FATAL) << "Parameter out of bound in ColumnMap::insert_range_from 
method. [start("
+                   << std::to_string(start) << ") + length(" << 
std::to_string(length)
+                   << ") > offsets.size(" << 
std::to_string(src_concrete.get_offsets().size())
+                   << ")]";
+    }
+
+    size_t nested_offset = src_concrete.offset_at(start);
+    size_t nested_length = src_concrete.get_offsets()[start + length - 1] - 
nested_offset;
+
+    keys_column->insert_range_from(src_concrete.get_keys(), nested_offset, 
nested_length);
+    values_column->insert_range_from(src_concrete.get_values(), nested_offset, 
nested_length);
+
+    auto& cur_offsets = get_offsets();
+    const auto& src_offsets = src_concrete.get_offsets();
+
+    if (start == 0 && cur_offsets.empty()) {
+        cur_offsets.assign(src_offsets.begin(), src_offsets.begin() + length);
+    } else {
+        size_t old_size = cur_offsets.size();
+        // -1 is ok, because PaddedPODArray pads zeros on the left.
+        size_t prev_max_offset = cur_offsets.back();
+        cur_offsets.resize(old_size + length);
+
+        for (size_t i = 0; i < length; ++i) {
+            cur_offsets[old_size + i] = src_offsets[start + i] - nested_offset 
+ prev_max_offset;
+        }
+    }
 }
 
 ColumnPtr ColumnMap::filter(const Filter& filt, ssize_t result_size_hint) 
const {
-    return ColumnMap::create(keys->filter(filt, result_size_hint),
-                             values->filter(filt, result_size_hint));
+    auto k_arr =
+            ColumnArray::create(keys_column->assume_mutable(), 
offsets_column->assume_mutable())
+                    ->filter(filt, result_size_hint);
+    auto v_arr =
+            ColumnArray::create(values_column->assume_mutable(), 
offsets_column->assume_mutable())
+                    ->filter(filt, result_size_hint);
+    return ColumnMap::create(assert_cast<const 
ColumnArray&>(*k_arr).get_data_ptr(),
+                             assert_cast<const 
ColumnArray&>(*v_arr).get_data_ptr(),
+                             assert_cast<const 
ColumnArray&>(*k_arr).get_offsets_ptr());
 }
 
 size_t ColumnMap::filter(const Filter& filter) {
-    const auto key_result_size = keys->filter(filter);
-    const auto value_result_size = values->filter(filter);
-    CHECK_EQ(key_result_size, value_result_size);
-    return value_result_size;
+    return this->filter(filter, 0);

Review Comment:
   It seems that this line will fail to compile



##########
be/src/vec/columns/column_map.cpp:
##########
@@ -135,71 +202,166 @@ void ColumnMap::insert_indices_from(const IColumn& src, 
const int* indices_begin
     }
 }
 
+StringRef ColumnMap::serialize_value_into_arena(size_t n, Arena& arena, char 
const*& begin) const {
+    StringRef res(begin, 0);
+    auto keys_ref = keys_column->serialize_value_into_arena(n, arena, begin);
+    res.data = keys_ref.data - res.size;
+    res.size += keys_ref.size;
+    auto value_ref = values_column->serialize_value_into_arena(n, arena, 
begin);
+    res.data = value_ref.data - res.size;
+    res.size += value_ref.size;
+
+    return res;
+}
+
 const char* ColumnMap::deserialize_and_insert_from_arena(const char* pos) {
-    pos = keys->deserialize_and_insert_from_arena(pos);
-    pos = values->deserialize_and_insert_from_arena(pos);
+    pos = keys_column->deserialize_and_insert_from_arena(pos);
+    pos = values_column->deserialize_and_insert_from_arena(pos);
 
     return pos;
 }
 
 void ColumnMap::update_hash_with_value(size_t n, SipHash& hash) const {
-    keys->update_hash_with_value(n, hash);
-    values->update_hash_with_value(n, hash);
+    keys_column->update_hash_with_value(n, hash);
+    values_column->update_hash_with_value(n, hash);
 }
 
 void ColumnMap::insert_range_from(const IColumn& src, size_t start, size_t 
length) {
-    keys->insert_range_from(*assert_cast<const ColumnMap&>(src).keys, start, 
length);
-    values->insert_range_from(*assert_cast<const ColumnMap&>(src).values, 
start, length);
+    if (length == 0) {
+        return;
+    }
+
+    const ColumnMap& src_concrete = assert_cast<const ColumnMap&>(src);
+
+    if (start + length > src_concrete.get_offsets().size()) {
+        LOG(FATAL) << "Parameter out of bound in ColumnMap::insert_range_from 
method. [start("
+                   << std::to_string(start) << ") + length(" << 
std::to_string(length)
+                   << ") > offsets.size(" << 
std::to_string(src_concrete.get_offsets().size())
+                   << ")]";
+    }
+
+    size_t nested_offset = src_concrete.offset_at(start);
+    size_t nested_length = src_concrete.get_offsets()[start + length - 1] - 
nested_offset;
+
+    keys_column->insert_range_from(src_concrete.get_keys(), nested_offset, 
nested_length);
+    values_column->insert_range_from(src_concrete.get_values(), nested_offset, 
nested_length);
+
+    auto& cur_offsets = get_offsets();
+    const auto& src_offsets = src_concrete.get_offsets();
+
+    if (start == 0 && cur_offsets.empty()) {
+        cur_offsets.assign(src_offsets.begin(), src_offsets.begin() + length);
+    } else {
+        size_t old_size = cur_offsets.size();
+        // -1 is ok, because PaddedPODArray pads zeros on the left.
+        size_t prev_max_offset = cur_offsets.back();
+        cur_offsets.resize(old_size + length);
+
+        for (size_t i = 0; i < length; ++i) {
+            cur_offsets[old_size + i] = src_offsets[start + i] - nested_offset 
+ prev_max_offset;
+        }
+    }
 }
 
 ColumnPtr ColumnMap::filter(const Filter& filt, ssize_t result_size_hint) 
const {
-    return ColumnMap::create(keys->filter(filt, result_size_hint),
-                             values->filter(filt, result_size_hint));
+    auto k_arr =

Review Comment:
   Need to think about a better implementation.
   But we can refactor it later



##########
be/src/vec/olap/olap_data_convertor.h:
##########
@@ -408,7 +408,7 @@ class OlapBlockDataConvertor {
                                    OlapColumnDataConvertorBaseUPtr 
value_convertor)
                 : _key_convertor(std::move(key_convertor)),
                   _value_convertor(std::move(value_convertor)) {
-            _results.resize(2);
+            _results.resize(6); // size + offset + k_data + k_nullmap + v_data 
+ v_nullmap

Review Comment:
   Please verify that this ordering comment (size + offset + k_data + k_nullmap + v_data + v_nullmap) matches the order in which `_results` is actually populated in the convert_to_olap() function.



##########
be/src/vec/columns/column_map.cpp:
##########
@@ -135,71 +202,166 @@ void ColumnMap::insert_indices_from(const IColumn& src, 
const int* indices_begin
     }
 }
 
// Serializes element n into `arena` by appending the serialized key followed
// by the serialized value, returning a StringRef spanning both parts. `begin`
// tracks the arena write position across calls (same contract as the nested
// columns' serialize_value_into_arena).
// NOTE(review): `n` is forwarded directly to the flattened keys/values
// columns and offsets_column is never consulted, so this serializes a single
// nested element, not the whole map row [offset(n-1), offset(n)) — confirm
// this is the intended granularity for callers.
StringRef ColumnMap::serialize_value_into_arena(size_t n, Arena& arena, char const*& begin) const {
    StringRef res(begin, 0);
    auto keys_ref = keys_column->serialize_value_into_arena(n, arena, begin);
    // Recompute the start of the combined buffer after each nested serialize:
    // the nested call may relocate/extend the arena chunk (presumably via
    // alloc_continue — confirm), so keys_ref.data is the authoritative base.
    res.data = keys_ref.data - res.size;
    res.size += keys_ref.size;
    auto value_ref = values_column->serialize_value_into_arena(n, arena, begin);
    res.data = value_ref.data - res.size;
    res.size += value_ref.size;

    return res;
}
+
// Inverse of serialize_value_into_arena: reads one serialized key and one
// serialized value starting at `pos`, appends them to the nested columns, and
// returns the advanced read position.
// NOTE(review): offsets_column is not updated here, mirroring the fact that
// serialization covers only the nested columns — verify that callers maintain
// the offsets themselves.
const char* ColumnMap::deserialize_and_insert_from_arena(const char* pos) {
    pos = keys_column->deserialize_and_insert_from_arena(pos);
    pos = values_column->deserialize_and_insert_from_arena(pos);

    return pos;
}
 
// Feeds element n of both nested columns into `hash`.
// NOTE(review): as with serialize_value_into_arena, `n` indexes the flattened
// nested columns directly and offsets are ignored, so this hashes one nested
// element rather than a full map row — confirm that is the intended behavior
// for hash-based operations on map columns.
void ColumnMap::update_hash_with_value(size_t n, SipHash& hash) const {
    keys_column->update_hash_with_value(n, hash);
    values_column->update_hash_with_value(n, hash);
}
 
// Appends `length` map rows [start, start + length) from `src` to this
// column: copies the corresponding slice of the flattened keys/values
// columns, then appends the rebased offsets.
void ColumnMap::insert_range_from(const IColumn& src, size_t start, size_t length) {
    if (length == 0) {
        return;
    }

    const ColumnMap& src_concrete = assert_cast<const ColumnMap&>(src);

    // Guard against caller programming errors; LOG(FATAL) aborts the process.
    if (start + length > src_concrete.get_offsets().size()) {
        LOG(FATAL) << "Parameter out of bound in ColumnMap::insert_range_from method. [start("
                   << std::to_string(start) << ") + length(" << std::to_string(length)
                   << ") > offsets.size(" << std::to_string(src_concrete.get_offsets().size())
                   << ")]";
    }

    // Translate the row range into a range over the flattened nested columns:
    // [offset(start-1), offset(start + length - 1)).
    size_t nested_offset = src_concrete.offset_at(start);
    size_t nested_length = src_concrete.get_offsets()[start + length - 1] - nested_offset;

    keys_column->insert_range_from(src_concrete.get_keys(), nested_offset, nested_length);
    values_column->insert_range_from(src_concrete.get_values(), nested_offset, nested_length);

    auto& cur_offsets = get_offsets();
    const auto& src_offsets = src_concrete.get_offsets();

    if (start == 0 && cur_offsets.empty()) {
        // Fast path: the source offsets can be copied verbatim — no rebasing
        // is needed because both columns start at nested position 0.
        cur_offsets.assign(src_offsets.begin(), src_offsets.begin() + length);
    } else {
        size_t old_size = cur_offsets.size();
        // -1 is ok, because PaddedPODArray pads zeros on the left.
        size_t prev_max_offset = cur_offsets.back();
        cur_offsets.resize(old_size + length);

        // Rebase each source offset: subtract where the copied slice started
        // in src's nested space, add where our nested data previously ended.
        for (size_t i = 0; i < length; ++i) {
            cur_offsets[old_size + i] = src_offsets[start + i] - nested_offset + prev_max_offset;
        }
    }
}
 
 ColumnPtr ColumnMap::filter(const Filter& filt, ssize_t result_size_hint) 
const {
-    return ColumnMap::create(keys->filter(filt, result_size_hint),
-                             values->filter(filt, result_size_hint));
+    auto k_arr =
+            ColumnArray::create(keys_column->assume_mutable(), 
offsets_column->assume_mutable())
+                    ->filter(filt, result_size_hint);
+    auto v_arr =
+            ColumnArray::create(values_column->assume_mutable(), 
offsets_column->assume_mutable())
+                    ->filter(filt, result_size_hint);
+    return ColumnMap::create(assert_cast<const 
ColumnArray&>(*k_arr).get_data_ptr(),
+                             assert_cast<const 
ColumnArray&>(*v_arr).get_data_ptr(),
+                             assert_cast<const 
ColumnArray&>(*k_arr).get_offsets_ptr());
 }
 
 size_t ColumnMap::filter(const Filter& filter) {
-    const auto key_result_size = keys->filter(filter);
-    const auto value_result_size = values->filter(filter);
-    CHECK_EQ(key_result_size, value_result_size);
-    return value_result_size;
+    return this->filter(filter, 0);
 }
 
 Status ColumnMap::filter_by_selector(const uint16_t* sel, size_t sel_size, 
IColumn* col_ptr) {
     auto to = reinterpret_cast<vectorized::ColumnMap*>(col_ptr);
 
-    auto& array_keys = assert_cast<vectorized::ColumnArray&>(*keys);
-    array_keys.filter_by_selector(sel, sel_size, &to->get_keys());
+    auto& to_offsets = to->get_offsets();
+
+    size_t element_size = 0;
+    size_t max_offset = 0;
+    for (size_t i = 0; i < sel_size; ++i) {
+        element_size += size_at(sel[i]);
+        max_offset = std::max(max_offset, offset_at(sel[i]));
+    }
+    if (max_offset > std::numeric_limits<uint16_t>::max()) {
+        return Status::IOError("map elements too large than uint16_t::max");
+    }
 
-    auto& array_values = assert_cast<vectorized::ColumnArray&>(*values);
-    array_values.filter_by_selector(sel, sel_size, &to->get_values());
+    to_offsets.reserve(to_offsets.size() + sel_size);
+    auto nested_sel = std::make_unique<uint16_t[]>(element_size);
+    size_t nested_sel_size = 0;
+    for (size_t i = 0; i < sel_size; ++i) {
+        auto row_off = offset_at(sel[i]);
+        auto row_size = size_at(sel[i]);
+        to_offsets.push_back(to_offsets.back() + row_size);
+        for (auto j = 0; j < row_size; ++j) {
+            nested_sel[nested_sel_size++] = row_off + j;
+        }
+    }
 
+    if (nested_sel_size > 0) {
+        keys_column->filter_by_selector(nested_sel.get(), nested_sel_size, 
&to->get_keys());
+        values_column->filter_by_selector(nested_sel.get(), nested_sel_size, 
&to->get_values());
+    }
     return Status::OK();
 }
 
 ColumnPtr ColumnMap::permute(const Permutation& perm, size_t limit) const {
-    return ColumnMap::create(keys->permute(perm, limit), values->permute(perm, 
limit));
+    // Make a temp column array
+    auto k_arr =
+            ColumnArray::create(keys_column->assume_mutable(), 
offsets_column->assume_mutable())
+                    ->permute(perm, limit);
+    auto v_arr =
+            ColumnArray::create(values_column->assume_mutable(), 
offsets_column->assume_mutable())
+                    ->permute(perm, limit);
+
+    return ColumnMap::create(assert_cast<const 
ColumnArray&>(*k_arr).get_data_ptr(),
+                             assert_cast<const 
ColumnArray&>(*v_arr).get_data_ptr(),
+                             assert_cast<const 
ColumnArray&>(*k_arr).get_offsets_ptr());
 }
 
 ColumnPtr ColumnMap::replicate(const Offsets& offsets) const {
-    return ColumnMap::create(keys->replicate(offsets), 
values->replicate(offsets));
+    // Make a temp column array for reusing its replicate function
+    auto k_arr =

Review Comment:
   Same pattern as filter()/permute(): constructing temporary ColumnArray wrappers just to reuse their logic is wasteful — please refactor this into a direct implementation in a later PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to