This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new bd47d5a6816 [branch-2.1](auto-partition) Fix auto partition load 
failure in multi replica (#36586)
bd47d5a6816 is described below

commit bd47d5a68164e26a09247baa3d749b5c8865c715
Author: zclllyybb <zhaochan...@selectdb.com>
AuthorDate: Thu Jun 20 17:51:18 2024 +0800

    [branch-2.1](auto-partition) Fix auto partition load failure in multi 
replica (#36586)
    
    This PR:
    1. picks #35630, which was previously reverted in #36098.
    2. picks #36344 from master.
    
    These two PRs fix existing bugs in auto partition load.
    
    ---------
    
    Co-authored-by: Kaijie Chen <c...@apache.org>
---
 be/src/exec/tablet_info.cpp                        |  17 +--
 be/src/runtime/load_channel.cpp                    |  28 ++++-
 be/src/runtime/load_channel.h                      |  11 +-
 be/src/runtime/load_channel_mgr.cpp                |   8 --
 be/src/runtime/load_stream.cpp                     |   2 +-
 be/src/runtime/load_stream.h                       |   4 +
 be/src/runtime/tablets_channel.cpp                 |  55 +++++++--
 be/src/runtime/tablets_channel.h                   |  10 +-
 be/src/vec/sink/load_stream_map_pool.cpp           |  11 +-
 be/src/vec/sink/load_stream_map_pool.h             |   4 +-
 be/src/vec/sink/load_stream_stub.cpp               |  13 +-
 be/src/vec/sink/load_stream_stub.h                 |  10 +-
 be/src/vec/sink/writer/vtablet_writer.cpp          | 136 +++++++++++++++------
 be/src/vec/sink/writer/vtablet_writer.h            |  67 ++++++----
 be/src/vec/sink/writer/vtablet_writer_v2.cpp       |  60 ++++++---
 be/src/vec/sink/writer/vtablet_writer_v2.h         |   2 +
 .../apache/doris/catalog/ListPartitionItem.java    |   2 +-
 .../org/apache/doris/catalog/PartitionKey.java     |   7 ++
 .../apache/doris/catalog/RangePartitionItem.java   |   6 +-
 .../apache/doris/datasource/InternalCatalog.java   |   4 +-
 .../org/apache/doris/planner/OlapTableSink.java    | 111 ++++++++++++++++-
 .../apache/doris/service/FrontendServiceImpl.java  |  14 +--
 gensrc/proto/internal_service.proto                |   3 +
 gensrc/thrift/Descriptors.thrift                   |   1 +
 .../sql/two_instance_correctness.out               |   4 +
 .../test_auto_range_partition.groovy               |   3 +-
 .../auto_partition/sql/multi_thread_load.groovy    |   2 +-
 .../sql/two_instance_correctness.groovy            |  45 +++++++
 28 files changed, 492 insertions(+), 148 deletions(-)

diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp
index 62ff0b2fcce..e32e9c9efcf 100644
--- a/be/src/exec/tablet_info.cpp
+++ b/be/src/exec/tablet_info.cpp
@@ -388,18 +388,21 @@ Status VOlapTablePartitionParam::init() {
     // for both auto/non-auto partition table.
     _is_in_partition = _part_type == TPartitionType::type::LIST_PARTITIONED;
 
-    // initial partitions
+    // initial partitions. if meet dummy partitions only for open BE nodes, 
not generate key of them for finding
     for (const auto& t_part : _t_param.partitions) {
         VOlapTablePartition* part = nullptr;
         RETURN_IF_ERROR(generate_partition_from(t_part, part));
         _partitions.emplace_back(part);
-        if (_is_in_partition) {
-            for (auto& in_key : part->in_keys) {
-                _partitions_map->emplace(std::tuple {in_key.first, 
in_key.second, false}, part);
+
+        if (!_t_param.partitions_is_fake) {
+            if (_is_in_partition) {
+                for (auto& in_key : part->in_keys) {
+                    _partitions_map->emplace(std::tuple {in_key.first, 
in_key.second, false}, part);
+                }
+            } else {
+                _partitions_map->emplace(
+                        std::tuple {part->end_key.first, part->end_key.second, 
false}, part);
             }
-        } else {
-            _partitions_map->emplace(std::tuple {part->end_key.first, 
part->end_key.second, false},
-                                     part);
         }
     }
 
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 146575feac9..3d8c8e1dbf3 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -33,11 +33,11 @@ namespace doris {
 bvar::Adder<int64_t> g_loadchannel_cnt("loadchannel_cnt");
 
 LoadChannel::LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool 
is_high_priority,
-                         const std::string& sender_ip, int64_t backend_id, 
bool enable_profile)
+                         std::string sender_ip, int64_t backend_id, bool 
enable_profile)
         : _load_id(load_id),
           _timeout_s(timeout_s),
           _is_high_priority(is_high_priority),
-          _sender_ip(sender_ip),
+          _sender_ip(std::move(sender_ip)),
           _backend_id(backend_id),
           _enable_profile(enable_profile) {
     std::shared_ptr<QueryContext> query_context =
@@ -96,6 +96,10 @@ Status LoadChannel::open(const PTabletWriterOpenRequest& 
params) {
         if (it != _tablets_channels.end()) {
             channel = it->second;
         } else {
+            // just for VLOG
+            if (_txn_id == 0) [[unlikely]] {
+                _txn_id = params.txn_id();
+            }
             // create a new tablets channel
             TabletsChannelKey key(params.id(), index_id);
             // TODO(plat1ko): CloudTabletsChannel
@@ -161,6 +165,7 @@ Status LoadChannel::add_batch(const 
PTabletWriterAddBlockRequest& request,
     }
 
     // 3. handle eos
+    // if channel is incremental, maybe hang on close until all close request 
arrived.
     if (request.has_eos() && request.eos()) {
         st = _handle_eos(channel.get(), request, response);
         _report_profile(response);
@@ -182,6 +187,24 @@ Status LoadChannel::_handle_eos(BaseTabletsChannel* 
channel,
     auto index_id = request.index_id();
 
     RETURN_IF_ERROR(channel->close(this, request, response, &finished));
+
+    // for init node, we close waiting(hang on) all close request and let them 
return together.
+    if (request.has_hang_wait() && request.hang_wait()) {
+        DCHECK(!channel->is_incremental_channel());
+        VLOG_DEBUG << fmt::format("txn {}: reciever index {} close waiting by 
sender {}", _txn_id,
+                                  request.index_id(), request.sender_id());
+        int count = 0;
+        while (!channel->is_finished()) {
+            bthread_usleep(1000);
+            count++;
+        }
+        // now maybe finished or cancelled.
+        VLOG_TRACE << "reciever close wait finished!" << request.sender_id();
+        if (count >= 1000 * _timeout_s) { // maybe 
config::streaming_load_rpc_max_alive_time_sec
+            return Status::InternalError("Tablets channel didn't wait all 
close");
+        }
+    }
+
     if (finished) {
         std::lock_guard<std::mutex> l(_lock);
         {
@@ -191,6 +214,7 @@ Status LoadChannel::_handle_eos(BaseTabletsChannel* channel,
                     std::make_pair(channel->total_received_rows(), 
channel->num_rows_filtered())));
             _tablets_channels.erase(index_id);
         }
+        LOG(INFO) << "txn " << _txn_id << " closed tablets_channel " << 
index_id;
         _finished_channel_ids.emplace(index_id);
     }
     return Status::OK();
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index 4a437e51907..791e996574a 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -17,10 +17,8 @@
 
 #pragma once
 
-#include <algorithm>
 #include <atomic>
-#include <functional>
-#include <map>
+#include <cstdint>
 #include <memory>
 #include <mutex>
 #include <ostream>
@@ -28,15 +26,11 @@
 #include <unordered_map>
 #include <unordered_set>
 #include <utility>
-#include <vector>
 
 #include "common/status.h"
-#include "olap/memtable_memory_limiter.h"
-#include "runtime/exec_env.h"
 #include "runtime/thread_context.h"
 #include "util/runtime_profile.h"
 #include "util/spinlock.h"
-#include "util/thrift_util.h"
 #include "util/uid_util.h"
 
 namespace doris {
@@ -52,7 +46,7 @@ class BaseTabletsChannel;
 class LoadChannel {
 public:
     LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool 
is_high_priority,
-                const std::string& sender_ip, int64_t backend_id, bool 
enable_profile);
+                std::string sender_ip, int64_t backend_id, bool 
enable_profile);
     ~LoadChannel();
 
     // open a new load channel if not exist
@@ -91,6 +85,7 @@ protected:
 
 private:
     UniqueId _load_id;
+    int64_t _txn_id = 0;
 
     SpinLock _profile_serialize_lock;
     std::unique_ptr<RuntimeProfile> _profile;
diff --git a/be/src/runtime/load_channel_mgr.cpp 
b/be/src/runtime/load_channel_mgr.cpp
index 4b0cc32f9c9..d236645b1fe 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -24,25 +24,17 @@
 // IWYU pragma: no_include <bits/chrono.h>
 #include <chrono> // IWYU pragma: keep
 #include <ctime>
-#include <functional>
-#include <map>
 #include <memory>
 #include <ostream>
-#include <queue>
 #include <string>
-#include <tuple>
 #include <vector>
 
 #include "common/config.h"
 #include "common/logging.h"
 #include "runtime/exec_env.h"
 #include "runtime/load_channel.h"
-#include "runtime/memory/mem_tracker.h"
 #include "util/doris_metrics.h"
-#include "util/mem_info.h"
 #include "util/metrics.h"
-#include "util/perf_counters.h"
-#include "util/pretty_printer.h"
 #include "util/thread.h"
 
 namespace doris {
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 87898d95a46..8de15091ec5 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -364,7 +364,7 @@ LoadStream::~LoadStream() {
 Status LoadStream::init(const POpenLoadStreamRequest* request) {
     _txn_id = request->txn_id();
     _total_streams = request->total_streams();
-    DCHECK(_total_streams > 0) << "total streams should be greator than 0";
+    _is_incremental = (_total_streams == 0);
 
     _schema = std::make_shared<OlapTableSchemaParam>();
     RETURN_IF_ERROR(_schema->init(request->schema()));
diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index c61a2d163de..b2635698379 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -117,6 +117,9 @@ public:
     void add_source(int64_t src_id) {
         std::lock_guard lock_guard(_lock);
         _open_streams[src_id]++;
+        if (_is_incremental) {
+            _total_streams++;
+        }
     }
 
     Status close(int64_t src_id, const std::vector<PTabletID>& 
tablets_to_commit,
@@ -167,6 +170,7 @@ private:
     RuntimeProfile::Counter* _close_wait_timer = nullptr;
     LoadStreamMgr* _load_stream_mgr = nullptr;
     QueryThreadContext _query_thread_context;
+    bool _is_incremental = false;
 };
 
 using LoadStreamPtr = std::unique_ptr<LoadStream>;
diff --git a/be/src/runtime/tablets_channel.cpp 
b/be/src/runtime/tablets_channel.cpp
index 526c979968d..adaced0b76e 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -21,7 +21,8 @@
 #include <fmt/format.h>
 #include <gen_cpp/internal_service.pb.h>
 #include <gen_cpp/types.pb.h>
-#include <time.h>
+
+#include <ctime>
 
 #include "common/compiler_util.h" // IWYU pragma: keep
 #include "common/status.h"
@@ -132,17 +133,40 @@ Status BaseTabletsChannel::open(const 
PTabletWriterOpenRequest& request) {
     if (_state == kOpened || _state == kFinished) {
         return Status::OK();
     }
-    LOG(INFO) << "open tablets channel: " << _key << ", tablets num: " << 
request.tablets().size()
-              << ", timeout(s): " << request.load_channel_timeout_s();
+    LOG(INFO) << fmt::format("open tablets channel of index {}, tablets num: 
{} timeout(s): {}",
+                             _index_id, request.tablets().size(), 
request.load_channel_timeout_s());
     _txn_id = request.txn_id();
     _index_id = request.index_id();
     _schema = std::make_shared<OlapTableSchemaParam>();
     RETURN_IF_ERROR(_schema->init(request.schema()));
     _tuple_desc = _schema->tuple_desc();
 
-    _num_remaining_senders = request.num_senders();
-    _next_seqs.resize(_num_remaining_senders, 0);
-    _closed_senders.Reset(_num_remaining_senders);
+    int max_sender = request.num_senders();
+    /*
+     * a tablets channel in reciever is related to a bulk of VNodeChannel of 
sender. each instance one or none.
+     * there are two possibilities:
+     *  1. there's partitions originally broadcasted by FE. so all 
sender(instance) know it at start. and open() will be 
+     *     called directly, not by incremental_open(). and after _state 
changes to kOpened. _open_by_incremental will never 
+     *     be true. in this case, _num_remaining_senders will keep same with 
senders number. when all sender sent close rpc,
+     *     the tablets channel will close. and if for auto partition table, 
these channel's closing will hang on reciever and
+     *     return together to avoid close-then-incremental-open problem.
+     *  2. this tablets channel is opened by incremental_open of sender's sink 
node. so only this sender will know this partition
+     *     (this TabletsChannel) at that time. and we are not sure how many 
sender will know in the end. it depends on data
+     *     distribution. in this situation open() is called by 
incremental_open() at first time. so _open_by_incremental is true.
+     *     then _num_remaining_senders will not be set here. but inc every 
time when incremental_open() called. so it's dynamic
+     *     and also need same number of senders' close to close. but will not 
hang.
+     */
+    if (_open_by_incremental) {
+        DCHECK(_num_remaining_senders == 0) << _num_remaining_senders;
+    } else {
+        _num_remaining_senders = max_sender;
+    }
+    LOG(INFO) << fmt::format(
+            "txn {}: TabletsChannel of index {} init senders {} with 
incremental {}", _txn_id,
+            _index_id, _num_remaining_senders, _open_by_incremental ? "on" : 
"off");
+    // just use max_sender no matter incremental or not cuz we dont know how 
many senders will open.
+    _next_seqs.resize(max_sender, 0);
+    _closed_senders.Reset(max_sender);
 
     RETURN_IF_ERROR(_open_all_writers(request));
 
@@ -152,10 +176,27 @@ Status BaseTabletsChannel::open(const 
PTabletWriterOpenRequest& request) {
 
 Status BaseTabletsChannel::incremental_open(const PTabletWriterOpenRequest& 
params) {
     SCOPED_TIMER(_incremental_open_timer);
-    if (_state == kInitialized) { // haven't opened
+
+    // current node first opened by incremental open
+    if (_state == kInitialized) {
+        _open_by_incremental = true;
         RETURN_IF_ERROR(open(params));
     }
+
     std::lock_guard<std::mutex> l(_lock);
+
+    // one sender may incremental_open many times. but only close one time. so 
dont count duplicately.
+    if (_open_by_incremental) {
+        if (params.has_sender_id() && 
!_recieved_senders.contains(params.sender_id())) {
+            _recieved_senders.insert(params.sender_id());
+            _num_remaining_senders++;
+        } else if (!params.has_sender_id()) { // for compatible
+            _num_remaining_senders++;
+        }
+        VLOG_DEBUG << fmt::format("txn {}: TabletsChannel {} inc senders to 
{}", _txn_id, _index_id,
+                                  _num_remaining_senders);
+    }
+
     std::vector<SlotDescriptor*>* index_slots = nullptr;
     int32_t schema_hash = 0;
     for (const auto& index : _schema->indexes()) {
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index 27db9387602..54438be7690 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -21,8 +21,6 @@
 
 #include <atomic>
 #include <cstdint>
-#include <functional>
-#include <map>
 #include <mutex>
 #include <ostream>
 #include <shared_mutex>
@@ -113,6 +111,11 @@ public:
 
     size_t num_rows_filtered() const { return _num_rows_filtered; }
 
+    // means this tablets in this BE is incremental opened partitions.
+    bool is_incremental_channel() const { return _open_by_incremental; }
+
+    bool is_finished() const { return _state == kFinished; }
+
 protected:
     Status _get_current_seq(int64_t& cur_seq, const 
PTabletWriterAddBlockRequest& request);
 
@@ -151,10 +154,11 @@ protected:
     int64_t _txn_id = -1;
     int64_t _index_id = -1;
     std::shared_ptr<OlapTableSchemaParam> _schema;
-
     TupleDescriptor* _tuple_desc = nullptr;
+    bool _open_by_incremental = false;
 
     // next sequence we expect
+    std::set<int32_t> _recieved_senders;
     int _num_remaining_senders = 0;
     std::vector<int64_t> _next_seqs;
     Bitmap _closed_senders;
diff --git a/be/src/vec/sink/load_stream_map_pool.cpp 
b/be/src/vec/sink/load_stream_map_pool.cpp
index fdcfe190dbf..7a3072ade6e 100644
--- a/be/src/vec/sink/load_stream_map_pool.cpp
+++ b/be/src/vec/sink/load_stream_map_pool.cpp
@@ -35,7 +35,7 @@ LoadStreamMap::LoadStreamMap(UniqueId load_id, int64_t 
src_id, int num_streams,
     DCHECK(num_use > 0) << "use num should be greater than 0";
 }
 
-std::shared_ptr<Streams> LoadStreamMap::get_or_create(int64_t dst_id) {
+std::shared_ptr<Streams> LoadStreamMap::get_or_create(int64_t dst_id, bool 
incremental) {
     std::lock_guard<std::mutex> lock(_mutex);
     std::shared_ptr<Streams> streams = _streams_for_node[dst_id];
     if (streams != nullptr) {
@@ -44,7 +44,7 @@ std::shared_ptr<Streams> LoadStreamMap::get_or_create(int64_t 
dst_id) {
     streams = std::make_shared<Streams>();
     for (int i = 0; i < _num_streams; i++) {
         streams->emplace_back(new LoadStreamStub(_load_id, _src_id, 
_tablet_schema_for_index,
-                                                 
_enable_unique_mow_for_index));
+                                                 _enable_unique_mow_for_index, 
incremental));
     }
     _streams_for_node[dst_id] = streams;
     return streams;
@@ -101,10 +101,13 @@ bool LoadStreamMap::release() {
     return false;
 }
 
-Status LoadStreamMap::close_load() {
-    return for_each_st([this](int64_t dst_id, const Streams& streams) -> 
Status {
+Status LoadStreamMap::close_load(bool incremental) {
+    return for_each_st([this, incremental](int64_t dst_id, const Streams& 
streams) -> Status {
         const auto& tablets = _tablets_to_commit[dst_id];
         for (auto& stream : streams) {
+            if (stream->is_incremental() != incremental) {
+                continue;
+            }
             RETURN_IF_ERROR(stream->close_load(tablets));
         }
         return Status::OK();
diff --git a/be/src/vec/sink/load_stream_map_pool.h 
b/be/src/vec/sink/load_stream_map_pool.h
index aad12dba2aa..d0f72ab7e00 100644
--- a/be/src/vec/sink/load_stream_map_pool.h
+++ b/be/src/vec/sink/load_stream_map_pool.h
@@ -78,7 +78,7 @@ public:
     LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, int 
num_use,
                   LoadStreamMapPool* pool);
 
-    std::shared_ptr<Streams> get_or_create(int64_t dst_id);
+    std::shared_ptr<Streams> get_or_create(int64_t dst_id, bool incremental = 
false);
 
     std::shared_ptr<Streams> at(int64_t dst_id);
 
@@ -95,7 +95,7 @@ public:
 
     // send CLOSE_LOAD to all streams, return ERROR if any.
     // only call this method after release() returns true.
-    Status close_load();
+    Status close_load(bool incremental);
 
 private:
     const UniqueId _load_id;
diff --git a/be/src/vec/sink/load_stream_stub.cpp 
b/be/src/vec/sink/load_stream_stub.cpp
index 92670c1c930..caebb381db6 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -127,11 +127,12 @@ inline std::ostream& operator<<(std::ostream& ostr, const 
LoadStreamReplyHandler
 
 LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id,
                                std::shared_ptr<IndexToTabletSchema> schema_map,
-                               std::shared_ptr<IndexToEnableMoW> mow_map)
+                               std::shared_ptr<IndexToEnableMoW> mow_map, bool 
incremental)
         : _load_id(load_id),
           _src_id(src_id),
           _tablet_schema_for_index(schema_map),
-          _enable_unique_mow_for_index(mow_map) {};
+          _enable_unique_mow_for_index(mow_map),
+          _is_incremental(incremental) {};
 
 LoadStreamStub::~LoadStreamStub() {
     if (_is_init.load() && !_is_closed.load()) {
@@ -168,7 +169,13 @@ Status 
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
     request.set_src_id(_src_id);
     request.set_txn_id(txn_id);
     request.set_enable_profile(enable_profile);
-    request.set_total_streams(total_streams);
+    if (_is_incremental) {
+        request.set_total_streams(0);
+    } else if (total_streams > 0) {
+        request.set_total_streams(total_streams);
+    } else {
+        return Status::InternalError("total_streams should be greator than 0");
+    }
     request.set_idle_timeout_ms(idle_timeout_ms);
     schema.to_protobuf(request.mutable_schema());
     for (auto& tablet : tablets_for_schema) {
diff --git a/be/src/vec/sink/load_stream_stub.h 
b/be/src/vec/sink/load_stream_stub.h
index 1f0d2e459d3..1bf0fac4e38 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -111,12 +111,12 @@ public:
     // construct new stub
     LoadStreamStub(PUniqueId load_id, int64_t src_id,
                    std::shared_ptr<IndexToTabletSchema> schema_map,
-                   std::shared_ptr<IndexToEnableMoW> mow_map);
+                   std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental 
= false);
 
     LoadStreamStub(UniqueId load_id, int64_t src_id,
                    std::shared_ptr<IndexToTabletSchema> schema_map,
-                   std::shared_ptr<IndexToEnableMoW> mow_map)
-            : LoadStreamStub(load_id.to_proto(), src_id, schema_map, mow_map) 
{};
+                   std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental 
= false)
+            : LoadStreamStub(load_id.to_proto(), src_id, schema_map, mow_map, 
incremental) {};
 
 // for mock this class in UT
 #ifdef BE_TEST
@@ -195,6 +195,8 @@ public:
 
     int64_t dst_id() const { return _dst_id; }
 
+    bool is_incremental() const { return _is_incremental; }
+
     friend std::ostream& operator<<(std::ostream& ostr, const LoadStreamStub& 
stub);
 
     std::string to_string();
@@ -255,6 +257,8 @@ protected:
     bthread::Mutex _failed_tablets_mutex;
     std::vector<int64_t> _success_tablets;
     std::unordered_map<int64_t, Status> _failed_tablets;
+
+    bool _is_incremental = false;
 };
 
 } // namespace doris
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp 
b/be/src/vec/sink/writer/vtablet_writer.cpp
index 64fb092e736..3aa9c799216 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -35,11 +35,9 @@
 #include <sys/param.h>
 
 #include <algorithm>
-#include <exception>
 #include <initializer_list>
 #include <memory>
 #include <mutex>
-#include <ranges>
 #include <sstream>
 #include <string>
 #include <unordered_map>
@@ -50,23 +48,18 @@
 #include "util/runtime_profile.h"
 #include "vec/data_types/data_type.h"
 #include "vec/exprs/vexpr_fwd.h"
-#include "vec/runtime/vdatetime_value.h"
-#include "vec/sink/volap_table_sink.h"
 #include "vec/sink/vrow_distribution.h"
 
 #ifdef DEBUG
 #include <unordered_set>
 #endif
 
-#include "bvar/bvar.h"
 #include "common/compiler_util.h" // IWYU pragma: keep
 #include "common/logging.h"
 #include "common/object_pool.h"
 #include "common/signal_handler.h"
 #include "common/status.h"
 #include "exec/tablet_info.h"
-#include "runtime/client_cache.h"
-#include "runtime/define_primitive_type.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
 #include "runtime/runtime_state.h"
@@ -86,13 +79,7 @@
 #include "util/uid_util.h"
 #include "vec/columns/column.h"
 #include "vec/columns/column_const.h"
-#include "vec/columns/column_decimal.h"
-#include "vec/columns/column_nullable.h"
-#include "vec/columns/column_vector.h"
-#include "vec/columns/columns_number.h"
-#include "vec/common/assert_cast.h"
 #include "vec/core/block.h"
-#include "vec/core/types.h"
 #include "vec/data_types/data_type_nullable.h"
 #include "vec/exprs/vexpr.h"
 #include "vec/sink/vtablet_block_convertor.h"
@@ -110,7 +97,8 @@ bvar::Adder<int64_t> g_sink_write_rows;
 bvar::PerSecond<bvar::Adder<int64_t>> 
g_sink_write_rows_per_second("sink_throughput_row",
                                                                    
&g_sink_write_rows, 60);
 
-Status IndexChannel::init(RuntimeState* state, const 
std::vector<TTabletWithPartition>& tablets) {
+Status IndexChannel::init(RuntimeState* state, const 
std::vector<TTabletWithPartition>& tablets,
+                          bool incremental) {
     SCOPED_CONSUME_MEM_TRACKER(_index_channel_tracker.get());
     for (const auto& tablet : tablets) {
         // First find the location BEs of this tablet
@@ -128,8 +116,15 @@ Status IndexChannel::init(RuntimeState* state, const 
std::vector<TTabletWithPart
                 // NodeChannel is not added to the _parent->_pool.
                 // Because the deconstruction of NodeChannel may take a long 
time to wait rpc finish.
                 // but the ObjectPool will hold a spin lock to delete objects.
-                channel = std::make_shared<VNodeChannel>(_parent, this, 
replica_node_id);
+                channel =
+                        std::make_shared<VNodeChannel>(_parent, this, 
replica_node_id, incremental);
                 _node_channels.emplace(replica_node_id, channel);
+                // incremental opened new node. when close we have use 
two-stage close.
+                if (incremental) {
+                    _has_inc_node = true;
+                }
+                LOG(INFO) << "init new node for instance " << 
_parent->_sender_id
+                          << ", incremantal:" << incremental;
             } else {
                 channel = it->second;
             }
@@ -359,22 +354,23 @@ Status VNodeChannel::init(RuntimeState* state) {
     // add block closure
     // Has to using value to capture _task_exec_ctx because tablet writer may 
destroyed during callback.
     _send_block_callback = 
WriteBlockCallback<PTabletWriterAddBlockResult>::create_shared();
-    _send_block_callback->addFailedHandler([&, task_exec_ctx = 
_task_exec_ctx](bool is_last_rpc) {
-        auto ctx_lock = task_exec_ctx.lock();
-        if (ctx_lock == nullptr) {
-            return;
-        }
-        _add_block_failed_callback(is_last_rpc);
-    });
+    _send_block_callback->addFailedHandler(
+            [&, task_exec_ctx = _task_exec_ctx](const 
WriteBlockCallbackContext& ctx) {
+                std::shared_ptr<TaskExecutionContext> ctx_lock = 
task_exec_ctx.lock();
+                if (ctx_lock == nullptr) {
+                    return;
+                }
+                _add_block_failed_callback(ctx);
+            });
 
     _send_block_callback->addSuccessHandler(
             [&, task_exec_ctx = _task_exec_ctx](const 
PTabletWriterAddBlockResult& result,
-                                                bool is_last_rpc) {
-                auto ctx_lock = task_exec_ctx.lock();
+                                                const 
WriteBlockCallbackContext& ctx) {
+                std::shared_ptr<TaskExecutionContext> ctx_lock = 
task_exec_ctx.lock();
                 if (ctx_lock == nullptr) {
                     return;
                 }
-                _add_block_success_callback(result, is_last_rpc);
+                _add_block_success_callback(result, ctx);
             });
 
     _name = fmt::format("VNodeChannel[{}-{}]", _index_channel->_index_id, 
_node_id);
@@ -398,6 +394,7 @@ void VNodeChannel::_open_internal(bool is_incremental) {
     request->set_allocated_id(&_parent->_load_id);
     request->set_index_id(_index_channel->_index_id);
     request->set_txn_id(_parent->_txn_id);
+    request->set_sender_id(_parent->_sender_id);
     request->set_allocated_schema(_parent->_schema->to_protobuf());
 
     std::set<int64_t> deduper;
@@ -432,6 +429,8 @@ void VNodeChannel::_open_internal(bool is_incremental) {
     if (config::tablet_writer_ignore_eovercrowded) {
         open_callback->cntl_->ignore_eovercrowded();
     }
+    VLOG_DEBUG << fmt::format("txn {}: open NodeChannel to {}, incremental: 
{}, senders: {}",
+                              _parent->_txn_id, _node_id, is_incremental, 
_parent->_num_senders);
     // the real transmission here. the corresponding BE's load mgr will open 
load channel for it.
     _stub->tablet_writer_open(open_closure->cntl_.get(), 
open_closure->request_.get(),
                               open_closure->response_.get(), 
open_closure.get());
@@ -677,6 +676,7 @@ void VNodeChannel::try_send_pending_block(RuntimeState* 
state) {
         }
 
         // eos request must be the last request-> it's a signal makeing 
callback function to set _add_batch_finished true.
+        // end_mark makes is_last_rpc true when rpc finished and call 
callbacks.
         _send_block_callback->end_mark();
         _send_finished = true;
         CHECK(_pending_batches_num == 0) << _pending_batches_num;
@@ -726,7 +726,7 @@ void VNodeChannel::try_send_pending_block(RuntimeState* 
state) {
 }
 
 void VNodeChannel::_add_block_success_callback(const 
PTabletWriterAddBlockResult& result,
-                                               bool is_last_rpc) {
+                                               const 
WriteBlockCallbackContext& ctx) {
     std::lock_guard<std::mutex> l(this->_closed_lock);
     if (this->_is_closed) {
         // if the node channel is closed, no need to call the following logic,
@@ -744,7 +744,7 @@ void VNodeChannel::_add_block_success_callback(const 
PTabletWriterAddBlockResult
         Status st = _index_channel->check_intolerable_failure();
         if (!st.ok()) {
             _cancel_with_msg(st.to_string());
-        } else if (is_last_rpc) {
+        } else if (ctx._is_last_rpc) {
             for (const auto& tablet : result.tablet_vec()) {
                 TTabletCommitInfo commit_info;
                 commit_info.tabletId = tablet.tablet_id();
@@ -802,7 +802,7 @@ void VNodeChannel::_add_block_success_callback(const 
PTabletWriterAddBlockResult
     }
 }
 
-void VNodeChannel::_add_block_failed_callback(bool is_last_rpc) {
+void VNodeChannel::_add_block_failed_callback(const WriteBlockCallbackContext& 
ctx) {
     std::lock_guard<std::mutex> l(this->_closed_lock);
     if (this->_is_closed) {
         // if the node channel is closed, no need to call `mark_as_failed`,
@@ -819,7 +819,7 @@ void VNodeChannel::_add_block_failed_callback(bool 
is_last_rpc) {
     Status st = _index_channel->check_intolerable_failure();
     if (!st.ok()) {
         _cancel_with_msg(fmt::format("{}, err: {}", channel_info(), 
st.to_string()));
-    } else if (is_last_rpc) {
+    } else if (ctx._is_last_rpc) {
         // if this is last rpc, will must set _add_batches_finished. 
otherwise, node channel's close_wait
         // will be blocked.
         _add_batches_finished = true;
@@ -892,12 +892,14 @@ Status VNodeChannel::close_wait(RuntimeState* state) {
         }
     }
 
-    // waiting for finished, it may take a long time, so we couldn't set a 
timeout
+    // Waiting for finished until _add_batches_finished changed by rpc's 
finished callback.
+    // it may take a long time, so we couldn't set a timeout
     // For pipeline engine, the close is called in async writer's process 
block method,
     // so that it will not block pipeline thread.
     while (!_add_batches_finished && !_cancelled && !state->is_cancelled()) {
         bthread_usleep(1000);
     }
+    VLOG_CRITICAL << _parent->_sender_id << " close wait finished";
     _close_time_ms = UnixMillis() - _close_time_ms;
 
     if (_cancelled || state->is_cancelled()) {
@@ -925,17 +927,18 @@ void VNodeChannel::_close_check() {
     CHECK(_cur_mutable_block == nullptr) << name();
 }
 
-void VNodeChannel::mark_close() {
+void VNodeChannel::mark_close(bool hang_wait) {
     auto st = none_of({_cancelled, _eos_is_produced});
     if (!st.ok()) {
         return;
     }
 
     _cur_add_block_request->set_eos(true);
+    _cur_add_block_request->set_hang_wait(hang_wait);
     {
         std::lock_guard<std::mutex> l(_pending_batches_lock);
         if (!_cur_mutable_block) [[unlikely]] {
-            // add a dummy block
+            // no block has ever arrived. add a dummy block
             _cur_mutable_block = vectorized::MutableBlock::create_unique();
         }
         auto tmp_add_block_request =
@@ -1168,7 +1171,7 @@ Status VTabletWriter::_init(RuntimeState* state, 
RuntimeProfile* profile) {
         return Status::InternalError("unknown destination tuple descriptor");
     }
 
-    if (_vec_output_expr_ctxs.size() > 0 &&
+    if (!_vec_output_expr_ctxs.empty() &&
         _output_tuple_desc->slots().size() != _vec_output_expr_ctxs.size()) {
         LOG(WARNING) << "output tuple slot num should be equal to num of 
output exprs, "
                      << "output_tuple_slot_num " << 
_output_tuple_desc->slots().size()
@@ -1279,7 +1282,7 @@ Status VTabletWriter::_incremental_open_node_channel(
         // update and reinit for existing channels.
         std::shared_ptr<IndexChannel> channel = 
_index_id_to_channel[index->index_id];
         DCHECK(channel != nullptr);
-        RETURN_IF_ERROR(channel->init(_state, tablets)); // add tablets into it
+        RETURN_IF_ERROR(channel->init(_state, tablets, true)); // add tablets 
into it
     }
 
     fmt::memory_buffer buf;
@@ -1374,14 +1377,71 @@ void VTabletWriter::_do_try_close(RuntimeState* state, 
const Status& exec_status
 
     _try_close = true; // will stop periodic thread
     if (status.ok()) {
+        // BE id -> add_batch method counter
+        std::unordered_map<int64_t, AddBatchCounter> 
node_add_batch_counter_map;
+
         // only if status is ok can we call this 
_profile->total_time_counter().
         // if status is not ok, this sink may not be prepared, so that 
_profile is null
         SCOPED_TIMER(_profile->total_time_counter());
-        {
-            for (const auto& index_channel : _channels) {
+        for (const auto& index_channel : _channels) {
+            // two-step mark close. first we send close_origin to receivers to 
close all originally existing TabletsChannels.
+            // when they are all closed, we are sure all Writers of instances 
have called _do_try_close. that means no new channel
+            // will be opened. the refcount of receivers will be monotonically 
decreasing. then we are safe to close all
+            // our channels.
+            if (index_channel->has_incremental_node_channel()) {
+                if (!status.ok()) {
+                    break;
+                }
+                VLOG_TRACE << _sender_id << " first stage close start " << 
_txn_id;
+                index_channel->for_init_node_channel(
+                        [&index_channel, &status, this](const 
std::shared_ptr<VNodeChannel>& ch) {
+                            if (!status.ok() || ch->is_closed()) {
+                                return;
+                            }
+                            VLOG_DEBUG << index_channel->_parent->_sender_id 
<< "'s " << ch->host()
+                                       << "mark close1 for inits " << _txn_id;
+                            ch->mark_close(true);
+                            if (ch->is_cancelled()) {
+                                status = 
cancel_channel_and_check_intolerable_failure(
+                                        status, ch->get_cancel_msg(), 
index_channel, ch);
+                            }
+                        });
+                if (!status.ok()) {
+                    break;
+                }
+                index_channel->for_init_node_channel(
+                        [this, &index_channel, &status](const 
std::shared_ptr<VNodeChannel>& ch) {
+                            if (!status.ok() || ch->is_closed()) {
+                                return;
+                            }
+                            auto s = ch->close_wait(_state);
+                            VLOG_DEBUG << index_channel->_parent->_sender_id 
<< "'s " << ch->host()
+                                       << "close1 wait finished!";
+                            if (!s.ok()) {
+                                status = 
cancel_channel_and_check_intolerable_failure(
+                                        status, s.to_string(), index_channel, 
ch);
+                            }
+                        });
                 if (!status.ok()) {
                     break;
                 }
+                VLOG_DEBUG << _sender_id << " first stage finished. closeing 
inc nodes " << _txn_id;
+                index_channel->for_inc_node_channel(
+                        [&index_channel, &status, this](const 
std::shared_ptr<VNodeChannel>& ch) {
+                            if (!status.ok() || ch->is_closed()) {
+                                return;
+                            }
+                            // only first try close, all node channels will 
mark_close()
+                            VLOG_DEBUG << index_channel->_parent->_sender_id 
<< "'s " << ch->host()
+                                       << "mark close2 for inc " << _txn_id;
+                            ch->mark_close();
+                            if (ch->is_cancelled()) {
+                                status = 
cancel_channel_and_check_intolerable_failure(
+                                        status, ch->get_cancel_msg(), 
index_channel, ch);
+                            }
+                        });
+            } else { // not has_incremental_node_channel
+                VLOG_TRACE << _sender_id << " has no incremental channels " << 
_txn_id;
                 index_channel->for_each_node_channel(
                         [&index_channel, &status](const 
std::shared_ptr<VNodeChannel>& ch) {
                             if (!status.ok() || ch->is_closed()) {
@@ -1394,8 +1454,8 @@ void VTabletWriter::_do_try_close(RuntimeState* state, 
const Status& exec_status
                                         status, ch->get_cancel_msg(), 
index_channel, ch);
                             }
                         });
-            } // end for index channels
-        }
+            }
+        } // end for index channels
     }
 
     if (!status.ok()) {
diff --git a/be/src/vec/sink/writer/vtablet_writer.h 
b/be/src/vec/sink/writer/vtablet_writer.h
index bcc5228457a..603034cea6d 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -36,13 +36,11 @@
 #include <cstddef>
 #include <cstdint>
 #include <functional>
-#include <initializer_list>
 #include <map>
 #include <memory>
 #include <mutex>
 #include <ostream>
 #include <queue>
-#include <set>
 #include <sstream>
 #include <string>
 #include <thread>
@@ -55,23 +53,17 @@
 #include "common/status.h"
 #include "exec/data_sink.h"
 #include "exec/tablet_info.h"
-#include "gutil/ref_counted.h"
-#include "runtime/decimalv2_value.h"
 #include "runtime/exec_env.h"
 #include "runtime/memory/mem_tracker.h"
 #include "runtime/thread_context.h"
-#include "runtime/types.h"
-#include "util/countdown_latch.h"
 #include "util/ref_count_closure.h"
 #include "util/runtime_profile.h"
 #include "util/spinlock.h"
 #include "util/stopwatch.hpp"
 #include "vec/columns/column.h"
-#include "vec/common/allocator.h"
 #include "vec/core/block.h"
 #include "vec/data_types/data_type.h"
 #include "vec/exprs/vexpr_fwd.h"
-#include "vec/runtime/vfile_format_transformer.h"
 #include "vec/sink/vrow_distribution.h"
 #include "vec/sink/vtablet_block_convertor.h"
 #include "vec/sink/vtablet_finder.h"
@@ -114,6 +106,10 @@ struct AddBatchCounter {
     }
 };
 
+struct WriteBlockCallbackContext {
+    std::atomic<bool> _is_last_rpc {false};
+};
+
 // It's very error-prone to guarantee the handler capture vars' & this 
closure's destruct sequence.
 // So using create() to get the closure pointer is recommended. We can delete 
the closure ptr before the capture vars destruction.
 // Delete this point is safe, don't worry about RPC callback will run after 
WriteBlockCallback deleted.
@@ -127,8 +123,13 @@ public:
     WriteBlockCallback() : cid(INVALID_BTHREAD_ID) {}
     ~WriteBlockCallback() override = default;
 
-    void addFailedHandler(const std::function<void(bool)>& fn) { 
failed_handler = fn; }
-    void addSuccessHandler(const std::function<void(const T&, bool)>& fn) { 
success_handler = fn; }
+    void addFailedHandler(const std::function<void(const 
WriteBlockCallbackContext&)>& fn) {
+        failed_handler = fn;
+    }
+    void addSuccessHandler(
+            const std::function<void(const T&, const 
WriteBlockCallbackContext&)>& fn) {
+        success_handler = fn;
+    }
 
     void join() override {
         // We rely on in_flight to assure one rpc is running,
@@ -165,8 +166,8 @@ public:
     bool is_packet_in_flight() { return _packet_in_flight; }
 
     void end_mark() {
-        DCHECK(_is_last_rpc == false);
-        _is_last_rpc = true;
+        DCHECK(_ctx._is_last_rpc == false);
+        _ctx._is_last_rpc = true;
     }
 
     void call() override {
@@ -175,9 +176,9 @@ public:
             LOG(WARNING) << "failed to send brpc batch, error="
                          << 
berror(::doris::DummyBrpcCallback<T>::cntl_->ErrorCode())
                          << ", error_text=" << 
::doris::DummyBrpcCallback<T>::cntl_->ErrorText();
-            failed_handler(_is_last_rpc);
+            failed_handler(_ctx);
         } else {
-            success_handler(*(::doris::DummyBrpcCallback<T>::response_), 
_is_last_rpc);
+            success_handler(*(::doris::DummyBrpcCallback<T>::response_), _ctx);
         }
         clear_in_flight();
     }
@@ -185,9 +186,9 @@ public:
 private:
     brpc::CallId cid;
     std::atomic<bool> _packet_in_flight {false};
-    std::atomic<bool> _is_last_rpc {false};
-    std::function<void(bool)> failed_handler;
-    std::function<void(const T&, bool)> success_handler;
+    WriteBlockCallbackContext _ctx;
+    std::function<void(const WriteBlockCallbackContext&)> failed_handler;
+    std::function<void(const T&, const WriteBlockCallbackContext&)> 
success_handler;
 };
 
 class IndexChannel;
@@ -258,7 +259,8 @@ public:
     // two ways to stop channel:
     // 1. mark_close()->close_wait() PS. close_wait() will block waiting for 
the last AddBatch rpc response.
     // 2. just cancel()
-    void mark_close();
+    // hang_wait = true makes the receiver hang until all senders mark_close.
+    void mark_close(bool hang_wait = false);
 
     bool is_closed() const { return _is_closed; }
     bool is_cancelled() const { return _cancelled; }
@@ -320,8 +322,9 @@ protected:
     void _close_check();
     void _cancel_with_msg(const std::string& msg);
 
-    void _add_block_success_callback(const PTabletWriterAddBlockResult& 
result, bool is_last_rpc);
-    void _add_block_failed_callback(bool is_last_rpc);
+    void _add_block_success_callback(const PTabletWriterAddBlockResult& result,
+                                     const WriteBlockCallbackContext& ctx);
+    void _add_block_failed_callback(const WriteBlockCallbackContext& ctx);
 
     VTabletWriter* _parent = nullptr;
     IndexChannel* _index_channel = nullptr;
@@ -425,7 +428,8 @@ public:
     ~IndexChannel() = default;
 
     // allow to init multi times, for incremental open more tablets for one 
index(table)
-    Status init(RuntimeState* state, const std::vector<TTabletWithPartition>& 
tablets);
+    Status init(RuntimeState* state, const std::vector<TTabletWithPartition>& 
tablets,
+                bool incremental = false);
 
     void for_each_node_channel(
             const std::function<void(const std::shared_ptr<VNodeChannel>&)>& 
func) {
@@ -434,6 +438,26 @@ public:
         }
     }
 
+    void for_init_node_channel(
+            const std::function<void(const std::shared_ptr<VNodeChannel>&)>& 
func) {
+        for (auto& it : _node_channels) {
+            if (!it.second->is_incremental()) {
+                func(it.second);
+            }
+        }
+    }
+
+    void for_inc_node_channel(
+            const std::function<void(const std::shared_ptr<VNodeChannel>&)>& 
func) {
+        for (auto& it : _node_channels) {
+            if (it.second->is_incremental()) {
+                func(it.second);
+            }
+        }
+    }
+
+    bool has_incremental_node_channel() const { return _has_inc_node; }
+
     void mark_as_failed(const VNodeChannel* node_channel, const std::string& 
err,
                         int64_t tablet_id = -1);
     Status check_intolerable_failure();
@@ -492,6 +516,7 @@ private:
     std::unordered_map<int64_t, std::shared_ptr<VNodeChannel>> _node_channels;
     // from tablet_id to backend channel
     std::unordered_map<int64_t, std::vector<std::shared_ptr<VNodeChannel>>> 
_channels_by_tablet;
+    bool _has_inc_node = false;
 
     // lock to protect _failed_channels and _failed_channels_msgs
     mutable doris::SpinLock _fail_lock;
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp 
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index c1b43722c33..1b14a57d154 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -111,7 +111,7 @@ Status VTabletWriterV2::_incremental_open_streams(
         }
     }
     for (int64_t dst_id : new_backends) {
-        auto streams = _load_stream_map->get_or_create(dst_id);
+        auto streams = _load_stream_map->get_or_create(dst_id, true);
         RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams));
     }
     return Status::OK();
@@ -310,6 +310,11 @@ Status VTabletWriterV2::_build_tablet_node_mapping() {
                     tablet.set_index_id(index.index_id);
                     tablet.set_tablet_id(tablet_id);
                     _tablets_for_node[node].emplace(tablet_id, tablet);
+                    constexpr int64_t DUMMY_TABLET_ID = 0;
+                    if (tablet_id == DUMMY_TABLET_ID) [[unlikely]] {
+                        // ignore fake tablet for auto partition
+                        continue;
+                    }
                     if (known_indexes.contains(index.index_id)) [[likely]] {
                         continue;
                     }
@@ -548,32 +553,26 @@ Status VTabletWriterV2::close(Status exec_status) {
         LOG(INFO) << "sink " << _sender_id << " released streams, is_last=" << 
is_last_sink
                   << ", load_id=" << print_id(_load_id);
 
-        // send CLOSE_LOAD on all streams if this is the last sink
+        // send CLOSE_LOAD on all non-incremental streams if this is the last 
sink
         if (is_last_sink) {
-            RETURN_IF_ERROR(_load_stream_map->close_load());
+            RETURN_IF_ERROR(_load_stream_map->close_load(false));
         }
 
-        // close_wait on all streams, even if this is not the last sink.
+        // close_wait on all non-incremental streams, even if this is not the 
last sink.
         // because some per-instance data structures are now shared among all 
sinks
         // due to sharing delta writers and load stream stubs.
-        {
-            SCOPED_TIMER(_close_load_timer);
-            RETURN_IF_ERROR(_load_stream_map->for_each_st([this](int64_t 
dst_id,
-                                                                 const 
Streams& streams) -> Status {
-                for (auto& stream : streams) {
-                    int64_t remain_ms = 
static_cast<int64_t>(_state->execution_timeout()) * 1000 -
-                                        _timeout_watch.elapsed_time() / 1000 / 
1000;
-                    if (remain_ms <= 0) {
-                        LOG(WARNING) << "load timed out before close waiting, 
load_id="
-                                     << print_id(_load_id);
-                        return Status::TimedOut("load timed out before close 
waiting");
-                    }
-                    RETURN_IF_ERROR(stream->close_wait(_state, remain_ms));
-                }
-                return Status::OK();
-            }));
+        RETURN_IF_ERROR(_close_wait(false));
+
+        // send CLOSE_LOAD on all incremental streams if this is the last sink.
+        // this must happen after all non-incremental streams are closed,
+        // so we can ensure all sinks are in close phase before closing 
incremental streams.
+        if (is_last_sink) {
+            RETURN_IF_ERROR(_load_stream_map->close_load(true));
         }
 
+        // close_wait on all incremental streams, even if this is not the last 
sink.
+        RETURN_IF_ERROR(_close_wait(true));
+
         // calculate and submit commit info
         if (is_last_sink) {
             DBUG_EXECUTE_IF("VTabletWriterV2.close.add_failed_tablet", {
@@ -624,6 +623,27 @@ Status VTabletWriterV2::close(Status exec_status) {
     return status;
 }
 
+Status VTabletWriterV2::_close_wait(bool incremental) {
+    SCOPED_TIMER(_close_load_timer);
+    return _load_stream_map->for_each_st(
+            [this, incremental](int64_t dst_id, const Streams& streams) -> 
Status {
+                for (auto& stream : streams) {
+                    if (stream->is_incremental() != incremental) {
+                        continue;
+                    }
+                    int64_t remain_ms = 
static_cast<int64_t>(_state->execution_timeout()) * 1000 -
+                                        _timeout_watch.elapsed_time() / 1000 / 
1000;
+                    if (remain_ms <= 0) {
+                        LOG(WARNING) << "load timed out before close waiting, 
load_id="
+                                     << print_id(_load_id);
+                        return Status::TimedOut("load timed out before close 
waiting");
+                    }
+                    RETURN_IF_ERROR(stream->close_wait(_state, remain_ms));
+                }
+                return Status::OK();
+            });
+}
+
 void VTabletWriterV2::_calc_tablets_to_commit() {
     LOG(INFO) << "saving close load info, load_id=" << print_id(_load_id) << 
", txn_id=" << _txn_id
               << ", sink_id=" << _sender_id;
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h 
b/be/src/vec/sink/writer/vtablet_writer_v2.h
index e3d31fb32b9..5a9890cdb49 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -147,6 +147,8 @@ private:
 
     void _calc_tablets_to_commit();
 
+    Status _close_wait(bool incremental);
+
     Status _cancel(Status status);
 
     std::shared_ptr<MemTracker> _mem_tracker;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java
index 1a4d188a0ca..dafdcdc49f5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java
@@ -35,7 +35,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 public class ListPartitionItem extends PartitionItem {
-    public static ListPartitionItem DUMMY_ITEM = new 
ListPartitionItem(Lists.newArrayList());
+    public static final ListPartitionItem DUMMY_ITEM = new 
ListPartitionItem(Lists.newArrayList());
 
     private final List<PartitionKey> partitionKeys;
     private boolean isDefaultPartition = false;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java
index b227afdc142..ff5fa91ee6c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java
@@ -87,6 +87,13 @@ public class PartitionKey implements 
Comparable<PartitionKey>, Writable {
         return partitionKey;
     }
 
+    public static PartitionKey createMaxPartitionKey() {
+        PartitionKey partitionKey = new PartitionKey();
+        partitionKey.keys.add(MaxLiteral.MAX_VALUE);
+        // type not set
+        return partitionKey;
+    }
+
     public static PartitionKey createPartitionKey(List<PartitionValue> keys, 
List<Column> columns)
             throws AnalysisException {
         PartitionKey partitionKey = new PartitionKey();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java
index 56214aaa0ea..bb7ddabbaa4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java
@@ -30,10 +30,12 @@ import java.util.Optional;
 
 public class RangePartitionItem extends PartitionItem {
     private Range<PartitionKey> partitionKeyRange;
-    public static final Range<PartitionKey> DUMMY_ITEM;
+    public static final Range<PartitionKey> DUMMY_RANGE;
+    public static final RangePartitionItem DUMMY_ITEM;
 
     static {
-        DUMMY_ITEM = Range.closed(new PartitionKey(), new PartitionKey());
+        DUMMY_RANGE = Range.closed(new PartitionKey(), new PartitionKey());
+        DUMMY_ITEM = new RangePartitionItem(Range.closed(new PartitionKey(), 
PartitionKey.createMaxPartitionKey()));
     }
 
     public RangePartitionItem(Range<PartitionKey> range) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index f52bc11829f..df6b02b8324 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -1698,12 +1698,12 @@ public class InternalCatalog implements 
CatalogIf<Database> {
                             isTempPartition, 
partitionInfo.getIsMutable(partitionId));
                 } else if (partitionInfo.getType() == PartitionType.LIST) {
                     info = new PartitionPersistInfo(db.getId(), 
olapTable.getId(), partition,
-                            RangePartitionItem.DUMMY_ITEM, 
partitionInfo.getItem(partitionId), dataProperty,
+                            RangePartitionItem.DUMMY_RANGE, 
partitionInfo.getItem(partitionId), dataProperty,
                             partitionInfo.getReplicaAllocation(partitionId), 
partitionInfo.getIsInMemory(partitionId),
                             isTempPartition, 
partitionInfo.getIsMutable(partitionId));
                 } else {
                     info = new PartitionPersistInfo(db.getId(), 
olapTable.getId(), partition,
-                            RangePartitionItem.DUMMY_ITEM, 
ListPartitionItem.DUMMY_ITEM, dataProperty,
+                            RangePartitionItem.DUMMY_RANGE, 
ListPartitionItem.DUMMY_ITEM, dataProperty,
                             partitionInfo.getReplicaAllocation(partitionId), 
partitionInfo.getIsInMemory(partitionId),
                             isTempPartition, 
partitionInfo.getIsMutable(partitionId));
                 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
index ada7c6b770b..e3195eec135 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -339,18 +339,84 @@ public class OlapTableSink extends DataSink {
         return distColumns;
     }
 
+    private PartitionItem createDummyPartitionItem(PartitionType partType) 
throws UserException {
+        if (partType == PartitionType.LIST) {
+            return ListPartitionItem.DUMMY_ITEM;
+        } else if (partType == PartitionType.RANGE) {
+            return RangePartitionItem.DUMMY_ITEM;
+        } else {
+            throw new UserException("unsupported partition for OlapTable, 
partition=" + partType);
+        }
+    }
+
+    private TOlapTablePartitionParam createDummyPartition(long dbId, OlapTable 
table, Analyzer analyzer,
+            TOlapTablePartitionParam partitionParam, PartitionInfo 
partitionInfo, PartitionType partType)
+            throws UserException {
+        partitionParam.setEnableAutomaticPartition(true);
+        // these partitions are only used for locations, not to find partitions.
+        partitionParam.setPartitionsIsFake(true);
+
+        // set columns
+        for (Column partCol : partitionInfo.getPartitionColumns()) {
+            partitionParam.addToPartitionColumns(partCol.getName());
+        }
+
+        int partColNum = partitionInfo.getPartitionColumns().size();
+
+        TOlapTablePartition fakePartition = new TOlapTablePartition();
+        fakePartition.setId(0);
+        // set partition keys
+        setPartitionKeys(fakePartition, createDummyPartitionItem(partType), 
partColNum);
+
+        for (Long indexId : table.getIndexIdToMeta().keySet()) {
+            fakePartition.addToIndexes(new TOlapTableIndexTablets(indexId, 
Arrays.asList(0L)));
+            fakePartition.setNumBuckets(1);
+        }
+        fakePartition.setIsMutable(true);
+
+        DistributionInfo distInfo = table.getDefaultDistributionInfo();
+        partitionParam.setDistributedColumns(getDistColumns(distInfo));
+        partitionParam.addToPartitions(fakePartition);
+
+        ArrayList<Expr> exprSource = partitionInfo.getPartitionExprs();
+        if (exprSource != null && analyzer != null) {
+            Analyzer funcAnalyzer = new Analyzer(analyzer.getEnv(), 
analyzer.getContext());
+            tupleDescriptor.setTable(table);
+            funcAnalyzer.registerTupleDescriptor(tupleDescriptor);
+            // we must clone the exprs. otherwise analyze will influence the 
origin exprs.
+            ArrayList<Expr> exprs = new ArrayList<Expr>();
+            for (Expr e : exprSource) {
+                exprs.add(e.clone());
+            }
+            for (Expr e : exprs) {
+                e.reset();
+                e.analyze(funcAnalyzer);
+            }
+            
partitionParam.setPartitionFunctionExprs(Expr.treesToThrift(exprs));
+        }
+
+        return partitionParam;
+    }
+
     public TOlapTablePartitionParam createPartition(long dbId, OlapTable 
table, Analyzer analyzer)
             throws UserException {
         TOlapTablePartitionParam partitionParam = new 
TOlapTablePartitionParam();
+        PartitionInfo partitionInfo = table.getPartitionInfo();
+        boolean enableAutomaticPartition = 
partitionInfo.enableAutomaticPartition();
+        PartitionType partType = table.getPartitionInfo().getType();
         partitionParam.setDbId(dbId);
         partitionParam.setTableId(table.getId());
         partitionParam.setVersion(0);
+        partitionParam.setPartitionType(partType.toThrift());
+
+        // create shadow partition for empty auto partition table. only used in 
this load.
+        if (enableAutomaticPartition && partitionIds.isEmpty()) {
+            return createDummyPartition(dbId, table, analyzer, partitionParam, 
partitionInfo, partType);
+        }
 
-        PartitionType partType = table.getPartitionInfo().getType();
         switch (partType) {
             case LIST:
             case RANGE: {
-                PartitionInfo partitionInfo = table.getPartitionInfo();
                 for (Column partCol : partitionInfo.getPartitionColumns()) {
                     partitionParam.addToPartitionColumns(partCol.getName());
                 }
@@ -395,7 +461,6 @@ public class OlapTableSink extends DataSink {
                         }
                     }
                 }
-                boolean enableAutomaticPartition = 
partitionInfo.enableAutomaticPartition();
                 // for auto create partition by function expr, there is no any 
partition firstly,
                 // But this is required in thrift struct.
                 if (enableAutomaticPartition && partitionIds.isEmpty()) {
@@ -464,7 +529,6 @@ public class OlapTableSink extends DataSink {
                 throw new UserException("unsupported partition for OlapTable, 
partition=" + partType);
             }
         }
-        partitionParam.setPartitionType(partType.toThrift());
         return partitionParam;
     }
 
@@ -505,7 +569,46 @@ public class OlapTableSink extends DataSink {
         }
     }
 
+    public List<TOlapTableLocationParam> createDummyLocation(OlapTable table) 
throws UserException {
+        TOlapTableLocationParam locationParam = new TOlapTableLocationParam();
+        TOlapTableLocationParam slaveLocationParam = new 
TOlapTableLocationParam();
+
+        final long fakeTabletId = 0;
+        SystemInfoService clusterInfo = Env.getCurrentSystemInfo();
+        List<Long> aliveBe = clusterInfo.getAllBackendIds(true);
+        if (aliveBe.isEmpty()) {
+            throw new UserException(InternalErrorCode.REPLICA_FEW_ERR, "no 
available BE in cluster");
+        }
+        for (int i = 0; i < table.getIndexNumber(); i++) {
+            // only one fake tablet here
+            if (singleReplicaLoad) {
+                Long[] nodes = aliveBe.toArray(new Long[0]);
+                List<Long> slaveBe = new ArrayList<>(aliveBe); // own copy
+
+                Random random = new SecureRandom();
+                int masterNode = random.nextInt(nodes.length);
+                locationParam.addToTablets(new TTabletLocation(fakeTabletId,
+                        Arrays.asList(nodes[masterNode])));
+
+                slaveBe.remove(masterNode);
+                slaveLocationParam.addToTablets(new 
TTabletLocation(fakeTabletId,
+                        slaveBe));
+            } else {
+                locationParam.addToTablets(new TTabletLocation(fakeTabletId,
+                        Arrays.asList(aliveBe.get(0)))); // just one fake 
location is enough
+
+                LOG.info("created dummy location tablet_id={}, be_id={}", 
fakeTabletId, aliveBe.get(0));
+            }
+        }
+
+        return Arrays.asList(locationParam, slaveLocationParam);
+    }
+
     public List<TOlapTableLocationParam> createLocation(OlapTable table) 
throws UserException {
+        if (table.getPartitionInfo().enableAutomaticPartition() && 
partitionIds.isEmpty()) {
+            return createDummyLocation(table);
+        }
+
         TOlapTableLocationParam locationParam = new TOlapTableLocationParam();
         TOlapTableLocationParam slaveLocationParam = new 
TOlapTableLocationParam();
         // BE id -> path hash
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index b0abcc67141..af86660be21 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -3511,7 +3511,7 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         if (!Env.getCurrentEnv().isMaster()) {
             errorStatus.setStatusCode(TStatusCode.NOT_MASTER);
             errorStatus.addToErrorMsgs(NOT_MASTER_ERR_MSG);
-            LOG.warn("failed to createPartition: {}", NOT_MASTER_ERR_MSG);
+            LOG.warn("failed to replace Partition: {}", NOT_MASTER_ERR_MSG);
             return result;
         }
 
@@ -3546,10 +3546,8 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         List<String> allReqPartNames; // all request partitions
         try {
             taskLock.lock();
-            // we dont lock the table. other thread in this txn will be 
controled by
-            // taskLock.
-            // if we have already replaced. dont do it again, but acquire the 
recorded new
-            // partition directly.
+            // we dont lock the table. other thread in this txn will be 
controled by taskLock.
+            // if we have already replaced. dont do it again, but acquire the 
recorded new partition directly.
             // if not by this txn, just let it fail naturally is ok.
             List<Long> replacedPartIds = 
overwriteManager.tryReplacePartitionIds(taskGroupId, partitionIds);
             // here if replacedPartIds still have null. this will throw 
exception.
@@ -3559,8 +3557,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
                     .filter(i -> partitionIds.get(i) == replacedPartIds.get(i)) // equal means not replaced
                     .mapToObj(partitionIds::get)
                     .collect(Collectors.toList());
-            // from here we ONLY deal the pending partitions. not include the dealed(by
-            // others).
+            // from here we ONLY deal the pending partitions. not include the dealed(by others).
             if (!pendingPartitionIds.isEmpty()) {
                 // below two must have same order inner.
                 List<String> pendingPartitionNames = olapTable.uncheckedGetPartNamesById(pendingPartitionIds);
@@ -3571,8 +3568,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
                 overwriteManager.registerTaskInGroup(taskGroupId, taskId);
                 InsertOverwriteUtil.addTempPartitions(olapTable, pendingPartitionNames, tempPartitionNames);
                 InsertOverwriteUtil.replacePartition(olapTable, pendingPartitionNames, tempPartitionNames);
-                // now temp partitions are bumped up and use new names. we get their ids and
-                // record them.
+                // now temp partitions are bumped up and use new names. we get their ids and record them.
                 List<Long> newPartitionIds = new ArrayList<Long>();
                 for (String newPartName : pendingPartitionNames) {
                     newPartitionIds.add(olapTable.getPartition(newPartName).getId());
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 12b1b6b1eda..0a975b81991 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -98,6 +98,7 @@ message PTabletWriterOpenRequest {
     optional bool is_incremental = 15 [default = false];
     optional int64 txn_expiration = 16; // Absolute time
     optional bool write_file_cache = 17;
+    optional int32 sender_id = 19;
 };
 
 message PTabletWriterOpenResult {
@@ -152,6 +153,8 @@ message PTabletWriterAddBlockRequest {
     optional bool write_single_replica = 12 [default = false];
     map<int64, PSlaveTabletNodes> slave_tablet_nodes = 13;
     optional bool is_single_tablet_block = 14 [default = false];
+    // for auto-partition first stage close, we should hang.
+    optional bool hang_wait = 15 [default = false];
 };
 
 message PSlaveTabletNodes {
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index ef7a8451684..5da7b4df7de 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -211,6 +211,7 @@ struct TOlapTablePartitionParam {
     // insert overwrite partition(*)
     11: optional bool enable_auto_detect_overwrite
     12: optional i64 overwrite_group_id
+    13: optional bool partitions_is_fake = false
 }
 
 struct TOlapTableIndex {
diff --git a/regression-test/data/partition_p1/auto_partition/sql/two_instance_correctness.out b/regression-test/data/partition_p1/auto_partition/sql/two_instance_correctness.out
new file mode 100644
index 00000000000..4ee136aef2b
--- /dev/null
+++ b/regression-test/data/partition_p1/auto_partition/sql/two_instance_correctness.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+2
+
diff --git a/regression-test/suites/partition_p0/auto_partition/test_auto_range_partition.groovy b/regression-test/suites/partition_p0/auto_partition/test_auto_range_partition.groovy
index 508f086f865..f52dc2945f0 100644
--- a/regression-test/suites/partition_p0/auto_partition/test_auto_range_partition.groovy
+++ b/regression-test/suites/partition_p0/auto_partition/test_auto_range_partition.groovy
@@ -140,8 +140,7 @@ suite("test_auto_range_partition") {
     logger.info("${result2}")
     assertEquals(result2.size(), 2)
 
-     // partition expr extraction
-
+    // insert into select have multi sender in load
     sql " drop table if exists isit "
     sql " drop table if exists isit_src "
     sql """
diff --git a/regression-test/suites/partition_p1/auto_partition/sql/multi_thread_load.groovy b/regression-test/suites/partition_p1/auto_partition/sql/multi_thread_load.groovy
index 4f9b7a365b4..8d43d90ff15 100644
--- a/regression-test/suites/partition_p1/auto_partition/sql/multi_thread_load.groovy
+++ b/regression-test/suites/partition_p1/auto_partition/sql/multi_thread_load.groovy
@@ -19,7 +19,7 @@ import groovy.io.FileType
 import java.nio.file.Files
 import java.nio.file.Paths
 
-suite("multi_thread_load", "p1,nonConcurrent") { // stress case should use resource fully
+suite("multi_thread_load", "p1,nonConcurrent") { // stress case should use resource fully```
     // get doris-db from s3
     def dirPath = context.file.parent
     def fatherPath = context.file.parentFile.parentFile.getPath()
diff --git a/regression-test/suites/partition_p1/auto_partition/sql/two_instance_correctness.groovy b/regression-test/suites/partition_p1/auto_partition/sql/two_instance_correctness.groovy
new file mode 100644
index 00000000000..c9f2f04aab3
--- /dev/null
+++ b/regression-test/suites/partition_p1/auto_partition/sql/two_instance_correctness.groovy
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("two_instance_correctness") {
+
+    // finish time of instances have diff
+    sql "DROP TABLE IF EXISTS two_bkt;"
+    sql """
+        create table two_bkt(
+            k0 date not null
+        )
+        DISTRIBUTED BY HASH(`k0`) BUCKETS 2
+        properties("replication_num" = "1");
+    """
+
+    sql """ insert into two_bkt values ("2012-12-11"); """
+    sql """ insert into two_bkt select "2020-12-12" from numbers("number" = "20000"); """
+
+    sql " DROP TABLE IF EXISTS two_bkt_dest; "
+    sql """
+        create table two_bkt_dest(
+            k0 date not null
+        )
+        AUTO PARTITION BY RANGE (date_trunc(k0, 'day')) ()
+        DISTRIBUTED BY HASH(`k0`) BUCKETS 10
+        properties("replication_num" = "1");
+    """
+    sql " insert into two_bkt_dest select * from two_bkt; "
+
+    qt_sql " select count(distinct k0) from two_bkt_dest; "
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to