This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b8a337468c3 [feature](merge-cloud) Add drop cloud table (#30032)
b8a337468c3 is described below

commit b8a337468c3063854e0df43bfb01e93e214fc3c2
Author: yujun <[email protected]>
AuthorDate: Wed Jan 17 09:02:37 2024 +0800

    [feature](merge-cloud) Add drop cloud table (#30032)
    
    Co-authored-by: Luwei <[email protected]>
    Co-authored-by: deardeng <[email protected]>
    Co-authored-by: Gavin Chou <[email protected]>
    Co-authored-by: Lightman <[email protected]>
    Co-authored-by: zhengyu <[email protected]>
    Co-authored-by: Lei Zhang <[email protected]>
    Co-authored-by: AlexYue <[email protected]>
    Co-authored-by: Xiaocc <[email protected]>
    Co-authored-by: panDing19 <[email protected]>
    Co-authored-by: plat1ko <[email protected]>
    Co-authored-by: zhangdong <[email protected]>
    Co-authored-by: walter <[email protected]>
---
 .../main/java/org/apache/doris/common/Config.java  |   3 +
 .../main/java/org/apache/doris/catalog/Env.java    |  29 +---
 .../org/apache/doris/catalog/TempPartitions.java   |   8 +-
 .../org/apache/doris/cloud/catalog/CloudEnv.java   |   5 +
 .../cloud/datasource/CloudInternalCatalog.java     | 163 ++++++++++++++++++++-
 .../apache/doris/datasource/InternalCatalog.java   |  58 +++++++-
 6 files changed, 228 insertions(+), 38 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 2a8a2ed53ad..baf46177bc1 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2463,6 +2463,9 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true)
     public static int sts_duration = 3600;
 
+    @ConfField(mutable = true)
+    public static int drop_rpc_retry_num = 200;
+
     @ConfField
     public static int cloud_meta_service_rpc_failed_retry_times = 200;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index b72544365aa..222eb195b34 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -251,7 +251,6 @@ import org.apache.doris.system.SystemInfoService.HostInfo;
 import org.apache.doris.task.AgentBatchTask;
 import org.apache.doris.task.AgentTaskExecutor;
 import org.apache.doris.task.CompactionTask;
-import org.apache.doris.task.DropReplicaTask;
 import org.apache.doris.task.MasterTaskExecutor;
 import org.apache.doris.task.PriorityMasterTaskExecutor;
 import org.apache.doris.thrift.BackendService;
@@ -5684,33 +5683,11 @@ public class Env {
             }
         }
 
-        if (!isReplay && !Env.isCheckpointThread()) {
-            // drop all replicas
-            AgentBatchTask batchTask = new AgentBatchTask();
-            for (Partition partition : olapTable.getAllPartitions()) {
-                List<MaterializedIndex> allIndices = 
partition.getMaterializedIndices(IndexExtState.ALL);
-                for (MaterializedIndex materializedIndex : allIndices) {
-                    long indexId = materializedIndex.getId();
-                    int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
-                    for (Tablet tablet : materializedIndex.getTablets()) {
-                        long tabletId = tablet.getId();
-                        List<Replica> replicas = tablet.getReplicas();
-                        for (Replica replica : replicas) {
-                            long backendId = replica.getBackendId();
-                            long replicaId = replica.getId();
-                            DropReplicaTask dropTask = new 
DropReplicaTask(backendId, tabletId,
-                                    replicaId, schemaHash, true);
-                            batchTask.addTask(dropTask);
-                        } // end for replicas
-                    } // end for tablets
-                } // end for indices
-            } // end for partitions
-            AgentTaskExecutor.submit(batchTask);
-        }
-
         // TODO: does checkpoint need update colocate index ?
         // colocation
         Env.getCurrentColocateIndex().removeTable(olapTable.getId());
+
+        getInternalCatalog().eraseTableDropBackendReplicas(olapTable, 
isReplay);
     }
 
     public void onErasePartition(Partition partition) {
@@ -5721,6 +5698,8 @@ public class Env {
                 invertedIndex.deleteTablet(tablet.getId());
             }
         }
+
+        
getInternalCatalog().erasePartitionDropBackendReplicas(Lists.newArrayList(partition));
     }
 
     public void cleanTrash(AdminCleanTrashStmt stmt) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TempPartitions.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TempPartitions.java
index 9cd2d61bf91..aebcae2e51d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TempPartitions.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TempPartitions.java
@@ -17,7 +17,6 @@
 
 package org.apache.doris.catalog;
 
-import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.persist.gson.GsonPostProcessable;
@@ -78,12 +77,7 @@ public class TempPartitions implements Writable, 
GsonPostProcessable {
             idToPartition.remove(partition.getId());
             nameToPartition.remove(partitionName);
             if (needDropTablet) {
-                TabletInvertedIndex invertedIndex = 
Env.getCurrentInvertedIndex();
-                for (MaterializedIndex index : 
partition.getMaterializedIndices(IndexExtState.ALL)) {
-                    for (Tablet tablet : index.getTablets()) {
-                        invertedIndex.deleteTablet(tablet.getId());
-                    }
-                }
+                Env.getCurrentEnv().onErasePartition(partition);
             }
         }
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
index 8d49783f3ca..85162fec825 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
@@ -19,8 +19,13 @@ package org.apache.doris.cloud.catalog;
 
 import org.apache.doris.catalog.Env;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 public class CloudEnv extends Env {
 
+    private static final Logger LOG = LogManager.getLogger(CloudEnv.class);
+
     public CloudEnv(boolean isCheckpointCatalog) {
         super(isCheckpointCatalog);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
index 13f04fd36df..f2e3f10c280 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
@@ -27,9 +27,11 @@ import org.apache.doris.catalog.EnvFactory;
 import org.apache.doris.catalog.Index;
 import org.apache.doris.catalog.KeysType;
 import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
 import org.apache.doris.catalog.MaterializedIndex.IndexState;
 import org.apache.doris.catalog.MaterializedIndexMeta;
 import org.apache.doris.catalog.MetaIdGenerator.IdGeneratorBuffer;
+import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.Replica.ReplicaState;
@@ -54,6 +56,7 @@ import org.apache.doris.thrift.TStorageMedium;
 import org.apache.doris.thrift.TTabletType;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import doris.segment_v2.SegmentV2;
 import org.apache.logging.log4j.LogManager;
@@ -498,12 +501,164 @@ public class CloudInternalCatalog extends 
InternalCatalog {
 
     // END CREATE TABLE
 
+    // BEGIN DROP TABLE
+
+    @Override
+    public void eraseTableDropBackendReplicas(OlapTable olapTable, boolean 
isReplay) {
+        if (!Env.getCurrentEnv().isMaster()) {
+            return;
+        }
+
+        List<Long> indexs = Lists.newArrayList();
+        for (Partition partition : olapTable.getAllPartitions()) {
+            List<MaterializedIndex> allIndices = 
partition.getMaterializedIndices(IndexExtState.ALL);
+            for (MaterializedIndex materializedIndex : allIndices) {
+                long indexId = materializedIndex.getId();
+                indexs.add(indexId);
+            }
+        }
+
+        int tryCnt = 0;
+        while (true) {
+            if (tryCnt++ > Config.drop_rpc_retry_num) {
+                LOG.warn("failed to drop index {} of table {}, try cnt {} 
reaches maximum retry count",
+                            indexs, olapTable.getId(), tryCnt);
+                break;
+            }
+
+            try {
+                if (indexs.isEmpty()) {
+                    break;
+                }
+                dropMaterializedIndex(olapTable.getId(), indexs);
+            } catch (Exception e) {
+                LOG.warn("failed to drop index {} of table {}, try cnt {}, 
execption {}",
+                        indexs, olapTable.getId(), tryCnt, e);
+                try {
+                    Thread.sleep(3000);
+                } catch (InterruptedException ie) {
+                    LOG.warn("Thread sleep is interrupted");
+                }
+                continue;
+            }
+            break;
+        }
+    }
+
+    @Override
+    public void erasePartitionDropBackendReplicas(List<Partition> partitions) {
+        if (!Env.getCurrentEnv().isMaster() || partitions.isEmpty()) {
+            return;
+        }
+
+        long tableId = -1;
+        List<Long> partitionIds = Lists.newArrayList();
+        List<Long> indexIds = Lists.newArrayList();
+        for (Partition partition : partitions) {
+            for (MaterializedIndex index : 
partition.getMaterializedIndices(IndexExtState.ALL)) {
+                indexIds.add(index.getId());
+                if (tableId == -1) {
+                    tableId = ((CloudReplica) 
index.getTablets().get(0).getReplicas().get(0)).getTableId();
+                }
+            }
+            partitionIds.add(partition.getId());
+        }
+
+        CloudPartition partition0 = (CloudPartition) partitions.get(0);
+
+        int tryCnt = 0;
+        while (true) {
+            if (tryCnt++ > Config.drop_rpc_retry_num) {
+                LOG.warn("failed to drop partition {} of table {}, try cnt {} 
reaches maximum retry count",
+                        partitionIds, tableId, tryCnt);
+                break;
+            }
+            try {
+                dropCloudPartition(partition0.getDbId(), tableId, 
partitionIds, indexIds);
+            } catch (Exception e) {
+                LOG.warn("failed to drop partition {} of table {}, try cnt {}, 
execption {}",
+                        partitionIds, tableId, tryCnt, e);
+                try {
+                    Thread.sleep(3000);
+                } catch (InterruptedException ie) {
+                    LOG.warn("Thread sleep is interrupted");
+                }
+                continue;
+            }
+            break;
+        }
+    }
+
+    private void dropCloudPartition(long dbId, long tableId, List<Long> 
partitionIds, List<Long> indexIds)
+            throws DdlException {
+        Cloud.PartitionRequest.Builder partitionRequestBuilder =
+                Cloud.PartitionRequest.newBuilder();
+        partitionRequestBuilder.setCloudUniqueId(Config.cloud_unique_id);
+        partitionRequestBuilder.setTableId(tableId);
+        partitionRequestBuilder.addAllPartitionIds(partitionIds);
+        partitionRequestBuilder.addAllIndexIds(indexIds);
+        if (dbId > 0) {
+            partitionRequestBuilder.setDbId(dbId);
+        }
+        final Cloud.PartitionRequest partitionRequest = 
partitionRequestBuilder.build();
+
+        Cloud.PartitionResponse response = null;
+        int tryTimes = 0;
+        while (tryTimes++ < Config.meta_service_rpc_retry_times) {
+            try {
+                response = 
MetaServiceProxy.getInstance().dropPartition(partitionRequest);
+                if (response.getStatus().getCode() != 
Cloud.MetaServiceCode.KV_TXN_CONFLICT) {
+                    break;
+                }
+            } catch (RpcException e) {
+                LOG.warn("tryTimes:{}, dropPartition RpcException", tryTimes, 
e);
+                if (tryTimes + 1 >= Config.meta_service_rpc_retry_times) {
+                    throw new DdlException(e.getMessage());
+                }
+            }
+            sleepSeveralMs();
+        }
+
+        if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
+            LOG.warn("dropPartition response: {} ", response);
+            throw new DdlException(response.getStatus().getMsg());
+        }
+    }
+
+    private void dropMaterializedIndex(Long tableId, List<Long> indexIds) 
throws DdlException {
+        Cloud.IndexRequest.Builder indexRequestBuilder = 
Cloud.IndexRequest.newBuilder();
+        indexRequestBuilder.setCloudUniqueId(Config.cloud_unique_id);
+        indexRequestBuilder.addAllIndexIds(indexIds);
+        indexRequestBuilder.setTableId(tableId);
+        final Cloud.IndexRequest indexRequest = indexRequestBuilder.build();
+
+        Cloud.IndexResponse response = null;
+        int tryTimes = 0;
+        while (tryTimes++ < Config.meta_service_rpc_retry_times) {
+            try {
+                response = 
MetaServiceProxy.getInstance().dropIndex(indexRequest);
+                if (response.getStatus().getCode() != 
Cloud.MetaServiceCode.KV_TXN_CONFLICT) {
+                    break;
+                }
+            } catch (RpcException e) {
+                LOG.warn("tryTimes:{}, dropIndex RpcException", tryTimes, e);
+                if (tryTimes + 1 >= Config.meta_service_rpc_retry_times) {
+                    throw new DdlException(e.getMessage());
+                }
+            }
+            sleepSeveralMs();
+        }
+
+        if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
+            LOG.warn("dropIndex response: {} ", response);
+            throw new DdlException(response.getStatus().getMsg());
+        }
+    }
+
+    // END DROP TABLE
+
     @Override
     protected void checkAvailableCapacity(Database db) throws DdlException {
-        // check cluster capacity
-        Env.getCurrentSystemInfo().checkAvailableCapacity();
-        // check db quota
-        db.checkQuota();
     }
 
     private void sleepSeveralMs() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 4671daba0cb..cf270a2c091 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -156,6 +156,7 @@ import org.apache.doris.task.AgentBatchTask;
 import org.apache.doris.task.AgentTaskExecutor;
 import org.apache.doris.task.AgentTaskQueue;
 import org.apache.doris.task.CreateReplicaTask;
+import org.apache.doris.task.DropReplicaTask;
 import org.apache.doris.thrift.TCompressionType;
 import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TStorageFormat;
@@ -965,6 +966,38 @@ public class InternalCatalog implements 
CatalogIf<Database> {
         }
     }
 
+    public void eraseTableDropBackendReplicas(OlapTable olapTable, boolean 
isReplay) {
+        if (isReplay || Env.isCheckpointThread()) {
+            return;
+        }
+
+        // drop all replicas
+        AgentBatchTask batchTask = new AgentBatchTask();
+        for (Partition partition : olapTable.getAllPartitions()) {
+            List<MaterializedIndex> allIndices = 
partition.getMaterializedIndices(IndexExtState.ALL);
+            for (MaterializedIndex materializedIndex : allIndices) {
+                long indexId = materializedIndex.getId();
+                int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
+                for (Tablet tablet : materializedIndex.getTablets()) {
+                    long tabletId = tablet.getId();
+                    List<Replica> replicas = tablet.getReplicas();
+                    for (Replica replica : replicas) {
+                        long backendId = replica.getBackendId();
+                        long replicaId = replica.getId();
+                        DropReplicaTask dropTask = new 
DropReplicaTask(backendId, tabletId,
+                                replicaId, schemaHash, true);
+                        batchTask.addTask(dropTask);
+                    } // end for replicas
+                } // end for tablets
+            } // end for indices
+        } // end for partitions
+        AgentTaskExecutor.submit(batchTask);
+    }
+
+    public void erasePartitionDropBackendReplicas(List<Partition> partitions) {
+        // no need send be delete task, when be report its tablets, fe will 
send delete task then.
+    }
+
     private void unprotectAddReplica(OlapTable olapTable, ReplicaPersistInfo 
info) {
         LOG.debug("replay add a replica {}", info);
         Partition partition = olapTable.getPartition(info.getPartitionId());
@@ -3085,6 +3118,7 @@ public class InternalCatalog implements 
CatalogIf<Database> {
         // Things may be changed outside the table lock.
         olapTable = (OlapTable) db.getTableOrDdlException(copiedTbl.getId());
         olapTable.writeLockOrDdlException();
+        List<Partition> oldPartitions = Lists.newArrayList();
         try {
             olapTable.checkNormalStateForAlter();
             // check partitions
@@ -3141,7 +3175,7 @@ public class InternalCatalog implements 
CatalogIf<Database> {
             }
 
             // replace
-            truncateTableInternal(olapTable, newPartitions, 
truncateEntireTable);
+            oldPartitions = truncateTableInternal(olapTable, newPartitions, 
truncateEntireTable);
 
             // write edit log
             TruncateTableInfo info =
@@ -3152,6 +3186,9 @@ public class InternalCatalog implements 
CatalogIf<Database> {
         } finally {
             olapTable.writeUnlock();
         }
+
+        erasePartitionDropBackendReplicas(oldPartitions);
+
         if (truncateEntireTable) {
             // Drop the whole table stats after truncate the entire table
             Env.getCurrentEnv().getAnalysisManager().dropStats(olapTable);
@@ -3162,11 +3199,14 @@ public class InternalCatalog implements 
CatalogIf<Database> {
         LOG.info("finished to truncate table {}, partitions: {}", 
tblRef.getName().toSql(), tblRef.getPartitionNames());
     }
 
-    private void truncateTableInternal(OlapTable olapTable, List<Partition> 
newPartitions, boolean isEntireTable) {
+    private List<Partition> truncateTableInternal(OlapTable olapTable, 
List<Partition> newPartitions,
+            boolean isEntireTable) {
         // use new partitions to replace the old ones.
+        List<Partition> oldPartitions = Lists.newArrayList();
         Set<Long> oldTabletIds = Sets.newHashSet();
         for (Partition newPartition : newPartitions) {
             Partition oldPartition = olapTable.replacePartition(newPartition);
+            oldPartitions.add(oldPartition);
             // save old tablets to be removed
             for (MaterializedIndex index : 
oldPartition.getMaterializedIndices(IndexExtState.ALL)) {
                 index.getTablets().forEach(t -> {
@@ -3176,6 +3216,12 @@ public class InternalCatalog implements 
CatalogIf<Database> {
         }
 
         if (isEntireTable) {
+            Set<Long> oldPartitionsIds = 
oldPartitions.stream().map(Partition::getId).collect(Collectors.toSet());
+            for (Partition partition : olapTable.getTempPartitions()) {
+                if (!oldPartitionsIds.contains(partition.getId())) {
+                    oldPartitions.add(partition);
+                }
+            }
             // drop all temp partitions
             olapTable.dropAllTempPartitions();
         }
@@ -3184,9 +3230,12 @@ public class InternalCatalog implements 
CatalogIf<Database> {
         for (Long tabletId : oldTabletIds) {
             Env.getCurrentInvertedIndex().deleteTablet(tabletId);
         }
+
+        return oldPartitions;
     }
 
     public void replayTruncateTable(TruncateTableInfo info) throws 
MetaNotFoundException {
+        List<Partition> oldPartitions = Lists.newArrayList();
         Database db = (Database) getDbOrMetaException(info.getDbId());
         OlapTable olapTable = (OlapTable) 
db.getTableOrMetaException(info.getTblId(), TableType.OLAP);
         olapTable.writeLock();
@@ -3196,6 +3245,7 @@ public class InternalCatalog implements 
CatalogIf<Database> {
             // add tablet to inverted index
             TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
             for (Partition partition : info.getPartitions()) {
+                oldPartitions.add(partition);
                 long partitionId = partition.getId();
                 TStorageMedium medium = 
olapTable.getPartitionInfo().getDataProperty(partitionId)
                         .getStorageMedium();
@@ -3216,6 +3266,10 @@ public class InternalCatalog implements 
CatalogIf<Database> {
         } finally {
             olapTable.writeUnlock();
         }
+
+        if (!Env.isCheckpointThread()) {
+            erasePartitionDropBackendReplicas(oldPartitions);
+        }
     }
 
     public void replayAlterExternalTableSchema(String dbName, String 
tableName, List<Column> newSchema)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to