This is an automated email from the ASF dual-hosted git repository.
shenghang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 17448dd989 [Feature][Zeta] Implement force stop functionality for jobs
(#10075)
17448dd989 is described below
commit 17448dd989079ef3cded93ac3ebe67df90dcb2bb
Author: dy102 <[email protected]>
AuthorDate: Wed Mar 18 17:52:40 2026 +0900
[Feature][Zeta] Implement force stop functionality for jobs (#10075)
---
docs/en/engines/zeta/rest-api-v1.md | 24 ++++-
docs/en/engines/zeta/rest-api-v2.md | 26 ++++-
docs/en/engines/zeta/user-command.md | 54 ++++++----
docs/zh/engines/zeta/rest-api-v1.md | 24 ++++-
docs/zh/engines/zeta/rest-api-v2.md | 22 +++-
docs/zh/engines/zeta/user-command.md | 83 +++++++++------
.../core/starter/utils/CommandLineUtils.java | 7 +-
.../starter/seatunnel/args/ClientCommandArgs.java | 16 ++-
.../seatunnel/command/ClientExecuteCommand.java | 5 +
.../e2e/ClusterSeaTunnelEngineContainer.java | 115 +++++++++++++++++++++
.../engine/client/job/ClientJobProxy.java | 2 +-
.../seatunnel/engine/client/job/JobClient.java | 6 +-
.../protocol/codec/SeaTunnelCancelJobCodec.java | 23 ++++-
.../SeaTunnelEngine.yaml | 5 +
.../engine/server/CoordinatorService.java | 22 ++++
.../engine/server/dag/physical/PhysicalPlan.java | 51 ++++++---
.../engine/server/dag/physical/PhysicalVertex.java | 11 ++
.../engine/server/dag/physical/SubPlan.java | 22 ++++
.../seatunnel/engine/server/master/JobMaster.java | 8 +-
.../server/operation/CancelJobOperation.java | 25 ++++-
.../engine/server/protocol/task/CancelJobTask.java | 5 +-
.../seatunnel/engine/server/rest/RestConstant.java | 2 +
.../engine/server/rest/service/BaseService.java | 17 ++-
.../ClientToServerOperationDataSerializerHook.java | 1 +
.../engine/server/CoordinatorServiceTest.java | 78 +++++++++++++-
25 files changed, 554 insertions(+), 100 deletions(-)
diff --git a/docs/en/engines/zeta/rest-api-v1.md
b/docs/en/engines/zeta/rest-api-v1.md
index 91c3773435..51fd831f7f 100644
--- a/docs/en/engines/zeta/rest-api-v1.md
+++ b/docs/en/engines/zeta/rest-api-v1.md
@@ -594,14 +594,24 @@ When we can't get the job info, the response will be:
### Stop A Job
<details>
-<summary><code>POST</code> <code><b>/hazelcast/rest/maps/stop-job</b></code>
<code>(Returns jobId if job stoped successfully.)</code></summary>
+<summary><code>POST</code> <code><b>/hazelcast/rest/maps/stop-job</b></code>
<code>(Returns jobId if job stopped successfully.)</code></summary>
+
+#### Parameters
+
+> | name | required | data type | description
|
+>
|---------------------|----------|-----------|------------------------------------------------------------------|
+> | jobId | yes | long | job id
|
+> | isStopWithSavePoint | no | boolean | Whether to stop the job with a
savepoint. |
+> | force | no | boolean | If true, the job is
force-stopped (ignores isStopWithSavePoint). |
+
#### Body
```json
{
"jobId": 733584788375666689,
- "isStopWithSavePoint": false # if job is stopped with save point
+ "isStopWithSavePoint": false,
+ "force": false
}
```
@@ -613,6 +623,10 @@ When we can't get the job info, the response will be:
}
```
+**Notes:**
+- If the job status is `DOING_SAVEPOINT` and the savepoint does not complete
successfully, a forced stop (when the `force` option is enabled) will set the
job status to `CANCELED`.
+- A forced stop may leave checkpoint data incomplete or in an inconsistent
state. It should be used only for exceptional or abnormal situations.
+
</details>
------------------------------------------------------------------------------------------
@@ -627,11 +641,13 @@ When we can't get the job info, the response will be:
[
{
"jobId": 881432421482889220,
- "isStopWithSavePoint": false
+ "isStopWithSavePoint": false,
+ "force": false
},
{
"jobId": 881432456517910529,
- "isStopWithSavePoint": false
+ "isStopWithSavePoint": false,
+ "force": false
}
]
```
diff --git a/docs/en/engines/zeta/rest-api-v2.md
b/docs/en/engines/zeta/rest-api-v2.md
index e9d49a3077..88c68c00d1 100644
--- a/docs/en/engines/zeta/rest-api-v2.md
+++ b/docs/en/engines/zeta/rest-api-v2.md
@@ -858,14 +858,24 @@ curl --location 'http://127.0.0.1:8080/submit-job/upload'
--form 'config_file=@"
### Stop A Job
<details>
-<summary><code>POST</code> <code><b>/stop-job</b></code> <code>(Returns jobId
if job stoped successfully.)</code></summary>
+<summary><code>POST</code> <code><b>/stop-job</b></code> <code>(Returns jobId
if job stopped successfully.)</code></summary>
+
+#### Parameters
+
+> | name | required | data type | description
|
+>
|---------------------|----------|-----------|------------------------------------------------------------------|
+> | jobId | yes | long | job id
|
+> | isStopWithSavePoint | no | boolean | Whether to stop the job with a
savepoint. |
+> | force | no | boolean | If true, the job is
force-stopped (ignores isStopWithSavePoint). |
+
#### Body
```json
{
- "jobId": 733584788375666689,
- "isStopWithSavePoint": false # if job is stopped with save point
+ "jobId": 733584788375666689,
+ "isStopWithSavePoint": false,
+ "force": false
}
```
@@ -877,6 +887,10 @@ curl --location 'http://127.0.0.1:8080/submit-job/upload'
--form 'config_file=@"
}
```
+**Notes:**
+- If the job status is `DOING_SAVEPOINT` and the savepoint does not complete
successfully, a forced stop (when the `force` option is enabled) will set the
job status to `CANCELED`.
+- A forced stop may leave checkpoint data incomplete or in an inconsistent
state. It should be used only for exceptional or abnormal situations.
+
</details>
------------------------------------------------------------------------------------------
@@ -891,11 +905,13 @@ curl --location 'http://127.0.0.1:8080/submit-job/upload'
--form 'config_file=@"
[
{
"jobId": 881432421482889220,
- "isStopWithSavePoint": false
+ "isStopWithSavePoint": false,
+ "force": false
},
{
"jobId": 881432456517910529,
- "isStopWithSavePoint": false
+ "isStopWithSavePoint": false,
+ "force": false
}
]
```
diff --git a/docs/en/engines/zeta/user-command.md
b/docs/en/engines/zeta/user-command.md
index 6730b793ba..ad7fd3cb75 100644
--- a/docs/en/engines/zeta/user-command.md
+++ b/docs/en/engines/zeta/user-command.md
@@ -18,24 +18,25 @@ The output is as follows:
Usage: seatunnel.sh [options]
Options:
- --async Run the job asynchronously. When the job
is submitted, the client will exit (default: false).
- -can, --cancel-job Cancel the job by JobId.
- --check Whether to check the config (default:
false).
- -cj, --close-job Close the client and the task will also be
closed (default: true).
- -cn, --cluster The name of the cluster.
- -c, --config Config file.
- --decrypt Decrypt the config file. When both
--decrypt and --encrypt are specified, only --encrypt will take effect
(default: false).
- -m, --master, -e, --deploy-mode SeaTunnel job submit master, support
[local, cluster] (default: cluster).
- --encrypt Encrypt the config file. When both
--decrypt and --encrypt are specified, only --encrypt will take effect
(default: false).
- --get_running_job_metrics Get metrics for running jobs (default:
false).
- -h, --help Show the usage message.
- -j, --job-id Get the job status by JobId.
- -l, --list List the job status (default: false).
- --metrics Get the job metrics by JobId.
- -n, --name The SeaTunnel job name (default:
SeaTunnel).
- -r, --restore Restore with savepoint by jobId.
- -s, --savepoint Savepoint the job by jobId.
- -i, --variable Variable substitution, such as -i
city=beijing, or -i date=20190318. We use ',' as a separator. When inside "",
',' are treated as normal characters instead of delimiters. (default: []).
+ --async Run the job asynchronously.
When the job is submitted, the client will exit (default: false).
+ -can, --cancel, --cancel-job Cancel the job(s) by JobId.
+ -f, --force-cancel, --force-cancel-job Force cancel the job(s) by JobId.
+ --check Whether to check the config
(default: false).
+ -cj, --close, --close-job Close the client and the task
will also be closed (default: true).
+ -cn, --cluster The name of the cluster.
+ -c, --config Config file.
+ --decrypt Decrypt the config file. When
both --decrypt and --encrypt are specified, only --encrypt will take effect
(default: false).
+ -m, --master, -e, --deploy-mode SeaTunnel job submit master,
support [local, cluster] (default: cluster).
+ --encrypt Encrypt the config file. When
both --decrypt and --encrypt are specified, only --encrypt will take effect
(default: false).
+ --get_running_job_metrics Get metrics for running jobs
(default: false).
+ -h, --help Show the usage message.
+ -j, --job-id Get the job status by JobId.
+ -l, --list List the job status (default:
false).
+ --metrics Get the job metrics by JobId.
+ -n, --name The SeaTunnel job name
(default: SeaTunnel).
+ -r, --restore, --restore-job Restore with savepoint by
jobId.
+ -s, --savepoint, --savepoint-job Savepoint the job by jobId.
+ -i, --variable Variable substitution, such as
-i city=beijing, or -i date=20190318. We use ',' as a separator. When inside
"", ',' are treated as normal characters instead of delimiters. (default: []).
```
@@ -123,6 +124,23 @@ Supports batch cancellation of jobs, and can cancel
multiple jobs at one time.
All breakpoint information of the canceled job will be deleted and cannot be
resumed by seatunnel.sh -r <jobId>.
+## Force Canceling Jobs
+
+```shell
+sh bin/seatunnel.sh -f <jobId1> [<jobId2> <jobId3> ...]
+```
+
+This command forcefully cancels the specified job(s).
+After cancellation, the job will be stopped and its status will be set to
`CANCELED`.
+
+This command supports batch operations and allows multiple jobs to be
force-canceled at once.
+
+All breakpoint information of the canceled job will be deleted, and the job cannot be
resumed by `seatunnel.sh -r <jobId>`.
+
+**Notes:**
+- If the job status is `DOING_SAVEPOINT` and the savepoint does not complete
successfully, a forced stop (when the `force` option is enabled) will set the
job status to `CANCELED`.
+- A forced stop may leave checkpoint data incomplete or in an inconsistent
state. It should be used only for exceptional or abnormal situations.
+
## Configure The JVM Options
We can configure the JVM options for the SeaTunnel Engine client in the
following ways:
diff --git a/docs/zh/engines/zeta/rest-api-v1.md
b/docs/zh/engines/zeta/rest-api-v1.md
index 83a6693aa0..5f43e7854d 100644
--- a/docs/zh/engines/zeta/rest-api-v1.md
+++ b/docs/zh/engines/zeta/rest-api-v1.md
@@ -597,12 +597,22 @@ network:
<details>
<summary><code>POST</code> <code><b>/hazelcast/rest/maps/stop-job</b></code>
<code>(如果作业成功停止,返回jobId。)</code></summary>
+#### 参数
+
+| 参数名称 | 是否必传 | 参数类型 | 参数描述 |
+|------------------------|----------|----------|----------|
+| jobId | yes | long | 作业 ID |
+| isStopWithSavePoint | no | boolean | 是否通过 savepoint 方式停止作业 |
+| force | no | boolean | 是否强制停止作业(忽略
isStopWithSavePoint 参数) |
+
+
#### 请求体
```json
{
- "jobId": 733584788375666689,
- "isStopWithSavePoint": false # if job is stopped with save point
+ "jobId": 733584788375666689,
+ "isStopWithSavePoint": false,
+ "force": false
}
```
@@ -614,6 +624,10 @@ network:
}
```
+**Notes(注意事项):**
+- 如果作业状态为 DOING_SAVEPOINT 且保存点未成功完成,在启用 force 选项时执行的强制停止操作会将作业状态设置为 CANCELED。
+- 强制停止可能导致检查点数据不完整或处于不一致状态,仅应在异常或非正常情况下使用。
+
</details>
@@ -630,11 +644,13 @@ network:
[
{
"jobId": 881432421482889220,
- "isStopWithSavePoint": false
+ "isStopWithSavePoint": false,
+ "force": false
},
{
"jobId": 881432456517910529,
- "isStopWithSavePoint": false
+ "isStopWithSavePoint": false,
+ "force": false
}
]
```
diff --git a/docs/zh/engines/zeta/rest-api-v2.md
b/docs/zh/engines/zeta/rest-api-v2.md
index 48b58b894a..2db79129c1 100644
--- a/docs/zh/engines/zeta/rest-api-v2.md
+++ b/docs/zh/engines/zeta/rest-api-v2.md
@@ -842,12 +842,22 @@ curl --location 'http://127.0.0.1:8080/submit-job/upload'
--form 'config_file=@"
<details>
<summary><code>POST</code> <code><b>/stop-job</b></code>
<code>(如果作业成功停止,返回jobId。)</code></summary>
+#### 参数
+
+| 参数名称 | 是否必传 | 参数类型 | 参数描述 |
+|------------------------|----------|----------|----------|
+| jobId | yes | long | 作业 ID |
+| isStopWithSavePoint | no | boolean | 是否通过 savepoint 方式停止作业 |
+| force | no | boolean | 是否强制停止作业(忽略
isStopWithSavePoint 参数) |
+
+
#### 请求体
```json
{
"jobId": 733584788375666689,
- "isStopWithSavePoint": false # if job is stopped with save point
+ "isStopWithSavePoint": false,
+ "force": false
}
```
@@ -859,6 +869,10 @@ curl --location 'http://127.0.0.1:8080/submit-job/upload'
--form 'config_file=@"
}
```
+**Notes(注意事项):**
+- 如果作业状态为 DOING_SAVEPOINT 且保存点未成功完成,在启用 force 选项时执行的强制停止操作会将作业状态设置为 CANCELED。
+- 强制停止可能导致检查点数据不完整或处于不一致状态,仅应在异常或非正常情况下使用。
+
</details>
@@ -875,11 +889,13 @@ curl --location 'http://127.0.0.1:8080/submit-job/upload'
--form 'config_file=@"
[
{
"jobId": 881432421482889220,
- "isStopWithSavePoint": false
+ "isStopWithSavePoint": false,
+ "force": false
},
{
"jobId": 881432456517910529,
- "isStopWithSavePoint": false
+ "isStopWithSavePoint": false,
+ "force": false
}
]
```
diff --git a/docs/zh/engines/zeta/user-command.md
b/docs/zh/engines/zeta/user-command.md
index 8891d81361..65accdb7d6 100644
--- a/docs/zh/engines/zeta/user-command.md
+++ b/docs/zh/engines/zeta/user-command.md
@@ -18,39 +18,40 @@ bin/seatunnel.sh -h
Usage: seatunnel.sh [options]
Options:
- --async Run the job asynchronously, when the job
- is submitted, the client will exit
- (default: false)
- -can, --cancel-job Cancel job by JobId
- --check Whether check config (default: false)
- -cj, --close-job Close client the task will also be closed
- (default: true)
- -cn, --cluster The name of cluster
- -c, --config Config file
- --decrypt Decrypt config file, When both --decrypt
- and --encrypt are specified, only
- --encrypt will take effect (default:
- false)
- -m, --master, -e, --deploy-mode SeaTunnel job submit master, support
- [local, cluster] (default: cluster)
- --encrypt Encrypt config file, when both --decrypt
- and --encrypt are specified, only
- --encrypt will take effect (default:
- false)
- --get_running_job_metrics Gets metrics for running jobs (default:
- false)
- -h, --help Show the usage message
- -j, --job-id Get job status by JobId
- -l, --list list job status (default: false)
- --metrics Get job metrics by JobId
- -n, --name SeaTunnel job name (default: SeaTunnel)
- -r, --restore restore with savepoint by jobId
- -s, --savepoint savepoint job by jobId
- -i, --variable Variable substitution, such as -i
- city=beijing, or -i date=20190318.We use
- ',' as separator, when inside "", ',' are
- treated as normal characters instead of
- delimiters. (default: [])
+ --async Run the job asynchronously, when
the job
+ is submitted, the client will
exit
+ (default: false)
+ -can, --cancel, --cancel-job Cancel job(s) by JobId
+ -f, --force-cancel, --force-cancel-job Force Cancel job(s) by jobId
+ --check Whether check config (default:
false)
+ -cj, --close, --close-job Close client the task will also
be closed
+ (default: true)
+ -cn, --cluster The name of cluster
+ -c, --config Config file
+ --decrypt Decrypt config file, When both
--decrypt
+ and --encrypt are specified,
only
+ --encrypt will take effect
(default:
+ false)
+ -m, --master, -e, --deploy-mode SeaTunnel job submit master,
support
+ [local, cluster] (default:
cluster)
+ --encrypt Encrypt config file, when both
--decrypt
+ and --encrypt are specified,
only
+ --encrypt will take effect
(default:
+ false)
+ --get_running_job_metrics Gets metrics for running jobs
(default:
+ false)
+ -h, --help Show the usage message
+ -j, --job-id Get job status by JobId
+ -l, --list list job status (default: false)
+ --metrics Get job metrics by JobId
+ -n, --name SeaTunnel job name (default:
SeaTunnel)
+ -r, --restore, --restore-job restore with savepoint by jobId
+ -s, --savepoint, --savepoint-job savepoint job by jobId
+ -i, --variable Variable substitution, such as
-i
+ city=beijing, or -i
date=20190318.We use
+ ',' as separator, when inside
"", ',' are
+ treated as normal characters
instead of
+ delimiters. (default: [])
```
@@ -138,6 +139,22 @@ bin/seatunnel.sh --config
$SEATUNNEL_HOME/config/v2.batch.config.template
被cancel的作业的所有断点信息都将被删除,无法通过seatunnel.sh -r <jobId>恢复。
+## 强制取消作业
+
+```shell
+./bin/seatunnel.sh -f <jobId1> [<jobId2> <jobId3> ...]
+```
+
+该命令用于强制取消指定的作业。
+作业被取消后,将立即停止执行,其状态将变更为 `CANCELED`。
+
+该命令支持批量操作,可一次性强制取消多个作业。
+
+被cancel的作业的所有断点信息都将被删除,无法通过seatunnel.sh -r <jobId>恢复。
+
+**注意事项**
+- 当作业状态为 `DOING_SAVEPOINT` 且 Savepoint 未能成功完成时,启用强制取消(force 选项生效)将直接把作业状态设置为
CANCELED。
+- 强制取消可能会导致 Checkpoint 或 Savepoint 数据不完整或处于不一致状态,仅建议在异常或紧急情况下使用该操作。
## 配置JVM参数
diff --git
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/CommandLineUtils.java
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/CommandLineUtils.java
index 300555a86f..24734ab6f4 100644
---
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/CommandLineUtils.java
+++
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/CommandLineUtils.java
@@ -42,7 +42,12 @@ public class CommandLineUtils {
public static <T extends CommandArgs> T parse(
String[] args, T obj, String programName, boolean
acceptUnknownOptions) {
List<String> list = Arrays.asList(args);
- if (list.contains("-can") || list.contains("--cancel-job")) {
+ if (list.contains("-can")
+ || list.contains("--cancel")
+ || list.contains("--cancel-job")
+ || list.contains("-f")
+ || list.contains("--force-cancel")
+ || list.contains("--force-cancel-job")) {
// When acceptUnknown Options is true, the List parameter cannot
be parsed.
// For details, please refer to the official code
JCommander.class#DefaultVariableArity
acceptUnknownOptions = false;
diff --git
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
index 7cdc78ff05..deba32b249 100644
---
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
+++
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
@@ -49,12 +49,12 @@ public class ClientCommandArgs extends AbstractCommandArgs {
private MasterType masterType = MasterType.CLUSTER;
@Parameter(
- names = {"-r", "--restore"},
+ names = {"-r", "--restore", "--restore-job"},
description = "restore with savepoint by jobId")
private String restoreJobId;
@Parameter(
- names = {"-s", "--savepoint"},
+ names = {"-s", "--savepoint", "--savepoint-job"},
description = "savepoint job by jobId")
private String savePointJobId;
@@ -69,11 +69,17 @@ public class ClientCommandArgs extends AbstractCommandArgs {
private String jobId;
@Parameter(
- names = {"-can", "--cancel-job"},
+ names = {"-can", "--cancel", "--cancel-job"},
variableArity = true,
- description = "Cancel job by JobId")
+ description = "Cancel job(s) by JobId")
private List<String> cancelJobId;
+ @Parameter(
+ names = {"-f", "--force-cancel", "--force-cancel-job"},
+ variableArity = true,
+ description = "Force Cancel job(s) by JobId")
+ private List<String> forceCancelJobId;
+
@Parameter(
names = {"--metrics"},
description = "Get job metrics by JobId")
@@ -126,7 +132,7 @@ public class ClientCommandArgs extends AbstractCommandArgs {
private boolean async = false;
@Parameter(
- names = {"-cj", "--close-job"},
+ names = {"-cj", "--close", "--close-job"},
description = "Close client the task will also be closed")
private boolean closeJob = true;
diff --git
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
index 169d03403a..b5f5fb4327 100644
---
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
+++
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
@@ -126,6 +126,11 @@ public class ClientExecuteCommand implements
Command<ClientCommandArgs> {
for (String cancelJobId : cancelJobIds) {
engineClient.getJobClient().cancelJob(Long.parseLong(cancelJobId));
}
+ } else if (null != clientCommandArgs.getForceCancelJobId()) {
+ List<String> forceCancelJobIds =
clientCommandArgs.getForceCancelJobId();
+ for (String cancelJobId : forceCancelJobIds) {
+
engineClient.getJobClient().cancelJob(Long.parseLong(cancelJobId), true);
+ }
} else if (null != clientCommandArgs.getMetricsJobId()) {
String jobMetrics =
engineClient
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelEngineContainer.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelEngineContainer.java
index 718d725d91..efbbda4d80 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelEngineContainer.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelEngineContainer.java
@@ -53,6 +53,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import static io.restassured.RestAssured.given;
import static
org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PATH;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.in;
public class ClusterSeaTunnelEngineContainer extends SeaTunnelEngineContainer {
@@ -1163,6 +1164,120 @@ public class ClusterSeaTunnelEngineContainer extends
SeaTunnelEngineContainer {
});
}
+ @Test
+ public void testForceStopJob() {
+ Tuple3<Integer, String, Long> task = tasks.get(0);
+ String jobId =
+ submitJob(server, task._1(), task._2(), "STREAMING", jobName,
paramJobName)
+ .getBody()
+ .jsonPath()
+ .getString("jobId");
+
+ Awaitility.await()
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ given().get(
+ http
+ + server.getHost()
+ + colon
+ + task._1()
+ + task._2()
+ +
RestConstant.REST_URL_RUNNING_JOB
+ + "/"
+ + jobId)
+ .then()
+ .statusCode(200)
+ .body("jobStatus",
equalTo("RUNNING")));
+
+ String parameters = "{" + "\"jobId\":" + jobId + "," +
"\"force\":true}";
+
+ given().body(parameters)
+ .post(
+ http
+ + server.getHost()
+ + colon
+ + task._1()
+ + task._2()
+ + RestConstant.REST_URL_STOP_JOB)
+ .then()
+ .statusCode(200)
+ .body("jobId", equalTo(jobId));
+
+ Awaitility.await()
+ .atMost(6, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ given().get(
+ http
+ + server.getHost()
+ + colon
+ + task._1()
+ + task._2()
+ +
RestConstant.REST_URL_FINISHED_JOBS
+ + "/CANCELED")
+ .then()
+ .statusCode(200)
+ .body("jobId", hasItem(jobId)));
+ }
+
+ @Test
+ public void testForceStopJobV2() {
+ Tuple3<Integer, String, Long> task = tasks.get(1);
+ String jobId =
+ submitJob(server, task._1(), task._2(), "STREAMING", jobName,
paramJobName)
+ .getBody()
+ .jsonPath()
+ .getString("jobId");
+
+ Awaitility.await()
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ given().get(
+ http
+ + server.getHost()
+ + colon
+ + task._1()
+ + task._2()
+ +
RestConstant.REST_URL_RUNNING_JOB
+ + "/"
+ + jobId)
+ .then()
+ .statusCode(200)
+ .body("jobStatus",
equalTo("RUNNING")));
+
+ String parameters = "{" + "\"jobId\":" + jobId + "," +
"\"force\":true}";
+
+ given().body(parameters)
+ .post(
+ http
+ + server.getHost()
+ + colon
+ + task._1()
+ + task._2()
+ + RestConstant.REST_URL_STOP_JOB)
+ .then()
+ .statusCode(200)
+ .body("jobId", equalTo(jobId));
+
+ Awaitility.await()
+ .atMost(6, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ given().get(
+ http
+ + server.getHost()
+ + colon
+ + task._1()
+ + task._2()
+ +
RestConstant.REST_URL_FINISHED_JOBS
+ + "/CANCELED")
+ .then()
+ .statusCode(200)
+ .body("jobId", hasItem(jobId)));
+ }
+
private void submitJobs(
String jobMode,
GenericContainer<?> container,
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
index e5d15d7346..a843de4fd5 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
@@ -140,7 +140,7 @@ public class ClientJobProxy implements Job {
public void cancelJob() {
PassiveCompletableFuture<Void> cancelFuture =
seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(
- SeaTunnelCancelJobCodec.encodeRequest(jobId));
+ SeaTunnelCancelJobCodec.encodeRequest(jobId, false));
cancelFuture.join();
}
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
index c17befdfe0..9a9447a603 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
@@ -141,9 +141,13 @@ public class JobClient {
}
public void cancelJob(Long jobId) {
+ this.cancelJob(jobId, false);
+ }
+
+ public void cancelJob(Long jobId, boolean force) {
PassiveCompletableFuture<Void> cancelFuture =
hazelcastClient.requestOnMasterAndGetCompletableFuture(
- SeaTunnelCancelJobCodec.encodeRequest(jobId));
+ SeaTunnelCancelJobCodec.encodeRequest(jobId, force));
cancelFuture.join();
}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelCancelJobCodec.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelCancelJobCodec.java
index 1563fe8af4..5e970a69b1 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelCancelJobCodec.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelCancelJobCodec.java
@@ -26,10 +26,13 @@ import static
com.hazelcast.client.impl.protocol.ClientMessage.PARTITION_ID_FIEL
import static
com.hazelcast.client.impl.protocol.ClientMessage.RESPONSE_BACKUP_ACKS_FIELD_OFFSET;
import static
com.hazelcast.client.impl.protocol.ClientMessage.TYPE_FIELD_OFFSET;
import static
com.hazelcast.client.impl.protocol.ClientMessage.UNFRAGMENTED_MESSAGE;
+import static
com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.BOOLEAN_SIZE_IN_BYTES;
import static
com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.BYTE_SIZE_IN_BYTES;
import static
com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.INT_SIZE_IN_BYTES;
import static
com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.LONG_SIZE_IN_BYTES;
+import static
com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.decodeBoolean;
import static
com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.decodeLong;
+import static
com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeBoolean;
import static
com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeInt;
import static
com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeLong;
@@ -46,14 +49,21 @@ public final class SeaTunnelCancelJobCodec {
public static final int RESPONSE_MESSAGE_TYPE = 14550017;
private static final int REQUEST_JOB_ID_FIELD_OFFSET =
PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES;
- private static final int REQUEST_INITIAL_FRAME_SIZE =
+ private static final int REQUEST_FORCE_FIELD_OFFSET =
REQUEST_JOB_ID_FIELD_OFFSET + LONG_SIZE_IN_BYTES;
+ private static final int REQUEST_INITIAL_FRAME_SIZE =
+ REQUEST_FORCE_FIELD_OFFSET + BOOLEAN_SIZE_IN_BYTES;
private static final int RESPONSE_INITIAL_FRAME_SIZE =
RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES;
private SeaTunnelCancelJobCodec() {}
- public static ClientMessage encodeRequest(long jobId) {
+ public static class RequestParameters {
+ public long jobId;
+ public boolean force;
+ }
+
+ public static ClientMessage encodeRequest(long jobId, boolean force) {
ClientMessage clientMessage = ClientMessage.createForEncode();
clientMessage.setRetryable(true);
clientMessage.setOperationName("SeaTunnel.CancelJob");
@@ -61,14 +71,19 @@ public final class SeaTunnelCancelJobCodec {
encodeInt(initialFrame.content, TYPE_FIELD_OFFSET,
REQUEST_MESSAGE_TYPE);
encodeInt(initialFrame.content, PARTITION_ID_FIELD_OFFSET, -1);
encodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET, jobId);
+ encodeBoolean(initialFrame.content, REQUEST_FORCE_FIELD_OFFSET, force);
clientMessage.add(initialFrame);
return clientMessage;
}
- public static long decodeRequest(ClientMessage clientMessage) {
+ public static SeaTunnelCancelJobCodec.RequestParameters decodeRequest(
+ ClientMessage clientMessage) {
ForwardFrameIterator iterator = clientMessage.frameIterator();
Frame initialFrame = iterator.next();
- return decodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET);
+ RequestParameters requestParameters = new RequestParameters();
+ requestParameters.jobId = decodeLong(initialFrame.content,
REQUEST_JOB_ID_FIELD_OFFSET);
+ requestParameters.force = decodeBoolean(initialFrame.content,
REQUEST_FORCE_FIELD_OFFSET);
+ return requestParameters;
}
public static ClientMessage encodeResponse() {
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
index da23e66e42..225e543fe8 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
@@ -99,6 +99,11 @@ methods:
nullable: false
since: 2.0
doc: ''
+ - name: force
+ type: boolean
+ nullable: false
+ since: 2.0
+ doc: ''
response: {}
- id: 5
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 1053a7a144..77c281a168 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -809,6 +809,28 @@ public class CoordinatorService {
}
}
+ public PassiveCompletableFuture<Void> stopJob(long jobId) {
+ JobMaster runningJobMaster = getJobMaster(jobId);
+ if (runningJobMaster == null) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ future.complete(null);
+ return new PassiveCompletableFuture<>(future);
+ } else {
+ boolean isPendingJob = pendingJobQueue.contains(jobId);
+ if (isPendingJob) {
+ pendingJobQueue.removeById(jobId);
+ logger.fine(String.format("Stop pending tasks : %s", jobId));
+ }
+ return new PassiveCompletableFuture<>(
+ CompletableFuture.supplyAsync(
+ () -> {
+ runningJobMaster.stopJob();
+ return null;
+ },
+ executorService));
+ }
+ }
+
public JobStatus getJobStatus(long jobId) {
if (pendingJobQueue.contains(jobId)) {
return JobStatus.PENDING;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index 439a1f2048..a7578194bf 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -221,10 +221,46 @@ public class PhysicalPlan {
updateJobState(JobStatus.DOING_SAVEPOINT);
}
+    /**
+     * Force-stops this job according to its current {@link JobStatus}.
+     *
+     * <ul>
+     *   <li>End state: only a warning is logged.
+     *   <li>Status ordered at or before {@code PENDING}: the job is moved straight to {@code
+     *       CANCELED} and the job end future is completed with that result.
+     *   <li>{@code DOING_SAVEPOINT}: every pipeline is stopped through {@link
+     *       SubPlan#stopPipelineWithCheckpointFallback()}.
+     *   <li>Any other status: the job is moved to {@code CANCELING} and every pipeline is
+     *       stopped through {@link SubPlan#forceStopPipeline()}.
+     * </ul>
+     */
+    public void stopJob() {
+        JobStatus currentStatus = getJobStatus();
+        if (currentStatus.isEndState()) {
+            log.warn("{} is in end state {}, can not be stop", jobFullName, currentStatus);
+            return;
+        }
+
+        boolean notStartedYet = currentStatus.ordinal() <= JobStatus.PENDING.ordinal();
+        if (notStartedYet) {
+            // Jobs whose status is 'INITIALIZING', 'CREATED' or 'PENDING' have not started
+            // running yet, so they are set directly to the 'CANCELED' state
+            updateJobState(JobStatus.CANCELED);
+            completeJobEndFuture(new JobResult(JobStatus.CANCELED, null));
+            return;
+        }
+
+        if (currentStatus == JobStatus.DOING_SAVEPOINT) {
+            this.pipelineList.forEach(SubPlan::stopPipelineWithCheckpointFallback);
+            return;
+        }
+
+        updateJobState(JobStatus.CANCELING);
+        this.pipelineList.forEach(SubPlan::forceStopPipeline);
+    }
+
public List<SubPlan> getPipelineList() {
return pipelineList;
}
+    /**
+     * Records a job state transition and logs it.
+     *
+     * <p>The state timestamps and the state map entry are written inside a retried operation;
+     * the log line is emitted only after that operation has returned.
+     *
+     * @param current the state the job is leaving; used only in the log message
+     * @param targetState the state to record for this job
+     * @throws Exception propagated from {@code RetryUtils.retryWithException}
+     */
+    private void updateStateInfo(JobStatus current, JobStatus targetState) throws Exception {
+        RetryUtils.RetryMaterial retryPolicy =
+                new RetryUtils.RetryMaterial(
+                        Constant.OPERATION_RETRY_TIME,
+                        true,
+                        ExceptionUtil::isOperationNeedRetryException,
+                        Constant.OPERATION_RETRY_SLEEP);
+        RetryUtils.retryWithException(
+                () -> {
+                    // The timestamps map has to be updated before the state map
+                    updateStateTimestamps(targetState);
+                    runningJobStateIMap.set(jobId, targetState);
+                    return null;
+                },
+                retryPolicy);
+        String transition =
+                String.format("%s turned from state %s to %s.", jobFullName, current, targetState);
+        log.info(transition);
+    }
+
private void updateStateTimestamps(@NonNull JobStatus targetState) {
// we must update runningJobStateTimestampsIMap first and then can
update
// runningJobStateIMap
@@ -261,20 +297,7 @@ public class PhysicalPlan {
// Now do the actual state transition, we must update
runningJobStateTimestampsIMap
// first and then can update runningJobStateIMap
- RetryUtils.retryWithException(
- () -> {
- updateStateTimestamps(targetState);
- runningJobStateIMap.set(jobId, targetState);
- return null;
- },
- new RetryUtils.RetryMaterial(
- Constant.OPERATION_RETRY_TIME,
- true,
- ExceptionUtil::isOperationNeedRetryException,
- Constant.OPERATION_RETRY_SLEEP));
- log.info(
- String.format(
- "%s turned from state %s to %s.", jobFullName,
current, targetState));
+ updateStateInfo(current, targetState);
stateProcess();
} catch (Exception e) {
log.error(ExceptionUtils.getMessage(e));
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index 368e0ba5ce..ac30be68ea 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -517,6 +517,17 @@ public class PhysicalVertex {
updateTaskState(taskExecutionState.getExecutionState());
}
+    /**
+     * Force-stops this vertex.
+     *
+     * <p>Does nothing when the execution state is unknown ({@code null}) or already an end
+     * state. Otherwise the task execution service is notified of the cancel, and the vertex is
+     * marked {@code CANCELED} unless its task future has already completed.
+     */
+    public synchronized void forceStop() {
+        ExecutionState current = getExecutionState();
+        if (current != null && !current.isEndState()) {
+            noticeTaskExecutionServiceCancel();
+            boolean stillPending = !taskFuture.isDone();
+            if (stillPending) {
+                updateTaskState(ExecutionState.CANCELED);
+            }
+        }
+    }
+
public Address getCurrentExecutionAddress() {
SlotProfile ownedSlotProfiles =
jobMaster.getOwnedSlotProfiles(taskGroupLocation);
if (ownedSlotProfiles == null) {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index 3d683b3265..ee54911142 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -396,6 +396,12 @@ public class SubPlan {
}
}
+ /**
+  * Force-stops every vertex of this pipeline: coordinator vertices first, then the
+  * physical vertices, each through {@link PhysicalVertex#forceStop()}.
+  */
+ public void forceStopPipeline() {
+ // NOTE(review): presumably tells the JobMaster not to restore this job afterwards - confirm
+ jobMaster.neverNeedRestore();
+ coordinatorVertexList.forEach(PhysicalVertex::forceStop);
+ physicalVertexList.forEach(PhysicalVertex::forceStop);
+ }
+
private void cancelCheckpointCoordinator() {
if (jobMaster.getCheckpointManager() != null) {
jobMaster.getCheckpointManager().cancelCheckpoint(pipelineId).join();
@@ -503,6 +509,22 @@ public class SubPlan {
}
}
+    /**
+     * Stops this pipeline, choosing the strategy from the checkpoint manager's view of it.
+     *
+     * <ul>
+     *   <li>No checkpoint manager: the pipeline is force-stopped.
+     *   <li>The checkpoint manager reports the pipeline as completed: the pipeline is forced to
+     *       finish.
+     *   <li>Otherwise: a warning is logged, the checkpoint coordinator is cancelled and the
+     *       pipeline is force-stopped.
+     * </ul>
+     */
+    public void stopPipelineWithCheckpointFallback() {
+        if (jobMaster.getCheckpointManager() == null) {
+            forceStopPipeline();
+            return;
+        }
+
+        boolean pipelineCompleted =
+                jobMaster.getCheckpointManager().isCompletedPipeline(pipelineId);
+        if (pipelineCompleted) {
+            forcePipelineFinish();
+            return;
+        }
+
+        log.warn(
+                "Failed to stop the pipeline gracefully. Falling back to forced stop: {}",
+                pipelineFullName);
+        cancelCheckpointCoordinator();
+        forceStopPipeline();
+    }
+
/** If the job state in CheckpointManager is complete, we need force this
pipeline finish */
private void forcePipelineFinish() {
coordinatorVertexList.forEach(
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index c12464bc01..ee1edc826c 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -114,6 +114,8 @@ import static
org.apache.seatunnel.common.constants.JobMode.BATCH;
public class JobMaster {
private static final ILogger LOGGER = Logger.getLogger(JobMaster.class);
+ private final Object metricsLock = new Object();
+
private PhysicalPlan physicalPlan;
private final Data jobImmutableInformationData;
@@ -773,6 +775,10 @@ public class JobMaster {
physicalPlan.cancelJob();
}
+ /**
+  * Delegates a stop request to the physical plan. The method is synchronized on this
+  * JobMaster instance.
+  */
+ public synchronized void stopJob() {
+ physicalPlan.stopJob();
+ }
+
public ResourceManager getResourceManager() {
return resourceManager;
}
@@ -883,7 +889,7 @@ public class JobMaster {
this.getCurrJobMetrics(Collections.singletonList(pipelineLocation));
JobMetrics jobMetrics = JobMetricsUtil.toJobMetrics(currJobMetrics);
long jobId = this.getJobImmutableInformation().getJobId();
- synchronized (this) {
+ synchronized (metricsLock) {
jobHistoryService.storeFinishedPipelineMetrics(jobId, jobMetrics);
}
// Clean TaskGroupContext for TaskExecutionServer
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CancelJobOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CancelJobOperation.java
index 90bced66c4..9bc383a567 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CancelJobOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CancelJobOperation.java
@@ -21,21 +21,44 @@ import
org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import
org.apache.seatunnel.engine.server.serializable.ClientToServerOperationDataSerializerHook;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+
+import java.io.IOException;
+
public class CancelJobOperation extends AbstractJobAsyncOperation {
+ private boolean force;
+
public CancelJobOperation() {
super();
}
- public CancelJobOperation(long jobId) {
+ public CancelJobOperation(long jobId, boolean force) {
super(jobId);
+ this.force = force;
}
@Override
protected PassiveCompletableFuture<?> doRun() throws Exception {
SeaTunnelServer service = getService();
+ // A forced request goes to stopJob; everything else keeps using the regular cancelJob path
+ if (force) {
+ return service.getCoordinatorService().stopJob(jobId);
+ }
return service.getCoordinatorService().cancelJob(jobId);
}
+ @Override
+ protected void writeInternal(ObjectDataOutput out) throws IOException {
+ super.writeInternal(out);
+ // The force flag is written after the superclass fields; readInternal reads it back in the
+ // same position
+ out.writeBoolean(force);
+ }
+
+ @Override
+ protected void readInternal(ObjectDataInput in) throws IOException {
+ super.readInternal(in);
+ // Read order mirrors writeInternal: superclass fields first, then the force flag
+ force = in.readBoolean();
+ }
+
@Override
public int getClassId() {
return ClientToServerOperationDataSerializerHook.CANCEL_JOB_OPERATOR;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/CancelJobTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/CancelJobTask.java
index 6638b63086..4a6460951c 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/CancelJobTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/CancelJobTask.java
@@ -25,7 +25,8 @@ import com.hazelcast.instance.impl.Node;
import com.hazelcast.internal.nio.Connection;
import com.hazelcast.spi.impl.operationservice.Operation;
-public class CancelJobTask extends AbstractSeaTunnelMessageTask<Long, Void> {
+public class CancelJobTask
+ extends
AbstractSeaTunnelMessageTask<SeaTunnelCancelJobCodec.RequestParameters, Void> {
protected CancelJobTask(ClientMessage clientMessage, Node node, Connection
connection) {
super(
clientMessage,
@@ -37,7 +38,7 @@ public class CancelJobTask extends
AbstractSeaTunnelMessageTask<Long, Void> {
@Override
protected Operation prepareOperation() {
- return new CancelJobOperation(parameters);
+ return new CancelJobOperation(parameters.jobId, parameters.force);
}
@Override
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
index b8f9919665..9cdc746a5a 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
@@ -27,6 +27,8 @@ public class RestConstant {
public static final String IS_STOP_WITH_SAVE_POINT = "isStopWithSavePoint";
+ public static final String FORCE = "force";
+
public static final String CONFIG_FORMAT = "format";
public static final String JOB_STATUS = "jobStatus";
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
index d104cb9103..39901e0c07 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
@@ -808,21 +808,34 @@ public abstract class BaseService {
isStopWithSavePoint =
Boolean.parseBoolean(map.get(RestConstant.IS_STOP_WITH_SAVE_POINT).toString());
}
+ boolean forceStop = false;
+ if (map.get(RestConstant.FORCE) != null) {
+ forceStop =
Boolean.parseBoolean(map.get(RestConstant.FORCE).toString());
+ }
if (!seaTunnelServer.isMasterNode()) {
+ if (forceStop) {
+ NodeEngineUtil.sendOperationToMasterNode(
+ node.nodeEngine, new CancelJobOperation(jobId,
true))
+ .join();
+ return;
+ }
if (isStopWithSavePoint) {
NodeEngineUtil.sendOperationToMasterNode(
node.nodeEngine, new
SavePointJobOperation(jobId))
.join();
} else {
NodeEngineUtil.sendOperationToMasterNode(
- node.nodeEngine, new CancelJobOperation(jobId))
+ node.nodeEngine, new CancelJobOperation(jobId,
false))
.join();
}
} else {
CoordinatorService coordinatorService =
seaTunnelServer.getCoordinatorService();
-
+ if (forceStop) {
+ coordinatorService.stopJob(jobId);
+ return;
+ }
if (isStopWithSavePoint) {
coordinatorService.savePoint(jobId);
} else {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ClientToServerOperationDataSerializerHook.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ClientToServerOperationDataSerializerHook.java
index cd1d278358..98086b50d5 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ClientToServerOperationDataSerializerHook.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ClientToServerOperationDataSerializerHook.java
@@ -68,6 +68,7 @@ public final class ClientToServerOperationDataSerializerHook
implements DataSeri
public static final int GET_RUNNING_JOB_METRICS_OPERATOR = 10;
public static final int UPLOAD_CONNECTOR_JAR_OPERATION = 11;
+
public static final int GET_JOB_CHECKPOINT_OPERATION = 12;
public static final int GET_CHECKPOINT_OVERVIEW_OPERATION = 13;
public static final int GET_CHECKPOINT_HISTORY_OPERATION = 14;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
index ab488b2dd3..966cacde2b 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
@@ -180,6 +180,82 @@ public class CoordinatorServiceTest {
instance.shutdown();
}
+ // Verifies that stopJob moves a RUNNING streaming job to CANCELED.
+ @Test
+ void testForceStopRunningJob() {
+ JobInformation jobInformation =
+ submitJob(
+ "CoordinatorServiceTest_testForceStopRunningJob",
+ "stream_fake_to_console.conf",
+ "test_force_stop_running_job");
+ CoordinatorService coordinatorService =
jobInformation.coordinatorService;
+
+ // Wait (up to 10 s) until the job is RUNNING, has a JobMaster and is present in the
+ // running-job state map
+ await().atMost(10000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Assertions.assertEquals(
+ JobStatus.RUNNING,
+
coordinatorService.getJobStatus(jobInformation.jobId));
+ JobMaster jobMaster =
+
coordinatorService.getJobMaster(jobInformation.jobId);
+ Assertions.assertNotNull(jobMaster);
+ Assertions.assertTrue(
+ jobMaster
+ .getRunningJobStateIMap()
+
.containsKey(jobInformation.jobId));
+ });
+
+ // Issue the stop and block until the asynchronous stop call has returned
+ coordinatorService.stopJob(jobInformation.jobId).join();
+ // The job must reach CANCELED within 120 s
+ await().atMost(120000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Assertions.assertEquals(
+ JobStatus.CANCELED,
+
coordinatorService.getJobStatus(jobInformation.jobId));
+ });
+ jobInformation.coordinatorService.clearCoordinatorService();
+ jobInformation.coordinatorServiceTest.shutdown();
+ }
+
+ // Verifies that stopJob moves a job whose status is DOING_SAVEPOINT to CANCELED.
+ @Test
+ void testForceStopAbnormalSavepointJob() {
+ JobInformation jobInformation =
+ submitJob(
+
"CoordinatorServiceTest_testForceStopAbnormalSavepointJob",
+ "stream_fake_to_console.conf",
+ "test_force_stop_abnormal_savepoint_job");
+ CoordinatorService coordinatorService =
jobInformation.coordinatorService;
+
+ // Wait (up to 10 s) until the job is RUNNING, has a JobMaster and is present in the
+ // running-job state map
+ await().atMost(10000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Assertions.assertEquals(
+ JobStatus.RUNNING,
+
coordinatorService.getJobStatus(jobInformation.jobId));
+ JobMaster jobMaster =
+
coordinatorService.getJobMaster(jobInformation.jobId);
+ Assertions.assertNotNull(jobMaster);
+ Assertions.assertTrue(
+ jobMaster
+ .getRunningJobStateIMap()
+
.containsKey(jobInformation.jobId));
+ });
+
+ // Set the job status to DOING_SAVEPOINT directly on the physical plan; no savepoint is
+ // requested through the coordinator in this test
+ coordinatorService
+ .getJobMaster(jobInformation.jobId)
+ .getPhysicalPlan()
+ .updateJobState(JobStatus.DOING_SAVEPOINT);
+ coordinatorService.stopJob(jobInformation.jobId).join();
+ // The job must reach CANCELED within 120 s
+ await().atMost(120000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Assertions.assertEquals(
+ JobStatus.CANCELED,
+
coordinatorService.getJobStatus(jobInformation.jobId));
+ });
+ jobInformation.coordinatorService.clearCoordinatorService();
+ jobInformation.coordinatorServiceTest.shutdown();
+ }
+
@Test
void testCleanupPendingJobMasterMapAfterJobFailed() {
setConfigFile("seatunnel_fixed_slots.yaml");
@@ -482,7 +558,7 @@ public class CoordinatorServiceTest {
@Test
@Disabled("Disabled because we can't know when the master node switches in
the unit tests")
- public void testJobRestoreWhenMasterNodeSwitch() throws
InterruptedException {
+ void testJobRestoreWhenMasterNodeSwitch() {
HazelcastInstanceImpl instance1 =
SeaTunnelServerStarter.createHazelcastInstance(
TestUtils.getClusterName(