This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new df464f84b15 branch-2.1: [fix](load) fix the error msg of task submission failure for memory back pressure #51078 (#51131)
df464f84b15 is described below
commit df464f84b1593c307dd5ae9d27aa84dae6bc28d6
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu May 22 20:37:39 2025 +0800
branch-2.1: [fix](load) fix the error msg of task submission failure for memory back pressure #51078 (#51131)
Cherry-picked from #51078
Co-authored-by: hui lai <[email protected]>
---
.../routine_load/routine_load_task_executor.cpp | 29 +++-
.../routine_load/routine_load_task_executor.h | 2 +-
.../test_routine_load_error_info.groovy | 174 ++++++++-------------
3 files changed, 90 insertions(+), 115 deletions(-)
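In short, the patch splits the old combined check in RoutineLoadTaskExecutor::submit_task into two separate checks, so a task rejected for memory back pressure now returns Status::MemoryLimitExceeded with a descriptive reason instead of the generic "too many tasks in thread pool" error. Below is a minimal, self-contained C++ sketch of that submission-side pattern; SubmitResult, reach_memory_limit, the submit_task parameters and the numbers in main are illustrative stand-ins, not the Doris API.

    // Minimal sketch of the submission-side pattern in this patch (illustrative
    // names only, not the Doris API): the thread-pool check and the memory check
    // are separated so each rejection carries its own specific message.
    #include <cstddef>
    #include <iostream>
    #include <string>

    enum class ErrorCode { OK, TOO_MANY_TASKS, MEMORY_LIMIT_EXCEEDED };

    struct SubmitResult {
        ErrorCode code;
        std::string msg;
    };

    // Stand-in for _reach_memory_limit(): fills `reason` when a limit is hit.
    bool reach_memory_limit(std::string& reason, long load_mem, long load_mem_limit,
                            bool exceed_soft_limit) {
        if (exceed_soft_limit || load_mem > load_mem_limit) {
            reason = "is_exceed_soft_mem_limit: " + std::to_string(exceed_soft_limit) +
                     " current_load_mem_value: " + std::to_string(load_mem) +
                     " _load_mem_limit: " + std::to_string(load_mem_limit);
            return true;
        }
        return false;
    }

    SubmitResult submit_task(std::size_t queued_tasks, std::size_t max_pool_size,
                             long load_mem, long load_mem_limit, bool exceed_soft_limit) {
        // Too many queued tasks and memory back pressure are now distinct rejections.
        if (queued_tasks >= max_pool_size) {
            return {ErrorCode::TOO_MANY_TASKS, "too many tasks in thread pool"};
        }
        std::string reason;
        if (reach_memory_limit(reason, load_mem, load_mem_limit, exceed_soft_limit)) {
            return {ErrorCode::MEMORY_LIMIT_EXCEEDED, reason};
        }
        return {ErrorCode::OK, ""};
    }

    int main() {
        // Memory back pressure now yields a specific, inspectable reason string.
        SubmitResult r = submit_task(/*queued_tasks=*/10, /*max_pool_size=*/64,
                                     /*load_mem=*/900, /*load_mem_limit=*/800,
                                     /*exceed_soft_limit=*/false);
        std::cout << static_cast<int>(r.code) << ": " << r.msg << "\n";
        return 0;
    }

Separating the two checks is what lets the frontend surface a memory-specific message instead of a misleading queue-full one.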
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index e12ef7ff6df..fd559e9b6ad 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -205,13 +205,14 @@ Status RoutineLoadTaskExecutor::get_kafka_real_offsets_for_partitions(
Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
std::unique_lock<std::mutex> l(_lock);
+ // check if already submitted
if (_task_map.find(task.id) != _task_map.end()) {
- // already submitted
LOG(INFO) << "routine load task " << UniqueId(task.id) << " has
already been submitted";
return Status::OK();
}
- if (_task_map.size() >= config::max_routine_load_thread_pool_size || _reach_memory_limit()) {
+ // check task num limit
+ if (_task_map.size() >= config::max_routine_load_thread_pool_size) {
LOG(INFO) << "too many tasks in thread pool. reject task: " <<
UniqueId(task.id)
<< ", job id: " << task.job_id
<< ", queue size: " << _thread_pool->get_queue_size()
@@ -220,6 +221,18 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
BackendOptions::get_localhost());
}
+ // check memory limit
+ std::string reason;
+ DBUG_EXECUTE_IF("RoutineLoadTaskExecutor.submit_task.memory_limit", {
+ _reach_memory_limit(reason);
+ return Status::MemoryLimitExceeded("fake reason: " + reason);
+ });
+ if (_reach_memory_limit(reason)) {
+ LOG(INFO) << "reach memory limit. reject task: " << UniqueId(task.id)
+ << ", job id: " << task.job_id << ", reason: " << reason;
+ return Status::MemoryLimitExceeded(reason);
+ }
+
// create the context
std::shared_ptr<StreamLoadContext> ctx =
std::make_shared<StreamLoadContext>(_exec_env);
ctx->load_type = TLoadType::ROUTINE_LOAD;
@@ -306,14 +319,18 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
}
}
-bool RoutineLoadTaskExecutor::_reach_memory_limit() {
+bool RoutineLoadTaskExecutor::_reach_memory_limit(std::string& reason) {
+ DBUG_EXECUTE_IF("RoutineLoadTaskExecutor.submit_task.memory_limit", {
+ reason = "reach memory limit";
+ return true;
+ });
bool is_exceed_soft_mem_limit = GlobalMemoryArbitrator::is_exceed_soft_mem_limit();
auto current_load_mem_value =
MemTrackerLimiter::TypeMemSum[MemTrackerLimiter::Type::LOAD]->current_value();
if (is_exceed_soft_mem_limit || current_load_mem_value > _load_mem_limit) {
- LOG(INFO) << "is_exceed_soft_mem_limit: " << is_exceed_soft_mem_limit
- << " current_load_mem_value: " << current_load_mem_value
- << " _load_mem_limit: " << _load_mem_limit;
+ reason = "is_exceed_soft_mem_limit: " +
std::to_string(is_exceed_soft_mem_limit) +
+ " current_load_mem_value: " +
std::to_string(current_load_mem_value) +
+ " _load_mem_limit: " + std::to_string(_load_mem_limit);
return true;
}
return false;
}
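The new "RoutineLoadTaskExecutor.submit_task.memory_limit" debug point is what the added regression case relies on: it forces the memory-limit branch deterministically so the test can assert on the "reach memory limit" message without creating real memory pressure. Below is a minimal sketch of that fault-injection idea, assuming a simple global registry; g_enabled_debug_points and debug_point_enabled are hypothetical stand-ins for Doris's DBUG_EXECUTE_IF / GetDebugPoint machinery, not its actual implementation.

    // Minimal sketch of the fault-injection idea behind the
    // "RoutineLoadTaskExecutor.submit_task.memory_limit" debug point
    // (illustrative only; Doris uses its own DBUG_EXECUTE_IF machinery).
    #include <iostream>
    #include <set>
    #include <string>

    // Hypothetical global registry of enabled debug points, toggled by a test harness.
    static std::set<std::string> g_enabled_debug_points;

    bool debug_point_enabled(const std::string& name) {
        return g_enabled_debug_points.count(name) > 0;
    }

    // The check consults the debug point first, so a regression test can force the
    // "reach memory limit" branch without creating real memory pressure.
    bool reach_memory_limit(std::string& reason) {
        if (debug_point_enabled("RoutineLoadTaskExecutor.submit_task.memory_limit")) {
            reason = "reach memory limit";
            return true;
        }
        // ... real soft-limit / load-memory-tracker checks would go here ...
        return false;
    }

    int main() {
        std::string reason;
        std::cout << reach_memory_limit(reason) << "\n";  // prints 0: point disabled
        g_enabled_debug_points.insert("RoutineLoadTaskExecutor.submit_task.memory_limit");
        std::cout << reach_memory_limit(reason) << " " << reason << "\n";  // prints 1 and the reason
        return 0;
    }

In the Groovy test below, enableDebugPointForAllBEs turns the point on, the job's "other msg" column then contains "reach memory limit", and the finally block disables the point again.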
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h b/be/src/runtime/routine_load/routine_load_task_executor.h
index b1196f7824a..eae3f9c4073 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.h
+++ b/be/src/runtime/routine_load/routine_load_task_executor.h
@@ -88,7 +88,7 @@ private:
// create a dummy StreamLoadContext for PKafkaMetaProxyRequest
Status _prepare_ctx(const PKafkaMetaProxyRequest& request,
std::shared_ptr<StreamLoadContext> ctx);
- bool _reach_memory_limit();
+ bool _reach_memory_limit(std::string& reason);
private:
ExecEnv* _exec_env = nullptr;
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
index f05b8af79ee..2f018a93729 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
@@ -48,13 +48,11 @@ suite("test_routine_load_error_info","nonConcurrent") {
}
}
- // case 1: task failed
- if (enabled != null && enabled.equalsIgnoreCase("true")) {
- // create table
- def jobName = "test_error_info"
- def tableName = "test_routine_error_info"
- try {
- sql """
+ def createTable = {tableName ->
+ sql """
+ DROP TABLE IF EXISTS ${tableName}
+ """
+ sql """
CREATE TABLE IF NOT EXISTS ${tableName}
(
k00 INT NOT NULL,
@@ -120,13 +118,12 @@ suite("test_routine_load_error_info","nonConcurrent") {
"bloom_filter_columns"="k05",
"replication_num" = "1"
);
- """
- sql "sync"
+ """
+ }
- // create job
- GetDebugPoint().enableDebugPointForAllBEs("BetaRowsetWriter._check_segment_number_limit_too_many_segments")
- sql """
- CREATE ROUTINE LOAD ${jobName} on ${tableName}
+ def createJob = {jobName, tableName, kafkaTopic ->
+ sql """
+ CREATE ROUTINE LOAD ${jobName} on ${tableName}
COLUMNS(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18),
COLUMNS TERMINATED BY "|"
PROPERTIES
@@ -138,10 +135,22 @@ suite("test_routine_load_error_info","nonConcurrent") {
FROM KAFKA
(
"kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
- "kafka_topic" = "${kafkaCsvTpoics[0]}",
+ "kafka_topic" = "${kafkaTopic}",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
- """
+ """
+ }
+
+ // case 1: task failed
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ // create table
+ def jobName = "test_error_info"
+ def tableName = "test_routine_error_info"
+ try {
+ createTable(tableName)
+ sql "sync"
+ GetDebugPoint().enableDebugPointForAllBEs("BetaRowsetWriter._check_segment_number_limit_too_many_segments")
+ createJob(jobName, tableName, kafkaCsvTpoics[0])
sql "sync"
// check error info
@@ -158,10 +167,8 @@ suite("test_routine_load_error_info","nonConcurrent") {
if (count > 60) {
assertEquals(1, 2)
break;
- } else {
- sleep(1000)
- continue;
}
+ sleep(1000)
}
} finally {
GetDebugPoint().disableDebugPointForAllBEs("BetaRowsetWriter._check_segment_number_limit_too_many_segments")
@@ -175,93 +182,45 @@ suite("test_routine_load_error_info","nonConcurrent") {
def jobName = "test_error_info"
def tableName = "test_routine_error_info"
try {
- sql """
- CREATE TABLE IF NOT EXISTS ${tableName}
- (
- k00 INT NOT NULL,
- k01 DATE NOT NULL,
- k02 BOOLEAN NULL,
- k03 TINYINT NULL,
- k04 SMALLINT NULL,
- k05 INT NULL,
- k06 BIGINT NULL,
- k07 LARGEINT NULL,
- k08 FLOAT NULL,
- k09 DOUBLE NULL,
- k10 DECIMAL(9,1) NULL,
- k11 DECIMALV3(9,1) NULL,
- k12 DATETIME NULL,
- k13 DATEV2 NULL,
- k14 DATETIMEV2 NULL,
- k15 CHAR NULL,
- k16 VARCHAR NULL,
- k17 STRING NULL,
- k18 JSON NULL,
- kd01 BOOLEAN NOT NULL DEFAULT "TRUE",
- kd02 TINYINT NOT NULL DEFAULT "1",
- kd03 SMALLINT NOT NULL DEFAULT "2",
- kd04 INT NOT NULL DEFAULT "3",
- kd05 BIGINT NOT NULL DEFAULT "4",
- kd06 LARGEINT NOT NULL DEFAULT "5",
- kd07 FLOAT NOT NULL DEFAULT "6.0",
- kd08 DOUBLE NOT NULL DEFAULT "7.0",
- kd09 DECIMAL NOT NULL DEFAULT "888888888",
- kd10 DECIMALV3 NOT NULL DEFAULT "999999999",
- kd11 DATE NOT NULL DEFAULT "2023-08-24",
- kd12 DATETIME NOT NULL DEFAULT "2023-08-24 12:00:00",
- kd13 DATEV2 NOT NULL DEFAULT "2023-08-24",
- kd14 DATETIMEV2 NOT NULL DEFAULT "2023-08-24 12:00:00",
- kd15 CHAR(255) NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
- kd16 VARCHAR(300) NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
- kd17 STRING NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
- kd18 JSON NULL,
-
- INDEX idx_inverted_k104 (`k05`) USING INVERTED,
- INDEX idx_inverted_k110 (`k11`) USING INVERTED,
- INDEX idx_inverted_k113 (`k13`) USING INVERTED,
- INDEX idx_inverted_k114 (`k14`) USING INVERTED,
- INDEX idx_inverted_k117 (`k17`) USING INVERTED PROPERTIES("parser" = "english"),
- INDEX idx_ngrambf_k115 (`k15`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
- INDEX idx_ngrambf_k116 (`k16`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
- INDEX idx_ngrambf_k117 (`k17`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
-
- INDEX idx_bitmap_k104 (`k02`) USING BITMAP,
- INDEX idx_bitmap_k110 (`kd01`) USING BITMAP
-
- )
- DUPLICATE KEY(k00)
- PARTITION BY RANGE(k01)
- (
- PARTITION p1 VALUES [('2023-08-01'), ('2023-08-11')),
- PARTITION p2 VALUES [('2023-08-11'), ('2023-08-21')),
- PARTITION p3 VALUES [('2023-08-21'), ('2023-09-01'))
- )
- DISTRIBUTED BY HASH(k00) BUCKETS 32
- PROPERTIES (
- "bloom_filter_columns"="k05",
- "replication_num" = "1"
- );
- """
+ createTable(tableName)
+ sql "sync"
+ createJob(jobName, tableName, "invalid_job")
sql "sync"
- // create job
- sql """
- CREATE ROUTINE LOAD ${jobName} on ${tableName}
- COLUMNS(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18),
- COLUMNS TERMINATED BY "|"
- PROPERTIES
- (
- "max_batch_interval" = "5",
- "max_batch_rows" = "300000",
- "max_batch_size" = "209715200"
- )
- FROM KAFKA
- (
- "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
- "kafka_topic" = "invalid_job",
- "property.kafka_default_offsets" = "OFFSET_BEGINNING"
- );
- """
+ // check error info
+ def count = 0
+ while (true) {
+ def res = sql "show routine load for ${jobName}"
+ log.info("show routine load: ${res[0].toString()}".toString())
+ log.info("reason: ${res[0][17].toString()}".toString())
+ if (res[0][17].toString() != "") {
+ assertTrue(res[0][17].toString().contains("may be Kafka properties set in job is error or no partition in this topic that should check Kafka"))
+ break;
+ }
+ count++
+ if (count > 60) {
+ assertEquals(1, 2)
+ break;
+ }
+ sleep(1000)
+ }
+ } finally {
+ sql "stop routine load for ${jobName}"
+ sql "DROP TABLE IF EXISTS ${tableName}"
+ }
+ }
+
+ // case 3: memory limit
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ def jobName = "test_memory_limit_error_info"
+ def tableName = "test_routine_memory_limit_error_info"
+
+ try {
+ createTable(tableName)
+ sql "sync"
+ GetDebugPoint().enableDebugPointForAllBEs("RoutineLoadTaskExecutor.submit_task.memory_limit")
+ createJob(jobName, tableName, kafkaCsvTpoics[0])
+ sql "sync"
// check error info
def count = 0
@@ -269,20 +228,19 @@ suite("test_routine_load_error_info","nonConcurrent") {
def res = sql "show routine load for ${jobName}"
log.info("show routine load: ${res[0].toString()}".toString())
log.info("other msg: ${res[0][19].toString()}".toString())
- if (res[0][19].toString() != "" && res[0][8].toString() == "NEED_SCHEDULE") {
- assertTrue(res[0][19].toString().contains("may be Kafka properties set in job is error or no partition in this topic that should check Kafka"))
+ if (res[0][19].toString() != "") {
+ assertTrue(res[0][19].toString().contains("reach memory limit"))
break;
}
count++
if (count > 60) {
assertEquals(1, 2)
break;
- } else {
- sleep(1000)
- continue;
}
+ sleep(1000)
}
} finally {
+ GetDebugPoint().disableDebugPointForAllBEs("RoutineLoadTaskExecutor.submit_task.memory_limit")
sql "stop routine load for ${jobName}"
sql "DROP TABLE IF EXISTS ${tableName}"
}