This is an automated email from the ASF dual-hosted git repository.
starocean999 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b0dfba08c58 [Enhancement] (nereids) implement
Stop/Resume/PauseDataSyncJobCommand in nereids (#49126)
b0dfba08c58 is described below
commit b0dfba08c586527c34be49008d4ff8f6952e770e
Author: yaoxiao <[email protected]>
AuthorDate: Mon Apr 28 14:07:11 2025 +0800
[Enhancement] (nereids) implement Stop/Resume/PauseDataSyncJobCommand in
nereids (#49126)
Issue Number: #42802, #42803, #42804
---
.../antlr4/org/apache/doris/nereids/DorisParser.g4 | 6 +-
.../org/apache/doris/load/sync/SyncJobManager.java | 97 ++++++++++++++++++++++
.../doris/nereids/parser/LogicalPlanBuilder.java | 59 ++++++++++++-
.../apache/doris/nereids/trees/plans/PlanType.java | 3 +
.../commands/load/PauseDataSyncJobCommand.java | 69 +++++++++++++++
.../commands/load/ResumeDataSyncJobCommand.java | 69 +++++++++++++++
.../commands/load/StopDataSyncJobCommand.java | 69 +++++++++++++++
.../trees/plans/commands/load/SyncJobName.java | 64 ++++++++++++++
.../trees/plans/visitor/CommandVisitor.java | 15 ++++
.../commands/PauseDataSyncJobCommandTest.java | 50 +++++++++++
.../commands/ResumeDataSyncJobCommandTest.java | 50 +++++++++++
.../plans/commands/StopDataSyncJobCommandTest.java | 51 ++++++++++++
12 files changed, 597 insertions(+), 5 deletions(-)
diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
index 9a3d6a84754..4a82cc9e28e 100644
--- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
+++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
@@ -372,6 +372,9 @@ supportedShowStatement
supportedLoadStatement
: SYNC
#sync
| createRoutineLoad
#createRoutineLoadAlias
+ | STOP SYNC JOB name=multipartIdentifier
#stopDataSyncJob
+ | RESUME SYNC JOB name=multipartIdentifier
#resumeDataSyncJob
+ | PAUSE SYNC JOB name=multipartIdentifier
#pauseDataSyncJob
| CREATE SYNC label=multipartIdentifier
LEFT_PAREN channelDescriptions RIGHT_PAREN
FROM BINLOG LEFT_PAREN propertyItemList RIGHT_PAREN
@@ -463,9 +466,6 @@ unsupportedLoadStatement
: LOAD mysqlDataDesc
(PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)?
(commentSpec)?
#mysqlLoad
- | STOP SYNC JOB name=multipartIdentifier
#stopDataSyncJob
- | RESUME SYNC JOB name=multipartIdentifier
#resumeDataSyncJob
- | PAUSE SYNC JOB name=multipartIdentifier
#pauseDataSyncJob
| PAUSE ROUTINE LOAD FOR label=multipartIdentifier
#pauseRoutineLoad
| PAUSE ALL ROUTINE LOAD
#pauseAllRoutineLoad
| RESUME ROUTINE LOAD FOR label=multipartIdentifier
#resumeRoutineLoad
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJobManager.java
b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJobManager.java
index f0de866d801..f8a4f5eaaf7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJobManager.java
@@ -32,6 +32,9 @@ import org.apache.doris.common.util.LogKey;
import org.apache.doris.load.sync.canal.CanalDestination;
import org.apache.doris.load.sync.canal.CanalSyncJob;
import
org.apache.doris.nereids.trees.plans.commands.load.CreateDataSyncJobCommand;
+import
org.apache.doris.nereids.trees.plans.commands.load.PauseDataSyncJobCommand;
+import
org.apache.doris.nereids.trees.plans.commands.load.ResumeDataSyncJobCommand;
+import
org.apache.doris.nereids.trees.plans.commands.load.StopDataSyncJobCommand;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -143,6 +146,100 @@ public class SyncJobManager implements Writable {
map.get(syncJob.getJobName()).add(syncJob);
}
+    /**
+     * Pauses every running sync job that matches the database and job name carried by the command.
+     *
+     * <p>Matching jobs are collected while holding the read lock; {@code pause()} is invoked on
+     * them only after the lock has been released.
+     *
+     * @param command the PAUSE SYNC JOB command supplying the database name and job name
+     * @throws UserException if the database lookup fails, no job has the given name, or none of
+     *         the matching jobs is currently running
+     */
+    public void pauseSyncJob(PauseDataSyncJobCommand command) throws UserException {
+        String databaseName = command.getDbFullName();
+        String targetJobName = command.getJobName();
+        Database database = Env.getCurrentInternalCatalog().getDbOrDdlException(databaseName);
+
+        List<SyncJob> jobsToPause = Lists.newArrayList();
+        readLock();
+        try {
+            List<SyncJob> candidates = getSyncJobsByDbAndJobName(database.getId(), targetJobName);
+            if (candidates.isEmpty()) {
+                throw new DdlException("Load job does not exist");
+            }
+
+            // Only jobs that are running right now can be paused.
+            for (SyncJob candidate : candidates) {
+                if (candidate.isRunning()) {
+                    jobsToPause.add(candidate);
+                }
+            }
+            if (jobsToPause.isEmpty()) {
+                throw new DdlException("There is no running job with jobName `"
+                        + targetJobName + "` to pause");
+            }
+        } finally {
+            readUnlock();
+        }
+
+        for (SyncJob job : jobsToPause) {
+            job.pause();
+        }
+    }
+
+    /**
+     * Resumes every paused sync job that matches the database and job name carried by the command.
+     *
+     * <p>Matching jobs are collected while holding the read lock; {@code resume()} is invoked on
+     * them only after the lock has been released.
+     *
+     * @param command the RESUME SYNC JOB command supplying the database name and job name
+     * @throws UserException if the database lookup fails, no job has the given name, or none of
+     *         the matching jobs is currently paused
+     */
+    public void resumeSyncJob(ResumeDataSyncJobCommand command) throws UserException {
+        String databaseName = command.getDbFullName();
+        String targetJobName = command.getJobName();
+        Database database = Env.getCurrentInternalCatalog().getDbOrDdlException(databaseName);
+
+        List<SyncJob> jobsToResume = Lists.newArrayList();
+        readLock();
+        try {
+            List<SyncJob> candidates = getSyncJobsByDbAndJobName(database.getId(), targetJobName);
+            if (candidates.isEmpty()) {
+                throw new DdlException("Load job does not exist");
+            }
+
+            // Only jobs that are paused right now can be resumed.
+            for (SyncJob candidate : candidates) {
+                if (candidate.isPaused()) {
+                    jobsToResume.add(candidate);
+                }
+            }
+            if (jobsToResume.isEmpty()) {
+                throw new DdlException("There is no paused job with jobName `"
+                        + targetJobName + "` to resume");
+            }
+        } finally {
+            readUnlock();
+        }
+
+        for (SyncJob job : jobsToResume) {
+            job.resume();
+        }
+    }
+
+    /**
+     * Cancels every uncompleted sync job that matches the database and job name carried by the
+     * command.
+     *
+     * <p>Matching jobs are collected while holding the read lock; they are cancelled with
+     * {@code SyncFailMsg.MsgType.USER_CANCEL} only after the lock has been released.
+     *
+     * @param command the STOP SYNC JOB command supplying the database name and job name
+     * @throws UserException if the database lookup fails, no job has the given name, or all of
+     *         the matching jobs are already completed
+     */
+    public void stopSyncJob(StopDataSyncJobCommand command) throws UserException {
+        String databaseName = command.getDbFullName();
+        String targetJobName = command.getJobName();
+        Database database = Env.getCurrentInternalCatalog().getDbOrDdlException(databaseName);
+
+        // Jobs that still have to be cancelled once the lock is released.
+        List<SyncJob> jobsToCancel = Lists.newArrayList();
+        readLock();
+        try {
+            List<SyncJob> candidates = getSyncJobsByDbAndJobName(database.getId(), targetJobName);
+            if (candidates.isEmpty()) {
+                throw new DdlException("Load job does not exist");
+            }
+
+            for (SyncJob candidate : candidates) {
+                if (!candidate.isCompleted()) {
+                    jobsToCancel.add(candidate);
+                }
+            }
+            if (jobsToCancel.isEmpty()) {
+                throw new DdlException("There is no uncompleted job with jobName `"
+                        + targetJobName + "`");
+            }
+        } finally {
+            readUnlock();
+        }
+
+        for (SyncJob job : jobsToCancel) {
+            job.cancel(SyncFailMsg.MsgType.USER_CANCEL, "user cancel");
+        }
+    }
+
public void pauseSyncJob(PauseSyncJobStmt stmt) throws UserException {
String dbName = stmt.getDbFullName();
String jobName = stmt.getJobName();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index 44abc65c615..474595caaad 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -833,6 +833,10 @@ import
org.apache.doris.nereids.trees.plans.commands.load.LoadProperty;
import org.apache.doris.nereids.trees.plans.commands.load.LoadSeparator;
import org.apache.doris.nereids.trees.plans.commands.load.LoadSequenceClause;
import org.apache.doris.nereids.trees.plans.commands.load.LoadWhereClause;
+import
org.apache.doris.nereids.trees.plans.commands.load.PauseDataSyncJobCommand;
+import
org.apache.doris.nereids.trees.plans.commands.load.ResumeDataSyncJobCommand;
+import
org.apache.doris.nereids.trees.plans.commands.load.StopDataSyncJobCommand;
+import org.apache.doris.nereids.trees.plans.commands.load.SyncJobName;
import
org.apache.doris.nereids.trees.plans.commands.refresh.RefreshCatalogCommand;
import
org.apache.doris.nereids.trees.plans.commands.refresh.RefreshDatabaseCommand;
import
org.apache.doris.nereids.trees.plans.commands.refresh.RefreshTableCommand;
@@ -6640,6 +6644,57 @@ public class LogicalPlanBuilder extends
DorisParserBaseVisitor<Object> {
properties);
}
+    /**
+     * Builds a {@link StopDataSyncJobCommand} from {@code STOP SYNC JOB [<db>.]<job_name>}.
+     *
+     * @param ctx parse context whose {@code name} holds one or two identifier parts
+     * @return the command wrapping the parsed name; the database part is {@code null} when the
+     *         job name is unqualified
+     * @throws ParseException if the name has more than two parts
+     */
+    @Override
+    public LogicalPlan visitStopDataSyncJob(DorisParser.StopDataSyncJobContext ctx) {
+        List<String> nameParts = visitMultipartIdentifier(ctx.name);
+        int size = nameParts.size();
+        String jobName = nameParts.get(size - 1);
+        String dbName;
+        if (size == 1) {
+            dbName = null;
+        } else if (size == 2) {
+            dbName = nameParts.get(0);
+        } else {
+            throw new ParseException("only support [<db>.]<job_name>", ctx.name);
+        }
+        // SyncJobName's constructor is (dbName, jobName). Passing the arguments the other way
+        // round stores the job name as the database name and vice versa.
+        SyncJobName syncJobName = new SyncJobName(dbName, jobName);
+        return new StopDataSyncJobCommand(syncJobName);
+    }
+
+    /**
+     * Builds a {@link ResumeDataSyncJobCommand} from {@code RESUME SYNC JOB [<db>.]<job_name>}.
+     *
+     * @param ctx parse context whose {@code name} holds one or two identifier parts
+     * @return the command wrapping the parsed name; the database part is {@code null} when the
+     *         job name is unqualified
+     * @throws ParseException if the name has more than two parts
+     */
+    @Override
+    public LogicalPlan visitResumeDataSyncJob(DorisParser.ResumeDataSyncJobContext ctx) {
+        List<String> nameParts = visitMultipartIdentifier(ctx.name);
+        int size = nameParts.size();
+        String jobName = nameParts.get(size - 1);
+        String dbName;
+        if (size == 1) {
+            dbName = null;
+        } else if (size == 2) {
+            dbName = nameParts.get(0);
+        } else {
+            throw new ParseException("only support [<db>.]<job_name>", ctx.name);
+        }
+        // SyncJobName's constructor is (dbName, jobName). Passing the arguments the other way
+        // round stores the job name as the database name and vice versa.
+        SyncJobName syncJobName = new SyncJobName(dbName, jobName);
+        return new ResumeDataSyncJobCommand(syncJobName);
+    }
+
+    /**
+     * Builds a {@link PauseDataSyncJobCommand} from {@code PAUSE SYNC JOB [<db>.]<job_name>}.
+     *
+     * @param ctx parse context whose {@code name} holds one or two identifier parts
+     * @return the command wrapping the parsed name; the database part is {@code null} when the
+     *         job name is unqualified
+     * @throws ParseException if the name has more than two parts
+     */
+    @Override
+    public LogicalPlan visitPauseDataSyncJob(DorisParser.PauseDataSyncJobContext ctx) {
+        List<String> nameParts = visitMultipartIdentifier(ctx.name);
+        int size = nameParts.size();
+        String jobName = nameParts.get(size - 1);
+        String dbName;
+        if (size == 1) {
+            dbName = null;
+        } else if (size == 2) {
+            dbName = nameParts.get(0);
+        } else {
+            throw new ParseException("only support [<db>.]<job_name>", ctx.name);
+        }
+        // SyncJobName's constructor is (dbName, jobName). Passing the arguments the other way
+        // round stores the job name as the database name and vice versa.
+        SyncJobName syncJobName = new SyncJobName(dbName, jobName);
+        return new PauseDataSyncJobCommand(syncJobName);
+    }
+
@Override
public List<ChannelDescription>
visitChannelDescriptions(DorisParser.ChannelDescriptionsContext ctx) {
List<ChannelDescription> channelDescriptions = new ArrayList<>();
@@ -6685,7 +6740,7 @@ public class LogicalPlanBuilder extends
DorisParserBaseVisitor<Object> {
@Override
public LogicalPlan
visitCreateDataSyncJob(DorisParser.CreateDataSyncJobContext ctx) {
List<ChannelDescription> channelDescriptions =
visitChannelDescriptions(ctx.channelDescriptions());
- List<String> labelParts =
visitMultipartIdentifier(ctx.multipartIdentifier());
+ List<String> labelParts = visitMultipartIdentifier(ctx.label);
int size = labelParts.size();
String jobName = labelParts.get(size - 1);
String dbName;
@@ -6694,7 +6749,7 @@ public class LogicalPlanBuilder extends
DorisParserBaseVisitor<Object> {
} else if (size == 2) {
dbName = labelParts.get(0);
} else {
- throw new ParseException("only support <mysql_db>.<src_tbl>",
ctx.multipartIdentifier());
+ throw new ParseException("only support [<db>.]<job_name>",
ctx.label);
}
Map<String, String> propertieItem =
visitPropertyItemList(ctx.propertyItemList());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
index 2276280e83b..70ef5d782be 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
@@ -339,6 +339,9 @@ public enum PlanType {
DROP_EXPIRED_STATS_COMMAND,
ALTER_TABLE_STATS_COMMAND,
ALTER_COLUMN_STATS_COMMAND,
+ PAUSE_DATA_SYNC_JOB_COMMAND,
+ RESUME_DATA_SYNC_JOB_COMMAND,
+ STOP_DATA_SYNC_JOB_COMMAND,
CREATE_DATA_SYNC_JOB_COMMAND,
DROP_RESOURCE_COMMAND,
DROP_ROW_POLICY_COMMAND,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/PauseDataSyncJobCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/PauseDataSyncJobCommand.java
new file mode 100644
index 00000000000..44083df1563
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/PauseDataSyncJobCommand.java
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.load;
+
+import org.apache.doris.analysis.StmtType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.commands.Command;
+import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.StmtExecutor;
+
+/**
+ * PAUSE SYNC JOB statement used to pause a sync job.
+ * syntax:
+ *      PAUSE SYNC JOB [db.]jobName
+ */
+public class PauseDataSyncJobCommand extends Command implements ForwardWithSync {
+    // The reference never changes; analyze() may still fill in the database name it holds.
+    private final SyncJobName syncJobName;
+
+    /**
+     * @param syncJobName the (optionally database-qualified) name of the job to pause
+     */
+    public PauseDataSyncJobCommand(SyncJobName syncJobName) {
+        super(PlanType.PAUSE_DATA_SYNC_JOB_COMMAND);
+        this.syncJobName = syncJobName;
+    }
+
+    /**
+     * Validates the job name and then hands this command to the sync job manager.
+     */
+    @Override
+    public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
+        validate(ctx);
+        ctx.getEnv().getSyncJobManager().pauseSyncJob(this);
+    }
+
+    /** Returns the job name part of the target. */
+    public String getJobName() {
+        return syncJobName.getName();
+    }
+
+    /** Returns the database name part of the target, as currently held by the job name. */
+    public String getDbFullName() {
+        return syncJobName.getDbName();
+    }
+
+    /**
+     * Analyzes the job name against the given connect context.
+     *
+     * @throws AnalysisException if the job name cannot be analyzed
+     */
+    public void validate(ConnectContext ctx) throws AnalysisException {
+        syncJobName.analyze(ctx);
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitPauseDataSyncJobCommand(this, context);
+    }
+
+    @Override
+    public StmtType stmtType() {
+        return StmtType.PAUSE;
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/ResumeDataSyncJobCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/ResumeDataSyncJobCommand.java
new file mode 100644
index 00000000000..e5e518beb7b
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/ResumeDataSyncJobCommand.java
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.load;
+
+import org.apache.doris.analysis.StmtType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.commands.Command;
+import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.StmtExecutor;
+
+/**
+ * RESUME SYNC JOB statement used to resume a sync job.
+ * syntax:
+ *      RESUME SYNC JOB [db.]jobName
+ */
+public class ResumeDataSyncJobCommand extends Command implements ForwardWithSync {
+    // The reference never changes; analyze() may still fill in the database name it holds.
+    private final SyncJobName syncJobName;
+
+    /**
+     * @param syncJobName the (optionally database-qualified) name of the job to resume
+     */
+    public ResumeDataSyncJobCommand(SyncJobName syncJobName) {
+        super(PlanType.RESUME_DATA_SYNC_JOB_COMMAND);
+        this.syncJobName = syncJobName;
+    }
+
+    /** Returns the job name part of the target. */
+    public String getJobName() {
+        return syncJobName.getName();
+    }
+
+    /** Returns the database name part of the target, as currently held by the job name. */
+    public String getDbFullName() {
+        return syncJobName.getDbName();
+    }
+
+    /**
+     * Validates the job name and then hands this command to the sync job manager.
+     */
+    @Override
+    public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
+        validate(ctx);
+        ctx.getEnv().getSyncJobManager().resumeSyncJob(this);
+    }
+
+    /**
+     * Analyzes the job name against the given connect context.
+     *
+     * @throws AnalysisException if the job name cannot be analyzed
+     */
+    public void validate(ConnectContext ctx) throws AnalysisException {
+        syncJobName.analyze(ctx);
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitResumeDataSyncJobCommand(this, context);
+    }
+
+    @Override
+    public StmtType stmtType() {
+        return StmtType.RESUME;
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/StopDataSyncJobCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/StopDataSyncJobCommand.java
new file mode 100644
index 00000000000..692468a2ef4
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/StopDataSyncJobCommand.java
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.load;
+
+import org.apache.doris.analysis.StmtType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.commands.Command;
+import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.StmtExecutor;
+
+/**
+ * STOP SYNC JOB statement used to stop (cancel) a sync job.
+ * syntax:
+ *      STOP SYNC JOB [db.]jobName
+ */
+public class StopDataSyncJobCommand extends Command implements ForwardWithSync {
+    // The reference never changes; analyze() may still fill in the database name it holds.
+    private final SyncJobName syncJobName;
+
+    /**
+     * @param syncJobName the (optionally database-qualified) name of the job to stop
+     */
+    public StopDataSyncJobCommand(SyncJobName syncJobName) {
+        super(PlanType.STOP_DATA_SYNC_JOB_COMMAND);
+        this.syncJobName = syncJobName;
+    }
+
+    /** Returns the job name part of the target. */
+    public String getJobName() {
+        return syncJobName.getName();
+    }
+
+    /** Returns the database name part of the target, as currently held by the job name. */
+    public String getDbFullName() {
+        return syncJobName.getDbName();
+    }
+
+    /**
+     * Validates the job name and then hands this command to the sync job manager.
+     */
+    @Override
+    public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
+        validate(ctx);
+        ctx.getEnv().getSyncJobManager().stopSyncJob(this);
+    }
+
+    /**
+     * Analyzes the job name against the given connect context.
+     *
+     * @throws AnalysisException if the job name cannot be analyzed
+     */
+    public void validate(ConnectContext ctx) throws AnalysisException {
+        syncJobName.analyze(ctx);
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitStopDataSyncJobCommand(this, context);
+    }
+
+    @Override
+    public StmtType stmtType() {
+        return StmtType.STOP;
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/SyncJobName.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/SyncJobName.java
new file mode 100644
index 00000000000..783792d922b
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/SyncJobName.java
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.load;
+
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.base.Strings;
+
+/**
+ * Name of a data sync job, optionally qualified by a database name.
+ */
+public class SyncJobName {
+    private final String jobName;
+    // Not final: analyze() replaces a missing database name with the session's database.
+    private String dbName;
+
+    /**
+     * Creates a job name. Note the argument order: database first, job second.
+     *
+     * @param dbName database name; may be {@code null} or empty, in which case
+     *               {@link #analyze(ConnectContext)} takes it from the connect context
+     * @param jobName name of the sync job
+     */
+    public SyncJobName(String dbName, String jobName) {
+        this.dbName = dbName;
+        this.jobName = jobName;
+    }
+
+    /** Returns the database name as currently held; may be unset before {@code analyze}. */
+    public String getDbName() {
+        return dbName;
+    }
+
+    /** Returns the job name. */
+    public String getName() {
+        return jobName;
+    }
+
+    /**
+     * Fills in the database name from the connect context when none was given.
+     *
+     * @param ctx connect context supplying the current database
+     * @throws AnalysisException reported with {@code ERR_NO_DB_ERROR} when neither this name
+     *         nor the connect context provides a database
+     */
+    public void analyze(ConnectContext ctx) throws AnalysisException {
+        if (Strings.isNullOrEmpty(dbName)) {
+            if (Strings.isNullOrEmpty(ctx.getDatabase())) {
+                ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
+            }
+            dbName = ctx.getDatabase();
+        }
+    }
+
+    /** Renders the name as a back-quoted {@code `db`.`job`} pair. */
+    public String toSql() {
+        return "`" + dbName + "`.`" + jobName + "`";
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
index 2cdd56989f2..ff96b0ae75e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
@@ -207,6 +207,9 @@ import
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableComma
import
org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTableCommand;
import
org.apache.doris.nereids.trees.plans.commands.load.CreateDataSyncJobCommand;
import
org.apache.doris.nereids.trees.plans.commands.load.CreateRoutineLoadCommand;
+import
org.apache.doris.nereids.trees.plans.commands.load.PauseDataSyncJobCommand;
+import
org.apache.doris.nereids.trees.plans.commands.load.ResumeDataSyncJobCommand;
+import
org.apache.doris.nereids.trees.plans.commands.load.StopDataSyncJobCommand;
import
org.apache.doris.nereids.trees.plans.commands.refresh.RefreshCatalogCommand;
import
org.apache.doris.nereids.trees.plans.commands.refresh.RefreshDatabaseCommand;
import
org.apache.doris.nereids.trees.plans.commands.refresh.RefreshTableCommand;
@@ -965,6 +968,18 @@ public interface CommandVisitor<R, C> {
return visitCommand(alterColumnStatsCommand, context);
}
+ /** Visits a PAUSE SYNC JOB command; by default delegates to {@code visitCommand}. */
+ default R visitPauseDataSyncJobCommand(PauseDataSyncJobCommand
pauseDataSyncJobCommand, C context) {
+ return visitCommand(pauseDataSyncJobCommand, context);
+ }
+
+ /** Visits a RESUME SYNC JOB command; by default delegates to {@code visitCommand}. */
+ default R visitResumeDataSyncJobCommand(ResumeDataSyncJobCommand
resumeDataSyncJobCommand, C context) {
+ return visitCommand(resumeDataSyncJobCommand, context);
+ }
+
+ /** Visits a STOP SYNC JOB command; by default delegates to {@code visitCommand}. */
+ default R visitStopDataSyncJobCommand(StopDataSyncJobCommand
stopDataSyncJobCommand, C context) {
+ return visitCommand(stopDataSyncJobCommand, context);
+ }
+
default R visitCreateDataSyncJobCommand(CreateDataSyncJobCommand
createDataSyncJobCommand, C context) {
return visitCommand(createDataSyncJobCommand, context);
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/PauseDataSyncJobCommandTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/PauseDataSyncJobCommandTest.java
new file mode 100644
index 00000000000..a54904f9124
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/PauseDataSyncJobCommandTest.java
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import
org.apache.doris.nereids.trees.plans.commands.load.PauseDataSyncJobCommand;
+import org.apache.doris.nereids.trees.plans.commands.load.SyncJobName;
+import org.apache.doris.qe.ConnectContext;
+
+import mockit.Expectations;
+import mockit.Mocked;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Checks that {@link PauseDataSyncJobCommand#validate} accepts a fully qualified job name.
+ */
+public class PauseDataSyncJobCommandTest {
+    @Mocked
+    private ConnectContext connectContext;
+
+    /** Lets {@code ConnectContext.get()} return the mocked context, any number of times. */
+    private void runBefore() {
+        new Expectations() {
+            {
+                ConnectContext.get();
+                minTimes = 0;
+                result = connectContext;
+            }
+        };
+    }
+
+    @Test
+    public void testValidateNormal() throws Exception {
+        runBefore();
+        // The database is given explicitly, so validation needs nothing from the session.
+        PauseDataSyncJobCommand pauseCommand =
+                new PauseDataSyncJobCommand(new SyncJobName("testDb", "job0"));
+        Assertions.assertDoesNotThrow(() -> pauseCommand.validate(connectContext));
+    }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/ResumeDataSyncJobCommandTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/ResumeDataSyncJobCommandTest.java
new file mode 100644
index 00000000000..fe89853dc31
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/ResumeDataSyncJobCommandTest.java
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import
org.apache.doris.nereids.trees.plans.commands.load.ResumeDataSyncJobCommand;
+import org.apache.doris.nereids.trees.plans.commands.load.SyncJobName;
+import org.apache.doris.qe.ConnectContext;
+
+import mockit.Expectations;
+import mockit.Mocked;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Checks that {@link ResumeDataSyncJobCommand#validate} accepts a fully qualified job name.
+ */
+public class ResumeDataSyncJobCommandTest {
+    @Mocked
+    private ConnectContext connectContext;
+
+    /** Lets {@code ConnectContext.get()} return the mocked context, any number of times. */
+    private void runBefore() {
+        new Expectations() {
+            {
+                ConnectContext.get();
+                minTimes = 0;
+                result = connectContext;
+            }
+        };
+    }
+
+    @Test
+    public void testValidateNormal() throws Exception {
+        runBefore();
+        // The database is given explicitly, so validation needs nothing from the session.
+        ResumeDataSyncJobCommand resumeCommand =
+                new ResumeDataSyncJobCommand(new SyncJobName("testDb", "job0"));
+        Assertions.assertDoesNotThrow(() -> resumeCommand.validate(connectContext));
+    }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/StopDataSyncJobCommandTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/StopDataSyncJobCommandTest.java
new file mode 100644
index 00000000000..7b335cec21f
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/StopDataSyncJobCommandTest.java
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import
org.apache.doris.nereids.trees.plans.commands.load.StopDataSyncJobCommand;
+import org.apache.doris.nereids.trees.plans.commands.load.SyncJobName;
+import org.apache.doris.qe.ConnectContext;
+
+import mockit.Expectations;
+import mockit.Mocked;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Checks that {@link StopDataSyncJobCommand#validate} accepts a fully qualified job name.
+ */
+public class StopDataSyncJobCommandTest {
+
+    @Mocked
+    private ConnectContext connectContext;
+
+    /** Lets {@code ConnectContext.get()} return the mocked context, any number of times. */
+    private void runBefore() {
+        new Expectations() {
+            {
+                ConnectContext.get();
+                minTimes = 0;
+                result = connectContext;
+            }
+        };
+    }
+
+    @Test
+    public void testValidateNormal() throws Exception {
+        runBefore();
+        // The database is given explicitly, so validation needs nothing from the session.
+        StopDataSyncJobCommand stopCommand =
+                new StopDataSyncJobCommand(new SyncJobName("testDb", "job0"));
+        Assertions.assertDoesNotThrow(() -> stopCommand.validate(connectContext));
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]