This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
commit b2f90a1ce4f527490f603ff652a94e6a8a4c674d
Author: bobhan1 <bh2444151...@outlook.com>
AuthorDate: Fri Jun 28 21:12:21 2024 +0800

    [Fix](autoinc) try fix concurrent load problem with auto inc column (#36421)

    1. Increase the value of `AutoIncrementGenerator.BATCH_ID_INTERVAL` to reduce the number of
       writes to BDBJE (the default value of `config::auto_inc_prefetch_size_ratio` is 10 and the
       default value of `AutoIncIDBuffer::_batch_size` is 4064, so the request length is mostly 40960).
    2. Only allow the master FE to offer the `getAutoIncrementRange` service.
    3. Write the edit log before updating `batchEndId` in memory in `getAutoIncrementRange`.
    4. Refactor `AutoIncIDBuffer`.
---
 be/src/vec/sink/autoinc_buffer.cpp                 | 150 +++++++++++++--------
 be/src/vec/sink/autoinc_buffer.h                   |  35 +++--
 .../doris/catalog/AutoIncrementGenerator.java      |   5 +-
 .../apache/doris/service/FrontendServiceImpl.java  |  10 ++
 gensrc/thrift/FrontendService.thrift               |   1 +
 .../unique/test_unique_auto_inc_concurrent.out     |  10 ++
 .../unique/test_unique_auto_inc_concurrent.groovy  |  59 ++++++++
 7 files changed, 203 insertions(+), 67 deletions(-)
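For point 1, the sketch below only spells out how the BE-side request size falls out of the two defaults quoted above; the constant names are stand-ins for the Doris settings, not the real config plumbing.

```cpp
#include <cstddef>
#include <iostream>

// Stand-ins for the defaults quoted in the commit message (assumed values,
// not read from Doris config).
constexpr std::size_t kBatchSize = 4064;        // AutoIncIDBuffer::_batch_size
constexpr std::size_t kPrefetchSizeRatio = 10;  // config::auto_inc_prefetch_size_ratio

// Each prefetch asks the FE for batch_size * ratio ids, i.e. roughly 40k ids
// per RPC with the defaults above; enlarging BATCH_ID_INTERVAL on the FE then
// lets a single BDBJE write cover many such requests.
std::size_t prefetch_size() { return kBatchSize * kPrefetchSizeRatio; }

int main() {
    std::cout << "ids requested per prefetch: " << prefetch_size() << '\n';
    return 0;
}
```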
" + - std::to_string(length) + " vs " + - std::to_string(_front_buffer.second)); - } - result->emplace_back(_front_buffer.first, length); - _front_buffer.first += length; - _front_buffer.second -= length; - } - return Status::OK(); -} - -Status AutoIncIDBuffer::_prefetch_ids(size_t length) { - if (_front_buffer.second > _low_water_level_mark() || _is_fetching) { - return Status::OK(); - } +Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) { + constexpr uint32_t FETCH_AUTOINC_MAX_RETRY_TIMES = 3; + _rpc_status = Status::OK(); TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; - _is_fetching = true; - RETURN_IF_ERROR(_rpc_token->submit_func([=, this]() { + for (uint32_t retry_times = 0; retry_times < FETCH_AUTOINC_MAX_RETRY_TIMES; retry_times++) { TAutoIncrementRangeRequest request; TAutoIncrementRangeResult result; request.__set_db_id(_db_id); @@ -97,7 +55,7 @@ Status AutoIncIDBuffer::_prefetch_ids(size_t length) { request.__set_column_id(_column_id); request.__set_length(length); - int64_t get_auto_inc_range_rpc_ns; + int64_t get_auto_inc_range_rpc_ns = 0; { SCOPED_RAW_TIMER(&get_auto_inc_range_rpc_ns); _rpc_status = ThriftRpcHelper::rpc<FrontendServiceClient>( @@ -109,15 +67,95 @@ Status AutoIncIDBuffer::_prefetch_ids(size_t length) { LOG(INFO) << "[auto-inc-range][start=" << result.start << ",length=" << result.length << "][elapsed=" << get_auto_inc_range_rpc_ns / 1000000 << " ms]"; - if (!_rpc_status.ok() || result.length <= 0) { - LOG(WARNING) << "Failed to fetch auto-incremnt range, encounter rpc failure." - << "errmsg=" << _rpc_status.to_string(); - return; + if (_rpc_status.is<ErrorCode::NOT_MASTER>()) { + LOG_WARNING( + "Failed to fetch auto-incremnt range, request to non-master FE, discard all " + "auto_increment ranges in _buffers. retry_time={}", + retry_times); + master_addr = result.master_address; + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + continue; } + if (!_rpc_status.ok()) { + LOG(WARNING) + << "Failed to fetch auto-incremnt range, encounter rpc failure. 
retry_time=" + << retry_times << ", errmsg=" << _rpc_status.to_string(); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + continue; + } + if (result.length != length) [[unlikely]] { + auto msg = fmt::format( + "Failed to fetch auto-incremnt range, request length={}, but get " + "result.length={}, retry_time={}", + length, result.length, retry_times); + LOG(WARNING) << msg; + _rpc_status = Status::RpcError<true>(msg); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + continue; + } + + return result.start; + } + CHECK(!_rpc_status.ok()); + return _rpc_status; +} + +void AutoIncIDBuffer::_get_autoinc_ranges_from_buffers( + size_t& request_length, std::vector<std::pair<int64_t, size_t>>* result) { + std::lock_guard<std::mutex> lock {_latch}; + while (request_length > 0 && !_buffers.empty()) { + auto& autoinc_range = _buffers.front(); + CHECK_GT(autoinc_range.length, 0); + auto min_length = std::min(request_length, autoinc_range.length); + result->emplace_back(autoinc_range.start, min_length); + autoinc_range.consume(min_length); + _current_volume -= min_length; + request_length -= min_length; + if (autoinc_range.empty()) { + _buffers.pop_front(); + } + } +} + +Status AutoIncIDBuffer::sync_request_ids(size_t request_length, + std::vector<std::pair<int64_t, size_t>>* result) { + std::lock_guard<std::mutex> lock(_mutex); + while (request_length > 0) { + _get_autoinc_ranges_from_buffers(request_length, result); + if (request_length == 0) { + break; + } + if (!_is_fetching) { + RETURN_IF_ERROR( + _launch_async_fetch_task(std::max<size_t>(request_length, _prefetch_size()))); + } + _rpc_token->wait(); + CHECK(!_is_fetching); + if (!_rpc_status.ok()) { + return _rpc_status; + } + } + CHECK_EQ(request_length, 0); + if (!_is_fetching && _current_volume < _low_water_level_mark()) { + RETURN_IF_ERROR(_launch_async_fetch_task(_prefetch_size())); + } + return Status::OK(); +} + +Status AutoIncIDBuffer::_launch_async_fetch_task(size_t length) { + _is_fetching = true; + RETURN_IF_ERROR(_rpc_token->submit_func([=, this]() { + auto&& res = _fetch_ids_from_fe(length); + if (!res.has_value()) [[unlikely]] { + _is_fetching = false; + return; + } + int64_t start = res.value(); { - std::lock_guard<std::mutex> lock(_backend_buffer_latch); - _backend_buffer = {result.start, result.length}; + std::lock_guard<std::mutex> lock {_latch}; + _buffers.emplace_back(start, length); + _current_volume += length; } _is_fetching = false; })); diff --git a/be/src/vec/sink/autoinc_buffer.h b/be/src/vec/sink/autoinc_buffer.h index 3ec009b0960..032ac18981f 100644 --- a/be/src/vec/sink/autoinc_buffer.h +++ b/be/src/vec/sink/autoinc_buffer.h @@ -61,17 +61,35 @@ public: // all public functions are thread safe AutoIncIDBuffer(int64_t _db_id, int64_t _table_id, int64_t column_id); void set_batch_size_at_least(size_t batch_size); - Status sync_request_ids(size_t length, std::vector<std::pair<int64_t, size_t>>* result); + Status sync_request_ids(size_t request_length, std::vector<std::pair<int64_t, size_t>>* result); + + struct AutoIncRange { + int64_t start; + size_t length; + + bool empty() const { return length == 0; } + + void consume(size_t l) { + start += l; + length -= l; + } + }; private: - Status _prefetch_ids(size_t length); [[nodiscard]] size_t _prefetch_size() const { return _batch_size * config::auto_inc_prefetch_size_ratio; } + [[nodiscard]] size_t _low_water_level_mark() const { return _batch_size * config::auto_inc_low_water_level_mark_size_ratio; }; - void _wait_for_prefetching(); + + void 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/AutoIncrementGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/AutoIncrementGenerator.java
index be110360850..e4c8cf5de01 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/AutoIncrementGenerator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/AutoIncrementGenerator.java
@@ -40,7 +40,7 @@ public class AutoIncrementGenerator implements Writable, GsonPostProcessable {
     public static final long NEXT_ID_INIT_VALUE = 1;
 
     // _MIN_BATCH_SIZE = 4064 in load task
-    private static final long BATCH_ID_INTERVAL = 50000;
+    private static final long BATCH_ID_INTERVAL = 500000;
 
     @SerializedName(value = "dbId")
     private Long dbId;
@@ -48,7 +48,6 @@ public class AutoIncrementGenerator implements Writable, GsonPostProcessable {
     private Long tableId;
     @SerializedName(value = "columnId")
    private Long columnId;
-    @SerializedName(value = "nextId")
     private long nextId;
     @SerializedName(value = "batchEndId")
     private long batchEndId;
@@ -86,10 +85,10 @@ public class AutoIncrementGenerator implements Writable, GsonPostProcessable {
         long endId = startId + length;
         nextId = startId + length;
         if (endId > batchEndId) {
-            batchEndId = (endId / BATCH_ID_INTERVAL + 1) * BATCH_ID_INTERVAL;
             Preconditions.checkState(editLog != null);
             AutoIncrementIdUpdateLog info = new AutoIncrementIdUpdateLog(dbId, tableId, columnId, batchEndId);
             editLog.logUpdateAutoIncrementId(info);
+            batchEndId = (endId / BATCH_ID_INTERVAL + 1) * BATCH_ID_INTERVAL;
         }
         LOG.info("[getAutoIncrementRange result][{}, {}]", startId, length);
         return Pair.of(startId, length);
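Point 3 of the commit message is an ordering guarantee: the edit-log record for the new `batchEndId` watermark is persisted before the in-memory value advances, so a crash between the two steps can only waste ids, never hand the same range out twice after replay. The toy version below is written in C++ for consistency with the other sketches (the actual change is the Java hunk above); `DurableLog` and the watermark rounding are illustrative assumptions, not Doris classes.

```cpp
#include <cstdint>
#include <iostream>
#include <vector>

// Toy durable log: in the real system this would be a BDBJE-backed edit log.
struct DurableLog {
    std::vector<int64_t> entries;
    void persist(int64_t watermark) { entries.push_back(watermark); }  // pretend this is fsync'ed
};

struct Generator {
    static constexpr int64_t kInterval = 500000;  // mirrors the enlarged BATCH_ID_INTERVAL
    int64_t next_id = 1;
    int64_t batch_end = 0;

    // Hand out [start, start + length); only touch the log when the allocation
    // crosses the durable watermark, and persist before updating memory so a
    // crash in between wastes ids but never re-issues them after replay.
    int64_t get_range(DurableLog& log, int64_t length) {
        int64_t start = next_id;
        int64_t end = start + length;
        next_id = end;
        if (end > batch_end) {
            int64_t new_watermark = (end / kInterval + 1) * kInterval;
            log.persist(new_watermark);  // 1) durable first
            batch_end = new_watermark;   // 2) then in-memory
        }
        return start;
    }
};

int main() {
    DurableLog log;
    Generator gen;
    for (int i = 0; i < 20; ++i) gen.get_range(log, 40640);  // twenty ~40k-id requests
    // With a 500000-wide watermark, twenty requests cause only a couple of log writes.
    std::cout << "ranges handed out: 20, log writes: " << log.entries.size() << '\n';
}
```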
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 1b106576720..75a1987eb23 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -2626,6 +2626,16 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         TAutoIncrementRangeResult result = new TAutoIncrementRangeResult();
         TStatus status = new TStatus(TStatusCode.OK);
         result.setStatus(status);
+
+        if (!Env.getCurrentEnv().isMaster()) {
+            status.setStatusCode(TStatusCode.NOT_MASTER);
+            status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
+            result.setMasterAddress(getMasterAddress());
+            LOG.error("failed to getAutoIncrementRange:{}, request:{}, backend:{}",
+                    NOT_MASTER_ERR_MSG, request, getClientAddrAsString());
+            return result;
+        }
+
         try {
             Env env = Env.getCurrentEnv();
             Database db = env.getInternalCatalog().getDbOrMetaException(request.getDbId());
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 9a57abc9b61..d25f5c0ac2b 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1323,6 +1323,7 @@ struct TAutoIncrementRangeResult {
     1: optional Status.TStatus status
     2: optional i64 start
     3: optional i64 length
+    4: optional Types.TNetworkAddress master_address
 }
 
 struct TCreatePartitionRequest {
diff --git a/regression-test/data/data_model_p0/unique/test_unique_auto_inc_concurrent.out b/regression-test/data/data_model_p0/unique/test_unique_auto_inc_concurrent.out
new file mode 100644
index 00000000000..03819c9a717
--- /dev/null
+++ b/regression-test/data/data_model_p0/unique/test_unique_auto_inc_concurrent.out
@@ -0,0 +1,10 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+1500000 1500000
+
+-- !sql --
+3000000 3000000
+
+-- !sql --
+4500000 4500000
+
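Together with the new optional `master_address` field in `TAutoIncrementRangeResult`, the guard above turns a request that lands on a follower into a redirect: the follower refuses to allocate ids and reports where the master is. The fragment below is a small invented sketch of that handshake, written in C++ for consistency with the other examples rather than the Java/Thrift of the actual change.

```cpp
#include <cstdint>
#include <iostream>
#include <optional>
#include <string>

// Invented result type standing in for TAutoIncrementRangeResult.
struct RangeResult {
    bool ok = false;
    std::optional<std::string> master_address;  // only set when a non-master rejects the call
    int64_t start = 0;
    int64_t length = 0;
};

// Invented handler standing in for the FE service method: followers bail out
// early with the master's address instead of allocating ids themselves.
RangeResult get_auto_increment_range(bool is_master, const std::string& master_addr,
                                     int64_t next_start, int64_t length) {
    RangeResult res;
    if (!is_master) {
        res.master_address = master_addr;  // caller retries against this address
        return res;                        // res.ok == false plays the role of NOT_MASTER
    }
    res.ok = true;
    res.start = next_start;
    res.length = length;
    return res;
}

int main() {
    RangeResult r = get_auto_increment_range(false, "10.0.0.1:9020", 1, 40640);
    if (!r.ok && r.master_address) {
        std::cout << "redirect to master at " << *r.master_address << '\n';
    }
}
```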
+ +suite("test_unique_table_auto_inc_concurrent") { + + def table1 = "test_unique_table_auto_inc_concurrent" + sql "drop table if exists ${table1}" + sql """ + CREATE TABLE IF NOT EXISTS `${table1}` ( + `id` BIGINT NOT NULL AUTO_INCREMENT, + `value` int(11) NOT NULL + ) ENGINE=OLAP + UNIQUE KEY(`id`) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "in_memory" = "false", + "storage_format" = "V2", + "enable_unique_key_merge_on_write" = "true" + ) + """ + + def run_test = {thread_num, rows, iters -> + def threads = [] + (1..thread_num).each { id1 -> + threads.add(Thread.start { + (1..iters).each { id2 -> + sql """insert into ${table1}(value) select number from numbers("number" = "${rows}");""" + } + }) + } + + threads.each { thread -> thread.join() } + + qt_sql "select count(id), count(distinct id) from ${table1};" + } + + run_test(15, 10000, 10) + run_test(15, 100000, 1) + run_test(5, 30000, 10) + + sql "drop table if exists ${table1};" +} + --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org