This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch array-type in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit 8dba572bfa817f24cb6f6a22f3d108432afec27e Author: Adonis Ling <adonis0...@gmail.com> AuthorDate: Thu Mar 10 09:15:48 2022 +0800 [feature-wip][array-type] Array data can be loaded in stream load. (#8368) --- be/src/exprs/cast_functions.cpp | 7 + be/src/exprs/cast_functions.h | 2 + be/src/util/array_parser.hpp | 212 +++++++++++++++++++++ be/test/util/CMakeLists.txt | 1 + be/test/util/array_parser_test.cpp | 134 +++++++++++++ .../java/org/apache/doris/analysis/CastExpr.java | 19 +- .../main/java/org/apache/doris/catalog/Type.java | 4 +- 7 files changed, 371 insertions(+), 8 deletions(-) diff --git a/be/src/exprs/cast_functions.cpp b/be/src/exprs/cast_functions.cpp index baadd09..8903d5b 100644 --- a/be/src/exprs/cast_functions.cpp +++ b/be/src/exprs/cast_functions.cpp @@ -26,6 +26,7 @@ #include "runtime/datetime_value.h" #include "runtime/string_value.h" #include "string_functions.h" +#include "util/array_parser.hpp" #include "util/mysql_global.h" #include "util/string_parser.hpp" @@ -357,4 +358,10 @@ DateTimeVal CastFunctions::cast_to_date_val(FunctionContext* ctx, const StringVa return result; } +CollectionVal CastFunctions::cast_to_array_val(FunctionContext* context, const StringVal& val) { + CollectionVal array_val; + Status status = ArrayParser::parse(array_val, context, val); + return status.ok() ? 
array_val : CollectionVal::null(); +} + } // namespace doris diff --git a/be/src/exprs/cast_functions.h b/be/src/exprs/cast_functions.h index 0160275..fa75d7c 100644 --- a/be/src/exprs/cast_functions.h +++ b/be/src/exprs/cast_functions.h @@ -136,6 +136,8 @@ public: static DateTimeVal cast_to_date_val(FunctionContext* context, const DoubleVal& val); static DateTimeVal cast_to_date_val(FunctionContext* context, const DateTimeVal& val); static DateTimeVal cast_to_date_val(FunctionContext* context, const StringVal& val); + + static CollectionVal cast_to_array_val(FunctionContext* context, const StringVal& val); }; } // namespace doris diff --git a/be/src/util/array_parser.hpp b/be/src/util/array_parser.hpp new file mode 100644 index 0000000..746695a --- /dev/null +++ b/be/src/util/array_parser.hpp @@ -0,0 +1,212 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include <rapidjson/document.h> + +#include <unordered_map> + +#include "common/status.h" +#include "exprs/anyval_util.h" +#include "runtime/collection_value.h" +#include "runtime/primitive_type.h" +#include "runtime/types.h" +#include "util/mem_util.hpp" + +namespace doris { + +template <typename Encoding> +using ConstArray = typename rapidjson::GenericValue<Encoding>::ConstArray; + +template <typename Encoding> +using ConstArrayIterator = typename ConstArray<Encoding>::ValueIterator; + +class ArrayParser { +public: + static Status parse(CollectionVal& array_val, FunctionContext* context, + const StringVal& str_val) { + rapidjson::Document document; + if (document.Parse(reinterpret_cast<char*>(str_val.ptr), str_val.len).HasParseError() || + !document.IsArray()) { + return Status::RuntimeError("Failed to parse the json to array."); + } + if (document.IsNull()) { + array_val = CollectionVal::null(); + return Status::OK(); + } + auto type_desc = _convert_to_type_descriptor(context->get_return_type()); + return _parse<rapidjson::UTF8<>>( + array_val, context, + reinterpret_cast<const rapidjson::Document*>(&document)->GetArray(), type_desc); + } + +private: + static TypeDescriptor _convert_to_type_descriptor( + FunctionContext::TypeDesc function_type_desc) { + auto iterator = _types_mapping.find(function_type_desc.type); + if (iterator == _types_mapping.end()) { + return TypeDescriptor(); + } + auto type_desc = TypeDescriptor(iterator->second); + type_desc.len = function_type_desc.len; + type_desc.precision = function_type_desc.precision; + type_desc.scale = function_type_desc.scale; + for (auto child_type_desc : function_type_desc.children) { + type_desc.children.push_back(_convert_to_type_descriptor(child_type_desc)); + } + return type_desc; + } + + template <typename Encoding> + static Status _parse(CollectionVal& array_val, FunctionContext* context, + const ConstArray<Encoding>& array, const TypeDescriptor& type_desc) { + if (array.Empty()) { + 
CollectionValue(0).to_collection_val(&array_val); + return Status::OK(); + } + auto child_type_desc = type_desc.children[0]; + auto item_type = child_type_desc.type; + CollectionValue collection_value; + CollectionValue::init_collection(context, array.Size(), item_type, &collection_value); + int index = 0; + for (auto it = array.Begin(); it != array.End(); ++it) { + if (it->IsNull()) { + auto null = AnyVal(true); + collection_value.set(index++, item_type, &null); + continue; + } else if (!_is_type_valid<Encoding>(it, item_type)) { + return Status::RuntimeError("Failed to parse the json to array."); + } + AnyVal* val; + Status status = _parse<Encoding>(&val, context, it, child_type_desc); + if (!status.ok()) { + return status; + } + collection_value.set(index++, item_type, val); + } + collection_value.to_collection_val(&array_val); + return Status::OK(); + } + + template <typename Encoding> + static bool _is_type_valid(const ConstArrayIterator<Encoding> iterator, + const PrimitiveType type) { + switch (type) { + case TYPE_NULL: + return iterator->IsNull(); + case TYPE_BOOLEAN: + return iterator->IsBool(); + case TYPE_TINYINT: + case TYPE_SMALLINT: + case TYPE_INT: + case TYPE_BIGINT: + case TYPE_LARGEINT: + case TYPE_FLOAT: + case TYPE_DOUBLE: + return iterator->IsNumber(); + case TYPE_DATE: + case TYPE_DATETIME: + case TYPE_CHAR: + case TYPE_VARCHAR: + case TYPE_HLL: + case TYPE_STRING: + return iterator->IsString(); + case TYPE_OBJECT: + return iterator->IsObject(); + case TYPE_ARRAY: + return iterator->IsArray(); + default: + return false; + } + } + + template <typename Encoding> + static Status _parse(AnyVal** val, FunctionContext* context, + const ConstArrayIterator<Encoding> iterator, + const TypeDescriptor& type_desc) { + switch (type_desc.type) { + case TYPE_ARRAY: + *val = reinterpret_cast<AnyVal*>(context->allocate(sizeof(CollectionVal))); + new (*val) CollectionVal(); + return _parse<Encoding>(*reinterpret_cast<CollectionVal*>(*val), context, + 
iterator->GetArray(), type_desc); + case TYPE_BOOLEAN: + *val = reinterpret_cast<AnyVal*>(context->allocate(sizeof(BooleanVal))); + new (*val) BooleanVal(iterator->GetBool()); + break; + case TYPE_TINYINT: + *val = reinterpret_cast<AnyVal*>(context->allocate(sizeof(TinyIntVal))); + new (*val) TinyIntVal(iterator->GetInt()); + break; + case TYPE_SMALLINT: + *val = reinterpret_cast<AnyVal*>(context->allocate(sizeof(SmallIntVal))); + new (*val) SmallIntVal(iterator->GetInt()); + break; + case TYPE_INT: + *val = reinterpret_cast<AnyVal*>(context->allocate(sizeof(IntVal))); + new (*val) IntVal(iterator->GetInt()); + break; + case TYPE_BIGINT: + *val = reinterpret_cast<AnyVal*>(context->allocate(sizeof(BigIntVal))); + new (*val) BigIntVal(iterator->GetInt64()); + break; + case TYPE_CHAR: + case TYPE_VARCHAR: + case TYPE_STRING: { + *val = reinterpret_cast<AnyVal*>(context->allocate(sizeof(StringVal))); + new (*val) StringVal(context->allocate(iterator->GetStringLength()), + iterator->GetStringLength()); + auto string_val = reinterpret_cast<StringVal*>(*val); + memory_copy(string_val->ptr, iterator->GetString(), iterator->GetStringLength()); + break; + } + default: + return Status::RuntimeError("Failed to parse json to type (" + + std::to_string(type_desc.type) + ")."); + } + return Status::OK(); + } + +private: + static std::unordered_map<FunctionContext::Type, PrimitiveType> _types_mapping; +}; + +std::unordered_map<FunctionContext::Type, PrimitiveType> ArrayParser::_types_mapping = { + {FunctionContext::INVALID_TYPE, PrimitiveType::INVALID_TYPE}, + {FunctionContext::TYPE_NULL, PrimitiveType::TYPE_NULL}, + {FunctionContext::TYPE_BOOLEAN, PrimitiveType::TYPE_BOOLEAN}, + {FunctionContext::TYPE_TINYINT, PrimitiveType::TYPE_TINYINT}, + {FunctionContext::TYPE_SMALLINT, PrimitiveType::TYPE_SMALLINT}, + {FunctionContext::TYPE_INT, PrimitiveType::TYPE_INT}, + {FunctionContext::TYPE_BIGINT, PrimitiveType::TYPE_BIGINT}, + {FunctionContext::TYPE_LARGEINT, 
PrimitiveType::TYPE_LARGEINT}, + {FunctionContext::TYPE_FLOAT, PrimitiveType::TYPE_FLOAT}, + {FunctionContext::TYPE_DOUBLE, PrimitiveType::TYPE_DOUBLE}, + {FunctionContext::TYPE_DECIMAL_DEPRACTED, PrimitiveType::TYPE_DECIMAL_DEPRACTED}, + {FunctionContext::TYPE_DATE, PrimitiveType::TYPE_DATE}, + {FunctionContext::TYPE_DATETIME, PrimitiveType::TYPE_DATETIME}, + {FunctionContext::TYPE_CHAR, PrimitiveType::TYPE_CHAR}, + {FunctionContext::TYPE_VARCHAR, PrimitiveType::TYPE_VARCHAR}, + {FunctionContext::TYPE_HLL, PrimitiveType::TYPE_HLL}, + {FunctionContext::TYPE_STRING, PrimitiveType::TYPE_STRING}, + {FunctionContext::TYPE_DECIMALV2, PrimitiveType::TYPE_DECIMALV2}, + {FunctionContext::TYPE_OBJECT, PrimitiveType::TYPE_OBJECT}, + {FunctionContext::TYPE_ARRAY, PrimitiveType::TYPE_ARRAY}, +}; + +} // namespace doris diff --git a/be/test/util/CMakeLists.txt b/be/test/util/CMakeLists.txt index b3ccb88..12daf38 100644 --- a/be/test/util/CMakeLists.txt +++ b/be/test/util/CMakeLists.txt @@ -75,5 +75,6 @@ ADD_BE_TEST(sort_heap_test) ADD_BE_TEST(counts_test) ADD_BE_TEST(date_func_test) ADD_BE_TEST(tuple_row_zorder_compare_test) +ADD_BE_TEST(array_parser_test) target_link_libraries(Test_util Common Util Gutil ${Boost_LIBRARIES} glog gflags fmt protobuf) diff --git a/be/test/util/array_parser_test.cpp b/be/test/util/array_parser_test.cpp new file mode 100644 index 0000000..cbda494 --- /dev/null +++ b/be/test/util/array_parser_test.cpp @@ -0,0 +1,134 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <gtest/gtest.h> + +#include <memory> +#include <string> +#include <util/array_parser.hpp> + +#include "gutil/casts.h" +#include "olap/types.h" +#include "runtime/free_pool.hpp" +#include "runtime/mem_pool.h" +#include "runtime/mem_tracker.h" +#include "runtime/string_value.h" +#include "udf/udf.h" +#include "udf/udf_internal.h" + +namespace doris { + +using TypeDesc = FunctionContext::TypeDesc; + +template <typename... Ts> +TypeDesc create_function_type_desc(FunctionContext::Type type, Ts... sub_types) { + TypeDesc type_desc = {.type = type, + .len = (type == FunctionContext::TYPE_ARRAY) ? 
OLAP_ARRAY_MAX_BYTES : 0}; + if constexpr (sizeof...(sub_types)) { + type_desc.children.push_back(create_function_type_desc(sub_types...)); + } + return type_desc; +} + +ColumnPB create_column_pb(const TypeDesc& function_type_desc) { + ColumnPB column_pb; + column_pb.set_length(function_type_desc.len); + switch (function_type_desc.type) { + case FunctionContext::TYPE_ARRAY: + column_pb.set_type("ARRAY"); + break; + case FunctionContext::TYPE_INT: + column_pb.set_type("INT"); + break; + case FunctionContext::TYPE_VARCHAR: + column_pb.set_type("VARCHAR"); + break; + default: + break; + } + for (auto child_type_desc : function_type_desc.children) { + auto sub_column_pb = create_column_pb(child_type_desc); + column_pb.add_children_columns()->Swap(&sub_column_pb); + } + return column_pb; +} + +std::shared_ptr<const TypeInfo> get_type_info(const TypeDesc& function_type_desc) { + auto column_pb = create_column_pb(function_type_desc); + TabletColumn tablet_column; + tablet_column.init_from_pb(column_pb); + return get_type_info(&tablet_column); +} + +void test_array_parser(const TypeDesc& function_type_desc, const std::string& json, + const CollectionValue& expect) { + MemTracker tracker(1024 * 1024, "ArrayParserTest"); + MemPool mem_pool(&tracker); + std::unique_ptr<FunctionContext> function_context(new FunctionContext()); + function_context->impl()->_return_type = function_type_desc; + function_context->impl()->_pool = new FreePool(&mem_pool); + CollectionVal collection_val; + auto status = + ArrayParser::parse(collection_val, function_context.get(), StringVal(json.c_str())); + EXPECT_TRUE(status.ok()); + auto actual = CollectionValue::from_collection_val(collection_val); + EXPECT_TRUE(get_type_info(function_type_desc)->equal(&expect, &actual)); +} + +TEST(ArrayParserTest, TestParseIntArray) { + auto function_type_desc = + create_function_type_desc(FunctionContext::TYPE_ARRAY, FunctionContext::TYPE_INT); + test_array_parser(function_type_desc, "[]", CollectionValue(0)); + 
+ int num_items = 3; + std::unique_ptr<int32_t[]> data(new int32_t[num_items] {1, 2, 3}); + CollectionValue value(data.get(), num_items, false, nullptr); + test_array_parser(function_type_desc, "[1, 2, 3]", value); + + std::unique_ptr<bool[]> null_signs(new bool[num_items] {false, true, false}); + value.set_has_null(true); + value.set_null_signs(null_signs.get()); + test_array_parser(function_type_desc, "[1, null, 3]", value); +} + +TEST(ArrayParserTest, TestParseVarcharArray) { + auto function_type_desc = + create_function_type_desc(FunctionContext::TYPE_ARRAY, FunctionContext::TYPE_VARCHAR); + test_array_parser(function_type_desc, "[]", CollectionValue(0)); + + int num_items = 3; + std::unique_ptr<char[]> data(new char[num_items] {'a', 'b', 'c'}); + std::unique_ptr<StringValue[]> string_values(new StringValue[num_items] { + {&data[0], 1}, + {&data[1], 1}, + {&data[2], 1}, + }); + CollectionValue value(string_values.get(), num_items, false, nullptr); + test_array_parser(function_type_desc, "[\"a\", \"b\", \"c\"]", value); + + std::unique_ptr<bool[]> null_signs(new bool[num_items] {false, true, false}); + value.set_has_null(true); + value.set_null_signs(null_signs.get()); + test_array_parser(function_type_desc, "[\"a\", null, \"c\"]", value); +} + +} // namespace doris + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CastExpr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CastExpr.java index 387cf1a..c39b2b7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CastExpr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CastExpr.java @@ -241,12 +241,19 @@ public class CastExpr extends Expr { this.opcode = TExprOpcode.CAST; FunctionName fnName = new FunctionName(getFnName(type)); Function searchDesc = new Function(fnName, Arrays.asList(collectChildReturnTypes()), Type.INVALID, false); - if (isImplicit) { - fn = 
Catalog.getCurrentCatalog().getFunction( - searchDesc, Function.CompareMode.IS_NONSTRICT_SUPERTYPE_OF); - } else { - fn = Catalog.getCurrentCatalog().getFunction( - searchDesc, Function.CompareMode.IS_IDENTICAL); + if (type.isScalarType()) { + if (isImplicit) { + fn = Catalog.getCurrentCatalog().getFunction( + searchDesc, Function.CompareMode.IS_NONSTRICT_SUPERTYPE_OF); + } else { + fn = Catalog.getCurrentCatalog().getFunction( + searchDesc, Function.CompareMode.IS_IDENTICAL); + } + } else if (type.isArrayType()){ + fn = ScalarFunction.createBuiltin(getFnName(Type.ARRAY), + type, Function.NullableMode.ALWAYS_NULLABLE, + Lists.newArrayList(Type.VARCHAR), false , + "doris::CastFunctions::cast_to_array_val", null, null, true); } if (fn == null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java index bcb72d5..3f0b8d2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java @@ -81,6 +81,7 @@ public abstract class Type { // Only used for alias function, to represent any type in function args public static final ScalarType ALL = new ScalarType(PrimitiveType.ALL); public static final MapType Map = new MapType(); + public static final ArrayType ARRAY = ArrayType.create(); private static ArrayList<ScalarType> integerTypes; private static ArrayList<ScalarType> numericTypes; @@ -123,7 +124,6 @@ public abstract class Type { supportedTypes.add(DECIMALV2); supportedTypes.add(TIME); supportedTypes.add(STRING); - } public static ArrayList<ScalarType> getIntegerTypes() { @@ -387,7 +387,7 @@ public abstract class Type { } else if (t1.isArrayType() && t2.isArrayType()) { return ArrayType.canCastTo((ArrayType)t1, (ArrayType)t2); } - return t1.isNull(); + return t1.isNull() || t1.getPrimitiveType() == PrimitiveType.VARCHAR; } /** --------------------------------------------------------------------- To unsubscribe, 
e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org