This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ef4fba649cb [Fix](group commit) Fix multiple cluster group commit BE select strategy (#38644)
ef4fba649cb is described below
commit ef4fba649cb563efe64874addcbac11204925cfc
Author: abmdocrt <[email protected]>
AuthorDate: Sun Aug 4 18:33:55 2024 +0800
[Fix](group commit) Fix multiple cluster group commit BE select strategy (#38644)
---
.../org/apache/doris/load/GroupCommitManager.java | 64 ++++++++++++++--------
1 file changed, 41 insertions(+), 23 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
index 20f7b9ed9be..1009c4257b8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
@@ -58,10 +58,10 @@ public class GroupCommitManager {
private Set<Long> blockedTableIds = new HashSet<>();
- // Table id to BE id map. Only for group commit.
- private Map<Long, Long> tableToBeMap = new ConcurrentHashMap<>();
- // BE id to pressure map. Only for group commit.
- private Map<Long, SlidingWindowCounter> tablePressureMap = new
ConcurrentHashMap<>();
+ // Encoded <Cluster and Table id> to BE id map. Only for group commit.
+ private final Map<String, Long> tableToBeMap = new ConcurrentHashMap<>();
+ // Table id to pressure map. Only for group commit.
+ private final Map<Long, SlidingWindowCounter> tableToPressureMap = new
ConcurrentHashMap<>();
public boolean isBlock(long tableId) {
return blockedTableIds.contains(tableId);
@@ -243,13 +243,13 @@ public class GroupCommitManager {
private long selectBackendForCloudGroupCommitInternal(long tableId, String
cluster)
throws DdlException, LoadException {
- LOG.debug("cloud group commit select be info, tableToBeMap {},
tablePressureMap {}", tableToBeMap.toString(),
- tablePressureMap.toString());
+ LOG.debug("cloud group commit select be info, tableToBeMap {},
tablePressureMap {}",
+ tableToBeMap.toString(), tableToPressureMap.toString());
if (Strings.isNullOrEmpty(cluster)) {
ErrorReport.reportDdlException(ErrorCode.ERR_NO_CLUSTER_ERROR);
}
- Long cachedBackendId = getCachedBackend(tableId);
+ Long cachedBackendId = getCachedBackend(cluster, tableId);
if (cachedBackendId != null) {
return cachedBackendId;
}
@@ -261,7 +261,7 @@ public class GroupCommitManager {
throw new LoadException("No alive backend");
}
// If the cached backend is not active or decommissioned, select a
random new backend.
- Long randomBackendId = getRandomBackend(tableId, backends);
+ Long randomBackendId = getRandomBackend(cluster, tableId, backends);
if (randomBackendId != null) {
return randomBackendId;
}
@@ -274,8 +274,8 @@ public class GroupCommitManager {
private long selectBackendForLocalGroupCommitInternal(long tableId) throws
LoadException {
LOG.debug("group commit select be info, tableToBeMap {},
tablePressureMap {}", tableToBeMap.toString(),
- tablePressureMap.toString());
- Long cachedBackendId = getCachedBackend(tableId);
+ tableToPressureMap.toString());
+ Long cachedBackendId = getCachedBackend(null, tableId);
if (cachedBackendId != null) {
return cachedBackendId;
}
@@ -293,7 +293,7 @@ public class GroupCommitManager {
}
// If the cached backend is not active or decommissioned, select a
random new backend.
- Long randomBackendId = getRandomBackend(tableId, backends);
+ Long randomBackendId = getRandomBackend(null, tableId, backends);
if (randomBackendId != null) {
return randomBackendId;
}
@@ -305,31 +305,41 @@ public class GroupCommitManager {
}
@Nullable
- private Long getCachedBackend(long tableId) {
+ private Long getCachedBackend(String cluster, long tableId) {
OlapTable table = (OlapTable)
Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId);
- if (tableToBeMap.containsKey(tableId)) {
- if (tablePressureMap.get(tableId).get() <
table.getGroupCommitDataBytes()) {
- Backend backend =
Env.getCurrentSystemInfo().getBackend(tableToBeMap.get(tableId));
+ if (tableToBeMap.containsKey(encode(cluster, tableId))) {
+ if (tableToPressureMap.get(tableId).get() <
table.getGroupCommitDataBytes()) {
+ // There are multiple threads getting cached backends for the
same table.
+ // Maybe one thread removes the tableId from the tableToBeMap.
+ // Another thread gets the same tableId but can not find this
tableId.
+ // So another thread needs to get the random backend.
+ Long backendId = tableToBeMap.get(encode(cluster, tableId));
+ Backend backend;
+ if (backendId != null) {
+ backend = Env.getCurrentSystemInfo().getBackend(backendId);
+ } else {
+ return null;
+ }
if (backend.isActive() && !backend.isDecommissioned()) {
return backend.getId();
} else {
- tableToBeMap.remove(tableId);
+ tableToBeMap.remove(encode(cluster, tableId));
}
} else {
- tableToBeMap.remove(tableId);
+ tableToBeMap.remove(encode(cluster, tableId));
}
}
return null;
}
@Nullable
- private Long getRandomBackend(long tableId, List<Backend> backends) {
+ private Long getRandomBackend(String cluster, long tableId, List<Backend>
backends) {
OlapTable table = (OlapTable)
Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId);
Collections.shuffle(backends);
for (Backend backend : backends) {
if (backend.isActive() && !backend.isDecommissioned()) {
- tableToBeMap.put(tableId, backend.getId());
- tablePressureMap.put(tableId,
+ tableToBeMap.put(encode(cluster, tableId), backend.getId());
+ tableToPressureMap.put(tableId,
new
SlidingWindowCounter(table.getGroupCommitIntervalMs() / 1000 + 1));
return backend.getId();
}
@@ -337,6 +347,14 @@ public class GroupCommitManager {
return null;
}
+ private String encode(String cluster, long tableId) {
+ if (cluster == null) {
+ return String.valueOf(tableId);
+ } else {
+ return cluster + tableId;
+ }
+ }
+
public void updateLoadData(long tableId, long receiveData) {
if (tableId == -1) {
LOG.warn("invalid table id: " + tableId);
@@ -359,10 +377,10 @@ public class GroupCommitManager {
}
public void updateLoadDataInternal(long tableId, long receiveData) {
- if (tablePressureMap.containsKey(tableId)) {
- tablePressureMap.get(tableId).add(receiveData);
+ if (tableToPressureMap.containsKey(tableId)) {
+ tableToPressureMap.get(tableId).add(receiveData);
LOG.info("Update load data for table{}, receiveData {},
tablePressureMap {}", tableId, receiveData,
- tablePressureMap.toString());
+ tableToPressureMap.toString());
} else {
LOG.warn("can not find backend id: {}", tableId);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]