This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 898fe415b9d branch-3.0: [improve](load) introduce blacklist of backends when load job fetches meta to avoid jitter #50587 (#51043)
898fe415b9d is described below
commit 898fe415b9d09adebf3bcf8a6f3f8511e0a2e7a4
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu May 22 14:18:34 2025 +0800
branch-3.0: [improve](load) introduce blacklist of backends when load job fetches meta to avoid jitter #50587 (#51043)
Cherry-picked from #50587
Co-authored-by: hui lai <[email protected]>
---
be/src/runtime/routine_load/data_consumer.cpp | 4 +
.../main/java/org/apache/doris/common/Config.java | 7 ++
.../apache/doris/datasource/kafka/KafkaUtil.java | 40 ++++++-
.../doris/load/routineload/RoutineLoadManager.java | 25 +++++
.../load_p0/routine_load/data/test_black_list.csv | 1 +
.../load_p0/routine_load/test_black_list.groovy | 124 +++++++++++++++++++++
6 files changed, 199 insertions(+), 2 deletions(-)
diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp
index 7566d06914a..00f9c726b41 100644
--- a/be/src/runtime/routine_load/data_consumer.cpp
+++ b/be/src/runtime/routine_load/data_consumer.cpp
@@ -418,6 +418,10 @@ Status KafkaDataConsumer::get_offsets_for_times(const std::vector<PIntegerPair>&
Status KafkaDataConsumer::get_latest_offsets_for_partitions(
const std::vector<int32_t>& partition_ids, std::vector<PIntegerPair>* offsets,
int timeout) {
+ DBUG_EXECUTE_IF("KafkaDataConsumer.get_latest_offsets_for_partitions.timeout", {
+ // Debug injection point: sleep 60s to simulate a BE that hangs while fetching Kafka meta.
+ std::this_thread::sleep_for(std::chrono::seconds(60));
+ });
MonotonicStopWatch watch;
watch.start();
for (int32_t partition_id : partition_ids) {
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index e609da9900c..e5c88a0e485 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1270,6 +1270,13 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static int max_get_kafka_meta_timeout_second = 60;
+
+ /**
+ * The expiration time, in seconds, of entries in the routine load blacklist.
+ */
+ @ConfField(mutable = true, masterOnly = true)
+ public static int routine_load_blacklist_expire_time_second = 300;
+
/**
* The max number of files store in SmallFileMgr
*/
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
index da683cc2b37..3e78ba0d4a5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
@@ -35,8 +35,10 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -227,12 +229,29 @@ public class KafkaUtil {
TNetworkAddress address = null;
Future<InternalService.PProxyResult> future = null;
InternalService.PProxyResult result = null;
+ Set<Long> failedBeIds = new HashSet<>();
+ TStatusCode code = null;
+
try {
while (retryTimes < 3) {
List<Long> backendIds = new ArrayList<>();
for (Long beId : Env.getCurrentSystemInfo().getAllBackendIds(true)) {
Backend backend = Env.getCurrentSystemInfo().getBackend(beId);
- if (backend != null && backend.isLoadAvailable() && !backend.isDecommissioned()) {
+ if (backend != null && backend.isLoadAvailable()
+ && !backend.isDecommissioned()
+ && !failedBeIds.contains(beId)
+ && !Env.getCurrentEnv().getRoutineLoadManager().isInBlacklist(beId)) {
+ backendIds.add(beId);
+ }
+ }
+ // If there are no available backends, fall back to the blacklist.
+ // Special scenarios include:
+ // 1. A specific job's connection to Kafka may time out due to a topic
+ //    configuration or network error, leaving only one backend operational.
+ // 2. If that sole backend is then decommissioned, the list of alive backends becomes empty.
+ // In such cases, it is essential to rely on blacklisted backends to obtain meta information.
+ if (backendIds.isEmpty()) {
+ for (Long beId : Env.getCurrentEnv().getRoutineLoadManager().getBlacklist().keySet()) {
backendIds.add(beId);
}
}
@@ -243,19 +262,22 @@ public class KafkaUtil {
Collections.shuffle(backendIds);
Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
+ long beId = be.getId();
try {
future = BackendServiceProxy.getInstance().getInfo(address, request);
result = future.get(Config.max_get_kafka_meta_timeout_second, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.warn("failed to get info request to " + address + " err " + e.getMessage());
+ failedBeIds.add(beId);
retryTimes++;
continue;
}
- TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
+ code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
LOG.warn("failed to get info request to "
+ address + " err " +
result.getStatus().getErrorMsgsList());
+ failedBeIds.add(beId);
retryTimes++;
} else {
return result;
@@ -265,6 +287,20 @@ public class KafkaUtil {
MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_FAIL_COUNT.increase(1L);
throw new LoadException("Failed to get info");
} finally {
+ // Ensure that not all BEs end up on the blacklist.
+ // For a single request:
+ // a failed BE is added to the blacklist only when the request finally succeeds,
+ // so there is always at least one BE node that is not on the blacklist.
+ // Across multiple requests:
+ // if repeated jitter leaves only one BE off the blacklist,
+ // that BE will not be blacklisted even if it fails.
+ if (code != null && code == TStatusCode.OK && !failedBeIds.isEmpty()) {
+ for (Long beId : failedBeIds) {
+ Env.getCurrentEnv().getRoutineLoadManager().addToBlacklist(beId);
+ LOG.info("add beId {} to blacklist, blacklist: {}", beId, Env.getCurrentEnv().getRoutineLoadManager().getBlacklist());
+ }
+ }
long endTime = System.currentTimeMillis();
MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_LANTENCY.increase(endTime - startTime);
MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_COUNT.increase(1L);
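In short, the retry loop above prefers BEs that are alive, have not already failed this request, and are not blacklisted; falls back to blacklisted BEs only when nothing else remains; and records the failed BEs in the blacklist only once the request finally succeeds. A minimal, self-contained Java sketch of that flow follows; rpcFetchMeta and the plain map are hypothetical stand-ins for the BackendServiceProxy RPC and the RoutineLoadManager state, not the FE's actual API:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    public class MetaFetchRetrySketch {
        // Stand-in for RoutineLoadManager's blacklist: beId -> time of blacklisting.
        static final Map<Long, Long> BLACKLIST = new ConcurrentHashMap<>();

        // Hypothetical RPC: pretend odd-numbered BEs are jittering.
        static boolean rpcFetchMeta(long beId) {
            return beId % 2 == 0;
        }

        static boolean fetchWithRetry(List<Long> aliveBeIds) {
            Set<Long> failedBeIds = new HashSet<>();
            boolean succeeded = false;
            try {
                for (int retry = 0; retry < 3; retry++) {
                    // Prefer BEs that are alive, have not failed this request,
                    // and are not currently blacklisted.
                    List<Long> candidates = new ArrayList<>();
                    for (long beId : aliveBeIds) {
                        if (!failedBeIds.contains(beId) && !BLACKLIST.containsKey(beId)) {
                            candidates.add(beId);
                        }
                    }
                    // Fall back to blacklisted BEs rather than giving up outright.
                    if (candidates.isEmpty()) {
                        candidates.addAll(BLACKLIST.keySet());
                    }
                    if (candidates.isEmpty()) {
                        return false;
                    }
                    Collections.shuffle(candidates);
                    long beId = candidates.get(0);
                    if (rpcFetchMeta(beId)) {
                        succeeded = true;
                        return true;
                    }
                    failedBeIds.add(beId);
                }
                return false;
            } finally {
                // Blacklist the BEs that failed only if the request eventually
                // succeeded, so at least one BE always stays off the blacklist.
                if (succeeded) {
                    for (long beId : failedBeIds) {
                        BLACKLIST.put(beId, System.currentTimeMillis());
                    }
                }
            }
        }

        public static void main(String[] args) {
            System.out.println("fetched: " + fetchWithRetry(List.of(1L, 2L, 3L)));
            System.out.println("blacklist: " + BLACKLIST.keySet());
        }
    }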
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index bfe42ad7695..23b36f11a4b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -91,6 +91,9 @@ public class RoutineLoadManager implements Writable {
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+ // Map<beId, timestamp when added to blacklist>
+ private Map<Long, Long> blacklist = new ConcurrentHashMap<>();
+
private void readLock() {
lock.readLock().lock();
}
@@ -110,6 +113,10 @@ public class RoutineLoadManager implements Writable {
public RoutineLoadManager() {
}
+ public Map<Long, Long> getBlacklist() {
+ return blacklist;
+ }
+
public List<RoutineLoadJob> getAllRoutineLoadJobs() {
return new ArrayList<>(idToRoutineLoadJob.values());
}
@@ -916,4 +923,22 @@ public class RoutineLoadManager implements Writable {
}
}
}
+
+ public void addToBlacklist(long beId) {
+ blacklist.put(beId, System.currentTimeMillis());
+ }
+
+ public boolean isInBlacklist(long beId) {
+ Long timestamp = blacklist.get(beId);
+ if (timestamp == null) {
+ return false;
+ }
+
+ if (System.currentTimeMillis() - timestamp > Config.routine_load_blacklist_expire_time_second * 1000) {
+ blacklist.remove(beId);
+ LOG.info("remove beId {} from blacklist, blacklist: {}", beId, blacklist);
+ return false;
+ }
+ return true;
+ }
}
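One subtlety in isInBlacklist() above: expiry is lazy. An entry is purged only when a lookup finds it older than routine_load_blacklist_expire_time_second; there is no background sweeper, so getBlacklist() may transiently report BEs whose window has already passed. That appears harmless for the fallback path in KafkaUtil, which uses blacklisted BEs only as a last resort. A small standalone sketch of these semantics, where EXPIRE_MS is a stand-in for the config value in milliseconds:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class LazyExpiryDemo {
        // Stand-in for routine_load_blacklist_expire_time_second * 1000 (shortened for the demo).
        static final long EXPIRE_MS = 100;
        static final Map<Long, Long> BLACKLIST = new ConcurrentHashMap<>();

        static boolean isInBlacklist(long beId) {
            Long timestamp = BLACKLIST.get(beId);
            if (timestamp == null) {
                return false;
            }
            if (System.currentTimeMillis() - timestamp > EXPIRE_MS) {
                // Expired entries are purged on lookup, not by a background timer.
                BLACKLIST.remove(beId);
                return false;
            }
            return true;
        }

        public static void main(String[] args) throws InterruptedException {
            BLACKLIST.put(10001L, System.currentTimeMillis());
            System.out.println(isInBlacklist(10001L));         // true: inside the expiry window
            Thread.sleep(EXPIRE_MS + 50);
            System.out.println(BLACKLIST.containsKey(10001L)); // still true: removal is lazy
            System.out.println(isInBlacklist(10001L));         // false: the lookup purges it
            System.out.println(BLACKLIST.containsKey(10001L)); // now false
        }
    }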
diff --git a/regression-test/suites/load_p0/routine_load/data/test_black_list.csv b/regression-test/suites/load_p0/routine_load/data/test_black_list.csv
new file mode 100644
index 00000000000..b226b99ee4e
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_black_list.csv
@@ -0,0 +1 @@
+1,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/routine_load/test_black_list.groovy b/regression-test/suites/load_p0/routine_load/test_black_list.groovy
new file mode 100644
index 00000000000..04779f10362
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_black_list.groovy
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_black_list","nonConcurrent,p0") {
+ String enabled = context.config.otherConfigs.get("enableKafkaTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ // 1. send data
+ def kafkaCsvTpoics = [
+ "test_black_list",
+ ]
+ String kafka_port = context.config.otherConfigs.get("kafka_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ def kafka_broker = "${externalEnvIp}:${kafka_port}"
+ def props = new Properties()
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+ def producer = new KafkaProducer<>(props)
+ for (String kafkaCsvTopic in kafkaCsvTpoics) {
+ def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+ def lines = txt.readLines()
+ lines.each { line ->
+ logger.info("=====${line}========")
+ def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+ producer.send(record)
+ }
+ }
+
+ // 2. create table and routine load job
+ def tableName = "test_black_list"
+ def job = "test_black_list_job"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k1` int(20) NULL,
+ `k2` string NULL,
+ `v1` date NULL,
+ `v2` string NULL,
+ `v3` datetime NULL,
+ `v4` string NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ def inject = "KafkaDataConsumer.get_latest_offsets_for_partitions.timeout"
+ try {
+ GetDebugPoint().enableDebugPointForAllBEs(inject)
+ sql """
+ CREATE ROUTINE LOAD ${job} ON ${tableName}
+ COLUMNS TERMINATED BY ","
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "${kafkaCsvTpoics[0]}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+
+ def count = 0
+ while (true) {
+ def res = sql "select count(*) from ${tableName}"
+ log.info("res: ${res}")
+ def state = sql "show routine load for ${job}"
+ log.info("routine load state:
${state[0][8].toString()}".toString())
+ log.info("reason of state changed:
${state[0][17].toString()}".toString())
+ log.info("other msg: ${state[0][19].toString()}".toString())
+ if (state[0][17].toString().contains("Failed to get info") ||
state[0][19].toString().contains("Failed to get info")) {
+ break
+ }
+ if (count >= 90) {
+ log.error("routine load test fail")
+ assertEquals(1, 2)
+ break
+ }
+ sleep(1000)
+ count++
+ }
+
+ count = 0
+ GetDebugPoint().disableDebugPointForAllBEs(inject)
+ while (true) {
+ sleep(1000)
+ def res = sql "show routine load for ${job}"
+ log.info("routine load statistic:
${res[0][14].toString()}".toString())
+ log.info("progress: ${res[0][15].toString()}".toString())
+ log.info("lag: ${res[0][16].toString()}".toString())
+ res = sql "select count(*) from ${tableName}"
+ if (res[0][0] > 0) {
+ break;
+ }
+ count++
+ if (count > 60) {
+ assertEquals(1, 2)
+ }
+ continue;
+ }
+ } finally {
+ GetDebugPoint().disableDebugPointForAllBEs(inject)
+ sql "stop routine load for ${job}"
+ }
+ }
+}
\ No newline at end of file