This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit b2f90a1ce4f527490f603ff652a94e6a8a4c674d
Author: bobhan1 <bh2444151...@outlook.com>
AuthorDate: Fri Jun 28 21:12:21 2024 +0800

    [Fix](autoinc) try fix concurrent load problem with auto inc column (#36421)
    
    1. increase the value of `AutoIncrementGenerator.BATCH_ID_INTERVAL` to
    reduce the number of writes to BDBJE. (the default value of
    `config::auto_inc_prefetch_size_ratio` is 10 and the default value of
    `AutoIncIDBuffer::_batch_size` is 4064, so the request length is
    usually 4064 * 10 = 40640)
    2. only allow the master FE to serve `getAutoIncrementRange` requests
    3. write the edit log before updating the in-memory `batchEndId` in
    `getAutoIncrementRange`
    4. refactor `AutoIncIDBuffer`
---
 be/src/vec/sink/autoinc_buffer.cpp                 | 150 +++++++++++++--------
 be/src/vec/sink/autoinc_buffer.h                   |  35 +++--
 .../doris/catalog/AutoIncrementGenerator.java      |   5 +-
 .../apache/doris/service/FrontendServiceImpl.java  |  10 ++
 gensrc/thrift/FrontendService.thrift               |   1 +
 .../unique/test_unique_auto_inc_concurrent.out     |  10 ++
 .../unique/test_unique_auto_inc_concurrent.groovy  |  59 ++++++++
 7 files changed, 203 insertions(+), 67 deletions(-)

diff --git a/be/src/vec/sink/autoinc_buffer.cpp 
b/be/src/vec/sink/autoinc_buffer.cpp
index c7c096ec6e8..f83dbcb55b8 100644
--- a/be/src/vec/sink/autoinc_buffer.cpp
+++ b/be/src/vec/sink/autoinc_buffer.cpp
@@ -19,14 +19,15 @@
 
 #include <gen_cpp/HeartbeatService_types.h>
 
-#include <string>
+#include <chrono>
+#include <mutex>
 
+#include "common/logging.h"
 #include "common/status.h"
 #include "runtime/client_cache.h"
 #include "runtime/exec_env.h"
 #include "util/runtime_profile.h"
 #include "util/thrift_rpc_helper.h"
-#include "vec/sink/vtablet_block_convertor.h"
 
 namespace doris::vectorized {
 
@@ -42,54 +43,11 @@ void AutoIncIDBuffer::set_batch_size_at_least(size_t 
batch_size) {
     }
 }
 
-void AutoIncIDBuffer::_wait_for_prefetching() {
-    if (_is_fetching) {
-        _rpc_token->wait();
-    }
-}
-
-Status AutoIncIDBuffer::sync_request_ids(size_t length,
-                                         std::vector<std::pair<int64_t, 
size_t>>* result) {
-    std::unique_lock<std::mutex> lock(_mutex);
-    RETURN_IF_ERROR(_prefetch_ids(_prefetch_size()));
-    if (_front_buffer.second > 0) {
-        auto min_length = std::min(_front_buffer.second, length);
-        length -= min_length;
-        result->emplace_back(_front_buffer.first, min_length);
-        _front_buffer.first += min_length;
-        _front_buffer.second -= min_length;
-    }
-    if (length > 0) {
-        _wait_for_prefetching();
-        if (!_rpc_status.ok()) {
-            return _rpc_status;
-        }
-
-        {
-            std::lock_guard<std::mutex> lock(_backend_buffer_latch);
-            std::swap(_front_buffer, _backend_buffer);
-        }
-
-        DCHECK_LE(length, _front_buffer.second);
-        if (length > _front_buffer.second) {
-            return Status::RpcError("auto inc sync result length > front 
buffer. " +
-                                    std::to_string(length) + " vs " +
-                                    std::to_string(_front_buffer.second));
-        }
-        result->emplace_back(_front_buffer.first, length);
-        _front_buffer.first += length;
-        _front_buffer.second -= length;
-    }
-    return Status::OK();
-}
-
-Status AutoIncIDBuffer::_prefetch_ids(size_t length) {
-    if (_front_buffer.second > _low_water_level_mark() || _is_fetching) {
-        return Status::OK();
-    }
+Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) {
+    constexpr uint32_t FETCH_AUTOINC_MAX_RETRY_TIMES = 3;
+    _rpc_status = Status::OK();
     TNetworkAddress master_addr = 
ExecEnv::GetInstance()->master_info()->network_address;
-    _is_fetching = true;
-    RETURN_IF_ERROR(_rpc_token->submit_func([=, this]() {
+    for (uint32_t retry_times = 0; retry_times < 
FETCH_AUTOINC_MAX_RETRY_TIMES; retry_times++) {
         TAutoIncrementRangeRequest request;
         TAutoIncrementRangeResult result;
         request.__set_db_id(_db_id);
@@ -97,7 +55,7 @@ Status AutoIncIDBuffer::_prefetch_ids(size_t length) {
         request.__set_column_id(_column_id);
         request.__set_length(length);
 
-        int64_t get_auto_inc_range_rpc_ns;
+        int64_t get_auto_inc_range_rpc_ns = 0;
         {
             SCOPED_RAW_TIMER(&get_auto_inc_range_rpc_ns);
             _rpc_status = ThriftRpcHelper::rpc<FrontendServiceClient>(
@@ -109,15 +67,95 @@ Status AutoIncIDBuffer::_prefetch_ids(size_t length) {
         LOG(INFO) << "[auto-inc-range][start=" << result.start << ",length=" 
<< result.length
                   << "][elapsed=" << get_auto_inc_range_rpc_ns / 1000000 << " 
ms]";
 
-        if (!_rpc_status.ok() || result.length <= 0) {
-            LOG(WARNING) << "Failed to fetch auto-incremnt range, encounter 
rpc failure."
-                         << "errmsg=" << _rpc_status.to_string();
-            return;
+        if (_rpc_status.is<ErrorCode::NOT_MASTER>()) {
+            LOG_WARNING(
+                    "Failed to fetch auto-incremnt range, request to 
non-master FE, discard all "
+                    "auto_increment ranges in _buffers. retry_time={}",
+                    retry_times);
+            master_addr = result.master_address;
+            std::this_thread::sleep_for(std::chrono::milliseconds(10));
+            continue;
         }
 
+        if (!_rpc_status.ok()) {
+            LOG(WARNING)
+                    << "Failed to fetch auto-incremnt range, encounter rpc 
failure. retry_time="
+                    << retry_times << ", errmsg=" << _rpc_status.to_string();
+            std::this_thread::sleep_for(std::chrono::milliseconds(10));
+            continue;
+        }
+        if (result.length != length) [[unlikely]] {
+            auto msg = fmt::format(
+                    "Failed to fetch auto-incremnt range, request length={}, 
but get "
+                    "result.length={}, retry_time={}",
+                    length, result.length, retry_times);
+            LOG(WARNING) << msg;
+            _rpc_status = Status::RpcError<true>(msg);
+            std::this_thread::sleep_for(std::chrono::milliseconds(10));
+            continue;
+        }
+
+        return result.start;
+    }
+    CHECK(!_rpc_status.ok());
+    return _rpc_status;
+}
+
+void AutoIncIDBuffer::_get_autoinc_ranges_from_buffers(
+        size_t& request_length, std::vector<std::pair<int64_t, size_t>>* 
result) {
+    std::lock_guard<std::mutex> lock {_latch};
+    while (request_length > 0 && !_buffers.empty()) {
+        auto& autoinc_range = _buffers.front();
+        CHECK_GT(autoinc_range.length, 0);
+        auto min_length = std::min(request_length, autoinc_range.length);
+        result->emplace_back(autoinc_range.start, min_length);
+        autoinc_range.consume(min_length);
+        _current_volume -= min_length;
+        request_length -= min_length;
+        if (autoinc_range.empty()) {
+            _buffers.pop_front();
+        }
+    }
+}
+
+Status AutoIncIDBuffer::sync_request_ids(size_t request_length,
+                                         std::vector<std::pair<int64_t, 
size_t>>* result) {
+    std::lock_guard<std::mutex> lock(_mutex);
+    while (request_length > 0) {
+        _get_autoinc_ranges_from_buffers(request_length, result);
+        if (request_length == 0) {
+            break;
+        }
+        if (!_is_fetching) {
+            RETURN_IF_ERROR(
+                    _launch_async_fetch_task(std::max<size_t>(request_length, 
_prefetch_size())));
+        }
+        _rpc_token->wait();
+        CHECK(!_is_fetching);
+        if (!_rpc_status.ok()) {
+            return _rpc_status;
+        }
+    }
+    CHECK_EQ(request_length, 0);
+    if (!_is_fetching && _current_volume < _low_water_level_mark()) {
+        RETURN_IF_ERROR(_launch_async_fetch_task(_prefetch_size()));
+    }
+    return Status::OK();
+}
+
+Status AutoIncIDBuffer::_launch_async_fetch_task(size_t length) {
+    _is_fetching = true;
+    RETURN_IF_ERROR(_rpc_token->submit_func([=, this]() {
+        auto&& res = _fetch_ids_from_fe(length);
+        if (!res.has_value()) [[unlikely]] {
+            _is_fetching = false;
+            return;
+        }
+        int64_t start = res.value();
         {
-            std::lock_guard<std::mutex> lock(_backend_buffer_latch);
-            _backend_buffer = {result.start, result.length};
+            std::lock_guard<std::mutex> lock {_latch};
+            _buffers.emplace_back(start, length);
+            _current_volume += length;
         }
         _is_fetching = false;
     }));
diff --git a/be/src/vec/sink/autoinc_buffer.h b/be/src/vec/sink/autoinc_buffer.h
index 3ec009b0960..032ac18981f 100644
--- a/be/src/vec/sink/autoinc_buffer.h
+++ b/be/src/vec/sink/autoinc_buffer.h
@@ -61,17 +61,35 @@ public:
     // all public functions are thread safe
     AutoIncIDBuffer(int64_t _db_id, int64_t _table_id, int64_t column_id);
     void set_batch_size_at_least(size_t batch_size);
-    Status sync_request_ids(size_t length, std::vector<std::pair<int64_t, 
size_t>>* result);
+    Status sync_request_ids(size_t request_length, 
std::vector<std::pair<int64_t, size_t>>* result);
+
+    struct AutoIncRange {
+        int64_t start;
+        size_t length;
+
+        bool empty() const { return length == 0; }
+
+        void consume(size_t l) {
+            start += l;
+            length -= l;
+        }
+    };
 
 private:
-    Status _prefetch_ids(size_t length);
     [[nodiscard]] size_t _prefetch_size() const {
         return _batch_size * config::auto_inc_prefetch_size_ratio;
     }
+
     [[nodiscard]] size_t _low_water_level_mark() const {
         return _batch_size * config::auto_inc_low_water_level_mark_size_ratio;
     };
-    void _wait_for_prefetching();
+
+    void _get_autoinc_ranges_from_buffers(size_t& request_length,
+                                          std::vector<std::pair<int64_t, 
size_t>>* result);
+
+    Status _launch_async_fetch_task(size_t length);
+
+    Result<int64_t> _fetch_ids_from_fe(size_t length);
 
     std::atomic<size_t> _batch_size {MIN_BATCH_SIZE};
 
@@ -81,12 +99,14 @@ private:
 
     std::unique_ptr<ThreadPoolToken> _rpc_token;
     Status _rpc_status {Status::OK()};
+
     std::atomic<bool> _is_fetching {false};
 
-    std::pair<int64_t, size_t> _front_buffer {0, 0};
-    std::pair<int64_t, size_t> _backend_buffer {0, 0};
-    std::mutex _backend_buffer_latch; // for _backend_buffer
     std::mutex _mutex;
+
+    mutable std::mutex _latch;
+    std::list<AutoIncRange> _buffers;
+    size_t _current_volume {0};
 };
 
 class GlobalAutoIncBuffers {
@@ -115,8 +135,7 @@ public:
         auto key = std::make_tuple(db_id, table_id, column_id);
         auto it = _buffers.find(key);
         if (it == _buffers.end()) {
-            _buffers.emplace(std::make_pair(
-                    key, AutoIncIDBuffer::create_shared(db_id, table_id, 
column_id)));
+            _buffers.emplace(key, AutoIncIDBuffer::create_shared(db_id, 
table_id, column_id));
         }
         return _buffers[{db_id, table_id, column_id}];
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/AutoIncrementGenerator.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/AutoIncrementGenerator.java
index be110360850..e4c8cf5de01 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/AutoIncrementGenerator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/AutoIncrementGenerator.java
@@ -40,7 +40,7 @@ public class AutoIncrementGenerator implements Writable, 
GsonPostProcessable {
 
     public static final long NEXT_ID_INIT_VALUE = 1;
     // _MIN_BATCH_SIZE = 4064 in load task
-    private static final long BATCH_ID_INTERVAL = 50000;
+    private static final long BATCH_ID_INTERVAL = 500000;
 
     @SerializedName(value = "dbId")
     private Long dbId;
@@ -48,7 +48,6 @@ public class AutoIncrementGenerator implements Writable, 
GsonPostProcessable {
     private Long tableId;
     @SerializedName(value = "columnId")
     private Long columnId;
-    @SerializedName(value = "nextId")
     private long nextId;
     @SerializedName(value = "batchEndId")
     private long batchEndId;
@@ -86,10 +85,10 @@ public class AutoIncrementGenerator implements Writable, 
GsonPostProcessable {
         long endId = startId + length;
         nextId = startId + length;
         if (endId > batchEndId) {
-            batchEndId = (endId / BATCH_ID_INTERVAL + 1) * BATCH_ID_INTERVAL;
             Preconditions.checkState(editLog != null);
             AutoIncrementIdUpdateLog info = new AutoIncrementIdUpdateLog(dbId, 
tableId, columnId, batchEndId);
             editLog.logUpdateAutoIncrementId(info);
+            batchEndId = (endId / BATCH_ID_INTERVAL + 1) * BATCH_ID_INTERVAL;
         }
         LOG.info("[getAutoIncrementRange result][{}, {}]", startId, length);
         return Pair.of(startId, length);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 1b106576720..75a1987eb23 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -2626,6 +2626,16 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         TAutoIncrementRangeResult result = new TAutoIncrementRangeResult();
         TStatus status = new TStatus(TStatusCode.OK);
         result.setStatus(status);
+
+        if (!Env.getCurrentEnv().isMaster()) {
+            status.setStatusCode(TStatusCode.NOT_MASTER);
+            status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
+            result.setMasterAddress(getMasterAddress());
+            LOG.error("failed to getAutoIncrementRange:{}, request:{}, 
backend:{}",
+                    NOT_MASTER_ERR_MSG, request, getClientAddrAsString());
+            return result;
+        }
+
         try {
             Env env = Env.getCurrentEnv();
             Database db = 
env.getInternalCatalog().getDbOrMetaException(request.getDbId());
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index 9a57abc9b61..d25f5c0ac2b 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1323,6 +1323,7 @@ struct TAutoIncrementRangeResult {
     1: optional Status.TStatus status
     2: optional i64 start
     3: optional i64 length
+    4: optional Types.TNetworkAddress master_address
 }
 
 struct TCreatePartitionRequest {
diff --git 
a/regression-test/data/data_model_p0/unique/test_unique_auto_inc_concurrent.out 
b/regression-test/data/data_model_p0/unique/test_unique_auto_inc_concurrent.out
new file mode 100644
index 00000000000..03819c9a717
--- /dev/null
+++ 
b/regression-test/data/data_model_p0/unique/test_unique_auto_inc_concurrent.out
@@ -0,0 +1,10 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql --
+1500000        1500000
+
+-- !sql --
+3000000        3000000
+
+-- !sql --
+4500000        4500000
+
diff --git 
a/regression-test/suites/data_model_p0/unique/test_unique_auto_inc_concurrent.groovy
 
b/regression-test/suites/data_model_p0/unique/test_unique_auto_inc_concurrent.groovy
new file mode 100644
index 00000000000..bf6d584b2af
--- /dev/null
+++ 
b/regression-test/suites/data_model_p0/unique/test_unique_auto_inc_concurrent.groovy
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_unique_table_auto_inc_concurrent") {
+    
+    def table1 = "test_unique_table_auto_inc_concurrent"
+    sql "drop table if exists ${table1}"
+    sql """
+        CREATE TABLE IF NOT EXISTS `${table1}` (
+          `id` BIGINT NOT NULL AUTO_INCREMENT,
+          `value` int(11) NOT NULL
+        ) ENGINE=OLAP
+        UNIQUE KEY(`id`)
+        COMMENT "OLAP"
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1",
+        "in_memory" = "false",
+        "storage_format" = "V2",
+        "enable_unique_key_merge_on_write" = "true"
+        )
+    """
+    
+    def run_test = {thread_num, rows, iters -> 
+        def threads = []
+        (1..thread_num).each { id1 -> 
+            threads.add(Thread.start {
+                (1..iters).each { id2 -> 
+                    sql """insert into ${table1}(value) select number from 
numbers("number" = "${rows}");"""
+                }
+            })
+        }
+
+        threads.each { thread -> thread.join() }
+
+        qt_sql "select count(id), count(distinct id) from ${table1};"
+    }
+
+    run_test(15, 10000, 10)
+    run_test(15, 100000, 1)
+    run_test(5, 30000, 10)
+
+    sql "drop table if exists ${table1};"
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to