This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/master by this push:
     new 9e3a75b7484 [feature](cloud) multi cloud cluster (#31749)
9e3a75b7484 is described below

commit 9e3a75b74847d867fce9991f04de4b190d747008
Author: yujun <yu.jun.re...@gmail.com>
AuthorDate: Sun Mar 17 23:54:03 2024 +0800

    [feature](cloud) multi cloud cluster (#31749)
---
 be/src/io/fs/multi_table_pipe.cpp                  |   2 +
 .../routine_load/routine_load_task_executor.cpp    |   6 +
 be/src/runtime/stream_load/stream_load_context.h   |   4 +
 .../java/org/apache/doris/analysis/LoadStmt.java   |   2 +
 .../apache/doris/analysis/NativeInsertStmt.java    |   5 +-
 .../apache/doris/analysis/SetUserPropertyVar.java  |  11 ++
 .../main/java/org/apache/doris/catalog/Env.java    |   4 +-
 .../java/org/apache/doris/catalog/EnvFactory.java  |  23 +++
 .../java/org/apache/doris/catalog/OlapTable.java   |  19 +++
 .../main/java/org/apache/doris/catalog/Table.java  |   1 +
 .../org/apache/doris/cloud/catalog/CloudEnv.java   |   2 +-
 .../doris/cloud/catalog/CloudEnvFactory.java       |  30 ++++
 .../doris/cloud/load/CloudBrokerLoadJob.java       | 181 +++++++++++++++++++++
 .../apache/doris/cloud/load/CloudLoadManager.java  |  49 ++++++
 .../doris/cloud/load/CloudRoutineLoadManager.java  |  66 ++++++++
 .../cloud/planner/CloudGroupCommitPlanner.java     |  81 +++++++++
 .../apache/doris/cloud/qe/CloudCoordinator.java    |   4 +-
 .../doris/cloud/system/CloudSystemInfoService.java |  43 ++++-
 .../transaction/CloudGlobalTransactionMgr.java     |   2 +-
 .../java/org/apache/doris/common/ErrorCode.java    |   5 +-
 .../apache/doris/datasource/ExternalScanNode.java  |   3 +-
 .../doris/datasource/FederationBackendPolicy.java  |   6 +-
 .../org/apache/doris/httpv2/rest/LoadAction.java   | 139 ++++++++++++++--
 .../doris/httpv2/rest/manager/ClusterAction.java   |  37 +++++
 .../apache/doris/load/loadv2/BrokerLoadJob.java    |  24 ++-
 .../org/apache/doris/load/loadv2/JobState.java     |   3 +-
 .../java/org/apache/doris/load/loadv2/LoadJob.java |   5 +-
 .../apache/doris/load/loadv2/LoadLoadingTask.java  |   1 +
 .../org/apache/doris/load/loadv2/LoadTask.java     |  23 +++
 .../apache/doris/load/loadv2/MysqlLoadManager.java |  11 ++
 .../doris/load/routineload/KafkaTaskInfo.java      |   2 +
 .../doris/load/routineload/RoutineLoadJob.java     |  64 ++++++++
 .../doris/load/routineload/RoutineLoadManager.java |   5 +-
 .../org/apache/doris/mysql/privilege/Auth.java     |   4 +-
 .../plans/commands/insert/GroupCommitInserter.java |   4 +-
 .../apache/doris/planner/GroupCommitPlanner.java   |  59 ++++---
 .../org/apache/doris/planner/OlapScanNode.java     |  13 +-
 .../org/apache/doris/planner/OlapTableSink.java    |  12 +-
 .../org/apache/doris/plugin/audit/AuditEvent.java  |   7 +
 .../java/org/apache/doris/qe/AuditLogHelper.java   |   4 +
 .../java/org/apache/doris/qe/ConnectContext.java   |  24 ++-
 .../main/java/org/apache/doris/qe/Coordinator.java |   5 +-
 .../java/org/apache/doris/qe/MasterOpExecutor.java |   6 +
 .../java/org/apache/doris/qe/SessionVariable.java  |  14 ++
 .../java/org/apache/doris/qe/ShowExecutor.java     |   7 +-
 .../java/org/apache/doris/qe/StmtExecutor.java     | 179 +++++++++++++++++++-
 .../apache/doris/service/FrontendServiceImpl.java  |  82 ++++++++++
 .../main/java/org/apache/doris/system/Backend.java |  22 +++
 .../org/apache/doris/system/BackendHbResponse.java |  15 +-
 .../org/apache/doris/system/BeSelectionPolicy.java |   4 +-
 .../java/org/apache/doris/system/HeartbeatMgr.java |   7 +-
 .../org/apache/doris/system/SystemInfoService.java |  27 ++-
 .../load/routineload/RoutineLoadManagerTest.java   |   6 +-
 .../load/routineload/RoutineLoadSchedulerTest.java |   4 +-
 .../apache/doris/system/SystemInfoServiceTest.java |   2 +-
 .../doris/utframe/DemoMultiBackendsTest.java       |   3 +-
 gensrc/thrift/BackendService.thrift                |   2 +
 .../cloud_p0/conf/regression-conf-custom.groovy    |  15 +-
 .../pipeline/p0/conf/regression-conf.groovy        |   6 +-
 regression-test/plugins/plugin_cluster.groovy      | 180 ++++++++++++++++++++
 60 files changed, 1476 insertions(+), 100 deletions(-)

diff --git a/be/src/io/fs/multi_table_pipe.cpp b/be/src/io/fs/multi_table_pipe.cpp
index 916f8151739..6b41bf6988b 100644
--- a/be/src/io/fs/multi_table_pipe.cpp
+++ b/be/src/io/fs/multi_table_pipe.cpp
@@ -165,6 +165,8 @@ Status MultiTablePipe::request_and_exec_plans() {
     request.fileType = TFileType::FILE_STREAM;
     request.__set_thrift_rpc_timeout_ms(config::thrift_rpc_timeout_ms);
     request.__set_memtable_on_sink_node(_ctx->memtable_on_sink_node);
+    request.__set_user(_ctx->qualified_user);
+    request.__set_cloud_cluster(_ctx->cloud_cluster);
     // no need to register new_load_stream_mgr because it is already done in routineload submit task
 
     // plan this load
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index d22f2bb4a8c..3e5eb48afbc 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -216,6 +216,12 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
     if (task.__isset.memtable_on_sink_node) {
         ctx->memtable_on_sink_node = task.memtable_on_sink_node;
     }
+    if (task.__isset.qualified_user) {
+        ctx->qualified_user = task.qualified_user;
+    }
+    if (task.__isset.cloud_cluster) {
+        ctx->cloud_cluster = task.cloud_cluster;
+    }
 
     // set execute plan params (only for non-single-stream-multi-table load)
     TStreamLoadPutResult put_result;
diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h
index 376228e022d..1461e863d5d 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -236,6 +236,10 @@ public:
 
     bool memtable_on_sink_node = false;
 
+    // used for cloud cluster mode
+    std::string qualified_user;
+    std::string cloud_cluster;
+
 public:
     ExecEnv* exec_env() { return _exec_env; }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
index 01a4490003b..19b72129516 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
@@ -125,6 +125,8 @@ public class LoadStmt extends DdlStmt {
 
     public static final String KEY_COMMENT = "comment";
 
+    public static final String KEY_CLOUD_CLUSTER = "cloud_cluster";
+
     public static final String KEY_ENCLOSE = "enclose";
 
     public static final String KEY_ESCAPE = "escape";
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index 193ce3abc80..3f367464441 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -24,6 +24,7 @@ import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.EnvFactory;
 import org.apache.doris.catalog.JdbcTable;
 import org.apache.doris.catalog.KeysType;
 import org.apache.doris.catalog.MysqlTable;
@@ -1247,8 +1248,8 @@ public class NativeInsertStmt extends InsertStmt {
                 this.analyzer = analyzerTmp;
             }
             analyzeSubquery(analyzer, true);
-            groupCommitPlanner = new GroupCommitPlanner((Database) db, olapTable, targetColumnNames, queryId,
-                    ConnectContext.get().getSessionVariable().getGroupCommit());
+            groupCommitPlanner = EnvFactory.getInstance().createGroupCommitPlanner((Database) db, olapTable,
+                    targetColumnNames, queryId, ConnectContext.get().getSessionVariable().getGroupCommit());
             // save plan message to be reused for prepare stmt
             loadId = queryId;
             baseSchemaVersion = olapTable.getBaseSchemaVersion();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetUserPropertyVar.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetUserPropertyVar.java
index 37f3318a89e..a2f31818b8b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetUserPropertyVar.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetUserPropertyVar.java
@@ -18,7 +18,9 @@
 package org.apache.doris.analysis;
 
 import org.apache.doris.catalog.Env;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.mysql.privilege.PrivPredicate;
@@ -82,6 +84,15 @@ public class SetUserPropertyVar extends SetVar {
                     ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
                             "GRANT");
                 }
+                if (Config.isCloudMode()) {
+                    // check that the value is a valid cluster name
+                    if (key.equals(UserProperty.DEFAULT_CLOUD_CLUSTER)
+                            && !Strings.isNullOrEmpty(value)
+                            && !((CloudSystemInfoService) Env.getCurrentSystemInfo())
+                                    .getCloudClusterNames().contains(value)) {
+                        ErrorReport.reportAnalysisException(ErrorCode.ERR_CLOUD_CLUSTER_ERROR, value);
+                    }
+                }
                 return;
             }
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index a2386b61392..1a93efa0c5a 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -637,7 +637,7 @@ public class Env {
     public Env(boolean isCheckpointCatalog) {
         this.catalogMgr = new CatalogMgr();
         this.load = new Load();
-        this.routineLoadManager = new RoutineLoadManager();
+        this.routineLoadManager = EnvFactory.getInstance().createRoutineLoadManager();
         this.groupCommitManager = new GroupCommitManager();
         this.sqlBlockRuleMgr = new SqlBlockRuleMgr();
         this.exportMgr = new ExportMgr();
@@ -720,7 +720,7 @@ public class Env {
                 Config.async_loading_load_task_pool_size, LoadTask.COMPARATOR, LoadTask.class, !isCheckpointCatalog);
 
         this.loadJobScheduler = new LoadJobScheduler();
-        this.loadManager = new LoadManager(loadJobScheduler);
+        this.loadManager = EnvFactory.getInstance().createLoadManager(loadJobScheduler);
         this.progressManager = new ProgressManager();
         this.streamLoadRecordMgr = new StreamLoadRecordMgr("stream_load_record_manager",
                 Config.fetch_stream_load_record_interval_second * 1000L);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java
index a286cf33cea..34e9d5ef93b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java
@@ -24,10 +24,15 @@ import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.cloud.catalog.CloudEnvFactory;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.PropertyAnalyzer;
 import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.load.loadv2.BrokerLoadJob;
+import org.apache.doris.load.loadv2.LoadJobScheduler;
+import org.apache.doris.load.loadv2.LoadManager;
+import org.apache.doris.load.routineload.RoutineLoadManager;
 import org.apache.doris.nereids.stats.StatsErrorEstimator;
+import org.apache.doris.planner.GroupCommitPlanner;
 import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.planner.Planner;
 import org.apache.doris.planner.ScanNode;
@@ -39,10 +44,15 @@ import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.GlobalTransactionMgr;
 import org.apache.doris.transaction.GlobalTransactionMgrIface;
 
+import org.apache.thrift.TException;
+
 import java.lang.reflect.Type;
 import java.util.List;
 import java.util.Map;
 
+// EnvFactory is responsible for creating non-cloud objects.
+// CloudEnvFactory is responsible for creating cloud objects.
+
 public class EnvFactory {
 
     public EnvFactory() {}
@@ -131,4 +141,17 @@ public class EnvFactory {
                                          String timezone, boolean loadZeroTolerance) {
         return new Coordinator(jobId, queryId, descTable, fragments, scanNodes, timezone, loadZeroTolerance);
     }
+
+    public GroupCommitPlanner createGroupCommitPlanner(Database db, OlapTable table, List<String> targetColumnNames,
+            TUniqueId queryId, String groupCommit) throws UserException, TException {
+        return new GroupCommitPlanner(db, table, targetColumnNames, queryId, groupCommit);
+    }
+
+    public RoutineLoadManager createRoutineLoadManager() {
+        return new RoutineLoadManager();
+    }
+
+    public LoadManager createLoadManager(LoadJobScheduler loadJobScheduler) {
+        return new LoadManager(loadJobScheduler);
+    }
 }
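
EnvFactory above provides the non-cloud defaults; CloudEnvFactory (later in this patch) overrides them. The dispatch inside getInstance() is not part of this diff; a minimal, hypothetical sketch of the selection, assuming a cached singleton and the Config.isCloudMode() check this commit already relies on elsewhere:

    // Hedged sketch only -- the real getInstance() body is not shown in this diff.
    private static volatile EnvFactory INSTANCE;

    public static EnvFactory getInstance() {
        if (INSTANCE == null) {
            synchronized (EnvFactory.class) {
                if (INSTANCE == null) {
                    INSTANCE = Config.isCloudMode() ? new CloudEnvFactory() : new EnvFactory();
                }
            }
        }
        return INSTANCE;
    }
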
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 0130b78a9b9..463b3b1f19d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -476,6 +476,25 @@ public class OlapTable extends Table implements MTMVRelatedTableIf {
         return null;
     }
 
+    public List<Long> getAllTabletIds() {
+        List<Long> tabletIds = new ArrayList<>();
+        try {
+            rwLock.readLock().lock();
+            for (Partition partition : getPartitions()) {
+                for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
+                    tabletIds.addAll(index.getTablets().stream()
+                                                        .map(tablet -> tablet.getId())
+                                                        .collect(Collectors.toList()));
+                }
+            }
+        } catch (Exception e) {
+            LOG.warn("get all tablet ids failed {}", e.getMessage());
+        } finally {
+            rwLock.readLock().unlock();
+        }
+        return tabletIds;
+    }
+
     public Map<Long, MaterializedIndexMeta> getVisibleIndexIdToMeta() {
         Map<Long, MaterializedIndexMeta> visibleMVs = Maps.newHashMap();
         List<MaterializedIndex> mvs = getVisibleIndex();
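
The new getAllTabletIds() accessor feeds the multi-cluster sync-load path added later in this patch; condensed from the CloudBrokerLoadJob hunk below:

    // from CloudBrokerLoadJob.afterLoadingTaskCommitTransaction (below)
    tableList.forEach(table -> {
        List<Long> allTabletIds = ((OlapTable) table).getAllTabletIds();
        StmtExecutor.syncLoadForTablets(backendsList, allTabletIds);
    });
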
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
index 0c73a19c7d5..35f5b14efc5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
@@ -87,6 +87,7 @@ public abstract class Table extends MetaObject implements Writable, TableIf {
     // especially to reduce conflicts when obtaining delete bitmap update locks for
     // MoW table
     protected ReentrantLock commitLock;
+
     /*
      *  fullSchema and nameToColumn should contain all columns, both visible and shadow.
      *  eg. for OlapTable, when doing schema change, there will be some shadow columns which are not visible
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
index 00aef304cf3..19bd102c84c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
@@ -383,7 +383,7 @@ public class CloudEnv extends Env {
     public void changeCloudCluster(String clusterName, ConnectContext ctx) throws DdlException {
         checkCloudClusterPriv(clusterName);
         // TODO(merge-cloud): pick cloud auto start
-        // waitForAutoStart(clusterName);
+        CloudSystemInfoService.waitForAutoStart(clusterName);
         try {
             ((CloudSystemInfoService) Env.getCurrentSystemInfo()).addCloudCluster(clusterName, "");
         } catch (UserException e) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java
index 8990f2cd966..c7fb81fa6c6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java
@@ -21,9 +21,11 @@ import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.BrokerDesc;
 import org.apache.doris.analysis.DescriptorTable;
 import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.DynamicPartitionProperty;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.EnvFactory;
+import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.ReplicaAllocation;
@@ -31,14 +33,22 @@ import org.apache.doris.catalog.Tablet;
 import org.apache.doris.cloud.common.util.CloudPropertyAnalyzer;
 import org.apache.doris.cloud.datasource.CloudInternalCatalog;
 import org.apache.doris.cloud.load.CloudBrokerLoadJob;
+import org.apache.doris.cloud.load.CloudLoadManager;
+import org.apache.doris.cloud.load.CloudRoutineLoadManager;
+import org.apache.doris.cloud.planner.CloudGroupCommitPlanner;
 import org.apache.doris.cloud.qe.CloudCoordinator;
 import org.apache.doris.cloud.system.CloudSystemInfoService;
 import org.apache.doris.cloud.transaction.CloudGlobalTransactionMgr;
 import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.PropertyAnalyzer;
 import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.load.loadv2.BrokerLoadJob;
+import org.apache.doris.load.loadv2.LoadJobScheduler;
+import org.apache.doris.load.loadv2.LoadManager;
+import org.apache.doris.load.routineload.RoutineLoadManager;
 import org.apache.doris.nereids.stats.StatsErrorEstimator;
+import org.apache.doris.planner.GroupCommitPlanner;
 import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.planner.Planner;
 import org.apache.doris.planner.ScanNode;
@@ -49,6 +59,8 @@ import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.GlobalTransactionMgrIface;
 
+import org.apache.thrift.TException;
+
 import java.lang.reflect.Type;
 import java.util.List;
 import java.util.Map;
@@ -139,14 +151,32 @@ public class CloudEnvFactory extends EnvFactory {
         return new CloudBrokerLoadJob();
     }
 
+    @Override
     public Coordinator createCoordinator(ConnectContext context, Analyzer analyzer, Planner planner,
                                          StatsErrorEstimator statsErrorEstimator) {
         return new CloudCoordinator(context, analyzer, planner, statsErrorEstimator);
     }
 
+    @Override
     public Coordinator createCoordinator(Long jobId, TUniqueId queryId, DescriptorTable descTable,
                                          List<PlanFragment> fragments, List<ScanNode> scanNodes,
                                          String timezone, boolean loadZeroTolerance) {
         return new CloudCoordinator(jobId, queryId, descTable, fragments, scanNodes, timezone, loadZeroTolerance);
     }
+
+    @Override
+    public GroupCommitPlanner createGroupCommitPlanner(Database db, OlapTable table, List<String> targetColumnNames,
+            TUniqueId queryId, String groupCommit) throws UserException, TException {
+        return new CloudGroupCommitPlanner(db, table, targetColumnNames, queryId, groupCommit);
+    }
+
+    @Override
+    public RoutineLoadManager createRoutineLoadManager() {
+        return new CloudRoutineLoadManager();
+    }
+
+    @Override
+    public LoadManager createLoadManager(LoadJobScheduler loadJobScheduler) {
+        return new CloudLoadManager(loadJobScheduler);
+    }
 }
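
With these overrides, call sites that previously constructed managers directly now go through the factory and transparently pick up the cloud variants; the Env constructor hunks above are the canonical example:

    // from the Env constructor hunks above
    this.routineLoadManager = EnvFactory.getInstance().createRoutineLoadManager();
    // -> RoutineLoadManager, or CloudRoutineLoadManager in cloud mode
    this.loadManager = EnvFactory.getInstance().createLoadManager(loadJobScheduler);
    // -> LoadManager, or CloudLoadManager in cloud mode
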
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java
index 52b52fd70ae..814e6020362 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java
@@ -22,25 +22,39 @@ import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
 import org.apache.doris.cloud.system.CloudSystemInfoService;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.LogBuilder;
+import org.apache.doris.common.util.LogKey;
 import org.apache.doris.load.BrokerFileGroup;
 import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey;
+import org.apache.doris.load.FailMsg;
+import org.apache.doris.load.FailMsg.CancelType;
 import org.apache.doris.load.loadv2.BrokerLoadJob;
 import org.apache.doris.load.loadv2.BrokerPendingTaskAttachment;
+import org.apache.doris.load.loadv2.JobState;
 import org.apache.doris.load.loadv2.LoadLoadingTask;
+import org.apache.doris.load.loadv2.LoadTask;
 import org.apache.doris.qe.AutoCloseConnectContext;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.Coordinator;
 import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.qe.QeProcessorImpl;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TUniqueId;
 
 import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 public class CloudBrokerLoadJob extends BrokerLoadJob {
     private static final Logger LOG = LogManager.getLogger(CloudBrokerLoadJob.class);
@@ -48,6 +62,8 @@ public class CloudBrokerLoadJob extends BrokerLoadJob {
     protected static final String CLOUD_CLUSTER_ID = "clusterId";
     protected String cloudClusterId;
 
+    private int retryTimes = 3;
+
     public CloudBrokerLoadJob() {
     }
 
@@ -96,6 +112,13 @@ public class CloudBrokerLoadJob extends BrokerLoadJob {
         }
     }
 
+    // override BulkLoadJob.analyze
+    @Override
+    public void analyze() {
+        cloudClusterId = sessionVariables.get(CLOUD_CLUSTER_ID);
+        super.analyze();
+    }
+
     @Override
     protected LoadLoadingTask createTask(Database db, OlapTable table, List<BrokerFileGroup> brokerFileGroups,
             boolean isEnableMemtableOnSinkNode, FileGroupAggKey aggKey, BrokerPendingTaskAttachment attachment)
@@ -127,4 +150,162 @@ public class CloudBrokerLoadJob extends BrokerLoadJob {
         // in fe. So print an edit log to save the status information of the job here.
         logFinalOperation();
     }
+
+    @Override
+    protected void afterLoadingTaskCommitTransaction(List<Table> tableList) {
+        ConnectContext ctx = null;
+        if (ConnectContext.get() == null) {
+            ctx = new ConnectContext();
+            ctx.setThreadLocalInfo();
+        } else {
+            ctx = ConnectContext.get();
+        }
+
+        if (ctx.getSessionVariable().enableMultiClusterSyncLoad()) {
+            // get the backends of each cluster except the load cluster
+            CloudSystemInfoService infoService = (CloudSystemInfoService) Env.getCurrentSystemInfo();
+            List<List<Backend>> backendsList = infoService.getCloudClusterIds()
+                                                                .stream()
+                                                                .filter(id -> !id.equals(cloudClusterId))
+                                                                .map(id -> infoService.getBackendsByClusterId(id))
+                                                                .collect(Collectors.toList());
+            // for each loaded table, get its tablets
+            tableList.forEach(table -> {
+                List<Long> allTabletIds = ((OlapTable) table).getAllTabletIds();
+                StmtExecutor.syncLoadForTablets(backendsList, allTabletIds);
+            });
+        }
+    }
+
+    @Override
+    public void onTaskFailed(long taskId, FailMsg failMsg) {
+        if (Strings.isNullOrEmpty(this.cloudClusterId)) {
+            super.onTaskFailed(taskId, failMsg);
+            return;
+        }
+
+        try {
+            writeLock();
+            if (isTxnDone()) {
+                LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
+                        .add("label", label)
+                        .add("transactionId", transactionId)
+                        .add("state", state)
+                        .add("error_msg", "this task will be ignored when job 
is: " + state)
+                        .build());
+                return;
+            }
+            LOG.info(new LogBuilder(LogKey.LOAD_JOB, id)
+                    .add("label", label)
+                    .add("transactionId", transactionId)
+                    .add("state", state)
+                    .add("retryTimes", retryTimes)
+                    .add("failMsg", failMsg.getMsg())
+                    .build());
+
+            this.retryTimes--;
+            if (this.retryTimes <= 0) {
+                boolean abortTxn = this.transactionId > 0;
+                unprotectedExecuteCancel(failMsg, abortTxn);
+                logFinalOperation();
+                return;
+            } else {
+                unprotectedExecuteRetry(failMsg);
+            }
+        } finally {
+            writeUnlock();
+        }
+
+        boolean allTaskDone = false;
+        while (!allTaskDone) {
+            try {
+                writeLock();
+                // check if all tasks have been done
+                // unprotectedExecuteRetry() will cancel all running tasks
+                allTaskDone = true;
+                for (Map.Entry<Long, LoadTask> entry : idToTasks.entrySet()) {
+                    if (entry.getKey() != taskId && !entry.getValue().isDone()) {
+                        LOG.info("LoadTask({}) has not been done", entry.getKey());
+                        allTaskDone = false;
+                    }
+                }
+            } finally {
+                writeUnlock();
+            }
+            if (!allTaskDone) {
+                try {
+                    Thread.sleep(1000);
+                    continue;
+                } catch (InterruptedException e) {
+                    LOG.warn("", e);
+                }
+            }
+        }
+
+        try {
+            writeLock();
+            this.state = JobState.PENDING;
+            this.idToTasks.clear();
+            this.failMsg = null;
+            this.finishedTaskIds.clear();
+            Env.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(this);
+            LoadTask task = createPendingTask();
+            // retry with a default 60-second backoff, because `be restart` is slow
+            task.setStartTimeMs(System.currentTimeMillis() + 60 * 1000);
+            idToTasks.put(task.getSignature(), task);
+            Env.getCurrentEnv().getPendingLoadTaskScheduler().submit(task);
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    protected void unprotectedExecuteRetry(FailMsg failMsg) {
+        LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id).add("transaction_id", transactionId)
+                .add("error_msg", "Failed to execute load with error: " + failMsg.getMsg()).build());
+
+        // get load ids of all loading tasks, we will cancel their coordinator process later
+        List<TUniqueId> loadIds = Lists.newArrayList();
+        for (LoadTask loadTask : idToTasks.values()) {
+            if (loadTask instanceof LoadLoadingTask) {
+                loadIds.add(((LoadLoadingTask) loadTask).getLoadId());
+            }
+        }
+
+        // set failMsg and state
+        this.failMsg = failMsg;
+        if (failMsg.getCancelType() == CancelType.TXN_UNKNOWN) {
+            // for bug fix, see LoadManager's fixLoadJobMetaBugs() method
+            finishTimestamp = createTimestamp;
+        } else {
+            finishTimestamp = System.currentTimeMillis();
+        }
+
+        // remove callback before abortTransaction(), so that the afterAborted() callback will not be called again
+        Env.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(id);
+        // abort txn by label, because transactionId here may be -1
+        try {
+            LOG.debug(new LogBuilder(LogKey.LOAD_JOB, id)
+                    .add("label", label)
+                    .add("msg", "begin to abort txn")
+                    .build());
+            Env.getCurrentGlobalTransactionMgr().abortTransaction(dbId, label, failMsg.getMsg());
+        } catch (UserException e) {
+            LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
+                    .add("label", label)
+                    .add("error_msg", "failed to abort txn when job is 
cancelled. " + e.getMessage())
+                    .build());
+        }
+
+        // cancel all running coordinators, so that the scheduler's worker thread will be released
+        for (TUniqueId loadId : loadIds) {
+            Coordinator coordinator = QeProcessorImpl.INSTANCE.getCoordinator(loadId);
+            if (coordinator != null) {
+                coordinator.cancel();
+            }
+        }
+
+        // change state
+        state = JobState.RETRY;
+    }
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadManager.java
new file mode 100644
index 00000000000..1d0bfc23f6c
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadManager.java
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.load;
+
+import org.apache.doris.analysis.InsertStmt;
+import org.apache.doris.analysis.LoadStmt;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.load.loadv2.LoadJobScheduler;
+import org.apache.doris.load.loadv2.LoadManager;
+
+public class CloudLoadManager extends LoadManager {
+
+    public CloudLoadManager(LoadJobScheduler loadJobScheduler) {
+        super(loadJobScheduler);
+    }
+
+    @Override
+    public long createLoadJobFromStmt(LoadStmt stmt) throws DdlException, UserException {
+        CloudSystemInfoService.waitForAutoStartCurrentCluster();
+
+        return super.createLoadJobFromStmt(stmt);
+    }
+
+    @Override
+    public long createLoadJobFromStmt(InsertStmt stmt) throws DdlException {
+        CloudSystemInfoService.waitForAutoStartCurrentCluster();
+
+        return super.createLoadJobFromStmt(stmt);
+    }
+
+}
+
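
Both overloads gate job creation on cluster auto-start before delegating to the base implementation. A hedged sketch of the resulting flow for a load statement (the getLoadManager() accessor is assumed here, not shown in this diff):

    // hypothetical call path sketch
    LoadManager mgr = Env.getCurrentEnv().getLoadManager(); // CloudLoadManager in cloud mode
    long jobId = mgr.createLoadJobFromStmt(loadStmt);       // waits for cluster auto-start, then delegates
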
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudRoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudRoutineLoadManager.java
new file mode 100644
index 00000000000..fd10a3bc467
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudRoutineLoadManager.java
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.load;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
+import org.apache.doris.common.LoadException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.load.routineload.RoutineLoadJob;
+import org.apache.doris.load.routineload.RoutineLoadManager;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.system.Backend;
+
+import com.google.common.base.Strings;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class CloudRoutineLoadManager extends RoutineLoadManager {
+    private static final Logger LOG = LogManager.getLogger(CloudRoutineLoadManager.class);
+
+    @Override
+    public void addRoutineLoadJob(RoutineLoadJob routineLoadJob, String dbName, String tableName)
+                    throws UserException {
+        if (!Strings.isNullOrEmpty(ConnectContext.get().getCloudCluster())) {
+            routineLoadJob.setCloudCluster(ConnectContext.get().getCloudCluster());
+        } else {
+            throw new UserException("cloud cluster is empty, please specify cloud cluster");
+        }
+        super.addRoutineLoadJob(routineLoadJob, dbName, tableName);
+    }
+
+    @Override
+    protected List<Long> getAvailableBackendIds(long jobId) throws 
LoadException {
+        RoutineLoadJob routineLoadJob = getJob(jobId);
+        String cloudClusterId = routineLoadJob.getCloudClusterId();
+        if (Strings.isNullOrEmpty(cloudClusterId)) {
+            LOG.warn("cluster id is empty");
+            throw new LoadException("cluster id is empty");
+        }
+
+        return ((CloudSystemInfoService) Env.getCurrentSystemInfo())
+                .getBackendsByClusterId(cloudClusterId)
+                .stream()
+                .filter(Backend::isAlive)
+                .map(Backend::getId)
+                .collect(Collectors.toList());
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/planner/CloudGroupCommitPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/planner/CloudGroupCommitPlanner.java
new file mode 100644
index 00000000000..8480055e8b9
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/planner/CloudGroupCommitPlanner.java
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.planner;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.UserException;
+import org.apache.doris.planner.GroupCommitPlanner;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TUniqueId;
+
+import com.google.common.base.Strings;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class CloudGroupCommitPlanner extends GroupCommitPlanner {
+    private static final Logger LOG = LogManager.getLogger(CloudGroupCommitPlanner.class);
+
+    public CloudGroupCommitPlanner(Database db, OlapTable table, List<String> targetColumnNames, TUniqueId queryId,
+            String groupCommit)
+            throws UserException, TException {
+        super(db, table, targetColumnNames, queryId, groupCommit);
+    }
+
+    @Override
+    protected void selectBackends(ConnectContext ctx) throws DdlException {
+        backend = ctx.getInsertGroupCommit(this.table.getId());
+        if (backend != null && backend.isAlive() && !backend.isDecommissioned()
+                && 
backend.getCloudClusterName().equals(ctx.getCloudCluster())) {
+            return;
+        }
+
+        String cluster = ctx.getCloudCluster();
+        if (Strings.isNullOrEmpty(cluster)) {
+            ErrorReport.reportDdlException(ErrorCode.ERR_NO_CLUSTER_ERROR);
+        }
+
+        // select be
+        List<Backend> backends = ((CloudSystemInfoService) 
Env.getCurrentSystemInfo()).getCloudIdToBackend(cluster)
+                .values().stream().collect(Collectors.toList());
+        Collections.shuffle(backends);
+        for (Backend backend : backends) {
+            if (backend.isActive() && !backend.isDecommissioned()) {
+                this.backend = backend;
+                ctx.setInsertGroupCommit(this.table.getId(), backend);
+                LOG.debug("choose new be {}", backend.getId());
+                return;
+            }
+        }
+
+        throw new DdlException("No suitable backend for cloud cluster=" + cluster);
+    }
+
+}
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/qe/CloudCoordinator.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/qe/CloudCoordinator.java
index 02f1f906b3b..2ce8950c12f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/qe/CloudCoordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/qe/CloudCoordinator.java
@@ -61,7 +61,7 @@ public class CloudCoordinator extends Coordinator {
     }
 
     @Override
-    protected void prepare() throws Exception {
+    protected void prepare() throws UserException {
         String cluster = null;
         ConnectContext context = ConnectContext.get();
         if (context != null) {
@@ -96,7 +96,7 @@ public class CloudCoordinator extends Coordinator {
             LOG.warn("no available backends, idToBackend {}", idToBackend);
             String clusterName = ConnectContext.get() != null
                     ? ConnectContext.get().getCloudCluster() : "ctx empty cant get clusterName";
-            throw new Exception("no available backends, the cluster maybe not be set or been dropped clusterName = "
+            throw new UserException("no available backends, the cluster maybe not be set or been dropped clusterName = "
                 + clusterName);
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
index ae43da4b244..6e7ca874f6e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
@@ -19,6 +19,7 @@ package org.apache.doris.cloud.system;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.ReplicaAllocation;
+import org.apache.doris.cloud.catalog.CloudEnv;
 import org.apache.doris.cloud.proto.Cloud;
 import org.apache.doris.cloud.proto.Cloud.ClusterPB;
 import org.apache.doris.cloud.proto.Cloud.InstanceInfoPB;
@@ -30,6 +31,7 @@ import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.ha.FrontendNodeType;
 import org.apache.doris.metric.MetricRepo;
+import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.resource.Tag;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.system.Backend;
@@ -310,6 +312,33 @@ public class CloudSystemInfoService extends SystemInfoService {
         return clusterNameToId.containsKey(clusterName);
     }
 
+    @Override
+    public List<Backend> getBackendsByCurrentCluster() throws UserException {
+        ConnectContext ctx = ConnectContext.get();
+        if (ctx == null) {
+            throw new UserException("connect context is null");
+        }
+
+        String cluster = ctx.getCurrentCloudCluster();
+        if (Strings.isNullOrEmpty(cluster)) {
+            throw new UserException("cluster name is empty");
+        }
+
+        ((CloudEnv) Env.getCurrentEnv()).checkCloudClusterPriv(cluster);
+
+        return getBackendsByClusterName(cluster);
+    }
+
+    @Override
+    public ImmutableMap<Long, Backend> getBackendsWithIdByCurrentCluster() 
throws UserException {
+        List<Backend> backends = getBackendsByCurrentCluster();
+        Map<Long, Backend> idToBackend = Maps.newHashMap();
+        for (Backend be : backends) {
+            idToBackend.put(be.getId(), be);
+        }
+        return ImmutableMap.copyOf(idToBackend);
+    }
+
     public List<Backend> getBackendsByClusterName(final String clusterName) {
         String clusterId = clusterNameToId.getOrDefault(clusterName, "");
         if (clusterId.isEmpty()) {
@@ -540,9 +569,19 @@ public class CloudSystemInfoService extends SystemInfoService {
         this.instanceStatus = instanceStatus;
     }
 
+    public static void waitForAutoStartCurrentCluster() throws DdlException {
+        ConnectContext context = ConnectContext.get();
+        if (context != null) {
+            String cloudCluster = context.getCloudCluster();
+            if (!Strings.isNullOrEmpty(cloudCluster)) {
+                waitForAutoStart(cloudCluster);
+            }
+        }
+    }
+
     public static void waitForAutoStart(final String clusterName) throws DdlException {
-        // TODO: merge from cloud.
-        throw new DdlException("Env.waitForAutoStart unimplemented");
+        // TODO(merge-cloud): merge from cloud.
+        // throw new DdlException("Env.waitForAutoStart unimplemented");
     }
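
Note that waitForAutoStart is deliberately a no-op for now (the DdlException is commented out pending the cloud merge), so the call sites this commit adds pass straight through:

    // call sites added elsewhere in this commit (abridged)
    CloudSystemInfoService.waitForAutoStart(clusterName);    // CloudEnv.changeCloudCluster
    CloudSystemInfoService.waitForAutoStartCurrentCluster(); // CloudLoadManager, both overloads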
 
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index a48af1c4979..bf8cc592c20 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -990,7 +990,7 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface {
     @Override
     public void updateMultiTableRunningTransactionTableIds(Long dbId, Long transactionId, List<Long> tableIds)
             throws UserException {
-        throw new UserException(NOT_SUPPORTED_MSG);
+        //throw new UserException(NOT_SUPPORTED_MSG);
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java b/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java
index 1f9ff87647d..183c733097b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java
@@ -1213,7 +1213,10 @@ public enum ErrorCode {
     ERR_CLOUD_CLUSTER_ERROR(5098, new byte[]{'4', '2', '0', '0', '0'},
             "Cluster %s not exist, use SQL 'SHOW CLUSTERS' to get a valid 
cluster"),
 
-    ERR_NO_CLUSTER_ERROR(5099, new byte[]{'4', '2', '0', '0', '0'}, "No 
cluster selected");
+    ERR_NO_CLUSTER_ERROR(5099, new byte[]{'4', '2', '0', '0', '0'}, "No 
cluster selected"),
+
+    ERR_NOT_CLOUD_MODE(6000, new byte[]{'4', '2', '0', '0', '0'},
+            "Command only support in cloud mode.");
 
     // This is error code
     private final int code;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java
index aa616372184..d41fab5916c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java
@@ -46,7 +46,8 @@ public abstract class ExternalScanNode extends ScanNode {
 
     protected final FederationBackendPolicy backendPolicy = (ConnectContext.get() != null
             && ConnectContext.get().getSessionVariable().enableFileCache)
-            ? new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING) :  new FederationBackendPolicy();
+            ? new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING)
+            : new FederationBackendPolicy();
 
     public ExternalScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType,
             boolean needCheckColumnPriv) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
index 73a49bb24a8..e3e3405a1b1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
@@ -71,7 +71,7 @@ import java.util.stream.Collectors;
 
 public class FederationBackendPolicy {
     private static final Logger LOG = LogManager.getLogger(FederationBackendPolicy.class);
-    private final List<Backend> backends = Lists.newArrayList();
+    protected final List<Backend> backends = Lists.newArrayList();
     private final Map<String, List<Backend>> backendMap = Maps.newHashMap();
 
     public Map<Backend, Long> getAssignedWeightPerBackend() {
@@ -80,7 +80,7 @@ public class FederationBackendPolicy {
 
     private Map<Backend, Long> assignedWeightPerBackend = Maps.newHashMap();
 
-    private ConsistentHash<Split, Backend> consistentHash;
+    protected ConsistentHash<Split, Backend> consistentHash;
 
     private int nextBe = 0;
     private boolean initialized = false;
@@ -184,7 +184,7 @@ public class FederationBackendPolicy {
     }
 
     public void init(BeSelectionPolicy policy) throws UserException {
-        backends.addAll(policy.getCandidateBackends(Env.getCurrentSystemInfo().getIdToBackend().values()));
+        backends.addAll(policy.getCandidateBackends(Env.getCurrentSystemInfo().getBackendsByCurrentCluster()));
         if (backends.isEmpty()) {
             throw new UserException("No available backends");
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index 6be5654a2ea..2b37cf997d0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -20,9 +20,12 @@ package org.apache.doris.httpv2.rest;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.Table;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
+import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.LoadException;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
 import org.apache.doris.httpv2.entity.RestBaseResult;
@@ -30,6 +33,7 @@ import org.apache.doris.httpv2.exception.UnauthorizedException;
 import org.apache.doris.mysql.privilege.Auth;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.resource.Tag;
 import org.apache.doris.service.ExecuteEnv;
 import org.apache.doris.system.Backend;
@@ -39,6 +43,7 @@ import org.apache.doris.thrift.TNetworkAddress;
 
 import com.google.common.base.Strings;
 import io.netty.handler.codec.http.HttpHeaderNames;
+import org.apache.commons.validator.routines.InetAddressValidator;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
@@ -50,8 +55,11 @@ import org.springframework.web.bind.annotation.RestController;
 import org.springframework.web.servlet.view.RedirectView;
 
 import java.net.URI;
+import java.security.SecureRandom;
 import java.util.List;
+import java.util.Random;
 import java.util.Set;
+import java.util.stream.Collectors;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
@@ -149,8 +157,7 @@ public class LoadAction extends RestBaseController {
             }
 
             String label = request.getHeader(LABEL_KEY);
-            TNetworkAddress redirectAddr;
-            redirectAddr = selectRedirectBackend(groupCommit);
+            TNetworkAddress redirectAddr = selectRedirectBackend(request, groupCommit);
 
             LOG.info("redirect load action to destination={}, label: {}",
                     redirectAddr.toString(), label);
@@ -228,6 +235,7 @@ public class LoadAction extends RestBaseController {
     // we return error by using RestBaseResult.
     private Object executeWithoutPassword(HttpServletRequest request,
             HttpServletResponse response, String db, String table, boolean isStreamLoad, boolean groupCommit) {
+        String label = null;
         try {
             String dbName = db;
             String tableName = table;
@@ -246,11 +254,7 @@ public class LoadAction extends RestBaseController {
 
             String fullDbName = dbName;
 
-            String label = request.getParameter(LABEL_KEY);
-            if (isStreamLoad) {
-                label = request.getHeader(LABEL_KEY);
-            }
-
+            label = isStreamLoad ? request.getHeader(LABEL_KEY) : request.getParameter(LABEL_KEY);
             if (!isStreamLoad && Strings.isNullOrEmpty(label)) {
                 // for stream load, the label can be generated by system automatically
                 return new RestBaseResult("No label selected.");
@@ -273,7 +277,7 @@ public class LoadAction extends RestBaseController {
                     return new RestBaseResult(e.getMessage());
                 }
             } else {
-                redirectAddr = selectRedirectBackend(groupCommit);
+                redirectAddr = selectRedirectBackend(request, groupCommit);
             }
 
             LOG.info("redirect load action to destination={}, stream: {}, db: 
{}, tbl: {}, label: {}",
@@ -282,6 +286,8 @@ public class LoadAction extends RestBaseController {
             RedirectView redirectView = redirectTo(request, redirectAddr);
             return redirectView;
         } catch (Exception e) {
+            LOG.warn("load failed, stream: {}, db: {}, tbl: {}, label: {}, 
err: {}",
+                    isStreamLoad, db, table, label, e.getMessage());
             return new RestBaseResult(e.getMessage());
         }
     }
@@ -304,7 +310,7 @@ public class LoadAction extends RestBaseController {
                 return new RestBaseResult("No transaction operation(\'commit\' 
or \'abort\') selected.");
             }
 
-            TNetworkAddress redirectAddr = selectRedirectBackend(false);
+            TNetworkAddress redirectAddr = selectRedirectBackend(request, 
false);
             LOG.info("redirect stream load 2PC action to destination={}, db: 
{}, txn: {}, operation: {}",
                     redirectAddr.toString(), dbName, 
request.getHeader(TXN_ID_KEY), txnOperation);
 
@@ -322,7 +328,35 @@ public class LoadAction extends RestBaseController {
         return index;
     }
 
-    private TNetworkAddress selectRedirectBackend(boolean groupCommit) throws LoadException {
+    private String getCloudClusterName(HttpServletRequest request) {
+        String cloudClusterName = request.getHeader(SessionVariable.CLOUD_CLUSTER);
+        if (!Strings.isNullOrEmpty(cloudClusterName)) {
+            return cloudClusterName;
+        }
+
+        cloudClusterName = ConnectContext.get().getCloudCluster();
+        if (!Strings.isNullOrEmpty(cloudClusterName)) {
+            return cloudClusterName;
+        }
+
+        return "";
+    }
+
+    private TNetworkAddress selectRedirectBackend(HttpServletRequest request, boolean groupCommit)
+            throws LoadException {
+        if (Config.isCloudMode()) {
+            String cloudClusterName = getCloudClusterName(request);
+            if (Strings.isNullOrEmpty(cloudClusterName)) {
+                throw new LoadException("No cloud cluster name selected.");
+            }
+            String reqHostStr = request.getHeader(HttpHeaderNames.HOST.toString());
+            return selectCloudRedirectBackend(cloudClusterName, reqHostStr, groupCommit);
+        } else {
+            return selectLocalRedirectBackend(groupCommit);
+        }
+    }
+
+    private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit) throws LoadException {
         Backend backend = null;
         BeSelectionPolicy policy = null;
         String qualifiedUser = ConnectContext.get().getQualifiedUser();
@@ -358,6 +392,89 @@ public class LoadAction extends RestBaseController {
         return new TNetworkAddress(backend.getHost(), backend.getHttpPort());
     }
 
+    private TNetworkAddress selectCloudRedirectBackend(String clusterName, String reqHostStr, boolean groupCommit)
+            throws LoadException {
+        List<Backend> backends = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
+                .getBackendsByClusterName(clusterName)
+                .stream().filter(be -> be.isAlive() && (!groupCommit || groupCommit && !be.isDecommissioned()))
+                .collect(Collectors.toList());
+
+        if (backends.isEmpty()) {
+            LOG.warn("No available backend for stream load redirect, cluster 
name {}", clusterName);
+            throw new 
LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", cluster: " + 
clusterName);
+        }
+
+        Random rand = new SecureRandom();
+        int randomIndex = rand.nextInt(backends.size());
+        Backend backend = backends.get(randomIndex);
+
+        Pair<String, Integer> publicHostPort = null;
+        Pair<String, Integer> privateHostPort = null;
+        try {
+            if (!Strings.isNullOrEmpty(backend.getCloudPublicEndpoint())) {
+                publicHostPort = splitHostAndPort(backend.getCloudPublicEndpoint());
+            }
+        } catch (AnalysisException e) {
+            throw new LoadException(e.getMessage());
+        }
+
+        try {
+            if (!Strings.isNullOrEmpty(backend.getCloudPrivateEndpoint())) {
+                privateHostPort = splitHostAndPort(backend.getCloudPrivateEndpoint());
+            }
+        } catch (AnalysisException e) {
+            throw new LoadException(e.getMessage());
+        }
+
+        reqHostStr = reqHostStr.replaceAll("\\s+", "");
+        if (reqHostStr.isEmpty()) {
+            LOG.info("Invalid header host: {}", reqHostStr);
+            throw new LoadException("Invalid header host: " + reqHostStr);
+        }
+
+        String reqHost = "";
+        String[] pair = reqHostStr.split(":");
+        if (pair.length == 1) {
+            reqHost = pair[0];
+        } else if (pair.length == 2) {
+            reqHost = pair[0];
+        } else {
+            LOG.info("Invalid header host: {}", reqHostStr);
+            throw new LoadException("Invalid header host: " + reqHost);
+        }
+
+        if (InetAddressValidator.getInstance().isValid(reqHost)
+                && publicHostPort != null && reqHost.equals(publicHostPort.first)) {
+            return new TNetworkAddress(publicHostPort.first, publicHostPort.second);
+        } else if (privateHostPort != null) {
+            return new TNetworkAddress(reqHost, privateHostPort.second);
+        } else {
+            return new TNetworkAddress(backend.getHost(), backend.getHttpPort());
+        }
+    }
+
+    private Pair<String, Integer> splitHostAndPort(String hostPort) throws 
AnalysisException {
+        hostPort = hostPort.replaceAll("\\s+", "");
+        if (hostPort.isEmpty()) {
+            LOG.info("empty endpoint");
+            throw new AnalysisException("empty endpoint: " + hostPort);
+        }
+
+        String[] pair = hostPort.split(":");
+        if (pair.length != 2) {
+            LOG.info("Invalid endpoint: {}", hostPort);
+            throw new AnalysisException("Invalid endpoint: " + hostPort);
+        }
+
+        int port;
+        try {
+            port = Integer.parseInt(pair[1]);
+        } catch (NumberFormatException e) {
+            throw new AnalysisException("Invalid endpoint port: " + pair[1]);
+        }
+        if (port <= 0 || port >= 65536) {
+            LOG.info("Invalid endpoint port: {}", pair[1]);
+            throw new AnalysisException("Invalid endpoint port: " + pair[1]);
+        }
+
+        return Pair.of(pair[0], port);
+    }
+
     // NOTE: This function can only be used for AuditlogPlugin stream load for now.
     // AuditlogPlugin should be redesigned carefully, and the method below focuses on
     // temporarily addressing the users' needs for audit logs.
@@ -410,7 +527,7 @@ public class LoadAction extends RestBaseController {
                 return new RestBaseResult("No label selected.");
             }
 
-            TNetworkAddress redirectAddr = selectRedirectBackend(false);
+            TNetworkAddress redirectAddr = selectRedirectBackend(request, false);
 
             LOG.info("Redirect load action with auth token to destination={},"
                         + "stream: {}, db: {}, tbl: {}, label: {}",
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/ClusterAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/ClusterAction.java
index ee09946f4fc..a5c915e0bbc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/ClusterAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/ClusterAction.java
@@ -18,6 +18,7 @@
 package org.apache.doris.httpv2.rest.manager;
 
 import org.apache.doris.catalog.Env;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.util.NetUtils;
 import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
@@ -73,4 +74,40 @@ public class ClusterAction extends RestBaseController {
                 .collect(Collectors.toList()));
         return ResponseEntityBuilder.ok(result);
     }
+
+    public static class BeClusterInfo {
+        public volatile String host;
+        public volatile int heartbeatPort;
+        public volatile int bePort;
+        public volatile int httpPort;
+        public volatile int brpcPort;
+        public volatile long currentFragmentNum = 0;
+        public volatile long lastFragmentUpdateTime = 0;
+    }
+
+    @RequestMapping(path = "/cluster_info/cloud_cluster_status", method = 
RequestMethod.GET)
+    public Object cloudClusterInfo(HttpServletRequest request, 
HttpServletResponse response) {
+        executeCheckPassword(request, response);
+        checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), 
PrivPredicate.ADMIN);
+
+        // Key: cluster_name Value: be status
+        Map<String, List<BeClusterInfo>> result = Maps.newHashMap();
+
+        ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudClusterIdToBackend()
+                .forEach((clusterId, backends) -> {
+                    List<BeClusterInfo> bis = backends.stream().map(backend -> {
+                        BeClusterInfo bi = new BeClusterInfo();
+                        bi.host = backend.getHost();
+                        bi.heartbeatPort = backend.getHeartbeatPort();
+                        bi.bePort = backend.getBePort();
+                        bi.httpPort = backend.getHttpPort();
+                        bi.brpcPort = backend.getBrpcPort();
+                        bi.currentFragmentNum = backend.getBackendStatus().currentFragmentNum;
+                        bi.lastFragmentUpdateTime = backend.getBackendStatus().lastFragmentUpdateTime;
+                        return bi;
+                    }).collect(Collectors.toList());
+                    result.put(clusterId, bis);
+                });
+
+        return ResponseEntityBuilder.ok(result);
+    }
 }
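
The endpoint above reports, for every cloud cluster id, the status of each backend in that cluster (ADMIN privilege required). Assuming the usual ResponseEntityBuilder envelope, a response would look roughly like the following; field names follow BeClusterInfo, the values are illustrative only, and the data map is keyed by cluster id (it comes from getCloudClusterIdToBackend), not by cluster name:

    {
      "msg": "success",
      "code": 0,
      "data": {
        "<cluster_id>": [
          {"host": "10.0.0.1", "heartbeatPort": 9050, "bePort": 9060,
           "httpPort": 8040, "brpcPort": 8060,
           "currentFragmentNum": 3, "lastFragmentUpdateTime": 1710000000000}
        ]
      },
      "count": 0
    }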
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index abf0500b90e..52007d86239 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -118,12 +118,15 @@ public class BrokerLoadJob extends BulkLoadJob {
 
     @Override
     protected void unprotectedExecuteJob() {
-        LoadTask task = new BrokerLoadPendingTask(this, fileGroupAggInfo.getAggKeyToFileGroups(),
-                brokerDesc, getPriority());
+        LoadTask task = createPendingTask();
         idToTasks.put(task.getSignature(), task);
         Env.getCurrentEnv().getPendingLoadTaskScheduler().submit(task);
     }
 
+    protected LoadTask createPendingTask() {
+        return new BrokerLoadPendingTask(this, fileGroupAggInfo.getAggKeyToFileGroups(), brokerDesc, getPriority());
+    }
+
     /**
      * Situation1: When attachment is instance of BrokerPendingTaskAttachment,
      * this method is called by broker pending task.
@@ -312,7 +315,11 @@ public class BrokerLoadJob extends BulkLoadJob {
         try {
             db = getDb();
             tableList = db.getTablesOnIdOrderOrThrowException(Lists.newArrayList(fileGroupAggInfo.getAllTableIds()));
-            MetaLockUtils.writeLockTablesOrMetaException(tableList);
+            if (Config.isCloudMode()) {
+                MetaLockUtils.commitLockTables(tableList);
+            } else {
+                MetaLockUtils.writeLockTablesOrMetaException(tableList);
+            }
         } catch (MetaNotFoundException e) {
             LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
                     .add("database_id", dbId)
@@ -330,6 +337,7 @@ public class BrokerLoadJob extends BulkLoadJob {
                     dbId, tableList, transactionId, commitInfos,
                     new LoadJobFinalOperation(id, loadingStatus, progress, loadStartTimestamp,
                             finishTimestamp, state, failMsg));
+            afterLoadingTaskCommitTransaction(tableList);
         } catch (UserException e) {
             LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
                     .add("database_id", dbId)
@@ -337,10 +345,18 @@ public class BrokerLoadJob extends BulkLoadJob {
                     .build(), e);
             cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()), true, true);
         } finally {
-            MetaLockUtils.writeUnlockTables(tableList);
+            if (Config.isCloudMode()) {
+                MetaLockUtils.commitUnlockTables(tableList);
+            } else {
+                MetaLockUtils.writeUnlockTables(tableList);
+            }
         }
     }
 
+    // hook run after the loading task's transaction commit; no-op here, overridden in cloud mode
+    protected void afterLoadingTaskCommitTransaction(List<Table> tableList) {
+    }
+
     private void writeProfile() {
         if (!enableProfile) {
             return;
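
unprotectedExecuteJob and the commit path above are now template methods: createPendingTask() and afterLoadingTaskCommitTransaction() are the extension points a cloud subclass overrides. A hypothetical sketch of such a subclass (the real one added by this commit is CloudBrokerLoadJob; constructor and persistence plumbing are omitted):

    import java.util.List;
    import org.apache.doris.catalog.Table;

    public class CloudBrokerLoadJobSketch extends BrokerLoadJob {
        @Override
        protected LoadTask createPendingTask() {
            // a cloud deployment can substitute its own pending-task type here
            return super.createPendingTask();
        }

        @Override
        protected void afterLoadingTaskCommitTransaction(List<Table> tableList) {
            // runs right after commitTransaction() while the commit locks
            // (MetaLockUtils.commitLockTables above) are still held
        }
    }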
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/JobState.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/JobState.java
index e023b686c0c..10151694947 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/JobState.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/JobState.java
@@ -25,7 +25,8 @@ public enum JobState {
     LOADING, // job is running
     COMMITTED, // transaction is committed but not visible
     FINISHED, // transaction is visible and job is finished
-    CANCELLED; // transaction is aborted and job is cancelled
+    CANCELLED, // transaction is aborted and job is cancelled
+    RETRY; // job failed and will be retried (used in cloud mode)
 
     public boolean isFinalState() {
         return this == FINISHED || this == CANCELLED;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index c15904be8ef..55b61b1e981 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -250,9 +250,10 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
      */
     abstract Set<String> getTableNames() throws MetaNotFoundException;
 
-    // return true if the corresponding transaction is done(COMMITTED, FINISHED, CANCELLED)
+    // return true if the corresponding transaction is done(COMMITTED, FINISHED, CANCELLED, RETRY)
     public boolean isTxnDone() {
-        return state == JobState.COMMITTED || state == JobState.FINISHED || state == JobState.CANCELLED;
+        return state == JobState.COMMITTED || state == JobState.FINISHED
+                || state == JobState.CANCELLED || state == JobState.RETRY;
     }
 
     // return true if job is done(FINISHED/CANCELLED/UNKNOWN)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index eef73542ed4..8c56547ff8f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -130,6 +130,7 @@ public class LoadLoadingTask extends LoadTask {
     protected void executeTask() throws Exception {
         LOG.info("begin to execute loading task. load id: {} job id: {}. db: 
{}, tbl: {}. left retry: {}",
                 DebugUtil.printId(loadId), callback.getCallbackId(), 
db.getFullName(), table.getName(), retryTime);
+
         retryTime--;
         beginTime = System.currentTimeMillis();
         if (!((BrokerLoadJob) callback).updateState(JobState.LOADING)) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTask.java
index d6789805e67..06991d5fb52 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTask.java
@@ -18,6 +18,7 @@
 package org.apache.doris.load.loadv2;
 
 import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.LoadException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.LogBuilder;
@@ -69,6 +70,8 @@ public abstract class LoadTask extends MasterTask {
     protected TaskAttachment attachment;
     protected FailMsg failMsg = new FailMsg();
     protected int retryTime = 1;
+    private volatile boolean done = false;
+    protected long startTimeMs = 0;
     protected final Priority priority;
 
     public LoadTask(LoadTaskCallback callback, TaskType taskType, Priority priority) {
@@ -82,6 +85,17 @@ public abstract class LoadTask extends MasterTask {
     protected void exec() {
         boolean isFinished = false;
         try {
+            if (Config.isCloudMode()) {
+                while (startTimeMs > System.currentTimeMillis()) {
+                    try {
+                        Thread.sleep(1000);
+                        LOG.info("LoadTask:{} backoff startTimeMs:{} now:{}",
+                                signature, startTimeMs, System.currentTimeMillis());
+                    } catch (InterruptedException e) {
+                        LOG.info("ignore InterruptedException: ", e);
+                    }
+                }
+            }
             // execute pending task
             executeTask();
             // callback on pending task finished
@@ -100,6 +114,7 @@ public abstract class LoadTask extends MasterTask {
                 // callback on pending task failed
                 callback.onTaskFailed(signature, failMsg);
             }
+            done = true;
         }
     }
 
@@ -131,6 +146,14 @@ public abstract class LoadTask extends MasterTask {
         return taskType;
     }
 
+    public boolean isDone() {
+        return done;
+    }
+
+    public void setStartTimeMs(long startTimeMs) {
+        this.startTimeMs = startTimeMs;
+    }
+
     public int getPriorityValue() {
         return this.priority.value;
     }
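
Together, setStartTimeMs() and the cloud-mode wait loop in exec() give LoadTask a delayed start: a caller can stamp a task with a future start time and resubmit it, and exec() will sleep in one-second steps until that time has passed. A sketch of resubmission with a linear backoff; the helper name and the one-second-per-attempt policy are illustrative assumptions:

    // Hypothetical helper: hold a retried task back before it reaches
    // the pending load task scheduler again.
    void resubmitWithBackoff(LoadTask task, int attempt) {
        long backoffMs = Math.min(attempt, 10) * 1000L; // assumed 10s cap
        task.setStartTimeMs(System.currentTimeMillis() + backoffMs);
        Env.getCurrentEnv().getPendingLoadTaskScheduler().submit(task);
    }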
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
index 5c46ed5d862..adbfc27fca2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
@@ -41,6 +41,7 @@ import org.apache.doris.system.BeSelectionPolicy;
 import org.apache.doris.system.SystemInfoService;
 
 import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
 import com.google.common.collect.EvictingQueue;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
@@ -428,6 +429,16 @@ public class MysqlLoadManager {
                 httpPut.addHeader(LoadStmt.KEY_IN_PARAM_PARTITIONS, pNames);
             }
         }
+
+        // cloud cluster
+        if (Config.isCloudMode()) {
+            String clusterName = ConnectContext.get().getCloudCluster();
+            if (Strings.isNullOrEmpty(clusterName)) {
+                throw new LoadException("cloud cluster is empty");
+            }
+            httpPut.addHeader(LoadStmt.KEY_CLOUD_CLUSTER, clusterName);
+        }
+
         httpPut.setEntity(entity);
         return httpPut;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index d8b79d9bdce..477dc02955f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -110,6 +110,8 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
             tRoutineLoadTask.setFormat(TFileFormatType.FORMAT_CSV_PLAIN);
         }
         tRoutineLoadTask.setMemtableOnSinkNode(routineLoadJob.isMemtableOnSinkNode());
+        tRoutineLoadTask.setQualifiedUser(routineLoadJob.getQualifiedUser());
+        tRoutineLoadTask.setCloudCluster(routineLoadJob.getCloudCluster());
         return tRoutineLoadTask;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index f9be2014e30..b0911f00634 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -31,6 +31,7 @@ import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Table;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
@@ -262,10 +263,16 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
 
     protected boolean isTypeRead = false;
 
+    private String cloudClusterId;
+
     protected byte enclose = 0;
 
     protected byte escape = 0;
 
+    // used in cloud mode
+    protected String qualifiedUser;
+    protected String cloudCluster;
+
     public void setTypeRead(boolean isTypeRead) {
         this.isTypeRead = isTypeRead;
     }
@@ -314,6 +321,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
             SessionVariable var = ConnectContext.get().getSessionVariable();
             sessionVariables.put(SessionVariable.SQL_MODE, Long.toString(var.getSqlMode()));
             this.memtableOnSinkNode = ConnectContext.get().getSessionVariable().enableMemtableOnSinkNode;
+            this.qualifiedUser = ConnectContext.get().getQualifiedUser();
+            this.cloudCluster = ConnectContext.get().getCloudCluster();
         } else {
             sessionVariables.put(SessionVariable.SQL_MODE, String.valueOf(SqlModeHelper.MODE_DEFAULT));
         }
@@ -720,6 +729,14 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
         this.comment = comment;
     }
 
+    public String getQualifiedUser() {
+        return qualifiedUser;
+    }
+
+    public String getCloudCluster() {
+        return cloudCluster;
+    }
+
     public int getSizeOfRoutineLoadTaskInfoList() {
         readLock();
         try {
@@ -916,8 +933,28 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
         Preconditions.checkNotNull(planner);
         Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
         Table table = db.getTableOrMetaException(tableId, Table.TableType.OLAP);
+        boolean needCleanCtx = false;
         table.readLock();
         try {
+            if (Config.isCloudMode()) {
+                String clusterName = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
+                        .getClusterNameByClusterId(cloudClusterId);
+                if (Strings.isNullOrEmpty(clusterName)) {
+                    String err = String.format("cluster name is empty, cluster id is %s", cloudClusterId);
+                    LOG.warn(err);
+                    throw new UserException(err);
+                }
+
+                if (ConnectContext.get() == null) {
+                    ConnectContext ctx = new ConnectContext();
+                    ctx.setThreadLocalInfo();
+                    ctx.setCloudCluster(clusterName);
+                    needCleanCtx = true;
+                } else {
+                    ConnectContext.get().setCloudCluster(clusterName);
+                }
+            }
+
             TExecPlanFragmentParams planParams = planner.plan(loadId);
             // add table indexes to transaction state
             TransactionState txnState = Env.getCurrentGlobalTransactionMgr().getTransactionState(db.getId(), txnId);
@@ -931,6 +968,9 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
 
             return planParams;
         } finally {
+            if (needCleanCtx) {
+                ConnectContext.remove();
+            }
             table.readUnlock();
         }
     }
@@ -1486,6 +1526,24 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
         this.origStmt = origStmt;
     }
 
+    public void setCloudCluster(String cloudClusterName) throws UserException {
+        if (Strings.isNullOrEmpty(cloudClusterName)) {
+            LOG.warn("cluster name is empty");
+            throw new UserException("cluster name is empty");
+        }
+
+        this.cloudClusterId = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
+                .getCloudClusterIdByName(cloudClusterName);
+        if (Strings.isNullOrEmpty(this.cloudClusterId)) {
+            LOG.warn("cluster id is empty, cluster name {}", cloudClusterName);
+            throw new UserException("cluster id is empty, cluster name: " + 
cloudClusterName);
+        }
+    }
+
+    public String getCloudClusterId() {
+        return cloudClusterId;
+    }
+
     // check the correctness of commit info
     protected abstract boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment,
                                                TransactionState txnState,
@@ -1797,6 +1855,9 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
             out.writeBoolean(true);
             userIdentity.write(out);
         }
+        if (Config.isCloudMode()) {
+            Text.writeString(out, cloudClusterId);
+        }
         Text.writeString(out, comment);
     }
 
@@ -1889,6 +1950,9 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
                 userIdentity = UserIdentity.UNKNOWN;
             }
         }
+        if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_123 && 
Config.isCloudMode()) {
+            cloudClusterId = Text.readString(in);
+        }
         if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_117) {
             comment = Text.readString(in);
         } else {
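
One point worth noting about the RoutineLoadJob persistence above: cloudClusterId is written only when Config.isCloudMode() and read back only when the image version is at least FeMetaVersion.VERSION_123 and the FE is again running in cloud mode, so the write and read paths stay symmetric only as long as the cloud/non-cloud setting does not change between writing and replaying the metadata.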
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index 13a86f5d173..522d320e912 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -153,6 +153,7 @@ public class RoutineLoadManager implements Writable {
 
     }
 
+    // overridden in cloud mode
     public void createRoutineLoadJob(CreateRoutineLoadStmt createRoutineLoadStmt)
             throws UserException {
         // check load auth
@@ -184,7 +185,7 @@ public class RoutineLoadManager implements Writable {
     }
 
     public void addRoutineLoadJob(RoutineLoadJob routineLoadJob, String dbName, String tableName)
-                    throws DdlException {
+                    throws UserException {
         writeLock();
         try {
             // check if db.routineLoadName has been used
@@ -542,7 +543,7 @@ public class RoutineLoadManager implements Writable {
      * @return
      * @throws LoadException
      */
-    private List<Long> getAvailableBackendIds(long jobId) throws LoadException 
{
+    protected List<Long> getAvailableBackendIds(long jobId) throws 
LoadException {
         RoutineLoadJob job = getJob(jobId);
         if (job == null) {
             throw new LoadException("job " + jobId + " does not exist");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java
index f84e8bb4c90..43f93df71e9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java
@@ -1818,7 +1818,7 @@ public class Auth implements Writable {
         return sb.toString();
     }
 
-    // ====== CLOUD ======
+    // ====== BEGIN CLOUD ======
     public List<String> getCloudClusterUsers(String clusterName) {
         return propertyMgr.getCloudClusterUsers(userManager.getAllUsers(), clusterName);
     }
@@ -1844,5 +1844,5 @@ public class Auth implements Writable {
         }
         return cluster;
     }
-    // ====== CLOUD ======
+    // ====== END CLOUD ======
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/GroupCommitInserter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/GroupCommitInserter.java
index 4926d5486dc..655a37c7554 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/GroupCommitInserter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/GroupCommitInserter.java
@@ -19,6 +19,7 @@ package org.apache.doris.nereids.trees.plans.commands.insert;
 
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.EnvFactory;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
@@ -111,7 +112,8 @@ public class GroupCommitInserter {
         for (List<Expr> list : materializedConstExprLists) {
             rows.add(GroupCommitPlanner.getRowStringValue(list, filterSize));
         }
-        GroupCommitPlanner groupCommitPlanner = new GroupCommitPlanner(physicalOlapTableSink.getDatabase(),
+        GroupCommitPlanner groupCommitPlanner = EnvFactory.getInstance().createGroupCommitPlanner(
+                physicalOlapTableSink.getDatabase(),
                 physicalOlapTableSink.getTargetTable(), null, ctx.queryId(),
                 ConnectContext.get().getSessionVariable().getGroupCommit());
         PGroupCommitInsertResponse response = groupCommitPlanner.executeGroupCommitInsert(ctx, rows);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
index c8e7ef6c4e2..e3c4bf0ecec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
@@ -72,10 +72,10 @@ import java.util.stream.Collectors;
 public class GroupCommitPlanner {
     private static final Logger LOG = LogManager.getLogger(GroupCommitPlanner.class);
 
-    private Database db;
-    private OlapTable table;
-    private TUniqueId loadId;
-    private Backend backend;
+    protected Database db;
+    protected OlapTable table;
+    protected TUniqueId loadId;
+    protected Backend backend;
     private TExecPlanFragmentParamsList paramsList;
     private ByteString execPlanFragmentParamsBytes;
 
@@ -131,29 +131,8 @@ public class GroupCommitPlanner {
     public PGroupCommitInsertResponse executeGroupCommitInsert(ConnectContext ctx,
             List<InternalService.PDataRow> rows)
             throws DdlException, RpcException, ExecutionException, InterruptedException {
-        backend = ctx.getInsertGroupCommit(this.table.getId());
-        if (backend == null || !backend.isAlive() || backend.isDecommissioned()) {
-            List<Long> allBackendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
-            if (allBackendIds.isEmpty()) {
-                throw new DdlException("No alive backend");
-            }
-            Collections.shuffle(allBackendIds);
-            boolean find = false;
-            for (Long beId : allBackendIds) {
-                backend = Env.getCurrentSystemInfo().getBackend(beId);
-                if (!backend.isDecommissioned()) {
-                    ctx.setInsertGroupCommit(this.table.getId(), backend);
-                    find = true;
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("choose new be {}", backend.getId());
-                    }
-                    break;
-                }
-            }
-            if (!find) {
-                throw new DdlException("No suitable backend");
-            }
-        }
+        selectBackends(ctx);
+
         PGroupCommitInsertRequest request = PGroupCommitInsertRequest.newBuilder()
                 .setExecPlanFragmentRequest(InternalService.PExecPlanFragmentRequest.newBuilder()
                         .setRequest(execPlanFragmentParamsBytes)
@@ -166,6 +145,32 @@ public class GroupCommitPlanner {
         return future.get();
     }
 
+    // overridden in cloud mode
+    protected void selectBackends(ConnectContext ctx) throws DdlException {
+        backend = ctx.getInsertGroupCommit(this.table.getId());
+        if (backend != null && backend.isAlive() && 
!backend.isDecommissioned()) {
+            return;
+        }
+
+        List<Long> allBackendIds = 
Env.getCurrentSystemInfo().getAllBackendIds(true);
+        if (allBackendIds.isEmpty()) {
+            throw new DdlException("No alive backend");
+        }
+        Collections.shuffle(allBackendIds);
+        for (Long beId : allBackendIds) {
+            backend = Env.getCurrentSystemInfo().getBackend(beId);
+            if (!backend.isDecommissioned()) {
+                ctx.setInsertGroupCommit(this.table.getId(), backend);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("choose new be {}", backend.getId());
+                }
+                return;
+            }
+        }
+
+        throw new DdlException("No suitable backend");
+    }
+
     // only for nereids use
     public static InternalService.PDataRow getRowStringValue(List<Expr> cols, int filterSize) throws UserException {
         if (cols.isEmpty()) {
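
GroupCommitInserter (above) now obtains the planner through EnvFactory, so cloud mode can transparently substitute CloudGroupCommitPlanner for this base class. A minimal sketch of the call site, with the signature inferred from the GroupCommitInserter hunk (the third argument is the target column list, passed as null there):

    GroupCommitPlanner planner = EnvFactory.getInstance().createGroupCommitPlanner(
            db, olapTable, /* targetColumnNames */ null, ctx.queryId(),
            ctx.getSessionVariable().getGroupCommit());
    PGroupCommitInsertResponse response = planner.executeGroupCommitInsert(ctx, rows);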
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 29243a4abeb..7ee563b6783 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -894,12 +894,15 @@ public class OlapScanNode extends ScanNode {
                         LOG.debug("backend {} not exists or is not alive for 
replica {}", replica.getBackendId(),
                                 replica.getId());
                     }
-                    errs.add("replica " + replica.getId() + "'s backend " + 
replica.getBackendId()
-                            + " does not exist or not alive");
-                    errs.add(" or you may not have permission to access the 
current cluster");
-                    if (ConnectContext.get() != null && Config.isCloudMode()) {
-                        errs.add("clusterName=" + 
ConnectContext.get().getCloudCluster());
+                    String err = "replica " + replica.getId() + "'s backend " 
+ replica.getBackendId()
+                            + " does not exist or not alive";
+                    if (Config.isCloudMode()) {
+                        err += ", or you may not have permission to access the 
current cluster";
+                        if (ConnectContext.get() != null) {
+                            err += " clusterName=" + 
ConnectContext.get().getCloudCluster();
+                        }
                     }
+                    errs.add(err);
                     continue;
                 }
                 if (!backend.isMixNode()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
index d8c27dd32e5..19c6e7e1d5a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -518,11 +518,13 @@ public class OlapTableSink extends DataSink {
                     Multimap<Long, Long> bePathsMap = 
tablet.getNormalReplicaBackendPathMap();
                     if (bePathsMap.keySet().size() < loadRequiredReplicaNum) {
                         String errMsg = "tablet " + tablet.getId() + " alive 
replica num " + bePathsMap.keySet().size()
-                                + " < quorum replica num " + 
loadRequiredReplicaNum
-                                + ", alive backends: [" + 
StringUtils.join(bePathsMap.keySet(), ",") + "]";
-                        errMsg += " or you may not have permission to access 
the current cluster";
-                        if (ConnectContext.get() != null && 
Config.isCloudMode()) {
-                            errMsg += " clusterName=" + 
ConnectContext.get().getCloudCluster();
+                                        + " < load required replica num " + 
loadRequiredReplicaNum
+                                        + ", alive backends: [" + 
StringUtils.join(bePathsMap.keySet(), ",") + "]";
+                        if (Config.isCloudMode()) {
+                            errMsg += ", or you may not have permission to 
access the current cluster";
+                            if (ConnectContext.get() != null) {
+                                errMsg += " clusterName=" + 
ConnectContext.get().getCloudCluster();
+                            }
                         }
                         throw new 
UserException(InternalErrorCode.REPLICA_FEW_ERR, errMsg);
                     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java
index c8915966725..6a5fe19fcc6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java
@@ -98,6 +98,8 @@ public class AuditEvent {
     public long peakMemoryBytes = -1;
     @AuditField(value = "SqlDigest")
     public String sqlDigest = "";
+    @AuditField(value = "cloudClusterName")
+    public String cloudClusterName = "";
     @AuditField(value = "TraceId")
     public String traceId = "";
     @AuditField(value = "WorkloadGroup")
@@ -149,6 +151,11 @@ public class AuditEvent {
             return this;
         }
 
+        public AuditEventBuilder setCloudCluster(String cloudClusterName) {
+            auditEvent.cloudClusterName = cloudClusterName;
+            return this;
+        }
+
         public AuditEventBuilder setState(String state) {
             auditEvent.state = state;
             return this;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
index 9311b4ca8e8..fa8b1bdaf3f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
@@ -32,6 +32,7 @@ import org.apache.doris.plugin.audit.AuditEvent.EventType;
 import org.apache.doris.qe.QueryState.MysqlStateType;
 import org.apache.doris.service.FrontendOptions;
 
+import com.google.common.base.Strings;
 import org.apache.commons.codec.digest.DigestUtils;
 
 public class AuditLogHelper {
@@ -44,6 +45,8 @@ public class AuditLogHelper {
         long elapseMs = endTime - ctx.getStartTime();
         CatalogIf catalog = ctx.getCurrentCatalog();
 
+        String cluster = Config.isCloudMode() ? ctx.getCloudCluster(false) : "";
+
         AuditEventBuilder auditEventBuilder = ctx.getAuditEventBuilder();
         auditEventBuilder.reset();
         auditEventBuilder
@@ -66,6 +69,7 @@ public class AuditLogHelper {
                 .setReturnRows(ctx.getReturnRows())
                 .setStmtId(ctx.getStmtId())
                 .setQueryId(ctx.queryId() == null ? "NaN" : DebugUtil.printId(ctx.queryId()))
+                .setCloudCluster(Strings.isNullOrEmpty(cluster) ? "UNKNOWN" : cluster)
                 .setWorkloadGroup(ctx.getWorkloadGroupName())
                 .setFuzzyVariables(!printFuzzyVariables ? "" : ctx.getSessionVariable().printFuzzyVariables());
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 7e0bd962cf4..4ef75173786 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -1066,6 +1066,10 @@ public class ConnectContext {
         this.cloudCluster = cluster;
     }
 
+    public String getCloudCluster() {
+        return getCloudCluster(true);
+    }
+
     /**
      * @return Returns an available cluster in the following order
      *         1 Use an explicitly specified cluster
@@ -1073,7 +1077,11 @@ public class ConnectContext {
      *         3 If the user does not have a default cluster, select a cluster with permissions for the user
      *         Returns null when there is no available cluster
      */
-    public String getCloudCluster() {
+    public String getCloudCluster(boolean updateErr) {
+        if (!Config.isCloudMode()) {
+            return null;
+        }
+
         String cluster = null;
         if (!Strings.isNullOrEmpty(this.cloudCluster)) {
             cluster = this.cloudCluster;
@@ -1091,11 +1099,13 @@ public class ConnectContext {
 
         if (Strings.isNullOrEmpty(cluster)) {
             LOG.warn("cant get a valid cluster for user {} to use", 
getCurrentUserIdentity());
-            getState().setError(ErrorCode.ERR_NO_CLUSTER_ERROR,
-                    "Cant get a Valid cluster for you to use, plz connect 
admin");
+            if (updateErr) {
+                getState().setError(ErrorCode.ERR_NO_CLUSTER_ERROR,
+                        "Cant get a Valid cluster for you to use, plz connect 
admin");
+            }
         } else {
             this.cloudCluster = cluster;
-            LOG.info("finally set context cluster name {}", cloudCluster);
+            LOG.info("finally set context cluster name {} for user {}", 
cloudCluster, getCurrentUserIdentity());
         }
 
         return cluster;
@@ -1103,6 +1113,12 @@ public class ConnectContext {
 
     // TODO implement this function
     public String getDefaultCloudCluster() {
+        List<String> cloudClusterNames = ((CloudSystemInfoService) 
Env.getCurrentSystemInfo()).getCloudClusterNames();
+        String defaultCluster = 
Env.getCurrentEnv().getAuth().getDefaultCloudCluster(getQualifiedUser());
+        if (!Strings.isNullOrEmpty(defaultCluster) && 
cloudClusterNames.contains(defaultCluster)) {
+            return defaultCluster;
+        }
+
         return null;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 7a213e47862..b1d591bbdd4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -508,7 +508,7 @@ public class Coordinator implements CoordInterface {
     }
 
     // Initialize
-    protected void prepare() throws Exception {
+    protected void prepare() throws UserException {
         for (PlanFragment fragment : fragments) {
             fragmentExecParamsMap.put(fragment.getFragmentId(), new FragmentExecParams(fragment));
         }
@@ -524,7 +524,8 @@ public class Coordinator implements CoordInterface {
 
         coordAddress = new TNetworkAddress(localIP, Config.rpc_port);
 
-        this.idToBackend = Env.getCurrentSystemInfo().getIdToBackend();
+        this.idToBackend = Env.getCurrentSystemInfo().getBackendsWithIdByCurrentCluster();
+
         if (LOG.isDebugEnabled()) {
             int backendNum = idToBackend.size();
             StringBuilder backendInfos = new StringBuilder("backends info:");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
index 40c126b732d..f0cd170d267 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
@@ -20,6 +20,7 @@ package org.apache.doris.qe;
 import org.apache.doris.analysis.RedirectStatus;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.thrift.FrontendService;
@@ -28,6 +29,7 @@ import org.apache.doris.thrift.TMasterOpResult;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TUniqueId;
 
+import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -159,6 +161,10 @@ public class MasterOpExecutor {
         params.setStmtId(ctx.getStmtId());
         params.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift());
 
+        if (Config.isCloudMode() && 
!Strings.isNullOrEmpty(ctx.getCloudCluster())) {
+            params.setCloudCluster(ctx.getCloudCluster());
+        }
+
         // query options
         params.setQueryOptions(ctx.getSessionVariable().getQueryOptionVariables());
         // session variables
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 0304000a1f0..8bfd0d7f1a4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -254,6 +254,8 @@ public class SessionVariable implements Serializable, Writable {
 
     public static final String CPU_RESOURCE_LIMIT = "cpu_resource_limit";
 
+    public static final String CLOUD_ENABLE_MULTI_CLUSTER_SYNC_LOAD = "enable_multi_cluster_sync_load";
+
     public static final String ENABLE_PARALLEL_OUTFILE = "enable_parallel_outfile";
 
     public static final String SQL_QUOTE_SHOW_CREATE = "sql_quote_show_create";
@@ -773,6 +775,10 @@ public class SessionVariable implements Serializable, Writable {
     @VariableMgr.VarAttr(name = FRAGMENT_TRANSMISSION_COMPRESSION_CODEC)
     public String fragmentTransmissionCompressionCodec = "none";
 
+    // whether to sync the load to other clusters
+    @VariableMgr.VarAttr(name = CLOUD_ENABLE_MULTI_CLUSTER_SYNC_LOAD, needForward = true)
+    public static boolean cloudEnableMultiClusterSyncLoad = false;
+
     /*
      * the parallel exec instance num for one Fragment in one BE
      * 1 means disable this feature
@@ -1969,6 +1975,14 @@ public class SessionVariable implements Serializable, Writable {
         return autoCommit;
     }
 
+    public boolean enableMultiClusterSyncLoad() {
+        return cloudEnableMultiClusterSyncLoad;
+    }
+
+    public void setEnableMultiClusterSyncLoad(boolean cloudEnableMultiClusterSyncLoad) {
+        this.cloudEnableMultiClusterSyncLoad = cloudEnableMultiClusterSyncLoad;
+    }
+
     public boolean isTxReadonly() {
         return txReadonly;
     }
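
The new variable gates the post-insert synchronization added in StmtExecutor below and is forwarded to the master FE (needForward = true); per session it can be toggled with, e.g., SET enable_multi_cluster_sync_load = true;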
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index 6008dc32c41..53ff62edb68 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -744,7 +744,12 @@ public class ShowExecutor {
     }
 
     // Show clusters
-    private void handleShowCluster() {
+    private void handleShowCluster() throws AnalysisException {
+        if (!Config.isCloudMode()) {
+            ErrorReport.reportAnalysisException(ErrorCode.ERR_NOT_CLOUD_MODE);
+            return;
+        }
+
         final ShowClusterStmt showStmt = (ShowClusterStmt) stmt;
         final List<List<String>> rows = Lists.newArrayList();
         List<String> clusterNames = null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index d88663ad6fc..72073e1e847 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -53,6 +53,7 @@ import org.apache.doris.analysis.QueryStmt;
 import org.apache.doris.analysis.RedirectStatus;
 import org.apache.doris.analysis.ReplacePartitionClause;
 import org.apache.doris.analysis.ReplaceTableClause;
+import org.apache.doris.analysis.ResourceTypeEnum;
 import org.apache.doris.analysis.SelectStmt;
 import org.apache.doris.analysis.SetOperationStmt;
 import org.apache.doris.analysis.SetStmt;
@@ -90,8 +91,11 @@ import org.apache.doris.catalog.TableIf.TableType;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.cloud.analysis.UseCloudClusterStmt;
 import org.apache.doris.cloud.catalog.CloudEnv;
+import org.apache.doris.cloud.proto.Cloud.ClusterStatus;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.AuditLog;
+import org.apache.doris.common.ClientPool;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ErrorCode;
@@ -165,17 +169,22 @@ import org.apache.doris.rpc.RpcException;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.statistics.ResultRow;
 import org.apache.doris.statistics.util.InternalQueryBuffer;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.task.LoadEtlTask;
+import org.apache.doris.thrift.BackendService.Client;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileType;
 import org.apache.doris.thrift.TLoadTxnBeginRequest;
 import org.apache.doris.thrift.TLoadTxnBeginResult;
 import org.apache.doris.thrift.TMergeType;
+import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TQueryOptions;
 import org.apache.doris.thrift.TQueryType;
 import org.apache.doris.thrift.TResultBatch;
 import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TStreamLoadPutRequest;
+import org.apache.doris.thrift.TSyncLoadForTabletsRequest;
 import org.apache.doris.thrift.TTxnParams;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.TabletCommitInfo;
@@ -210,6 +219,8 @@ import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 // Do one COM_QUERY process.
@@ -221,6 +232,7 @@ public class StmtExecutor {
     private static final AtomicLong STMT_ID_GENERATOR = new AtomicLong(0);
     public static final int MAX_DATA_TO_SEND_FOR_TXN = 100;
     public static final String NULL_VALUE_FOR_LOAD = "\\N";
+    // captures the backend id from messages like "[10003: not alive]"
+    private Pattern beIpPattern = Pattern.compile("\\[(\\d+):");
     private final Object writeProfileLock = new Object();
     private ConnectContext context;
     private final StatementContext statementContext;
@@ -476,7 +488,34 @@ public class StmtExecutor {
     public void execute() throws Exception {
         UUID uuid = UUID.randomUUID();
         TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
-        execute(queryId);
+        TUniqueId firstQueryId = queryId;
+        int retryTime = Config.max_query_retry_time;
+        for (int i = 1; i <= retryTime; i++) {
+            try {
+                execute(queryId);
+                return;
+            } catch (UserException e) {
+                if (!e.getMessage().contains("E-230") || i == retryTime) {
+                    throw e;
+                }
+                TUniqueId lastQueryId = queryId;
+                uuid = UUID.randomUUID();
+                queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
+                int randomMillis = 10 + (int) (Math.random() * 10);
+                if (i > retryTime / 2) {
+                    randomMillis = 20 + (int) (Math.random() * 10);
+                }
+                if (DebugPointUtil.isEnable("StmtExecutor.retry.longtime")) {
+                    randomMillis = 1000;
+                }
+                LOG.warn("receive E-230 tried={} first queryId={} last 
queryId={} new queryId={} sleep={}ms",
+                        i, DebugUtil.printId(firstQueryId), 
DebugUtil.printId(lastQueryId),
+                        DebugUtil.printId(queryId), randomMillis);
+                Thread.sleep(randomMillis);
+            }
+        }
     }
 
     public boolean notAllowFallback() {
@@ -718,14 +757,62 @@ public class StmtExecutor {
                     AuditLog.getQueryAudit().log("Query {} {} times with new 
query id: {}",
                             DebugUtil.printId(queryId), i, 
DebugUtil.printId(newQueryId));
                     context.setQueryId(newQueryId);
+                    if (Config.isCloudMode()) {
+                        // sleep random millis [1000, 1500] ms
+                        // during the first half of the retries
+                        int randomMillis = 1000 + (int) (Math.random() * (1000 - 500));
+                        LOG.debug("stmt executor retry times {}, wait randomMillis:{}, stmt:{}",
+                                i, randomMillis, originStmt.originStmt);
+                        try {
+                            if (i > retryTime / 2) {
+                                // sleep random millis [2000, 2500] ms
+                                // during the second half of the retries
+                                randomMillis = 2000 + (int) (Math.random() * (1000 - 500));
+                            }
+                            Thread.sleep(randomMillis);
+                        } catch (InterruptedException e) {
+                            LOG.info("stmt executor sleep wait InterruptedException: ", e);
+                        }
+                    }
                 }
                 if (context.getConnectType() == ConnectType.ARROW_FLIGHT_SQL) {
                     context.setReturnResultFromLocal(false);
                 }
                 handleQueryStmt();
                 break;
-            } catch (RpcException e) {
-                if (i == retryTime - 1) {
+            } catch (RpcException | UserException e) {
+                // cloud mode retry
+                LOG.debug("due to exception {} retry {} rpc {} user {}",
+                        e.getMessage(), i, e instanceof RpcException, e instanceof UserException);
+                // errCode = 2, detailMessage = There is no scanNode Backend available.[10003: not alive]
+                List<String> bes = Env.getCurrentSystemInfo().getAllBackendIds().stream()
+                            .map(id -> Long.toString(id)).collect(Collectors.toList());
+                String msg = e.getMessage();
+                boolean isNeedRetry = true;
+                if (e instanceof UserException
+                        && msg.contains(SystemInfoService.NO_SCAN_NODE_BACKEND_AVAILABLE_MSG)) {
+                    isNeedRetry = false;
+                    Matcher matcher = beIpPattern.matcher(msg);
+                    // The planner is not recreated on retry, so after a node is dropped
+                    // in cloud mode its backend id is stale and the request must not be
+                    // retried, e.g. be ids [11000, 11001] -> after dropping node 11001,
+                    // requests that name 11001 should not be retried.
+                    if (matcher.find()) {
+                        String notAliveBe = matcher.group(1);
+                        isNeedRetry = bes.contains(notAliveBe);
+                        if (isNeedRetry) {
+                            Backend abnormalBe = Env.getCurrentSystemInfo().getBackend(Long.parseLong(notAliveBe));
+                            String deadCloudClusterStatus = abnormalBe.getCloudClusterStatus();
+                            String deadCloudClusterClusterName = abnormalBe.getCloudClusterName();
+                            LOG.info("need retry cluster {} status {}-{}", deadCloudClusterClusterName,
+                                    deadCloudClusterStatus, ClusterStatus.valueOf(deadCloudClusterStatus));
+                            if (ClusterStatus.valueOf(deadCloudClusterStatus) != ClusterStatus.NORMAL) {
+                                CloudSystemInfoService.waitForAutoStart(deadCloudClusterClusterName);
+                            }
+                        }
+                    }
+                }
+                if (i == retryTime - 1 || !isNeedRetry) {
                     throw e;
                 }
                 if (context.getConnectType().equals(ConnectType.MYSQL) && !context.getMysqlChannel().isSend()) {
@@ -1008,6 +1095,14 @@ public class StmtExecutor {
         }
     }
 
+    private boolean hasCloudClusterPriv() {
+        if (ConnectContext.get() == null || Strings.isNullOrEmpty(ConnectContext.get().getCloudCluster())) {
+            return false;
+        }
+        return Env.getCurrentEnv().getAuth().checkCloudPriv(ConnectContext.get().getCurrentUserIdentity(),
+            ConnectContext.get().getCloudCluster(), PrivPredicate.USAGE, ResourceTypeEnum.CLUSTER);
+    }
+
     // Analyze one statement to structure in memory.
     public void analyze(TQueryOptions tQueryOptions) throws UserException, InterruptedException {
         if (LOG.isDebugEnabled()) {
@@ -1136,6 +1231,27 @@ public class StmtExecutor {
                         resetAnalyzerAndStmt();
                     }
                 } catch (UserException e) {
+                    // In cloud mode, retry only when this user has privileges on the
+                    // cloud cluster; without that auth a retry cannot succeed, so rethrow.
+                    if (Config.isCloudMode()
+                            && (e.getMessage().contains(SystemInfoService.NOT_USING_VALID_CLUSTER_MSG)
+                            || e.getMessage().contains("backend -1"))
+                            && hasCloudClusterPriv()) {
+                        LOG.debug("cloud mode analyzeAndGenerateQueryPlan retry times {}", i);
+                        // sleep random millis [500, 1000] ms
+                        int randomMillis = 500 + (int) (Math.random() * (1000 - 500));
+                        try {
+                            if (i > analyzeTimes / 2) {
+                                randomMillis = 1000 + (int) (Math.random() * (1000 - 500));
+                            }
+                            Thread.sleep(randomMillis);
+                        } catch (InterruptedException ie) {
+                            LOG.info("stmt executor sleep wait InterruptedException: ", ie);
+                        }
+                        if (i < analyzeTimes) {
+                            continue;
+                        }
+                    }
                     throw e;
                 } catch (Exception e) {
                     LOG.warn("Analyze failed. {}", 
context.getQueryIdentifier(), e);
@@ -2101,6 +2217,23 @@ public class StmtExecutor {
                     txnStatus = TransactionStatus.COMMITTED;
                 }
 
+                // TODO(meiyi)
+                // insertStmt.afterFinishTxn(true);
+                if (Config.isCloudMode()) {
+                    String clusterName = context.getCloudCluster();
+                    if (context.getSessionVariable().enableMultiClusterSyncLoad()
+                            && clusterName != null && !clusterName.isEmpty()) {
+                        CloudSystemInfoService infoService = (CloudSystemInfoService) Env.getCurrentSystemInfo();
+                        List<List<Backend>> backendsList = infoService.getCloudClusterNames().stream()
+                                .filter(name -> !name.equals(clusterName))
+                                .map(name -> infoService.getBackendsByClusterName(name))
+                                .collect(Collectors.toList());
+                        List<Long> allTabletIds = ((OlapTable) insertStmt.getTargetTable()).getAllTabletIds();
+                        syncLoadForTablets(backendsList, allTabletIds);
+                    }
+                }
             } catch (Throwable t) {
                 // if any throwable is thrown during the insert operation, first abort this txn
                 LOG.warn("handle insert stmt fail: {}", label, t);
@@ -2173,6 +2306,41 @@ public class StmtExecutor {
         context.updateReturnRows((int) loadedRows);
     }
 
+    public static void syncLoadForTablets(List<List<Backend>> backendsList, List<Long> allTabletIds) {
+        backendsList.forEach(backends -> backends.forEach(backend -> {
+            if (backend.isAlive()) {
+                List<Long> tabletIdList = new ArrayList<Long>();
+                Set<Long> beTabletIds = null;
+                // TODO(merge-cloud): needs the cloud tablet rebalancer; until it lands,
+                // beTabletIds stays null and the unfiltered tablet list is sent below.
+                //Set<Long> beTabletIds = Env.getCurrentEnv()
+                //                            .getCloudTabletRebalancer()
+                //                            .getSnapshotTabletsByBeId(backend.getId());
+                if (beTabletIds != null) {
+                    allTabletIds.forEach(tabletId -> {
+                        if (beTabletIds.contains(tabletId)) {
+                            tabletIdList.add(tabletId);
+                        }
+                    });
+                }
+                boolean ok = false;
+                TNetworkAddress address = null;
+                Client client = null;
+                try {
+                    address = new TNetworkAddress(backend.getHost(), backend.getBePort());
+                    client = ClientPool.backendPool.borrowObject(address);
+                    client.syncLoadForTablets(new TSyncLoadForTabletsRequest(allTabletIds));
+                    ok = true;
+                } catch (Exception e) {
+                    LOG.warn(e.getMessage());
+                } finally {
+                    if (!ok) {
+                        ClientPool.backendPool.invalidateObject(address, client);
+                    } else {
+                        ClientPool.backendPool.returnObject(address, client);
+                    }
+                }
+            }
+        }));
+    }
+
     private void handleExternalInsertStmt() {
         // TODO(tsy): load refactor, handle external load here
         try {
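
The syncLoadForTablets helper above applies the usual discipline for pooled thrift clients: return the client to the pool on success, invalidate it on failure so a possibly broken connection is never reused. A minimal generic sketch of that pattern; Pool and Client are stand-in types, not Doris classes:

    // Stand-in types; the real code uses ClientPool.backendPool and a thrift client.
    interface Client { void call() throws Exception; }

    interface Pool {
        Client borrow() throws Exception;
        void returnObject(Client c);
        void invalidate(Client c);
    }

    final class PooledCallSketch {
        static void callOnce(Pool pool) {
            Client client = null;
            boolean ok = false;
            try {
                client = pool.borrow();
                client.call();
                ok = true;
            } catch (Exception e) {
                // log and fall through; the finally block decides the client's fate
            } finally {
                if (client != null) {
                    if (ok) {
                        pool.returnObject(client);   // healthy: hand it back
                    } else {
                        pool.invalidate(client);     // suspect: drop it from the pool
                    }
                }
            }
        }
    }
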
@@ -2257,6 +2425,11 @@ public class StmtExecutor {
     }
 
     private void handleUseCloudClusterStmt() throws AnalysisException {
+        if (!Config.isCloudMode()) {
+            ErrorReport.reportAnalysisException(ErrorCode.ERR_NOT_CLOUD_MODE);
+            return;
+        }
+
         UseCloudClusterStmt useCloudClusterStmt = (UseCloudClusterStmt) parsedStmt;
         try {
             ((CloudEnv) context.getEnv()).changeCloudCluster(useCloudClusterStmt.getCluster(), context);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 36d24ddfc4d..d2ce1a7194a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -44,6 +44,7 @@ import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.TableIf.TableType;
 import org.apache.doris.catalog.Tablet;
 import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.cloud.catalog.CloudEnv;
 import org.apache.doris.cloud.catalog.CloudTablet;
 import org.apache.doris.cloud.planner.CloudStreamLoadPlanner;
 import org.apache.doris.cloud.proto.Cloud.CommitTxnResponse;
@@ -988,6 +989,9 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         ConnectContext context = new ConnectContext(null, true);
         // Set current connected FE to the client address, so that we can know where this request comes from.
         context.setCurrentConnectedFEIp(params.getClientNodeHost());
+        if (Config.isCloudMode() && !Strings.isNullOrEmpty(params.getCloudCluster())) {
+            context.setCloudCluster(params.getCloudCluster());
+        }
 
         ConnectProcessor processor = null;
         if (context.getConnectType().equals(ConnectType.MYSQL)) {
@@ -1941,6 +1945,41 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         TStatus status = new TStatus(TStatusCode.OK);
         result.setStatus(status);
         List<String> tableNames = request.getTableNames();
+
+        if (Config.isCloudMode()) {
+            try {
+                ConnectContext ctx = new ConnectContext();
+                ctx.setThreadLocalInfo();
+                ctx.setQualifiedUser(request.getUser());
+                ctx.setRemoteIP(request.getUserIp());
+                String userName = ClusterNamespace.getNameFromFullName(request.getUser());
+                if (userName != null) {
+                    List<UserIdentity> currentUser = Lists.newArrayList();
+                    try {
+                        Env.getCurrentEnv().getAuth().checkPlainPassword(userName,
+                                request.getUserIp(), request.getPasswd(), currentUser);
+                    } catch (AuthenticationException e) {
+                        throw new UserException(e.formatErrMsg());
+                    }
+                    Preconditions.checkState(currentUser.size() == 1);
+                    ctx.setCurrentUserIdentity(currentUser.get(0));
+                }
+                LOG.info("one-stream multi-table load uses cloud cluster {}", request.getCloudCluster());
+                if (!Strings.isNullOrEmpty(request.getCloudCluster())) {
+                    if (Strings.isNullOrEmpty(request.getUser())) {
+                        ctx.setCloudCluster(request.getCloudCluster());
+                    } else {
+                        ((CloudEnv) Env.getCurrentEnv()).changeCloudCluster(request.getCloudCluster(), ctx);
+                    }
+                }
+            } catch (UserException e) {
+                LOG.warn("failed to set ConnectContext info: {}", e.getMessage());
+                status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
+                status.addToErrorMsgs(e.getMessage());
+            }
+        }
+
         try {
             if (CollectionUtils.isEmpty(tableNames)) {
                 throw new MetaNotFoundException("table not found");
@@ -2101,6 +2140,49 @@ public class FrontendServiceImpl implements FrontendService.Iface {
 
     private TExecPlanFragmentParams streamLoadPutImpl(TStreamLoadPutRequest request, TStreamLoadPutResult result)
             throws UserException {
+        if (request.isSetAuthCode()) {
+            String clientAddr = getClientAddrAsString();
+            ConnectContext ctx = new ConnectContext();
+            ctx.setThreadLocalInfo();
+            ctx.setRemoteIP(clientAddr);
+            long backendId = request.getBackendId();
+            Backend backend = Env.getCurrentSystemInfo().getBackend(backendId);
+            Preconditions.checkNotNull(backend);
+            ctx.setCloudCluster(backend.getCloudClusterName());
+            LOG.info("streamLoadPutImpl set context: cluster {}", ctx.getCloudCluster());
+        } else if (Config.isCloudMode()) {
+            ConnectContext ctx = new ConnectContext();
+            ctx.setThreadLocalInfo();
+            ctx.setQualifiedUser(request.getUser());
+            ctx.setRemoteIP(request.getUserIp());
+            String userName = ClusterNamespace.getNameFromFullName(request.getUser());
+            if (userName != null) {
+                List<UserIdentity> currentUser = Lists.newArrayList();
+                try {
+                    Env.getCurrentEnv().getAuth().checkPlainPassword(userName,
+                            request.getUserIp(), request.getPasswd(), currentUser);
+                } catch (AuthenticationException e) {
+                    throw new UserException(e.formatErrMsg());
+                }
+                Preconditions.checkState(currentUser.size() == 1);
+                ctx.setCurrentUserIdentity(currentUser.get(0));
+            }
+
+            LOG.info("stream load uses cloud cluster {}", request.getCloudCluster());
+            if (!Strings.isNullOrEmpty(request.getCloudCluster())) {
+                if (Strings.isNullOrEmpty(request.getUser())) {
+                    // mysql load
+                    ctx.setCloudCluster(request.getCloudCluster());
+                } else {
+                    // stream load
+                    ((CloudEnv) Env.getCurrentEnv()).changeCloudCluster(request.getCloudCluster(), ctx);
+                }
+            }
+
+            LOG.debug("streamLoadPutImpl set context: cluster {}, setCurrentUserIdentity {}",
+                    ctx.getCloudCluster(), ctx.getCurrentUserIdentity());
+        }
+
         Env env = Env.getCurrentEnv();
         String fullDbName = request.getDb();
         Database db = env.getInternalCatalog().getDbNullable(fullDbName);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
index d8822633937..6e6268020c8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
@@ -277,6 +277,18 @@ public class Backend implements Writable {
         this.backendStatus.isLoadDisabled = isLoadDisabled;
     }
 
+    public void setActive(boolean isActive) {
+        this.backendStatus.isActive = isActive;
+    }
+
+    public boolean isActive() {
+        return this.backendStatus.isActive;
+    }
+
+    public long getCurrentFragmentNum() {
+        return this.backendStatus.currentFragmentNum;
+    }
+
     // for test only
     public void updateOnce(int bePort, int httpPort, int beRpcPort) {
         if (this.bePort != bePort) {
@@ -779,6 +791,10 @@ public class Backend implements Writable {
                 this.lastStartTime = hbResponse.getBeStartTime();
                 isChanged = true;
             }
+
+            this.backendStatus.currentFragmentNum = hbResponse.getFragmentNum();
+            this.backendStatus.lastFragmentUpdateTime = hbResponse.getLastFragmentUpdateTime();
+
             heartbeatErrMsg = "";
             this.heartbeatFailureCounter = 0;
         } else {
@@ -834,6 +850,12 @@ public class Backend implements Writable {
         public volatile boolean isQueryDisabled = false;
         @SerializedName("isLoadDisabled")
         public volatile boolean isLoadDisabled = false;
+        @SerializedName("isActive")
+        public volatile boolean isActive = true;
+
+        // In cloud mode, cloud control queries only the master, so these fields need no SerializedName.
+        public volatile long currentFragmentNum = 0;
+        public volatile long lastFragmentUpdateTime = 0;
 
         @Override
         public String toString() {
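
Note the split above: isActive carries @SerializedName and is persisted, while the two fragment counters are heartbeat-scoped and deliberately unannotated. A hedged Gson sketch of that convention; the exclusion strategy here is an assumption standing in for the FE's actual GsonUtils configuration, which is not shown in this patch:

    import com.google.gson.ExclusionStrategy;
    import com.google.gson.FieldAttributes;
    import com.google.gson.Gson;
    import com.google.gson.GsonBuilder;
    import com.google.gson.annotations.SerializedName;

    public final class SerializedNameSketch {
        static class Status {
            @SerializedName("isActive")
            boolean isActive = true;
            long currentFragmentNum = 42;   // unannotated: master-only, not persisted
        }

        public static void main(String[] args) {
            // Assumed behavior: only fields carrying @SerializedName are written out.
            Gson gson = new GsonBuilder()
                    .addSerializationExclusionStrategy(new ExclusionStrategy() {
                        @Override
                        public boolean shouldSkipField(FieldAttributes f) {
                            return f.getAnnotation(SerializedName.class) == null;
                        }

                        @Override
                        public boolean shouldSkipClass(Class<?> clazz) {
                            return false;
                        }
                    })
                    .create();
            System.out.println(gson.toJson(new Status())); // {"isActive":true}
        }
    }
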
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java b/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java
index 0b347f0cbb2..e009ab4abf3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java
@@ -48,6 +48,8 @@ public class BackendHbResponse extends HeartbeatResponse implements Writable {
     private long beStartTime = 0;
     private String host;
     private String version = "";
+    private long fragmentNum;
+    private long lastFragmentUpdateTime;
     @SerializedName(value = "isShutDown")
     private boolean isShutDown = false;
 
@@ -56,7 +58,8 @@ public class BackendHbResponse extends HeartbeatResponse implements Writable {
     }
 
     public BackendHbResponse(long beId, int bePort, int httpPort, int brpcPort, long hbTime, long beStartTime,
-            String version, String nodeRole, boolean isShutDown, int arrowFlightSqlPort) {
+            String version, String nodeRole, long fragmentNum, long lastFragmentUpdateTime,
+            boolean isShutDown, int arrowFlightSqlPort) {
         super(HeartbeatResponse.Type.BACKEND);
         this.beId = beId;
         this.status = HbStatus.OK;
@@ -67,6 +70,8 @@ public class BackendHbResponse extends HeartbeatResponse implements Writable {
         this.beStartTime = beStartTime;
         this.version = version;
         this.nodeRole = nodeRole;
+        this.fragmentNum = fragmentNum;
+        this.lastFragmentUpdateTime = lastFragmentUpdateTime;
         this.isShutDown = isShutDown;
         this.arrowFlightSqlPort = arrowFlightSqlPort;
     }
@@ -86,6 +91,14 @@ public class BackendHbResponse extends HeartbeatResponse implements Writable {
         this.msg = errMsg;
     }
 
+    public long getFragmentNum() {
+        return fragmentNum;
+    }
+
+    public long getLastFragmentUpdateTime() {
+        return lastFragmentUpdateTime;
+    }
+
     public long getBeId() {
         return beId;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java
index 2c766221acb..dfb1ffeaae0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java
@@ -20,12 +20,12 @@ package org.apache.doris.system;
 import org.apache.doris.resource.Tag;
 import org.apache.doris.thrift.TStorageMedium;
 
-import com.google.common.collect.ImmutableCollection;
 import com.google.common.collect.Sets;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
@@ -172,7 +172,7 @@ public class BeSelectionPolicy {
         return true;
     }
 
-    public List<Backend> getCandidateBackends(ImmutableCollection<Backend> backends) {
+    public List<Backend> getCandidateBackends(Collection<Backend> backends) {
         List<Backend> filterBackends = backends.stream().filter(this::isMatch).collect(Collectors.toList());
         List<Backend> preLocationFilterBackends = filterBackends.stream()
                 .filter(iterm -> preferredLocations.contains(iterm.getHost())).collect(Collectors.toList());
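
Widening the parameter from Guava's ImmutableCollection to java.util.Collection lets callers pass any backend collection, for example the plain List returned by the new getBackendsByCurrentCluster(), without building an immutable copy first. A small self-contained sketch of the same idea; the string filter is a stand-in for isMatch(), not Doris code:

    import java.util.Collection;
    import java.util.List;
    import java.util.stream.Collectors;

    public final class CandidateFilterSketch {
        // Accepting Collection instead of ImmutableCollection widens the call sites.
        static List<String> candidates(Collection<String> backends) {
            return backends.stream()
                    .filter(b -> !b.isEmpty())   // stand-in for BeSelectionPolicy.isMatch
                    .collect(Collectors.toList());
        }

        public static void main(String[] args) {
            // A plain java.util.List now satisfies the signature directly.
            System.out.println(candidates(List.of("be1", "", "be2"))); // [be1, be2]
        }
    }
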
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
index 5d17846476f..e09c7f2b991 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
@@ -268,12 +268,17 @@ public class HeartbeatMgr extends MasterDaemon {
                     if (tBackendInfo.isSetBeNodeRole()) {
                         nodeRole = tBackendInfo.getBeNodeRole();
                     }
+
+                    long fragmentNum = tBackendInfo.getFragmentExecutingCount();
+                    long lastFragmentUpdateTime = tBackendInfo.getFragmentLastActiveTime();
+
                     boolean isShutDown = false;
                     if (tBackendInfo.isSetIsShutdown()) {
                         isShutDown = tBackendInfo.isIsShutdown();
                     }
                     return new BackendHbResponse(backendId, bePort, httpPort, brpcPort,
-                            System.currentTimeMillis(), beStartTime, version, nodeRole, isShutDown, arrowFlightSqlPort);
+                            System.currentTimeMillis(), beStartTime, version, nodeRole,
+                            fragmentNum, lastFragmentUpdateTime, isShutDown, arrowFlightSqlPort);
                 } else {
                     return new BackendHbResponse(backendId, backend.getHost(),
                             result.getStatus().getErrorMsgs().isEmpty()
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
index d31e786c384..891a6dc36fd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -993,12 +993,35 @@ public class SystemInfoService {
         return bes.stream().filter(b -> b.getLocationTag().equals(tag)).collect(Collectors.toList());
     }
 
+    // overridden by CloudSystemInfoService
+    public List<Backend> getBackendsByCurrentCluster() throws UserException {
+        return idToBackendRef.values().stream().collect(Collectors.toList());
+    }
+
+    // overridden by CloudSystemInfoService
+    public ImmutableMap<Long, Backend> getBackendsWithIdByCurrentCluster() throws UserException {
+        return getIdToBackend();
+    }
+
     public int getMinPipelineExecutorSize() {
-        if (idToBackendRef.size() == 0) {
+        if (Config.isCloudMode() && ConnectContext.get() != null
+                && Strings.isNullOrEmpty(ConnectContext.get().getCloudCluster(false))) {
+            return 1;
+        }
+        List<Backend> currentBackends = null;
+        try {
+            currentBackends = getBackendsByCurrentCluster();
+        } catch (UserException e) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("get current cluster backends failed: ", e);
+            }
+            return 1;
+        }
+        if (currentBackends.size() == 0) {
             return 1;
         }
         int minPipelineExecutorSize = Integer.MAX_VALUE;
-        for (Backend be : idToBackendRef.values()) {
+        for (Backend be : currentBackends) {
             int size = be.getPipelineExecutorSize();
             if (size > 0) {
                 minPipelineExecutorSize = Math.min(minPipelineExecutorSize, size);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
index 156dae72234..07e5b725aff 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
@@ -217,7 +217,7 @@ public class RoutineLoadManagerTest {
         try {
             routineLoadManager.addRoutineLoadJob(kafkaRoutineLoadJob, "db", "table");
             Assert.fail();
-        } catch (DdlException e) {
+        } catch (UserException e) {
             LOG.info(e.getMessage());
         }
     }
@@ -225,7 +225,7 @@ public class RoutineLoadManagerTest {
     @Test
     public void testCreateWithSameNameOfStoppedJob(@Mocked ConnectContext connectContext,
                                                    @Mocked Env env,
-                                                   @Mocked EditLog editLog) throws DdlException {
+                                                   @Mocked EditLog editLog) throws UserException {
         String jobName = "job1";
         String topicName = "topic1";
         String serverAddress = "http://127.0.0.1:8080";
@@ -761,7 +761,7 @@ public class RoutineLoadManagerTest {
 
     @Test
     public void testCheckBeToTask(@Mocked Env env,
-                                  @Mocked SystemInfoService systemInfoService) throws LoadException, DdlException {
+                                  @Mocked SystemInfoService systemInfoService) throws UserException {
         List<Long> beIdsInCluster = Lists.newArrayList();
         beIdsInCluster.add(1L);
         Map<Long, Integer> beIdToMaxConcurrentTasks = Maps.newHashMap();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java
index f5994f89a89..0391c7c602c 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java
@@ -21,10 +21,10 @@ import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.common.DdlException;
 import org.apache.doris.common.LoadException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.ThreadPoolManager;
+import org.apache.doris.common.UserException;
 import org.apache.doris.common.jmockit.Deencapsulation;
 import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.load.RoutineLoadDesc;
@@ -128,7 +128,7 @@ public class RoutineLoadSchedulerTest {
 
     public void functionTest(@Mocked Env env, @Mocked InternalCatalog catalog,
             @Mocked SystemInfoService systemInfoService, @Injectable Database database)
-            throws DdlException, InterruptedException {
+            throws UserException, InterruptedException {
         new Expectations() {
             {
                 minTimes = 0;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
index e933c0df17c..0ac72df4a6d 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
@@ -98,7 +98,7 @@ public class SystemInfoServiceTest {
         System.out.println(Env.getCurrentEnvJournalVersion());
 
         BackendHbResponse writeResponse = new BackendHbResponse(1L, 1234, 1234, 1234, 1234, 1234, "test",
-                Tag.VALUE_COMPUTATION, false, 1234);
+                Tag.VALUE_COMPUTATION, 10, 100, false, 1234);
 
         // Write objects to file
         File file1 = new File("./BackendHbResponseSerialization");
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java
index f359d6e6a9e..c9e3c50d998 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java
@@ -201,7 +201,8 @@ public class DemoMultiBackendsTest {
         Assert.assertEquals("{\"location\" : \"default\"}",
                 result.getRows().get(0).get(BackendsProcDir.TITLE_NAMES.size() - 6));
         Assert.assertEquals(
-                "{\"lastSuccessReportTabletsTime\":\"N/A\",\"lastStreamLoadTime\":-1,\"isQueryDisabled\":false,\"isLoadDisabled\":false}",
+                "{\"lastSuccessReportTabletsTime\":\"N/A\",\"lastStreamLoadTime\":-1,\"isQueryDisabled\":false,"
+                        + "\"isLoadDisabled\":false,\"isActive\":true,\"currentFragmentNum\":0,\"lastFragmentUpdateTime\":0}",
                 result.getRows().get(0).get(BackendsProcDir.TITLE_NAMES.size() - 3));
         Assert.assertEquals("0", result.getRows().get(0).get(BackendsProcDir.TITLE_NAMES.size() - 2));
         Assert.assertEquals(Tag.VALUE_MIX, result.getRows().get(0).get(BackendsProcDir.TITLE_NAMES.size() - 1));
diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift
index 376e2a34df9..127852ae6b4 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -68,6 +68,8 @@ struct TRoutineLoadTask {
     15: optional PaloInternalService.TPipelineFragmentParams pipeline_params
     16: optional bool is_multi_table
     17: optional bool memtable_on_sink_node;
+    18: optional string qualified_user
+    19: optional string cloud_cluster
 }
 
 struct TKafkaMetaProxyRequest {
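
With the two new optional fields, the FE can tag each routine-load task with the submitting user and the target compute cluster. A hedged sketch of the producer side; it assumes Thrift's usual camel-case Java generation (qualified_user -> setQualifiedUser, cloud_cluster -> setCloudCluster), and the values are illustrative only:

    // Assumes thrift-generated setters/isSet methods; values are examples only.
    TRoutineLoadTask task = new TRoutineLoadTask();
    task.setIsMultiTable(true);
    task.setQualifiedUser("some_user");
    task.setCloudCluster("compute_cluster0");
    if (task.isSetCloudCluster()) {
        // the BE side can route this task to the named compute cluster
    }
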
diff --git a/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy b/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy
index 07aced68f60..3a26e153be5 100644
--- a/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy
+++ b/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy
@@ -18,5 +18,18 @@
 testGroups = "p0"
 //exclude groups and exclude suites take priority over include groups and include suites.
 excludeSuites = "test_index_failure_injection,test_dump_image,test_profile,test_spark_load,test_refresh_mtmv,test_bitmap_filter,test_information_schema_external,test_stream_load_new_move_memtable,test_stream_load_move_memtable,test_materialized_view_move_memtable,test_disable_move_memtable,test_insert_move_memtable,set_and_unset_variable,test_pk_uk_case_cluster,test_point_query_cluster_key,test_compaction_uniq_cluster_keys_with_delete,test_compaction_uniq_keys_cluster_key,test_set_partit [...]
-excludeDirectories = "workload_manager_p1,nereids_rules_p0/subquery,unique_with_mow_p0/cluster_key,unique_with_mow_p0/ssb_unique_sql_zstd_cluster,unique_with_mow_p0/ssb_unique_load_zstd_c,nereids_rules_p0/mv,backup_restore,cold_heat_separation,storage_medium_p0"
+
+excludeDirectories = """
+    cloud/multi_cluster,
+    workload_manager_p1,
+    nereids_rules_p0/subquery,
+    unique_with_mow_p0/cluster_key,
+    unique_with_mow_p0/ssb_unique_sql_zstd_cluster,
+    unique_with_mow_p0/ssb_unique_load_zstd_c,
+    nereids_rules_p0/mv,
+    backup_restore,
+    cold_heat_separation,
+    storage_medium_p0
+"""
+
 max_failure_num = 200
diff --git a/regression-test/pipeline/p0/conf/regression-conf.groovy b/regression-test/pipeline/p0/conf/regression-conf.groovy
index b99e21c4e7c..d8de4fd5059 100644
--- a/regression-test/pipeline/p0/conf/regression-conf.groovy
+++ b/regression-test/pipeline/p0/conf/regression-conf.groovy
@@ -59,7 +59,11 @@ excludeGroups = ""
 excludeSuites = "test_index_failure_injection,test_dump_image,test_profile,test_spark_load,test_refresh_mtmv,test_bitmap_filter,test_information_schema_external"
 
 // these directories will not be executed
-excludeDirectories = "workload_manager_p1,nereids_rules_p0/subquery"
+excludeDirectories = """
+    cloud,
+    nereids_rules_p0/subquery,
+    workload_manager_p1
+"""
 
 customConf1 = "test_custom_conf_value"
 
diff --git a/regression-test/plugins/plugin_cluster.groovy b/regression-test/plugins/plugin_cluster.groovy
new file mode 100644
index 00000000000..15237e29aa8
--- /dev/null
+++ b/regression-test/plugins/plugin_cluster.groovy
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import groovy.json.JsonOutput
+import org.apache.doris.regression.suite.Suite
+
+Suite.metaClass.add_cluster = { be_unique_id, ip, port, cluster_name, cluster_id ->
+    def jsonOutput = new JsonOutput()
+    def clusterInfo = [
+                     type: 'COMPUTE',
+                     cluster_name : cluster_name,
+                     cluster_id : cluster_id,
+                     nodes: [
+                         [
+                             cloud_unique_id: be_unique_id,
+                             ip: ip,
+                             heartbeat_port: port
+                         ],
+                     ]
+                 ]
+    def map = [instance_id: "${instance_id}", cluster: clusterInfo]
+    def js = jsonOutput.toJson(map)
+    log.info("add cluster req: ${js} ".toString())
+
+    def add_cluster_api = { request_body, check_func ->
+        httpTest {
+            endpoint context.config.metaServiceHttpAddress
+            uri "/MetaService/http/add_cluster?token=${token}"
+            body request_body
+            check check_func
+        }
+    }
+
+    add_cluster_api.call(js) {
+        respCode, body ->
+            log.info("add cluster resp: ${body} ${respCode}".toString())
+            def json = parseJson(body)
+            assertTrue(json.code.equalsIgnoreCase('OK') || json.code.equalsIgnoreCase('ALREADY_EXISTED'))
+    }
+}
+
+Suite.metaClass.get_cluster = { be_unique_id ->
+    def jsonOutput = new JsonOutput()
+    def map = [instance_id: "${instance_id}", cloud_unique_id: "${be_unique_id}" ]
+    def js = jsonOutput.toJson(map)
+    log.info("get cluster req: ${js} ".toString())
+
+    def get_cluster_api = { request_body, check_func ->
+        httpTest {
+            endpoint context.config.metaServiceHttpAddress
+            uri "/MetaService/http/get_cluster?token=${token}"
+            body request_body
+            check check_func
+        }
+    }
+
+    def json
+    get_cluster_api.call(js) {
+        respCode, body ->
+            log.info("get cluster resp: ${body} ${respCode}".toString())
+            json = parseJson(body)
+            assertTrue(json.code.equalsIgnoreCase('OK') || json.code.equalsIgnoreCase('ALREADY_EXISTED'))
+    }
+    json.result.cluster
+}
+
+Suite.metaClass.drop_cluster = { cluster_name, cluster_id ->
+    def jsonOutput = new JsonOutput()
+    def reqBody = [
+                 type: 'COMPUTE',
+                 cluster_name : cluster_name,
+                 cluster_id : cluster_id,
+                 nodes: [
+                 ]
+             ]
+    def map = [instance_id: "${instance_id}", cluster: reqBody]
+    def js = jsonOutput.toJson(map)
+    log.info("drop cluster req: ${js} ".toString())
+
+    def drop_cluster_api = { request_body, check_func ->
+        httpTest {
+            endpoint context.config.metaServiceHttpAddress
+            uri "/MetaService/http/drop_cluster?token=${token}"
+            body request_body
+            check check_func
+        }
+    }
+
+    drop_cluster_api.call(js) {
+        respCode, body ->
+            log.info("drop cluster resp: ${body} ${respCode}".toString())
+            def json = parseJson(body)
+            assertTrue(json.code.equalsIgnoreCase('OK') || json.code.equalsIgnoreCase('ALREADY_EXISTED'))
+    }
+}
+
+Suite.metaClass.add_node = { be_unique_id, ip, port, cluster_name, cluster_id ->
+    def jsonOutput = new JsonOutput()
+    def clusterInfo = [
+                 type: 'COMPUTE',
+                 cluster_name : cluster_name,
+                 cluster_id : cluster_id,
+                 nodes: [
+                     [
+                         cloud_unique_id: be_unique_id,
+                         ip: ip,
+                         heartbeat_port: port
+                     ],
+                 ]
+             ]
+    def map = [instance_id: "${instance_id}", cluster: clusterInfo]
+    def js = jsonOutput.toJson(map)
+    log.info("add node req: ${js} ".toString())
+
+    def add_node_api = { request_body, check_func ->
+        httpTest {
+            endpoint context.config.metaServiceHttpAddress
+            uri "/MetaService/http/add_node?token=${token}"
+            body request_body
+            check check_func
+        }
+    }
+
+    add_node_api.call(js) {
+        respCode, body ->
+            log.info("add node resp: ${body} ${respCode}".toString())
+            def json = parseJson(body)
+            assertTrue(json.code.equalsIgnoreCase('OK') || json.code.equalsIgnoreCase('ALREADY_EXISTED'))
+    }
+}
+
+Suite.metaClass.d_node = { be_unique_id, ip, port, cluster_name, cluster_id ->
+    def jsonOutput = new JsonOutput()
+    def clusterInfo = [
+                 type: 'COMPUTE',
+                 cluster_name : cluster_name,
+                 cluster_id : cluster_id,
+                 nodes: [
+                     [
+                         cloud_unique_id: be_unique_id,
+                         ip: ip,
+                         heartbeat_port: port
+                     ],
+                 ]
+             ]
+    def map = [instance_id: "${instance_id}", cluster: clusterInfo]
+    def js = jsonOutput.toJson(map)
+    log.info("decommission node req: ${js} ".toString())
+
+    def d_cluster_api = { request_body, check_func ->
+        httpTest {
+            endpoint context.config.metaServiceHttpAddress
+            uri "/MetaService/http/decommission_node?token=${token}"
+            body request_body
+            check check_func
+        }
+    }
+
+    d_cluster_api.call(js) {
+        respCode, body ->
+            log.info("decommission node resp: ${body} ${respCode}".toString())
+            def json = parseJson(body)
+            assertTrue(json.code.equalsIgnoreCase('OK') || json.code.equalsIgnoreCase('ALREADY_EXISTED'))
+    }
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

