This is an automated email from the ASF dual-hosted git repository.

shenghang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 17448dd989 [Feature][Zeta] Implement force stop functionality for jobs 
(#10075)
17448dd989 is described below

commit 17448dd989079ef3cded93ac3ebe67df90dcb2bb
Author: dy102 <[email protected]>
AuthorDate: Wed Mar 18 17:52:40 2026 +0900

    [Feature][Zeta] Implement force stop functionality for jobs (#10075)
---
 docs/en/engines/zeta/rest-api-v1.md                |  24 ++++-
 docs/en/engines/zeta/rest-api-v2.md                |  26 ++++-
 docs/en/engines/zeta/user-command.md               |  54 ++++++----
 docs/zh/engines/zeta/rest-api-v1.md                |  24 ++++-
 docs/zh/engines/zeta/rest-api-v2.md                |  22 +++-
 docs/zh/engines/zeta/user-command.md               |  83 +++++++++------
 .../core/starter/utils/CommandLineUtils.java       |   7 +-
 .../starter/seatunnel/args/ClientCommandArgs.java  |  16 ++-
 .../seatunnel/command/ClientExecuteCommand.java    |   5 +
 .../e2e/ClusterSeaTunnelEngineContainer.java       | 115 +++++++++++++++++++++
 .../engine/client/job/ClientJobProxy.java          |   2 +-
 .../seatunnel/engine/client/job/JobClient.java     |   6 +-
 .../protocol/codec/SeaTunnelCancelJobCodec.java    |  23 ++++-
 .../SeaTunnelEngine.yaml                           |   5 +
 .../engine/server/CoordinatorService.java          |  22 ++++
 .../engine/server/dag/physical/PhysicalPlan.java   |  51 ++++++---
 .../engine/server/dag/physical/PhysicalVertex.java |  11 ++
 .../engine/server/dag/physical/SubPlan.java        |  22 ++++
 .../seatunnel/engine/server/master/JobMaster.java  |   8 +-
 .../server/operation/CancelJobOperation.java       |  25 ++++-
 .../engine/server/protocol/task/CancelJobTask.java |   5 +-
 .../seatunnel/engine/server/rest/RestConstant.java |   2 +
 .../engine/server/rest/service/BaseService.java    |  17 ++-
 .../ClientToServerOperationDataSerializerHook.java |   1 +
 .../engine/server/CoordinatorServiceTest.java      |  78 +++++++++++++-
 25 files changed, 554 insertions(+), 100 deletions(-)

diff --git a/docs/en/engines/zeta/rest-api-v1.md 
b/docs/en/engines/zeta/rest-api-v1.md
index 91c3773435..51fd831f7f 100644
--- a/docs/en/engines/zeta/rest-api-v1.md
+++ b/docs/en/engines/zeta/rest-api-v1.md
@@ -594,14 +594,24 @@ When we can't get the job info, the response will be:
 ### Stop A Job
 
 <details>
-<summary><code>POST</code> <code><b>/hazelcast/rest/maps/stop-job</b></code> 
<code>(Returns jobId if job stoped successfully.)</code></summary>
+<summary><code>POST</code> <code><b>/hazelcast/rest/maps/stop-job</b></code> 
<code>(Returns jobId if job stopped successfully.)</code></summary>
+
+#### Parameters
+
+> | name                | required | data type | description                   
                                   |
+> 
|---------------------|----------|-----------|------------------------------------------------------------------|
+> | jobId               | yes      | long      | job id                        
                                   |
+> | isStopWithSavePoint | no       | boolean   | If the job is stopped with a 
savepoint.                          |
+> | force               | no       | boolean   | If true, the job is 
force-stopped (ignores isStopWithSavePoint). |
+
 
 #### Body
 
 ```json
 {
     "jobId": 733584788375666689,
-    "isStopWithSavePoint": false # if job is stopped with save point
+    "isStopWithSavePoint": false,
+    "force": false
 }
 ```
 
@@ -613,6 +623,10 @@ When we can't get the job info, the response will be:
 }
 ```
 
+**Notes:**
+- If the job status is `DOING_SAVEPOINT` and the savepoint does not complete 
successfully, a forced stop (When the `force` option is enabled) will set the 
job status to `CANCELED`.
+- A forced stop may leave checkpoint data incomplete or in an inconsistent 
state. It should be used only for exceptional or abnormal situations.
+
 </details>
 
 
------------------------------------------------------------------------------------------
@@ -627,11 +641,13 @@ When we can't get the job info, the response will be:
 [
   {
     "jobId": 881432421482889220,
-    "isStopWithSavePoint": false
+    "isStopWithSavePoint": false,
+    "force": false
   },
   {
     "jobId": 881432456517910529,
-    "isStopWithSavePoint": false
+    "isStopWithSavePoint": false,
+    "force": false
   }
 ]
 ```
diff --git a/docs/en/engines/zeta/rest-api-v2.md 
b/docs/en/engines/zeta/rest-api-v2.md
index e9d49a3077..88c68c00d1 100644
--- a/docs/en/engines/zeta/rest-api-v2.md
+++ b/docs/en/engines/zeta/rest-api-v2.md
@@ -858,14 +858,24 @@ curl --location 'http://127.0.0.1:8080/submit-job/upload' 
--form 'config_file=@"
 ### Stop A Job
 
 <details>
-<summary><code>POST</code> <code><b>/stop-job</b></code> <code>(Returns jobId 
if job stoped successfully.)</code></summary>
+<summary><code>POST</code> <code><b>/stop-job</b></code> <code>(Returns jobId 
if job stopped successfully.)</code></summary>
+
+#### Parameters
+
+> | name                | required | data type | description                   
                                   |
+> 
|---------------------|----------|-----------|------------------------------------------------------------------|
+> | jobId               | yes      | long      | job id                        
                                   |
+> | isStopWithSavePoint | no       | boolean   | If the job is stopped with a 
savepoint.                          |
+> | force               | no       | boolean   | If true, the job is 
force-stopped (ignores isStopWithSavePoint). |
+
 
 #### Body
 
 ```json
 {
-    "jobId": 733584788375666689,
-    "isStopWithSavePoint": false # if job is stopped with save point
+  "jobId": 733584788375666689,
+  "isStopWithSavePoint": false,
+  "force": false
 }
 ```
 
@@ -877,6 +887,10 @@ curl --location 'http://127.0.0.1:8080/submit-job/upload' 
--form 'config_file=@"
 }
 ```
 
+**Notes:**
+- If the job status is `DOING_SAVEPOINT` and the savepoint does not complete 
successfully, a forced stop (When the `force` option is enabled) will set the 
job status to `CANCELED`.
+- A forced stop may leave checkpoint data incomplete or in an inconsistent 
state. It should be used only for exceptional or abnormal situations.
+
 </details>
 
 
------------------------------------------------------------------------------------------
@@ -891,11 +905,13 @@ curl --location 'http://127.0.0.1:8080/submit-job/upload' 
--form 'config_file=@"
 [
   {
     "jobId": 881432421482889220,
-    "isStopWithSavePoint": false
+    "isStopWithSavePoint": false,
+    "force": false
   },
   {
     "jobId": 881432456517910529,
-    "isStopWithSavePoint": false
+    "isStopWithSavePoint": false,
+    "force": false
   }
 ]
 ```
diff --git a/docs/en/engines/zeta/user-command.md 
b/docs/en/engines/zeta/user-command.md
index 6730b793ba..ad7fd3cb75 100644
--- a/docs/en/engines/zeta/user-command.md
+++ b/docs/en/engines/zeta/user-command.md
@@ -18,24 +18,25 @@ The output is as follows:
 
 Usage: seatunnel.sh [options]
   Options:
-    --async                         Run the job asynchronously. When the job 
is submitted, the client will exit (default: false).
-    -can, --cancel-job              Cancel the job by JobId.
-    --check                         Whether to check the config (default: 
false).
-    -cj, --close-job                Close the client and the task will also be 
closed (default: true).
-    -cn, --cluster                  The name of the cluster.
-    -c, --config                    Config file.
-    --decrypt                       Decrypt the config file. When both 
--decrypt and --encrypt are specified, only --encrypt will take effect 
(default: false). 
-    -m, --master, -e, --deploy-mode SeaTunnel job submit master, support 
[local, cluster] (default: cluster).
-    --encrypt                       Encrypt the config file. When both 
--decrypt and --encrypt are specified, only --encrypt will take effect 
(default: false). 
-    --get_running_job_metrics       Get metrics for running jobs (default: 
false).
-    -h, --help                      Show the usage message.
-    -j, --job-id                    Get the job status by JobId.
-    -l, --list                      List the job status (default: false).
-    --metrics                       Get the job metrics by JobId.
-    -n, --name                      The SeaTunnel job name (default: 
SeaTunnel).
-    -r, --restore                   Restore with savepoint by jobId.
-    -s, --savepoint                 Savepoint the job by jobId.
-    -i, --variable                  Variable substitution, such as -i 
city=beijing, or -i date=20190318. We use ',' as a separator. When inside "", 
',' are treated as normal characters instead of delimiters. (default: []).
+    --async                                     Run the job asynchronously. 
When the job is submitted, the client will exit (default: false).
+    -can, --cancel, --cancel-job                Cancel the job(s) by JobId.
+    -f, --force-cancel, --force-cancel-job      Force Cancel job(s) by JobId.
+    --check                                     Whether to check the config 
(default: false).
+    -cj, --close, --close-job                   Close the client and the task 
will also be closed (default: true).
+    -cn, --cluster                              The name of the cluster.
+    -c, --config                                Config file.
+    --decrypt                                   Decrypt the config file. When 
both --decrypt and --encrypt are specified, only --encrypt will take effect 
(default: false). 
+    -m, --master, -e, --deploy-mode             SeaTunnel job submit master, 
support [local, cluster] (default: cluster).
+    --encrypt                                   Encrypt the config file. When 
both --decrypt and --encrypt are specified, only --encrypt will take effect 
(default: false). 
+    --get_running_job_metrics                   Get metrics for running jobs 
(default: false).
+    -h, --help                                  Show the usage message.
+    -j, --job-id                                Get the job status by JobId.
+    -l, --list                                  List the job status (default: 
false).
+    --metrics                                   Get the job metrics by JobId.
+    -n, --name                                  The SeaTunnel job name 
(default: SeaTunnel).
+    -r, --restore, --restore-job                Restore with savepoint by 
jobId.
+    -s, --savepoint, --savepoint-job            Savepoint the job by jobId.
+    -i, --variable                              Variable substitution, such as 
-i city=beijing, or -i date=20190318. We use ',' as a separator. When inside 
"", ',' are treated as normal characters instead of delimiters. (default: []).
 
 ```
 
@@ -123,6 +124,23 @@ Supports batch cancellation of jobs, and can cancel 
multiple jobs at one time.
 
 All breakpoint information of the canceled job will be deleted and cannot be 
resumed by seatunnel.sh -r &lt;jobId&gt;.
 
+## Force Canceling Jobs
+
+```shell
+sh bin/seatunnel.sh -f <jobId1> [<jobId2> <jobId3> ...]
+```
+
+This command forcefully cancels the specified job(s).
+After cancellation, the job will be stopped and its status will be set to 
`CANCELED`.
+
+This command supports batch operations and allows multiple jobs to be 
force-canceled at once.
+
+All breakpoint information of the canceled job will be deleted and cannot be 
resumed by seatunnel.sh -r &lt;jobId&gt;.
+
+**Notes:**
+- If the job status is `DOING_SAVEPOINT` and the savepoint does not complete 
successfully, a forced stop (When the `force` option is enabled) will set the 
job status to `CANCELED`.
+- A forced stop may leave checkpoint data incomplete or in an inconsistent 
state. It should be used only for exceptional or abnormal situations.
+
 ## Configure The JVM Options
 
 We can configure the JVM options for the SeaTunnel Engine client in the 
following ways:
diff --git a/docs/zh/engines/zeta/rest-api-v1.md 
b/docs/zh/engines/zeta/rest-api-v1.md
index 83a6693aa0..5f43e7854d 100644
--- a/docs/zh/engines/zeta/rest-api-v1.md
+++ b/docs/zh/engines/zeta/rest-api-v1.md
@@ -597,12 +597,22 @@ network:
 <details>
 <summary><code>POST</code> <code><b>/hazelcast/rest/maps/stop-job</b></code> 
<code>(如果作业成功停止,返回jobId。)</code></summary>
 
+#### 参数
+
+| 参数名称                | 是否必传 | 参数类型 | 参数描述 |
+|------------------------|----------|----------|----------|
+| jobId                  | yes      | long     | 作业 ID |
+| isStopWithSavePoint    | no       | boolean  | 是否通过 savepoint 方式停止作业 |
+| force                  | no       | boolean  | 是否强制停止作业(忽略 
isStopWithSavePoint 参数) |
+
+
 #### 请求体
 
 ```json
 {
-    "jobId": 733584788375666689,
-    "isStopWithSavePoint": false # if job is stopped with save point
+  "jobId": 733584788375666689,
+  "isStopWithSavePoint": false,
+  "force": false
 }
 ```
 
@@ -614,6 +624,10 @@ network:
 }
 ```
 
+**Notes(注意事项):**
+- 如果作业状态为 DOING_SAVEPOINT 且保存点未成功完成,在启用 force 选项时执行的强制停止操作会将作业状态设置为 CANCELED。
+- 强制停止可能导致检查点数据不完整或处于不一致状态,仅应在异常或非正常情况下使用。
+
 </details>
 
 
@@ -630,11 +644,13 @@ network:
 [
   {
     "jobId": 881432421482889220,
-    "isStopWithSavePoint": false
+    "isStopWithSavePoint": false,
+    "force": false
   },
   {
     "jobId": 881432456517910529,
-    "isStopWithSavePoint": false
+    "isStopWithSavePoint": false,
+    "force": false
   }
 ]
 ```
diff --git a/docs/zh/engines/zeta/rest-api-v2.md 
b/docs/zh/engines/zeta/rest-api-v2.md
index 48b58b894a..2db79129c1 100644
--- a/docs/zh/engines/zeta/rest-api-v2.md
+++ b/docs/zh/engines/zeta/rest-api-v2.md
@@ -842,12 +842,22 @@ curl --location 'http://127.0.0.1:8080/submit-job/upload' 
--form 'config_file=@"
 <details>
 <summary><code>POST</code> <code><b>/stop-job</b></code> 
<code>(如果作业成功停止,返回jobId。)</code></summary>
 
+#### 参数
+
+| 参数名称                | 是否必传 | 参数类型 | 参数描述 |
+|------------------------|----------|----------|----------|
+| jobId                  | yes      | long     | 作业 ID |
+| isStopWithSavePoint    | no       | boolean  | 是否通过 savepoint 方式停止作业 |
+| force                  | no       | boolean  | 是否强制停止作业(忽略 
isStopWithSavePoint 参数) |
+
+
 #### 请求体
 
 ```json
 {
     "jobId": 733584788375666689,
-    "isStopWithSavePoint": false # if job is stopped with save point
+    "isStopWithSavePoint": false,
+    "force": false
 }
 ```
 
@@ -859,6 +869,10 @@ curl --location 'http://127.0.0.1:8080/submit-job/upload' 
--form 'config_file=@"
 }
 ```
 
+**Notes(注意事项):**
+- 如果作业状态为 DOING_SAVEPOINT 且保存点未成功完成,在启用 force 选项时执行的强制停止操作会将作业状态设置为 CANCELED。
+- 强制停止可能导致检查点数据不完整或处于不一致状态,仅应在异常或非正常情况下使用。
+
 </details>
 
 
@@ -875,11 +889,13 @@ curl --location 'http://127.0.0.1:8080/submit-job/upload' 
--form 'config_file=@"
 [
   {
     "jobId": 881432421482889220,
-    "isStopWithSavePoint": false
+    "isStopWithSavePoint": false,
+    "force": false
   },
   {
     "jobId": 881432456517910529,
-    "isStopWithSavePoint": false
+    "isStopWithSavePoint": false,
+    "force": false
   }
 ]
 ```
diff --git a/docs/zh/engines/zeta/user-command.md 
b/docs/zh/engines/zeta/user-command.md
index 8891d81361..65accdb7d6 100644
--- a/docs/zh/engines/zeta/user-command.md
+++ b/docs/zh/engines/zeta/user-command.md
@@ -18,39 +18,40 @@ bin/seatunnel.sh -h
 
 Usage: seatunnel.sh [options]
   Options:
-    --async                         Run the job asynchronously, when the job 
-                                    is submitted, the client will exit 
-                                    (default: false)
-    -can, --cancel-job              Cancel job by JobId
-    --check                         Whether check config (default: false)
-    -cj, --close-job                Close client the task will also be closed 
-                                    (default: true)
-    -cn, --cluster                  The name of cluster
-    -c, --config                    Config file
-    --decrypt                       Decrypt config file, When both --decrypt 
-                                    and --encrypt are specified, only 
-                                    --encrypt will take effect (default: 
-                                    false) 
-    -m, --master, -e, --deploy-mode SeaTunnel job submit master, support 
-                                    [local, cluster] (default: cluster)
-    --encrypt                       Encrypt config file, when both --decrypt 
-                                    and --encrypt are specified, only 
-                                    --encrypt will take effect (default: 
-                                    false) 
-    --get_running_job_metrics       Gets metrics for running jobs (default: 
-                                    false) 
-    -h, --help                      Show the usage message
-    -j, --job-id                    Get job status by JobId
-    -l, --list                      list job status (default: false)
-    --metrics                       Get job metrics by JobId
-    -n, --name                      SeaTunnel job name (default: SeaTunnel)
-    -r, --restore                   restore with savepoint by jobId
-    -s, --savepoint                 savepoint job by jobId
-    -i, --variable                  Variable substitution, such as -i 
-                                    city=beijing, or -i date=20190318.We use 
-                                    ',' as separator, when inside "", ',' are 
-                                    treated as normal characters instead of 
-                                    delimiters. (default: [])
+    --async                                   Run the job asynchronously, when 
the job 
+                                              is submitted, the client will 
exit 
+                                              (default: false)
+    -can, --cancel, --cancel-job              Cancel job(s) by JobId
+    -f, --force-cancel, --force-cancel-job    Force Cancel job(s) by jobId
+    --check                                   Whether check config (default: 
false)
+    -cj, --close, --close-job                 Close client the task will also 
be closed 
+                                              (default: true)
+    -cn, --cluster                            The name of cluster
+    -c, --config                              Config file
+    --decrypt                                 Decrypt config file, When both 
--decrypt 
+                                              and --encrypt are specified, 
only 
+                                              --encrypt will take effect 
(default: 
+                                              false) 
+    -m, --master, -e, --deploy-mode           SeaTunnel job submit master, 
support 
+                                              [local, cluster] (default: 
cluster)
+    --encrypt                                 Encrypt config file, when both 
--decrypt 
+                                              and --encrypt are specified, 
only 
+                                              --encrypt will take effect 
(default: 
+                                              false) 
+    --get_running_job_metrics                 Gets metrics for running jobs 
(default: 
+                                              false) 
+    -h, --help                                Show the usage message
+    -j, --job-id                              Get job status by JobId
+    -l, --list                                list job status (default: false)
+    --metrics                                 Get job metrics by JobId
+    -n, --name                                SeaTunnel job name (default: 
SeaTunnel)
+    -r, --restore, --restore-job              restore with savepoint by jobId
+    -s, --savepoint, --savepoint-job          savepoint job by jobId
+    -i, --variable                            Variable substitution, such as 
-i 
+                                              city=beijing, or -i 
date=20190318.We use 
+                                              ',' as separator, when inside 
"", ',' are 
+                                              treated as normal characters 
instead of 
+                                              delimiters. (default: [])
 
 ```
 
@@ -138,6 +139,22 @@ bin/seatunnel.sh --config 
$SEATUNNEL_HOME/config/v2.batch.config.template
 
 被cancel的作业的所有断点信息都将被删除,无法通过seatunnel.sh -r &lt;jobId&gt;恢复。
 
+## 强制取消作业
+
+```shell
+./bin/seatunnel.sh -f <jobId1> [<jobId2> <jobId3> ...]
+```
+
+该命令用于强制取消指定的作业。
+作业被取消后,将立即停止执行,其状态将变更为 `CANCELED`。
+
+该命令支持批量操作,可一次性强制取消多个作业。
+
+被cancel的作业的所有断点信息都将被删除,无法通过seatunnel.sh -r &lt;jobId&gt;恢复。
+
+**注意事项**
+- 当作业状态为 `DOING_SAVEPOINT` 且 Savepoint 未能成功完成时,启用强制取消(force 选项生效)将直接把作业状态设置为 
CANCELED。
+- 强制取消可能会导致 Checkpoint 或 Savepoint 数据不完整或处于不一致状态, 仅建议在异常或紧急情况下使用该操作。
 
 ## 配置JVM参数
 
diff --git 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/CommandLineUtils.java
 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/CommandLineUtils.java
index 300555a86f..24734ab6f4 100644
--- 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/CommandLineUtils.java
+++ 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/CommandLineUtils.java
@@ -42,7 +42,12 @@ public class CommandLineUtils {
     public static <T extends CommandArgs> T parse(
             String[] args, T obj, String programName, boolean 
acceptUnknownOptions) {
         List<String> list = Arrays.asList(args);
-        if (list.contains("-can") || list.contains("--cancel-job")) {
+        if (list.contains("-can")
+                || list.contains("--cancel")
+                || list.contains("--cancel-job")
+                || list.contains("-f")
+                || list.contains("--force-cancel")
+                || list.contains("--force-cancel-job")) {
             // When acceptUnknown Options is true, the List parameter cannot 
be parsed.
             // For details, please refer to the official code 
JCommander.class#DefaultVariableArity
             acceptUnknownOptions = false;
diff --git 
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
 
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
index 7cdc78ff05..deba32b249 100644
--- 
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
+++ 
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
@@ -49,12 +49,12 @@ public class ClientCommandArgs extends AbstractCommandArgs {
     private MasterType masterType = MasterType.CLUSTER;
 
     @Parameter(
-            names = {"-r", "--restore"},
+            names = {"-r", "--restore", "--restore-job"},
             description = "restore with savepoint by jobId")
     private String restoreJobId;
 
     @Parameter(
-            names = {"-s", "--savepoint"},
+            names = {"-s", "--savepoint", "--savepoint-job"},
             description = "savepoint job by jobId")
     private String savePointJobId;
 
@@ -69,11 +69,17 @@ public class ClientCommandArgs extends AbstractCommandArgs {
     private String jobId;
 
     @Parameter(
-            names = {"-can", "--cancel-job"},
+            names = {"-can", "--cancel", "--cancel-job"},
             variableArity = true,
-            description = "Cancel job by JobId")
+            description = "Cancel job(s) by JobId")
     private List<String> cancelJobId;
 
+    @Parameter(
+            names = {"-f", "--force-cancel", "--force-cancel-job"},
+            variableArity = true,
+            description = "Force Cancel job(s) by JobId")
+    private List<String> forceCancelJobId;
+
     @Parameter(
             names = {"--metrics"},
             description = "Get job metrics by JobId")
@@ -126,7 +132,7 @@ public class ClientCommandArgs extends AbstractCommandArgs {
     private boolean async = false;
 
     @Parameter(
-            names = {"-cj", "--close-job"},
+            names = {"-cj", "--close", "--close-job"},
             description = "Close client the task will also be closed")
     private boolean closeJob = true;
 
diff --git 
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
 
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
index 169d03403a..b5f5fb4327 100644
--- 
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
+++ 
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
@@ -126,6 +126,11 @@ public class ClientExecuteCommand implements 
Command<ClientCommandArgs> {
                 for (String cancelJobId : cancelJobIds) {
                     
engineClient.getJobClient().cancelJob(Long.parseLong(cancelJobId));
                 }
+            } else if (null != clientCommandArgs.getForceCancelJobId()) {
+                List<String> forceCancelJobIds = 
clientCommandArgs.getForceCancelJobId();
+                for (String cancelJobId : forceCancelJobIds) {
+                    
engineClient.getJobClient().cancelJob(Long.parseLong(cancelJobId), true);
+                }
             } else if (null != clientCommandArgs.getMetricsJobId()) {
                 String jobMetrics =
                         engineClient
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelEngineContainer.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelEngineContainer.java
index 718d725d91..efbbda4d80 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelEngineContainer.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelEngineContainer.java
@@ -53,6 +53,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import static io.restassured.RestAssured.given;
 import static 
org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PATH;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.in;
 
 public class ClusterSeaTunnelEngineContainer extends SeaTunnelEngineContainer {
@@ -1163,6 +1164,120 @@ public class ClusterSeaTunnelEngineContainer extends 
SeaTunnelEngineContainer {
                         });
     }
 
+    @Test
+    public void testForceStopJob() {
+        Tuple3<Integer, String, Long> task = tasks.get(0);
+        String jobId =
+                submitJob(server, task._1(), task._2(), "STREAMING", jobName, 
paramJobName)
+                        .getBody()
+                        .jsonPath()
+                        .getString("jobId");
+
+        Awaitility.await()
+                .atMost(2, TimeUnit.MINUTES)
+                .untilAsserted(
+                        () ->
+                                given().get(
+                                                http
+                                                        + server.getHost()
+                                                        + colon
+                                                        + task._1()
+                                                        + task._2()
+                                                        + 
RestConstant.REST_URL_RUNNING_JOB
+                                                        + "/"
+                                                        + jobId)
+                                        .then()
+                                        .statusCode(200)
+                                        .body("jobStatus", 
equalTo("RUNNING")));
+
+        String parameters = "{" + "\"jobId\":" + jobId + "," + 
"\"force\":true}";
+
+        given().body(parameters)
+                .post(
+                        http
+                                + server.getHost()
+                                + colon
+                                + task._1()
+                                + task._2()
+                                + RestConstant.REST_URL_STOP_JOB)
+                .then()
+                .statusCode(200)
+                .body("jobId", equalTo(jobId));
+
+        Awaitility.await()
+                .atMost(6, TimeUnit.MINUTES)
+                .untilAsserted(
+                        () ->
+                                given().get(
+                                                http
+                                                        + server.getHost()
+                                                        + colon
+                                                        + task._1()
+                                                        + task._2()
+                                                        + 
RestConstant.REST_URL_FINISHED_JOBS
+                                                        + "/CANCELED")
+                                        .then()
+                                        .statusCode(200)
+                                        .body("jobId", hasItem(jobId)));
+    }
+
+    @Test
+    public void testForceStopJobV2() {
+        Tuple3<Integer, String, Long> task = tasks.get(1);
+        String jobId =
+                submitJob(server, task._1(), task._2(), "STREAMING", jobName, 
paramJobName)
+                        .getBody()
+                        .jsonPath()
+                        .getString("jobId");
+
+        Awaitility.await()
+                .atMost(2, TimeUnit.MINUTES)
+                .untilAsserted(
+                        () ->
+                                given().get(
+                                                http
+                                                        + server.getHost()
+                                                        + colon
+                                                        + task._1()
+                                                        + task._2()
+                                                        + 
RestConstant.REST_URL_RUNNING_JOB
+                                                        + "/"
+                                                        + jobId)
+                                        .then()
+                                        .statusCode(200)
+                                        .body("jobStatus", 
equalTo("RUNNING")));
+
+        String parameters = "{" + "\"jobId\":" + jobId + "," + 
"\"force\":true}";
+
+        given().body(parameters)
+                .post(
+                        http
+                                + server.getHost()
+                                + colon
+                                + task._1()
+                                + task._2()
+                                + RestConstant.REST_URL_STOP_JOB)
+                .then()
+                .statusCode(200)
+                .body("jobId", equalTo(jobId));
+
+        Awaitility.await()
+                .atMost(6, TimeUnit.MINUTES)
+                .untilAsserted(
+                        () ->
+                                given().get(
+                                                http
+                                                        + server.getHost()
+                                                        + colon
+                                                        + task._1()
+                                                        + task._2()
+                                                        + 
RestConstant.REST_URL_FINISHED_JOBS
+                                                        + "/CANCELED")
+                                        .then()
+                                        .statusCode(200)
+                                        .body("jobId", hasItem(jobId)));
+    }
+
     private void submitJobs(
             String jobMode,
             GenericContainer<?> container,
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
index e5d15d7346..a843de4fd5 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
@@ -140,7 +140,7 @@ public class ClientJobProxy implements Job {
     public void cancelJob() {
         PassiveCompletableFuture<Void> cancelFuture =
                 
seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(
-                        SeaTunnelCancelJobCodec.encodeRequest(jobId));
+                        SeaTunnelCancelJobCodec.encodeRequest(jobId, false));
 
         cancelFuture.join();
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
index c17befdfe0..9a9447a603 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
@@ -141,9 +141,13 @@ public class JobClient {
     }
 
     public void cancelJob(Long jobId) {
+        this.cancelJob(jobId, false);
+    }
+
+    public void cancelJob(Long jobId, boolean force) {
         PassiveCompletableFuture<Void> cancelFuture =
                 hazelcastClient.requestOnMasterAndGetCompletableFuture(
-                        SeaTunnelCancelJobCodec.encodeRequest(jobId));
+                        SeaTunnelCancelJobCodec.encodeRequest(jobId, force));
 
         cancelFuture.join();
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelCancelJobCodec.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelCancelJobCodec.java
index 1563fe8af4..5e970a69b1 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelCancelJobCodec.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelCancelJobCodec.java
@@ -26,10 +26,13 @@ import static 
com.hazelcast.client.impl.protocol.ClientMessage.PARTITION_ID_FIEL
 import static 
com.hazelcast.client.impl.protocol.ClientMessage.RESPONSE_BACKUP_ACKS_FIELD_OFFSET;
 import static 
com.hazelcast.client.impl.protocol.ClientMessage.TYPE_FIELD_OFFSET;
 import static 
com.hazelcast.client.impl.protocol.ClientMessage.UNFRAGMENTED_MESSAGE;
+import static 
com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.BOOLEAN_SIZE_IN_BYTES;
 import static 
com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.BYTE_SIZE_IN_BYTES;
 import static 
com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.INT_SIZE_IN_BYTES;
 import static 
com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.LONG_SIZE_IN_BYTES;
+import static 
com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.decodeBoolean;
 import static 
com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.decodeLong;
+import static 
com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeBoolean;
 import static 
com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeInt;
 import static 
com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeLong;
 
@@ -46,14 +49,21 @@ public final class SeaTunnelCancelJobCodec {
     public static final int RESPONSE_MESSAGE_TYPE = 14550017;
     private static final int REQUEST_JOB_ID_FIELD_OFFSET =
             PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES;
-    private static final int REQUEST_INITIAL_FRAME_SIZE =
+    private static final int REQUEST_FORCE_FIELD_OFFSET =
             REQUEST_JOB_ID_FIELD_OFFSET + LONG_SIZE_IN_BYTES;
+    private static final int REQUEST_INITIAL_FRAME_SIZE =
+            REQUEST_FORCE_FIELD_OFFSET + BOOLEAN_SIZE_IN_BYTES;
     private static final int RESPONSE_INITIAL_FRAME_SIZE =
             RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES;
 
     private SeaTunnelCancelJobCodec() {}
 
-    public static ClientMessage encodeRequest(long jobId) {
+    public static class RequestParameters {
+        public long jobId;
+        public boolean force;
+    }
+
+    public static ClientMessage encodeRequest(long jobId, boolean force) {
         ClientMessage clientMessage = ClientMessage.createForEncode();
         clientMessage.setRetryable(true);
         clientMessage.setOperationName("SeaTunnel.CancelJob");
@@ -61,14 +71,19 @@ public final class SeaTunnelCancelJobCodec {
         encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, 
REQUEST_MESSAGE_TYPE);
         encodeInt(initialFrame.content, PARTITION_ID_FIELD_OFFSET, -1);
         encodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET, jobId);
+        encodeBoolean(initialFrame.content, REQUEST_FORCE_FIELD_OFFSET, force);
         clientMessage.add(initialFrame);
         return clientMessage;
     }
 
-    public static long decodeRequest(ClientMessage clientMessage) {
+    public static SeaTunnelCancelJobCodec.RequestParameters decodeRequest(
+            ClientMessage clientMessage) {
         ForwardFrameIterator iterator = clientMessage.frameIterator();
         Frame initialFrame = iterator.next();
-        return decodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET);
+        RequestParameters requestParameters = new RequestParameters();
+        requestParameters.jobId = decodeLong(initialFrame.content, 
REQUEST_JOB_ID_FIELD_OFFSET);
+        requestParameters.force = decodeBoolean(initialFrame.content, 
REQUEST_FORCE_FIELD_OFFSET);
+        return requestParameters;
     }
 
     public static ClientMessage encodeResponse() {
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
 
b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
index da23e66e42..225e543fe8 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
@@ -99,6 +99,11 @@ methods:
           nullable: false
           since: 2.0
           doc: ''
+        - name: force
+          type: boolean
+          nullable: false
+          since: 2.0
+          doc: ''
     response: {}
 
   - id: 5
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 1053a7a144..77c281a168 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -809,6 +809,28 @@ public class CoordinatorService {
         }
     }
 
+    public PassiveCompletableFuture<Void> stopJob(long jobId) {
+        JobMaster runningJobMaster = getJobMaster(jobId);
+        if (runningJobMaster == null) {
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            future.complete(null);
+            return new PassiveCompletableFuture<>(future);
+        } else {
+            boolean isPendingJob = pendingJobQueue.contains(jobId);
+            if (isPendingJob) {
+                pendingJobQueue.removeById(jobId);
+                logger.fine(String.format("Stop pending tasks : %s", jobId));
+            }
+            return new PassiveCompletableFuture<>(
+                    CompletableFuture.supplyAsync(
+                            () -> {
+                                runningJobMaster.stopJob();
+                                return null;
+                            },
+                            executorService));
+        }
+    }
+
     public JobStatus getJobStatus(long jobId) {
         if (pendingJobQueue.contains(jobId)) {
             return JobStatus.PENDING;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index 439a1f2048..a7578194bf 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -221,10 +221,46 @@ public class PhysicalPlan {
         updateJobState(JobStatus.DOING_SAVEPOINT);
     }
 
+    public void stopJob() {
+        JobStatus jobStatus = getJobStatus();
+        if (jobStatus.isEndState()) {
+            log.warn("{} is in end state {}, can not be stop", jobFullName, 
jobStatus);
+            return;
+        }
+
+        if (jobStatus.ordinal() <= JobStatus.PENDING.ordinal()) {
+            // Tasks with the status 'INITIALIZING', 'CREATED', 'PENDING' need 
to be set directly to
+            // the 'CANCELLED' state because it has not yet started running
+            updateJobState(JobStatus.CANCELED);
+            completeJobEndFuture(new JobResult(JobStatus.CANCELED, null));
+        } else if (jobStatus == JobStatus.DOING_SAVEPOINT) {
+            
this.pipelineList.forEach(SubPlan::stopPipelineWithCheckpointFallback);
+        } else {
+            updateJobState(JobStatus.CANCELING);
+            this.pipelineList.forEach(SubPlan::forceStopPipeline);
+        }
+    }
+
     public List<SubPlan> getPipelineList() {
         return pipelineList;
     }
 
+    private void updateStateInfo(JobStatus current, JobStatus targetState) 
throws Exception {
+        RetryUtils.retryWithException(
+                () -> {
+                    updateStateTimestamps(targetState);
+                    runningJobStateIMap.set(jobId, targetState);
+                    return null;
+                },
+                new RetryUtils.RetryMaterial(
+                        Constant.OPERATION_RETRY_TIME,
+                        true,
+                        ExceptionUtil::isOperationNeedRetryException,
+                        Constant.OPERATION_RETRY_SLEEP));
+        log.info(
+                String.format("%s turned from state %s to %s.", jobFullName, 
current, targetState));
+    }
+
     private void updateStateTimestamps(@NonNull JobStatus targetState) {
         // we must update runningJobStateTimestampsIMap first and then can 
update
         // runningJobStateIMap
@@ -261,20 +297,7 @@ public class PhysicalPlan {
 
             // Now do the actual state transition, we must update 
runningJobStateTimestampsIMap
             // first and then can update runningJobStateIMap
-            RetryUtils.retryWithException(
-                    () -> {
-                        updateStateTimestamps(targetState);
-                        runningJobStateIMap.set(jobId, targetState);
-                        return null;
-                    },
-                    new RetryUtils.RetryMaterial(
-                            Constant.OPERATION_RETRY_TIME,
-                            true,
-                            ExceptionUtil::isOperationNeedRetryException,
-                            Constant.OPERATION_RETRY_SLEEP));
-            log.info(
-                    String.format(
-                            "%s turned from state %s to %s.", jobFullName, 
current, targetState));
+            updateStateInfo(current, targetState);
             stateProcess();
         } catch (Exception e) {
             log.error(ExceptionUtils.getMessage(e));
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index 368e0ba5ce..ac30be68ea 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -517,6 +517,17 @@ public class PhysicalVertex {
         updateTaskState(taskExecutionState.getExecutionState());
     }
 
+    public synchronized void forceStop() {
+        ExecutionState executionState = getExecutionState();
+        if (executionState == null || executionState.isEndState()) {
+            return;
+        }
+        noticeTaskExecutionServiceCancel();
+        if (!taskFuture.isDone()) {
+            updateTaskState(ExecutionState.CANCELED);
+        }
+    }
+
     public Address getCurrentExecutionAddress() {
         SlotProfile ownedSlotProfiles = 
jobMaster.getOwnedSlotProfiles(taskGroupLocation);
         if (ownedSlotProfiles == null) {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index 3d683b3265..ee54911142 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -396,6 +396,12 @@ public class SubPlan {
         }
     }
 
+    public void forceStopPipeline() {
+        jobMaster.neverNeedRestore();
+        coordinatorVertexList.forEach(PhysicalVertex::forceStop);
+        physicalVertexList.forEach(PhysicalVertex::forceStop);
+    }
+
     private void cancelCheckpointCoordinator() {
         if (jobMaster.getCheckpointManager() != null) {
             
jobMaster.getCheckpointManager().cancelCheckpoint(pipelineId).join();
@@ -503,6 +509,22 @@ public class SubPlan {
         }
     }
 
+    public void stopPipelineWithCheckpointFallback() {
+        if (jobMaster.getCheckpointManager() == null) {
+            forceStopPipeline();
+            return;
+        }
+        if (jobMaster.getCheckpointManager().isCompletedPipeline(pipelineId)) {
+            forcePipelineFinish();
+        } else {
+            log.warn(
+                    "Failed to stop the pipeline gracefully. Falling back to 
forced stop: {}",
+                    pipelineFullName);
+            cancelCheckpointCoordinator();
+            forceStopPipeline();
+        }
+    }
+
     /** If the job state in CheckpointManager is complete, we need force this 
pipeline finish */
     private void forcePipelineFinish() {
         coordinatorVertexList.forEach(
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index c12464bc01..ee1edc826c 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -114,6 +114,8 @@ import static 
org.apache.seatunnel.common.constants.JobMode.BATCH;
 public class JobMaster {
     private static final ILogger LOGGER = Logger.getLogger(JobMaster.class);
 
+    private final Object metricsLock = new Object();
+
     private PhysicalPlan physicalPlan;
 
     private final Data jobImmutableInformationData;
@@ -773,6 +775,10 @@ public class JobMaster {
         physicalPlan.cancelJob();
     }
 
+    public synchronized void stopJob() {
+        physicalPlan.stopJob();
+    }
+
     public ResourceManager getResourceManager() {
         return resourceManager;
     }
@@ -883,7 +889,7 @@ public class JobMaster {
                 
this.getCurrJobMetrics(Collections.singletonList(pipelineLocation));
         JobMetrics jobMetrics = JobMetricsUtil.toJobMetrics(currJobMetrics);
         long jobId = this.getJobImmutableInformation().getJobId();
-        synchronized (this) {
+        synchronized (metricsLock) {
             jobHistoryService.storeFinishedPipelineMetrics(jobId, jobMetrics);
         }
         // Clean TaskGroupContext for TaskExecutionServer
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CancelJobOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CancelJobOperation.java
index 90bced66c4..9bc383a567 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CancelJobOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CancelJobOperation.java
@@ -21,21 +21,44 @@ import 
org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import 
org.apache.seatunnel.engine.server.serializable.ClientToServerOperationDataSerializerHook;
 
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+
+import java.io.IOException;
+
 public class CancelJobOperation extends AbstractJobAsyncOperation {
+    private boolean force;
+
     public CancelJobOperation() {
         super();
     }
 
-    public CancelJobOperation(long jobId) {
+    public CancelJobOperation(long jobId, boolean force) {
         super(jobId);
+        this.force = force;
     }
 
     @Override
     protected PassiveCompletableFuture<?> doRun() throws Exception {
         SeaTunnelServer service = getService();
+        if (force) {
+            return service.getCoordinatorService().stopJob(jobId);
+        }
         return service.getCoordinatorService().cancelJob(jobId);
     }
 
+    @Override
+    protected void writeInternal(ObjectDataOutput out) throws IOException {
+        super.writeInternal(out);
+        out.writeBoolean(force);
+    }
+
+    @Override
+    protected void readInternal(ObjectDataInput in) throws IOException {
+        super.readInternal(in);
+        force = in.readBoolean();
+    }
+
     @Override
     public int getClassId() {
         return ClientToServerOperationDataSerializerHook.CANCEL_JOB_OPERATOR;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/CancelJobTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/CancelJobTask.java
index 6638b63086..4a6460951c 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/CancelJobTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/CancelJobTask.java
@@ -25,7 +25,8 @@ import com.hazelcast.instance.impl.Node;
 import com.hazelcast.internal.nio.Connection;
 import com.hazelcast.spi.impl.operationservice.Operation;
 
-public class CancelJobTask extends AbstractSeaTunnelMessageTask<Long, Void> {
+public class CancelJobTask
+        extends 
AbstractSeaTunnelMessageTask<SeaTunnelCancelJobCodec.RequestParameters, Void> {
     protected CancelJobTask(ClientMessage clientMessage, Node node, Connection 
connection) {
         super(
                 clientMessage,
@@ -37,7 +38,7 @@ public class CancelJobTask extends 
AbstractSeaTunnelMessageTask<Long, Void> {
 
     @Override
     protected Operation prepareOperation() {
-        return new CancelJobOperation(parameters);
+        return new CancelJobOperation(parameters.jobId, parameters.force);
     }
 
     @Override
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
index b8f9919665..9cdc746a5a 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
@@ -27,6 +27,8 @@ public class RestConstant {
 
     public static final String IS_STOP_WITH_SAVE_POINT = "isStopWithSavePoint";
 
+    public static final String FORCE = "force";
+
     public static final String CONFIG_FORMAT = "format";
 
     public static final String JOB_STATUS = "jobStatus";
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
index d104cb9103..39901e0c07 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
@@ -808,21 +808,34 @@ public abstract class BaseService {
             isStopWithSavePoint =
                     
Boolean.parseBoolean(map.get(RestConstant.IS_STOP_WITH_SAVE_POINT).toString());
         }
+        boolean forceStop = false;
+        if (map.get(RestConstant.FORCE) != null) {
+            forceStop = 
Boolean.parseBoolean(map.get(RestConstant.FORCE).toString());
+        }
 
         if (!seaTunnelServer.isMasterNode()) {
+            if (forceStop) {
+                NodeEngineUtil.sendOperationToMasterNode(
+                                node.nodeEngine, new CancelJobOperation(jobId, 
true))
+                        .join();
+                return;
+            }
             if (isStopWithSavePoint) {
                 NodeEngineUtil.sendOperationToMasterNode(
                                 node.nodeEngine, new 
SavePointJobOperation(jobId))
                         .join();
             } else {
                 NodeEngineUtil.sendOperationToMasterNode(
-                                node.nodeEngine, new CancelJobOperation(jobId))
+                                node.nodeEngine, new CancelJobOperation(jobId, 
false))
                         .join();
             }
 
         } else {
             CoordinatorService coordinatorService = 
seaTunnelServer.getCoordinatorService();
-
+            if (forceStop) {
+                coordinatorService.stopJob(jobId);
+                return;
+            }
             if (isStopWithSavePoint) {
                 coordinatorService.savePoint(jobId);
             } else {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ClientToServerOperationDataSerializerHook.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ClientToServerOperationDataSerializerHook.java
index cd1d278358..98086b50d5 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ClientToServerOperationDataSerializerHook.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ClientToServerOperationDataSerializerHook.java
@@ -68,6 +68,7 @@ public final class ClientToServerOperationDataSerializerHook 
implements DataSeri
     public static final int GET_RUNNING_JOB_METRICS_OPERATOR = 10;
 
     public static final int UPLOAD_CONNECTOR_JAR_OPERATION = 11;
+
     public static final int GET_JOB_CHECKPOINT_OPERATION = 12;
     public static final int GET_CHECKPOINT_OVERVIEW_OPERATION = 13;
     public static final int GET_CHECKPOINT_HISTORY_OPERATION = 14;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
index ab488b2dd3..966cacde2b 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
@@ -180,6 +180,82 @@ public class CoordinatorServiceTest {
         instance.shutdown();
     }
 
+    @Test
+    void testForceStopRunningJob() {
+        JobInformation jobInformation =
+                submitJob(
+                        "CoordinatorServiceTest_testForceStopRunningJob",
+                        "stream_fake_to_console.conf",
+                        "test_force_stop_running_job");
+        CoordinatorService coordinatorService = 
jobInformation.coordinatorService;
+
+        await().atMost(10000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            Assertions.assertEquals(
+                                    JobStatus.RUNNING,
+                                    
coordinatorService.getJobStatus(jobInformation.jobId));
+                            JobMaster jobMaster =
+                                    
coordinatorService.getJobMaster(jobInformation.jobId);
+                            Assertions.assertNotNull(jobMaster);
+                            Assertions.assertTrue(
+                                    jobMaster
+                                            .getRunningJobStateIMap()
+                                            
.containsKey(jobInformation.jobId));
+                        });
+
+        coordinatorService.stopJob(jobInformation.jobId).join();
+        await().atMost(120000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            Assertions.assertEquals(
+                                    JobStatus.CANCELED,
+                                    
coordinatorService.getJobStatus(jobInformation.jobId));
+                        });
+        jobInformation.coordinatorService.clearCoordinatorService();
+        jobInformation.coordinatorServiceTest.shutdown();
+    }
+
+    @Test
+    void testForceStopAbnormalSavepointJob() {
+        JobInformation jobInformation =
+                submitJob(
+                        
"CoordinatorServiceTest_testForceStopAbnormalSavepointJob",
+                        "stream_fake_to_console.conf",
+                        "test_force_stop_abnormal_savepoint_job");
+        CoordinatorService coordinatorService = 
jobInformation.coordinatorService;
+
+        await().atMost(10000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            Assertions.assertEquals(
+                                    JobStatus.RUNNING,
+                                    
coordinatorService.getJobStatus(jobInformation.jobId));
+                            JobMaster jobMaster =
+                                    
coordinatorService.getJobMaster(jobInformation.jobId);
+                            Assertions.assertNotNull(jobMaster);
+                            Assertions.assertTrue(
+                                    jobMaster
+                                            .getRunningJobStateIMap()
+                                            
.containsKey(jobInformation.jobId));
+                        });
+
+        coordinatorService
+                .getJobMaster(jobInformation.jobId)
+                .getPhysicalPlan()
+                .updateJobState(JobStatus.DOING_SAVEPOINT);
+        coordinatorService.stopJob(jobInformation.jobId).join();
+        await().atMost(120000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            Assertions.assertEquals(
+                                    JobStatus.CANCELED,
+                                    
coordinatorService.getJobStatus(jobInformation.jobId));
+                        });
+        jobInformation.coordinatorService.clearCoordinatorService();
+        jobInformation.coordinatorServiceTest.shutdown();
+    }
+
     @Test
     void testCleanupPendingJobMasterMapAfterJobFailed() {
         setConfigFile("seatunnel_fixed_slots.yaml");
@@ -482,7 +558,7 @@ public class CoordinatorServiceTest {
 
     @Test
     @Disabled("Disabled because we can't know when the master node switches in 
the unit tests")
-    public void testJobRestoreWhenMasterNodeSwitch() throws 
InterruptedException {
+    void testJobRestoreWhenMasterNodeSwitch() {
         HazelcastInstanceImpl instance1 =
                 SeaTunnelServerStarter.createHazelcastInstance(
                         TestUtils.getClusterName(

Reply via email to