This is an automated email from the ASF dual-hosted git repository.

starocean999 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b0dfba08c58 [Enhancement] (nereids) implement 
Stop/Resume/PauseDataSyncJobCommand in nereids (#49126)
b0dfba08c58 is described below

commit b0dfba08c586527c34be49008d4ff8f6952e770e
Author: yaoxiao <[email protected]>
AuthorDate: Mon Apr 28 14:07:11 2025 +0800

    [Enhancement] (nereids) implement Stop/Resume/PauseDataSyncJobCommand in 
nereids (#49126)
    
    Issue Number: #42802, #42803, #42804
---
 .../antlr4/org/apache/doris/nereids/DorisParser.g4 |  6 +-
 .../org/apache/doris/load/sync/SyncJobManager.java | 97 ++++++++++++++++++++++
 .../doris/nereids/parser/LogicalPlanBuilder.java   | 59 ++++++++++++-
 .../apache/doris/nereids/trees/plans/PlanType.java |  3 +
 .../commands/load/PauseDataSyncJobCommand.java     | 69 +++++++++++++++
 .../commands/load/ResumeDataSyncJobCommand.java    | 69 +++++++++++++++
 .../commands/load/StopDataSyncJobCommand.java      | 69 +++++++++++++++
 .../trees/plans/commands/load/SyncJobName.java     | 64 ++++++++++++++
 .../trees/plans/visitor/CommandVisitor.java        | 15 ++++
 .../commands/PauseDataSyncJobCommandTest.java      | 50 +++++++++++
 .../commands/ResumeDataSyncJobCommandTest.java     | 50 +++++++++++
 .../plans/commands/StopDataSyncJobCommandTest.java | 51 ++++++++++++
 12 files changed, 597 insertions(+), 5 deletions(-)

diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 
b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
index 9a3d6a84754..4a82cc9e28e 100644
--- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
+++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
@@ -372,6 +372,9 @@ supportedShowStatement
 supportedLoadStatement
     : SYNC                                                                     
     #sync
     | createRoutineLoad                                                        
     #createRoutineLoadAlias
+    | STOP SYNC JOB name=multipartIdentifier                                   
     #stopDataSyncJob
+    | RESUME SYNC JOB name=multipartIdentifier                                 
     #resumeDataSyncJob
+    | PAUSE SYNC JOB name=multipartIdentifier                                  
     #pauseDataSyncJob
     | CREATE SYNC label=multipartIdentifier
           LEFT_PAREN channelDescriptions RIGHT_PAREN
           FROM BINLOG LEFT_PAREN propertyItemList RIGHT_PAREN
@@ -463,9 +466,6 @@ unsupportedLoadStatement
     : LOAD mysqlDataDesc
         (PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)?
         (commentSpec)?                                                         
     #mysqlLoad
-    | STOP SYNC JOB name=multipartIdentifier                                   
     #stopDataSyncJob
-    | RESUME SYNC JOB name=multipartIdentifier                                 
     #resumeDataSyncJob
-    | PAUSE SYNC JOB name=multipartIdentifier                                  
     #pauseDataSyncJob
     | PAUSE ROUTINE LOAD FOR label=multipartIdentifier                         
     #pauseRoutineLoad
     | PAUSE ALL ROUTINE LOAD                                                   
     #pauseAllRoutineLoad
     | RESUME ROUTINE LOAD FOR label=multipartIdentifier                        
     #resumeRoutineLoad
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJobManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJobManager.java
index f0de866d801..f8a4f5eaaf7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJobManager.java
@@ -32,6 +32,9 @@ import org.apache.doris.common.util.LogKey;
 import org.apache.doris.load.sync.canal.CanalDestination;
 import org.apache.doris.load.sync.canal.CanalSyncJob;
 import 
org.apache.doris.nereids.trees.plans.commands.load.CreateDataSyncJobCommand;
+import 
org.apache.doris.nereids.trees.plans.commands.load.PauseDataSyncJobCommand;
+import 
org.apache.doris.nereids.trees.plans.commands.load.ResumeDataSyncJobCommand;
+import 
org.apache.doris.nereids.trees.plans.commands.load.StopDataSyncJobCommand;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -143,6 +146,100 @@ public class SyncJobManager implements Writable {
         map.get(syncJob.getJobName()).add(syncJob);
     }
 
+    /**
+     * Pauses every running sync job addressed by a PAUSE SYNC JOB command.
+     *
+     * <p>Matching jobs are looked up and filtered while the read lock is held;
+     * the pause calls are issued only after the lock has been released.
+     *
+     * @param command nereids command supplying the database name and the job name
+     * @throws UserException if no job matches the name or none of the matches is
+     *         running (raised as DdlException); failures from the database lookup
+     *         or from the job itself propagate unchanged
+     */
+    public void pauseSyncJob(PauseDataSyncJobCommand command) throws UserException {
+        final String database = command.getDbFullName();
+        final String name = command.getJobName();
+        final Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(database);
+
+        final List<SyncJob> toPause;
+        readLock();
+        try {
+            List<SyncJob> candidates = getSyncJobsByDbAndJobName(db.getId(), name);
+            if (candidates.isEmpty()) {
+                throw new DdlException("Load job does not exist");
+            }
+            toPause = candidates.stream().filter(SyncJob::isRunning).collect(Collectors.toList());
+            if (toPause.isEmpty()) {
+                throw new DdlException("There is no running job with jobName `" + name + "` to pause");
+            }
+        } finally {
+            readUnlock();
+        }
+
+        for (SyncJob job : toPause) {
+            job.pause();
+        }
+    }
+
+    /**
+     * Resumes every paused sync job addressed by a RESUME SYNC JOB command.
+     *
+     * <p>Matching jobs are looked up and filtered while the read lock is held;
+     * the resume calls are issued only after the lock has been released.
+     *
+     * @param command nereids command supplying the database name and the job name
+     * @throws UserException if no job matches the name or none of the matches is
+     *         paused (raised as DdlException); failures from the database lookup
+     *         or from the job itself propagate unchanged
+     */
+    public void resumeSyncJob(ResumeDataSyncJobCommand command) throws UserException {
+        final String database = command.getDbFullName();
+        final String name = command.getJobName();
+        final Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(database);
+
+        final List<SyncJob> toResume;
+        readLock();
+        try {
+            List<SyncJob> candidates = getSyncJobsByDbAndJobName(db.getId(), name);
+            if (candidates.isEmpty()) {
+                throw new DdlException("Load job does not exist");
+            }
+            toResume = candidates.stream().filter(SyncJob::isPaused).collect(Collectors.toList());
+            if (toResume.isEmpty()) {
+                throw new DdlException("There is no paused job with jobName `" + name + "` to resume");
+            }
+        } finally {
+            readUnlock();
+        }
+
+        for (SyncJob job : toResume) {
+            job.resume();
+        }
+    }
+
+    /**
+     * Cancels every not-yet-completed sync job addressed by a STOP SYNC JOB command.
+     *
+     * <p>Matching jobs are looked up and filtered while the read lock is held;
+     * the cancel calls are issued only after the lock has been released.
+     *
+     * @param command nereids command supplying the database name and the job name
+     * @throws UserException if no job matches the name or all matches are already
+     *         completed (raised as DdlException); failures from the database
+     *         lookup propagate unchanged
+     */
+    public void stopSyncJob(StopDataSyncJobCommand command) throws UserException {
+        final String database = command.getDbFullName();
+        final String name = command.getJobName();
+        final Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(database);
+
+        // Jobs that still have to be cancelled.
+        final List<SyncJob> toCancel;
+        readLock();
+        try {
+            List<SyncJob> candidates = getSyncJobsByDbAndJobName(db.getId(), name);
+            if (candidates.isEmpty()) {
+                throw new DdlException("Load job does not exist");
+            }
+            toCancel = candidates.stream().filter(job -> !job.isCompleted()).collect(Collectors.toList());
+            if (toCancel.isEmpty()) {
+                throw new DdlException("There is no uncompleted job with jobName `" + name + "`");
+            }
+        } finally {
+            readUnlock();
+        }
+
+        for (SyncJob job : toCancel) {
+            job.cancel(SyncFailMsg.MsgType.USER_CANCEL, "user cancel");
+        }
+    }
+
     public void pauseSyncJob(PauseSyncJobStmt stmt) throws UserException {
         String dbName = stmt.getDbFullName();
         String jobName = stmt.getJobName();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index 44abc65c615..474595caaad 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -833,6 +833,10 @@ import 
org.apache.doris.nereids.trees.plans.commands.load.LoadProperty;
 import org.apache.doris.nereids.trees.plans.commands.load.LoadSeparator;
 import org.apache.doris.nereids.trees.plans.commands.load.LoadSequenceClause;
 import org.apache.doris.nereids.trees.plans.commands.load.LoadWhereClause;
+import 
org.apache.doris.nereids.trees.plans.commands.load.PauseDataSyncJobCommand;
+import 
org.apache.doris.nereids.trees.plans.commands.load.ResumeDataSyncJobCommand;
+import 
org.apache.doris.nereids.trees.plans.commands.load.StopDataSyncJobCommand;
+import org.apache.doris.nereids.trees.plans.commands.load.SyncJobName;
 import 
org.apache.doris.nereids.trees.plans.commands.refresh.RefreshCatalogCommand;
 import 
org.apache.doris.nereids.trees.plans.commands.refresh.RefreshDatabaseCommand;
 import 
org.apache.doris.nereids.trees.plans.commands.refresh.RefreshTableCommand;
@@ -6640,6 +6644,57 @@ public class LogicalPlanBuilder extends 
DorisParserBaseVisitor<Object> {
             properties);
     }
 
+    /**
+     * Builds a {@link StopDataSyncJobCommand} from {@code STOP SYNC JOB [db.]job_name}.
+     *
+     * @param ctx parse context whose {@code name} holds one or two identifier parts
+     * @return the stop command; its database name is {@code null} when only a job name was given
+     * @throws ParseException if the identifier has more than two parts
+     */
+    @Override
+    public LogicalPlan visitStopDataSyncJob(DorisParser.StopDataSyncJobContext ctx) {
+        List<String> nameParts = visitMultipartIdentifier(ctx.name);
+        int size = nameParts.size();
+        String jobName = nameParts.get(size - 1);
+        String dbName;
+        if (size == 1) {
+            dbName = null;
+        } else if (size == 2) {
+            dbName = nameParts.get(0);
+        } else {
+            throw new ParseException("only support [<db>.]<job_name>", ctx.name);
+        }
+        // SyncJobName's constructor is (dbName, jobName). Passing the arguments the
+        // other way round stores the job name as the database and loses the job name.
+        SyncJobName syncJobName = new SyncJobName(dbName, jobName);
+        return new StopDataSyncJobCommand(syncJobName);
+    }
+
+    /**
+     * Builds a {@link ResumeDataSyncJobCommand} from {@code RESUME SYNC JOB [db.]job_name}.
+     *
+     * @param ctx parse context whose {@code name} holds one or two identifier parts
+     * @return the resume command; its database name is {@code null} when only a job name was given
+     * @throws ParseException if the identifier has more than two parts
+     */
+    @Override
+    public LogicalPlan visitResumeDataSyncJob(DorisParser.ResumeDataSyncJobContext ctx) {
+        List<String> nameParts = visitMultipartIdentifier(ctx.name);
+        int size = nameParts.size();
+        String jobName = nameParts.get(size - 1);
+        String dbName;
+        if (size == 1) {
+            dbName = null;
+        } else if (size == 2) {
+            dbName = nameParts.get(0);
+        } else {
+            throw new ParseException("only support [<db>.]<job_name>", ctx.name);
+        }
+        // SyncJobName's constructor is (dbName, jobName). Passing the arguments the
+        // other way round stores the job name as the database and loses the job name.
+        SyncJobName syncJobName = new SyncJobName(dbName, jobName);
+        return new ResumeDataSyncJobCommand(syncJobName);
+    }
+
+    /**
+     * Builds a {@link PauseDataSyncJobCommand} from {@code PAUSE SYNC JOB [db.]job_name}.
+     *
+     * @param ctx parse context whose {@code name} holds one or two identifier parts
+     * @return the pause command; its database name is {@code null} when only a job name was given
+     * @throws ParseException if the identifier has more than two parts
+     */
+    @Override
+    public LogicalPlan visitPauseDataSyncJob(DorisParser.PauseDataSyncJobContext ctx) {
+        List<String> nameParts = visitMultipartIdentifier(ctx.name);
+        int size = nameParts.size();
+        String jobName = nameParts.get(size - 1);
+        String dbName;
+        if (size == 1) {
+            dbName = null;
+        } else if (size == 2) {
+            dbName = nameParts.get(0);
+        } else {
+            throw new ParseException("only support [<db>.]<job_name>", ctx.name);
+        }
+        // SyncJobName's constructor is (dbName, jobName). Passing the arguments the
+        // other way round stores the job name as the database and loses the job name.
+        SyncJobName syncJobName = new SyncJobName(dbName, jobName);
+        return new PauseDataSyncJobCommand(syncJobName);
+    }
+
     @Override
     public List<ChannelDescription> 
visitChannelDescriptions(DorisParser.ChannelDescriptionsContext ctx) {
         List<ChannelDescription> channelDescriptions = new ArrayList<>();
@@ -6685,7 +6740,7 @@ public class LogicalPlanBuilder extends 
DorisParserBaseVisitor<Object> {
     @Override
     public LogicalPlan 
visitCreateDataSyncJob(DorisParser.CreateDataSyncJobContext ctx) {
         List<ChannelDescription> channelDescriptions = 
visitChannelDescriptions(ctx.channelDescriptions());
-        List<String> labelParts = 
visitMultipartIdentifier(ctx.multipartIdentifier());
+        List<String> labelParts = visitMultipartIdentifier(ctx.label);
         int size = labelParts.size();
         String jobName = labelParts.get(size - 1);
         String dbName;
@@ -6694,7 +6749,7 @@ public class LogicalPlanBuilder extends 
DorisParserBaseVisitor<Object> {
         } else if (size == 2) {
             dbName = labelParts.get(0);
         } else {
-            throw new ParseException("only support <mysql_db>.<src_tbl>", 
ctx.multipartIdentifier());
+            throw new ParseException("only support [<db>.]<job_name>", 
ctx.label);
         }
 
         Map<String, String> propertieItem = 
visitPropertyItemList(ctx.propertyItemList());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
index 2276280e83b..70ef5d782be 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
@@ -339,6 +339,9 @@ public enum PlanType {
     DROP_EXPIRED_STATS_COMMAND,
     ALTER_TABLE_STATS_COMMAND,
     ALTER_COLUMN_STATS_COMMAND,
+    PAUSE_DATA_SYNC_JOB_COMMAND,
+    RESUME_DATA_SYNC_JOB_COMMAND,
+    STOP_DATA_SYNC_JOB_COMMAND,
     CREATE_DATA_SYNC_JOB_COMMAND,
     DROP_RESOURCE_COMMAND,
     DROP_ROW_POLICY_COMMAND,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/PauseDataSyncJobCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/PauseDataSyncJobCommand.java
new file mode 100644
index 00000000000..44083df1563
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/PauseDataSyncJobCommand.java
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.load;
+
+import org.apache.doris.analysis.StmtType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.commands.Command;
+import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.StmtExecutor;
+
+/**
+ * PAUSE SYNC JOB statement used to pause a sync job.
+ * syntax:
+ *      PAUSE SYNC JOB [db.]jobName
+ */
+public class PauseDataSyncJobCommand extends Command implements ForwardWithSync {
+    // Identifier of the target job; never reassigned after construction.
+    private final SyncJobName syncJobName;
+
+    /**
+     * @param syncJobName database/job name pair identifying the job to pause
+     */
+    public PauseDataSyncJobCommand(SyncJobName syncJobName) {
+        super(PlanType.PAUSE_DATA_SYNC_JOB_COMMAND);
+        this.syncJobName = syncJobName;
+    }
+
+    /**
+     * Validates the job name against the session, then hands this command to the
+     * sync job manager to perform the pause.
+     */
+    @Override
+    public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
+        validate(ctx);
+        ctx.getEnv().getSyncJobManager().pauseSyncJob(this);
+    }
+
+    /** Returns the job name part of the identifier. */
+    public String getJobName() {
+        return syncJobName.getName();
+    }
+
+    /** Returns the database name part of the identifier. */
+    public String getDbFullName() {
+        return syncJobName.getDbName();
+    }
+
+    /**
+     * Delegates to {@link SyncJobName#analyze(ConnectContext)}.
+     *
+     * @throws AnalysisException if the job name cannot be analyzed for this session
+     */
+    public void validate(ConnectContext ctx) throws AnalysisException {
+        syncJobName.analyze(ctx);
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitPauseDataSyncJobCommand(this, context);
+    }
+
+    @Override
+    public StmtType stmtType() {
+        return StmtType.PAUSE;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/ResumeDataSyncJobCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/ResumeDataSyncJobCommand.java
new file mode 100644
index 00000000000..e5e518beb7b
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/ResumeDataSyncJobCommand.java
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.load;
+
+import org.apache.doris.analysis.StmtType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.commands.Command;
+import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.StmtExecutor;
+
+/**
+ * RESUME SYNC JOB statement used to resume a sync job.
+ * syntax:
+ *      RESUME SYNC JOB [db.]jobName
+ */
+public class ResumeDataSyncJobCommand extends Command implements ForwardWithSync {
+    // Identifier of the target job; never reassigned after construction.
+    private final SyncJobName syncJobName;
+
+    /**
+     * @param syncJobName database/job name pair identifying the job to resume
+     */
+    public ResumeDataSyncJobCommand(SyncJobName syncJobName) {
+        super(PlanType.RESUME_DATA_SYNC_JOB_COMMAND);
+        this.syncJobName = syncJobName;
+    }
+
+    /** Returns the job name part of the identifier. */
+    public String getJobName() {
+        return syncJobName.getName();
+    }
+
+    /** Returns the database name part of the identifier. */
+    public String getDbFullName() {
+        return syncJobName.getDbName();
+    }
+
+    /**
+     * Validates the job name against the session, then hands this command to the
+     * sync job manager to perform the resume.
+     */
+    @Override
+    public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
+        validate(ctx);
+        ctx.getEnv().getSyncJobManager().resumeSyncJob(this);
+    }
+
+    /**
+     * Delegates to {@link SyncJobName#analyze(ConnectContext)}.
+     *
+     * @throws AnalysisException if the job name cannot be analyzed for this session
+     */
+    public void validate(ConnectContext ctx) throws AnalysisException {
+        syncJobName.analyze(ctx);
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitResumeDataSyncJobCommand(this, context);
+    }
+
+    @Override
+    public StmtType stmtType() {
+        return StmtType.RESUME;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/StopDataSyncJobCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/StopDataSyncJobCommand.java
new file mode 100644
index 00000000000..692468a2ef4
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/StopDataSyncJobCommand.java
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.load;
+
+import org.apache.doris.analysis.StmtType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.commands.Command;
+import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.StmtExecutor;
+
+/**
+ * STOP SYNC JOB statement used to stop (cancel) a sync job.
+ * syntax:
+ *      STOP SYNC JOB [db.]jobName
+ */
+public class StopDataSyncJobCommand extends Command implements ForwardWithSync {
+    // Identifier of the target job; never reassigned after construction.
+    private final SyncJobName syncJobName;
+
+    /**
+     * @param syncJobName database/job name pair identifying the job to stop
+     */
+    public StopDataSyncJobCommand(SyncJobName syncJobName) {
+        super(PlanType.STOP_DATA_SYNC_JOB_COMMAND);
+        this.syncJobName = syncJobName;
+    }
+
+    /** Returns the job name part of the identifier. */
+    public String getJobName() {
+        return syncJobName.getName();
+    }
+
+    /** Returns the database name part of the identifier. */
+    public String getDbFullName() {
+        return syncJobName.getDbName();
+    }
+
+    /**
+     * Validates the job name against the session, then hands this command to the
+     * sync job manager to perform the stop.
+     */
+    @Override
+    public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
+        validate(ctx);
+        ctx.getEnv().getSyncJobManager().stopSyncJob(this);
+    }
+
+    /**
+     * Delegates to {@link SyncJobName#analyze(ConnectContext)}.
+     *
+     * @throws AnalysisException if the job name cannot be analyzed for this session
+     */
+    public void validate(ConnectContext ctx) throws AnalysisException {
+        syncJobName.analyze(ctx);
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitStopDataSyncJobCommand(this, context);
+    }
+
+    @Override
+    public StmtType stmtType() {
+        return StmtType.STOP;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/SyncJobName.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/SyncJobName.java
new file mode 100644
index 00000000000..783792d922b
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/SyncJobName.java
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.load;
+
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.base.Strings;
+
+/**
+ * Identifier of a sync job: an optional database name plus the job name.
+ */
+public class SyncJobName {
+    private final String jobName;
+    // Not final: analyze() fills this in from the session when it was omitted.
+    private String dbName;
+
+    /**
+     * @param dbName database name, may be null or empty until {@link #analyze} runs
+     * @param jobName name of the sync job
+     */
+    public SyncJobName(String dbName, String jobName) {
+        this.dbName = dbName;
+        this.jobName = jobName;
+    }
+
+    public String getDbName() {
+        return dbName;
+    }
+
+    public String getName() {
+        return jobName;
+    }
+
+    /**
+     * Resolves a missing database name from the session's current database.
+     *
+     * @param ctx session context consulted for the current database
+     * @throws AnalysisException reported with ERR_NO_DB_ERROR when neither this
+     *         name nor the session provides a database
+     */
+    public void analyze(ConnectContext ctx) throws AnalysisException {
+        if (!Strings.isNullOrEmpty(dbName)) {
+            // An explicit database was supplied; nothing to resolve.
+            return;
+        }
+        String sessionDb = ctx.getDatabase();
+        if (Strings.isNullOrEmpty(sessionDb)) {
+            ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
+        }
+        dbName = sessionDb;
+    }
+
+    /** Renders the name as a backtick-quoted {@code db.job} pair. */
+    public String toSql() {
+        return "`" + dbName + "`.`" + jobName + "`";
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
index 2cdd56989f2..ff96b0ae75e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
@@ -207,6 +207,9 @@ import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableComma
 import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTableCommand;
 import 
org.apache.doris.nereids.trees.plans.commands.load.CreateDataSyncJobCommand;
 import 
org.apache.doris.nereids.trees.plans.commands.load.CreateRoutineLoadCommand;
+import 
org.apache.doris.nereids.trees.plans.commands.load.PauseDataSyncJobCommand;
+import 
org.apache.doris.nereids.trees.plans.commands.load.ResumeDataSyncJobCommand;
+import 
org.apache.doris.nereids.trees.plans.commands.load.StopDataSyncJobCommand;
 import 
org.apache.doris.nereids.trees.plans.commands.refresh.RefreshCatalogCommand;
 import 
org.apache.doris.nereids.trees.plans.commands.refresh.RefreshDatabaseCommand;
 import 
org.apache.doris.nereids.trees.plans.commands.refresh.RefreshTableCommand;
@@ -965,6 +968,18 @@ public interface CommandVisitor<R, C> {
         return visitCommand(alterColumnStatsCommand, context);
     }
 
+    /** Default handling for PauseDataSyncJobCommand: falls back to visitCommand. */
+    default R visitPauseDataSyncJobCommand(PauseDataSyncJobCommand 
pauseDataSyncJobCommand, C context) {
+        return visitCommand(pauseDataSyncJobCommand, context);
+    }
+
+    /** Default handling for ResumeDataSyncJobCommand: falls back to visitCommand. */
+    default R visitResumeDataSyncJobCommand(ResumeDataSyncJobCommand 
resumeDataSyncJobCommand, C context) {
+        return visitCommand(resumeDataSyncJobCommand, context);
+    }
+
+    /** Default handling for StopDataSyncJobCommand: falls back to visitCommand. */
+    default R visitStopDataSyncJobCommand(StopDataSyncJobCommand 
stopDataSyncJobCommand, C context) {
+        return visitCommand(stopDataSyncJobCommand, context);
+    }
+
     default R visitCreateDataSyncJobCommand(CreateDataSyncJobCommand 
createDataSyncJobCommand, C context) {
         return visitCommand(createDataSyncJobCommand, context);
     }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/PauseDataSyncJobCommandTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/PauseDataSyncJobCommandTest.java
new file mode 100644
index 00000000000..a54904f9124
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/PauseDataSyncJobCommandTest.java
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import 
org.apache.doris.nereids.trees.plans.commands.load.PauseDataSyncJobCommand;
+import org.apache.doris.nereids.trees.plans.commands.load.SyncJobName;
+import org.apache.doris.qe.ConnectContext;
+
+import mockit.Expectations;
+import mockit.Mocked;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit test for PauseDataSyncJobCommand#validate using a mocked ConnectContext.
+ */
+public class PauseDataSyncJobCommandTest {
+    @Mocked
+    private ConnectContext connectContext;
+
+    // Records an optional (minTimes = 0) expectation that ConnectContext.get()
+    // returns the mocked context.
+    private void runBefore() {
+        new Expectations() {
+            {
+                ConnectContext.get();
+                minTimes = 0;
+                result = connectContext;
+            }
+        };
+    }
+
+    // A name built with both "testDb" and "job0" must validate without throwing.
+    @Test
+    public void testValidateNormal() throws Exception {
+        runBefore();
+        SyncJobName syncJobName = new SyncJobName("testDb", "job0");
+        PauseDataSyncJobCommand command = new 
PauseDataSyncJobCommand(syncJobName);
+        Assertions.assertDoesNotThrow(() -> command.validate(connectContext));
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/ResumeDataSyncJobCommandTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/ResumeDataSyncJobCommandTest.java
new file mode 100644
index 00000000000..fe89853dc31
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/ResumeDataSyncJobCommandTest.java
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import 
org.apache.doris.nereids.trees.plans.commands.load.ResumeDataSyncJobCommand;
+import org.apache.doris.nereids.trees.plans.commands.load.SyncJobName;
+import org.apache.doris.qe.ConnectContext;
+
+import mockit.Expectations;
+import mockit.Mocked;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit test for ResumeDataSyncJobCommand#validate using a mocked ConnectContext.
+ */
+public class ResumeDataSyncJobCommandTest {
+    @Mocked
+    private ConnectContext connectContext;
+
+    // Records an optional (minTimes = 0) expectation that ConnectContext.get()
+    // returns the mocked context.
+    private void runBefore() {
+        new Expectations() {
+            {
+                ConnectContext.get();
+                minTimes = 0;
+                result = connectContext;
+            }
+        };
+    }
+
+    // A name built with both "testDb" and "job0" must validate without throwing.
+    @Test
+    public void testValidateNormal() throws Exception {
+        runBefore();
+        SyncJobName syncJobName = new SyncJobName("testDb", "job0");
+        ResumeDataSyncJobCommand command = new 
ResumeDataSyncJobCommand(syncJobName);
+        Assertions.assertDoesNotThrow(() -> command.validate(connectContext));
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/StopDataSyncJobCommandTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/StopDataSyncJobCommandTest.java
new file mode 100644
index 00000000000..7b335cec21f
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/StopDataSyncJobCommandTest.java
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import 
org.apache.doris.nereids.trees.plans.commands.load.StopDataSyncJobCommand;
+import org.apache.doris.nereids.trees.plans.commands.load.SyncJobName;
+import org.apache.doris.qe.ConnectContext;
+
+import mockit.Expectations;
+import mockit.Mocked;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit test for StopDataSyncJobCommand#validate using a mocked ConnectContext.
+ */
+public class StopDataSyncJobCommandTest {
+
+    @Mocked
+    private ConnectContext connectContext;
+
+    // Records an optional (minTimes = 0) expectation that ConnectContext.get()
+    // returns the mocked context.
+    private void runBefore() {
+        new Expectations() {
+            {
+                ConnectContext.get();
+                minTimes = 0;
+                result = connectContext;
+            }
+        };
+    }
+
+    // A name built with both "testDb" and "job0" must validate without throwing.
+    @Test
+    public void testValidateNormal() throws Exception {
+        runBefore();
+        SyncJobName syncJobName = new SyncJobName("testDb", "job0");
+        StopDataSyncJobCommand command = new 
StopDataSyncJobCommand(syncJobName);
+        Assertions.assertDoesNotThrow(() -> command.validate(connectContext));
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to