This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 7a159622af0 [fix](storage medium) Fix partition show storage medium not right when use default medium (#34123)
7a159622af0 is described below
commit 7a159622af04be8eeb42bb8bdda24136ba4024d8
Author: deardeng <[email protected]>
AuthorDate: Sat Apr 27 14:13:37 2024 +0800
[fix](storage medium) Fix partition show storage medium not right when use default medium (#34123)
Co-authored-by: Yongqiang YANG <[email protected]>
---
.../main/java/org/apache/doris/common/Config.java | 8 +-
.../java/org/apache/doris/backup/RestoreJob.java | 6 +-
.../org/apache/doris/catalog/DataProperty.java | 4 +
.../java/org/apache/doris/catalog/OlapTable.java | 6 +-
.../doris/common/util/DynamicPartitionUtil.java | 13 +-
.../apache/doris/common/util/PropertyAnalyzer.java | 3 +-
.../apache/doris/datasource/InternalCatalog.java | 77 ++++----
.../org/apache/doris/system/BeSelectionPolicy.java | 15 ++
.../org/apache/doris/system/SystemInfoService.java | 215 +++++++++------------
.../org/apache/doris/backup/RestoreJobTest.java | 10 +-
.../apache/doris/catalog/ModifyBackendTest.java | 2 +-
.../doris/catalog/ReplicaAllocationTest.java | 8 +-
.../doris/load/sync/canal/CanalSyncDataTest.java | 2 +-
.../apache/doris/system/SystemInfoServiceTest.java | 6 +-
14 files changed, 192 insertions(+), 183 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 6e87e20960a..dccfa777668 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2035,7 +2035,13 @@ public class Config extends ConfigBase {
public static boolean skip_localhost_auth_check = true;
@ConfField(mutable = true)
- public static boolean enable_round_robin_create_tablet = false;
+ public static boolean enable_round_robin_create_tablet = true;
+
+ @ConfField(mutable = true, masterOnly = true, description = {
+ "创建分区时,总是从第一个 BE 开始创建。注意:这种方式可能造成BE不均衡",
+ "When creating tablet of a partition, always start from the first BE. "
+ + "Note: This method may cause BE imbalance"})
+ public static boolean create_tablet_round_robin_from_start = false;
/**
* To prevent different types (V1, V2, V3) of behavioral inconsistencies,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 123600f4705..8a79050e8a1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -1102,6 +1102,7 @@ public class RestoreJob extends AbstractJob {
long visibleVersion = remotePart.getVisibleVersion();
// tablets
+ Map<Tag, Integer> nextIndexs = Maps.newHashMap();
for (MaterializedIndex remoteIdx :
remotePart.getMaterializedIndices(IndexExtState.VISIBLE)) {
int schemaHash =
remoteTbl.getSchemaHashByIndexId(remoteIdx.getId());
int remotetabletSize = remoteIdx.getTablets().size();
@@ -1115,8 +1116,9 @@ public class RestoreJob extends AbstractJob {
// replicas
try {
- Map<Tag, List<Long>> beIds = Env.getCurrentSystemInfo()
- .selectBackendIdsForReplicaCreation(replicaAlloc,
null, false, false);
+ Pair<Map<Tag, List<Long>>, TStorageMedium> beIdsAndMedium
= Env.getCurrentSystemInfo()
+ .selectBackendIdsForReplicaCreation(replicaAlloc,
nextIndexs, null, false, false);
+ Map<Tag, List<Long>> beIds = beIdsAndMedium.first;
for (Map.Entry<Tag, List<Long>> entry : beIds.entrySet()) {
for (Long beId : entry.getValue()) {
long newReplicaId = env.getNextId();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/DataProperty.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/DataProperty.java
index 731776384d0..2974d337167 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/DataProperty.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/DataProperty.java
@@ -118,6 +118,10 @@ public class DataProperty implements Writable,
GsonPostProcessable {
storageMediumSpecified = isSpecified;
}
+ public void setStorageMedium(TStorageMedium medium) {
+ this.storageMedium = medium;
+ }
+
public static DataProperty read(DataInput in) throws IOException {
if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_108) {
String json = Text.readString(in);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index d5ecc01febb..384e47dc69b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -594,6 +594,7 @@ public class OlapTable extends Table {
reserveReplica ? null : restoreReplicaAlloc,
isSinglePartition);
// for each partition, reset rollup index map
+ Map<Tag, Integer> nextIndexs = Maps.newHashMap();
for (Map.Entry<Long, Partition> entry : idToPartition.entrySet()) {
Partition partition = entry.getValue();
// entry.getKey() is the new partition id, use it to get the
restore specified
@@ -630,9 +631,10 @@ public class OlapTable extends Table {
// replicas
try {
- Map<Tag, List<Long>> tag2beIds =
+ Pair<Map<Tag, List<Long>>, TStorageMedium>
tag2beIdsAndMedium =
Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(
- replicaAlloc, null, false, false);
+ replicaAlloc, nextIndexs, null, false,
false);
+ Map<Tag, List<Long>> tag2beIds =
tag2beIdsAndMedium.first;
for (Map.Entry<Tag, List<Long>> entry3 :
tag2beIds.entrySet()) {
for (Long beId : entry3.getValue()) {
long newReplicaId = env.getNextId();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java
index 6587da5aef6..ca3086c6ede 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java
@@ -40,10 +40,12 @@ import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.UserException;
import org.apache.doris.policy.StoragePolicy;
+import org.apache.doris.resource.Tag;
import org.apache.doris.thrift.TStorageMedium;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -228,7 +230,8 @@ public class DynamicPartitionUtil {
ErrorReport.reportDdlException(ErrorCode.ERROR_DYNAMIC_PARTITION_REPLICATION_NUM_FORMAT,
val);
}
ReplicaAllocation replicaAlloc = new
ReplicaAllocation(Short.valueOf(val));
-
Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc,
null, false, true);
+
Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc,
Maps.newHashMap(),
+ null, false, true);
}
private static void checkReplicaAllocation(ReplicaAllocation replicaAlloc,
int hotPartitionNum,
@@ -237,14 +240,16 @@ public class DynamicPartitionUtil {
ErrorReport.reportDdlException(ErrorCode.ERROR_DYNAMIC_PARTITION_REPLICATION_NUM_ZERO);
}
-
Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc,
null, false, true);
+ Map<Tag, Integer> nextIndexs = Maps.newHashMap();
+
Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc,
nextIndexs, null,
+ false, true);
if (hotPartitionNum <= 0) {
return;
}
try {
-
Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc,
TStorageMedium.SSD, false,
- true);
+
Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc,
nextIndexs, TStorageMedium.SSD,
+ false, true);
} catch (DdlException e) {
throw new DdlException("Failed to find enough backend for ssd
storage medium. When setting "
+ DynamicPartitionProperty.HOT_PARTITION_NUM + " > 0, the
hot partitions will store "
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
index 901f554b094..686495ef73d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
@@ -1093,6 +1093,7 @@ public class PropertyAnalyzer {
allocationVal = allocationVal.replaceAll(" ", "");
String[] locations = allocationVal.split(",");
int totalReplicaNum = 0;
+ Map<Tag, Integer> nextIndexs = Maps.newHashMap();
for (String location : locations) {
String[] parts = location.split(":");
if (parts.length != 2) {
@@ -1116,7 +1117,7 @@ public class PropertyAnalyzer {
try {
SystemInfoService systemInfoService =
Env.getCurrentSystemInfo();
systemInfoService.selectBackendIdsForReplicaCreation(
- replicaAlloc, null, false, true);
+ replicaAlloc, nextIndexs, null, false, true);
} catch (DdlException ddlException) {
throw new AnalysisException(ddlException.getMessage());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 64cfc9b745a..506da48d7a0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -1573,7 +1573,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
long partitionId = idGeneratorBuffer.getNextId();
Partition partition =
createPartitionWithIndices(db.getClusterName(), db.getId(), olapTable.getId(),
olapTable.getName(), olapTable.getBaseIndexId(),
partitionId, partitionName, indexIdToMeta,
- distributionInfo, dataProperty.getStorageMedium(),
singlePartitionDesc.getReplicaAlloc(),
+ distributionInfo, dataProperty,
singlePartitionDesc.getReplicaAlloc(),
singlePartitionDesc.getVersionInfo(), bfColumns,
olapTable.getBfFpp(), tabletIdSet,
olapTable.getCopiedIndexes(),
singlePartitionDesc.isInMemory(), olapTable.getStorageFormat(),
singlePartitionDesc.getTabletType(),
olapTable.getCompressionType(), olapTable.getDataSortInfo(),
@@ -1828,7 +1828,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
private Partition createPartitionWithIndices(String clusterName, long
dbId, long tableId, String tableName,
long baseIndexId, long partitionId, String partitionName,
Map<Long, MaterializedIndexMeta> indexIdToMeta,
- DistributionInfo distributionInfo, TStorageMedium storageMedium,
ReplicaAllocation replicaAlloc,
+ DistributionInfo distributionInfo, DataProperty dataProperty,
ReplicaAllocation replicaAlloc,
Long versionInfo, Set<String> bfColumns, double bfFpp, Set<Long>
tabletIdSet, List<Index> indexes,
boolean isInMemory, TStorageFormat storageFormat, TTabletType
tabletType, TCompressionType compressionType,
DataSortInfo dataSortInfo, boolean enableUniqueKeyMergeOnWrite,
String storagePolicy,
@@ -1868,6 +1868,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
long version = partition.getVisibleVersion();
short totalReplicaNum = replicaAlloc.getTotalReplicaNum();
+ TStorageMedium realStorageMedium = null;
for (Map.Entry<Long, MaterializedIndex> entry : indexMap.entrySet()) {
long indexId = entry.getKey();
MaterializedIndex index = entry.getValue();
@@ -1875,9 +1876,16 @@ public class InternalCatalog implements
CatalogIf<Database> {
// create tablets
int schemaHash = indexMeta.getSchemaHash();
- TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId,
indexId, schemaHash, storageMedium);
- createTablets(clusterName, index, ReplicaState.NORMAL,
distributionInfo, version, replicaAlloc, tabletMeta,
- tabletIdSet, idGeneratorBuffer, isStorageMediumSpecified);
+ TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId,
indexId,
+ schemaHash, dataProperty.getStorageMedium());
+ realStorageMedium = createTablets(clusterName, index,
ReplicaState.NORMAL, distributionInfo, version,
+ replicaAlloc, tabletMeta, tabletIdSet, idGeneratorBuffer,
dataProperty.isStorageMediumSpecified());
+ if (realStorageMedium != null &&
!realStorageMedium.equals(dataProperty.getStorageMedium())) {
+ dataProperty.setStorageMedium(realStorageMedium);
+ LOG.info("real medium not eq default "
+ + "tableName={} tableId={} partitionName={}
partitionId={} readMedium {}",
+ tableName, tableId, partitionName, partitionId,
realStorageMedium);
+ }
boolean ok = false;
String errMsg = null;
@@ -1898,8 +1906,8 @@ public class InternalCatalog implements
CatalogIf<Database> {
countDownLatch.addMark(backendId, tabletId);
CreateReplicaTask task = new CreateReplicaTask(backendId,
dbId, tableId, partitionId, indexId,
tabletId, replicaId, shortKeyColumnCount,
schemaHash, version, keysType, storageType,
- storageMedium, schema, bfColumns, bfFpp,
countDownLatch, indexes, isInMemory, tabletType,
- dataSortInfo, compressionType,
enableUniqueKeyMergeOnWrite, storagePolicy,
+ realStorageMedium, schema, bfColumns, bfFpp,
countDownLatch, indexes, isInMemory,
+ tabletType, dataSortInfo, compressionType,
enableUniqueKeyMergeOnWrite, storagePolicy,
disableAutoCompaction,
enableSingleReplicaCompaction, skipWriteIndexOnLoad,
compactionPolicy,
timeSeriesCompactionGoalSizeMbytes,
timeSeriesCompactionFileCountThreshold,
timeSeriesCompactionTimeThresholdSeconds,
@@ -2486,7 +2494,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
Partition partition =
createPartitionWithIndices(db.getClusterName(), db.getId(), olapTable.getId(),
olapTable.getName(), olapTable.getBaseIndexId(),
partitionId, partitionName,
olapTable.getIndexIdToMeta(),
partitionDistributionInfo,
-
partitionInfo.getDataProperty(partitionId).getStorageMedium(),
+ partitionInfo.getDataProperty(partitionId),
partitionInfo.getReplicaAllocation(partitionId),
versionInfo, bfColumns, bfFpp, tabletIdSet,
olapTable.getCopiedIndexes(), isInMemory,
storageFormat, tabletType, compressionType,
olapTable.getDataSortInfo(),
olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy,
@@ -2561,7 +2569,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
Partition partition =
createPartitionWithIndices(db.getClusterName(), db.getId(),
olapTable.getId(), olapTable.getName(),
olapTable.getBaseIndexId(), entry.getValue(),
entry.getKey(), olapTable.getIndexIdToMeta(),
partitionDistributionInfo,
- dataProperty.getStorageMedium(),
partitionInfo.getReplicaAllocation(entry.getValue()),
+ dataProperty,
partitionInfo.getReplicaAllocation(entry.getValue()),
versionInfo, bfColumns, bfFpp, tabletIdSet,
olapTable.getCopiedIndexes(), isInMemory,
storageFormat,
partitionInfo.getTabletType(entry.getValue()), compressionType,
olapTable.getDataSortInfo(),
olapTable.getEnableUniqueKeyMergeOnWrite(),
@@ -2760,11 +2768,12 @@ public class InternalCatalog implements
CatalogIf<Database> {
}
@VisibleForTesting
- public void createTablets(String clusterName, MaterializedIndex index,
ReplicaState replicaState,
+ public TStorageMedium createTablets(String clusterName, MaterializedIndex
index, ReplicaState replicaState,
DistributionInfo distributionInfo, long version, ReplicaAllocation
replicaAlloc, TabletMeta tabletMeta,
Set<Long> tabletIdSet, IdGeneratorBuffer idGeneratorBuffer,
boolean isStorageMediumSpecified)
throws DdlException {
ColocateTableIndex colocateIndex = Env.getCurrentColocateIndex();
+ SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
Map<Tag, List<List<Long>>> backendsPerBucketSeq = null;
GroupId groupId = null;
if (colocateIndex.isColocateTable(tabletMeta.getTableId())) {
@@ -2784,19 +2793,23 @@ public class InternalCatalog implements
CatalogIf<Database> {
backendsPerBucketSeq = Maps.newHashMap();
}
+ TStorageMedium storageMedium = Config.disable_storage_medium_check ?
null : tabletMeta.getStorageMedium();
+
Map<Tag, Integer> nextIndexs = new HashMap<>();
if (Config.enable_round_robin_create_tablet) {
- for (Map.Entry<Tag, Short> entry :
replicaAlloc.getAllocMap().entrySet()) {
- int startPos =
Env.getCurrentSystemInfo().getStartPosOfRoundRobin(entry.getKey(),
- tabletMeta.getStorageMedium());
- if (startPos == -1) {
- throw new DdlException("The number of BEs that match the
policy is insufficient");
+ for (Tag tag : replicaAlloc.getAllocMap().keySet()) {
+ int startPos = -1;
+ if (Config.create_tablet_round_robin_from_start) {
+ startPos = 0;
+ } else {
+ startPos = systemInfoService.getStartPosOfRoundRobin(tag,
storageMedium,
+ isStorageMediumSpecified);
}
- nextIndexs.put(entry.getKey(), startPos);
+ nextIndexs.put(tag, startPos);
}
}
-
+ TStorageMedium realStorageMedium = Config.disable_storage_medium_check
? null : tabletMeta.getStorageMedium();
for (int i = 0; i < distributionInfo.getBucketNum(); ++i) {
// create a new tablet with random chosen backends
Tablet tablet = new Tablet(idGeneratorBuffer.getNextId());
@@ -2807,30 +2820,15 @@ public class InternalCatalog implements
CatalogIf<Database> {
// get BackendIds
Map<Tag, List<Long>> chosenBackendIds;
+
if (chooseBackendsArbitrary) {
// This is the first colocate table in the group, or just a
normal table,
// choose backends
- if (Config.enable_round_robin_create_tablet) {
- if (!Config.disable_storage_medium_check) {
- chosenBackendIds = Env.getCurrentSystemInfo()
-
.getBeIdRoundRobinForReplicaCreation(replicaAlloc,
tabletMeta.getStorageMedium(),
- nextIndexs);
- } else {
- chosenBackendIds = Env.getCurrentSystemInfo()
-
.getBeIdRoundRobinForReplicaCreation(replicaAlloc, null,
- nextIndexs);
- }
- } else {
- if (!Config.disable_storage_medium_check) {
- chosenBackendIds = Env.getCurrentSystemInfo()
-
.selectBackendIdsForReplicaCreation(replicaAlloc, tabletMeta.getStorageMedium(),
- isStorageMediumSpecified, false);
- } else {
- chosenBackendIds = Env.getCurrentSystemInfo()
-
.selectBackendIdsForReplicaCreation(replicaAlloc, null,
- isStorageMediumSpecified, false);
- }
- }
+ Pair<Map<Tag, List<Long>>, TStorageMedium>
chosenBackendIdsAndMedium
+ =
systemInfoService.selectBackendIdsForReplicaCreation(replicaAlloc, nextIndexs,
+ storageMedium, isStorageMediumSpecified, false);
+ chosenBackendIds = chosenBackendIdsAndMedium.first;
+ storageMedium = chosenBackendIdsAndMedium.second;
for (Map.Entry<Tag, List<Long>> entry :
chosenBackendIds.entrySet()) {
backendsPerBucketSeq.putIfAbsent(entry.getKey(),
Lists.newArrayList());
@@ -2863,6 +2861,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
ColocatePersistInfo info =
ColocatePersistInfo.createForBackendsPerBucketSeq(groupId,
backendsPerBucketSeq);
Env.getCurrentEnv().getEditLog().logColocateBackendsPerBucketSeq(info);
}
+ return realStorageMedium;
}
/*
@@ -2992,7 +2991,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
Partition newPartition =
createPartitionWithIndices(db.getClusterName(), db.getId(), copiedTbl.getId(),
copiedTbl.getName(), copiedTbl.getBaseIndexId(),
newPartitionId, entry.getKey(),
copiedTbl.getIndexIdToMeta(),
partitionsDistributionInfo.get(oldPartitionId),
-
copiedTbl.getPartitionInfo().getDataProperty(oldPartitionId).getStorageMedium(),
+
copiedTbl.getPartitionInfo().getDataProperty(oldPartitionId),
copiedTbl.getPartitionInfo().getReplicaAllocation(oldPartitionId), null /*
version info */,
copiedTbl.getCopiedBfColumns(), copiedTbl.getBfFpp(),
tabletIdSet, copiedTbl.getCopiedIndexes(),
copiedTbl.isInMemory(), copiedTbl.getStorageFormat(),
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java
b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java
index 3a711307bd5..ace2ab3e1e4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java
@@ -51,6 +51,11 @@ public class BeSelectionPolicy {
public boolean preferComputeNode = false;
public int expectBeNum = 0;
+ public boolean enableRoundRobin = false;
+ // if enable round robin, choose next be from nextRoundRobinIndex
+ // call SystemInfoService::selectBackendIdsByPolicy will update
nextRoundRobinIndex
+ public int nextRoundRobinIndex = -1;
+
public List<String> preferredLocations = new ArrayList<>();
private BeSelectionPolicy() {
@@ -114,6 +119,16 @@ public class BeSelectionPolicy {
return this;
}
+ public Builder setEnableRoundRobin(boolean enableRoundRobin) {
+ policy.enableRoundRobin = enableRoundRobin;
+ return this;
+ }
+
+ public Builder setNextRoundRobinIndex(int nextRoundRobinIndex) {
+ policy.nextRoundRobinIndex = nextRoundRobinIndex;
+ return this;
+ }
+
public BeSelectionPolicy build() {
return policy;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
index 050e1cf94f7..380a976cbef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -54,6 +54,7 @@ import org.jetbrains.annotations.NotNull;
import java.io.DataInputStream;
import java.io.IOException;
+import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -406,71 +407,21 @@ public class SystemInfoService {
return idToBackendRef.values().stream().filter(backend ->
backend.isComputeNode()).collect(Collectors.toList());
}
- class BeComparator implements Comparator<Backend> {
+ class BeIdComparator implements Comparator<Backend> {
public int compare(Backend a, Backend b) {
return (int) (a.getId() - b.getId());
}
}
- public List<Long> selectBackendIdsRoundRobinByPolicy(BeSelectionPolicy
policy, int number,
- int nextIndex) {
- Preconditions.checkArgument(number >= -1);
- List<Backend> candidates = getCandidates(policy);
- if (number != -1 && candidates.size() < number) {
- LOG.info("Not match policy: {}. candidates num: {}, expected: {}",
policy, candidates.size(), number);
- return Lists.newArrayList();
- }
-
- int realIndex = nextIndex % candidates.size();
- List<Long> partialOrderList = new ArrayList<Long>();
- partialOrderList.addAll(candidates.subList(realIndex,
candidates.size())
- .stream().map(b -> b.getId()).collect(Collectors.toList()));
- partialOrderList.addAll(candidates.subList(0, realIndex)
- .stream().map(b -> b.getId()).collect(Collectors.toList()));
-
- if (number == -1) {
- return partialOrderList;
- } else {
- return partialOrderList.subList(0, number);
- }
- }
-
- public List<Backend> getCandidates(BeSelectionPolicy policy) {
- List<Backend> candidates =
policy.getCandidateBackends(idToBackendRef.values());
- if (candidates.isEmpty()) {
- LOG.info("Not match policy: {}. candidates num: {}", policy,
candidates.size());
- return Lists.newArrayList();
- }
-
- if (!policy.allowOnSameHost) {
- Map<String, List<Backend>> backendMaps = Maps.newHashMap();
- for (Backend backend : candidates) {
- if (backendMaps.containsKey(backend.getHost())) {
- backendMaps.get(backend.getHost()).add(backend);
- } else {
- List<Backend> list = Lists.newArrayList();
- list.add(backend);
- backendMaps.put(backend.getHost(), list);
- }
- }
- candidates.clear();
- for (List<Backend> list : backendMaps.values()) {
- candidates.add(list.get(0));
- }
- }
-
- if (candidates.isEmpty()) {
- LOG.info("Not match policy: {}. candidates num: {}", policy,
candidates.size());
- return Lists.newArrayList();
+ class BeHostComparator implements Comparator<Backend> {
+ public int compare(Backend a, Backend b) {
+ return a.getHost().compareTo(b.getHost());
}
-
- Collections.sort(candidates, new BeComparator());
- return candidates;
}
// Select the smallest number of tablets as the starting position of
// round robin in the BE that match the policy
- public int getStartPosOfRoundRobin(Tag tag, TStorageMedium storageMedium) {
+ public int getStartPosOfRoundRobin(Tag tag, TStorageMedium storageMedium,
boolean isStorageMediumSpecified) {
BeSelectionPolicy.Builder builder = new BeSelectionPolicy.Builder()
.needScheduleAvailable().needCheckDiskUsage().addTags(Sets.newHashSet(tag))
.setStorageMedium(storageMedium);
@@ -479,13 +430,17 @@ public class SystemInfoService {
}
BeSelectionPolicy policy = builder.build();
- List<Backend> candidates = getCandidates(policy);
+ List<Long> beIds = selectBackendIdsByPolicy(policy, -1);
+ if (beIds.isEmpty() && storageMedium != null &&
!isStorageMediumSpecified) {
+ storageMedium = (storageMedium == TStorageMedium.HDD) ?
TStorageMedium.SSD : TStorageMedium.HDD;
+ policy = builder.setStorageMedium(storageMedium).build();
+ beIds = selectBackendIdsByPolicy(policy, -1);
+ }
long minBeTabletsNum = Long.MAX_VALUE;
int minIndex = -1;
- for (int i = 0; i < candidates.size(); ++i) {
- long tabletsNum = Env.getCurrentInvertedIndex()
- .getTabletIdsByBackendId(candidates.get(i).getId()).size();
+ for (int i = 0; i < beIds.size(); ++i) {
+ long tabletsNum =
Env.getCurrentInvertedIndex().getTabletIdsByBackendId(beIds.get(i)).size();
if (tabletsNum < minBeTabletsNum) {
minBeTabletsNum = tabletsNum;
minIndex = i;
@@ -494,48 +449,21 @@ public class SystemInfoService {
return minIndex;
}
- public Map<Tag, List<Long>> getBeIdRoundRobinForReplicaCreation(
- ReplicaAllocation replicaAlloc, TStorageMedium storageMedium,
- Map<Tag, Integer> nextIndexs) throws DdlException {
- Map<Tag, List<Long>> chosenBackendIds = Maps.newHashMap();
- Map<Tag, Short> allocMap = replicaAlloc.getAllocMap();
- short totalReplicaNum = 0;
- for (Map.Entry<Tag, Short> entry : allocMap.entrySet()) {
- BeSelectionPolicy.Builder builder = new BeSelectionPolicy.Builder()
-
.needScheduleAvailable().needCheckDiskUsage().addTags(Sets.newHashSet(entry.getKey()))
- .setStorageMedium(storageMedium);
- if (FeConstants.runningUnitTest ||
Config.allow_replica_on_same_host) {
- builder.allowOnSameHost();
- }
-
- BeSelectionPolicy policy = builder.build();
- int nextIndex = nextIndexs.get(entry.getKey());
- List<Long> beIds = selectBackendIdsRoundRobinByPolicy(policy,
entry.getValue(), nextIndex);
- nextIndexs.put(entry.getKey(), nextIndex + beIds.size());
-
- if (beIds.isEmpty()) {
- throw new DdlException("Failed to find " + entry.getValue() +
" backend(s) for policy: " + policy);
- }
- chosenBackendIds.put(entry.getKey(), beIds);
- totalReplicaNum += beIds.size();
- }
- Preconditions.checkState(totalReplicaNum ==
replicaAlloc.getTotalReplicaNum());
- return chosenBackendIds;
- }
-
/**
* Select a set of backends for replica creation.
* The following parameters need to be considered when selecting backends.
*
* @param replicaAlloc
+ * @param nextIndexs create tablet round robin next be index, when
enable_round_robin_create_tablet
* @param storageMedium
* @param isStorageMediumSpecified
* @param isOnlyForCheck set true if only used for check available backend
* @return return the selected backend ids group by tag.
* @throws DdlException
*/
- public Map<Tag, List<Long>> selectBackendIdsForReplicaCreation(
- ReplicaAllocation replicaAlloc, TStorageMedium storageMedium,
boolean isStorageMediumSpecified,
+ public Pair<Map<Tag, List<Long>>, TStorageMedium>
selectBackendIdsForReplicaCreation(
+ ReplicaAllocation replicaAlloc, Map<Tag, Integer> nextIndexs,
TStorageMedium storageMedium,
+ boolean isStorageMediumSpecified,
boolean isOnlyForCheck)
throws DdlException {
Map<Long, Backend> copiedBackends = Maps.newHashMap(idToBackendRef);
@@ -552,6 +480,7 @@ public class SystemInfoService {
List<String> failedEntries = Lists.newArrayList();
for (Map.Entry<Tag, Short> entry : allocMap.entrySet()) {
+ Tag tag = entry.getKey();
BeSelectionPolicy.Builder builder = new
BeSelectionPolicy.Builder()
.needScheduleAvailable().needCheckDiskUsage().addTags(Sets.newHashSet(entry.getKey()))
.setStorageMedium(storageMedium);
@@ -559,21 +488,34 @@ public class SystemInfoService {
builder.allowOnSameHost();
}
+ if (Config.enable_round_robin_create_tablet) {
+ builder.setEnableRoundRobin(true);
+
builder.setNextRoundRobinIndex(nextIndexs.getOrDefault(tag, -1));
+ }
+
BeSelectionPolicy policy = builder.build();
List<Long> beIds = selectBackendIdsByPolicy(policy,
entry.getValue());
// first time empty, retry with different storage medium
// if only for check, no need to retry different storage
medium to get backend
+ TStorageMedium originalStorageMedium = storageMedium;
if (beIds.isEmpty() && storageMedium != null &&
!isStorageMediumSpecified && !isOnlyForCheck) {
storageMedium = (storageMedium == TStorageMedium.HDD) ?
TStorageMedium.SSD : TStorageMedium.HDD;
- policy = builder.setStorageMedium(storageMedium).build();
+ builder.setStorageMedium(storageMedium);
+ if (Config.enable_round_robin_create_tablet) {
+
builder.setNextRoundRobinIndex(nextIndexs.getOrDefault(tag, -1));
+ }
+ policy = builder.build();
beIds = selectBackendIdsByPolicy(policy, entry.getValue());
}
+ if (Config.enable_round_robin_create_tablet) {
+ nextIndexs.put(tag, policy.nextRoundRobinIndex);
+ }
// after retry different storage medium, it's still empty
if (beIds.isEmpty()) {
- LOG.error("failed backend(s) for policy:" + policy);
+ LOG.error("failed backend(s) for policy: {} real medium
{}", policy, originalStorageMedium);
String errorReplication = "replication tag: " +
entry.getKey()
+ ", replication num: " + entry.getValue()
- + ", storage medium: " + storageMedium;
+ + ", storage medium: " + originalStorageMedium;
failedEntries.add(errorReplication);
} else {
chosenBackendIds.put(entry.getKey(), beIds);
@@ -589,13 +531,13 @@ public class SystemInfoService {
}
Preconditions.checkState(totalReplicaNum ==
replicaAlloc.getTotalReplicaNum());
- return chosenBackendIds;
+ return Pair.of(chosenBackendIds, storageMedium);
}
/**
* Select a set of backends by the given policy.
*
- * @param policy
+ * @param policy if policy is enableRoundRobin, will update its
nextRoundRobinIndex
* @param number number of backends which need to be selected. -1 means
return as many as possible.
* @return return #number of backend ids,
* or empty set if no backends match the policy, or the number of matched
backends is less than "number";
@@ -603,50 +545,75 @@ public class SystemInfoService {
public List<Long> selectBackendIdsByPolicy(BeSelectionPolicy policy, int
number) {
Preconditions.checkArgument(number >= -1);
List<Backend> candidates =
policy.getCandidateBackends(idToBackendRef.values());
- if ((number != -1 && candidates.size() < number) ||
candidates.isEmpty()) {
+ if (candidates.size() < number || candidates.isEmpty()) {
LOG.debug("Not match policy: {}. candidates num: {}, expected:
{}", policy, candidates.size(), number);
return Lists.newArrayList();
}
// If only need one Backend, just return a random one.
- if (number == 1) {
+ if (number == 1 && !policy.enableRoundRobin) {
Collections.shuffle(candidates);
return Lists.newArrayList(candidates.get(0).getId());
}
- if (policy.allowOnSameHost) {
- Collections.shuffle(candidates);
- if (number == -1) {
- return candidates.stream().map(b ->
b.getId()).collect(Collectors.toList());
- } else {
- return candidates.subList(0, number).stream().map(b ->
b.getId()).collect(Collectors.toList());
+ boolean hasSameHost = false;
+ if (!policy.allowOnSameHost) {
+ // for each host, random select one backend.
+ Map<String, List<Backend>> backendMaps = Maps.newHashMap();
+ for (Backend backend : candidates) {
+ if (backendMaps.containsKey(backend.getHost())) {
+ backendMaps.get(backend.getHost()).add(backend);
+ } else {
+ List<Backend> list = Lists.newArrayList();
+ list.add(backend);
+ backendMaps.put(backend.getHost(), list);
+ }
}
- }
-
- // for each host, random select one backend.
- Map<String, List<Backend>> backendMaps = Maps.newHashMap();
- for (Backend backend : candidates) {
- if (backendMaps.containsKey(backend.getHost())) {
- backendMaps.get(backend.getHost()).add(backend);
- } else {
- List<Backend> list = Lists.newArrayList();
- list.add(backend);
- backendMaps.put(backend.getHost(), list);
+ candidates.clear();
+ for (List<Backend> list : backendMaps.values()) {
+ if (list.size() > 1) {
+ Collections.shuffle(list);
+ hasSameHost = true;
+ }
+ candidates.add(list.get(0));
}
}
- candidates.clear();
- for (List<Backend> list : backendMaps.values()) {
- Collections.shuffle(list);
- candidates.add(list.get(0));
- }
- if (number != -1 && candidates.size() < number) {
+
+ if (candidates.size() < number) {
LOG.debug("Not match policy: {}. candidates num: {}, expected: {}", policy, candidates.size(), number);
return Lists.newArrayList();
}
- Collections.shuffle(candidates);
- if (number != -1) {
- return candidates.subList(0, number).stream().map(b ->
b.getId()).collect(Collectors.toList());
+
+ if (policy.enableRoundRobin) {
+ if (!policy.allowOnSameHost && hasSameHost) {
+ // not allow same host and has same host,
+ // then we compare them with their host
+ Collections.sort(candidates, new BeHostComparator());
+ } else {
+ Collections.sort(candidates, new BeIdComparator());
+ }
+
+ if (policy.nextRoundRobinIndex < 0) {
+ policy.nextRoundRobinIndex = new
SecureRandom().nextInt(candidates.size());
+ }
+
+ int realIndex = policy.nextRoundRobinIndex % candidates.size();
+ List<Long> partialOrderList = new ArrayList<Long>();
+ partialOrderList.addAll(candidates.subList(realIndex, candidates.size())
+ .stream().map(Backend::getId).collect(Collectors.toList()));
+ partialOrderList.addAll(candidates.subList(0, realIndex)
+ .stream().map(Backend::getId).collect(Collectors.toList()));
+
+ List<Long> result = number == -1 ? partialOrderList :
partialOrderList.subList(0, number);
+ policy.nextRoundRobinIndex = realIndex + result.size();
+
+ return result;
} else {
- return candidates.stream().map(b ->
b.getId()).collect(Collectors.toList());
+ Collections.shuffle(candidates);
+ if (number != -1) {
+ return candidates.subList(0,
number).stream().map(Backend::getId).collect(Collectors.toList());
+ } else {
+ return
candidates.stream().map(Backend::getId).collect(Collectors.toList());
+ }
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java
b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java
index 64e92b35c83..d361777fdd5 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java
@@ -39,6 +39,7 @@ import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.fs.FileSystemFactory;
import org.apache.doris.persist.EditLog;
+import org.apache.doris.resource.Tag;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TStorageMedium;
@@ -54,6 +55,7 @@ import org.junit.Before;
import org.junit.Test;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Adler32;
@@ -153,12 +155,14 @@ public class RestoreJobTest {
new Expectations() {
{
- systemInfoService.selectBackendIdsForReplicaCreation((ReplicaAllocation) any, (TStorageMedium) any,
- false, true);
+ systemInfoService.selectBackendIdsForReplicaCreation((ReplicaAllocation) any,
+ Maps.newHashMap(), (TStorageMedium) any, false, true);
minTimes = 0;
result = new Delegate() {
public synchronized List<Long>
selectBackendIdsForReplicaCreation(
- ReplicaAllocation replicaAlloc, String
clusterName, TStorageMedium medium) {
+ ReplicaAllocation replicaAlloc, Map<Tag, Integer>
nextIndexs,
+ TStorageMedium medium, boolean
isStorageMediumSpecified,
+ boolean isOnlyForCheck) {
List<Long> beIds = Lists.newArrayList();
beIds.add(CatalogMocker.BACKEND1_ID);
beIds.add(CatalogMocker.BACKEND2_ID);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java
b/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java
index 2cfa4e9b90f..3bd00d2b73a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/ModifyBackendTest.java
@@ -83,7 +83,7 @@ public class ModifyBackendTest {
CreateTableStmt createStmt = (CreateTableStmt)
UtFrameUtils.parseAndAnalyzeStmt(createStr, connectContext);
ExceptionChecker.expectThrowsWithMsg(DdlException.class, "Failed to find enough backend, please check the replication num,replication tag and storage medium.\n"
+ "Create failed replications:\n"
- + "replication tag: {\"location\" : \"default\"}, replication num: 1, storage medium: SSD",
+ + "replication tag: {\"location\" : \"default\"}, replication num: 1, storage medium: HDD",
() -> DdlExecutor.execute(Env.getCurrentEnv(), createStmt));
createStr = "create table test.tbl1(\n" + "k1 int\n" + ") distributed
by hash(k1)\n" + "buckets 3 properties(\n"
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/catalog/ReplicaAllocationTest.java
b/fe/fe-core/src/test/java/org/apache/doris/catalog/ReplicaAllocationTest.java
index 14367ea731e..971abe9b803 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/catalog/ReplicaAllocationTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/catalog/ReplicaAllocationTest.java
@@ -21,6 +21,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ExceptionChecker;
import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.Pair;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.meta.MetaContext;
import org.apache.doris.resource.Tag;
@@ -52,11 +53,12 @@ public class ReplicaAllocationTest {
public void setUp() throws DdlException {
new Expectations() {
{
- systemInfoService.selectBackendIdsForReplicaCreation((ReplicaAllocation) any, (TStorageMedium) any, false, true);
+ systemInfoService.selectBackendIdsForReplicaCreation((ReplicaAllocation) any, Maps.newHashMap(),
+ (TStorageMedium) any, false, true);
minTimes = 0;
result = new Delegate() {
- Map<Tag, List<Long>> selectBackendIdsForReplicaCreation() {
- return Maps.newHashMap();
+ Pair<Map<Tag, List<Long>>, TStorageMedium>
selectBackendIdsForReplicaCreation() {
+ return Pair.of(Maps.newHashMap(), TStorageMedium.HDD);
}
};
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
index bf57f21f02f..61228c821a8 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
@@ -150,7 +150,7 @@ public class CanalSyncDataTest {
result = execPlanFragmentParams;
systemInfoService.selectBackendIdsForReplicaCreation((ReplicaAllocation) any,
- (TStorageMedium) any, false, true);
+ Maps.newHashMap(), (TStorageMedium) any, false, true);
minTimes = 0;
result = backendIds;
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
index d207e0ce2a4..9578ed1c7ff 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
@@ -22,6 +22,7 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeMetaVersion;
+import org.apache.doris.common.Pair;
import org.apache.doris.meta.MetaContext;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.SystemInfoService.HostInfo;
@@ -403,8 +404,9 @@ public class SystemInfoServiceTest {
// also check if the random selection logic can evenly distribute the
replica.
Map<Long, Integer> beCounterMap = Maps.newHashMap();
for (int i = 0; i < 10000; ++i) {
- Map<Tag, List<Long>> res =
infoService.selectBackendIdsForReplicaCreation(replicaAlloc,
- TStorageMedium.HDD, false, false);
+ Pair<Map<Tag, List<Long>>, TStorageMedium> ret =
infoService.selectBackendIdsForReplicaCreation(replicaAlloc,
+ Maps.newHashMap(), TStorageMedium.HDD, false, false);
+ Map<Tag, List<Long>> res = ret.first;
Assert.assertEquals(3, res.get(Tag.DEFAULT_BACKEND_TAG).size());
for (Long beId : res.get(Tag.DEFAULT_BACKEND_TAG)) {
beCounterMap.put(beId, beCounterMap.getOrDefault(beId, 0) + 1);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]