This is an automated email from the ASF dual-hosted git repository. caiconghui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 83521a8 [Feature](create_table) Support create table with random distribution to avoid data skew (#8041) 83521a8 is described below commit 83521a826a07693246bdb6e7834d7de2500d00d5 Author: caiconghui <55968745+caicong...@users.noreply.github.com> AuthorDate: Sat Feb 26 10:38:55 2022 +0800 [Feature](create_table) Support create table with random distribution to avoid data skew (#8041) In some scenarios, users cannot find a suitable hash key to avoid data skew, so we need to provide an additional data distribution for olap table to avoid data skew example: CREATE TABLE random_table ( siteid INT DEFAULT '10', citycode SMALLINT, username VARCHAR(32) DEFAULT '', pv BIGINT SUM DEFAULT '0' ) AGGREGATE KEY(siteid, citycode, username) DISTRIBUTED BY random BUCKETS 10 PROPERTIES("replication_num" = "1"); Co-authored-by: caiconghui1 <caicongh...@jd.com> --- be/src/exec/tablet_info.cpp | 113 ++++++++++--------- be/src/exec/tablet_info.h | 16 +-- be/src/exec/tablet_sink.cpp | 29 ++++- be/src/exec/tablet_sink.h | 13 +++ be/src/http/action/stream_load.cpp | 8 ++ be/src/http/http_common.h | 5 +- be/src/vec/sink/vtablet_sink.cpp | 18 ++- .../sql-statements/Data Definition/CREATE TABLE.md | 7 +- .../Data Manipulation/BROKER LOAD.md | 2 + .../Data Manipulation/ROUTINE LOAD.md | 3 + .../Data Manipulation/STREAM LOAD.md | 2 + .../sql-statements/Data Definition/CREATE TABLE.md | 8 +- .../Data Manipulation/BROKER LOAD.md | 1 + .../Data Manipulation/ROUTINE LOAD.md | 4 + .../Data Manipulation/STREAM LOAD.md | 2 + .../doris/analysis/CreateRoutineLoadStmt.java | 10 ++ .../doris/analysis/HashDistributionDesc.java | 4 +- .../java/org/apache/doris/analysis/InsertStmt.java | 3 +- .../java/org/apache/doris/analysis/LoadStmt.java | 7 ++ .../analysis/ModifyTablePropertiesClause.java | 5 +- .../doris/analysis/RandomDistributionDesc.java | 5 +- .../java/org/apache/doris/catalog/Catalog.java | 124 ++++++++++----------- 
.../org/apache/doris/catalog/DistributionInfo.java | 1 - .../apache/doris/load/loadv2/BrokerLoadJob.java | 2 +- .../java/org/apache/doris/load/loadv2/LoadJob.java | 5 + .../apache/doris/load/loadv2/LoadLoadingTask.java | 4 +- .../doris/load/loadv2/LoadingTaskPlanner.java | 2 +- .../doris/load/routineload/RoutineLoadJob.java | 11 ++ .../apache/doris/load/update/UpdatePlanner.java | 2 +- .../org/apache/doris/planner/OlapScanNode.java | 10 +- .../org/apache/doris/planner/OlapTableSink.java | 20 ++-- .../apache/doris/planner/StreamLoadPlanner.java | 3 +- .../java/org/apache/doris/task/LoadTaskInfo.java | 1 + .../java/org/apache/doris/task/StreamLoadTask.java | 9 ++ .../apache/doris/analysis/CreateTableStmtTest.java | 15 ++- .../doris/load/loadv2/BrokerLoadJobTest.java | 2 +- .../apache/doris/planner/OlapTableSinkTest.java | 8 +- gensrc/thrift/DataSinks.thrift | 1 + gensrc/thrift/FrontendService.thrift | 1 + 39 files changed, 314 insertions(+), 172 deletions(-) diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp index cbeda55..2a4c43a 100644 --- a/be/src/exec/tablet_info.cpp +++ b/be/src/exec/tablet_info.cpp @@ -21,7 +21,10 @@ #include "runtime/mem_tracker.h" #include "runtime/row_batch.h" #include "runtime/tuple_row.h" +#include "util/random.h" #include "util/string_parser.hpp" +#include "util/time.h" + namespace doris { @@ -204,6 +207,31 @@ Status OlapTablePartitionParam::init() { _distributed_slot_descs.emplace_back(it->second); } } + if (_distributed_slot_descs.empty()) { + Random random(UnixMillis()); + _compute_tablet_index = [&random](Tuple* key, int64_t num_buckets) -> uint32_t { + return random.Uniform(num_buckets); + }; + } else { + _compute_tablet_index = [this](Tuple* key, int64_t num_buckets) -> uint32_t { + uint32_t hash_val = 0; + for (auto slot_desc : _distributed_slot_descs) { + void* slot = nullptr; + if (!key->is_null(slot_desc->null_indicator_offset())) { + slot = key->get_slot(slot_desc->tuple_offset()); + } + if (slot != nullptr) 
{ + hash_val = RawValue::zlib_crc32(slot, slot_desc->type(), hash_val); + } else { + //nullptr is treat as 0 when hash + static const int INT_VALUE = 0; + static const TypeDescriptor INT_TYPE(TYPE_INT); + hash_val = RawValue::zlib_crc32(&INT_VALUE, INT_TYPE, hash_val); + } + } + return hash_val % num_buckets; + }; + } // initial partitions for (int i = 0; i < _t_param.partitions.size(); ++i) { const TOlapTablePartition& t_part = _t_param.partitions[i]; @@ -267,26 +295,23 @@ Status OlapTablePartitionParam::init() { return Status::OK(); } -bool OlapTablePartitionParam::find_tablet(Tuple* tuple, const OlapTablePartition** partition, - uint32_t* dist_hashes) const { +bool OlapTablePartitionParam::find_partition(Tuple* tuple, const OlapTablePartition** partition) const { const TOlapTablePartition& t_part = _t_param.partitions[0]; - std::map<Tuple*, OlapTablePartition*, OlapTablePartKeyComparator>::iterator it; - if (t_part.__isset.in_keys) { - it = _partitions_map->find(tuple); - } else { - it = _partitions_map->upper_bound(tuple); - } + auto it = t_part.__isset.in_keys ? 
_partitions_map->find(tuple) : _partitions_map->upper_bound(tuple); if (it == _partitions_map->end()) { return false; } if (_part_contains(it->second, tuple)) { *partition = it->second; - *dist_hashes = _compute_dist_hash(tuple); return true; } return false; } +uint32_t OlapTablePartitionParam::find_tablet(Tuple* tuple, const OlapTablePartition& partition) const { + return _compute_tablet_index(tuple, partition.num_buckets); +} + Status OlapTablePartitionParam::_create_partition_keys(const std::vector<TExprNode>& t_exprs, Tuple** part_key) { Tuple* tuple = (Tuple*)_mem_pool->allocate(_schema->tuple_desc()->byte_size()); @@ -388,25 +413,6 @@ std::string OlapTablePartitionParam::debug_string() const { return ss.str(); } -uint32_t OlapTablePartitionParam::_compute_dist_hash(Tuple* key) const { - uint32_t hash_val = 0; - for (auto slot_desc : _distributed_slot_descs) { - void* slot = nullptr; - if (!key->is_null(slot_desc->null_indicator_offset())) { - slot = key->get_slot(slot_desc->tuple_offset()); - } - if (slot != nullptr) { - hash_val = RawValue::zlib_crc32(slot, slot_desc->type(), hash_val); - } else { - //nullptr is treat as 0 when hash - static const int INT_VALUE = 0; - static const TypeDescriptor INT_TYPE(TYPE_INT); - hash_val = RawValue::zlib_crc32(&INT_VALUE, INT_TYPE, hash_val); - } - } - return hash_val; -} - VOlapTablePartitionParam::VOlapTablePartitionParam(std::shared_ptr<OlapTableSchemaParam>& schema, const TOlapTablePartitionParam& t_param) : _schema(schema), @@ -450,6 +456,30 @@ Status VOlapTablePartitionParam::init() { RETURN_IF_ERROR(find_slot_locs(col, _distributed_slot_locs, "distributed")); } } + if (_distributed_slot_locs.empty()) { + Random random(UnixMillis()); + _compute_tablet_index = [&random](BlockRow* key, int64_t num_buckets) -> uint32_t { + return random.Uniform(num_buckets); + }; + } else { + _compute_tablet_index = [this](BlockRow* key, int64_t num_buckets) -> uint32_t { + uint32_t hash_val = 0; + for (int i = 0; i < 
_distributed_slot_locs.size(); ++i) { + auto slot_desc = _slots[_distributed_slot_locs[i]]; + auto column = key->first->get_by_position(_distributed_slot_locs[i]).column; + auto val = column->get_data_at(key->second); + if (val.data != nullptr) { + hash_val = RawValue::zlib_crc32(val.data, val.size, slot_desc->type().type, hash_val); + } else { + // NULL is treat as 0 when hash + static const int INT_VALUE = 0; + static const TypeDescriptor INT_TYPE(TYPE_INT); + hash_val = RawValue::zlib_crc32(&INT_VALUE, INT_TYPE, hash_val); + } + } + return hash_val % num_buckets; + }; + } DCHECK(!_t_param.partitions.empty()) << "must have at least 1 partition"; _is_in_partition = _t_param.partitions[0].__isset.in_keys; @@ -513,20 +543,22 @@ Status VOlapTablePartitionParam::init() { return Status::OK(); } -bool VOlapTablePartitionParam::find_tablet(BlockRow* block_row, const VOlapTablePartition** partition, - uint32_t* dist_hashes) const { +bool VOlapTablePartitionParam::find_partition(BlockRow* block_row, const VOlapTablePartition** partition) const { auto it = _is_in_partition ? 
_partitions_map->find(block_row) : _partitions_map->upper_bound(block_row); if (it == _partitions_map->end()) { return false; } if (_is_in_partition || _part_contains(it->second, block_row)) { *partition = it->second; - *dist_hashes = _compute_dist_hash(block_row); return true; } return false; } +uint32_t VOlapTablePartitionParam::find_tablet(BlockRow* block_row, const VOlapTablePartition& partition) const { + return _compute_tablet_index(block_row, partition.num_buckets); +} + Status VOlapTablePartitionParam::_create_partition_keys(const std::vector<TExprNode>& t_exprs, BlockRow* part_key) { for (int i = 0; i < t_exprs.size(); i++) { @@ -601,23 +633,4 @@ Status VOlapTablePartitionParam::_create_partition_key(const TExprNode& t_expr, return Status::OK(); } -uint32_t VOlapTablePartitionParam::_compute_dist_hash(BlockRow* key) const { - uint32_t hash_val = 0; - for (int i = 0; i < _distributed_slot_locs.size(); ++i) { - auto slot_desc = _slots[_distributed_slot_locs[i]]; - auto column = key->first->get_by_position(_distributed_slot_locs[i]).column; - - auto val = column->get_data_at(key->second); - if (val.data != nullptr) { - hash_val = RawValue::zlib_crc32(val.data, val.size, slot_desc->type().type, hash_val); - } else { - // NULL is treat as 0 when hash - static const int INT_VALUE = 0; - static const TypeDescriptor INT_TYPE(TYPE_INT); - hash_val = RawValue::zlib_crc32(&INT_VALUE, INT_TYPE, hash_val); - } - } - return hash_val; -} - } // namespace doris diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h index f475663..644c799 100644 --- a/be/src/exec/tablet_info.h +++ b/be/src/exec/tablet_info.h @@ -156,8 +156,9 @@ public: int64_t version() const { return _t_param.version; } // return true if we found this tuple in partition - bool find_tablet(Tuple* tuple, const OlapTablePartition** partitions, - uint32_t* dist_hash) const; + bool find_partition(Tuple* tuple, const OlapTablePartition** partition) const; + + uint32_t find_tablet(Tuple* tuple, const 
OlapTablePartition& partition) const; const std::vector<OlapTablePartition*>& get_partitions() const { return _partitions; } std::string debug_string() const; @@ -167,7 +168,7 @@ private: Status _create_partition_key(const TExprNode& t_expr, Tuple* tuple, SlotDescriptor* slot_desc); - uint32_t _compute_dist_hash(Tuple* key) const; + std::function<uint32_t(Tuple*, int64_t)> _compute_tablet_index; // check if this partition contain this key bool _part_contains(OlapTablePartition* part, Tuple* key) const { @@ -264,9 +265,10 @@ public: int64_t table_id() const { return _t_param.table_id; } int64_t version() const { return _t_param.version; } - // return true if we found this tuple in partition - bool find_tablet(BlockRow* block_row, const VOlapTablePartition** partitions, - uint32_t* dist_hash) const; + // return true if we found this block_row in partition + bool find_partition(BlockRow* block_row, const VOlapTablePartition** partition) const; + + uint32_t find_tablet(BlockRow* block_row, const VOlapTablePartition& partition) const; const std::vector<VOlapTablePartition*>& get_partitions() const { return _partitions; } @@ -275,7 +277,7 @@ private: Status _create_partition_key(const TExprNode& t_expr, BlockRow* part_key, uint16_t pos); - uint32_t _compute_dist_hash(BlockRow* key) const; + std::function<uint32_t(BlockRow*, int64_t)> _compute_tablet_index; // check if this partition contain this key bool _part_contains(VOlapTablePartition* part, BlockRow* key) const { diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 998936b..eba9bb1 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -685,7 +685,16 @@ Status OlapTableSink::init(const TDataSink& t_sink) { if (table_sink.__isset.send_batch_parallelism && table_sink.send_batch_parallelism > 1) { _send_batch_parallelism = table_sink.send_batch_parallelism; } - + // if distributed column list is empty, we can ensure that tablet is with random distribution info + // and if 
load_to_single_tablet is set and set to true, we should find only one tablet in one partition + // for the whole olap table sink + if (table_sink.partition.distributed_columns.empty()) { + if (table_sink.__isset.load_to_single_tablet && table_sink.load_to_single_tablet) { + findTabletMode = FindTabletMode::FIND_TABLET_EVERY_SINK; + } else { + findTabletMode = FindTabletMode::FIND_TABLET_EVERY_BATCH; + } + } return Status::OK(); } @@ -877,14 +886,16 @@ Status OlapTableSink::send(RuntimeState* state, RowBatch* input_batch) { SCOPED_RAW_TIMER(&_send_data_ns); bool stop_processing = false; + if (findTabletMode == FindTabletMode::FIND_TABLET_EVERY_BATCH) { + _partition_to_tablet_map.clear(); + } for (int i = 0; i < batch->num_rows(); ++i) { Tuple* tuple = batch->get_row(i)->get_tuple(0); if (filtered_rows > 0 && _filter_bitmap.Get(i)) { continue; } const OlapTablePartition* partition = nullptr; - uint32_t dist_hash = 0; - if (!_partition->find_tablet(tuple, &partition, &dist_hash)) { + if (!_partition->find_partition(tuple, &partition)) { RETURN_IF_ERROR(state->append_error_msg_to_file( []() -> std::string { return ""; }, [&]() -> std::string { @@ -900,8 +911,18 @@ Status OlapTableSink::send(RuntimeState* state, RowBatch* input_batch) { } continue; } + uint32_t tablet_index = 0; + if (findTabletMode != FindTabletMode::FIND_TABLET_EVERY_ROW) { + if (_partition_to_tablet_map.find(partition->id) == _partition_to_tablet_map.end()) { + tablet_index = _partition->find_tablet(tuple,*partition); + _partition_to_tablet_map.emplace(partition->id, tablet_index); + } else { + tablet_index = _partition_to_tablet_map[partition->id]; + } + } else { + tablet_index = _partition->find_tablet(tuple,*partition); + } _partition_ids.emplace(partition->id); - uint32_t tablet_index = dist_hash % partition->num_buckets; for (int j = 0; j < partition->indexes.size(); ++j) { int64_t tablet_id = partition->indexes[j].tablets[tablet_index]; _channels[j]->add_row(tuple, tablet_id); diff --git 
a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index 8bd1e96..d21d4f2 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -417,6 +417,8 @@ protected: RuntimeProfile* _profile = nullptr; std::set<int64_t> _partition_ids; + // only used for partition with random distribution + std::map<int64_t, int64_t> _partition_to_tablet_map; Bitmap _filter_bitmap; @@ -470,6 +472,17 @@ protected: // TODO(cmy): this should be removed after we switch to rpc attachment by default. bool _transfer_data_by_brpc_attachment = false; + + // FIND_TABLET_EVERY_ROW is used for both hash and random distribution info, which indicates that we + // should compute tablet index for every row + // FIND_TABLET_EVERY_BATCH is only used for random distribution info, which indicates that we should + // compute tablet index for every row batch + // FIND_TABLET_EVERY_SINK is only used for random distribution info, which indicates that we should + // only compute tablet index in the corresponding partition once for the whole time in olap table sink + enum FindTabletMode { + FIND_TABLET_EVERY_ROW, FIND_TABLET_EVERY_BATCH, FIND_TABLET_EVERY_SINK + }; + FindTabletMode findTabletMode = FindTabletMode::FIND_TABLET_EVERY_ROW; }; } // namespace stream_load diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index a9da838..65b0edc 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -506,6 +506,14 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext* } } + if (!http_req->header(HTTP_LOAD_TO_SINGLE_TABLET).empty()) { + if (boost::iequals(http_req->header(HTTP_LOAD_TO_SINGLE_TABLET), "true")) { + request.__set_load_to_single_tablet(true); + } else { + request.__set_load_to_single_tablet(false); + } + } + if (ctx->timeout_second != -1) { request.__set_timeout(ctx->timeout_second); } diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h index a35b241..4a0a042 100644 --- 
a/be/src/http/http_common.h +++ b/be/src/http/http_common.h @@ -43,16 +43,13 @@ static const std::string HTTP_STRIP_OUTER_ARRAY = "strip_outer_array"; static const std::string HTTP_NUM_AS_STRING = "num_as_string"; static const std::string HTTP_FUZZY_PARSE = "fuzzy_parse"; static const std::string HTTP_READ_JSON_BY_LINE = "read_json_by_line"; - static const std::string HTTP_MERGE_TYPE = "merge_type"; static const std::string HTTP_DELETE_CONDITION = "delete"; static const std::string HTTP_FUNCTION_COLUMN = "function_column"; static const std::string HTTP_SEQUENCE_COL = "sequence_col"; static const std::string HTTP_COMPRESS_TYPE = "compress_type"; - static const std::string HTTP_SEND_BATCH_PARALLELISM = "send_batch_parallelism"; - -static const std::string HTTP_100_CONTINUE = "100-continue"; +static const std::string HTTP_LOAD_TO_SINGLE_TABLET = "load_to_single_tablet"; static const std::string HTTP_TWO_PHASE_COMMIT = "two_phase_commit"; static const std::string HTTP_TXN_ID_KEY = "txn_id"; diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index f8df6a6..a6af2ee 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -96,14 +96,17 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block) SCOPED_RAW_TIMER(&_send_data_ns); // This is just for passing compilation. 
bool stop_processing = false; + if (findTabletMode == FindTabletMode::FIND_TABLET_EVERY_BATCH) { + _partition_to_tablet_map.clear(); + } for (int i = 0; i < num_rows; ++i) { if (filtered_rows > 0 && _filter_bitmap.Get(i)) { continue; } const VOlapTablePartition* partition = nullptr; - uint32_t dist_hash = 0; + uint32_t tablet_index = 0; block_row = {&block, i}; - if (!_vpartition->find_tablet(&block_row, &partition, &dist_hash)) { + if (!_vpartition->find_partition(&block_row, &partition)) { RETURN_IF_ERROR(state->append_error_msg_to_file([]() -> std::string { return ""; }, [&]() -> std::string { fmt::memory_buffer buf; @@ -117,7 +120,16 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block) continue; } _partition_ids.emplace(partition->id); - uint32_t tablet_index = dist_hash % partition->num_buckets; + if (findTabletMode != FindTabletMode::FIND_TABLET_EVERY_ROW) { + if (_partition_to_tablet_map.find(partition->id) == _partition_to_tablet_map.end()) { + tablet_index = _vpartition->find_tablet(&block_row, *partition); + _partition_to_tablet_map.emplace(partition->id, tablet_index); + } else { + tablet_index = _partition_to_tablet_map[partition->id]; + } + } else { + tablet_index = _vpartition->find_tablet(&block_row, *partition); + } for (int j = 0; j < partition->indexes.size(); ++j) { int64_t tablet_id = partition->indexes[j].tablets[tablet_index]; _channels[j]->add_row(block_row, tablet_id); diff --git a/docs/en/sql-reference/sql-statements/Data Definition/CREATE TABLE.md b/docs/en/sql-reference/sql-statements/Data Definition/CREATE TABLE.md index 5226d23..06ae914 100644 --- a/docs/en/sql-reference/sql-statements/Data Definition/CREATE TABLE.md +++ b/docs/en/sql-reference/sql-statements/Data Definition/CREATE TABLE.md @@ -270,7 +270,12 @@ Syntax: Syntax: `DISTRIBUTED BY HASH (k1[,k2 ...]) [BUCKETS num]` Explain: - The default buckets is 10. + Hash bucketing using the specified key column. 
+ 2) Random + Syntax: + `DISTRIBUTED BY RANDOM [BUCKETS num]` + Explain: + Use random numbers for bucketing. 7. PROPERTIES 1) If ENGINE type is olap. User can specify storage medium, cooldown time and replication number: diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md index 2150a32..869ae39 100644 --- a/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md +++ b/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md @@ -256,6 +256,8 @@ under the License. timezone: Specify time zones for functions affected by time zones, such as strftime/alignment_timestamp/from_unixtime, etc. See the documentation for details. If not specified, use the "Asia/Shanghai" time zone. send_batch_parallelism: Used to set the default parallelism for sending batch, if the value for parallelism exceed `max_send_batch_parallelism_per_job` in BE config, then the coordinator BE will use the value of `max_send_batch_parallelism_per_job`. + + load_to_single_tablet: Boolean type, True means that one task can only load data to one tablet in the corresponding partition at a time. The default value is false. The number of tasks for the job depends on the overall concurrency. This parameter can only be set when loading data into the OLAP table with random distribution. 5. Load data format sample diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md index edaa2f5..c695f2a 100644 --- a/docs/en/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md +++ b/docs/en/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md @@ -198,6 +198,9 @@ FROM data_source 10. 
`send_batch_parallelism` Integer, Used to set the default parallelism for sending batch, if the value for parallelism exceed `max_send_batch_parallelism_per_job` in BE config, then the coordinator BE will use the value of `max_send_batch_parallelism_per_job`. + + 11. `load_to_single_tablet` + Boolean type, True means that one task can only load data to one tablet in the corresponding partition at a time. The default value is false. This parameter can only be set when loading data into the OLAP table with random distribution. 6. data_source diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md index 9a8e063..fac74a7 100644 --- a/docs/en/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md +++ b/docs/en/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md @@ -144,6 +144,8 @@ The type of data merging supports three types: APPEND, DELETE, and MERGE. APPEND `send_batch_parallelism`: Integer type, used to set the default parallelism for sending batch, if the value for parallelism exceed `max_send_batch_parallelism_per_job` in BE config, then the coordinator BE will use the value of `max_send_batch_parallelism_per_job`. +`load_to_single_tablet`: Boolean type, True means that one task can only load data to one tablet in the corresponding partition at a time. The default value is false. This parameter can only be set when loading data into the OLAP table with random distribution. + RETURN VALUES After the load is completed, the related content of this load will be returned in Json format. 
Current field included diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE TABLE.md b/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE TABLE.md index a86f5d9..989a355 100644 --- a/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE TABLE.md +++ b/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE TABLE.md @@ -286,8 +286,12 @@ under the License. 语法: `DISTRIBUTED BY HASH (k1[,k2 ...]) [BUCKETS num]` 说明: - 使用指定的 key 列进行哈希分桶。默认分区数为10 - + 使用指定的 key 列进行哈希分桶。 + 2) Random 分桶 + 语法: + `DISTRIBUTED BY RANDOM [BUCKETS num]` + 说明: + 使用随机数进行分桶。 建议:建议使用Hash分桶方式 7. PROPERTIES diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md index cddc9c7..30726c6 100644 --- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md +++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md @@ -249,6 +249,7 @@ under the License. strict mode: 是否对数据进行严格限制。默认为 false。 timezone: 指定某些受时区影响的函数的时区,如 strftime/alignment_timestamp/from_unixtime 等等,具体请查阅 [时区] 文档。如果不指定,则使用 "Asia/Shanghai" 时区。 send_batch_parallelism: 用于设置发送批处理数据的并行度,如果并行度的值超过 BE 配置中的 `max_send_batch_parallelism_per_job`,那么作为协调点的 BE 将使用 `max_send_batch_parallelism_per_job` 的值。 + load_to_single_tablet: 布尔类型,为true表示支持一个任务只导入数据到对应分区的一个tablet,默认值为false,作业的任务数取决于整体并发度。该参数只允许在对带有random分区的olap表导数的时候设置。 5. 导入数据格式样例 diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md index 798ff69..3e61aab 100644 --- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md +++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md @@ -184,6 +184,10 @@ under the License. 10. 
send_batch_parallelism 整型,用于设置发送批处理数据的并行度,如果并行度的值超过 BE 配置中的 `max_send_batch_parallelism_per_job`,那么作为协调点的 BE 将使用 `max_send_batch_parallelism_per_job` 的值。 + + 11. load_to_single_tablet + + 布尔类型,为true表示支持一个任务只导入数据到对应分区的一个tablet,默认值为false,该参数只允许在对带有random分区的olap表导数的时候设置。 6. data_source diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md index ef0c4c4..9edcd46 100644 --- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md +++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md @@ -108,6 +108,8 @@ under the License. read_json_by_line: 布尔类型,为true表示支持每行读取一个json对象,默认值为false。 send_batch_parallelism: 整型,用于设置发送批处理数据的并行度,如果并行度的值超过 BE 配置中的 `max_send_batch_parallelism_per_job`,那么作为协调点的 BE 将使用 `max_send_batch_parallelism_per_job` 的值。 + + load_to_single_tablet: 布尔类型,为true表示支持一个任务只导入数据到对应分区的一个tablet,默认值为false,该参数只允许在对带有random分区的olap表导数的时候设置。 RETURN VALUES 导入完成后,会以Json格式返回这次导入的相关内容。当前包括以下字段 diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java index 338a786..5bf72ab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java @@ -117,6 +117,7 @@ public class CreateRoutineLoadStmt extends DdlStmt { private static final String NAME_TYPE = "ROUTINE LOAD NAME"; public static final String ENDPOINT_REGEX = "[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]"; public static final String SEND_BATCH_PARALLELISM = "send_batch_parallelism"; + public static final String LOAD_TO_SINGLE_TABLET = "load_to_single_tablet"; private static final ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>() .add(DESIRED_CONCURRENT_NUMBER_PROPERTY) @@ -134,6 +135,7 @@ public class CreateRoutineLoadStmt extends 
DdlStmt { .add(LoadStmt.TIMEZONE) .add(EXEC_MEM_LIMIT_PROPERTY) .add(SEND_BATCH_PARALLELISM) + .add(LOAD_TO_SINGLE_TABLET) .build(); private final LabelName labelName; @@ -157,6 +159,7 @@ public class CreateRoutineLoadStmt extends DdlStmt { private long execMemLimit = 2 * 1024 * 1024 * 1024L; private String timezone = TimeUtils.DEFAULT_TIME_ZONE; private int sendBatchParallelism = 1; + private boolean loadToSingleTablet = false; /** * RoutineLoad support json data. * Require Params: @@ -240,6 +243,10 @@ public class CreateRoutineLoadStmt extends DdlStmt { return sendBatchParallelism; } + public boolean isLoadToSingleTablet() { + return loadToSingleTablet; + } + public boolean isStrictMode() { return strictMode; } @@ -440,6 +447,9 @@ public class CreateRoutineLoadStmt extends DdlStmt { sendBatchParallelism = ((Long) Util.getLongPropertyOrDefault(jobProperties.get(SEND_BATCH_PARALLELISM), ConnectContext.get().getSessionVariable().getSendBatchParallelism(), SEND_BATCH_PARALLELISM_PRED, SEND_BATCH_PARALLELISM + " should > 0")).intValue(); + loadToSingleTablet = Util.getBooleanPropertyOrDefault(jobProperties.get(LoadStmt.LOAD_TO_SINGLE_TABLET), + RoutineLoadJob.DEFAULT_LOAD_TO_SINGLE_TABLET, + LoadStmt.LOAD_TO_SINGLE_TABLET + " should be a boolean"); if (ConnectContext.get() != null) { timezone = ConnectContext.get().getSessionVariable().getTimeZone(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/HashDistributionDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/HashDistributionDesc.java index 31e1d3b..2379d3d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/HashDistributionDesc.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/HashDistributionDesc.java @@ -60,10 +60,10 @@ public class HashDistributionDesc extends DistributionDesc { @Override public void analyze(Set<String> cols) throws AnalysisException { if (numBucket <= 0) { - throw new AnalysisException("Number of hash distribution is zero."); + throw new 
AnalysisException("Number of hash distribution should be larger than zero."); } if (distributionColumnNames == null || distributionColumnNames.size() == 0) { - throw new AnalysisException("Number of hash column is zero."); + throw new AnalysisException("Number of hash column should be larger than zero."); } for (String columnName : distributionColumnNames) { if (!cols.contains(columnName)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java index f750749..290597e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java @@ -301,7 +301,6 @@ public class InsertStmt extends DdlStmt { label = "insert_" + DebugUtil.printId(analyzer.getContext().queryId()); } if (!isExplain() && !isTransactionBegin) { - if (targetTable instanceof OlapTable) { LoadJobSourceType sourceType = LoadJobSourceType.INSERT_STREAMING; MetricRepo.COUNTER_LOAD_ADD.increase(1L); @@ -318,7 +317,7 @@ public class InsertStmt extends DdlStmt { OlapTableSink sink = (OlapTableSink) dataSink; TUniqueId loadId = analyzer.getContext().queryId(); int sendBatchParallelism = analyzer.getContext().getSessionVariable().getSendBatchParallelism(); - sink.init(loadId, transactionId, db.getId(), timeoutSecond, sendBatchParallelism); + sink.init(loadId, transactionId, db.getId(), timeoutSecond, sendBatchParallelism, false); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java index f68d58c..4bf0ae4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java @@ -81,6 +81,7 @@ public class LoadStmt extends DdlStmt { public static final String TIMEZONE = "timezone"; public static final String LOAD_PARALLELISM = "load_parallelism"; public static final 
String SEND_BATCH_PARALLELISM = "send_batch_parallelism"; + public static final String LOAD_TO_SINGLE_TABLET = "load_to_single_tablet"; // for load data from Baidu Object Store(BOS) public static final String BOS_ENDPOINT = "bos_endpoint"; @@ -173,6 +174,12 @@ public class LoadStmt extends DdlStmt { return s; } }) + .put(LOAD_TO_SINGLE_TABLET, new Function<String, Boolean>() { + @Override + public @Nullable Boolean apply(@Nullable String s) { + return Boolean.valueOf(s); + } + }) .build(); public LoadStmt(LabelName label, List<DataDescription> dataDescriptions, diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java index 782ad3d..9d77a9c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java @@ -55,10 +55,7 @@ public class ModifyTablePropertiesClause extends AlterTableClause { throw new AnalysisException("Can only change storage type to COLUMN"); } } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_DISTRIBUTION_TYPE)) { - if (!properties.get(PropertyAnalyzer.PROPERTIES_DISTRIBUTION_TYPE).equalsIgnoreCase("hash")) { - throw new AnalysisException("Can only change distribution type to HASH"); - } - this.needTableStable = false; + throw new AnalysisException("Cannot change distribution type for olap table now"); } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_SEND_CLEAR_ALTER_TASK)) { if (!properties.get(PropertyAnalyzer.PROPERTIES_SEND_CLEAR_ALTER_TASK).equalsIgnoreCase("true")) { throw new AnalysisException( diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/RandomDistributionDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/RandomDistributionDesc.java index 9dd7475..947cebb 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/RandomDistributionDesc.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RandomDistributionDesc.java @@ -29,7 +29,6 @@ import java.io.IOException; import java.util.List; import java.util.Set; -@Deprecated public class RandomDistributionDesc extends DistributionDesc { int numBucket; @@ -44,7 +43,9 @@ public class RandomDistributionDesc extends DistributionDesc { @Override public void analyze(Set<String> colSet) throws AnalysisException { - throw new AnalysisException("Random distribution is deprecated now, use Hash distribution instead"); + if (numBucket <= 0) { + throw new AnalysisException("Number of random distribution should be larger than zero."); + } } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index 26b5210..652945b 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -3794,6 +3794,9 @@ public class Catalog { try { String colocateGroup = PropertyAnalyzer.analyzeColocate(properties); if (colocateGroup != null) { + if (defaultDistributionInfo.getType() == DistributionInfoType.RANDOM) { + throw new AnalysisException("Random distribution for colocate table is unsupported"); + } String fullGroupName = db.getId() + "_" + colocateGroup; ColocateGroupSchema groupSchema = colocateTableIndex.getGroupSchema(fullGroupName); if (groupSchema != null) { @@ -4516,79 +4519,74 @@ public class Catalog { private void createTablets(String clusterName, MaterializedIndex index, ReplicaState replicaState, DistributionInfo distributionInfo, long version, ReplicaAllocation replicaAlloc, TabletMeta tabletMeta, Set<Long> tabletIdSet) throws DdlException { - DistributionInfoType distributionInfoType = distributionInfo.getType(); - if (distributionInfoType == DistributionInfoType.HASH) { - ColocateTableIndex colocateIndex = 
Catalog.getCurrentColocateIndex(); - Map<Tag, List<List<Long>>> backendsPerBucketSeq = null; - GroupId groupId = null; - if (colocateIndex.isColocateTable(tabletMeta.getTableId())) { - // if this is a colocate table, try to get backend seqs from colocation index. - groupId = colocateIndex.getGroup(tabletMeta.getTableId()); - backendsPerBucketSeq = colocateIndex.getBackendsPerBucketSeq(groupId); + ColocateTableIndex colocateIndex = Catalog.getCurrentColocateIndex(); + Map<Tag, List<List<Long>>> backendsPerBucketSeq = null; + GroupId groupId = null; + if (colocateIndex.isColocateTable(tabletMeta.getTableId())) { + if (distributionInfo.getType() == DistributionInfoType.RANDOM) { + throw new DdlException("Random distribution for colocate table is unsupported"); } + // if this is a colocate table, try to get backend seqs from colocation index. + groupId = colocateIndex.getGroup(tabletMeta.getTableId()); + backendsPerBucketSeq = colocateIndex.getBackendsPerBucketSeq(groupId); + } - // chooseBackendsArbitrary is true, means this may be the first table of colocation group, - // or this is just a normal table, and we can choose backends arbitrary. 
- // otherwise, backends should be chosen from backendsPerBucketSeq; - boolean chooseBackendsArbitrary = backendsPerBucketSeq == null || backendsPerBucketSeq.isEmpty(); - if (chooseBackendsArbitrary) { - backendsPerBucketSeq = Maps.newHashMap(); - } - for (int i = 0; i < distributionInfo.getBucketNum(); ++i) { - // create a new tablet with random chosen backends - Tablet tablet = new Tablet(getNextId()); - - // add tablet to inverted index first - index.addTablet(tablet, tabletMeta); - tabletIdSet.add(tablet.getId()); - - // get BackendIds - Map<Tag, List<Long>> chosenBackendIds; - if (chooseBackendsArbitrary) { - // This is the first colocate table in the group, or just a normal table, - // randomly choose backends - if (!Config.disable_storage_medium_check) { - chosenBackendIds = getCurrentSystemInfo().chooseBackendIdByFilters(replicaAlloc, clusterName, - tabletMeta.getStorageMedium()); - } else { - chosenBackendIds = getCurrentSystemInfo().chooseBackendIdByFilters(replicaAlloc, clusterName, null); - } + // chooseBackendsArbitrary is true, means this may be the first table of colocation group, + // or this is just a normal table, and we can choose backends arbitrary. 
+ // otherwise, backends should be chosen from backendsPerBucketSeq; + boolean chooseBackendsArbitrary = backendsPerBucketSeq == null || backendsPerBucketSeq.isEmpty(); + if (chooseBackendsArbitrary) { + backendsPerBucketSeq = Maps.newHashMap(); + } + for (int i = 0; i < distributionInfo.getBucketNum(); ++i) { + // create a new tablet with random chosen backends + Tablet tablet = new Tablet(getNextId()); - for (Map.Entry<Tag, List<Long>> entry : chosenBackendIds.entrySet()) { - backendsPerBucketSeq.putIfAbsent(entry.getKey(), Lists.newArrayList()); - backendsPerBucketSeq.get(entry.getKey()).add(entry.getValue()); - } + // add tablet to inverted index first + index.addTablet(tablet, tabletMeta); + tabletIdSet.add(tablet.getId()); + + // get BackendIds + Map<Tag, List<Long>> chosenBackendIds; + if (chooseBackendsArbitrary) { + // This is the first colocate table in the group, or just a normal table, + // randomly choose backends + if (!Config.disable_storage_medium_check) { + chosenBackendIds = getCurrentSystemInfo().chooseBackendIdByFilters(replicaAlloc, clusterName, + tabletMeta.getStorageMedium()); } else { - // get backends from existing backend sequence - chosenBackendIds = Maps.newHashMap(); - for (Map.Entry<Tag, List<List<Long>>> entry : backendsPerBucketSeq.entrySet()) { - chosenBackendIds.put(entry.getKey(), entry.getValue().get(i)); - } + chosenBackendIds = getCurrentSystemInfo().chooseBackendIdByFilters(replicaAlloc, clusterName, null); } - // create replicas - short totalReplicaNum = (short) 0; - for (List<Long> backendIds : chosenBackendIds.values()) { - for (long backendId : backendIds) { - long replicaId = getNextId(); - Replica replica = new Replica(replicaId, backendId, replicaState, version, - tabletMeta.getOldSchemaHash()); - tablet.addReplica(replica); - totalReplicaNum++; - } + for (Map.Entry<Tag, List<Long>> entry : chosenBackendIds.entrySet()) { + backendsPerBucketSeq.putIfAbsent(entry.getKey(), Lists.newArrayList()); + 
backendsPerBucketSeq.get(entry.getKey()).add(entry.getValue()); + } + } else { + // get backends from existing backend sequence + chosenBackendIds = Maps.newHashMap(); + for (Map.Entry<Tag, List<List<Long>>> entry : backendsPerBucketSeq.entrySet()) { + chosenBackendIds.put(entry.getKey(), entry.getValue().get(i)); } - Preconditions.checkState(totalReplicaNum == replicaAlloc.getTotalReplicaNum(), - totalReplicaNum + " vs. " + replicaAlloc.getTotalReplicaNum()); } - - if (groupId != null && chooseBackendsArbitrary) { - colocateIndex.addBackendsPerBucketSeq(groupId, backendsPerBucketSeq); - ColocatePersistInfo info = ColocatePersistInfo.createForBackendsPerBucketSeq(groupId, backendsPerBucketSeq); - editLog.logColocateBackendsPerBucketSeq(info); + // create replicas + short totalReplicaNum = (short) 0; + for (List<Long> backendIds : chosenBackendIds.values()) { + for (long backendId : backendIds) { + long replicaId = getNextId(); + Replica replica = new Replica(replicaId, backendId, replicaState, version, tabletMeta.getOldSchemaHash()); + tablet.addReplica(replica); + totalReplicaNum++; + } } + Preconditions.checkState(totalReplicaNum == replicaAlloc.getTotalReplicaNum(), + totalReplicaNum + " vs. 
" + replicaAlloc.getTotalReplicaNum()); + } - } else { - throw new DdlException("Unknown distribution type: " + distributionInfoType); + if (groupId != null && chooseBackendsArbitrary) { + colocateIndex.addBackendsPerBucketSeq(groupId, backendsPerBucketSeq); + ColocatePersistInfo info = ColocatePersistInfo.createForBackendsPerBucketSeq(groupId, backendsPerBucketSeq); + editLog.logColocateBackendsPerBucketSeq(info); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/DistributionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/DistributionInfo.java index 33d883d..ef1712b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/DistributionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/DistributionInfo.java @@ -37,7 +37,6 @@ public abstract class DistributionInfo implements Writable { public enum DistributionInfoType { HASH, - @Deprecated RANDOM } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index cefd28e..d555e7d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -200,7 +200,7 @@ public class BrokerLoadJob extends BulkLoadJob { brokerFileGroups, getDeadlineMs(), getExecMemLimit(), isStrictMode(), transactionId, this, getTimeZone(), getTimeout(), getLoadParallelism(), getSendBatchParallelism(), - getMaxFilterRatio() <= 0, enableProfile ? jobProfile : null); + getMaxFilterRatio() <= 0, enableProfile ? 
jobProfile : null, isSingleTabletLoadPerSink()); UUID uuid = UUID.randomUUID(); TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index 0fe8ef0..047613c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -409,6 +409,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements jobProperties.put(LoadStmt.TIMEZONE, TimeUtils.DEFAULT_TIME_ZONE); jobProperties.put(LoadStmt.LOAD_PARALLELISM, Config.default_load_parallelism); jobProperties.put(LoadStmt.SEND_BATCH_PARALLELISM, 1); + jobProperties.put(LoadStmt.LOAD_TO_SINGLE_TABLET, false); } public void isJobTypeRead(boolean jobTypeRead) { @@ -1225,6 +1226,10 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements return (int) jobProperties.get(LoadStmt.SEND_BATCH_PARALLELISM); } + public boolean isSingleTabletLoadPerSink() { + return (boolean) jobProperties.get(LoadStmt.LOAD_TO_SINGLE_TABLET); + } + // Return true if this job is finished for a long time public boolean isExpired(long currentTimeMs) { if (!isCompleted()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java index 6ff10fe..5733133 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java @@ -67,6 +67,7 @@ public class LoadLoadingTask extends LoadTask { private final int loadParallelism; private final int sendBatchParallelism; private final boolean loadZeroTolerance; + private final boolean singleTabletLoadPerSink; private LoadingTaskPlanner planner; @@ -78,7 +79,7 @@ public class 
LoadLoadingTask extends LoadTask { long jobDeadlineMs, long execMemLimit, boolean strictMode, long txnId, LoadTaskCallback callback, String timezone, long timeoutS, int loadParallelism, int sendBatchParallelism, - boolean loadZeroTolerance, RuntimeProfile profile) { + boolean loadZeroTolerance, RuntimeProfile profile, boolean singleTabletLoadPerSink) { super(callback, TaskType.LOADING); this.db = db; this.table = table; @@ -96,6 +97,7 @@ public class LoadLoadingTask extends LoadTask { this.sendBatchParallelism = sendBatchParallelism; this.loadZeroTolerance = loadZeroTolerance; this.jobProfile = profile; + this.singleTabletLoadPerSink = singleTabletLoadPerSink; } public void init(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusList, int fileNum, UserIdentity userInfo) throws UserException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java index e8928dd..903e65f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java @@ -132,7 +132,7 @@ public class LoadingTaskPlanner { // 2. Olap table sink List<Long> partitionIds = getAllPartitionIds(); OlapTableSink olapTableSink = new OlapTableSink(table, destTupleDesc, partitionIds); - olapTableSink.init(loadId, txnId, dbId, timeoutS, sendBatchParallelism); + olapTableSink.init(loadId, txnId, dbId, timeoutS, sendBatchParallelism, false); olapTableSink.complete(); // 3. 
Plan fragment diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 2549ce3..0026b80 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -107,6 +107,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl public static final long DEFAULT_EXEC_MEM_LIMIT = 2 * 1024 * 1024 * 1024L; public static final boolean DEFAULT_STRICT_MODE = false; // default is false public static final int DEFAULT_SEND_BATCH_PARALLELISM = 1; + public static final boolean DEFAULT_LOAD_TO_SINGLE_TABLET = false; protected static final String STAR_STRING = "*"; /* @@ -167,6 +168,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl protected long maxErrorNum = DEFAULT_MAX_ERROR_NUM; // optional protected long execMemLimit = DEFAULT_EXEC_MEM_LIMIT; protected int sendBatchParallelism = DEFAULT_SEND_BATCH_PARALLELISM; + protected boolean loadToSingleTablet = DEFAULT_LOAD_TO_SINGLE_TABLET; // include strict mode protected Map<String, String> jobProperties = Maps.newHashMap(); @@ -286,10 +288,14 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl if (stmt.getSendBatchParallelism() > 0) { this.sendBatchParallelism = stmt.getSendBatchParallelism(); } + if (stmt.isLoadToSingleTablet()) { + this.loadToSingleTablet = stmt.isLoadToSingleTablet(); + } jobProperties.put(LoadStmt.TIMEZONE, stmt.getTimezone()); jobProperties.put(LoadStmt.STRICT_MODE, String.valueOf(stmt.isStrictMode())); jobProperties.put(LoadStmt.EXEC_MEM_LIMIT, String.valueOf(this.execMemLimit)); jobProperties.put(LoadStmt.SEND_BATCH_PARALLELISM, String.valueOf(this.sendBatchParallelism)); + jobProperties.put(LoadStmt.LOAD_TO_SINGLE_TABLET, String.valueOf(this.loadToSingleTablet)); if 
(Strings.isNullOrEmpty(stmt.getFormat()) || stmt.getFormat().equals("csv")) { jobProperties.put(PROPS_FORMAT, "csv"); @@ -552,6 +558,11 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl } @Override + public boolean isLoadToSingleTablet() { + return loadToSingleTablet; + } + + @Override public boolean isReadJsonByLine() { return false; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java index a686d26..aaadd11 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java @@ -91,7 +91,7 @@ public class UpdatePlanner extends Planner { OlapTableSink olapTableSink = new OlapTableSink(targetTable, computeTargetTupleDesc(), null); olapTableSink.init(analyzer.getContext().queryId(), txnId, targetDBId, analyzer.getContext().getSessionVariable().queryTimeoutS, - analyzer.getContext().getSessionVariable().sendBatchParallelism); + analyzer.getContext().getSessionVariable().sendBatchParallelism, false); olapTableSink.complete(); // 3. gen plan fragment PlanFragment planFragment = new PlanFragment(fragmentIdGenerator_.getNextId(), olapScanNode, diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 3af8fe2..ab0e3b2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -905,9 +905,9 @@ public class OlapScanNode extends ScanNode { /* Although sometimes the scan range only involves one instance, - the data distribution cannot be set to UNPARTITION here. - The reason is that @coordicator will not set the scan range for the fragment, - when data partition of fragment is UNPARTITION. 
+ the data distribution cannot be set to UNPARTITIONED here. + The reason is that @coordinator will not set the scan range for the fragment, + when data partition of fragment is UNPARTITIONED. */ public DataPartition constructInputPartitionByDistributionInfo() throws UserException { ColocateTableIndex colocateTableIndex = Catalog.getCurrentColocateIndex(); @@ -917,9 +917,7 @@ public class OlapScanNode extends ScanNode { || olapTable.getPartitions().size() == 1) { DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo(); if (!(distributionInfo instanceof HashDistributionInfo)) { - // There may be some random distribution table left, throw exception here. - // And these table should be modified to hash distribution by ALTER TABLE operation. - throw new UserException("Table with non hash distribution is not supported: " + olapTable.getName()); + return DataPartition.RANDOM; } List<Column> distributeColumns = ((HashDistributionInfo) distributionInfo).getDistributionColumns(); List<Expr> dataDistributeExprs = Lists.newArrayList(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index edc7627..7a5f0bc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -33,6 +33,7 @@ import org.apache.doris.catalog.PartitionInfo; import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.PartitionKey; import org.apache.doris.catalog.PartitionType; +import org.apache.doris.catalog.RandomDistributionInfo; import org.apache.doris.catalog.RangePartitionItem; import org.apache.doris.catalog.Tablet; import org.apache.doris.common.AnalysisException; @@ -93,13 +94,18 @@ public class OlapTableSink extends DataSink { this.partitionIds = partitionIds; } - public void init(TUniqueId loadId, long txnId, long dbId, long loadChannelTimeoutS, int 
sendBatchParallelism) throws AnalysisException { + public void init(TUniqueId loadId, long txnId, long dbId, long loadChannelTimeoutS, + int sendBatchParallelism, boolean loadToSingleTablet) throws AnalysisException { TOlapTableSink tSink = new TOlapTableSink(); tSink.setLoadId(loadId); tSink.setTxnId(txnId); tSink.setDbId(dbId); tSink.setLoadChannelTimeoutS(loadChannelTimeoutS); tSink.setSendBatchParallelism(sendBatchParallelism); + if (loadToSingleTablet && !(dstTable.getDefaultDistributionInfo() instanceof RandomDistributionInfo)) { + throw new AnalysisException("if load_to_single_tablet set to true, the olap table must be with random distribution"); + } + tSink.setLoadToSingleTablet(loadToSingleTablet); tDataSink = new TDataSink(TDataSinkType.OLAP_TABLE_SINK); tDataSink.setOlapTableSink(tSink); @@ -189,7 +195,7 @@ public class OlapTableSink extends DataSink { return schemaParam; } - private List<String> getDistColumns(DistributionInfo distInfo, OlapTable table) throws UserException { + private List<String> getDistColumns(DistributionInfo distInfo) throws UserException { List<String> distColumns = Lists.newArrayList(); switch (distInfo.getType()) { case HASH: { @@ -200,11 +206,7 @@ public class OlapTableSink extends DataSink { break; } case RANDOM: { - for (Column column : table.getBaseSchema()) { - if (column.isKey()) { - distColumns.add(column.getName()); - } - } + // RandomDistributionInfo doesn't have distributedColumns break; } default: @@ -247,7 +249,7 @@ public class OlapTableSink extends DataSink { DistributionInfo distInfo = partition.getDistributionInfo(); if (selectedDistInfo == null) { - partitionParam.setDistributedColumns(getDistColumns(distInfo, table)); + partitionParam.setDistributedColumns(getDistColumns(distInfo)); selectedDistInfo = distInfo; } else { if (selectedDistInfo.getType() != distInfo.getType()) { @@ -275,7 +277,7 @@ public class OlapTableSink extends DataSink { } partitionParam.addToPartitions(tPartition); 
partitionParam.setDistributedColumns( - getDistColumns(partition.getDistributionInfo(), table)); + getDistColumns(partition.getDistributionInfo())); break; } default: { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index c2a2e71..a454647 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -151,7 +151,8 @@ public class StreamLoadPlanner { // create dest sink List<Long> partitionIds = getAllPartitionIds(); OlapTableSink olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds); - olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), taskInfo.getTimeout(), taskInfo.getSendBatchParallelism()); + olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), taskInfo.getTimeout(), + taskInfo.getSendBatchParallelism(), taskInfo.isLoadToSingleTablet()); olapTableSink.complete(); // for stream load, we only need one fragment, ScanNode -> DataSink. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java index e5a39f3..5dc263d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java @@ -60,6 +60,7 @@ public interface LoadTaskInfo { public Separator getColumnSeparator(); public Separator getLineDelimiter(); public int getSendBatchParallelism(); + public boolean isLoadToSingleTablet(); public static class ImportColumnDescs { public List<ImportColumnDesc> descs = Lists.newArrayList(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java index 08b7f80..248b7f1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java @@ -75,6 +75,7 @@ public class StreamLoadTask implements LoadTaskInfo { private String sequenceCol; private int sendBatchParallelism = 1; private double maxFilterRatio = 0.0; + private boolean loadToSingleTablet = false; public StreamLoadTask(TUniqueId id, long txnId, TFileType fileType, TFileFormatType formatType) { this.id = id; @@ -130,6 +131,11 @@ public class StreamLoadTask implements LoadTaskInfo { return sendBatchParallelism; } + @Override + public boolean isLoadToSingleTablet() { + return loadToSingleTablet; + } + public PartitionNames getPartitions() { return partitions; } @@ -301,6 +307,9 @@ public class StreamLoadTask implements LoadTaskInfo { if (request.isSetMaxFilterRatio()) { maxFilterRatio = request.getMaxFilterRatio(); } + if (request.isSetLoadToSingleTablet()) { + loadToSingleTablet = request.isLoadToSingleTablet(); + } } // used for stream load diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java index 
552a8a6..624b829 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java @@ -106,6 +106,18 @@ public class CreateTableStmtTest { } @Test + public void testCreateTableWithRandomDistribution() throws UserException { + CreateTableStmt stmt = new CreateTableStmt(false, false, tblName, cols, "olap", + new KeysDesc(KeysType.DUP_KEYS, colsName), null, + new RandomDistributionDesc(6), null, null, ""); + stmt.analyze(analyzer); + Assert.assertEquals("testCluster:db1", stmt.getDbName()); + Assert.assertEquals("table1", stmt.getTableName()); + Assert.assertNull(stmt.getProperties()); + Assert.assertTrue(stmt.toSql().contains("DISTRIBUTED BY RANDOM\nBUCKETS 6")); + } + + @Test public void testCreateTableWithRollup() throws UserException { List<AlterClause> ops = Lists.newArrayList(); ops.add(new AddRollupClause("index1", Lists.newArrayList("col1", "col2"), null, "table1", null)); @@ -121,7 +133,7 @@ public class CreateTableStmtTest { } @Test - public void testDefaultDbNormal() throws UserException, AnalysisException { + public void testDefaultDbNormal() throws UserException { CreateTableStmt stmt = new CreateTableStmt(false, false, tblNameNoDb, cols, "olap", new KeysDesc(KeysType.AGG_KEYS, colsName), null, new HashDistributionDesc(10, Lists.newArrayList("col1")), null, null, ""); @@ -174,7 +186,6 @@ public class CreateTableStmtTest { @Test public void testBmpHllKey() throws Exception { - ColumnDef bitmap = new ColumnDef("col3", new TypeDef(ScalarType.createType(PrimitiveType.BITMAP))); cols.add(bitmap); colsName.add("col3"); diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java index ae96752..21aa0d3 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java +++ 
b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java @@ -363,7 +363,7 @@ public class BrokerLoadJobTest { RuntimeProfile jobProfile = new RuntimeProfile("test"); LoadLoadingTask task = new LoadLoadingTask(database, olapTable,brokerDesc, fileGroups, 100, 100, false, 100, callback, "", - 100, 1, 1, true, jobProfile); + 100, 1, 1, true, jobProfile, false); try { UserIdentity userInfo = new UserIdentity("root", "localhost"); userInfo.setIsAnalyzed(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java index ba5ca64..a072cf0 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java @@ -102,7 +102,7 @@ public class OlapTableSinkTest { }}; OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList(2L)); - sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1); + sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false); sink.complete(); LOG.info("sink is {}", sink.toThrift()); LOG.info("{}", sink.getExplainString("", TExplainLevel.NORMAL)); @@ -132,7 +132,7 @@ public class OlapTableSinkTest { }}; OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList(p1.getId())); - sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1); + sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false); try { sink.complete(); } catch (UserException e) { @@ -154,7 +154,7 @@ public class OlapTableSinkTest { }}; OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList(unknownPartId)); - sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1); + sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false); sink.complete(); LOG.info("sink is {}", sink.toThrift()); LOG.info("{}", sink.getExplainString("", TExplainLevel.NORMAL)); @@ -184,7 +184,7 @@ public class OlapTableSinkTest { }}; OlapTableSink sink = new OlapTableSink(dstTable, tuple, 
Lists.newArrayList(p1.getId())); - sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1); + sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false); try { sink.complete(); } catch (UserException e) { diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift index 7320fa9..c1bc78a 100644 --- a/gensrc/thrift/DataSinks.thrift +++ b/gensrc/thrift/DataSinks.thrift @@ -141,6 +141,7 @@ struct TOlapTableSink { 13: required Descriptors.TPaloNodesInfo nodes_info 14: optional i64 load_channel_timeout_s // the timeout of load channels in second 15: optional i32 send_batch_parallelism + 16: optional bool load_to_single_tablet } struct TDataSink { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index a6db982..a56677b 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -597,6 +597,7 @@ struct TStreamLoadPutRequest { 34: optional string auth_code_uuid 35: optional i32 send_batch_parallelism 36: optional double max_filter_ratio + 37: optional bool load_to_single_tablet } struct TStreamLoadPutResult { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org