This is an automated email from the ASF dual-hosted git repository.

caiconghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 83521a8  [Feature](create_table) Support create table with random 
distribution to avoid data skew (#8041)
83521a8 is described below

commit 83521a826a07693246bdb6e7834d7de2500d00d5
Author: caiconghui <55968745+caicong...@users.noreply.github.com>
AuthorDate: Sat Feb 26 10:38:55 2022 +0800

    [Feature](create_table) Support create table with random distribution to 
avoid data skew (#8041)
    
    In some scenarios, users cannot find a suitable hash key that avoids data 
skew, so this commit adds random distribution as an additional data 
distribution mode for OLAP tables.
    
    example:
    CREATE TABLE random_table
    (
    siteid INT DEFAULT '10',
    citycode SMALLINT,
    username VARCHAR(32) DEFAULT '',
    pv BIGINT SUM DEFAULT '0'
    )
    AGGREGATE KEY(siteid, citycode, username)
    DISTRIBUTED BY random BUCKETS 10
    PROPERTIES("replication_num" = "1");
    
    Co-authored-by: caiconghui1 <caicongh...@jd.com>
---
 be/src/exec/tablet_info.cpp                        | 113 ++++++++++---------
 be/src/exec/tablet_info.h                          |  16 +--
 be/src/exec/tablet_sink.cpp                        |  29 ++++-
 be/src/exec/tablet_sink.h                          |  13 +++
 be/src/http/action/stream_load.cpp                 |   8 ++
 be/src/http/http_common.h                          |   5 +-
 be/src/vec/sink/vtablet_sink.cpp                   |  18 ++-
 .../sql-statements/Data Definition/CREATE TABLE.md |   7 +-
 .../Data Manipulation/BROKER LOAD.md               |   2 +
 .../Data Manipulation/ROUTINE LOAD.md              |   3 +
 .../Data Manipulation/STREAM LOAD.md               |   2 +
 .../sql-statements/Data Definition/CREATE TABLE.md |   8 +-
 .../Data Manipulation/BROKER LOAD.md               |   1 +
 .../Data Manipulation/ROUTINE LOAD.md              |   4 +
 .../Data Manipulation/STREAM LOAD.md               |   2 +
 .../doris/analysis/CreateRoutineLoadStmt.java      |  10 ++
 .../doris/analysis/HashDistributionDesc.java       |   4 +-
 .../java/org/apache/doris/analysis/InsertStmt.java |   3 +-
 .../java/org/apache/doris/analysis/LoadStmt.java   |   7 ++
 .../analysis/ModifyTablePropertiesClause.java      |   5 +-
 .../doris/analysis/RandomDistributionDesc.java     |   5 +-
 .../java/org/apache/doris/catalog/Catalog.java     | 124 ++++++++++-----------
 .../org/apache/doris/catalog/DistributionInfo.java |   1 -
 .../apache/doris/load/loadv2/BrokerLoadJob.java    |   2 +-
 .../java/org/apache/doris/load/loadv2/LoadJob.java |   5 +
 .../apache/doris/load/loadv2/LoadLoadingTask.java  |   4 +-
 .../doris/load/loadv2/LoadingTaskPlanner.java      |   2 +-
 .../doris/load/routineload/RoutineLoadJob.java     |  11 ++
 .../apache/doris/load/update/UpdatePlanner.java    |   2 +-
 .../org/apache/doris/planner/OlapScanNode.java     |  10 +-
 .../org/apache/doris/planner/OlapTableSink.java    |  20 ++--
 .../apache/doris/planner/StreamLoadPlanner.java    |   3 +-
 .../java/org/apache/doris/task/LoadTaskInfo.java   |   1 +
 .../java/org/apache/doris/task/StreamLoadTask.java |   9 ++
 .../apache/doris/analysis/CreateTableStmtTest.java |  15 ++-
 .../doris/load/loadv2/BrokerLoadJobTest.java       |   2 +-
 .../apache/doris/planner/OlapTableSinkTest.java    |   8 +-
 gensrc/thrift/DataSinks.thrift                     |   1 +
 gensrc/thrift/FrontendService.thrift               |   1 +
 39 files changed, 314 insertions(+), 172 deletions(-)

diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp
index cbeda55..2a4c43a 100644
--- a/be/src/exec/tablet_info.cpp
+++ b/be/src/exec/tablet_info.cpp
@@ -21,7 +21,10 @@
 #include "runtime/mem_tracker.h"
 #include "runtime/row_batch.h"
 #include "runtime/tuple_row.h"
+#include "util/random.h"
 #include "util/string_parser.hpp"
+#include "util/time.h"
+
 
 namespace doris {
 
@@ -204,6 +207,31 @@ Status OlapTablePartitionParam::init() {
             _distributed_slot_descs.emplace_back(it->second);
         }
     }
+    if (_distributed_slot_descs.empty()) { // no distribution columns: pick a bucket at random
+        Random random(UnixMillis()); // copied into the lambda: must outlive init()
+        _compute_tablet_index = [random](Tuple* key, int64_t num_buckets) mutable -> uint32_t {
+            return random.Uniform(num_buckets);
+        };
+    } else { // hash the distribution columns with CRC32, then reduce modulo the bucket count
+        _compute_tablet_index = [this](Tuple* key, int64_t num_buckets) -> uint32_t {
+            uint32_t hash_val = 0;
+            for (auto slot_desc : _distributed_slot_descs) {
+                void* slot = nullptr;
+                if (!key->is_null(slot_desc->null_indicator_offset())) {
+                    slot = key->get_slot(slot_desc->tuple_offset());
+                }
+                if (slot != nullptr) {
+                    hash_val = RawValue::zlib_crc32(slot, slot_desc->type(), hash_val);
+                } else {
+                    // a NULL slot is hashed as the INT value 0
+                    static const int INT_VALUE = 0;
+                    static const TypeDescriptor INT_TYPE(TYPE_INT);
+                    hash_val = RawValue::zlib_crc32(&INT_VALUE, INT_TYPE, hash_val);
+                }
+            }
+            return hash_val % num_buckets;
+        };
+    }
     // initial partitions
     for (int i = 0; i < _t_param.partitions.size(); ++i) {
         const TOlapTablePartition& t_part = _t_param.partitions[i];
@@ -267,26 +295,23 @@ Status OlapTablePartitionParam::init() {
     return Status::OK();
 }
 
-bool OlapTablePartitionParam::find_tablet(Tuple* tuple, const 
OlapTablePartition** partition,
-                                          uint32_t* dist_hashes) const {
+bool OlapTablePartitionParam::find_partition(Tuple* tuple, const 
OlapTablePartition** partition) const {
     const TOlapTablePartition& t_part = _t_param.partitions[0];
-    std::map<Tuple*, OlapTablePartition*, 
OlapTablePartKeyComparator>::iterator it;
-    if (t_part.__isset.in_keys) {
-        it = _partitions_map->find(tuple);
-    } else {
-        it = _partitions_map->upper_bound(tuple);
-    }
+    auto it = t_part.__isset.in_keys ? _partitions_map->find(tuple) : 
_partitions_map->upper_bound(tuple);
     if (it == _partitions_map->end()) {
         return false;
     }
     if (_part_contains(it->second, tuple)) {
         *partition = it->second;
-        *dist_hashes = _compute_dist_hash(tuple);
         return true;
     }
     return false;
 }
 
+uint32_t OlapTablePartitionParam::find_tablet(Tuple* tuple, const 
OlapTablePartition& partition) const {
+    return _compute_tablet_index(tuple, partition.num_buckets);
+}
+
 Status OlapTablePartitionParam::_create_partition_keys(const 
std::vector<TExprNode>& t_exprs,
                                                        Tuple** part_key) {
     Tuple* tuple = 
(Tuple*)_mem_pool->allocate(_schema->tuple_desc()->byte_size());
@@ -388,25 +413,6 @@ std::string OlapTablePartitionParam::debug_string() const {
     return ss.str();
 }
 
-uint32_t OlapTablePartitionParam::_compute_dist_hash(Tuple* key) const {
-    uint32_t hash_val = 0;
-    for (auto slot_desc : _distributed_slot_descs) {
-        void* slot = nullptr;
-        if (!key->is_null(slot_desc->null_indicator_offset())) {
-            slot = key->get_slot(slot_desc->tuple_offset());
-        }
-        if (slot != nullptr) {
-            hash_val = RawValue::zlib_crc32(slot, slot_desc->type(), hash_val);
-        } else {
-            //nullptr is treat as 0 when hash
-            static const int INT_VALUE = 0;
-            static const TypeDescriptor INT_TYPE(TYPE_INT);
-            hash_val = RawValue::zlib_crc32(&INT_VALUE, INT_TYPE, hash_val);
-        }
-    }
-    return hash_val;
-}
-
 
VOlapTablePartitionParam::VOlapTablePartitionParam(std::shared_ptr<OlapTableSchemaParam>&
 schema,
                                                  const 
TOlapTablePartitionParam& t_param)
         : _schema(schema),
@@ -450,6 +456,30 @@ Status VOlapTablePartitionParam::init() {
             RETURN_IF_ERROR(find_slot_locs(col, _distributed_slot_locs, 
"distributed"));
         }
     }
+    if (_distributed_slot_locs.empty()) { // no distribution columns: pick a bucket at random
+        Random random(UnixMillis()); // copied into the lambda: must outlive init()
+        _compute_tablet_index = [random](BlockRow* key, int64_t num_buckets) mutable -> uint32_t {
+            return random.Uniform(num_buckets);
+        };
+    } else { // hash the distribution columns with CRC32, then reduce modulo the bucket count
+        _compute_tablet_index = [this](BlockRow* key, int64_t num_buckets) -> uint32_t {
+            uint32_t hash_val = 0;
+            for (int i = 0; i < _distributed_slot_locs.size(); ++i) {
+                auto slot_desc = _slots[_distributed_slot_locs[i]];
+                auto column = key->first->get_by_position(_distributed_slot_locs[i]).column;
+                auto val = column->get_data_at(key->second);
+                if (val.data != nullptr) {
+                    hash_val = RawValue::zlib_crc32(val.data, val.size, slot_desc->type().type, hash_val);
+                } else {
+                    // a NULL value is hashed as the INT value 0
+                    static const int INT_VALUE = 0;
+                    static const TypeDescriptor INT_TYPE(TYPE_INT);
+                    hash_val = RawValue::zlib_crc32(&INT_VALUE, INT_TYPE, hash_val);
+                }
+            }
+            return hash_val % num_buckets;
+        };
+    }
 
     DCHECK(!_t_param.partitions.empty()) << "must have at least 1 partition";
     _is_in_partition = _t_param.partitions[0].__isset.in_keys;
@@ -513,20 +543,22 @@ Status VOlapTablePartitionParam::init() {
     return Status::OK();
 }
 
-bool VOlapTablePartitionParam::find_tablet(BlockRow* block_row, const 
VOlapTablePartition** partition,
-                                           uint32_t* dist_hashes) const {
+bool VOlapTablePartitionParam::find_partition(BlockRow* block_row, const 
VOlapTablePartition** partition) const {
     auto it = _is_in_partition ? _partitions_map->find(block_row) : 
_partitions_map->upper_bound(block_row);
     if (it == _partitions_map->end()) {
         return false;
     }
     if (_is_in_partition || _part_contains(it->second, block_row)) {
         *partition = it->second;
-        *dist_hashes = _compute_dist_hash(block_row);
         return true;
     }
     return false;
 }
 
+uint32_t VOlapTablePartitionParam::find_tablet(BlockRow* block_row, const 
VOlapTablePartition& partition) const {
+    return _compute_tablet_index(block_row, partition.num_buckets);
+}
+
 Status VOlapTablePartitionParam::_create_partition_keys(const 
std::vector<TExprNode>& t_exprs,
                                                        BlockRow* part_key) {
     for (int i = 0; i < t_exprs.size(); i++) {
@@ -601,23 +633,4 @@ Status 
VOlapTablePartitionParam::_create_partition_key(const TExprNode& t_expr,
     return Status::OK();
 }
 
-uint32_t VOlapTablePartitionParam::_compute_dist_hash(BlockRow* key) const {
-    uint32_t hash_val = 0;
-    for (int i = 0; i < _distributed_slot_locs.size(); ++i) {
-        auto slot_desc = _slots[_distributed_slot_locs[i]];
-        auto column = 
key->first->get_by_position(_distributed_slot_locs[i]).column;
-
-        auto val = column->get_data_at(key->second);
-        if (val.data != nullptr) {
-            hash_val = RawValue::zlib_crc32(val.data, val.size, 
slot_desc->type().type, hash_val);
-        } else {
-            // NULL is treat as 0 when hash
-            static const int INT_VALUE = 0;
-            static const TypeDescriptor INT_TYPE(TYPE_INT);
-            hash_val = RawValue::zlib_crc32(&INT_VALUE, INT_TYPE, hash_val);
-        }
-    }
-    return hash_val;
-}
-
 } // namespace doris
diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h
index f475663..644c799 100644
--- a/be/src/exec/tablet_info.h
+++ b/be/src/exec/tablet_info.h
@@ -156,8 +156,9 @@ public:
     int64_t version() const { return _t_param.version; }
 
     // return true if we found this tuple in partition
-    bool find_tablet(Tuple* tuple, const OlapTablePartition** partitions,
-                     uint32_t* dist_hash) const;
+    bool find_partition(Tuple* tuple, const OlapTablePartition** partition) 
const;
+
+    uint32_t find_tablet(Tuple* tuple, const OlapTablePartition& partition) 
const;
 
     const std::vector<OlapTablePartition*>& get_partitions() const { return 
_partitions; }
     std::string debug_string() const;
@@ -167,7 +168,7 @@ private:
 
     Status _create_partition_key(const TExprNode& t_expr, Tuple* tuple, 
SlotDescriptor* slot_desc);
 
-    uint32_t _compute_dist_hash(Tuple* key) const;
+    std::function<uint32_t(Tuple*, int64_t)> _compute_tablet_index;
 
     // check if this partition contain this key
     bool _part_contains(OlapTablePartition* part, Tuple* key) const {
@@ -264,9 +265,10 @@ public:
     int64_t table_id() const { return _t_param.table_id; }
     int64_t version() const { return _t_param.version; }
 
-    // return true if we found this tuple in partition
-    bool find_tablet(BlockRow* block_row, const VOlapTablePartition** 
partitions,
-                     uint32_t* dist_hash) const;
+    // return true if we found this block_row in partition
+    bool find_partition(BlockRow* block_row, const VOlapTablePartition** 
partition) const;
+
+    uint32_t find_tablet(BlockRow* block_row, const VOlapTablePartition& 
partition) const;
 
     const std::vector<VOlapTablePartition*>& get_partitions() const { return 
_partitions; }
 
@@ -275,7 +277,7 @@ private:
 
     Status _create_partition_key(const TExprNode& t_expr, BlockRow* part_key, 
uint16_t pos);
 
-    uint32_t _compute_dist_hash(BlockRow* key) const;
+    std::function<uint32_t(BlockRow*, int64_t)> _compute_tablet_index;
 
     // check if this partition contain this key
     bool _part_contains(VOlapTablePartition* part, BlockRow* key) const {
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 998936b..eba9bb1 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -685,7 +685,16 @@ Status OlapTableSink::init(const TDataSink& t_sink) {
     if (table_sink.__isset.send_batch_parallelism && 
table_sink.send_batch_parallelism > 1) {
         _send_batch_parallelism = table_sink.send_batch_parallelism;
     }
-
+    // if distributed column list is empty, we can ensure that tablet is with 
random distribution info
+    // and if load_to_single_tablet is set and set to true, we should find 
only one tablet in one partition
+    // for the whole olap table sink
+    if (table_sink.partition.distributed_columns.empty()) {
+        if (table_sink.__isset.load_to_single_tablet && 
table_sink.load_to_single_tablet) {
+            findTabletMode = FindTabletMode::FIND_TABLET_EVERY_SINK;
+        } else {
+            findTabletMode = FindTabletMode::FIND_TABLET_EVERY_BATCH;
+        }
+    }
     return Status::OK();
 }
 
@@ -877,14 +886,16 @@ Status OlapTableSink::send(RuntimeState* state, RowBatch* 
input_batch) {
 
     SCOPED_RAW_TIMER(&_send_data_ns);
     bool stop_processing = false;
+    if (findTabletMode == FindTabletMode::FIND_TABLET_EVERY_BATCH) {
+        _partition_to_tablet_map.clear();
+    }
     for (int i = 0; i < batch->num_rows(); ++i) {
         Tuple* tuple = batch->get_row(i)->get_tuple(0);
         if (filtered_rows > 0 && _filter_bitmap.Get(i)) {
             continue;
         }
         const OlapTablePartition* partition = nullptr;
-        uint32_t dist_hash = 0;
-        if (!_partition->find_tablet(tuple, &partition, &dist_hash)) {
+        if (!_partition->find_partition(tuple, &partition)) {
             RETURN_IF_ERROR(state->append_error_msg_to_file(
                     []() -> std::string { return ""; },
                     [&]() -> std::string {
@@ -900,8 +911,18 @@ Status OlapTableSink::send(RuntimeState* state, RowBatch* 
input_batch) {
             }
             continue;
         }
+        uint32_t tablet_index = 0;
+        if (findTabletMode != FindTabletMode::FIND_TABLET_EVERY_ROW) {
+            if (_partition_to_tablet_map.find(partition->id) == 
_partition_to_tablet_map.end()) {
+                tablet_index = _partition->find_tablet(tuple,*partition);
+                _partition_to_tablet_map.emplace(partition->id, tablet_index);
+            } else {
+                tablet_index = _partition_to_tablet_map[partition->id];
+            }
+        } else {
+            tablet_index = _partition->find_tablet(tuple,*partition);
+        }
         _partition_ids.emplace(partition->id);
-        uint32_t tablet_index = dist_hash % partition->num_buckets;
         for (int j = 0; j < partition->indexes.size(); ++j) {
             int64_t tablet_id = partition->indexes[j].tablets[tablet_index];
             _channels[j]->add_row(tuple, tablet_id);
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index 8bd1e96..d21d4f2 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -417,6 +417,8 @@ protected:
     RuntimeProfile* _profile = nullptr;
 
     std::set<int64_t> _partition_ids;
+    // only used for partition with random distribution
+    std::map<int64_t, int64_t> _partition_to_tablet_map;
 
     Bitmap _filter_bitmap;
 
@@ -470,6 +472,17 @@ protected:
 
     // TODO(cmy): this should be removed after we switch to rpc attachment by 
default.
     bool _transfer_data_by_brpc_attachment = false;
+
+    // FIND_TABLET_EVERY_ROW is used for both hash and random distribution 
info, which indicates that we
+    // should compute tablet index for every row
+    // FIND_TABLET_EVERY_BATCH is only used for random distribution info, 
which indicates that we should
+    // compute tablet index for every row batch
+    // FIND_TABLET_EVERY_SINK is only used for random distribution info, which 
indicates that we should
+    // only compute tablet index in the corresponding partition once for the 
whole time in olap table sink
+    enum FindTabletMode {
+        FIND_TABLET_EVERY_ROW, FIND_TABLET_EVERY_BATCH, FIND_TABLET_EVERY_SINK
+    };
+    FindTabletMode findTabletMode = FindTabletMode::FIND_TABLET_EVERY_ROW;
 };
 
 } // namespace stream_load
diff --git a/be/src/http/action/stream_load.cpp 
b/be/src/http/action/stream_load.cpp
index a9da838..65b0edc 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -506,6 +506,14 @@ Status StreamLoadAction::_process_put(HttpRequest* 
http_req, StreamLoadContext*
         }
     }
 
+    if (!http_req->header(HTTP_LOAD_TO_SINGLE_TABLET).empty()) {
+        if (boost::iequals(http_req->header(HTTP_LOAD_TO_SINGLE_TABLET), 
"true")) {
+            request.__set_load_to_single_tablet(true);
+        } else {
+            request.__set_load_to_single_tablet(false);
+        }
+    }
+
     if (ctx->timeout_second != -1) {
         request.__set_timeout(ctx->timeout_second);
     }
diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h
index a35b241..4a0a042 100644
--- a/be/src/http/http_common.h
+++ b/be/src/http/http_common.h
@@ -43,16 +43,13 @@ static const std::string HTTP_STRIP_OUTER_ARRAY = 
"strip_outer_array";
 static const std::string HTTP_NUM_AS_STRING = "num_as_string";
 static const std::string HTTP_FUZZY_PARSE = "fuzzy_parse";
 static const std::string HTTP_READ_JSON_BY_LINE = "read_json_by_line";
-
 static const std::string HTTP_MERGE_TYPE = "merge_type";
 static const std::string HTTP_DELETE_CONDITION = "delete";
 static const std::string HTTP_FUNCTION_COLUMN = "function_column";
 static const std::string HTTP_SEQUENCE_COL = "sequence_col";
 static const std::string HTTP_COMPRESS_TYPE = "compress_type";
-
 static const std::string HTTP_SEND_BATCH_PARALLELISM = 
"send_batch_parallelism";
-
-static const std::string HTTP_100_CONTINUE = "100-continue";
+static const std::string HTTP_LOAD_TO_SINGLE_TABLET = "load_to_single_tablet";
 
 static const std::string HTTP_TWO_PHASE_COMMIT = "two_phase_commit";
 static const std::string HTTP_TXN_ID_KEY = "txn_id";
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index f8df6a6..a6af2ee 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -96,14 +96,17 @@ Status VOlapTableSink::send(RuntimeState* state, 
vectorized::Block* input_block)
     SCOPED_RAW_TIMER(&_send_data_ns);
     // This is just for passing compilation.
     bool stop_processing = false;
+    if (findTabletMode == FindTabletMode::FIND_TABLET_EVERY_BATCH) {
+        _partition_to_tablet_map.clear();
+    }
     for (int i = 0; i < num_rows; ++i) {
         if (filtered_rows > 0 && _filter_bitmap.Get(i)) {
             continue;
         }
         const VOlapTablePartition* partition = nullptr;
-        uint32_t dist_hash = 0;
+        uint32_t tablet_index = 0;
         block_row = {&block, i};
-        if (!_vpartition->find_tablet(&block_row, &partition, &dist_hash)) {
+        if (!_vpartition->find_partition(&block_row, &partition)) {
             RETURN_IF_ERROR(state->append_error_msg_to_file([]() -> 
std::string { return ""; },
                     [&]() -> std::string {
                     fmt::memory_buffer buf;
@@ -117,7 +120,16 @@ Status VOlapTableSink::send(RuntimeState* state, 
vectorized::Block* input_block)
             continue;
         }
         _partition_ids.emplace(partition->id);
-        uint32_t tablet_index = dist_hash % partition->num_buckets;
+        if (findTabletMode != FindTabletMode::FIND_TABLET_EVERY_ROW) {
+            if (_partition_to_tablet_map.find(partition->id) == 
_partition_to_tablet_map.end()) {
+                tablet_index = _vpartition->find_tablet(&block_row, 
*partition);
+                _partition_to_tablet_map.emplace(partition->id, tablet_index);
+            } else {
+                tablet_index = _partition_to_tablet_map[partition->id];
+            }
+        } else {
+            tablet_index = _vpartition->find_tablet(&block_row, *partition);
+        }
         for (int j = 0; j < partition->indexes.size(); ++j) {
             int64_t tablet_id = partition->indexes[j].tablets[tablet_index];
             _channels[j]->add_row(block_row, tablet_id);
diff --git a/docs/en/sql-reference/sql-statements/Data Definition/CREATE 
TABLE.md b/docs/en/sql-reference/sql-statements/Data Definition/CREATE TABLE.md
index 5226d23..06ae914 100644
--- a/docs/en/sql-reference/sql-statements/Data Definition/CREATE TABLE.md      
+++ b/docs/en/sql-reference/sql-statements/Data Definition/CREATE TABLE.md      
@@ -270,7 +270,12 @@ Syntax:
        Syntax:
         `DISTRIBUTED BY HASH (k1[,k2 ...]) [BUCKETS num]`
        Explain:
-        The default buckets is 10.
+         Hash bucketing using the specified key column.
+    2) Random
+       Syntax:
+        `DISTRIBUTED BY RANDOM [BUCKETS num]`
+       Explain:
+         Use random numbers for bucketing.
 7. PROPERTIES
     1) If ENGINE type is olap. User can specify storage medium, cooldown time 
and replication   number:
 
diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER 
LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md
index 2150a32..869ae39 100644
--- a/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md     
+++ b/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md     
@@ -256,6 +256,8 @@ under the License.
         timezone: Specify time zones for functions affected by time zones, 
such as strftime/alignment_timestamp/from_unixtime, etc. See the documentation 
for details. If not specified, use the "Asia/Shanghai" time zone.
 
         send_batch_parallelism: Used to set the default parallelism for 
sending batch, if the value for parallelism exceed 
`max_send_batch_parallelism_per_job` in BE config, then the coordinator BE will 
use the value of `max_send_batch_parallelism_per_job`.
+        
+        load_to_single_tablet: Boolean type, True means that one task can only 
load data to one tablet in the corresponding partition at a time. The default 
value is false. The number of tasks for the job depends on the overall 
concurrency. This parameter can only be set when loading data into an OLAP 
table with random distribution. 
 
     5. Load data format sample
 
diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/ROUTINE 
LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md
index edaa2f5..c695f2a 100644
--- a/docs/en/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md    
+++ b/docs/en/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md    
@@ -198,6 +198,9 @@ FROM data_source
 
     10. `send_batch_parallelism`
         Integer, Used to set the default parallelism for sending batch, if the 
value for parallelism exceed `max_send_batch_parallelism_per_job` in BE config, 
then the coordinator BE will use the value of 
`max_send_batch_parallelism_per_job`.
+    
+    11. `load_to_single_tablet`
+        Boolean type, True means that one task can only load data to one 
tablet in the corresponding partition at a time. The default value is false. 
This parameter can only be set when loading data into an OLAP table with 
random distribution. 
 
 6. data_source
 
diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/STREAM 
LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md
index 9a8e063..fac74a7 100644
--- a/docs/en/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md     
+++ b/docs/en/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md     
@@ -144,6 +144,8 @@ The type of data merging supports three types: APPEND, 
DELETE, and MERGE. APPEND
 
 `send_batch_parallelism`: Integer type, used to set the default parallelism 
for sending batch, if the value for parallelism exceed 
`max_send_batch_parallelism_per_job` in BE config, then the coordinator BE will 
use the value of `max_send_batch_parallelism_per_job`.
 
+`load_to_single_tablet`: Boolean type, True means that one task can only load 
data to one tablet in the corresponding partition at a time. The default value 
is false. This parameter can only be set when loading data into an OLAP table 
with random distribution.
+
 RETURN VALUES
 
 After the load is completed, the related content of this load will be returned 
in Json format. Current field included
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE 
TABLE.md b/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE 
TABLE.md
index a86f5d9..989a355 100644
--- a/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE TABLE.md   
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE TABLE.md   
@@ -286,8 +286,12 @@ under the License.
         语法:
             `DISTRIBUTED BY HASH (k1[,k2 ...]) [BUCKETS num]`
         说明:
-            使用指定的 key 列进行哈希分桶。默认分区数为10
-
+            使用指定的 key 列进行哈希分桶。
+        2) Random 分桶
+        语法:
+            `DISTRIBUTED BY RANDOM [BUCKETS num]`
+        说明:
+            使用随机数进行分桶。  
     建议:建议使用Hash分桶方式
 
 7. PROPERTIES
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER 
LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER 
LOAD.md
index cddc9c7..30726c6 100644
--- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md  
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md  
@@ -249,6 +249,7 @@ under the License.
         strict mode:     是否对数据进行严格限制。默认为 false。
         timezone:         指定某些受时区影响的函数的时区,如 
strftime/alignment_timestamp/from_unixtime 等等,具体请查阅 [时区] 文档。如果不指定,则使用 
"Asia/Shanghai" 时区。
         send_batch_parallelism: 用于设置发送批处理数据的并行度,如果并行度的值超过 BE 配置中的 
`max_send_batch_parallelism_per_job`,那么作为协调点的 BE 将使用 
`max_send_batch_parallelism_per_job` 的值。
+        load_to_single_tablet: 
布尔类型,为true表示支持一个任务只导入数据到对应分区的一个tablet,默认值为false,作业的任务数取决于整体并发度。该参数只允许在对带有random分桶的olap表导数的时候设置。
 
     5. 导入数据格式样例
 
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE 
LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE 
LOAD.md
index 798ff69..3e61aab 100644
--- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md 
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md 
@@ -184,6 +184,10 @@ under the License.
         10. send_batch_parallelism
             
             整型,用于设置发送批处理数据的并行度,如果并行度的值超过 BE 配置中的 
`max_send_batch_parallelism_per_job`,那么作为协调点的 BE 将使用 
`max_send_batch_parallelism_per_job` 的值。 
+               
+        11. load_to_single_tablet
+         
+            
布尔类型,为true表示支持一个任务只导入数据到对应分区的一个tablet,默认值为false,该参数只允许在对带有random分桶的olap表导数的时候设置。
 
     6. data_source
 
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/STREAM 
LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/STREAM 
LOAD.md
index ef0c4c4..9edcd46 100644
--- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md  
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md  
@@ -108,6 +108,8 @@ under the License.
         read_json_by_line: 布尔类型,为true表示支持每行读取一个json对象,默认值为false。
         
         send_batch_parallelism: 整型,用于设置发送批处理数据的并行度,如果并行度的值超过 BE 配置中的 
`max_send_batch_parallelism_per_job`,那么作为协调点的 BE 将使用 
`max_send_batch_parallelism_per_job` 的值。 
+         
+        load_to_single_tablet: 
布尔类型,为true表示支持一个任务只导入数据到对应分区的一个tablet,默认值为false,该参数只允许在对带有random分桶的olap表导数的时候设置。
 
     RETURN VALUES
         导入完成后,会以Json格式返回这次导入的相关内容。当前包括以下字段
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
index 338a786..5bf72ab 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
@@ -117,6 +117,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
     private static final String NAME_TYPE = "ROUTINE LOAD NAME";
     public static final String ENDPOINT_REGEX = 
"[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]";
     public static final String SEND_BATCH_PARALLELISM = 
"send_batch_parallelism";
+    public static final String LOAD_TO_SINGLE_TABLET = "load_to_single_tablet";
 
     private static final ImmutableSet<String> PROPERTIES_SET = new 
ImmutableSet.Builder<String>()
             .add(DESIRED_CONCURRENT_NUMBER_PROPERTY)
@@ -134,6 +135,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
             .add(LoadStmt.TIMEZONE)
             .add(EXEC_MEM_LIMIT_PROPERTY)
             .add(SEND_BATCH_PARALLELISM)
+            .add(LOAD_TO_SINGLE_TABLET)
             .build();
 
     private final LabelName labelName;
@@ -157,6 +159,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
     private long execMemLimit = 2 * 1024 * 1024 * 1024L;
     private String timezone = TimeUtils.DEFAULT_TIME_ZONE;
     private int sendBatchParallelism = 1;
+    private boolean loadToSingleTablet = false;
     /**
      * RoutineLoad support json data.
      * Require Params:
@@ -240,6 +243,10 @@ public class CreateRoutineLoadStmt extends DdlStmt {
         return sendBatchParallelism;
     }
 
+    public boolean isLoadToSingleTablet() {
+        return loadToSingleTablet;
+    }
+
     public boolean isStrictMode() {
         return strictMode;
     }
@@ -440,6 +447,9 @@ public class CreateRoutineLoadStmt extends DdlStmt {
         sendBatchParallelism = ((Long) 
Util.getLongPropertyOrDefault(jobProperties.get(SEND_BATCH_PARALLELISM),
                 
ConnectContext.get().getSessionVariable().getSendBatchParallelism(), 
SEND_BATCH_PARALLELISM_PRED,
                 SEND_BATCH_PARALLELISM + " should > 0")).intValue();
+        loadToSingleTablet = 
Util.getBooleanPropertyOrDefault(jobProperties.get(LoadStmt.LOAD_TO_SINGLE_TABLET),
+                RoutineLoadJob.DEFAULT_LOAD_TO_SINGLE_TABLET,
+                LoadStmt.LOAD_TO_SINGLE_TABLET + " should be a boolean");
 
         if (ConnectContext.get() != null) {
             timezone = ConnectContext.get().getSessionVariable().getTimeZone();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/HashDistributionDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/HashDistributionDesc.java
index 31e1d3b..2379d3d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/HashDistributionDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/HashDistributionDesc.java
@@ -60,10 +60,10 @@ public class HashDistributionDesc extends DistributionDesc {
     @Override
     public void analyze(Set<String> cols) throws AnalysisException {
         if (numBucket <= 0) {
-            throw new AnalysisException("Number of hash distribution is 
zero.");
+            throw new AnalysisException("Number of hash distribution should be 
larger than zero.");
         }
         if (distributionColumnNames == null || distributionColumnNames.size() 
== 0) {
-            throw new AnalysisException("Number of hash column is zero.");
+            throw new AnalysisException("Number of hash column should be 
larger than zero.");
         }
         for (String columnName : distributionColumnNames) {
             if (!cols.contains(columnName)) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
index f750749..290597e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
@@ -301,7 +301,6 @@ public class InsertStmt extends DdlStmt {
             label = "insert_" + 
DebugUtil.printId(analyzer.getContext().queryId());
         }
         if (!isExplain() && !isTransactionBegin) {
-
             if (targetTable instanceof OlapTable) {
                 LoadJobSourceType sourceType = 
LoadJobSourceType.INSERT_STREAMING;
                 MetricRepo.COUNTER_LOAD_ADD.increase(1L);
@@ -318,7 +317,7 @@ public class InsertStmt extends DdlStmt {
             OlapTableSink sink = (OlapTableSink) dataSink;
             TUniqueId loadId = analyzer.getContext().queryId();
             int sendBatchParallelism = 
analyzer.getContext().getSessionVariable().getSendBatchParallelism();
-            sink.init(loadId, transactionId, db.getId(), timeoutSecond, 
sendBatchParallelism);
+            sink.init(loadId, transactionId, db.getId(), timeoutSecond, 
sendBatchParallelism, false);
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
index f68d58c..4bf0ae4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
@@ -81,6 +81,7 @@ public class LoadStmt extends DdlStmt {
     public static final String TIMEZONE = "timezone";
     public static final String LOAD_PARALLELISM = "load_parallelism";
     public static final String SEND_BATCH_PARALLELISM = 
"send_batch_parallelism";
+    public static final String LOAD_TO_SINGLE_TABLET = "load_to_single_tablet";
 
     // for load data from Baidu Object Store(BOS)
     public static final String BOS_ENDPOINT = "bos_endpoint";
@@ -173,6 +174,12 @@ public class LoadStmt extends DdlStmt {
                     return s;
                 }
             })
+            .put(LOAD_TO_SINGLE_TABLET, new Function<String, Boolean>() {
+                @Override
+                public @Nullable Boolean apply(@Nullable String s) {
+                    return Boolean.valueOf(s);
+                }
+            })
             .build();
 
     public LoadStmt(LabelName label, List<DataDescription> dataDescriptions,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java
index 782ad3d..9d77a9c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java
@@ -55,10 +55,7 @@ public class ModifyTablePropertiesClause extends 
AlterTableClause {
                 throw new AnalysisException("Can only change storage type to 
COLUMN");
             }
         } else if 
(properties.containsKey(PropertyAnalyzer.PROPERTIES_DISTRIBUTION_TYPE)) {
-            if 
(!properties.get(PropertyAnalyzer.PROPERTIES_DISTRIBUTION_TYPE).equalsIgnoreCase("hash"))
 {
-                throw new AnalysisException("Can only change distribution type 
to HASH");
-            }
-            this.needTableStable = false;
+            throw new AnalysisException("Cannot change distribution type for 
olap table now");
         } else if 
(properties.containsKey(PropertyAnalyzer.PROPERTIES_SEND_CLEAR_ALTER_TASK)) {
             if 
(!properties.get(PropertyAnalyzer.PROPERTIES_SEND_CLEAR_ALTER_TASK).equalsIgnoreCase("true"))
 {
                 throw new AnalysisException(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/RandomDistributionDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/RandomDistributionDesc.java
index 9dd7475..947cebb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/RandomDistributionDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RandomDistributionDesc.java
@@ -29,7 +29,6 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Set;
 
-@Deprecated
 public class RandomDistributionDesc extends DistributionDesc {
     int numBucket;
 
@@ -44,7 +43,9 @@ public class RandomDistributionDesc extends DistributionDesc {
 
     @Override
     public void analyze(Set<String> colSet) throws AnalysisException {
-        throw new AnalysisException("Random distribution is deprecated now, 
use Hash distribution instead");
+        if (numBucket <= 0) {
+            throw new AnalysisException("Number of random distribution should 
be larger than zero.");
+        }
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index 26b5210..652945b 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -3794,6 +3794,9 @@ public class Catalog {
         try {
             String colocateGroup = 
PropertyAnalyzer.analyzeColocate(properties);
             if (colocateGroup != null) {
+                if (defaultDistributionInfo.getType() == 
DistributionInfoType.RANDOM) {
+                    throw new AnalysisException("Random distribution for 
colocate table is unsupported");
+                }
                 String fullGroupName = db.getId() + "_" + colocateGroup;
                 ColocateGroupSchema groupSchema = 
colocateTableIndex.getGroupSchema(fullGroupName);
                 if (groupSchema != null) {
@@ -4516,79 +4519,74 @@ public class Catalog {
     private void createTablets(String clusterName, MaterializedIndex index, 
ReplicaState replicaState,
                                DistributionInfo distributionInfo, long 
version, ReplicaAllocation replicaAlloc,
                                TabletMeta tabletMeta, Set<Long> tabletIdSet) 
throws DdlException {
-        DistributionInfoType distributionInfoType = distributionInfo.getType();
-        if (distributionInfoType == DistributionInfoType.HASH) {
-            ColocateTableIndex colocateIndex = 
Catalog.getCurrentColocateIndex();
-            Map<Tag, List<List<Long>>> backendsPerBucketSeq = null;
-            GroupId groupId = null;
-            if (colocateIndex.isColocateTable(tabletMeta.getTableId())) {
-                // if this is a colocate table, try to get backend seqs from 
colocation index.
-                groupId = colocateIndex.getGroup(tabletMeta.getTableId());
-                backendsPerBucketSeq = 
colocateIndex.getBackendsPerBucketSeq(groupId);
+        ColocateTableIndex colocateIndex = Catalog.getCurrentColocateIndex();
+        Map<Tag, List<List<Long>>> backendsPerBucketSeq = null;
+        GroupId groupId = null;
+        if (colocateIndex.isColocateTable(tabletMeta.getTableId())) {
+            if (distributionInfo.getType() == DistributionInfoType.RANDOM) {
+                throw new DdlException("Random distribution for colocate table 
is unsupported");
             }
+            // if this is a colocate table, try to get backend seqs from 
colocation index.
+            groupId = colocateIndex.getGroup(tabletMeta.getTableId());
+            backendsPerBucketSeq = 
colocateIndex.getBackendsPerBucketSeq(groupId);
+        }
 
-            // chooseBackendsArbitrary is true, means this may be the first 
table of colocation group,
-            // or this is just a normal table, and we can choose backends 
arbitrary.
-            // otherwise, backends should be chosen from backendsPerBucketSeq;
-            boolean chooseBackendsArbitrary = backendsPerBucketSeq == null || 
backendsPerBucketSeq.isEmpty();
-            if (chooseBackendsArbitrary) {
-                backendsPerBucketSeq = Maps.newHashMap();
-            }
-            for (int i = 0; i < distributionInfo.getBucketNum(); ++i) {
-                // create a new tablet with random chosen backends
-                Tablet tablet = new Tablet(getNextId());
-
-                // add tablet to inverted index first
-                index.addTablet(tablet, tabletMeta);
-                tabletIdSet.add(tablet.getId());
-
-                // get BackendIds
-                Map<Tag, List<Long>> chosenBackendIds;
-                if (chooseBackendsArbitrary) {
-                    // This is the first colocate table in the group, or just 
a normal table,
-                    // randomly choose backends
-                    if (!Config.disable_storage_medium_check) {
-                        chosenBackendIds = 
getCurrentSystemInfo().chooseBackendIdByFilters(replicaAlloc, clusterName,
-                                tabletMeta.getStorageMedium());
-                    } else {
-                        chosenBackendIds = 
getCurrentSystemInfo().chooseBackendIdByFilters(replicaAlloc, clusterName, 
null);
-                    }
+        // chooseBackendsArbitrary is true, means this may be the first table 
of colocation group,
+        // or this is just a normal table, and we can choose backends 
arbitrary.
+        // otherwise, backends should be chosen from backendsPerBucketSeq;
+        boolean chooseBackendsArbitrary = backendsPerBucketSeq == null || 
backendsPerBucketSeq.isEmpty();
+        if (chooseBackendsArbitrary) {
+            backendsPerBucketSeq = Maps.newHashMap();
+        }
+        for (int i = 0; i < distributionInfo.getBucketNum(); ++i) {
+            // create a new tablet with random chosen backends
+            Tablet tablet = new Tablet(getNextId());
 
-                    for (Map.Entry<Tag, List<Long>> entry : 
chosenBackendIds.entrySet()) {
-                        backendsPerBucketSeq.putIfAbsent(entry.getKey(), 
Lists.newArrayList());
-                        
backendsPerBucketSeq.get(entry.getKey()).add(entry.getValue());
-                    }
+            // add tablet to inverted index first
+            index.addTablet(tablet, tabletMeta);
+            tabletIdSet.add(tablet.getId());
+
+            // get BackendIds
+            Map<Tag, List<Long>> chosenBackendIds;
+            if (chooseBackendsArbitrary) {
+                // This is the first colocate table in the group, or just a 
normal table,
+                // randomly choose backends
+                if (!Config.disable_storage_medium_check) {
+                    chosenBackendIds = 
getCurrentSystemInfo().chooseBackendIdByFilters(replicaAlloc, clusterName,
+                            tabletMeta.getStorageMedium());
                 } else {
-                    // get backends from existing backend sequence
-                    chosenBackendIds = Maps.newHashMap();
-                    for (Map.Entry<Tag, List<List<Long>>> entry : 
backendsPerBucketSeq.entrySet()) {
-                        chosenBackendIds.put(entry.getKey(), 
entry.getValue().get(i));
-                    }
+                    chosenBackendIds = 
getCurrentSystemInfo().chooseBackendIdByFilters(replicaAlloc, clusterName, 
null);
                 }
 
-                // create replicas
-                short totalReplicaNum = (short) 0;
-                for (List<Long> backendIds : chosenBackendIds.values()) {
-                    for (long backendId : backendIds) {
-                        long replicaId = getNextId();
-                        Replica replica = new Replica(replicaId, backendId, 
replicaState, version,
-                                tabletMeta.getOldSchemaHash());
-                        tablet.addReplica(replica);
-                        totalReplicaNum++;
-                    }
+                for (Map.Entry<Tag, List<Long>> entry : 
chosenBackendIds.entrySet()) {
+                    backendsPerBucketSeq.putIfAbsent(entry.getKey(), 
Lists.newArrayList());
+                    
backendsPerBucketSeq.get(entry.getKey()).add(entry.getValue());
+                }
+            } else {
+                // get backends from existing backend sequence
+                chosenBackendIds = Maps.newHashMap();
+                for (Map.Entry<Tag, List<List<Long>>> entry : 
backendsPerBucketSeq.entrySet()) {
+                    chosenBackendIds.put(entry.getKey(), 
entry.getValue().get(i));
                 }
-                Preconditions.checkState(totalReplicaNum == 
replicaAlloc.getTotalReplicaNum(),
-                        totalReplicaNum + " vs. " + 
replicaAlloc.getTotalReplicaNum());
             }
-
-            if (groupId != null && chooseBackendsArbitrary) {
-                colocateIndex.addBackendsPerBucketSeq(groupId, 
backendsPerBucketSeq);
-                ColocatePersistInfo info = 
ColocatePersistInfo.createForBackendsPerBucketSeq(groupId, 
backendsPerBucketSeq);
-                editLog.logColocateBackendsPerBucketSeq(info);
+            // create replicas
+            short totalReplicaNum = (short) 0;
+            for (List<Long> backendIds : chosenBackendIds.values()) {
+                for (long backendId : backendIds) {
+                    long replicaId = getNextId();
+                    Replica replica = new Replica(replicaId, backendId, 
replicaState, version, tabletMeta.getOldSchemaHash());
+                    tablet.addReplica(replica);
+                    totalReplicaNum++;
+                }
             }
+            Preconditions.checkState(totalReplicaNum == 
replicaAlloc.getTotalReplicaNum(),
+                    totalReplicaNum + " vs. " + 
replicaAlloc.getTotalReplicaNum());
+        }
 
-        } else {
-            throw new DdlException("Unknown distribution type: " + 
distributionInfoType);
+        if (groupId != null && chooseBackendsArbitrary) {
+            colocateIndex.addBackendsPerBucketSeq(groupId, 
backendsPerBucketSeq);
+            ColocatePersistInfo info = 
ColocatePersistInfo.createForBackendsPerBucketSeq(groupId, 
backendsPerBucketSeq);
+            editLog.logColocateBackendsPerBucketSeq(info);
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/DistributionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/DistributionInfo.java
index 33d883d..ef1712b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/DistributionInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/DistributionInfo.java
@@ -37,7 +37,6 @@ public abstract class DistributionInfo implements Writable {
 
     public enum DistributionInfoType {
         HASH,
-        @Deprecated
         RANDOM
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index cefd28e..d555e7d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -200,7 +200,7 @@ public class BrokerLoadJob extends BulkLoadJob {
                         brokerFileGroups, getDeadlineMs(), getExecMemLimit(),
                         isStrictMode(), transactionId, this, getTimeZone(), 
getTimeout(),
                         getLoadParallelism(), getSendBatchParallelism(),
-                        getMaxFilterRatio() <= 0, enableProfile ? jobProfile : 
null);
+                        getMaxFilterRatio() <= 0, enableProfile ? jobProfile : 
null, isSingleTabletLoadPerSink());
 
                 UUID uuid = UUID.randomUUID();
                 TUniqueId loadId = new 
TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index 0fe8ef0..047613c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -409,6 +409,7 @@ public abstract class LoadJob extends 
AbstractTxnStateChangeCallback implements
         jobProperties.put(LoadStmt.TIMEZONE, TimeUtils.DEFAULT_TIME_ZONE);
         jobProperties.put(LoadStmt.LOAD_PARALLELISM, 
Config.default_load_parallelism);
         jobProperties.put(LoadStmt.SEND_BATCH_PARALLELISM, 1);
+        jobProperties.put(LoadStmt.LOAD_TO_SINGLE_TABLET, false);
     }
 
     public void isJobTypeRead(boolean jobTypeRead) {
@@ -1225,6 +1226,10 @@ public abstract class LoadJob extends 
AbstractTxnStateChangeCallback implements
         return (int) jobProperties.get(LoadStmt.SEND_BATCH_PARALLELISM);
     }
 
+    public boolean isSingleTabletLoadPerSink() {
+        return (boolean) jobProperties.get(LoadStmt.LOAD_TO_SINGLE_TABLET);
+    }
+
     // Return true if this job is finished for a long time
     public boolean isExpired(long currentTimeMs) {
         if (!isCompleted()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index 6ff10fe..5733133 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -67,6 +67,7 @@ public class LoadLoadingTask extends LoadTask {
     private final int loadParallelism;
     private final int sendBatchParallelism;
     private final boolean loadZeroTolerance;
+    private final boolean singleTabletLoadPerSink;
 
     private LoadingTaskPlanner planner;
 
@@ -78,7 +79,7 @@ public class LoadLoadingTask extends LoadTask {
                            long jobDeadlineMs, long execMemLimit, boolean 
strictMode,
                            long txnId, LoadTaskCallback callback, String 
timezone,
                            long timeoutS, int loadParallelism, int 
sendBatchParallelism,
-                           boolean loadZeroTolerance, RuntimeProfile profile) {
+                           boolean loadZeroTolerance, RuntimeProfile profile, 
boolean singleTabletLoadPerSink) {
         super(callback, TaskType.LOADING);
         this.db = db;
         this.table = table;
@@ -96,6 +97,7 @@ public class LoadLoadingTask extends LoadTask {
         this.sendBatchParallelism = sendBatchParallelism;
         this.loadZeroTolerance = loadZeroTolerance;
         this.jobProfile = profile;
+        this.singleTabletLoadPerSink = singleTabletLoadPerSink;
     }
 
     public void init(TUniqueId loadId, List<List<TBrokerFileStatus>> 
fileStatusList, int fileNum, UserIdentity userInfo) throws UserException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
index e8928dd..903e65f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
@@ -132,7 +132,7 @@ public class LoadingTaskPlanner {
         // 2. Olap table sink
         List<Long> partitionIds = getAllPartitionIds();
         OlapTableSink olapTableSink = new OlapTableSink(table, destTupleDesc, 
partitionIds);
-        olapTableSink.init(loadId, txnId, dbId, timeoutS, 
sendBatchParallelism);
+        olapTableSink.init(loadId, txnId, dbId, timeoutS, 
sendBatchParallelism, false);
         olapTableSink.complete();
 
         // 3. Plan fragment
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 2549ce3..0026b80 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -107,6 +107,7 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
     public static final long DEFAULT_EXEC_MEM_LIMIT = 2 * 1024 * 1024 * 1024L;
     public static final boolean DEFAULT_STRICT_MODE = false; // default is 
false
     public static final int DEFAULT_SEND_BATCH_PARALLELISM = 1;
+    public static final boolean DEFAULT_LOAD_TO_SINGLE_TABLET = false;
 
     protected static final String STAR_STRING = "*";
      /*
@@ -167,6 +168,7 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
     protected long maxErrorNum = DEFAULT_MAX_ERROR_NUM; // optional
     protected long execMemLimit = DEFAULT_EXEC_MEM_LIMIT;
     protected int sendBatchParallelism = DEFAULT_SEND_BATCH_PARALLELISM;
+    protected boolean loadToSingleTablet = DEFAULT_LOAD_TO_SINGLE_TABLET;
     // include strict mode
     protected Map<String, String> jobProperties = Maps.newHashMap();
 
@@ -286,10 +288,14 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
         if (stmt.getSendBatchParallelism() > 0) {
             this.sendBatchParallelism = stmt.getSendBatchParallelism();
         }
+        if (stmt.isLoadToSingleTablet()) {
+            this.loadToSingleTablet = stmt.isLoadToSingleTablet();
+        }
         jobProperties.put(LoadStmt.TIMEZONE, stmt.getTimezone());
         jobProperties.put(LoadStmt.STRICT_MODE, 
String.valueOf(stmt.isStrictMode()));
         jobProperties.put(LoadStmt.EXEC_MEM_LIMIT, 
String.valueOf(this.execMemLimit));
         jobProperties.put(LoadStmt.SEND_BATCH_PARALLELISM, 
String.valueOf(this.sendBatchParallelism));
+        jobProperties.put(LoadStmt.LOAD_TO_SINGLE_TABLET, 
String.valueOf(this.loadToSingleTablet));
 
         if (Strings.isNullOrEmpty(stmt.getFormat()) || 
stmt.getFormat().equals("csv")) {
             jobProperties.put(PROPS_FORMAT, "csv");
@@ -552,6 +558,11 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
     }
 
     @Override
+    public boolean isLoadToSingleTablet() {
+        return loadToSingleTablet;
+    }
+
+    @Override
     public boolean isReadJsonByLine() {
         return false;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java
index a686d26..aaadd11 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java
@@ -91,7 +91,7 @@ public class UpdatePlanner extends Planner {
         OlapTableSink olapTableSink = new OlapTableSink(targetTable, 
computeTargetTupleDesc(), null);
         olapTableSink.init(analyzer.getContext().queryId(), txnId, targetDBId,
                 analyzer.getContext().getSessionVariable().queryTimeoutS,
-                
analyzer.getContext().getSessionVariable().sendBatchParallelism);
+                
analyzer.getContext().getSessionVariable().sendBatchParallelism, false);
         olapTableSink.complete();
         // 3. gen plan fragment
         PlanFragment planFragment = new 
PlanFragment(fragmentIdGenerator_.getNextId(), olapScanNode,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 3af8fe2..ab0e3b2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -905,9 +905,9 @@ public class OlapScanNode extends ScanNode {
 
     /*
     Although sometimes the scan range only involves one instance,
-        the data distribution cannot be set to UNPARTITION here.
-    The reason is that @coordicator will not set the scan range for the 
fragment,
-        when data partition of fragment is UNPARTITION.
+        the data distribution cannot be set to UNPARTITIONED here.
+    The reason is that @coordinator will not set the scan range for the 
fragment,
+        when data partition of fragment is UNPARTITIONED.
      */
     public DataPartition constructInputPartitionByDistributionInfo() throws 
UserException {
         ColocateTableIndex colocateTableIndex = 
Catalog.getCurrentColocateIndex();
@@ -917,9 +917,7 @@ public class OlapScanNode extends ScanNode {
                 || olapTable.getPartitions().size() == 1) {
             DistributionInfo distributionInfo = 
olapTable.getDefaultDistributionInfo();
             if (!(distributionInfo instanceof HashDistributionInfo)) {
-                // There may be some random distribution table left, throw 
exception here.
-                // And these table should be modified to hash distribution by 
ALTER TABLE operation.
-                throw new UserException("Table with non hash distribution is 
not supported: " + olapTable.getName());
+                return DataPartition.RANDOM;
             }
             List<Column> distributeColumns = ((HashDistributionInfo) 
distributionInfo).getDistributionColumns();
             List<Expr> dataDistributeExprs = Lists.newArrayList();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
index edc7627..7a5f0bc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -33,6 +33,7 @@ import org.apache.doris.catalog.PartitionInfo;
 import org.apache.doris.catalog.PartitionItem;
 import org.apache.doris.catalog.PartitionKey;
 import org.apache.doris.catalog.PartitionType;
+import org.apache.doris.catalog.RandomDistributionInfo;
 import org.apache.doris.catalog.RangePartitionItem;
 import org.apache.doris.catalog.Tablet;
 import org.apache.doris.common.AnalysisException;
@@ -93,13 +94,18 @@ public class OlapTableSink extends DataSink {
         this.partitionIds = partitionIds;
     }
 
-    public void init(TUniqueId loadId, long txnId, long dbId, long 
loadChannelTimeoutS, int sendBatchParallelism) throws AnalysisException {
+    public void init(TUniqueId loadId, long txnId, long dbId, long 
loadChannelTimeoutS,
+                     int sendBatchParallelism, boolean loadToSingleTablet) 
throws AnalysisException {
         TOlapTableSink tSink = new TOlapTableSink();
         tSink.setLoadId(loadId);
         tSink.setTxnId(txnId);
         tSink.setDbId(dbId);
         tSink.setLoadChannelTimeoutS(loadChannelTimeoutS);
         tSink.setSendBatchParallelism(sendBatchParallelism);
+        if (loadToSingleTablet && !(dstTable.getDefaultDistributionInfo() 
instanceof RandomDistributionInfo)) {
+            throw new AnalysisException("if load_to_single_tablet set to true, 
the olap table must be with random distribution");
+        }
+        tSink.setLoadToSingleTablet(loadToSingleTablet);
         tDataSink = new TDataSink(TDataSinkType.OLAP_TABLE_SINK);
         tDataSink.setOlapTableSink(tSink);
 
@@ -189,7 +195,7 @@ public class OlapTableSink extends DataSink {
         return schemaParam;
     }
 
-    private List<String> getDistColumns(DistributionInfo distInfo, OlapTable 
table) throws UserException {
+    private List<String> getDistColumns(DistributionInfo distInfo) throws 
UserException {
         List<String> distColumns = Lists.newArrayList();
         switch (distInfo.getType()) {
             case HASH: {
@@ -200,11 +206,7 @@ public class OlapTableSink extends DataSink {
                 break;
             }
             case RANDOM: {
-                for (Column column : table.getBaseSchema()) {
-                    if (column.isKey()) {
-                        distColumns.add(column.getName());
-                    }
-                }
+                // RandomDistributionInfo doesn't have distributedColumns
                 break;
             }
             default:
@@ -247,7 +249,7 @@ public class OlapTableSink extends DataSink {
 
                     DistributionInfo distInfo = 
partition.getDistributionInfo();
                     if (selectedDistInfo == null) {
-                        
partitionParam.setDistributedColumns(getDistColumns(distInfo, table));
+                        
partitionParam.setDistributedColumns(getDistColumns(distInfo));
                         selectedDistInfo = distInfo;
                     } else {
                         if (selectedDistInfo.getType() != distInfo.getType()) {
@@ -275,7 +277,7 @@ public class OlapTableSink extends DataSink {
                 }
                 partitionParam.addToPartitions(tPartition);
                 partitionParam.setDistributedColumns(
-                        getDistColumns(partition.getDistributionInfo(), 
table));
+                        getDistColumns(partition.getDistributionInfo()));
                 break;
             }
             default: {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index c2a2e71..a454647 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -151,7 +151,8 @@ public class StreamLoadPlanner {
         // create dest sink
         List<Long> partitionIds = getAllPartitionIds();
         OlapTableSink olapTableSink = new OlapTableSink(destTable, tupleDesc, 
partitionIds);
-        olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), 
taskInfo.getTimeout(), taskInfo.getSendBatchParallelism());
+        olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), 
taskInfo.getTimeout(),
+                taskInfo.getSendBatchParallelism(), 
taskInfo.isLoadToSingleTablet());
         olapTableSink.complete();
 
         // for stream load, we only need one fragment, ScanNode -> DataSink.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
index e5a39f3..5dc263d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
@@ -60,6 +60,7 @@ public interface LoadTaskInfo {
     public Separator getColumnSeparator();
     public Separator getLineDelimiter();
     public int getSendBatchParallelism();
+    public boolean isLoadToSingleTablet();
 
     public static class ImportColumnDescs {
         public List<ImportColumnDesc> descs = Lists.newArrayList();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
index 08b7f80..248b7f1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
@@ -75,6 +75,7 @@ public class StreamLoadTask implements LoadTaskInfo {
     private String sequenceCol;
     private int sendBatchParallelism = 1;
     private double maxFilterRatio = 0.0;
+    private boolean loadToSingleTablet = false;
 
     public StreamLoadTask(TUniqueId id, long txnId, TFileType fileType, TFileFormatType formatType) {
         this.id = id;
@@ -130,6 +131,11 @@ public class StreamLoadTask implements LoadTaskInfo {
         return sendBatchParallelism;
     }
 
+    @Override
+    public boolean isLoadToSingleTablet() {
+        return loadToSingleTablet;
+    }
+
     public PartitionNames getPartitions() {
         return partitions;
     }
@@ -301,6 +307,9 @@ public class StreamLoadTask implements LoadTaskInfo {
         if (request.isSetMaxFilterRatio()) {
             maxFilterRatio = request.getMaxFilterRatio();
         }
+        if (request.isSetLoadToSingleTablet()) {
+            loadToSingleTablet = request.isLoadToSingleTablet();
+        }
     }
 
     // used for stream load
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java
index 552a8a6..624b829 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java
@@ -106,6 +106,18 @@ public class CreateTableStmtTest {
     }
 
     @Test
+    public void testCreateTableWithRandomDistribution() throws UserException {
+        CreateTableStmt stmt = new CreateTableStmt(false, false, tblName, cols, "olap",
+                new KeysDesc(KeysType.DUP_KEYS, colsName), null,
+                new RandomDistributionDesc(6), null, null, "");
+        stmt.analyze(analyzer);
+        Assert.assertEquals("testCluster:db1", stmt.getDbName());
+        Assert.assertEquals("table1", stmt.getTableName());
+        Assert.assertNull(stmt.getProperties());
+        Assert.assertTrue(stmt.toSql().contains("DISTRIBUTED BY RANDOM\nBUCKETS 6"));
+    }
+
+    @Test
     public void testCreateTableWithRollup() throws UserException {
         List<AlterClause> ops = Lists.newArrayList();
         ops.add(new AddRollupClause("index1", Lists.newArrayList("col1", "col2"), null, "table1", null));
@@ -121,7 +133,7 @@ public class CreateTableStmtTest {
     }
     
     @Test
-    public void testDefaultDbNormal() throws UserException, AnalysisException {
+    public void testDefaultDbNormal() throws UserException {
         CreateTableStmt stmt = new CreateTableStmt(false, false, tblNameNoDb, cols, "olap",
                 new KeysDesc(KeysType.AGG_KEYS, colsName), null,
                 new HashDistributionDesc(10, Lists.newArrayList("col1")), null, null, "");
@@ -174,7 +186,6 @@ public class CreateTableStmtTest {
 
     @Test
     public void testBmpHllKey() throws Exception {
-
         ColumnDef bitmap = new ColumnDef("col3", new TypeDef(ScalarType.createType(PrimitiveType.BITMAP)));
         cols.add(bitmap);
         colsName.add("col3");
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
index ae96752..21aa0d3 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
@@ -363,7 +363,7 @@ public class BrokerLoadJobTest {
         RuntimeProfile jobProfile = new RuntimeProfile("test");
         LoadLoadingTask task = new LoadLoadingTask(database, olapTable,brokerDesc, fileGroups,
                 100, 100, false, 100, callback, "",
-                100, 1, 1, true, jobProfile);
+                100, 1, 1, true, jobProfile, false);
         try {
             UserIdentity userInfo = new UserIdentity("root", "localhost");
             userInfo.setIsAnalyzed();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java
index ba5ca64..a072cf0 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java
@@ -102,7 +102,7 @@ public class OlapTableSinkTest {
         }};
 
         OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList(2L));
-        sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1);
+        sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false);
         sink.complete();
         LOG.info("sink is {}", sink.toThrift());
         LOG.info("{}", sink.getExplainString("", TExplainLevel.NORMAL));
@@ -132,7 +132,7 @@ public class OlapTableSinkTest {
         }};
 
         OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList(p1.getId()));
-        sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1);
+        sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false);
         try {
             sink.complete();
         } catch (UserException e) {
@@ -154,7 +154,7 @@ public class OlapTableSinkTest {
         }};
 
         OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList(unknownPartId));
-        sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1);
+        sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false);
         sink.complete();
         LOG.info("sink is {}", sink.toThrift());
         LOG.info("{}", sink.getExplainString("", TExplainLevel.NORMAL));
@@ -184,7 +184,7 @@ public class OlapTableSinkTest {
         }};
 
         OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList(p1.getId()));
-        sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1);
+        sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false);
         try {
             sink.complete();
         } catch (UserException e) {
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index 7320fa9..c1bc78a 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -141,6 +141,7 @@ struct TOlapTableSink {
     13: required Descriptors.TPaloNodesInfo nodes_info
     14: optional i64 load_channel_timeout_s // the timeout of load channels in second
     15: optional i32 send_batch_parallelism
+    16: optional bool load_to_single_tablet
 }
 
 struct TDataSink {
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index a6db982..a56677b 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -597,6 +597,7 @@ struct TStreamLoadPutRequest {
     34: optional string auth_code_uuid
     35: optional i32 send_batch_parallelism
     36: optional double max_filter_ratio
+    37: optional bool load_to_single_tablet
 }
 
 struct TStreamLoadPutResult {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to