eldenmoon commented on code in PR #24554:
URL: https://github.com/apache/doris/pull/24554#discussion_r1336905544
##########
be/src/vec/common/schema_util.cpp:
##########
@@ -129,266 +139,254 @@ bool is_conversion_required_between_integers(FieldType
lhs, FieldType rhs) {
return true;
}
-Status cast_column(const ColumnWithTypeAndName& arg, const DataTypePtr& type,
ColumnPtr* result,
- RuntimeState* state) {
- ColumnsWithTypeAndName arguments;
- if (WhichDataType(type->get_type_id()).is_string()) {
- // Special handle ColumnString, since the original cast logic use
ColumnString's first item
- // as the name of the dest type
- arguments = {arg, {type->create_column_const(1, type->get_name()),
type, ""}};
- } else {
- arguments = {arg, {type->create_column_const_with_default_value(1),
type, ""}};
- }
+Status cast_column(const ColumnWithTypeAndName& arg, const DataTypePtr& type,
ColumnPtr* result) {
+ ColumnsWithTypeAndName arguments {
+ arg, {type->create_column_const_with_default_value(1), type,
type->get_name()}};
auto function = SimpleFunctionFactory::instance().get_function("CAST",
arguments, type);
+ if (!function) {
+ return Status::InternalError("Not found cast function {} to {}",
arg.type->get_name(),
+ type->get_name());
+ }
Block tmp_block {arguments};
- // the 0 position is input argument, the 1 position is to type argument,
the 2 position is result argument
vectorized::ColumnNumbers argnum;
argnum.emplace_back(0);
argnum.emplace_back(1);
size_t result_column = tmp_block.columns();
+ auto ctx = FunctionContext::create_context(nullptr, {}, {});
+ // We convert column string to jsonb type just add a string jsonb field to
dst column instead of parse
+ // each line in original string column.
+ ctx->set_string_as_jsonb_string(true);
tmp_block.insert({nullptr, type, arg.name});
- auto need_state_only = FunctionContext::create_context(state, {}, {});
- RETURN_IF_ERROR(function->execute(need_state_only.get(), tmp_block,
argnum, result_column,
- arg.column->size()));
+ RETURN_IF_ERROR(
+ function->execute(ctx.get(), tmp_block, argnum, result_column,
arg.column->size()));
*result = std::move(tmp_block.get_by_position(result_column).column);
+ // Variant column is a really special case, src type is nullable but dst
variant type is none nullable,
+ // but we still need to wrap nullmap into variant root column to prevent
from nullable info lost.
+ // TODO rethink and better handle this sepecial situation
+ if (arg.type->is_nullable() && WhichDataType(type).is_variant_type()) {
+ auto variant = ColumnObject::create(true);
+ auto& old_variant = assert_cast<const
ColumnObject&>(*(*result)->assume_mutable());
+ DCHECK(!old_variant.get_root()->is_nullable());
+ auto nullable = ColumnNullable::create(
+ old_variant.get_root(),
+ assert_cast<const
ColumnNullable&>(*arg.column).get_null_map_column_ptr());
+ variant->create_root(make_nullable(arg.type),
nullable->assume_mutable());
+ *result = std::move(variant);
+ }
return Status::OK();
}
-static void get_column_def(const vectorized::DataTypePtr& data_type, const
std::string& name,
- TColumnDef* column) {
- if (!name.empty()) {
- column->columnDesc.__set_columnName(name);
+void get_column_by_type(const vectorized::DataTypePtr& data_type, const
std::string& name,
+ TabletColumn& column, const ExtraInfo& ext_info) {
+ column.set_name(name);
+ column.set_type(data_type->get_type_as_field_type());
+ if (ext_info.unique_id >= 0) {
+ column.set_unique_id(ext_info.unique_id);
+ }
+ if (ext_info.parent_unique_id >= 0) {
+ column.set_parent_unique_id(ext_info.parent_unique_id);
+ }
+ if (!ext_info.path_info.empty()) {
+ column.set_path_info(ext_info.path_info);
}
if (data_type->is_nullable()) {
const auto& real_type = static_cast<const
DataTypeNullable&>(*data_type);
- column->columnDesc.__set_isAllowNull(true);
- get_column_def(real_type.get_nested_type(), "", column);
+ column.set_is_nullable(true);
+ get_column_by_type(real_type.get_nested_type(), name, column, {});
return;
}
-
column->columnDesc.__set_columnType(data_type->get_type_as_tprimitive_type());
if (data_type->get_type_id() == TypeIndex::Array) {
- TColumnDef child;
- column->columnDesc.__set_children({});
- get_column_def(assert_cast<const
DataTypeArray*>(data_type.get())->get_nested_type(), "",
- &child);
- column->columnDesc.columnLength =
-
TabletColumn::get_field_length_by_type(column->columnDesc.columnType, 0);
- column->columnDesc.children.push_back(child.columnDesc);
+ TabletColumn child;
+ get_column_by_type(assert_cast<const
DataTypeArray*>(data_type.get())->get_nested_type(),
+ "", child, {});
+ column.set_length(TabletColumn::get_field_length_by_type(
+ data_type->get_type_as_tprimitive_type(), 0));
+ column.add_sub_column(child);
+ column.set_default_value("[]");
return;
}
- if (data_type->get_type_id() == TypeIndex::Tuple) {
- // TODO
- // auto tuple_type = assert_cast<const
DataTypeTuple*>(data_type.get());
- // DCHECK_EQ(tuple_type->get_elements().size(),
tuple_type->get_element_names().size());
- // for (size_t i = 0; i < tuple_type->get_elements().size(); ++i) {
- // TColumnDef child;
- // get_column_def(tuple_type->get_element(i),
tuple_type->get_element_names()[i], &child);
- // column->columnDesc.children.push_back(child.columnDesc);
- // }
- // return;
- }
- if (data_type->get_type_id() == TypeIndex::String) {
+ // size is not fixed when type is string or json
+ if (WhichDataType(*data_type).is_string() ||
WhichDataType(*data_type).is_json()) {
return;
}
if (WhichDataType(*data_type).is_simple()) {
-
column->columnDesc.__set_columnLength(data_type->get_size_of_value_in_memory());
+ column.set_length(data_type->get_size_of_value_in_memory());
return;
}
+ // TODO handle more types like struct/date/datetime/decimal...
+ __builtin_unreachable();
}
-// send an empty add columns rpc, the rpc response will fill with base schema
info
-// maybe we could seperate this rpc from add columns rpc
-Status send_fetch_full_base_schema_view_rpc(FullBaseSchemaView* schema_view) {
- TAddColumnsRequest req;
- TAddColumnsResult res;
- TTabletInfo tablet_info;
- req.__set_table_name(schema_view->table_name);
- req.__set_db_name(schema_view->db_name);
- req.__set_table_id(schema_view->table_id);
- // Set empty columns
- req.__set_addColumns({});
- auto master_addr = ExecEnv::GetInstance()->master_info()->network_address;
- Status rpc_st = ThriftRpcHelper::rpc<FrontendServiceClient>(
- master_addr.hostname, master_addr.port,
- [&req, &res](FrontendServiceConnection& client) {
client->addColumns(res, req); },
- config::txn_commit_rpc_timeout_ms);
- if (!rpc_st.ok()) {
- return Status::InternalError("Failed to fetch schema info, encounter
rpc failure");
- }
- // TODO(lhy) handle more status code
- if (res.status.status_code != TStatusCode::OK) {
- LOG(WARNING) << "failed to fetch schema info, code:" <<
res.status.status_code
- << ", msg:" << res.status.error_msgs[0];
- return Status::InvalidArgument(
- fmt::format("Failed to fetch schema info, {}",
res.status.error_msgs[0]));
- }
- for (const auto& column : res.allColumns) {
- schema_view->column_name_to_column[column.column_name] = column;
+TabletColumn get_least_type_column(const TabletColumn& original, const
DataTypePtr& new_type,
+ const ExtraInfo& ext_info, bool* changed) {
+ TabletColumn result_column;
+ vectorized::DataTypePtr original_type = original.get_vec_type();
+ vectorized::DataTypePtr common_type;
+ vectorized::get_least_supertype<vectorized::LeastSupertypeOnError::Jsonb>(
+ vectorized::DataTypes {original_type, new_type}, &common_type);
+ if (!original_type->equals(*common_type)) {
+ // update to common type
+ *changed = true;
+ vectorized::schema_util::get_column_by_type(common_type,
original.name(), result_column,
+ ext_info);
+ } else {
+ *changed = false;
+ result_column = original;
+ result_column.set_parent_unique_id(ext_info.parent_unique_id);
+ result_column.set_unique_id(ext_info.unique_id);
+ result_column.set_path_info(ext_info.path_info);
}
- schema_view->schema_version = res.schema_version;
- return Status::OK();
+ return result_column;
}
-static const std::regex COLUMN_NAME_REGEX(
- "^[_a-zA-Z@0-9\\s<>/][.a-zA-Z0-9_+-/><?@#$%^&*\"\\s,:]{0,255}$");
-
-// Do batch add columns schema change
-// only the base table supported
-Status send_add_columns_rpc(ColumnsWithTypeAndName column_type_names,
- FullBaseSchemaView* schema_view) {
- if (column_type_names.empty()) {
- return Status::OK();
+void update_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas,
+ TabletSchemaSPtr& common_schema, int32_t
variant_col_unique_id) {
+ // Types of subcolumns by path from all tuples.
+ std::unordered_map<PathInData, DataTypes, PathInData::Hash>
subcolumns_types;
+ for (const TabletSchemaSPtr& schema : schemas) {
+ for (const TabletColumn& col : schema->columns()) {
+ // Get subcolumns of this variant
+ if (!col.path_info().empty() && col.parent_unique_id() > 0 &&
+ col.parent_unique_id() == variant_col_unique_id) {
+ subcolumns_types[col.path_info()].push_back(
+ DataTypeFactory::instance().create_data_type(col,
col.is_nullable()));
+ }
+ }
}
- TAddColumnsRequest req;
- TAddColumnsResult res;
- TTabletInfo tablet_info;
- req.__set_table_name(schema_view->table_name);
- req.__set_db_name(schema_view->db_name);
- req.__set_table_id(schema_view->table_id);
- // TODO(lhy) more configurable
- req.__set_allow_type_conflict(true);
- req.__set_addColumns({});
- // Deduplicate Column like `Level` and `level`
- // TODO we will implement new version of dynamic column soon to handle
this issue,
- // also ignore column missmatch with regex
- std::set<std::string> dedup;
- for (const auto& column_type_name : column_type_names) {
- if (dedup.contains(to_lower(column_type_name.name))) {
+ PathsInData tuple_paths;
+ DataTypes tuple_types;
+ // Get the least common type for all paths.
+ for (const auto& [key, subtypes] : subcolumns_types) {
+ assert(!subtypes.empty());
+ if (key.get_path() == ColumnObject::COLUMN_NAME_DUMMY) {
continue;
}
- if (!std::regex_match(column_type_name.name, COLUMN_NAME_REGEX)) {
+ size_t first_dim = get_number_of_dimensions(*subtypes[0]);
+ tuple_paths.emplace_back(key);
+ for (size_t i = 1; i < subtypes.size(); ++i) {
+ if (first_dim != get_number_of_dimensions(*subtypes[i])) {
+
tuple_types.emplace_back(make_nullable(std::make_shared<DataTypeJsonb>()));
+ LOG(INFO) << fmt::format(
+ "Uncompatible types of subcolumn '{}': {} and {}, cast
to JSONB",
+ key.get_path(), subtypes[0]->get_name(),
subtypes[i]->get_name());
+ break;
+ }
+ }
+ if (tuple_paths.size() == tuple_types.size()) {
continue;
}
- dedup.insert(to_lower(column_type_name.name));
- TColumnDef col;
- get_column_def(column_type_name.type, column_type_name.name, &col);
- req.addColumns.push_back(col);
- }
- auto master_addr = ExecEnv::GetInstance()->master_info()->network_address;
- Status rpc_st = ThriftRpcHelper::rpc<FrontendServiceClient>(
- master_addr.hostname, master_addr.port,
- [&req, &res](FrontendServiceConnection& client) {
client->addColumns(res, req); },
- config::txn_commit_rpc_timeout_ms);
- if (!rpc_st.ok()) {
- return Status::InternalError("Failed to do schema change, rpc error");
- }
- // TODO(lhy) handle more status code
- if (res.status.status_code != TStatusCode::OK) {
- LOG(WARNING) << "failed to do schema change, code:" <<
res.status.status_code
- << ", msg:" << res.status.error_msgs[0];
- return Status::InvalidArgument(
- fmt::format("Failed to do schema change, {}",
res.status.error_msgs[0]));
- }
- size_t sz = res.allColumns.size();
- if (sz < dedup.size()) {
- return Status::InternalError(
- fmt::format("Unexpected result columns {}, expected at least
{}",
- res.allColumns.size(), column_type_names.size()));
- }
- for (const auto& column : res.allColumns) {
- schema_view->column_name_to_column[column.column_name] = column;
+ DataTypePtr common_type;
+ get_least_supertype<LeastSupertypeOnError::Jsonb>(subtypes,
&common_type);
+ if (!common_type->is_nullable()) {
+ common_type = make_nullable(common_type);
+ }
+ tuple_types.emplace_back(common_type);
}
- schema_view->schema_version = res.schema_version;
- return Status::OK();
-}
+ CHECK_EQ(tuple_paths.size(), tuple_types.size());
-Status unfold_object(size_t dynamic_col_position, Block& block, bool
cast_to_original_type,
- RuntimeState* state) {
- auto dynamic_col =
block.get_by_position(dynamic_col_position).column->assume_mutable();
- auto* column_object_ptr = assert_cast<ColumnObject*>(dynamic_col.get());
- if (column_object_ptr->empty()) {
- return Status::OK();
+ std::string variant_col_name =
common_schema->column_by_uid(variant_col_unique_id).name();
+ // Append all common type columns of this variant
+ for (int i = 0; i < tuple_paths.size(); ++i) {
+ TabletColumn common_column;
+ // const std::string& column_name = variant_col_name + "." +
tuple_paths[i].get_path();
Review Comment:
`tuple_paths[i].get_path()` already has the variant_col_name prefix, so use
`tuple_paths[i].get_path()` as its name; prepending `variant_col_name + "."` again would duplicate the prefix.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]