This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new be9288352a9 [fix][cloud] Prevent fe CloudReplica.getBackendIdImpl consuming too much CPU when high-frequency stream loads (#48564) be9288352a9 is described below commit be9288352a968432162cdd415b8644ce7f866a72 Author: deardeng <deng...@selectdb.com> AuthorDate: Thu Mar 13 20:37:10 2025 +0800 [fix][cloud] Prevent fe CloudReplica.getBackendIdImpl consuming too much CPU when high-frequency stream loads (#48564) --- .../java/org/apache/doris/backup/RestoreJob.java | 5 +-- .../apache/doris/catalog/ColocateTableIndex.java | 17 ++++++++- .../org/apache/doris/catalog/MetadataViewer.java | 21 ++++++----- .../org/apache/doris/clone/DiskRebalancer.java | 8 ++--- .../apache/doris/clone/PartitionRebalancer.java | 5 +-- .../org/apache/doris/clone/TabletSchedCtx.java | 42 ++++++++++++---------- .../org/apache/doris/clone/TabletScheduler.java | 18 +++++----- .../apache/doris/cloud/catalog/CloudReplica.java | 4 +-- .../apache/doris/common/proc/ReplicasProcNode.java | 5 +-- .../apache/doris/common/proc/TabletsProcDir.java | 7 ++-- .../main/java/org/apache/doris/load/DeleteJob.java | 2 +- .../org/apache/doris/planner/OlapScanNode.java | 7 ++-- .../doris/transaction/DatabaseTransactionMgr.java | 5 +-- 13 files changed, 89 insertions(+), 57 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index 2e5ea7977be..19f01378ed8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -1254,7 +1254,8 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable { boolean isRestoreTask = true; // We don't care the visible version in restore job, the end version is used. 
long visibleVersion = -1L; - SnapshotTask task = new SnapshotTask(null, replica.getBackendIdWithoutException(), + long beId = replica.getBackendIdWithoutException(); + SnapshotTask task = new SnapshotTask(null, beId, signature, jobId, db.getId(), tbl.getId(), part.getId(), index.getId(), tablet.getId(), visibleVersion, tbl.getSchemaHashByIndexId(index.getId()), timeoutMs, isRestoreTask); @@ -1263,7 +1264,7 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable { } batchTask.addTask(task); unfinishedSignatureToId.put(signature, tablet.getId()); - bePathsMap.put(replica.getBackendIdWithoutException(), replica.getPathHash()); + bePathsMap.put(beId, replica.getPathHash()); } finally { tbl.readUnlock(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java index dae95698807..fcdd7042160 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java @@ -176,7 +176,7 @@ public class ColocateTableIndex implements Writable { private Multimap<GroupId, Long> group2Tables = ArrayListMultimap.create(); // table_id -> group_id @SerializedName(value = "table2Group") - private Map<Long, GroupId> table2Group = Maps.newHashMap(); + private Map<Long, GroupId> table2Group = Maps.newConcurrentMap(); // group id -> group schema @SerializedName(value = "group2Schema") private Map<GroupId, ColocateGroupSchema> group2Schema = Maps.newHashMap(); @@ -385,6 +385,13 @@ public class ColocateTableIndex implements Writable { } } + // ATTN: in cloud, CloudReplica.getBackendIdImpl has some logic, + // If the FE concurrency is high, the CPU may be fully loaded, so try not to lock it here + // table2Group is ConcurrentHashMap + public boolean isColocateTableNoLock(long tableId) { + return table2Group.containsKey(tableId); + } + public boolean isColocateTable(long 
tableId) { readLock(); try { @@ -424,6 +431,14 @@ public class ColocateTableIndex implements Writable { } } + // ATTN: in cloud, CloudReplica.getBackendIdImpl has some logic, + // If the FE concurrency is high, the CPU may be fully loaded, so try not to lock it here + // table2Group is ConcurrentHashMap + public GroupId getGroupNoLock(long tableId) { + Preconditions.checkState(table2Group.containsKey(tableId)); + return table2Group.get(tableId); + } + public GroupId getGroup(long tableId) { readLock(); try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java index 61853bea3d4..3d557c13e49 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java @@ -90,7 +90,8 @@ public class MetadataViewer { List<String> row = Lists.newArrayList(); ReplicaStatus status = ReplicaStatus.OK; - Backend be = infoService.getBackend(replica.getBackendIdWithoutException()); + long beId = replica.getBackendIdWithoutException(); + Backend be = infoService.getBackend(beId); if (be == null || !be.isAlive() || replica.isBad()) { status = ReplicaStatus.DEAD; } else if (replica.getVersion() < visibleVersion @@ -109,7 +110,7 @@ public class MetadataViewer { row.add(String.valueOf(tabletId)); row.add(String.valueOf(replica.getId())); - row.add(String.valueOf(replica.getBackendIdWithoutException())); + row.add(String.valueOf(beId)); row.add(String.valueOf(replica.getVersion())); row.add(String.valueOf(replica.getLastFailedVersion())); row.add(String.valueOf(replica.getLastSuccessVersion())); @@ -197,7 +198,8 @@ public class MetadataViewer { List<String> row = Lists.newArrayList(); ReplicaStatus status = ReplicaStatus.OK; - Backend be = infoService.getBackend(replica.getBackendIdWithoutException()); + long beId = replica.getBackendIdWithoutException(); + Backend be = infoService.getBackend(beId); 
if (be == null || !be.isAlive() || replica.isBad()) { status = ReplicaStatus.DEAD; } else if (replica.getVersion() < visibleVersion @@ -216,7 +218,7 @@ public class MetadataViewer { row.add(String.valueOf(tabletId)); row.add(String.valueOf(replica.getId())); - row.add(String.valueOf(replica.getBackendIdWithoutException())); + row.add(String.valueOf(beId)); row.add(String.valueOf(replica.getVersion())); row.add(String.valueOf(replica.getLastFailedVersion())); row.add(String.valueOf(replica.getLastSuccessVersion())); @@ -338,13 +340,14 @@ public class MetadataViewer { for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) { for (Tablet tablet : index.getTablets()) { for (Replica replica : tablet.getReplicas()) { - if (!countMap.containsKey(replica.getBackendIdWithoutException())) { + long beId = replica.getBackendIdWithoutException(); + if (!countMap.containsKey(beId)) { continue; } - countMap.put(replica.getBackendIdWithoutException(), - countMap.get(replica.getBackendIdWithoutException()) + 1); - sizeMap.put(replica.getBackendIdWithoutException(), - sizeMap.get(replica.getBackendIdWithoutException()) + replica.getDataSize()); + countMap.put(beId, + countMap.get(beId) + 1); + sizeMap.put(beId, + sizeMap.get(beId) + replica.getDataSize()); totalReplicaNum++; totalReplicaSize += replica.getDataSize(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java index c66a28a39cf..6fc413a861c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java @@ -313,12 +313,12 @@ public class DiskRebalancer extends Rebalancer { if (replica.getDataSize() == 0) { throw new SchedException(Status.UNRECOVERABLE, SubCode.DIAGNOSE_IGNORE, "size of src replica is zero"); } - + long beId = replica.getBackendIdWithoutException(); // check src slot - PathSlot slot = 
backendsWorkingSlots.get(replica.getBackendIdWithoutException()); + PathSlot slot = backendsWorkingSlots.get(beId); if (slot == null) { if (LOG.isDebugEnabled()) { - LOG.debug("BE does not have slot: {}", replica.getBackendIdWithoutException()); + LOG.debug("BE does not have slot: {}", beId); } throw new SchedException(Status.UNRECOVERABLE, "unable to take src slot"); } @@ -329,7 +329,7 @@ public class DiskRebalancer extends Rebalancer { // after take src slot, we can set src replica now tabletCtx.setSrc(replica); - BackendLoadStatistic beStat = clusterStat.getBackendLoadStatistic(replica.getBackendIdWithoutException()); + BackendLoadStatistic beStat = clusterStat.getBackendLoadStatistic(beId); if (!beStat.isAvailable()) { throw new SchedException(Status.UNRECOVERABLE, "the backend is not available"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java index 657523123ad..30a7a76b920 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java @@ -284,9 +284,10 @@ public class PartitionRebalancer extends Rebalancer { // Check src replica's validation Replica srcReplica = tabletCtx.getTablet().getReplicaByBackendId(move.fromBe); Preconditions.checkNotNull(srcReplica); - TabletScheduler.PathSlot slot = backendsWorkingSlots.get(srcReplica.getBackendIdWithoutException()); + long beId = srcReplica.getBackendIdWithoutException(); + TabletScheduler.PathSlot slot = backendsWorkingSlots.get(beId); Preconditions.checkNotNull(slot, "unable to get fromBe " - + srcReplica.getBackendIdWithoutException() + " slot"); + + beId + " slot"); if (slot.takeBalanceSlot(srcReplica.getPathHash()) != -1) { tabletCtx.setSrc(srcReplica); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java index b8a098cc891..11d134cabc2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java @@ -511,27 +511,28 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> { } String host = backend.getHost(); for (Replica replica : tablet.getReplicas()) { - Backend be = infoService.getBackend(replica.getBackendIdWithoutException()); + long replicaBeId = replica.getBackendIdWithoutException(); + Backend be = infoService.getBackend(replicaBeId); if (be == null) { // BE has been dropped, skip it if (LOG.isDebugEnabled()) { LOG.debug("replica's backend {} does not exist, skip. tablet: {}", - replica.getBackendIdWithoutException(), tabletId); + replicaBeId, tabletId); } continue; } if (!Config.allow_replica_on_same_host && !FeConstants.runningUnitTest && host.equals(be.getHost())) { if (LOG.isDebugEnabled()) { LOG.debug("replica's backend {} is on same host {}, skip. tablet: {}", - replica.getBackendIdWithoutException(), host, tabletId); + replicaBeId, host, tabletId); } return true; } - if (replica.getBackendIdWithoutException() == beId) { + if (replicaBeId == beId) { if (LOG.isDebugEnabled()) { LOG.debug("replica's backend {} is same as dest backend {}, skip. tablet: {}", - replica.getBackendIdWithoutException(), beId, tabletId); + replicaBeId, beId, tabletId); } return true; } @@ -587,10 +588,11 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> { */ List<Replica> candidates = Lists.newArrayList(); for (Replica replica : tablet.getReplicas()) { - if (exceptBeId != -1 && replica.getBackendIdWithoutException() == exceptBeId) { + long replicaBeId = replica.getBackendIdWithoutException(); + if (exceptBeId != -1 && replicaBeId == exceptBeId) { if (LOG.isDebugEnabled()) { LOG.debug("replica's backend {} is same as except backend {}, skip. 
tablet: {}", - replica.getBackendIdWithoutException(), exceptBeId, tabletId); + replicaBeId, exceptBeId, tabletId); } continue; } @@ -603,12 +605,12 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> { continue; } - Backend be = infoService.getBackend(replica.getBackendIdWithoutException()); + Backend be = infoService.getBackend(replicaBeId); if (be == null || !be.isAlive()) { // backend which is in decommission can still be the source backend if (LOG.isDebugEnabled()) { LOG.debug("replica's backend {} does not exist or is not alive, skip. tablet: {}", - replica.getBackendIdWithoutException(), tabletId); + replicaBeId, tabletId); } continue; } @@ -640,11 +642,12 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> { // sort replica by version count asc, so that we prefer to choose replicas with fewer versions Collections.sort(candidates, VERSION_COUNTER_COMPARATOR); for (Replica srcReplica : candidates) { - PathSlot slot = backendsWorkingSlots.get(srcReplica.getBackendIdWithoutException()); + long replicaBeId = srcReplica.getBackendIdWithoutException(); + PathSlot slot = backendsWorkingSlots.get(replicaBeId); if (slot == null) { if (LOG.isDebugEnabled()) { LOG.debug("replica's backend {} does not have working slot, skip. tablet: {}", - srcReplica.getBackendIdWithoutException(), tabletId); + replicaBeId, tabletId); } continue; } @@ -653,7 +656,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> { if (srcPathHash == -1) { if (LOG.isDebugEnabled()) { LOG.debug("replica's backend {} does not have available slot, skip. 
tablet: {}", - srcReplica.getBackendIdWithoutException(), tabletId); + replicaBeId, tabletId); } continue; } @@ -701,10 +704,11 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> { } if (!replica.isScheduleAvailable()) { - if (Env.getCurrentSystemInfo().checkBackendScheduleAvailable(replica.getBackendIdWithoutException())) { + long replicaBeId = replica.getBackendIdWithoutException(); + if (Env.getCurrentSystemInfo().checkBackendScheduleAvailable(replicaBeId)) { if (LOG.isDebugEnabled()) { LOG.debug("replica's backend {} does not exist or is not scheduler available, skip. tablet: {}", - replica.getBackendIdWithoutException(), tabletId); + replicaBeId, tabletId); } } else { if (LOG.isDebugEnabled()) { @@ -816,6 +820,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> { throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT, "unable to take slot of dest path"); } + long chosenReplicaBeId = chosenReplica.getBackendIdWithoutException(); if (chosenReplica.getState() == ReplicaState.DECOMMISSION) { // Since this replica is selected as the repair object of VERSION_INCOMPLETE, @@ -838,9 +843,9 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> { setDecommissionTime(-1); LOG.info("choose replica {} on backend {} of tablet {} as dest replica for version incomplete," + " and change state from DECOMMISSION to NORMAL", - chosenReplica.getId(), chosenReplica.getBackendIdWithoutException(), tabletId); + chosenReplica.getId(), chosenReplicaBeId, tabletId); } - setDest(chosenReplica.getBackendIdWithoutException(), chosenReplica.getPathHash()); + setDest(chosenReplicaBeId, chosenReplica.getPathHash()); } private boolean checkFurtherRepairFinish(Replica replica, long version) { @@ -974,10 +979,11 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> { // database lock should be held. 
public CloneTask createCloneReplicaAndTask() throws SchedException { - Backend srcBe = infoService.getBackend(srcReplica.getBackendIdWithoutException()); + long beId = srcReplica.getBackendIdWithoutException(); + Backend srcBe = infoService.getBackend(beId); if (srcBe == null) { throw new SchedException(Status.SCHEDULE_FAILED, - "src backend " + srcReplica.getBackendIdWithoutException() + " does not exist"); + "src backend " + beId + " does not exist"); } Backend destBe = infoService.getBackend(destBackendId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index da13d5c61c5..d2413552b8d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -1107,8 +1107,9 @@ public class TabletScheduler extends MasterDaemon { double maxScore = 0; long debugHighBeId = DebugPointUtil.getDebugParamOrDefault("FE.HIGH_LOAD_BE_ID", -1L); for (Replica replica : replicas) { + long beId = replica.getBackendIdWithoutException(); BackendLoadStatistic beStatistic = statistic - .getBackendLoadStatistic(replica.getBackendIdWithoutException()); + .getBackendLoadStatistic(beId); if (beStatistic == null) { continue; } @@ -1132,7 +1133,7 @@ public class TabletScheduler extends MasterDaemon { chosenReplica = replica; } - if (debugHighBeId > 0 && replica.getBackendIdWithoutException() == debugHighBeId) { + if (debugHighBeId > 0 && beId == debugHighBeId) { chosenReplica = replica; break; } @@ -1219,6 +1220,7 @@ public class TabletScheduler extends MasterDaemon { * If all are finished, which means this replica is * safe to be deleted. 
*/ + long beId = replica.getBackendIdWithoutException(); if (!force && !Config.enable_force_drop_redundant_replica && !FeConstants.runningUnitTest && (replica.getState().canLoad() || replica.getState() == ReplicaState.DECOMMISSION)) { @@ -1228,7 +1230,7 @@ public class TabletScheduler extends MasterDaemon { // Remain it as VERY_HIGH may block other task. tabletCtx.setPriority(Priority.NORMAL); LOG.info("set replica {} on backend {} of tablet {} state to DECOMMISSION due to reason {}", - replica.getId(), replica.getBackendIdWithoutException(), tabletCtx.getTabletId(), reason); + replica.getId(), beId, tabletCtx.getTabletId(), reason); } try { long preWatermarkTxnId = replica.getPreWatermarkTxnId(); @@ -1237,7 +1239,7 @@ public class TabletScheduler extends MasterDaemon { .getTransactionIDGenerator().getNextTransactionId(); replica.setPreWatermarkTxnId(preWatermarkTxnId); LOG.info("set decommission replica {} on backend {} of tablet {} pre watermark txn id {}", - replica.getId(), replica.getBackendId(), tabletCtx.getTabletId(), preWatermarkTxnId); + replica.getId(), beId, tabletCtx.getTabletId(), preWatermarkTxnId); } long postWatermarkTxnId = replica.getPostWatermarkTxnId(); @@ -1251,7 +1253,7 @@ public class TabletScheduler extends MasterDaemon { replica.setPostWatermarkTxnId(postWatermarkTxnId); LOG.info("set decommission replica {} on backend {} of tablet {} post watermark txn id {}", - replica.getId(), replica.getBackendId(), tabletCtx.getTabletId(), postWatermarkTxnId); + replica.getId(), beId, tabletCtx.getTabletId(), postWatermarkTxnId); } if (!Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(postWatermarkTxnId, @@ -1276,7 +1278,7 @@ public class TabletScheduler extends MasterDaemon { // NOTICE: only delete the replica from meta may not work. sometimes we can depend on tablet report // deleting these replicas, but in FORCE_REDUNDANT case, replica may be added to meta again in report // process. 
- sendDeleteReplicaTask(replica.getBackendIdWithoutException(), tabletCtx.getTabletId(), replica.getId(), + sendDeleteReplicaTask(beId, tabletCtx.getTabletId(), replica.getId(), tabletCtx.getSchemaHash()); } @@ -1286,12 +1288,12 @@ public class TabletScheduler extends MasterDaemon { tabletCtx.getPartitionId(), tabletCtx.getIndexId(), tabletCtx.getTabletId(), - replica.getBackendIdWithoutException()); + beId); Env.getCurrentEnv().getEditLog().logDeleteReplica(info); LOG.info("delete replica. tablet id: {}, backend id: {}. reason: {}, force: {}", - tabletCtx.getTabletId(), replica.getBackendIdWithoutException(), reason, force); + tabletCtx.getTabletId(), beId, reason, force); } private void sendDeleteReplicaTask(long backendId, long tabletId, long replicaId, int schemaHash) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java index 12f27eae3d0..f27b9fdf0cc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java @@ -94,7 +94,7 @@ public class CloudReplica extends Replica { } private boolean isColocated() { - return Env.getCurrentColocateIndex().isColocateTable(tableId); + return Env.getCurrentColocateIndex().isColocateTableNoLock(tableId); } public long getColocatedBeId(String clusterId) throws ComputeGroupException { @@ -130,7 +130,7 @@ public class CloudReplica extends Replica { ComputeGroupException.FailedTypeEnum.COMPUTE_GROUPS_NO_ALIVE_BE); } - GroupId groupId = Env.getCurrentColocateIndex().getGroup(tableId); + GroupId groupId = Env.getCurrentColocateIndex().getGroupNoLock(tableId); HashCode hashCode = Hashing.murmur3_128().hashLong(groupId.grpId); if (availableBes.size() != bes.size()) { // some be is dead recently, still hash tablets on all backends. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java index d7958f75504..3d8d1c46fa0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java @@ -91,7 +91,8 @@ public class ReplicasProcNode implements ProcNodeInterface { } for (Replica replica : replicas) { - Backend be = backendMap.get(replica.getBackendIdWithoutException()); + long beId = replica.getBackendIdWithoutException(); + Backend be = backendMap.get(beId); String host = (be == null ? Backend.DUMMY_IP : be.getHost()); int port = (be == null ? 0 : be.getHttpPort()); String hostPort = NetUtils.getHostPortInAccessibleFormat(host, port); @@ -117,7 +118,7 @@ public class ReplicasProcNode implements ProcNodeInterface { queryHits = QueryStatsUtil.getMergedReplicaStats(replica.getId()); } List<String> replicaInfo = Lists.newArrayList(String.valueOf(replica.getId()), - String.valueOf(replica.getBackendIdWithoutException()), + String.valueOf(beId), String.valueOf(replica.getVersion()), String.valueOf(replica.getLastSuccessVersion()), String.valueOf(replica.getLastFailedVersion()), diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java index 8eca5f84faa..2a8aa7e35b5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java @@ -141,8 +141,9 @@ public class TabletsProcDir implements ProcDirInterface { tabletInfos.add(tabletInfo); } else { for (Replica replica : tablet.getReplicas()) { + long beId = replica.getBackendIdWithoutException(); if ((version > -1 && replica.getVersion() != version) - || (backendId > -1 && replica.getBackendIdWithoutException() != backendId) + || (backendId > 
-1 && beId != backendId) || (state != null && replica.getState() != state)) { continue; } @@ -150,7 +151,7 @@ public class TabletsProcDir implements ProcDirInterface { // tabletId -- replicaId -- backendId -- version -- dataSize -- rowCount -- state tabletInfo.add(tabletId); tabletInfo.add(replica.getId()); - tabletInfo.add(replica.getBackendIdWithoutException()); + tabletInfo.add(beId); tabletInfo.add(replica.getSchemaHash()); tabletInfo.add(replica.getVersion()); tabletInfo.add(replica.getLastSuccessVersion()); @@ -168,7 +169,7 @@ public class TabletsProcDir implements ProcDirInterface { tabletInfo.add(replicaIdToQueryHits.getOrDefault(replica.getId(), 0L)); tabletInfo.add(replica.getPathHash()); tabletInfo.add(pathHashToRoot.getOrDefault(replica.getPathHash(), "")); - Backend be = backendMap.get(replica.getBackendIdWithoutException()); + Backend be = backendMap.get(beId); String host = (be == null ? Backend.DUMMY_IP : be.getHost()); int port = (be == null ? 0 : be.getHttpPort()); String hostPort = NetUtils.getHostPortInAccessibleFormat(host, port); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java index 664896693cb..b83fd878e08 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java @@ -352,7 +352,7 @@ public class DeleteJob extends AbstractTxnStateChangeCallback implements DeleteJ // signature, adding 10 billion to `getNextId`. We are confident that the old signature // generated will not exceed this number. 
PushTask pushTask = new PushTask(null, - replica.getBackendId(), targetDb.getId(), targetTbl.getId(), + backendId, targetDb.getId(), targetTbl.getId(), partition.getId(), indexId, tabletId, replicaId, schemaHash, -1, "", -1, 0, diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 33965e30a73..5a1aaec5085 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -838,11 +838,12 @@ public class OlapScanNode extends ScanNode { replicas.sort(Replica.ID_COMPARATOR); Replica replica = replicas.get(useFixReplica >= replicas.size() ? replicas.size() - 1 : useFixReplica); if (context.getSessionVariable().fallbackOtherReplicaWhenFixedCorrupt) { - Backend backend = Env.getCurrentSystemInfo().getBackend(replica.getBackendId()); + long beId = replica.getBackendId(); + Backend backend = Env.getCurrentSystemInfo().getBackend(beId); // If the fixed replica is bad, then not clear the replicas using random replica if (backend == null || !backend.isAlive()) { if (LOG.isDebugEnabled()) { - LOG.debug("backend {} not exists or is not alive for replica {}", replica.getBackendId(), + LOG.debug("backend {} not exists or is not alive for replica {}", beId, replica.getId()); } Collections.shuffle(replicas); @@ -928,7 +929,7 @@ public class OlapScanNode extends ScanNode { String ip = backend.getHost(); int port = backend.getBePort(); TScanRangeLocation scanRangeLocation = new TScanRangeLocation(new TNetworkAddress(ip, port)); - scanRangeLocation.setBackendId(replica.getBackendId()); + scanRangeLocation.setBackendId(backendId); locations.addToLocations(scanRangeLocation); paloRange.addToHosts(new TNetworkAddress(ip, port)); tabletIsNull = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index cc2a4b1a90f..59d5bc571f0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -2260,10 +2260,11 @@ public class DatabaseTransactionMgr { if (newVersion == Partition.PARTITION_INIT_VERSION + 1) { index.setRowCountReported(false); } - Set<Long> partitionIds = backendPartitions.get(replica.getBackendIdWithoutException()); + long beId = replica.getBackendIdWithoutException(); + Set<Long> partitionIds = backendPartitions.get(beId); if (partitionIds == null) { partitionIds = Sets.newHashSet(); - backendPartitions.put(replica.getBackendIdWithoutException(), partitionIds); + backendPartitions.put(beId, partitionIds); } partitionIds.add(partitionId); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org