This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b8a337468c3 [feature](merge-cloud) Add drop cloud table (#30032)
b8a337468c3 is described below
commit b8a337468c3063854e0df43bfb01e93e214fc3c2
Author: yujun <[email protected]>
AuthorDate: Wed Jan 17 09:02:37 2024 +0800
[feature](merge-cloud) Add drop cloud table (#30032)
Co-authored-by: Luwei <[email protected]>
Co-authored-by: deardeng <[email protected]>
Co-authored-by: Gavin Chou <[email protected]>
Co-authored-by: Lightman <[email protected]>
Co-authored-by: zhengyu <[email protected]>
Co-authored-by: Lei Zhang <[email protected]>
Co-authored-by: AlexYue <[email protected]>
Co-authored-by: Xiaocc <[email protected]>
Co-authored-by: panDing19 <[email protected]>
Co-authored-by: plat1ko <[email protected]>
Co-authored-by: zhangdong <[email protected]>
Co-authored-by: walter <[email protected]>
---
.../main/java/org/apache/doris/common/Config.java | 3 +
.../main/java/org/apache/doris/catalog/Env.java | 29 +---
.../org/apache/doris/catalog/TempPartitions.java | 8 +-
.../org/apache/doris/cloud/catalog/CloudEnv.java | 5 +
.../cloud/datasource/CloudInternalCatalog.java | 163 ++++++++++++++++++++-
.../apache/doris/datasource/InternalCatalog.java | 58 +++++++-
6 files changed, 228 insertions(+), 38 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 2a8a2ed53ad..baf46177bc1 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2463,6 +2463,9 @@ public class Config extends ConfigBase {
@ConfField(mutable = true)
public static int sts_duration = 3600;
+    // Cloud mode: number of extra attempts (after the first one) for the best-effort meta-service
+    // RPCs that drop the indexes/partitions of erased tables and partitions. Retries are spaced
+    // 3 seconds apart; after the last one the drop is abandoned with a warning.
+ @ConfField(mutable = true)
+ public static int drop_rpc_retry_num = 200;
+
@ConfField
public static int cloud_meta_service_rpc_failed_retry_times = 200;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index b72544365aa..222eb195b34 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -251,7 +251,6 @@ import org.apache.doris.system.SystemInfoService.HostInfo;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.CompactionTask;
-import org.apache.doris.task.DropReplicaTask;
import org.apache.doris.task.MasterTaskExecutor;
import org.apache.doris.task.PriorityMasterTaskExecutor;
import org.apache.doris.thrift.BackendService;
@@ -5684,33 +5683,11 @@ public class Env {
}
}
- if (!isReplay && !Env.isCheckpointThread()) {
- // drop all replicas
- AgentBatchTask batchTask = new AgentBatchTask();
- for (Partition partition : olapTable.getAllPartitions()) {
- List<MaterializedIndex> allIndices =
partition.getMaterializedIndices(IndexExtState.ALL);
- for (MaterializedIndex materializedIndex : allIndices) {
- long indexId = materializedIndex.getId();
- int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
- for (Tablet tablet : materializedIndex.getTablets()) {
- long tabletId = tablet.getId();
- List<Replica> replicas = tablet.getReplicas();
- for (Replica replica : replicas) {
- long backendId = replica.getBackendId();
- long replicaId = replica.getId();
- DropReplicaTask dropTask = new
DropReplicaTask(backendId, tabletId,
- replicaId, schemaHash, true);
- batchTask.addTask(dropTask);
- } // end for replicas
- } // end for tablets
- } // end for indices
- } // end for partitions
- AgentTaskExecutor.submit(batchTask);
- }
-
// TODO: does checkpoint need update colocate index ?
// colocation
Env.getCurrentColocateIndex().removeTable(olapTable.getId());
+
+ getInternalCatalog().eraseTableDropBackendReplicas(olapTable,
isReplay);
}
public void onErasePartition(Partition partition) {
@@ -5721,6 +5698,8 @@ public class Env {
invertedIndex.deleteTablet(tablet.getId());
}
}
+
+
getInternalCatalog().erasePartitionDropBackendReplicas(Lists.newArrayList(partition));
}
public void cleanTrash(AdminCleanTrashStmt stmt) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TempPartitions.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TempPartitions.java
index 9cd2d61bf91..aebcae2e51d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TempPartitions.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TempPartitions.java
@@ -17,7 +17,6 @@
package org.apache.doris.catalog;
-import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonPostProcessable;
@@ -78,12 +77,7 @@ public class TempPartitions implements Writable,
GsonPostProcessable {
idToPartition.remove(partition.getId());
nameToPartition.remove(partitionName);
if (needDropTablet) {
- TabletInvertedIndex invertedIndex =
Env.getCurrentInvertedIndex();
- for (MaterializedIndex index :
partition.getMaterializedIndices(IndexExtState.ALL)) {
- for (Tablet tablet : index.getTablets()) {
- invertedIndex.deleteTablet(tablet.getId());
- }
- }
+ Env.getCurrentEnv().onErasePartition(partition);
}
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
index 8d49783f3ca..85162fec825 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
@@ -19,8 +19,13 @@ package org.apache.doris.cloud.catalog;
import org.apache.doris.catalog.Env;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
public class CloudEnv extends Env {
+ private static final Logger LOG = LogManager.getLogger(CloudEnv.class);
+
public CloudEnv(boolean isCheckpointCatalog) {
super(isCheckpointCatalog);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
index 13f04fd36df..f2e3f10c280 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
@@ -27,9 +27,11 @@ import org.apache.doris.catalog.EnvFactory;
import org.apache.doris.catalog.Index;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.catalog.MaterializedIndex.IndexState;
import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.MetaIdGenerator.IdGeneratorBuffer;
+import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Replica.ReplicaState;
@@ -54,6 +56,7 @@ import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TTabletType;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import doris.segment_v2.SegmentV2;
import org.apache.logging.log4j.LogManager;
@@ -498,12 +501,164 @@ public class CloudInternalCatalog extends
InternalCatalog {
// END CREATE TABLE
+ // BEGIN DROP TABLE
+
+    /**
+     * Drops the meta-service records of every materialized index of an erased table.
+     *
+     * <p>Cloud mode does not send DropReplicaTask to backends. Instead, the master asks the meta
+     * service to drop the table's indexes. This is best effort: a failed RPC is retried up to
+     * {@code Config.drop_rpc_retry_num} more times, 3 seconds apart, and then abandoned with a
+     * warning. If the thread is interrupted while waiting, the interrupt flag is restored and the
+     * remaining retries are skipped.
+     *
+     * @param olapTable the table being erased
+     * @param isReplay  not consulted here; only the master check gates the RPC
+     */
+    @Override
+    public void eraseTableDropBackendReplicas(OlapTable olapTable, boolean isReplay) {
+        if (!Env.getCurrentEnv().isMaster()) {
+            return;
+        }
+
+        // Index ids are table-level (cf. getSchemaHashByIndexId), so the same id can appear under
+        // several partitions; send each id only once.
+        List<Long> indexIds = Lists.newArrayList();
+        for (Partition partition : olapTable.getAllPartitions()) {
+            for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
+                if (!indexIds.contains(index.getId())) {
+                    indexIds.add(index.getId());
+                }
+            }
+        }
+        if (indexIds.isEmpty()) {
+            return;
+        }
+
+        long tableId = olapTable.getId();
+        int attempts = 0;
+        // The mutable config is re-read every round, so lowering it at runtime takes effect.
+        while (attempts <= Config.drop_rpc_retry_num) {
+            if (attempts > 0) {
+                try {
+                    Thread.sleep(3000);
+                } catch (InterruptedException ie) {
+                    // Keep the interrupt visible to callers; with the flag set, every further
+                    // sleep would fail immediately, so retrying would only spin.
+                    Thread.currentThread().interrupt();
+                    LOG.warn("interrupted while retrying to drop index {} of table {}, give up",
+                            indexIds, tableId);
+                    return;
+                }
+            }
+            attempts++;
+            try {
+                dropMaterializedIndex(tableId, indexIds);
+                return;
+            } catch (Exception e) {
+                // The exception is an extra trailing argument so log4j prints its stack trace.
+                LOG.warn("failed to drop index {} of table {}, try cnt {}", indexIds, tableId, attempts, e);
+            }
+        }
+        LOG.warn("failed to drop index {} of table {}, try cnt {} reaches maximum retry count",
+                indexIds, tableId, attempts);
+    }
+
+    /**
+     * Drops the meta-service records of erased partitions and their materialized indexes.
+     *
+     * <p>Only the master issues the RPC. All partitions are expected to belong to one table:
+     * - the table id is taken from the first CloudReplica found;
+     * - the db id is taken from the first partition, which must be a CloudPartition.
+     * This is best effort: a failed RPC is retried up to {@code Config.drop_rpc_retry_num} more
+     * times, 3 seconds apart, and then abandoned with a warning. If the thread is interrupted while
+     * waiting, the interrupt flag is restored and the remaining retries are skipped.
+     *
+     * @param partitions the erased partitions; nothing happens when the list is empty
+     */
+    @Override
+    public void erasePartitionDropBackendReplicas(List<Partition> partitions) {
+        if (!Env.getCurrentEnv().isMaster() || partitions.isEmpty()) {
+            return;
+        }
+
+        long tableId = -1;
+        List<Long> partitionIds = Lists.newArrayList();
+        List<Long> indexIds = Lists.newArrayList();
+        for (Partition partition : partitions) {
+            for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
+                if (!indexIds.contains(index.getId())) {
+                    indexIds.add(index.getId());
+                }
+                if (tableId == -1) {
+                    // The table id is only reachable through a replica. Search for any replica
+                    // instead of taking get(0), which throws on an index without tablets/replicas.
+                    tableId = index.getTablets().stream()
+                            .flatMap(tablet -> tablet.getReplicas().stream())
+                            .findFirst()
+                            .map(replica -> ((CloudReplica) replica).getTableId())
+                            .orElse(-1L);
+                }
+            }
+            partitionIds.add(partition.getId());
+        }
+
+        if (tableId == -1) {
+            // Sending table id -1 to the meta service would be meaningless.
+            LOG.warn("failed to drop partition {}: no replica found to determine the table id",
+                    partitionIds);
+            return;
+        }
+
+        long dbId = ((CloudPartition) partitions.get(0)).getDbId();
+        int attempts = 0;
+        // The mutable config is re-read every round, so lowering it at runtime takes effect.
+        while (attempts <= Config.drop_rpc_retry_num) {
+            if (attempts > 0) {
+                try {
+                    Thread.sleep(3000);
+                } catch (InterruptedException ie) {
+                    // Keep the interrupt visible to callers; with the flag set, every further
+                    // sleep would fail immediately, so retrying would only spin.
+                    Thread.currentThread().interrupt();
+                    LOG.warn("interrupted while retrying to drop partition {} of table {}, give up",
+                            partitionIds, tableId);
+                    return;
+                }
+            }
+            attempts++;
+            try {
+                dropCloudPartition(dbId, tableId, partitionIds, indexIds);
+                return;
+            } catch (Exception e) {
+                // The exception is an extra trailing argument so log4j prints its stack trace.
+                LOG.warn("failed to drop partition {} of table {}, try cnt {}",
+                        partitionIds, tableId, attempts, e);
+            }
+        }
+        LOG.warn("failed to drop partition {} of table {}, try cnt {} reaches maximum retry count",
+                partitionIds, tableId, attempts);
+    }
+
+    /**
+     * Asks the meta service to drop the given partitions and their indexes.
+     *
+     * <p>The RPC is repeated, with a short pause, while the meta service answers
+     * {@code KV_TXN_CONFLICT} or the call throws RpcException. The number of attempts is bounded
+     * by {@code Config.meta_service_rpc_retry_times}.
+     *
+     * @param dbId         database id; only set on the request when positive
+     * @param tableId      id of the table owning the partitions
+     * @param partitionIds ids of the partitions to drop
+     * @param indexIds     ids of the materialized indexes of those partitions
+     * @throws DdlException if the RPC keeps failing, if no response was obtained at all, or if the
+     *                      final response status is not OK
+     */
+    private void dropCloudPartition(long dbId, long tableId, List<Long> partitionIds, List<Long> indexIds)
+            throws DdlException {
+        Cloud.PartitionRequest.Builder partitionRequestBuilder = Cloud.PartitionRequest.newBuilder();
+        partitionRequestBuilder.setCloudUniqueId(Config.cloud_unique_id);
+        partitionRequestBuilder.setTableId(tableId);
+        partitionRequestBuilder.addAllPartitionIds(partitionIds);
+        partitionRequestBuilder.addAllIndexIds(indexIds);
+        if (dbId > 0) {
+            partitionRequestBuilder.setDbId(dbId);
+        }
+        final Cloud.PartitionRequest partitionRequest = partitionRequestBuilder.build();
+
+        Cloud.PartitionResponse response = null;
+        int tryTimes = 0;
+        while (tryTimes++ < Config.meta_service_rpc_retry_times) {
+            try {
+                response = MetaServiceProxy.getInstance().dropPartition(partitionRequest);
+                if (response.getStatus().getCode() != Cloud.MetaServiceCode.KV_TXN_CONFLICT) {
+                    break;
+                }
+            } catch (RpcException e) {
+                LOG.warn("tryTimes:{}, dropPartition RpcException", tryTimes, e);
+                if (tryTimes + 1 >= Config.meta_service_rpc_retry_times) {
+                    throw new DdlException(e.getMessage());
+                }
+            }
+            sleepSeveralMs();
+        }
+
+        if (response == null) {
+            // No RPC returned a response, e.g. meta_service_rpc_retry_times <= 0 so the loop never
+            // ran. Fail explicitly instead of dereferencing null below.
+            throw new DdlException("dropPartition got no response, meta_service_rpc_retry_times="
+                    + Config.meta_service_rpc_retry_times);
+        }
+        if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
+            LOG.warn("dropPartition response: {} ", response);
+            throw new DdlException(response.getStatus().getMsg());
+        }
+    }
+
+    /**
+     * Asks the meta service to drop the given materialized indexes of a table.
+     *
+     * <p>The RPC is repeated, with a short pause, while the meta service answers
+     * {@code KV_TXN_CONFLICT} or the call throws RpcException. The number of attempts is bounded
+     * by {@code Config.meta_service_rpc_retry_times}.
+     *
+     * @param tableId  id of the table owning the indexes
+     * @param indexIds ids of the materialized indexes to drop
+     * @throws DdlException if the RPC keeps failing, if no response was obtained at all, or if the
+     *                      final response status is not OK
+     */
+    private void dropMaterializedIndex(long tableId, List<Long> indexIds) throws DdlException {
+        Cloud.IndexRequest.Builder indexRequestBuilder = Cloud.IndexRequest.newBuilder();
+        indexRequestBuilder.setCloudUniqueId(Config.cloud_unique_id);
+        indexRequestBuilder.addAllIndexIds(indexIds);
+        indexRequestBuilder.setTableId(tableId);
+        final Cloud.IndexRequest indexRequest = indexRequestBuilder.build();
+
+        Cloud.IndexResponse response = null;
+        int tryTimes = 0;
+        while (tryTimes++ < Config.meta_service_rpc_retry_times) {
+            try {
+                response = MetaServiceProxy.getInstance().dropIndex(indexRequest);
+                if (response.getStatus().getCode() != Cloud.MetaServiceCode.KV_TXN_CONFLICT) {
+                    break;
+                }
+            } catch (RpcException e) {
+                LOG.warn("tryTimes:{}, dropIndex RpcException", tryTimes, e);
+                if (tryTimes + 1 >= Config.meta_service_rpc_retry_times) {
+                    throw new DdlException(e.getMessage());
+                }
+            }
+            sleepSeveralMs();
+        }
+
+        if (response == null) {
+            // No RPC returned a response, e.g. meta_service_rpc_retry_times <= 0 so the loop never
+            // ran. Fail explicitly instead of dereferencing null below.
+            throw new DdlException("dropIndex got no response, meta_service_rpc_retry_times="
+                    + Config.meta_service_rpc_retry_times);
+        }
+        if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
+            LOG.warn("dropIndex response: {} ", response);
+            throw new DdlException(response.getStatus().getMsg());
+        }
+    }
+
+ // END DROP TABLE
+
@Override
protected void checkAvailableCapacity(Database db) throws DdlException {
- // check cluster capacity
- Env.getCurrentSystemInfo().checkAvailableCapacity();
- // check db quota
- db.checkQuota();
+        // This cloud override performs no check at all.
+        // NOTE(review): this change removes both the cluster-capacity check and db.checkQuota().
+        // Confirm that skipping the db quota check is intended in cloud mode.
}
private void sleepSeveralMs() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 4671daba0cb..cf270a2c091 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -156,6 +156,7 @@ import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.CreateReplicaTask;
+import org.apache.doris.task.DropReplicaTask;
import org.apache.doris.thrift.TCompressionType;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TStorageFormat;
@@ -965,6 +966,38 @@ public class InternalCatalog implements
CatalogIf<Database> {
}
}
+    /**
+     * Sends a DropReplicaTask to the backend of every replica of an erased table.
+     *
+     * <p>Nothing is sent while replaying ({@code isReplay}) or when
+     * {@code Env.isCheckpointThread()} is true. Otherwise one batch is submitted containing a task
+     * for each replica of each tablet, across all partitions and all indexes (IndexExtState.ALL).
+     *
+     * @param olapTable the table being erased
+     * @param isReplay  true when called while replaying the edit log
+     */
+    public void eraseTableDropBackendReplicas(OlapTable olapTable, boolean isReplay) {
+        if (isReplay || Env.isCheckpointThread()) {
+            return;
+        }
+
+        AgentBatchTask dropTasks = new AgentBatchTask();
+        for (Partition partition : olapTable.getAllPartitions()) {
+            for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
+                // The schema hash is looked up once per index and shared by all of its tablets.
+                int schemaHash = olapTable.getSchemaHashByIndexId(index.getId());
+                for (Tablet tablet : index.getTablets()) {
+                    for (Replica replica : tablet.getReplicas()) {
+                        dropTasks.addTask(new DropReplicaTask(replica.getBackendId(), tablet.getId(),
+                                replica.getId(), schemaHash, true));
+                    }
+                }
+            }
+        }
+        AgentTaskExecutor.submit(dropTasks);
+    }
+
+    /**
+     * Called with partitions that were erased or replaced (for example by truncate).
+     *
+     * <p>Deliberately empty here: no delete task is sent to backends at this point. When a backend
+     * later reports its tablets, the FE sends the delete task then.
+     *
+     * @param partitions the erased partitions (ignored by this implementation)
+     */
+    public void erasePartitionDropBackendReplicas(List<Partition> partitions) {
+        // Intentionally a no-op; see the Javadoc of this method.
+    }
+
private void unprotectAddReplica(OlapTable olapTable, ReplicaPersistInfo
info) {
LOG.debug("replay add a replica {}", info);
Partition partition = olapTable.getPartition(info.getPartitionId());
@@ -3085,6 +3118,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
// Things may be changed outside the table lock.
olapTable = (OlapTable) db.getTableOrDdlException(copiedTbl.getId());
olapTable.writeLockOrDdlException();
+ List<Partition> oldPartitions = Lists.newArrayList();
try {
olapTable.checkNormalStateForAlter();
// check partitions
@@ -3141,7 +3175,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
}
// replace
- truncateTableInternal(olapTable, newPartitions,
truncateEntireTable);
+ oldPartitions = truncateTableInternal(olapTable, newPartitions,
truncateEntireTable);
// write edit log
TruncateTableInfo info =
@@ -3152,6 +3186,9 @@ public class InternalCatalog implements
CatalogIf<Database> {
} finally {
olapTable.writeUnlock();
}
+
+ erasePartitionDropBackendReplicas(oldPartitions);
+
if (truncateEntireTable) {
// Drop the whole table stats after truncate the entire table
Env.getCurrentEnv().getAnalysisManager().dropStats(olapTable);
@@ -3162,11 +3199,14 @@ public class InternalCatalog implements
CatalogIf<Database> {
LOG.info("finished to truncate table {}, partitions: {}",
tblRef.getName().toSql(), tblRef.getPartitionNames());
}
- private void truncateTableInternal(OlapTable olapTable, List<Partition>
newPartitions, boolean isEntireTable) {
+ private List<Partition> truncateTableInternal(OlapTable olapTable,
List<Partition> newPartitions,
+ boolean isEntireTable) {
// use new partitions to replace the old ones.
+ List<Partition> oldPartitions = Lists.newArrayList();
Set<Long> oldTabletIds = Sets.newHashSet();
for (Partition newPartition : newPartitions) {
Partition oldPartition = olapTable.replacePartition(newPartition);
+ oldPartitions.add(oldPartition);
// save old tablets to be removed
for (MaterializedIndex index :
oldPartition.getMaterializedIndices(IndexExtState.ALL)) {
index.getTablets().forEach(t -> {
@@ -3176,6 +3216,12 @@ public class InternalCatalog implements
CatalogIf<Database> {
}
if (isEntireTable) {
+ Set<Long> oldPartitionsIds =
oldPartitions.stream().map(Partition::getId).collect(Collectors.toSet());
+ for (Partition partition : olapTable.getTempPartitions()) {
+ if (!oldPartitionsIds.contains(partition.getId())) {
+ oldPartitions.add(partition);
+ }
+ }
// drop all temp partitions
olapTable.dropAllTempPartitions();
}
@@ -3184,9 +3230,12 @@ public class InternalCatalog implements
CatalogIf<Database> {
for (Long tabletId : oldTabletIds) {
Env.getCurrentInvertedIndex().deleteTablet(tabletId);
}
+
+ return oldPartitions;
}
public void replayTruncateTable(TruncateTableInfo info) throws
MetaNotFoundException {
+ List<Partition> oldPartitions = Lists.newArrayList();
Database db = (Database) getDbOrMetaException(info.getDbId());
OlapTable olapTable = (OlapTable)
db.getTableOrMetaException(info.getTblId(), TableType.OLAP);
olapTable.writeLock();
@@ -3196,6 +3245,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
// add tablet to inverted index
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
for (Partition partition : info.getPartitions()) {
+ oldPartitions.add(partition);
long partitionId = partition.getId();
TStorageMedium medium =
olapTable.getPartitionInfo().getDataProperty(partitionId)
.getStorageMedium();
@@ -3216,6 +3266,10 @@ public class InternalCatalog implements
CatalogIf<Database> {
} finally {
olapTable.writeUnlock();
}
+
+ if (!Env.isCheckpointThread()) {
+ erasePartitionDropBackendReplicas(oldPartitions);
+ }
}
public void replayAlterExternalTableSchema(String dbName, String
tableName, List<Column> newSchema)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]