This is an automated email from the ASF dual-hosted git repository. liaoxin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new c60a49b05e7 [fix](multi-table-load) fix single stream multi table load cannot finish (#33816)
c60a49b05e7 is described below

commit c60a49b05e7d5f2846d3874d0dc7f6ed2c63b682
Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com>
AuthorDate: Fri Apr 19 13:02:45 2024 +0800

    [fix](multi-table-load) fix single stream multi table load cannot finish (#33816)
---
 be/src/io/fs/multi_table_pipe.cpp | 22 +++++++++++++++++-----
 be/src/io/fs/stream_load_pipe.cpp |  9 +++++++++
 be/src/io/fs/stream_load_pipe.h   |  4 ++++
 3 files changed, 30 insertions(+), 5 deletions(-)

diff --git a/be/src/io/fs/multi_table_pipe.cpp b/be/src/io/fs/multi_table_pipe.cpp
index dbc0c3e0228..28df937479b 100644
--- a/be/src/io/fs/multi_table_pipe.cpp
+++ b/be/src/io/fs/multi_table_pipe.cpp
@@ -114,17 +114,25 @@ Status MultiTablePipe::dispatch(const std::string& table, const char* data, size
         } else {
             pipe = iter->second;
         }
-        RETURN_NOT_OK_STATUS_WITH_WARN((pipe.get()->*cb)(data, size),
-                                       "append failed in unplanned kafka pipe");

+        // It is necessary to determine whether the sum of pipe_current_capacity and size is greater than pipe_max_capacity,
+        // otherwise the following situation may occur:
+        // the pipe is full but still cannot trigger the request and exec plan condition,
+        // causing one stream multi table load can not finish
         ++_unplanned_row_cnt;
+        auto pipe_current_capacity = pipe->current_capacity();
+        auto pipe_max_capacity = pipe->max_capacity();
         if (_unplanned_row_cnt >= _row_threshold ||
-            _unplanned_pipes.size() >= _wait_tables_threshold) {
+            _unplanned_pipes.size() >= _wait_tables_threshold ||
+            pipe_current_capacity + size > pipe_max_capacity) {
             LOG(INFO) << fmt::format(
                                  "unplanned row cnt={} reach row_threshold={} or "
-                                 "wait_plan_table_threshold={}, "
+                                 "wait_plan_table_threshold={}, or the sum of "
+                                 "pipe_current_capacity {} "
+                                 "and size {} is greater than pipe_max_capacity {}, "
                                  "plan them",
-                                 _unplanned_row_cnt, _row_threshold, _wait_tables_threshold)
+                                 _unplanned_row_cnt, _row_threshold, _wait_tables_threshold,
+                                 pipe_current_capacity, size, pipe_max_capacity)
                       << ", ctx: " << _ctx->brief();
             Status st = request_and_exec_plans();
             _unplanned_row_cnt = 0;
@@ -132,7 +140,11 @@ Status MultiTablePipe::dispatch(const std::string& table, const char* data, size
                 return st;
             }
         }
+
+        RETURN_NOT_OK_STATUS_WITH_WARN((pipe.get()->*cb)(data, size),
+                                       "append failed in unplanned kafka pipe");
     }
+
     return Status::OK();
 }

diff --git a/be/src/io/fs/stream_load_pipe.cpp b/be/src/io/fs/stream_load_pipe.cpp
index cd5ee5a8a09..ecce306bdf1 100644
--- a/be/src/io/fs/stream_load_pipe.cpp
+++ b/be/src/io/fs/stream_load_pipe.cpp
@@ -255,5 +255,14 @@ TUniqueId StreamLoadPipe::calculate_pipe_id(const UniqueId& query_id, int32_t fr
     return pipe_id;
 }

+size_t StreamLoadPipe::current_capacity() {
+    std::unique_lock<std::mutex> l(_lock);
+    if (_use_proto) {
+        return _proto_buffered_bytes;
+    } else {
+        return _buffered_bytes;
+    }
+}
+
 } // namespace io
 } // namespace doris
diff --git a/be/src/io/fs/stream_load_pipe.h b/be/src/io/fs/stream_load_pipe.h
index e3042a932c4..afbe9ebf98d 100644
--- a/be/src/io/fs/stream_load_pipe.h
+++ b/be/src/io/fs/stream_load_pipe.h
@@ -81,6 +81,10 @@ public:
     // used for pipeline load, which use TUniqueId(lo: query_id.lo + fragment_id, hi: query_id.hi) as pipe_id
     static TUniqueId calculate_pipe_id(const UniqueId& query_id, int32_t fragment_id);

+    size_t max_capacity() const { return _max_buffered_bytes; }
+
+    size_t current_capacity();
+
 protected:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const IOContext* io_ctx) override;

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org