This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 9e3a75b7484 [feature](cloud) multi cloud cluster (#31749) 9e3a75b7484 is described below commit 9e3a75b74847d867fce9991f04de4b190d747008 Author: yujun <yu.jun.re...@gmail.com> AuthorDate: Sun Mar 17 23:54:03 2024 +0800 [feature](cloud) multi cloud cluster (#31749) --- be/src/io/fs/multi_table_pipe.cpp | 2 + .../routine_load/routine_load_task_executor.cpp | 6 + be/src/runtime/stream_load/stream_load_context.h | 4 + .../java/org/apache/doris/analysis/LoadStmt.java | 2 + .../apache/doris/analysis/NativeInsertStmt.java | 5 +- .../apache/doris/analysis/SetUserPropertyVar.java | 11 ++ .../main/java/org/apache/doris/catalog/Env.java | 4 +- .../java/org/apache/doris/catalog/EnvFactory.java | 23 +++ .../java/org/apache/doris/catalog/OlapTable.java | 19 +++ .../main/java/org/apache/doris/catalog/Table.java | 1 + .../org/apache/doris/cloud/catalog/CloudEnv.java | 2 +- .../doris/cloud/catalog/CloudEnvFactory.java | 30 ++++ .../doris/cloud/load/CloudBrokerLoadJob.java | 181 +++++++++++++++++++++ .../apache/doris/cloud/load/CloudLoadManager.java | 49 ++++++ .../doris/cloud/load/CloudRoutineLoadManager.java | 66 ++++++++ .../cloud/planner/CloudGroupCommitPlanner.java | 81 +++++++++ .../apache/doris/cloud/qe/CloudCoordinator.java | 4 +- .../doris/cloud/system/CloudSystemInfoService.java | 43 ++++- .../transaction/CloudGlobalTransactionMgr.java | 2 +- .../java/org/apache/doris/common/ErrorCode.java | 5 +- .../apache/doris/datasource/ExternalScanNode.java | 3 +- .../doris/datasource/FederationBackendPolicy.java | 6 +- .../org/apache/doris/httpv2/rest/LoadAction.java | 139 ++++++++++++++-- .../doris/httpv2/rest/manager/ClusterAction.java | 37 +++++ .../apache/doris/load/loadv2/BrokerLoadJob.java | 24 ++- .../org/apache/doris/load/loadv2/JobState.java | 3 +- .../java/org/apache/doris/load/loadv2/LoadJob.java | 5 +- .../apache/doris/load/loadv2/LoadLoadingTask.java | 1 + 
.../org/apache/doris/load/loadv2/LoadTask.java | 23 +++ .../apache/doris/load/loadv2/MysqlLoadManager.java | 11 ++ .../doris/load/routineload/KafkaTaskInfo.java | 2 + .../doris/load/routineload/RoutineLoadJob.java | 64 ++++++++ .../doris/load/routineload/RoutineLoadManager.java | 5 +- .../org/apache/doris/mysql/privilege/Auth.java | 4 +- .../plans/commands/insert/GroupCommitInserter.java | 4 +- .../apache/doris/planner/GroupCommitPlanner.java | 59 ++++--- .../org/apache/doris/planner/OlapScanNode.java | 13 +- .../org/apache/doris/planner/OlapTableSink.java | 12 +- .../org/apache/doris/plugin/audit/AuditEvent.java | 7 + .../java/org/apache/doris/qe/AuditLogHelper.java | 4 + .../java/org/apache/doris/qe/ConnectContext.java | 24 ++- .../main/java/org/apache/doris/qe/Coordinator.java | 5 +- .../java/org/apache/doris/qe/MasterOpExecutor.java | 6 + .../java/org/apache/doris/qe/SessionVariable.java | 14 ++ .../java/org/apache/doris/qe/ShowExecutor.java | 7 +- .../java/org/apache/doris/qe/StmtExecutor.java | 179 +++++++++++++++++++- .../apache/doris/service/FrontendServiceImpl.java | 82 ++++++++++ .../main/java/org/apache/doris/system/Backend.java | 22 +++ .../org/apache/doris/system/BackendHbResponse.java | 15 +- .../org/apache/doris/system/BeSelectionPolicy.java | 4 +- .../java/org/apache/doris/system/HeartbeatMgr.java | 7 +- .../org/apache/doris/system/SystemInfoService.java | 27 ++- .../load/routineload/RoutineLoadManagerTest.java | 6 +- .../load/routineload/RoutineLoadSchedulerTest.java | 4 +- .../apache/doris/system/SystemInfoServiceTest.java | 2 +- .../doris/utframe/DemoMultiBackendsTest.java | 3 +- gensrc/thrift/BackendService.thrift | 2 + .../cloud_p0/conf/regression-conf-custom.groovy | 15 +- .../pipeline/p0/conf/regression-conf.groovy | 6 +- regression-test/plugins/plugin_cluster.groovy | 180 ++++++++++++++++++++ 60 files changed, 1476 insertions(+), 100 deletions(-) diff --git a/be/src/io/fs/multi_table_pipe.cpp b/be/src/io/fs/multi_table_pipe.cpp index 
916f8151739..6b41bf6988b 100644 --- a/be/src/io/fs/multi_table_pipe.cpp +++ b/be/src/io/fs/multi_table_pipe.cpp @@ -165,6 +165,8 @@ Status MultiTablePipe::request_and_exec_plans() { request.fileType = TFileType::FILE_STREAM; request.__set_thrift_rpc_timeout_ms(config::thrift_rpc_timeout_ms); request.__set_memtable_on_sink_node(_ctx->memtable_on_sink_node); + request.__set_user(_ctx->qualified_user); + request.__set_cloud_cluster(_ctx->cloud_cluster); // no need to register new_load_stream_mgr coz it is already done in routineload submit task // plan this load diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp index d22f2bb4a8c..3e5eb48afbc 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.cpp +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -216,6 +216,12 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) { if (task.__isset.memtable_on_sink_node) { ctx->memtable_on_sink_node = task.memtable_on_sink_node; } + if (task.__isset.qualified_user) { + ctx->qualified_user = task.qualified_user; + } + if (task.__isset.cloud_cluster) { + ctx->cloud_cluster = task.cloud_cluster; + } // set execute plan params (only for non-single-stream-multi-table load) TStreamLoadPutResult put_result; diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index 376228e022d..1461e863d5d 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -236,6 +236,10 @@ public: bool memtable_on_sink_node = false; + // use for cloud cluster mode + std::string qualified_user; + std::string cloud_cluster; + public: ExecEnv* exec_env() { return _exec_env; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java index 01a4490003b..19b72129516 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java @@ -125,6 +125,8 @@ public class LoadStmt extends DdlStmt { public static final String KEY_COMMENT = "comment"; + public static final String KEY_CLOUD_CLUSTER = "cloud_cluster"; + public static final String KEY_ENCLOSE = "enclose"; public static final String KEY_ESCAPE = "escape"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java index 193ce3abc80..3f367464441 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java @@ -24,6 +24,7 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.EnvFactory; import org.apache.doris.catalog.JdbcTable; import org.apache.doris.catalog.KeysType; import org.apache.doris.catalog.MysqlTable; @@ -1247,8 +1248,8 @@ public class NativeInsertStmt extends InsertStmt { this.analyzer = analyzerTmp; } analyzeSubquery(analyzer, true); - groupCommitPlanner = new GroupCommitPlanner((Database) db, olapTable, targetColumnNames, queryId, - ConnectContext.get().getSessionVariable().getGroupCommit()); + groupCommitPlanner = EnvFactory.getInstance().createGroupCommitPlanner((Database) db, olapTable, + targetColumnNames, queryId, ConnectContext.get().getSessionVariable().getGroupCommit()); // save plan message to be reused for prepare stmt loadId = queryId; baseSchemaVersion = olapTable.getBaseSchemaVersion(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetUserPropertyVar.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetUserPropertyVar.java index 37f3318a89e..a2f31818b8b 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetUserPropertyVar.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetUserPropertyVar.java @@ -18,7 +18,9 @@ package org.apache.doris.analysis; import org.apache.doris.catalog.Env; +import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.mysql.privilege.PrivPredicate; @@ -82,6 +84,15 @@ public class SetUserPropertyVar extends SetVar { ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "GRANT"); } + if (Config.isCloudMode()) { + // check value, clusterName is valid. + if (key.equals(UserProperty.DEFAULT_CLOUD_CLUSTER) + && !Strings.isNullOrEmpty(value) + && !((CloudSystemInfoService) Env.getCurrentSystemInfo()) + .getCloudClusterNames().contains(value)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_CLOUD_CLUSTER_ERROR, value); + } + } return; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index a2386b61392..1a93efa0c5a 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -637,7 +637,7 @@ public class Env { public Env(boolean isCheckpointCatalog) { this.catalogMgr = new CatalogMgr(); this.load = new Load(); - this.routineLoadManager = new RoutineLoadManager(); + this.routineLoadManager = EnvFactory.getInstance().createRoutineLoadManager(); this.groupCommitManager = new GroupCommitManager(); this.sqlBlockRuleMgr = new SqlBlockRuleMgr(); this.exportMgr = new ExportMgr(); @@ -720,7 +720,7 @@ public class Env { Config.async_loading_load_task_pool_size, LoadTask.COMPARATOR, LoadTask.class, !isCheckpointCatalog); this.loadJobScheduler = new LoadJobScheduler(); - this.loadManager = new 
LoadManager(loadJobScheduler); + this.loadManager = EnvFactory.getInstance().createLoadManager(loadJobScheduler); this.progressManager = new ProgressManager(); this.streamLoadRecordMgr = new StreamLoadRecordMgr("stream_load_record_manager", Config.fetch_stream_load_record_interval_second * 1000L); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java index a286cf33cea..34e9d5ef93b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java @@ -24,10 +24,15 @@ import org.apache.doris.analysis.UserIdentity; import org.apache.doris.cloud.catalog.CloudEnvFactory; import org.apache.doris.common.Config; import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.UserException; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.load.loadv2.BrokerLoadJob; +import org.apache.doris.load.loadv2.LoadJobScheduler; +import org.apache.doris.load.loadv2.LoadManager; +import org.apache.doris.load.routineload.RoutineLoadManager; import org.apache.doris.nereids.stats.StatsErrorEstimator; +import org.apache.doris.planner.GroupCommitPlanner; import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.Planner; import org.apache.doris.planner.ScanNode; @@ -39,10 +44,15 @@ import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.GlobalTransactionMgr; import org.apache.doris.transaction.GlobalTransactionMgrIface; +import org.apache.thrift.TException; + import java.lang.reflect.Type; import java.util.List; import java.util.Map; +// EnvFactory is responsed for create none-cloud object. +// CloudEnvFactory is responsed for create cloud object. 
+ public class EnvFactory { public EnvFactory() {} @@ -131,4 +141,17 @@ public class EnvFactory { String timezone, boolean loadZeroTolerance) { return new Coordinator(jobId, queryId, descTable, fragments, scanNodes, timezone, loadZeroTolerance); } + + public GroupCommitPlanner createGroupCommitPlanner(Database db, OlapTable table, List<String> targetColumnNames, + TUniqueId queryId, String groupCommit) throws UserException, TException { + return new GroupCommitPlanner(db, table, targetColumnNames, queryId, groupCommit); + } + + public RoutineLoadManager createRoutineLoadManager() { + return new RoutineLoadManager(); + } + + public LoadManager createLoadManager(LoadJobScheduler loadJobScheduler) { + return new LoadManager(loadJobScheduler); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 0130b78a9b9..463b3b1f19d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -476,6 +476,25 @@ public class OlapTable extends Table implements MTMVRelatedTableIf { return null; } + public List<Long> getAllTabletIds() { + List<Long> tabletIds = new ArrayList<>(); + try { + rwLock.readLock().lock(); + for (Partition partition : getPartitions()) { + for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) { + tabletIds.addAll(index.getTablets().stream() + .map(tablet -> tablet.getId()) + .collect(Collectors.toList())); + } + } + } catch (Exception e) { + LOG.warn("get all tablet ids failed {}", e.getMessage()); + } finally { + rwLock.readLock().unlock(); + } + return tabletIds; + } + public Map<Long, MaterializedIndexMeta> getVisibleIndexIdToMeta() { Map<Long, MaterializedIndexMeta> visibleMVs = Maps.newHashMap(); List<MaterializedIndex> mvs = getVisibleIndex(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java index 0c73a19c7d5..35f5b14efc5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java @@ -87,6 +87,7 @@ public abstract class Table extends MetaObject implements Writable, TableIf { // especially to reduce conflicts when obtaining delete bitmap update locks for // MoW table protected ReentrantLock commitLock; + /* * fullSchema and nameToColumn should contains all columns, both visible and shadow. * eg. for OlapTable, when doing schema change, there will be some shadow columns which are not visible diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java index 00aef304cf3..19bd102c84c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java @@ -383,7 +383,7 @@ public class CloudEnv extends Env { public void changeCloudCluster(String clusterName, ConnectContext ctx) throws DdlException { checkCloudClusterPriv(clusterName); // TODO(merge-cloud): pick cloud auto start - // waitForAutoStart(clusterName); + CloudSystemInfoService.waitForAutoStart(clusterName); try { ((CloudSystemInfoService) Env.getCurrentSystemInfo()).addCloudCluster(clusterName, ""); } catch (UserException e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java index 8990f2cd966..c7fb81fa6c6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java @@ -21,9 +21,11 @@ import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.analysis.DescriptorTable; import 
org.apache.doris.analysis.UserIdentity; +import org.apache.doris.catalog.Database; import org.apache.doris.catalog.DynamicPartitionProperty; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.EnvFactory; +import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.ReplicaAllocation; @@ -31,14 +33,22 @@ import org.apache.doris.catalog.Tablet; import org.apache.doris.cloud.common.util.CloudPropertyAnalyzer; import org.apache.doris.cloud.datasource.CloudInternalCatalog; import org.apache.doris.cloud.load.CloudBrokerLoadJob; +import org.apache.doris.cloud.load.CloudLoadManager; +import org.apache.doris.cloud.load.CloudRoutineLoadManager; +import org.apache.doris.cloud.planner.CloudGroupCommitPlanner; import org.apache.doris.cloud.qe.CloudCoordinator; import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.cloud.transaction.CloudGlobalTransactionMgr; import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.UserException; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.load.loadv2.BrokerLoadJob; +import org.apache.doris.load.loadv2.LoadJobScheduler; +import org.apache.doris.load.loadv2.LoadManager; +import org.apache.doris.load.routineload.RoutineLoadManager; import org.apache.doris.nereids.stats.StatsErrorEstimator; +import org.apache.doris.planner.GroupCommitPlanner; import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.Planner; import org.apache.doris.planner.ScanNode; @@ -49,6 +59,8 @@ import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.GlobalTransactionMgrIface; +import org.apache.thrift.TException; + import java.lang.reflect.Type; import java.util.List; import java.util.Map; @@ -139,14 +151,32 @@ public class 
CloudEnvFactory extends EnvFactory { return new CloudBrokerLoadJob(); } + @Override public Coordinator createCoordinator(ConnectContext context, Analyzer analyzer, Planner planner, StatsErrorEstimator statsErrorEstimator) { return new CloudCoordinator(context, analyzer, planner, statsErrorEstimator); } + @Override public Coordinator createCoordinator(Long jobId, TUniqueId queryId, DescriptorTable descTable, List<PlanFragment> fragments, List<ScanNode> scanNodes, String timezone, boolean loadZeroTolerance) { return new CloudCoordinator(jobId, queryId, descTable, fragments, scanNodes, timezone, loadZeroTolerance); } + + @Override + public GroupCommitPlanner createGroupCommitPlanner(Database db, OlapTable table, List<String> targetColumnNames, + TUniqueId queryId, String groupCommit) throws UserException, TException { + return new CloudGroupCommitPlanner(db, table, targetColumnNames, queryId, groupCommit); + } + + @Override + public RoutineLoadManager createRoutineLoadManager() { + return new CloudRoutineLoadManager(); + } + + @Override + public LoadManager createLoadManager(LoadJobScheduler loadJobScheduler) { + return new CloudLoadManager(loadJobScheduler); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java index 52b52fd70ae..814e6020362 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java @@ -22,25 +22,39 @@ import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Table; import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.LogBuilder; 
+import org.apache.doris.common.util.LogKey; import org.apache.doris.load.BrokerFileGroup; import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey; +import org.apache.doris.load.FailMsg; +import org.apache.doris.load.FailMsg.CancelType; import org.apache.doris.load.loadv2.BrokerLoadJob; import org.apache.doris.load.loadv2.BrokerPendingTaskAttachment; +import org.apache.doris.load.loadv2.JobState; import org.apache.doris.load.loadv2.LoadLoadingTask; +import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.qe.AutoCloseConnectContext; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.Coordinator; import org.apache.doris.qe.OriginStatement; +import org.apache.doris.qe.QeProcessorImpl; +import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.system.Backend; import org.apache.doris.thrift.TUniqueId; import com.google.common.base.Strings; +import com.google.common.collect.Lists; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.List; +import java.util.Map; import java.util.UUID; +import java.util.stream.Collectors; public class CloudBrokerLoadJob extends BrokerLoadJob { private static final Logger LOG = LogManager.getLogger(CloudBrokerLoadJob.class); @@ -48,6 +62,8 @@ public class CloudBrokerLoadJob extends BrokerLoadJob { protected static final String CLOUD_CLUSTER_ID = "clusterId"; protected String cloudClusterId; + private int retryTimes = 3; + public CloudBrokerLoadJob() { } @@ -96,6 +112,13 @@ public class CloudBrokerLoadJob extends BrokerLoadJob { } } + // override BulkLoadJob.analyze + @Override + public void analyze() { + cloudClusterId = sessionVariables.get(CLOUD_CLUSTER_ID); + super.analyze(); + } + @Override protected LoadLoadingTask createTask(Database db, OlapTable table, List<BrokerFileGroup> brokerFileGroups, boolean isEnableMemtableOnSinkNode, FileGroupAggKey aggKey, BrokerPendingTaskAttachment attachment) @@ -127,4 +150,162 @@ public class 
CloudBrokerLoadJob extends BrokerLoadJob { // in fe. So pint an edit log to save the status information of the job here. logFinalOperation(); } + + @Override + protected void afterLoadingTaskCommitTransaction(List<Table> tableList) { + ConnectContext ctx = null; + if (ConnectContext.get() == null) { + ctx = new ConnectContext(); + ctx.setThreadLocalInfo(); + } else { + ctx = ConnectContext.get(); + } + + if (ctx.getSessionVariable().enableMultiClusterSyncLoad()) { + // get the backends of each cluster expect the load cluster + CloudSystemInfoService infoService = (CloudSystemInfoService) Env.getCurrentSystemInfo(); + List<List<Backend>> backendsList = infoService.getCloudClusterIds() + .stream() + .filter(id -> !id.equals(cloudClusterId)) + .map(id -> infoService.getBackendsByClusterId(id)) + .collect(Collectors.toList()); + // for each all load table, get its tablets + tableList.forEach(table -> { + List<Long> allTabletIds = ((OlapTable) table).getAllTabletIds(); + StmtExecutor.syncLoadForTablets(backendsList, allTabletIds); + }); + } + } + + @Override + public void onTaskFailed(long taskId, FailMsg failMsg) { + if (Strings.isNullOrEmpty(this.cloudClusterId)) { + super.onTaskFailed(taskId, failMsg); + return; + } + + try { + writeLock(); + if (isTxnDone()) { + LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id) + .add("label", label) + .add("transactionId", transactionId) + .add("state", state) + .add("error_msg", "this task will be ignored when job is: " + state) + .build()); + return; + } + LOG.info(new LogBuilder(LogKey.LOAD_JOB, id) + .add("label", label) + .add("transactionId", transactionId) + .add("state", state) + .add("retryTimes", retryTimes) + .add("failMsg", failMsg.getMsg()) + .build()); + + this.retryTimes--; + if (this.retryTimes <= 0) { + boolean abortTxn = this.transactionId > 0 ? 
true : false; + unprotectedExecuteCancel(failMsg, abortTxn); + logFinalOperation(); + return; + } else { + unprotectedExecuteRetry(failMsg); + } + } finally { + writeUnlock(); + } + + boolean allTaskDone = false; + while (!allTaskDone) { + try { + writeLock(); + // check if all task has been done + // unprotectedExecuteRetry() will cancel all running task + allTaskDone = true; + for (Map.Entry<Long, LoadTask> entry : idToTasks.entrySet()) { + if (entry.getKey() != taskId && !entry.getValue().isDone()) { + LOG.info("LoadTask({}) has not been done", entry.getKey()); + allTaskDone = false; + } + } + } finally { + writeUnlock(); + } + if (!allTaskDone) { + try { + Thread.sleep(1000); + continue; + } catch (InterruptedException e) { + LOG.warn("", e); + } + } + } + + try { + writeLock(); + this.state = JobState.PENDING; + this.idToTasks.clear(); + this.failMsg = null; + this.finishedTaskIds.clear(); + Env.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(this); + LoadTask task = createPendingTask(); + // retry default backoff 60 seconds, because `be restart` is slow + task.setStartTimeMs(System.currentTimeMillis() + 60 * 1000); + idToTasks.put(task.getSignature(), task); + Env.getCurrentEnv().getPendingLoadTaskScheduler().submit(task); + } finally { + writeUnlock(); + } + } + + protected void unprotectedExecuteRetry(FailMsg failMsg) { + LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id).add("transaction_id", transactionId) + .add("error_msg", "Failed to execute load with error: " + failMsg.getMsg()).build()); + + // get load ids of all loading tasks, we will cancel their coordinator process later + List<TUniqueId> loadIds = Lists.newArrayList(); + for (LoadTask loadTask : idToTasks.values()) { + if (loadTask instanceof LoadLoadingTask) { + loadIds.add(((LoadLoadingTask) loadTask).getLoadId()); + } + } + + // set failMsg and state + this.failMsg = failMsg; + if (failMsg.getCancelType() == CancelType.TXN_UNKNOWN) { + // for bug fix, see LoadManager's 
fixLoadJobMetaBugs() method + finishTimestamp = createTimestamp; + } else { + finishTimestamp = System.currentTimeMillis(); + } + + // remove callback before abortTransaction(), so that the afterAborted() callback will not be called again + Env.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(id); + // abort txn by label, because transactionId here maybe -1 + try { + LOG.debug(new LogBuilder(LogKey.LOAD_JOB, id) + .add("label", label) + .add("msg", "begin to abort txn") + .build()); + Env.getCurrentGlobalTransactionMgr().abortTransaction(dbId, label, failMsg.getMsg()); + } catch (UserException e) { + LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id) + .add("label", label) + .add("error_msg", "failed to abort txn when job is cancelled. " + e.getMessage()) + .build()); + } + + // cancel all running coordinators, so that the scheduler's worker thread will be released + for (TUniqueId loadId : loadIds) { + Coordinator coordinator = QeProcessorImpl.INSTANCE.getCoordinator(loadId); + if (coordinator != null) { + coordinator.cancel(); + } + } + + // change state + state = JobState.RETRY; + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadManager.java new file mode 100644 index 00000000000..1d0bfc23f6c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadManager.java @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cloud.load; + +import org.apache.doris.analysis.InsertStmt; +import org.apache.doris.analysis.LoadStmt; +import org.apache.doris.cloud.system.CloudSystemInfoService; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.UserException; +import org.apache.doris.load.loadv2.LoadJobScheduler; +import org.apache.doris.load.loadv2.LoadManager; + +public class CloudLoadManager extends LoadManager { + + public CloudLoadManager(LoadJobScheduler loadJobScheduler) { + super(loadJobScheduler); + } + + @Override + public long createLoadJobFromStmt(LoadStmt stmt) throws DdlException, UserException { + CloudSystemInfoService.waitForAutoStartCurrentCluster(); + + return super.createLoadJobFromStmt(stmt); + } + + @Override + public long createLoadJobFromStmt(InsertStmt stmt) throws DdlException { + CloudSystemInfoService.waitForAutoStartCurrentCluster(); + + return super.createLoadJobFromStmt(stmt); + } + +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudRoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudRoutineLoadManager.java new file mode 100644 index 00000000000..fd10a3bc467 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudRoutineLoadManager.java @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cloud.load; + +import org.apache.doris.catalog.Env; +import org.apache.doris.cloud.system.CloudSystemInfoService; +import org.apache.doris.common.LoadException; +import org.apache.doris.common.UserException; +import org.apache.doris.load.routineload.RoutineLoadJob; +import org.apache.doris.load.routineload.RoutineLoadManager; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.system.Backend; + +import com.google.common.base.Strings; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.stream.Collectors; + +public class CloudRoutineLoadManager extends RoutineLoadManager { + private static final Logger LOG = LogManager.getLogger(CloudRoutineLoadManager.class); + + @Override + public void addRoutineLoadJob(RoutineLoadJob routineLoadJob, String dbName, String tableName) + throws UserException { + if (!Strings.isNullOrEmpty(ConnectContext.get().getCloudCluster())) { + routineLoadJob.setCloudCluster(ConnectContext.get().getCloudCluster()); + } else { + throw new UserException("cloud cluster is empty, please specify cloud cluster"); + } + super.addRoutineLoadJob(routineLoadJob, dbName, tableName); + } + + @Override + protected List<Long> getAvailableBackendIds(long jobId) throws LoadException { + RoutineLoadJob routineLoadJob = getJob(jobId); + String cloudClusterId = 
routineLoadJob.getCloudClusterId(); + if (Strings.isNullOrEmpty(cloudClusterId)) { + LOG.warn("cluster id is empty"); + throw new LoadException("cluster id is empty"); + } + + return ((CloudSystemInfoService) Env.getCurrentSystemInfo()) + .getBackendsByClusterId(cloudClusterId) + .stream() + .filter(Backend::isAlive) + .map(Backend::getId) + .collect(Collectors.toList()); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/planner/CloudGroupCommitPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/planner/CloudGroupCommitPlanner.java new file mode 100644 index 00000000000..8480055e8b9 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/planner/CloudGroupCommitPlanner.java @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.cloud.planner; + +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.cloud.system.CloudSystemInfoService; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.UserException; +import org.apache.doris.planner.GroupCommitPlanner; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TUniqueId; + +import com.google.common.base.Strings; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.thrift.TException; + +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +public class CloudGroupCommitPlanner extends GroupCommitPlanner { + private static final Logger LOG = LogManager.getLogger(CloudGroupCommitPlanner.class); + + public CloudGroupCommitPlanner(Database db, OlapTable table, List<String> targetColumnNames, TUniqueId queryId, + String groupCommit) + throws UserException, TException { + super(db, table, targetColumnNames, queryId, groupCommit); + } + + @Override + protected void selectBackends(ConnectContext ctx) throws DdlException { + backend = ctx.getInsertGroupCommit(this.table.getId()); + if (backend != null && backend.isAlive() && !backend.isDecommissioned() + && backend.getCloudClusterName().equals(ctx.getCloudCluster())) { + return; + } + + String cluster = ctx.getCloudCluster(); + if (Strings.isNullOrEmpty(cluster)) { + ErrorReport.reportDdlException(ErrorCode.ERR_NO_CLUSTER_ERROR); + } + + // select be + List<Backend> backends = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudIdToBackend(cluster) + .values().stream().collect(Collectors.toList()); + Collections.shuffle(backends); + for (Backend backend : backends) { + if (backend.isActive() && 
!backend.isDecommissioned()) { + this.backend = backend; + ctx.setInsertGroupCommit(this.table.getId(), backend); + LOG.debug("choose new be {}", backend.getId()); + return; + } + } + + throw new DdlException("No suitable backend for cloud cluster=" + cluster); + } + +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/qe/CloudCoordinator.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/qe/CloudCoordinator.java index 02f1f906b3b..2ce8950c12f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/qe/CloudCoordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/qe/CloudCoordinator.java @@ -61,7 +61,7 @@ public class CloudCoordinator extends Coordinator { } @Override - protected void prepare() throws Exception { + protected void prepare() throws UserException { String cluster = null; ConnectContext context = ConnectContext.get(); if (context != null) { @@ -96,7 +96,7 @@ public class CloudCoordinator extends Coordinator { LOG.warn("no available backends, idToBackend {}", idToBackend); String clusterName = ConnectContext.get() != null ? 
ConnectContext.get().getCloudCluster() : "ctx empty cant get clusterName"; - throw new Exception("no available backends, the cluster maybe not be set or been dropped clusterName = " + throw new UserException("no available backends, the cluster maybe not be set or been dropped clusterName = " + clusterName); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java index ae43da4b244..6e7ca874f6e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java @@ -19,6 +19,7 @@ package org.apache.doris.cloud.system; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.ReplicaAllocation; +import org.apache.doris.cloud.catalog.CloudEnv; import org.apache.doris.cloud.proto.Cloud; import org.apache.doris.cloud.proto.Cloud.ClusterPB; import org.apache.doris.cloud.proto.Cloud.InstanceInfoPB; @@ -30,6 +31,7 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.ha.FrontendNodeType; import org.apache.doris.metric.MetricRepo; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.resource.Tag; import org.apache.doris.rpc.RpcException; import org.apache.doris.system.Backend; @@ -310,6 +312,33 @@ public class CloudSystemInfoService extends SystemInfoService { return clusterNameToId.containsKey(clusterName); } + @Override + public List<Backend> getBackendsByCurrentCluster() throws UserException { + ConnectContext ctx = ConnectContext.get(); + if (ctx == null) { + throw new UserException("connect context is null"); + } + + String cluster = ctx.getCurrentCloudCluster(); + if (Strings.isNullOrEmpty(cluster)) { + throw new UserException("cluster name is empty"); + } + + ((CloudEnv) Env.getCurrentEnv()).checkCloudClusterPriv(cluster); + + return 
getBackendsByClusterName(cluster); + } + + @Override + public ImmutableMap<Long, Backend> getBackendsWithIdByCurrentCluster() throws UserException { + List<Backend> backends = getBackendsByCurrentCluster(); + Map<Long, Backend> idToBackend = Maps.newHashMap(); + for (Backend be : backends) { + idToBackend.put(be.getId(), be); + } + return ImmutableMap.copyOf(idToBackend); + } + public List<Backend> getBackendsByClusterName(final String clusterName) { String clusterId = clusterNameToId.getOrDefault(clusterName, ""); if (clusterId.isEmpty()) { @@ -540,9 +569,19 @@ public class CloudSystemInfoService extends SystemInfoService { this.instanceStatus = instanceStatus; } + public static void waitForAutoStartCurrentCluster() throws DdlException { + ConnectContext context = ConnectContext.get(); + if (context != null) { + String cloudCluster = context.getCloudCluster(); + if (!Strings.isNullOrEmpty(cloudCluster)) { + waitForAutoStart(cloudCluster); + } + } + } + public static void waitForAutoStart(final String clusterName) throws DdlException { - // TODO: merge from cloud. - throw new DdlException("Env.waitForAutoStart unimplemented"); + // TODO(merge-cloud): merge from cloud. 
+ // throw new DdlException("Env.waitForAutoStart unimplemented"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index a48af1c4979..bf8cc592c20 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -990,7 +990,7 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { @Override public void updateMultiTableRunningTransactionTableIds(Long dbId, Long transactionId, List<Long> tableIds) throws UserException { - throw new UserException(NOT_SUPPORTED_MSG); + //throw new UserException(NOT_SUPPORTED_MSG); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java b/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java index 1f9ff87647d..183c733097b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java @@ -1213,7 +1213,10 @@ public enum ErrorCode { ERR_CLOUD_CLUSTER_ERROR(5098, new byte[]{'4', '2', '0', '0', '0'}, "Cluster %s not exist, use SQL 'SHOW CLUSTERS' to get a valid cluster"), - ERR_NO_CLUSTER_ERROR(5099, new byte[]{'4', '2', '0', '0', '0'}, "No cluster selected"); + ERR_NO_CLUSTER_ERROR(5099, new byte[]{'4', '2', '0', '0', '0'}, "No cluster selected"), + + ERR_NOT_CLOUD_MODE(6000, new byte[]{'4', '2', '0', '0', '0'}, + "Command only support in cloud mode."); // This is error code private final int code; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java index aa616372184..d41fab5916c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java @@ -46,7 +46,8 @@ public abstract class ExternalScanNode extends ScanNode { protected final FederationBackendPolicy backendPolicy = (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().enableFileCache) - ? new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING) : new FederationBackendPolicy(); + ? new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING) + : new FederationBackendPolicy(); public ExternalScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType, boolean needCheckColumnPriv) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java index 73a49bb24a8..e3e3405a1b1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java @@ -71,7 +71,7 @@ import java.util.stream.Collectors; public class FederationBackendPolicy { private static final Logger LOG = LogManager.getLogger(FederationBackendPolicy.class); - private final List<Backend> backends = Lists.newArrayList(); + protected final List<Backend> backends = Lists.newArrayList(); private final Map<String, List<Backend>> backendMap = Maps.newHashMap(); public Map<Backend, Long> getAssignedWeightPerBackend() { @@ -80,7 +80,7 @@ public class FederationBackendPolicy { private Map<Backend, Long> assignedWeightPerBackend = Maps.newHashMap(); - private ConsistentHash<Split, Backend> consistentHash; + protected ConsistentHash<Split, Backend> consistentHash; private int nextBe = 0; private boolean initialized = false; @@ -184,7 +184,7 @@ public class FederationBackendPolicy { } public void init(BeSelectionPolicy policy) throws UserException { - 
backends.addAll(policy.getCandidateBackends(Env.getCurrentSystemInfo().getIdToBackend().values())); + backends.addAll(policy.getCandidateBackends(Env.getCurrentSystemInfo().getBackendsByCurrentCluster())); if (backends.isEmpty()) { throw new UserException("No available backends"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java index 6be5654a2ea..2b37cf997d0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java @@ -20,9 +20,12 @@ package org.apache.doris.httpv2.rest; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Table; +import org.apache.doris.cloud.system.CloudSystemInfoService; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.LoadException; +import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.httpv2.entity.ResponseEntityBuilder; import org.apache.doris.httpv2.entity.RestBaseResult; @@ -30,6 +33,7 @@ import org.apache.doris.httpv2.exception.UnauthorizedException; import org.apache.doris.mysql.privilege.Auth; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.SessionVariable; import org.apache.doris.resource.Tag; import org.apache.doris.service.ExecuteEnv; import org.apache.doris.system.Backend; @@ -39,6 +43,7 @@ import org.apache.doris.thrift.TNetworkAddress; import com.google.common.base.Strings; import io.netty.handler.codec.http.HttpHeaderNames; +import org.apache.commons.validator.routines.InetAddressValidator; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; @@ -50,8 
+55,11 @@ import org.springframework.web.bind.annotation.RestController; import org.springframework.web.servlet.view.RedirectView; import java.net.URI; +import java.security.SecureRandom; import java.util.List; +import java.util.Random; import java.util.Set; +import java.util.stream.Collectors; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -149,8 +157,7 @@ public class LoadAction extends RestBaseController { } String label = request.getHeader(LABEL_KEY); - TNetworkAddress redirectAddr; - redirectAddr = selectRedirectBackend(groupCommit); + TNetworkAddress redirectAddr = selectRedirectBackend(request, groupCommit); LOG.info("redirect load action to destination={}, label: {}", redirectAddr.toString(), label); @@ -228,6 +235,7 @@ public class LoadAction extends RestBaseController { // we return error by using RestBaseResult. private Object executeWithoutPassword(HttpServletRequest request, HttpServletResponse response, String db, String table, boolean isStreamLoad, boolean groupCommit) { + String label = null; try { String dbName = db; String tableName = table; @@ -246,11 +254,7 @@ public class LoadAction extends RestBaseController { String fullDbName = dbName; - String label = request.getParameter(LABEL_KEY); - if (isStreamLoad) { - label = request.getHeader(LABEL_KEY); - } - + label = isStreamLoad ? 
request.getHeader(LABEL_KEY) : request.getParameter(LABEL_KEY); if (!isStreamLoad && Strings.isNullOrEmpty(label)) { // for stream load, the label can be generated by system automatically return new RestBaseResult("No label selected."); @@ -273,7 +277,7 @@ public class LoadAction extends RestBaseController { return new RestBaseResult(e.getMessage()); } } else { - redirectAddr = selectRedirectBackend(groupCommit); + redirectAddr = selectRedirectBackend(request, groupCommit); } LOG.info("redirect load action to destination={}, stream: {}, db: {}, tbl: {}, label: {}", @@ -282,6 +286,8 @@ public class LoadAction extends RestBaseController { RedirectView redirectView = redirectTo(request, redirectAddr); return redirectView; } catch (Exception e) { + LOG.warn("load failed, stream: {}, db: {}, tbl: {}, label: {}, err: {}", + isStreamLoad, db, table, label, e.getMessage()); return new RestBaseResult(e.getMessage()); } } @@ -304,7 +310,7 @@ public class LoadAction extends RestBaseController { return new RestBaseResult("No transaction operation(\'commit\' or \'abort\') selected."); } - TNetworkAddress redirectAddr = selectRedirectBackend(false); + TNetworkAddress redirectAddr = selectRedirectBackend(request, false); LOG.info("redirect stream load 2PC action to destination={}, db: {}, txn: {}, operation: {}", redirectAddr.toString(), dbName, request.getHeader(TXN_ID_KEY), txnOperation); @@ -322,7 +328,35 @@ public class LoadAction extends RestBaseController { return index; } - private TNetworkAddress selectRedirectBackend(boolean groupCommit) throws LoadException { + private String getCloudClusterName(HttpServletRequest request) { + String cloudClusterName = request.getHeader(SessionVariable.CLOUD_CLUSTER); + if (!Strings.isNullOrEmpty(cloudClusterName)) { + return cloudClusterName; + } + + cloudClusterName = ConnectContext.get().getCloudCluster(); + if (!Strings.isNullOrEmpty(cloudClusterName)) { + return cloudClusterName; + } + + return ""; + } + + private TNetworkAddress 
selectRedirectBackend(HttpServletRequest request, boolean groupCommit) + throws LoadException { + if (Config.isCloudMode()) { + String cloudClusterName = getCloudClusterName(request); + if (Strings.isNullOrEmpty(cloudClusterName)) { + throw new LoadException("No cloud cluster name selected."); + } + String reqHostStr = request.getHeader(HttpHeaderNames.HOST.toString()); + return selectCloudRedirectBackend(cloudClusterName, reqHostStr, groupCommit); + } else { + return selectLocalRedirectBackend(groupCommit); + } + } + + private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit) throws LoadException { Backend backend = null; BeSelectionPolicy policy = null; String qualifiedUser = ConnectContext.get().getQualifiedUser(); @@ -358,6 +392,89 @@ public class LoadAction extends RestBaseController { return new TNetworkAddress(backend.getHost(), backend.getHttpPort()); } + private TNetworkAddress selectCloudRedirectBackend(String clusterName, String reqHostStr, boolean groupCommit) + throws LoadException { + List<Backend> backends = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) + .getBackendsByClusterName(clusterName) + .stream().filter(be -> be.isAlive() && (!groupCommit || groupCommit && !be.isDecommissioned())) + .collect(Collectors.toList()); + + if (backends.isEmpty()) { + LOG.warn("No available backend for stream load redirect, cluster name {}", clusterName); + throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", cluster: " + clusterName); + } + + Random rand = new SecureRandom(); + int randomIndex = rand.nextInt(backends.size()); + Backend backend = backends.get(randomIndex); + + Pair<String, Integer> publicHostPort = null; + Pair<String, Integer> privateHostPort = null; + try { + if (!Strings.isNullOrEmpty(backend.getCloudPublicEndpoint())) { + publicHostPort = splitHostAndPort(backend.getCloudPublicEndpoint()); + } + } catch (AnalysisException e) { + throw new LoadException(e.getMessage()); + } + + try { + if 
(!Strings.isNullOrEmpty(backend.getCloudPrivateEndpoint())) { + privateHostPort = splitHostAndPort(backend.getCloudPrivateEndpoint()); + } + } catch (AnalysisException e) { + throw new LoadException(e.getMessage()); + } + + reqHostStr = Strings.nullToEmpty(reqHostStr).replaceAll("\\s+", ""); // null Host header -> empty + if (reqHostStr.isEmpty()) { + LOG.info("Invalid header host: {}", reqHostStr); + throw new LoadException("Invalid header host: " + reqHostStr); + } + + String reqHost = ""; + String[] pair = reqHostStr.split(":"); + if (pair.length == 1) { + reqHost = pair[0]; + } else if (pair.length == 2) { + reqHost = pair[0]; + } else { + LOG.info("Invalid header host: {}", reqHostStr); + throw new LoadException("Invalid header host: " + reqHostStr); + } + + if (InetAddressValidator.getInstance().isValid(reqHost) + && publicHostPort != null && reqHost.equals(publicHostPort.first)) { // value equality, not == + return new TNetworkAddress(publicHostPort.first, publicHostPort.second); + } else if (privateHostPort != null) { + return new TNetworkAddress(reqHost, privateHostPort.second); + } else { + return new TNetworkAddress(backend.getHost(), backend.getHttpPort()); + } + } + + private Pair<String, Integer> splitHostAndPort(String hostPort) throws AnalysisException { + hostPort = hostPort.replaceAll("\\s+", ""); + if (hostPort.isEmpty()) { + LOG.info("empty endpoint"); + throw new AnalysisException("empty endpoint: " + hostPort); + } + + String[] pair = hostPort.split(":"); + if (pair.length != 2 || !pair[1].matches("\\d{1,5}")) { // port must be 1-5 digits so parseInt cannot throw + LOG.info("Invalid endpoint: {}", hostPort); + throw new AnalysisException("Invalid endpoint: " + hostPort); + } + + int port = Integer.parseInt(pair[1]); + if (port <= 0 || port >= 65536) { + LOG.info("Invalid endpoint port: {}", pair[1]); + throw new AnalysisException("Invalid endpoint port: " + pair[1]); + } + + return Pair.of(pair[0], port); + } + // NOTE: This function can only be used for AuditlogPlugin stream load for now. 
// AuditlogPlugin should be re-disigned carefully, and blow method focuses on // temporarily addressing the users' needs for audit logs. @@ -410,7 +527,7 @@ public class LoadAction extends RestBaseController { return new RestBaseResult("No label selected."); } - TNetworkAddress redirectAddr = selectRedirectBackend(false); + TNetworkAddress redirectAddr = selectRedirectBackend(request, false); LOG.info("Redirect load action with auth token to destination={}," + "stream: {}, db: {}, tbl: {}, label: {}", diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/ClusterAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/ClusterAction.java index ee09946f4fc..a5c915e0bbc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/ClusterAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/ClusterAction.java @@ -18,6 +18,7 @@ package org.apache.doris.httpv2.rest.manager; import org.apache.doris.catalog.Env; +import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.common.Config; import org.apache.doris.common.util.NetUtils; import org.apache.doris.httpv2.entity.ResponseEntityBuilder; @@ -73,4 +74,40 @@ public class ClusterAction extends RestBaseController { .collect(Collectors.toList())); return ResponseEntityBuilder.ok(result); } + + public static class BeClusterInfo { + public volatile String host; + public volatile int heartbeatPort; + public volatile int bePort; + public volatile int httpPort; + public volatile int brpcPort; + public volatile long currentFragmentNum = 0; + public volatile long lastFragmentUpdateTime = 0; + } + + @RequestMapping(path = "/cluster_info/cloud_cluster_status", method = RequestMethod.GET) + public Object cloudClusterInfo(HttpServletRequest request, HttpServletResponse response) { + executeCheckPassword(request, response); + checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN); + + // Key: cluster_name 
Value: be status + Map<String, List<BeClusterInfo>> result = Maps.newHashMap(); + + ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudClusterIdToBackend() + .forEach((clusterId, backends) -> { + List<BeClusterInfo> bis = backends.stream().map(backend -> { + BeClusterInfo bi = new BeClusterInfo(); + bi.host = backend.getHost(); + bi.heartbeatPort = backend.getHeartbeatPort(); + bi.bePort = backend.getBePort(); + bi.httpPort = backend.getHttpPort(); + bi.brpcPort = backend.getBrpcPort(); + bi.currentFragmentNum = backend.getBackendStatus().currentFragmentNum; + bi.lastFragmentUpdateTime = backend.getBackendStatus().lastFragmentUpdateTime; + return bi; }).collect(Collectors.toList()); + result.put(clusterId, bis); + }); + + return ResponseEntityBuilder.ok(result); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index abf0500b90e..52007d86239 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -118,12 +118,15 @@ public class BrokerLoadJob extends BulkLoadJob { @Override protected void unprotectedExecuteJob() { - LoadTask task = new BrokerLoadPendingTask(this, fileGroupAggInfo.getAggKeyToFileGroups(), - brokerDesc, getPriority()); + LoadTask task = createPendingTask(); idToTasks.put(task.getSignature(), task); Env.getCurrentEnv().getPendingLoadTaskScheduler().submit(task); } + protected LoadTask createPendingTask() { + return new BrokerLoadPendingTask(this, fileGroupAggInfo.getAggKeyToFileGroups(), brokerDesc, getPriority()); + } + /** * Situation1: When attachment is instance of BrokerPendingTaskAttachment, * this method is called by broker pending task. 
@@ -312,7 +315,11 @@ public class BrokerLoadJob extends BulkLoadJob { try { db = getDb(); tableList = db.getTablesOnIdOrderOrThrowException(Lists.newArrayList(fileGroupAggInfo.getAllTableIds())); - MetaLockUtils.writeLockTablesOrMetaException(tableList); + if (Config.isCloudMode()) { + MetaLockUtils.commitLockTables(tableList); + } else { + MetaLockUtils.writeLockTablesOrMetaException(tableList); + } } catch (MetaNotFoundException e) { LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id) .add("database_id", dbId) @@ -330,6 +337,7 @@ public class BrokerLoadJob extends BulkLoadJob { dbId, tableList, transactionId, commitInfos, new LoadJobFinalOperation(id, loadingStatus, progress, loadStartTimestamp, finishTimestamp, state, failMsg)); + afterLoadingTaskCommitTransaction(tableList); } catch (UserException e) { LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id) .add("database_id", dbId) @@ -337,10 +345,18 @@ public class BrokerLoadJob extends BulkLoadJob { .build(), e); cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()), true, true); } finally { - MetaLockUtils.writeUnlockTables(tableList); + if (Config.isCloudMode()) { + MetaLockUtils.commitUnlockTables(tableList); + } else { + MetaLockUtils.writeUnlockTables(tableList); + } } } + // cloud override + protected void afterLoadingTaskCommitTransaction(List<Table> tableList) { + } + private void writeProfile() { if (!enableProfile) { return; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/JobState.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/JobState.java index e023b686c0c..10151694947 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/JobState.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/JobState.java @@ -25,7 +25,8 @@ public enum JobState { LOADING, // job is running COMMITTED, // transaction is committed but not visible FINISHED, // transaction is visible and job is finished - CANCELLED; // transaction is aborted and job is 
cancelled + CANCELLED, // transaction is aborted and job is cancelled + RETRY; public boolean isFinalState() { return this == FINISHED || this == CANCELLED; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index c15904be8ef..55b61b1e981 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -250,9 +250,10 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements */ abstract Set<String> getTableNames() throws MetaNotFoundException; - // return true if the corresponding transaction is done(COMMITTED, FINISHED, CANCELLED) + // return true if the corresponding transaction is done(COMMITTED, FINISHED, CANCELLED, RETRY) public boolean isTxnDone() { - return state == JobState.COMMITTED || state == JobState.FINISHED || state == JobState.CANCELLED; + return state == JobState.COMMITTED || state == JobState.FINISHED + || state == JobState.CANCELLED || state == JobState.RETRY; } // return true if job is done(FINISHED/CANCELLED/UNKNOWN) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java index eef73542ed4..8c56547ff8f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java @@ -130,6 +130,7 @@ public class LoadLoadingTask extends LoadTask { protected void executeTask() throws Exception { LOG.info("begin to execute loading task. load id: {} job id: {}. db: {}, tbl: {}. 
left retry: {}", DebugUtil.printId(loadId), callback.getCallbackId(), db.getFullName(), table.getName(), retryTime); + retryTime--; beginTime = System.currentTimeMillis(); if (!((BrokerLoadJob) callback).updateState(JobState.LOADING)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTask.java index d6789805e67..06991d5fb52 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTask.java @@ -18,6 +18,7 @@ package org.apache.doris.load.loadv2; import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; import org.apache.doris.common.LoadException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.LogBuilder; @@ -69,6 +70,8 @@ public abstract class LoadTask extends MasterTask { protected TaskAttachment attachment; protected FailMsg failMsg = new FailMsg(); protected int retryTime = 1; + private volatile boolean done = false; + protected long startTimeMs = 0; protected final Priority priority; public LoadTask(LoadTaskCallback callback, TaskType taskType, Priority priority) { @@ -82,6 +85,17 @@ public abstract class LoadTask extends MasterTask { protected void exec() { boolean isFinished = false; try { + if (Config.isCloudMode()) { + while (startTimeMs > System.currentTimeMillis()) { + try { + Thread.sleep(1000); + LOG.info("LoadTask:{} backoff startTimeMs:{} now:{}", + signature, startTimeMs, System.currentTimeMillis()); + } catch (InterruptedException e) { + LOG.info("ignore InterruptedException: ", e); + } + } + } // execute pending task executeTask(); // callback on pending task finished @@ -100,6 +114,7 @@ public abstract class LoadTask extends MasterTask { // callback on pending task failed callback.onTaskFailed(signature, failMsg); } + done = true; } } @@ -131,6 +146,14 @@ public abstract class LoadTask extends MasterTask { return 
taskType; } + public boolean isDone() { + return done; + } + + public void setStartTimeMs(long startTimeMs) { + this.startTimeMs = startTimeMs; + } + public int getPriorityValue() { return this.priority.value; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java index 5c46ed5d862..adbfc27fca2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java @@ -41,6 +41,7 @@ import org.apache.doris.system.BeSelectionPolicy; import org.apache.doris.system.SystemInfoService; import com.google.common.base.Joiner; +import com.google.common.base.Strings; import com.google.common.collect.EvictingQueue; import com.google.gson.JsonElement; import com.google.gson.JsonObject; @@ -428,6 +429,16 @@ public class MysqlLoadManager { httpPut.addHeader(LoadStmt.KEY_IN_PARAM_PARTITIONS, pNames); } } + + // cloud cluster + if (Config.isCloudMode()) { + String clusterName = ConnectContext.get().getCloudCluster(); + if (Strings.isNullOrEmpty(clusterName)) { + throw new LoadException("cloud cluster is empty"); + } + httpPut.addHeader(LoadStmt.KEY_CLOUD_CLUSTER, clusterName); + } + httpPut.setEntity(entity); return httpPut; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java index d8b79d9bdce..477dc02955f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java @@ -110,6 +110,8 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo { tRoutineLoadTask.setFormat(TFileFormatType.FORMAT_CSV_PLAIN); } tRoutineLoadTask.setMemtableOnSinkNode(routineLoadJob.isMemtableOnSinkNode()); + 
tRoutineLoadTask.setQualifiedUser(routineLoadJob.getQualifiedUser()); + tRoutineLoadTask.setCloudCluster(routineLoadJob.getCloudCluster()); return tRoutineLoadTask; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index f9be2014e30..b0911f00634 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -31,6 +31,7 @@ import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Table; +import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; @@ -262,10 +263,16 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl protected boolean isTypeRead = false; + private String cloudClusterId; + protected byte enclose = 0; protected byte escape = 0; + // use for cloud cluster mode + protected String qualifiedUser; + protected String cloudCluster; + public void setTypeRead(boolean isTypeRead) { this.isTypeRead = isTypeRead; } @@ -314,6 +321,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl SessionVariable var = ConnectContext.get().getSessionVariable(); sessionVariables.put(SessionVariable.SQL_MODE, Long.toString(var.getSqlMode())); this.memtableOnSinkNode = ConnectContext.get().getSessionVariable().enableMemtableOnSinkNode; + this.qualifiedUser = ConnectContext.get().getQualifiedUser(); + this.cloudCluster = ConnectContext.get().getCloudCluster(); } else { sessionVariables.put(SessionVariable.SQL_MODE, String.valueOf(SqlModeHelper.MODE_DEFAULT)); } @@ -720,6 +729,14 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback 
impl this.comment = comment; } + public String getQualifiedUser() { + return qualifiedUser; + } + + public String getCloudCluster() { + return cloudCluster; + } + public int getSizeOfRoutineLoadTaskInfoList() { readLock(); try { @@ -916,8 +933,28 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl Preconditions.checkNotNull(planner); Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId); Table table = db.getTableOrMetaException(tableId, Table.TableType.OLAP); + boolean needCleanCtx = false; table.readLock(); try { + if (Config.isCloudMode()) { + String clusterName = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) + .getClusterNameByClusterId(cloudClusterId); + if (Strings.isNullOrEmpty(clusterName)) { + String err = String.format("cluster name is empty, cluster id is %s", cloudClusterId); + LOG.warn(err); + throw new UserException(err); + } + + if (ConnectContext.get() == null) { + ConnectContext ctx = new ConnectContext(); + ctx.setThreadLocalInfo(); + ctx.setCloudCluster(clusterName); + needCleanCtx = true; + } else { + ConnectContext.get().setCloudCluster(clusterName); + } + } + TExecPlanFragmentParams planParams = planner.plan(loadId); // add table indexes to transaction state TransactionState txnState = Env.getCurrentGlobalTransactionMgr().getTransactionState(db.getId(), txnId); @@ -931,6 +968,9 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl return planParams; } finally { + if (needCleanCtx) { + ConnectContext.remove(); + } table.readUnlock(); } } @@ -1486,6 +1526,24 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl this.origStmt = origStmt; } + public void setCloudCluster(String cloudClusterName) throws UserException { + if (Strings.isNullOrEmpty(cloudClusterName)) { + LOG.warn("cluster name is empty"); + throw new UserException("cluster name is empty"); + } + + this.cloudClusterId = ((CloudSystemInfoService) 
Env.getCurrentSystemInfo()) + .getCloudClusterIdByName(cloudClusterName); + if (Strings.isNullOrEmpty(this.cloudClusterId)) { + LOG.warn("cluster id is empty, cluster name {}", cloudClusterName); + throw new UserException("cluster id is empty, cluster name: " + cloudClusterName); + } + } + + public String getCloudClusterId() { + return cloudClusterId; + } + // check the correctness of commit info protected abstract boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment, TransactionState txnState, @@ -1797,6 +1855,9 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl out.writeBoolean(true); userIdentity.write(out); } + if (Config.isCloudMode()) { + Text.writeString(out, cloudClusterId); + } Text.writeString(out, comment); } @@ -1889,6 +1950,9 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl userIdentity = UserIdentity.UNKNOWN; } } + if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_123 && Config.isCloudMode()) { + cloudClusterId = Text.readString(in); + } if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_117) { comment = Text.readString(in); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index 13a86f5d173..522d320e912 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -153,6 +153,7 @@ public class RoutineLoadManager implements Writable { } + // cloud override public void createRoutineLoadJob(CreateRoutineLoadStmt createRoutineLoadStmt) throws UserException { // check load auth @@ -184,7 +185,7 @@ public class RoutineLoadManager implements Writable { } public void addRoutineLoadJob(RoutineLoadJob routineLoadJob, String dbName, String tableName) - throws DdlException { + throws 
UserException { writeLock(); try { // check if db.routineLoadName has been used @@ -542,7 +543,7 @@ public class RoutineLoadManager implements Writable { * @return * @throws LoadException */ - private List<Long> getAvailableBackendIds(long jobId) throws LoadException { + protected List<Long> getAvailableBackendIds(long jobId) throws LoadException { RoutineLoadJob job = getJob(jobId); if (job == null) { throw new LoadException("job " + jobId + " does not exist"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java index f84e8bb4c90..43f93df71e9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java @@ -1818,7 +1818,7 @@ public class Auth implements Writable { return sb.toString(); } - // ====== CLOUD ====== + // ====== BEGIN CLOUD ====== public List<String> getCloudClusterUsers(String clusterName) { return propertyMgr.getCloudClusterUsers(userManager.getAllUsers(), clusterName); } @@ -1844,5 +1844,5 @@ public class Auth implements Writable { } return cluster; } - // ====== CLOUD ====== + // ====== END CLOUD ====== } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/GroupCommitInserter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/GroupCommitInserter.java index 4926d5486dc..655a37c7554 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/GroupCommitInserter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/GroupCommitInserter.java @@ -19,6 +19,7 @@ package org.apache.doris.nereids.trees.plans.commands.insert; import org.apache.doris.analysis.Expr; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.EnvFactory; import org.apache.doris.catalog.OlapTable; import org.apache.doris.common.ErrorCode; 
import org.apache.doris.common.ErrorReport; @@ -111,7 +112,8 @@ public class GroupCommitInserter { for (List<Expr> list : materializedConstExprLists) { rows.add(GroupCommitPlanner.getRowStringValue(list, filterSize)); } - GroupCommitPlanner groupCommitPlanner = new GroupCommitPlanner(physicalOlapTableSink.getDatabase(), + GroupCommitPlanner groupCommitPlanner = EnvFactory.getInstance().createGroupCommitPlanner( + physicalOlapTableSink.getDatabase(), physicalOlapTableSink.getTargetTable(), null, ctx.queryId(), ConnectContext.get().getSessionVariable().getGroupCommit()); PGroupCommitInsertResponse response = groupCommitPlanner.executeGroupCommitInsert(ctx, rows); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java index c8e7ef6c4e2..e3c4bf0ecec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java @@ -72,10 +72,10 @@ import java.util.stream.Collectors; public class GroupCommitPlanner { private static final Logger LOG = LogManager.getLogger(GroupCommitPlanner.class); - private Database db; - private OlapTable table; - private TUniqueId loadId; - private Backend backend; + protected Database db; + protected OlapTable table; + protected TUniqueId loadId; + protected Backend backend; private TExecPlanFragmentParamsList paramsList; private ByteString execPlanFragmentParamsBytes; @@ -131,29 +131,8 @@ public class GroupCommitPlanner { public PGroupCommitInsertResponse executeGroupCommitInsert(ConnectContext ctx, List<InternalService.PDataRow> rows) throws DdlException, RpcException, ExecutionException, InterruptedException { - backend = ctx.getInsertGroupCommit(this.table.getId()); - if (backend == null || !backend.isAlive() || backend.isDecommissioned()) { - List<Long> allBackendIds = Env.getCurrentSystemInfo().getAllBackendIds(true); - if 
(allBackendIds.isEmpty()) { - throw new DdlException("No alive backend"); - } - Collections.shuffle(allBackendIds); - boolean find = false; - for (Long beId : allBackendIds) { - backend = Env.getCurrentSystemInfo().getBackend(beId); - if (!backend.isDecommissioned()) { - ctx.setInsertGroupCommit(this.table.getId(), backend); - find = true; - if (LOG.isDebugEnabled()) { - LOG.debug("choose new be {}", backend.getId()); - } - break; - } - } - if (!find) { - throw new DdlException("No suitable backend"); - } - } + selectBackends(ctx); + PGroupCommitInsertRequest request = PGroupCommitInsertRequest.newBuilder() .setExecPlanFragmentRequest(InternalService.PExecPlanFragmentRequest.newBuilder() .setRequest(execPlanFragmentParamsBytes) @@ -166,6 +145,32 @@ public class GroupCommitPlanner { return future.get(); } + // cloud override + protected void selectBackends(ConnectContext ctx) throws DdlException { + backend = ctx.getInsertGroupCommit(this.table.getId()); + if (backend != null && backend.isAlive() && !backend.isDecommissioned()) { + return; + } + + List<Long> allBackendIds = Env.getCurrentSystemInfo().getAllBackendIds(true); + if (allBackendIds.isEmpty()) { + throw new DdlException("No alive backend"); + } + Collections.shuffle(allBackendIds); + for (Long beId : allBackendIds) { + backend = Env.getCurrentSystemInfo().getBackend(beId); + if (!backend.isDecommissioned()) { + ctx.setInsertGroupCommit(this.table.getId(), backend); + if (LOG.isDebugEnabled()) { + LOG.debug("choose new be {}", backend.getId()); + } + return; + } + } + + throw new DdlException("No suitable backend"); + } + // only for nereids use public static InternalService.PDataRow getRowStringValue(List<Expr> cols, int filterSize) throws UserException { if (cols.isEmpty()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 29243a4abeb..7ee563b6783 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -894,12 +894,15 @@ public class OlapScanNode extends ScanNode { LOG.debug("backend {} not exists or is not alive for replica {}", replica.getBackendId(), replica.getId()); } - errs.add("replica " + replica.getId() + "'s backend " + replica.getBackendId() - + " does not exist or not alive"); - errs.add(" or you may not have permission to access the current cluster"); - if (ConnectContext.get() != null && Config.isCloudMode()) { - errs.add("clusterName=" + ConnectContext.get().getCloudCluster()); + String err = "replica " + replica.getId() + "'s backend " + replica.getBackendId() + + " does not exist or not alive"; + if (Config.isCloudMode()) { + err += ", or you may not have permission to access the current cluster"; + if (ConnectContext.get() != null) { + err += " clusterName=" + ConnectContext.get().getCloudCluster(); + } } + errs.add(err); continue; } if (!backend.isMixNode()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index d8c27dd32e5..19c6e7e1d5a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -518,11 +518,13 @@ public class OlapTableSink extends DataSink { Multimap<Long, Long> bePathsMap = tablet.getNormalReplicaBackendPathMap(); if (bePathsMap.keySet().size() < loadRequiredReplicaNum) { String errMsg = "tablet " + tablet.getId() + " alive replica num " + bePathsMap.keySet().size() - + " < quorum replica num " + loadRequiredReplicaNum - + ", alive backends: [" + StringUtils.join(bePathsMap.keySet(), ",") + "]"; - errMsg += " or you may not have permission to access the current cluster"; - if (ConnectContext.get() != null && Config.isCloudMode()) { - errMsg += " clusterName=" + 
ConnectContext.get().getCloudCluster(); + + " < load required replica num " + loadRequiredReplicaNum + + ", alive backends: [" + StringUtils.join(bePathsMap.keySet(), ",") + "]"; + if (Config.isCloudMode()) { + errMsg += ", or you may not have permission to access the current cluster"; + if (ConnectContext.get() != null) { + errMsg += " clusterName=" + ConnectContext.get().getCloudCluster(); + } } throw new UserException(InternalErrorCode.REPLICA_FEW_ERR, errMsg); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java index c8915966725..6a5fe19fcc6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditEvent.java @@ -98,6 +98,8 @@ public class AuditEvent { public long peakMemoryBytes = -1; @AuditField(value = "SqlDigest") public String sqlDigest = ""; + @AuditField(value = "cloudClusterName") + public String cloudClusterName = ""; @AuditField(value = "TraceId") public String traceId = ""; @AuditField(value = "WorkloadGroup") @@ -149,6 +151,11 @@ public class AuditEvent { return this; } + public AuditEventBuilder setCloudCluster(String cloudClusterName) { + auditEvent.cloudClusterName = cloudClusterName; + return this; + } + public AuditEventBuilder setState(String state) { auditEvent.state = state; return this; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java index 9311b4ca8e8..fa8b1bdaf3f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java @@ -32,6 +32,7 @@ import org.apache.doris.plugin.audit.AuditEvent.EventType; import org.apache.doris.qe.QueryState.MysqlStateType; import org.apache.doris.service.FrontendOptions; +import com.google.common.base.Strings; import 
org.apache.commons.codec.digest.DigestUtils; public class AuditLogHelper { @@ -44,6 +45,8 @@ public class AuditLogHelper { long elapseMs = endTime - ctx.getStartTime(); CatalogIf catalog = ctx.getCurrentCatalog(); + String cluster = Config.isCloudMode() ? ctx.getCloudCluster(false) : ""; + AuditEventBuilder auditEventBuilder = ctx.getAuditEventBuilder(); auditEventBuilder.reset(); auditEventBuilder @@ -66,6 +69,7 @@ public class AuditLogHelper { .setReturnRows(ctx.getReturnRows()) .setStmtId(ctx.getStmtId()) .setQueryId(ctx.queryId() == null ? "NaN" : DebugUtil.printId(ctx.queryId())) + .setCloudCluster(Strings.isNullOrEmpty(cluster) ? "UNKNOWN" : cluster) .setWorkloadGroup(ctx.getWorkloadGroupName()) .setFuzzyVariables(!printFuzzyVariables ? "" : ctx.getSessionVariable().printFuzzyVariables()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index 7e0bd962cf4..4ef75173786 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -1066,6 +1066,10 @@ public class ConnectContext { this.cloudCluster = cluster; } + public String getCloudCluster() { + return getCloudCluster(true); + } + /** * @return Returns an available cluster in the following order * 1 Use an explicitly specified cluster @@ -1073,7 +1077,11 @@ public class ConnectContext { * 3 If the user does not have a default cluster, select a cluster with permissions for the user * Returns null when there is no available cluster */ - public String getCloudCluster() { + public String getCloudCluster(boolean updateErr) { + if (!Config.isCloudMode()) { + return null; + } + String cluster = null; if (!Strings.isNullOrEmpty(this.cloudCluster)) { cluster = this.cloudCluster; @@ -1091,11 +1099,13 @@ public class ConnectContext { if (Strings.isNullOrEmpty(cluster)) { LOG.warn("cant get a valid cluster for user {} to use", 
getCurrentUserIdentity()); - getState().setError(ErrorCode.ERR_NO_CLUSTER_ERROR, - "Cant get a Valid cluster for you to use, plz connect admin"); + if (updateErr) { + getState().setError(ErrorCode.ERR_NO_CLUSTER_ERROR, + "Cant get a Valid cluster for you to use, plz connect admin"); + } } else { this.cloudCluster = cluster; - LOG.info("finally set context cluster name {}", cloudCluster); + LOG.info("finally set context cluster name {} for user {}", cloudCluster, getCurrentUserIdentity()); } return cluster; @@ -1103,6 +1113,12 @@ public class ConnectContext { // TODO implement this function public String getDefaultCloudCluster() { + List<String> cloudClusterNames = ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudClusterNames(); + String defaultCluster = Env.getCurrentEnv().getAuth().getDefaultCloudCluster(getQualifiedUser()); + if (!Strings.isNullOrEmpty(defaultCluster) && cloudClusterNames.contains(defaultCluster)) { + return defaultCluster; + } + return null; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 7a213e47862..b1d591bbdd4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -508,7 +508,7 @@ public class Coordinator implements CoordInterface { } // Initialize - protected void prepare() throws Exception { + protected void prepare() throws UserException { for (PlanFragment fragment : fragments) { fragmentExecParamsMap.put(fragment.getFragmentId(), new FragmentExecParams(fragment)); } @@ -524,7 +524,8 @@ public class Coordinator implements CoordInterface { coordAddress = new TNetworkAddress(localIP, Config.rpc_port); - this.idToBackend = Env.getCurrentSystemInfo().getIdToBackend(); + this.idToBackend = Env.getCurrentSystemInfo().getBackendsWithIdByCurrentCluster(); + if (LOG.isDebugEnabled()) { int backendNum = idToBackend.size(); StringBuilder backendInfos 
= new StringBuilder("backends info:"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java index 40c126b732d..f0cd170d267 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java @@ -20,6 +20,7 @@ package org.apache.doris.qe; import org.apache.doris.analysis.RedirectStatus; import org.apache.doris.catalog.Env; import org.apache.doris.common.ClientPool; +import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.thrift.FrontendService; @@ -28,6 +29,7 @@ import org.apache.doris.thrift.TMasterOpResult; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TUniqueId; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -159,6 +161,10 @@ public class MasterOpExecutor { params.setStmtId(ctx.getStmtId()); params.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift()); + if (Config.isCloudMode() && !Strings.isNullOrEmpty(ctx.getCloudCluster())) { + params.setCloudCluster(ctx.getCloudCluster()); + } + // query options params.setQueryOptions(ctx.getSessionVariable().getQueryOptionVariables()); // session variables diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 0304000a1f0..8bfd0d7f1a4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -254,6 +254,8 @@ public class SessionVariable implements Serializable, Writable { public static final String CPU_RESOURCE_LIMIT = "cpu_resource_limit"; + public static final String 
CLOUD_ENABLE_MULTI_CLUSTER_SYNC_LOAD = "enable_multi_cluster_sync_load"; + public static final String ENABLE_PARALLEL_OUTFILE = "enable_parallel_outfile"; public static final String SQL_QUOTE_SHOW_CREATE = "sql_quote_show_create"; @@ -773,6 +775,10 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = FRAGMENT_TRANSMISSION_COMPRESSION_CODEC) public String fragmentTransmissionCompressionCodec = "none"; + // whether sync load to other cluster + @VariableMgr.VarAttr(name = CLOUD_ENABLE_MULTI_CLUSTER_SYNC_LOAD, needForward = true) + public static boolean cloudEnableMultiClusterSyncLoad = false; + /* * the parallel exec instance num for one Fragment in one BE * 1 means disable this feature @@ -1969,6 +1975,14 @@ public class SessionVariable implements Serializable, Writable { return autoCommit; } + public boolean enableMultiClusterSyncLoad() { + return cloudEnableMultiClusterSyncLoad; + } + + public void setEnableMultiClusterSyncLoad(boolean cloudEnableMultiClusterSyncLoad) { + this.cloudEnableMultiClusterSyncLoad = cloudEnableMultiClusterSyncLoad; + } + public boolean isTxReadonly() { return txReadonly; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index 6008dc32c41..53ff62edb68 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -744,7 +744,12 @@ public class ShowExecutor { } // Show clusters - private void handleShowCluster() { + private void handleShowCluster() throws AnalysisException { + if (!Config.isCloudMode()) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_NOT_CLOUD_MODE); + return; + } + final ShowClusterStmt showStmt = (ShowClusterStmt) stmt; final List<List<String>> rows = Lists.newArrayList(); List<String> clusterNames = null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index d88663ad6fc..72073e1e847 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -53,6 +53,7 @@ import org.apache.doris.analysis.QueryStmt; import org.apache.doris.analysis.RedirectStatus; import org.apache.doris.analysis.ReplacePartitionClause; import org.apache.doris.analysis.ReplaceTableClause; +import org.apache.doris.analysis.ResourceTypeEnum; import org.apache.doris.analysis.SelectStmt; import org.apache.doris.analysis.SetOperationStmt; import org.apache.doris.analysis.SetStmt; @@ -90,8 +91,11 @@ import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.catalog.Type; import org.apache.doris.cloud.analysis.UseCloudClusterStmt; import org.apache.doris.cloud.catalog.CloudEnv; +import org.apache.doris.cloud.proto.Cloud.ClusterStatus; +import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.AuditLog; +import org.apache.doris.common.ClientPool; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; @@ -165,17 +169,22 @@ import org.apache.doris.rpc.RpcException; import org.apache.doris.service.FrontendOptions; import org.apache.doris.statistics.ResultRow; import org.apache.doris.statistics.util.InternalQueryBuffer; +import org.apache.doris.system.Backend; +import org.apache.doris.system.SystemInfoService; import org.apache.doris.task.LoadEtlTask; +import org.apache.doris.thrift.BackendService.Client; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TLoadTxnBeginRequest; import org.apache.doris.thrift.TLoadTxnBeginResult; import org.apache.doris.thrift.TMergeType; +import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TQueryOptions; import 
org.apache.doris.thrift.TQueryType; import org.apache.doris.thrift.TResultBatch; import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TStreamLoadPutRequest; +import org.apache.doris.thrift.TSyncLoadForTabletsRequest; import org.apache.doris.thrift.TTxnParams; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.TabletCommitInfo; @@ -210,6 +219,8 @@ import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Collectors; // Do one COM_QUERY process. @@ -221,6 +232,7 @@ public class StmtExecutor { private static final AtomicLong STMT_ID_GENERATOR = new AtomicLong(0); public static final int MAX_DATA_TO_SEND_FOR_TXN = 100; public static final String NULL_VALUE_FOR_LOAD = "\\N"; + private Pattern beIpPattern = Pattern.compile("\\[(\\d+):"); private final Object writeProfileLock = new Object(); private ConnectContext context; private final StatementContext statementContext; @@ -476,7 +488,34 @@ public class StmtExecutor { public void execute() throws Exception { UUID uuid = UUID.randomUUID(); TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); - execute(queryId); + TUniqueId firstQueryId = queryId; + int retryTime = Config.max_query_retry_time; + for (int i = 1; i <= retryTime; i++) { + try { + execute(queryId); + return; + } catch (UserException e) { + if (!e.getMessage().contains("E-230") || i == retryTime) { + throw e; + } + TUniqueId lastQueryId = queryId; + uuid = UUID.randomUUID(); + queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); + int randomMillis = 10 + (int) (Math.random() * 10); + if (i > retryTime / 2) { + randomMillis = 20 + (int) (Math.random() * 10); + } + if (DebugPointUtil.isEnable("StmtExecutor.retry.longtime")) { + randomMillis = 
1000; + } + LOG.warn("receive E-230 tried={} first queryId={} last queryId={} new queryId={} sleep={}ms", + i, DebugUtil.printId(firstQueryId), DebugUtil.printId(lastQueryId), + DebugUtil.printId(queryId), randomMillis); + Thread.sleep(randomMillis); + } catch (Exception e) { + throw e; + } + } } public boolean notAllowFallback() { @@ -718,14 +757,62 @@ public class StmtExecutor { AuditLog.getQueryAudit().log("Query {} {} times with new query id: {}", DebugUtil.printId(queryId), i, DebugUtil.printId(newQueryId)); context.setQueryId(newQueryId); + if (Config.isCloudMode()) { + // sleep random millis [1000, 1500] ms + // in the begining of retryTime/2 + int randomMillis = 1000 + (int) (Math.random() * (1000 - 500)); + LOG.debug("stmt executor retry times {}, wait randomMillis:{}, stmt:{}", + i, randomMillis, originStmt.originStmt); + try { + if (i > retryTime / 2) { + // sleep random millis [2000, 2500] ms + // in the ending of retryTime/2 + randomMillis = 2000 + (int) (Math.random() * (1000 - 500)); + } + Thread.sleep(randomMillis); + } catch (InterruptedException e) { + LOG.info("stmt executor sleep wait InterruptedException: ", e); + } + } } if (context.getConnectType() == ConnectType.ARROW_FLIGHT_SQL) { context.setReturnResultFromLocal(false); } handleQueryStmt(); break; - } catch (RpcException e) { - if (i == retryTime - 1) { + } catch (RpcException | UserException e) { + // cloud mode retry + LOG.debug("due to exception {} retry {} rpc {} user {}", + e.getMessage(), i, e instanceof RpcException, e instanceof UserException); + // errCode = 2, detailMessage = There is no scanNode Backend available.[10003: not alive] + List<String> bes = Env.getCurrentSystemInfo().getAllBackendIds().stream() + .map(id -> Long.toString(id)).collect(Collectors.toList()); + String msg = e.getMessage(); + boolean isNeedRetry = true; + if (e instanceof UserException + && msg.contains(SystemInfoService.NO_SCAN_NODE_BACKEND_AVAILABLE_MSG)) { + isNeedRetry = false; + Matcher matcher = 
beIpPattern.matcher(msg); + // here retry planner not be recreated, so + // in cloud mode drop node, be id invalid, so need not retry + // such as be ids [11000, 11001] -> after drop node 11001 + // don't need to retry 11001's request + if (matcher.find()) { + String notAliveBe = matcher.group(1); + isNeedRetry = bes.contains(notAliveBe); + if (isNeedRetry) { + Backend abnormalBe = Env.getCurrentSystemInfo().getBackend(Long.parseLong(notAliveBe)); + String deadCloudClusterStatus = abnormalBe.getCloudClusterStatus(); + String deadCloudClusterClusterName = abnormalBe.getCloudClusterName(); + LOG.info("need retry cluster {} status {}-{}", deadCloudClusterClusterName, + deadCloudClusterStatus, ClusterStatus.valueOf(deadCloudClusterStatus)); + if (ClusterStatus.valueOf(deadCloudClusterStatus) != ClusterStatus.NORMAL) { + CloudSystemInfoService.waitForAutoStart(deadCloudClusterClusterName); + } + } + } + } + if (i == retryTime - 1 || !isNeedRetry) { throw e; } if (context.getConnectType().equals(ConnectType.MYSQL) && !context.getMysqlChannel().isSend()) { @@ -1008,6 +1095,14 @@ public class StmtExecutor { } } + private boolean hasCloudClusterPriv() { + if (ConnectContext.get() == null || Strings.isNullOrEmpty(ConnectContext.get().getCloudCluster())) { + return false; + } + return Env.getCurrentEnv().getAuth().checkCloudPriv(ConnectContext.get().getCurrentUserIdentity(), + ConnectContext.get().getCloudCluster(), PrivPredicate.USAGE, ResourceTypeEnum.CLUSTER); + } + // Analyze one statement to structure in memory. public void analyze(TQueryOptions tQueryOptions) throws UserException, InterruptedException { if (LOG.isDebugEnabled()) { @@ -1136,6 +1231,27 @@ public class StmtExecutor { resetAnalyzerAndStmt(); } } catch (UserException e) { + // cloud mode retry, when retry need check this user has cloud cluster auth. + // if user doesn't have cloud cluster auth, don't retry, just return. 
+ if (Config.isCloudMode() + && (e.getMessage().contains(SystemInfoService.NOT_USING_VALID_CLUSTER_MSG) + || e.getMessage().contains("backend -1")) + && hasCloudClusterPriv()) { + LOG.debug("cloud mode analyzeAndGenerateQueryPlan retry times {}", i); + // sleep random millis [500, 1000] ms + int randomMillis = 500 + (int) (Math.random() * (1000 - 500)); + try { + if (i > analyzeTimes / 2) { + randomMillis = 1000 + (int) (Math.random() * (1000 - 500)); + } + Thread.sleep(randomMillis); + } catch (InterruptedException ie) { + LOG.info("stmt executor sleep wait InterruptedException: ", ie); + } + if (i < analyzeTimes) { + continue; + } + } throw e; } catch (Exception e) { LOG.warn("Analyze failed. {}", context.getQueryIdentifier(), e); @@ -2101,6 +2217,23 @@ public class StmtExecutor { txnStatus = TransactionStatus.COMMITTED; } + // TODO(meiyi) + // insertStmt.afterFinishTxn(true); + if (Config.isCloudMode()) { + String clusterName = context.getCloudCluster(); + if (context.getSessionVariable().enableMultiClusterSyncLoad() + && clusterName != null && !clusterName.isEmpty()) { + CloudSystemInfoService infoService = (CloudSystemInfoService) Env.getCurrentSystemInfo(); + List<List<Backend>> backendsList = infoService + .getCloudClusterNames() + .stream() + .filter(name -> !name.equals(clusterName)) + .map(name -> infoService.getBackendsByClusterName(name)) + .collect(Collectors.toList()); + List<Long> allTabletIds = ((OlapTable) insertStmt.getTargetTable()).getAllTabletIds(); + syncLoadForTablets(backendsList, allTabletIds); + } + } } catch (Throwable t) { // if any throwable being thrown during insert operation, first we should abort this txn LOG.warn("handle insert stmt fail: {}", label, t); @@ -2173,6 +2306,41 @@ public class StmtExecutor { context.updateReturnRows((int) loadedRows); } + public static void syncLoadForTablets(List<List<Backend>> backendsList, List<Long> allTabletIds) { + backendsList.forEach(backends -> backends.forEach(backend -> { + if 
(backend.isAlive()) { + List<Long> tabletIdList = new ArrayList<Long>(); + Set<Long> beTabletIds = null; + // TODO(merge-cloud): need implements cloud rebalancer, otherwise raise beTabletIds NPE + //Set<Long> beTabletIds = Env.getCurrentEnv() + // .getCloudTabletRebalancer() + // .getSnapshotTabletsByBeId(backend.getId()); + allTabletIds.forEach(tabletId -> { + if (beTabletIds.contains(tabletId)) { + tabletIdList.add(tabletId); + } + }); + boolean ok = false; + TNetworkAddress address = null; + Client client = null; + try { + address = new TNetworkAddress(backend.getHost(), backend.getBePort()); + client = ClientPool.backendPool.borrowObject(address); + client.syncLoadForTablets(new TSyncLoadForTabletsRequest(allTabletIds)); + ok = true; + } catch (Exception e) { + LOG.warn(e.getMessage()); + } finally { + if (!ok) { + ClientPool.backendPool.invalidateObject(address, client); + } else { + ClientPool.backendPool.returnObject(address, client); + } + } + } + })); + } + private void handleExternalInsertStmt() { // TODO(tsy): load refactor, handle external load here try { @@ -2257,6 +2425,11 @@ public class StmtExecutor { } private void handleUseCloudClusterStmt() throws AnalysisException { + if (!Config.isCloudMode()) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_NOT_CLOUD_MODE); + return; + } + UseCloudClusterStmt useCloudClusterStmt = (UseCloudClusterStmt) parsedStmt; try { ((CloudEnv) context.getEnv()).changeCloudCluster(useCloudClusterStmt.getCluster(), context); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 36d24ddfc4d..d2ce1a7194a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -44,6 +44,7 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.TableIf.TableType; import 
org.apache.doris.catalog.Tablet; import org.apache.doris.catalog.TabletMeta; +import org.apache.doris.cloud.catalog.CloudEnv; import org.apache.doris.cloud.catalog.CloudTablet; import org.apache.doris.cloud.planner.CloudStreamLoadPlanner; import org.apache.doris.cloud.proto.Cloud.CommitTxnResponse; @@ -988,6 +989,9 @@ public class FrontendServiceImpl implements FrontendService.Iface { ConnectContext context = new ConnectContext(null, true); // Set current connected FE to the client address, so that we can know where this request come from. context.setCurrentConnectedFEIp(params.getClientNodeHost()); + if (Config.isCloudMode() && !Strings.isNullOrEmpty(params.getCloudCluster())) { + context.setCloudCluster(params.getCloudCluster()); + } ConnectProcessor processor = null; if (context.getConnectType().equals(ConnectType.MYSQL)) { @@ -1941,6 +1945,41 @@ public class FrontendServiceImpl implements FrontendService.Iface { TStatus status = new TStatus(TStatusCode.OK); result.setStatus(status); List<String> tableNames = request.getTableNames(); + + if (Config.isCloudMode()) { + try { + ConnectContext ctx = new ConnectContext(); + ctx.setThreadLocalInfo(); + ctx.setQualifiedUser(request.getUser()); + ctx.setRemoteIP(request.getUserIp()); + String userName = ClusterNamespace.getNameFromFullName(request.getUser()); + if (userName != null) { + List<UserIdentity> currentUser = Lists.newArrayList(); + try { + Env.getCurrentEnv().getAuth().checkPlainPassword(userName, + request.getUserIp(), request.getPasswd(), currentUser); + } catch (AuthenticationException e) { + throw new UserException(e.formatErrMsg()); + } + Preconditions.checkState(currentUser.size() == 1); + ctx.setCurrentUserIdentity(currentUser.get(0)); + } + LOG.info("one stream multi table load use cloud cluster {}", request.getCloudCluster()); + //ctx.setCloudCluster(); + if (!Strings.isNullOrEmpty(request.getCloudCluster())) { + if (Strings.isNullOrEmpty(request.getUser())) { + 
ctx.setCloudCluster(request.getCloudCluster()); + } else { + ((CloudEnv) Env.getCurrentEnv()).changeCloudCluster(request.getCloudCluster(), ctx); + } + } + } catch (UserException e) { + LOG.warn("failed to set ConnectContext info: {}", e.getMessage()); + status.setStatusCode(TStatusCode.ANALYSIS_ERROR); + status.addToErrorMsgs(e.getMessage()); + } + } + try { if (CollectionUtils.isEmpty(tableNames)) { throw new MetaNotFoundException("table not found"); @@ -2101,6 +2140,49 @@ public class FrontendServiceImpl implements FrontendService.Iface { private TExecPlanFragmentParams streamLoadPutImpl(TStreamLoadPutRequest request, TStreamLoadPutResult result) throws UserException { + if (request.isSetAuthCode()) { + String clientAddr = getClientAddrAsString(); + ConnectContext ctx = new ConnectContext(); + ctx.setThreadLocalInfo(); + ctx.setRemoteIP(clientAddr); + long backendId = request.getBackendId(); + Backend backend = Env.getCurrentSystemInfo().getBackend(backendId); + Preconditions.checkNotNull(backend); + ctx.setCloudCluster(backend.getCloudClusterName()); + LOG.info("streamLoadPutImpl set context: cluster {}", ctx.getCloudCluster()); + } else if (Config.isCloudMode()) { + ConnectContext ctx = new ConnectContext(); + ctx.setThreadLocalInfo(); + ctx.setQualifiedUser(request.getUser()); + ctx.setRemoteIP(request.getUserIp()); + String userName = ClusterNamespace.getNameFromFullName(request.getUser()); + if (userName != null) { + List<UserIdentity> currentUser = Lists.newArrayList(); + try { + Env.getCurrentEnv().getAuth().checkPlainPassword(userName, + request.getUserIp(), request.getPasswd(), currentUser); + } catch (AuthenticationException e) { + throw new UserException(e.formatErrMsg()); + } + Preconditions.checkState(currentUser.size() == 1); + ctx.setCurrentUserIdentity(currentUser.get(0)); + } + + LOG.info("stream load use cloud cluster {}", request.getCloudCluster()); + if (!Strings.isNullOrEmpty(request.getCloudCluster())) { + if 
(Strings.isNullOrEmpty(request.getUser())) { + // mysql load + ctx.setCloudCluster(request.getCloudCluster()); + } else { + // stream load + ((CloudEnv) Env.getCurrentEnv()).changeCloudCluster(request.getCloudCluster(), ctx); + } + } + + LOG.debug("streamLoadPutImpl set context: cluster {}, setCurrentUserIdentity {}", + ctx.getCloudCluster(), ctx.getCurrentUserIdentity()); + } + Env env = Env.getCurrentEnv(); String fullDbName = request.getDb(); Database db = env.getInternalCatalog().getDbNullable(fullDbName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java index d8822633937..6e6268020c8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java @@ -277,6 +277,18 @@ public class Backend implements Writable { this.backendStatus.isLoadDisabled = isLoadDisabled; } + public void setActive(boolean isActive) { + this.backendStatus.isActive = isActive; + } + + public boolean isActive() { + return this.backendStatus.isActive; + } + + public long getCurrentFragmentNum() { + return this.backendStatus.currentFragmentNum; + } + // for test only public void updateOnce(int bePort, int httpPort, int beRpcPort) { if (this.bePort != bePort) { @@ -779,6 +791,10 @@ public class Backend implements Writable { this.lastStartTime = hbResponse.getBeStartTime(); isChanged = true; } + + this.backendStatus.currentFragmentNum = hbResponse.getFragmentNum(); + this.backendStatus.lastFragmentUpdateTime = hbResponse.getLastFragmentUpdateTime(); + heartbeatErrMsg = ""; this.heartbeatFailureCounter = 0; } else { @@ -834,6 +850,12 @@ public class Backend implements Writable { public volatile boolean isQueryDisabled = false; @SerializedName("isLoadDisabled") public volatile boolean isLoadDisabled = false; + @SerializedName("isActive") + public volatile boolean isActive = true; + + // cloud mode, cloud control just query master, 
so not need SerializedName + public volatile long currentFragmentNum = 0; + public volatile long lastFragmentUpdateTime = 0; @Override public String toString() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java b/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java index 0b347f0cbb2..e009ab4abf3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/BackendHbResponse.java @@ -48,6 +48,8 @@ public class BackendHbResponse extends HeartbeatResponse implements Writable { private long beStartTime = 0; private String host; private String version = ""; + private long fragmentNum; + private long lastFragmentUpdateTime; @SerializedName(value = "isShutDown") private boolean isShutDown = false; @@ -56,7 +58,8 @@ public class BackendHbResponse extends HeartbeatResponse implements Writable { } public BackendHbResponse(long beId, int bePort, int httpPort, int brpcPort, long hbTime, long beStartTime, - String version, String nodeRole, boolean isShutDown, int arrowFlightSqlPort) { + String version, String nodeRole, long fragmentNum, long lastFragmentUpdateTime, + boolean isShutDown, int arrowFlightSqlPort) { super(HeartbeatResponse.Type.BACKEND); this.beId = beId; this.status = HbStatus.OK; @@ -67,6 +70,8 @@ public class BackendHbResponse extends HeartbeatResponse implements Writable { this.beStartTime = beStartTime; this.version = version; this.nodeRole = nodeRole; + this.fragmentNum = fragmentNum; + this.lastFragmentUpdateTime = lastFragmentUpdateTime; this.isShutDown = isShutDown; this.arrowFlightSqlPort = arrowFlightSqlPort; } @@ -86,6 +91,14 @@ public class BackendHbResponse extends HeartbeatResponse implements Writable { this.msg = errMsg; } + public long getFragmentNum() { + return fragmentNum; + } + + public long getLastFragmentUpdateTime() { + return lastFragmentUpdateTime; + } + public long getBeId() { return beId; } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java index 2c766221acb..dfb1ffeaae0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java @@ -20,12 +20,12 @@ package org.apache.doris.system; import org.apache.doris.resource.Tag; import org.apache.doris.thrift.TStorageMedium; -import com.google.common.collect.ImmutableCollection; import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Set; @@ -172,7 +172,7 @@ public class BeSelectionPolicy { return true; } - public List<Backend> getCandidateBackends(ImmutableCollection<Backend> backends) { + public List<Backend> getCandidateBackends(Collection<Backend> backends) { List<Backend> filterBackends = backends.stream().filter(this::isMatch).collect(Collectors.toList()); List<Backend> preLocationFilterBackends = filterBackends.stream() .filter(iterm -> preferredLocations.contains(iterm.getHost())).collect(Collectors.toList()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java index 5d17846476f..e09c7f2b991 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -268,12 +268,17 @@ public class HeartbeatMgr extends MasterDaemon { if (tBackendInfo.isSetBeNodeRole()) { nodeRole = tBackendInfo.getBeNodeRole(); } + + long fragmentNum = tBackendInfo.getFragmentExecutingCount(); + long lastFragmentUpdateTime = tBackendInfo.getFragmentLastActiveTime(); + boolean isShutDown = false; if (tBackendInfo.isSetIsShutdown()) { isShutDown = 
tBackendInfo.isIsShutdown(); } return new BackendHbResponse(backendId, bePort, httpPort, brpcPort, - System.currentTimeMillis(), beStartTime, version, nodeRole, isShutDown, arrowFlightSqlPort); + System.currentTimeMillis(), beStartTime, version, nodeRole, + fragmentNum, lastFragmentUpdateTime, isShutDown, arrowFlightSqlPort); } else { return new BackendHbResponse(backendId, backend.getHost(), result.getStatus().getErrorMsgs().isEmpty() diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java index d31e786c384..891a6dc36fd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java @@ -993,12 +993,35 @@ public class SystemInfoService { return bes.stream().filter(b -> b.getLocationTag().equals(tag)).collect(Collectors.toList()); } + // CloudSystemInfoService override + public List<Backend> getBackendsByCurrentCluster() throws UserException { + return idToBackendRef.values().stream().collect(Collectors.toList()); + } + + // CloudSystemInfoService override + public ImmutableMap<Long, Backend> getBackendsWithIdByCurrentCluster() throws UserException { + return getIdToBackend(); + } + public int getMinPipelineExecutorSize() { - if (idToBackendRef.size() == 0) { + if (Config.isCloudMode() && ConnectContext.get() != null + && Strings.isNullOrEmpty(ConnectContext.get().getCloudCluster(false))) { + return 1; + } + List<Backend> currentBackends = null; + try { + currentBackends = getBackendsByCurrentCluster(); + } catch (UserException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("get current cluster backends failed: ", e); + } + return 1; + } + if (currentBackends.size() == 0) { return 1; } int minPipelineExecutorSize = Integer.MAX_VALUE; - for (Backend be : idToBackendRef.values()) { + for (Backend be : currentBackends) { int size = be.getPipelineExecutorSize(); if (size 
> 0) { minPipelineExecutorSize = Math.min(minPipelineExecutorSize, size); diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java index 156dae72234..07e5b725aff 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java @@ -217,7 +217,7 @@ public class RoutineLoadManagerTest { try { routineLoadManager.addRoutineLoadJob(kafkaRoutineLoadJob, "db", "table"); Assert.fail(); - } catch (DdlException e) { + } catch (UserException e) { LOG.info(e.getMessage()); } } @@ -225,7 +225,7 @@ public class RoutineLoadManagerTest { @Test public void testCreateWithSameNameOfStoppedJob(@Mocked ConnectContext connectContext, @Mocked Env env, - @Mocked EditLog editLog) throws DdlException { + @Mocked EditLog editLog) throws UserException { String jobName = "job1"; String topicName = "topic1"; String serverAddress = "http://127.0.0.1:8080";; @@ -761,7 +761,7 @@ public class RoutineLoadManagerTest { @Test public void testCheckBeToTask(@Mocked Env env, - @Mocked SystemInfoService systemInfoService) throws LoadException, DdlException { + @Mocked SystemInfoService systemInfoService) throws UserException { List<Long> beIdsInCluster = Lists.newArrayList(); beIdsInCluster.add(1L); Map<Long, Integer> beIdToMaxConcurrentTasks = Maps.newHashMap(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java index f5994f89a89..0391c7c602c 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java @@ -21,10 +21,10 @@ import org.apache.doris.analysis.UserIdentity; import 
org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; -import org.apache.doris.common.DdlException; import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.ThreadPoolManager; +import org.apache.doris.common.UserException; import org.apache.doris.common.jmockit.Deencapsulation; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.load.RoutineLoadDesc; @@ -128,7 +128,7 @@ public class RoutineLoadSchedulerTest { public void functionTest(@Mocked Env env, @Mocked InternalCatalog catalog, @Mocked SystemInfoService systemInfoService, @Injectable Database database) - throws DdlException, InterruptedException { + throws UserException, InterruptedException { new Expectations() { { minTimes = 0; diff --git a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java index e933c0df17c..0ac72df4a6d 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java @@ -98,7 +98,7 @@ public class SystemInfoServiceTest { System.out.println(Env.getCurrentEnvJournalVersion()); BackendHbResponse writeResponse = new BackendHbResponse(1L, 1234, 1234, 1234, 1234, 1234, "test", - Tag.VALUE_COMPUTATION, false, 1234); + Tag.VALUE_COMPUTATION, 10, 100, false, 1234); // Write objects to file File file1 = new File("./BackendHbResponseSerialization"); diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java index f359d6e6a9e..c9e3c50d998 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java @@ -201,7 +201,8 @@ 
public class DemoMultiBackendsTest { Assert.assertEquals("{\"location\" : \"default\"}", result.getRows().get(0).get(BackendsProcDir.TITLE_NAMES.size() - 6)); Assert.assertEquals( - "{\"lastSuccessReportTabletsTime\":\"N/A\",\"lastStreamLoadTime\":-1,\"isQueryDisabled\":false,\"isLoadDisabled\":false}", + "{\"lastSuccessReportTabletsTime\":\"N/A\",\"lastStreamLoadTime\":-1,\"isQueryDisabled\":false," + + "\"isLoadDisabled\":false,\"isActive\":true,\"currentFragmentNum\":0,\"lastFragmentUpdateTime\":0}", result.getRows().get(0).get(BackendsProcDir.TITLE_NAMES.size() - 3)); Assert.assertEquals("0", result.getRows().get(0).get(BackendsProcDir.TITLE_NAMES.size() - 2)); Assert.assertEquals(Tag.VALUE_MIX, result.getRows().get(0).get(BackendsProcDir.TITLE_NAMES.size() - 1)); diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index 376e2a34df9..127852ae6b4 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -68,6 +68,8 @@ struct TRoutineLoadTask { 15: optional PaloInternalService.TPipelineFragmentParams pipeline_params 16: optional bool is_multi_table 17: optional bool memtable_on_sink_node; + 18: optional string qualified_user + 19: optional string cloud_cluster } struct TKafkaMetaProxyRequest { diff --git a/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy b/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy index 07aced68f60..3a26e153be5 100644 --- a/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy +++ b/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy @@ -18,5 +18,18 @@ testGroups = "p0" //exclude groups and exclude suites is more prior than include groups and include suites. 
excludeSuites = "test_index_failure_injection,test_dump_image,test_profile,test_spark_load,test_refresh_mtmv,test_bitmap_filter,test_information_schema_external,test_stream_load_new_move_memtable,test_stream_load_move_memtable,test_materialized_view_move_memtable,test_disable_move_memtable,test_insert_move_memtable,set_and_unset_variable,test_pk_uk_case_cluster,test_point_query_cluster_key,test_compaction_uniq_cluster_keys_with_delete,test_compaction_uniq_keys_cluster_key,test_set_partit [...] -excludeDirectories = "workload_manager_p1,nereids_rules_p0/subquery,unique_with_mow_p0/cluster_key,unique_with_mow_p0/ssb_unique_sql_zstd_cluster,unique_with_mow_p0/ssb_unique_load_zstd_c,nereids_rules_p0/mv,backup_restore,cold_heat_separation,storage_medium_p0" + +excludeDirectories = """ + cloud/multi_cluster, + workload_manager_p1, + nereids_rules_p0/subquery, + unique_with_mow_p0/cluster_key, + unique_with_mow_p0/ssb_unique_sql_zstd_cluster, + unique_with_mow_p0/ssb_unique_load_zstd_c, + nereids_rules_p0/mv, + backup_restore, + cold_heat_separation, + storage_medium_p0 +""" + max_failure_num = 200 diff --git a/regression-test/pipeline/p0/conf/regression-conf.groovy b/regression-test/pipeline/p0/conf/regression-conf.groovy index b99e21c4e7c..d8de4fd5059 100644 --- a/regression-test/pipeline/p0/conf/regression-conf.groovy +++ b/regression-test/pipeline/p0/conf/regression-conf.groovy @@ -59,7 +59,11 @@ excludeGroups = "" excludeSuites = "test_index_failure_injection,test_dump_image,test_profile,test_spark_load,test_refresh_mtmv,test_bitmap_filter,test_information_schema_external" // this directories will not be executed -excludeDirectories = "workload_manager_p1,nereids_rules_p0/subquery" +excludeDirectories = """ + cloud, + nereids_rules_p0/subquery, + workload_manager_p1 +""" customConf1 = "test_custom_conf_value" diff --git a/regression-test/plugins/plugin_cluster.groovy b/regression-test/plugins/plugin_cluster.groovy new file mode 100644 index 00000000000..15237e29aa8 
--- /dev/null +++ b/regression-test/plugins/plugin_cluster.groovy @@ -0,0 +1,180 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import groovy.json.JsonOutput +import org.apache.doris.regression.suite.Suite + +Suite.metaClass.add_cluster = { be_unique_id, ip, port, cluster_name, cluster_id -> + def jsonOutput = new JsonOutput() + def s3 = [ + type: 'COMPUTE', + cluster_name : cluster_name, + cluster_id : cluster_id, + nodes: [ + [ + cloud_unique_id: be_unique_id, + ip: ip, + heartbeat_port: port + ], + ] + ] + def map = [instance_id: "${instance_id}", cluster: s3] + def js = jsonOutput.toJson(map) + log.info("add cluster req: ${js} ".toString()) + + def add_cluster_api = { request_body, check_func -> + httpTest { + endpoint context.config.metaServiceHttpAddress + uri "/MetaService/http/add_cluster?token=${token}" + body request_body + check check_func + } + } + + add_cluster_api.call(js) { + respCode, body -> + log.info("add cluster resp: ${body} ${respCode}".toString()) + def json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase('OK') || json.code.equalsIgnoreCase('ALREADY_EXISTED')) + } +} + +Suite.metaClass.get_cluster = { be_unique_id -> + def jsonOutput = new JsonOutput() + def map = [instance_id: 
"${instance_id}", cloud_unique_id: "${be_unique_id}" ] + def js = jsonOutput.toJson(map) + log.info("get cluster req: ${js} ".toString()) + + def add_cluster_api = { request_body, check_func -> + httpTest { + endpoint context.config.metaServiceHttpAddress + uri "/MetaService/http/get_cluster?token=${token}" + body request_body + check check_func + } + } + + def json + add_cluster_api.call(js) { + respCode, body -> + log.info("get cluster resp: ${body} ${respCode}".toString()) + json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase('OK') || json.code.equalsIgnoreCase('ALREADY_EXISTED')) + } + json.result.cluster +} + +Suite.metaClass.drop_cluster = { cluster_name, cluster_id -> + def jsonOutput = new JsonOutput() + def reqBody = [ + type: 'COMPUTE', + cluster_name : cluster_name, + cluster_id : cluster_id, + nodes: [ + ] + ] + def map = [instance_id: "${instance_id}", cluster: reqBody] + def js = jsonOutput.toJson(map) + log.info("drop cluster req: ${js} ".toString()) + + def drop_cluster_api = { request_body, check_func -> + httpTest { + endpoint context.config.metaServiceHttpAddress + uri "/MetaService/http/drop_cluster?token=${token}" + body request_body + check check_func + } + } + + drop_cluster_api.call(js) { + respCode, body -> + log.info("dorp cluster resp: ${body} ${respCode}".toString()) + def json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase('OK') || json.code.equalsIgnoreCase('ALREADY_EXISTED')) + } +} + +Suite.metaClass.add_node = { be_unique_id, ip, port, cluster_name, cluster_id -> + def jsonOutput = new JsonOutput() + def clusterInfo = [ + type: 'COMPUTE', + cluster_name : cluster_name, + cluster_id : cluster_id, + nodes: [ + [ + cloud_unique_id: be_unique_id, + ip: ip, + heartbeat_port: port + ], + ] + ] + def map = [instance_id: "${instance_id}", cluster: clusterInfo] + def js = jsonOutput.toJson(map) + log.info("add node req: ${js} ".toString()) + + def add_cluster_api = { request_body, check_func -> + httpTest { + endpoint 
context.config.metaServiceHttpAddress + uri "/MetaService/http/add_node?token=${token}" + body request_body + check check_func + } + } + + add_cluster_api.call(js) { + respCode, body -> + log.info("add node resp: ${body} ${respCode}".toString()) + def json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase('OK') || json.code.equalsIgnoreCase('ALREADY_EXISTED')) + } +} + +Suite.metaClass.d_node = { be_unique_id, ip, port, cluster_name, cluster_id -> + def jsonOutput = new JsonOutput() + def clusterInfo = [ + type: 'COMPUTE', + cluster_name : cluster_name, + cluster_id : cluster_id, + nodes: [ + [ + cloud_unique_id: be_unique_id, + ip: ip, + heartbeat_port: port + ], + ] + ] + def map = [instance_id: "${instance_id}", cluster: clusterInfo] + def js = jsonOutput.toJson(map) + log.info("decommission node req: ${js} ".toString()) + + def d_cluster_api = { request_body, check_func -> + httpTest { + endpoint context.config.metaServiceHttpAddress + uri "/MetaService/http/decommission_node?token=${token}" + body request_body + check check_func + } + } + + d_cluster_api.call(js) { + respCode, body -> + log.info("decommission node resp: ${body} ${respCode}".toString()) + def json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase('OK') || json.code.equalsIgnoreCase('ALREADY_EXISTED')) + } +} + --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org