This is an automated email from the ASF dual-hosted git repository.

liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7f0ab4fee1 [Feature][Core] Added rest-api for batch start and stop 
(#7522)
7f0ab4fee1 is described below

commit 7f0ab4fee1f6f3e5d65be3644fedd9db9e782515
Author: Jast <ad...@hadoop.wiki>
AuthorDate: Fri Aug 30 22:20:00 2024 +0800

    [Feature][Core] Added rest-api for batch start and stop (#7522)
    
    * [feature]add rest api batch submit and stop
    
    * Update 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
    
    Co-authored-by: Jia Fan <fanjiaemi...@qq.com>
    
    * [feature]delete disable engine
    
    * Revert "[feature]delete disable engine"
    
    This reverts commit 40df87ebe93fcfa23ff3b9954712db27aef0ef74.
    
    * [feature]fix some problem
    
    ---------
    
    Co-authored-by: Jia Fan <fanjiaemi...@qq.com>
---
 docs/en/seatunnel-engine/rest-api.md               | 135 +++++++++++++++++
 docs/zh/seatunnel-engine/rest-api.md               | 154 ++++++++++++++++++-
 .../engine/e2e/ClusterSeaTunnelContainer.java      | 167 +++++++++++++++++++++
 .../apache/seatunnel/engine/common/Constant.java   |   2 +
 .../seatunnel/engine/server/rest/RestConstant.java |   4 +-
 .../server/rest/RestHttpPostCommandProcessor.java  |  80 ++++++++--
 .../seatunnel/engine/server/utils/RestUtil.java    |  21 +++
 7 files changed, 543 insertions(+), 20 deletions(-)

diff --git a/docs/en/seatunnel-engine/rest-api.md 
b/docs/en/seatunnel-engine/rest-api.md
index 8b6cdb231b..c2c3e06b79 100644
--- a/docs/en/seatunnel-engine/rest-api.md
+++ b/docs/en/seatunnel-engine/rest-api.md
@@ -389,6 +389,106 @@ When we can't get the job info, the response will be:
 
 
------------------------------------------------------------------------------------------
 
+### Batch Submit Jobs
+
+<details>
+<summary><code>POST</code> 
<code><b>/hazelcast/rest/maps/submit-jobs</b></code> <code>(Returns jobId and 
jobName if the job is successfully submitted.)</code></summary>
+
+#### Parameters (add in the `params` field in the request body)
+
+> |    Parameter Name     |   Required   |  Type   |              Description  
            |
+> 
|----------------------|--------------|---------|---------------------------------------|
+> | jobId                | optional     | string  | job id                     
           |
+> | jobName              | optional     | string  | job name                   
           |
+> | isStartWithSavePoint | optional     | string  | if the job is started with 
save point |
+
+#### Request Body
+
+```json
+[
+  {
+    "params":{
+      "jobId":"123456",
+      "jobName":"SeaTunnel-01"
+    },
+    "env": {
+      "job.mode": "batch"
+    },
+    "source": [
+      {
+        "plugin_name": "FakeSource",
+        "result_table_name": "fake",
+        "row.num": 1000,
+        "schema": {
+          "fields": {
+            "name": "string",
+            "age": "int",
+            "card": "int"
+          }
+        }
+      }
+    ],
+    "transform": [
+    ],
+    "sink": [
+      {
+        "plugin_name": "Console",
+        "source_table_name": ["fake"]
+      }
+    ]
+  },
+  {
+    "params":{
+      "jobId":"1234567",
+      "jobName":"SeaTunnel-02"
+    },
+    "env": {
+      "job.mode": "batch"
+    },
+    "source": [
+      {
+        "plugin_name": "FakeSource",
+        "result_table_name": "fake",
+        "row.num": 1000,
+        "schema": {
+          "fields": {
+            "name": "string",
+            "age": "int",
+            "card": "int"
+          }
+        }
+      }
+    ],
+    "transform": [
+    ],
+    "sink": [
+      {
+        "plugin_name": "Console",
+        "source_table_name": ["fake"]
+      }
+    ]
+  }
+]
+```
+
+#### Response
+
+```json
+[
+  {
+    "jobId": "123456",
+    "jobName": "SeaTunnel-01"
+  },{
+    "jobId": "1234567",
+    "jobName": "SeaTunnel-02"
+  }
+]
+```
+
+</details>
+
+------------------------------------------------------------------------------------------
+
 ### Stop A Job
 
 <details>
@@ -414,7 +514,42 @@ When we can't get the job info, the response will be:
 </details>
 
 
------------------------------------------------------------------------------------------
+### Batch Stop Jobs
+
+<details>
+<summary><code>POST</code> <code><b>/hazelcast/rest/maps/stop-jobs</b></code> 
<code>(Returns jobId if the job is successfully stopped.)</code></summary>
+
+#### Request Body
+
+```json
+[
+  {
+    "jobId": 881432421482889220,
+    "isStopWithSavePoint": false
+  },
+  {
+    "jobId": 881432456517910529,
+    "isStopWithSavePoint": false
+  }
+]
+```
+
+#### Response
+
+```json
+[
+  {
+    "jobId": 881432421482889220
+  },
+  {
+    "jobId": 881432456517910529
+  }
+]
+```
+
+</details>
 
+------------------------------------------------------------------------------------------
 ### Encrypt Config
 
 <details>
diff --git a/docs/zh/seatunnel-engine/rest-api.md 
b/docs/zh/seatunnel-engine/rest-api.md
index 69199cccc2..d472885b99 100644
--- a/docs/zh/seatunnel-engine/rest-api.md
+++ b/docs/zh/seatunnel-engine/rest-api.md
@@ -68,7 +68,7 @@ network:
 
 
------------------------------------------------------------------------------------------
 
-### 返回所有作业及其当前状态的概览。
+### 返回所有作业及其当前状态的概览
 
 <details>
  <summary><code>GET</code> 
<code><b>/hazelcast/rest/maps/running-jobs</b></code> 
<code>(返回所有作业及其当前状态的概览。)</code></summary>
@@ -107,7 +107,7 @@ network:
 
 
------------------------------------------------------------------------------------------
 
-### 返回作业的详细信息。
+### 返回作业的详细信息
 
 <details>
  <summary><code>GET</code> 
<code><b>/hazelcast/rest/maps/job-info/:jobId</b></code> 
<code>(返回作业的详细信息。)</code></summary>
@@ -233,7 +233,7 @@ network:
 
 
------------------------------------------------------------------------------------------
 
-### 返回所有已完成的作业信息。
+### 返回所有已完成的作业信息
 
 <details>
  <summary><code>GET</code> 
<code><b>/hazelcast/rest/maps/finished-jobs/:state</b></code> 
<code>(返回所有已完成的作业信息。)</code></summary>
@@ -265,7 +265,7 @@ network:
 
 
------------------------------------------------------------------------------------------
 
-### 返回系统监控信息。
+### 返回系统监控信息
 
 <details>
  <summary><code>GET</code> 
<code><b>/hazelcast/rest/maps/system-monitoring-information</b></code> 
<code>(返回系统监控信息。)</code></summary>
@@ -330,7 +330,7 @@ network:
 
 
------------------------------------------------------------------------------------------
 
-### 提交作业。
+### 提交作业
 
 <details>
 <summary><code>POST</code> <code><b>/hazelcast/rest/maps/submit-job</b></code> 
<code>(如果作业提交成功,返回jobId和jobName。)</code></summary>
@@ -388,7 +388,110 @@ network:
 
 
------------------------------------------------------------------------------------------
 
-### 停止作业。
+
+### 批量提交作业
+
+<details>
+<summary><code>POST</code> 
<code><b>/hazelcast/rest/maps/submit-jobs</b></code> 
<code>(如果作业提交成功,返回jobId和jobName。)</code></summary>
+
+#### 参数(在请求体中params字段中添加)
+
+> |         参数名称         |   是否必传   |  参数类型  |               参数描述              
  |
+> 
|----------------------|----------|--------|-----------------------------------|
+> | jobId                | optional | string | job id                          
  |
+> | jobName              | optional | string | job name                        
  |
+> | isStartWithSavePoint | optional | string | if job is started with save 
point |
+
+
+
+#### 请求体
+
+```json
+[
+  {
+    "params":{
+      "jobId":"123456",
+      "jobName":"SeaTunnel-01"
+    },
+    "env": {
+      "job.mode": "batch"
+    },
+    "source": [
+      {
+        "plugin_name": "FakeSource",
+        "result_table_name": "fake",
+        "row.num": 1000,
+        "schema": {
+          "fields": {
+            "name": "string",
+            "age": "int",
+            "card": "int"
+          }
+        }
+      }
+    ],
+    "transform": [
+    ],
+    "sink": [
+      {
+        "plugin_name": "Console",
+        "source_table_name": ["fake"]
+      }
+    ]
+  },
+  {
+    "params":{
+      "jobId":"1234567",
+      "jobName":"SeaTunnel-02"
+    },
+    "env": {
+      "job.mode": "batch"
+    },
+    "source": [
+      {
+        "plugin_name": "FakeSource",
+        "result_table_name": "fake",
+        "row.num": 1000,
+        "schema": {
+          "fields": {
+            "name": "string",
+            "age": "int",
+            "card": "int"
+          }
+        }
+      }
+    ],
+    "transform": [
+    ],
+    "sink": [
+      {
+        "plugin_name": "Console",
+        "source_table_name": ["fake"]
+      }
+    ]
+  }
+]
+```
+
+#### 响应
+
+```json
+[
+  {
+    "jobId": "123456",
+    "jobName": "SeaTunnel-01"
+  },{
+    "jobId": "1234567",
+    "jobName": "SeaTunnel-02"
+  }
+]
+```
+
+</details>
+
+------------------------------------------------------------------------------------------
+
+### 停止作业
 
 <details>
 <summary><code>POST</code> <code><b>/hazelcast/rest/maps/stop-job</b></code> 
<code>(如果作业成功停止,返回jobId。)</code></summary>
@@ -412,9 +515,46 @@ network:
 
 </details>
 
+
+------------------------------------------------------------------------------------------
+
+### 批量停止作业
+
+<details>
+<summary><code>POST</code> <code><b>/hazelcast/rest/maps/stop-jobs</b></code> 
<code>(如果作业成功停止,返回jobId。)</code></summary>
+
+#### 请求体
+
+```json
+[
+  {
+    "jobId": 881432421482889220,
+    "isStopWithSavePoint": false
+  },
+  {
+    "jobId": 881432456517910529,
+    "isStopWithSavePoint": false
+  }
+]
+```
+
+#### 响应
+
+```json
+[
+  {
+    "jobId": 881432421482889220
+  },
+  {
+    "jobId": 881432456517910529
+  }
+]
+```
+
+</details>
 
------------------------------------------------------------------------------------------
 
-### 加密配置。
+### 加密配置
 
 <details>
 <summary><code>POST</code> 
<code><b>/hazelcast/rest/maps/encrypt-config</b></code> 
<code>(如果配置加密成功,则返回加密后的配置。)</code></summary>
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java
index bf16cab75d..422735a3ec 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.e2e.common.util.ContainerUtil;
 import org.apache.seatunnel.engine.server.rest.RestConstant;
 
 import org.awaitility.Awaitility;
+import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
@@ -32,13 +33,18 @@ import org.testcontainers.containers.wait.strategy.Wait;
 import org.testcontainers.utility.DockerLoggerFactory;
 import org.testcontainers.utility.MountableFile;
 
+import com.hazelcast.jet.json.JsonUtil;
 import io.restassured.response.Response;
 
 import java.io.IOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -269,6 +275,167 @@ public class ClusterSeaTunnelContainer extends 
SeaTunnelContainer {
         return submitJob(jobMode, container, false, jobName, paramJobName);
     }
 
+    @Test
+    public void testStopJobs() {
+        Arrays.asList(server)
+                .forEach(
+                        container -> {
+                            try {
+                                submitJobs("STREAMING", container, false, 
CUSTOM_JOB_ID);
+
+                                String parameters =
+                                        "[{\"jobId\":"
+                                                + CUSTOM_JOB_ID
+                                                + 
",\"isStopWithSavePoint\":false},{\"jobId\":"
+                                                + (CUSTOM_JOB_ID - 1)
+                                                + 
",\"isStopWithSavePoint\":false}]";
+
+                                given().body(parameters)
+                                        .post(
+                                                http
+                                                        + container.getHost()
+                                                        + colon
+                                                        + 
container.getFirstMappedPort()
+                                                        + 
RestConstant.STOP_JOBS_URL)
+                                        .then()
+                                        .statusCode(200)
+                                        .body("[0].jobId", 
equalTo(CUSTOM_JOB_ID))
+                                        .body("[1].jobId", 
equalTo(CUSTOM_JOB_ID - 1));
+
+                                Awaitility.await()
+                                        .atMost(2, TimeUnit.MINUTES)
+                                        .untilAsserted(
+                                                () ->
+                                                        given().get(
+                                                                        http
+                                                                               
 + container
+                                                                               
         .getHost()
+                                                                               
 + colon
+                                                                               
 + container
+                                                                               
         .getFirstMappedPort()
+                                                                               
 + RestConstant
+                                                                               
         .FINISHED_JOBS_INFO
+                                                                               
 + "/CANCELED")
+                                                                .then()
+                                                                
.statusCode(200)
+                                                                .body(
+                                                                        
"[0].jobId",
+                                                                        
equalTo(
+                                                                               
 String.valueOf(
+                                                                               
         CUSTOM_JOB_ID)))
+                                                                .body(
+                                                                        
"[0].jobId",
+                                                                        
equalTo(
+                                                                               
 String.valueOf(
+                                                                               
         CUSTOM_JOB_ID
+                                                                               
                 - 1))));
+
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+    }
+
+    @Test
+    public void testSubmitJobs() {
+        AtomicInteger i = new AtomicInteger();
+        Arrays.asList(server, secondServer)
+                .forEach(
+                        container -> {
+                            try {
+                                submitJobs("BATCH", container, false, 
CUSTOM_JOB_ID);
+                                submitJobs("BATCH", container, true, 
CUSTOM_JOB_ID);
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+    }
+
+    private void submitJobs(
+            String jobMode, GenericContainer<?> container, boolean 
isStartWithSavePoint, Long jobId)
+            throws IOException {
+
+        String requestBody = getJobJson(jobMode, isStartWithSavePoint, jobId);
+
+        Response response =
+                given().body(requestBody)
+                        .header("Content-Type", "application/json; 
charset=utf-8")
+                        .post(
+                                http
+                                        + container.getHost()
+                                        + colon
+                                        + container.getFirstMappedPort()
+                                        + RestConstant.SUBMIT_JOBS_URL);
+
+        response.then()
+                .statusCode(200)
+                .body("[0].jobId", equalTo(String.valueOf(jobId)))
+                .body("[1].jobId", equalTo(String.valueOf(jobId - 1)));
+
+        Response jobInfoResponse =
+                given().header("Content-Type", "application/json; 
charset=utf-8")
+                        .get(
+                                http
+                                        + container.getHost()
+                                        + colon
+                                        + container.getFirstMappedPort()
+                                        + RestConstant.JOB_INFO_URL
+                                        + "/"
+                                        + jobId);
+        jobInfoResponse.then().statusCode(200).body("jobStatus", 
equalTo("RUNNING"));
+    }
+
+    private static @NotNull String getJobJson(
+            String jobMode, boolean isStartWithSavePoint, Long jobId) throws 
IOException {
+        List<Map<String, Object>> jobList = new ArrayList<>();
+        for (int i = 0; i < 2; i++) {
+            Map<String, Object> job = new HashMap<>();
+            Map<String, String> params = new HashMap<>();
+            params.put("jobId", String.valueOf(jobId - i));
+            if (isStartWithSavePoint) {
+                params.put("isStartWithSavePoint", "true");
+            }
+            job.put("params", params);
+
+            Map<String, String> env = new HashMap<>();
+            env.put("job.mode", jobMode);
+            job.put("env", env);
+
+            List<Map<String, Object>> sourceList = new ArrayList<>();
+            Map<String, Object> source = new HashMap<>();
+            source.put("plugin_name", "FakeSource");
+            source.put("result_table_name", "fake");
+            source.put("row.num", 1000);
+
+            Map<String, Object> schema = new HashMap<>();
+            Map<String, String> fields = new HashMap<>();
+            fields.put("name", "string");
+            fields.put("age", "int");
+            fields.put("card", "int");
+            schema.put("fields", fields);
+            source.put("schema", schema);
+
+            sourceList.add(source);
+            job.put("source", sourceList);
+
+            List<Map<String, Object>> transformList = new ArrayList<>();
+            job.put("transform", transformList);
+
+            List<Map<String, Object>> sinkList = new ArrayList<>();
+            Map<String, Object> sink = new HashMap<>();
+            sink.put("plugin_name", "Console");
+            List<String> sourceTableName = new ArrayList<>();
+            sourceTableName.add("fake");
+            sink.put("source_table_name", sourceTableName);
+
+            sinkList.add(sink);
+            job.put("sink", sinkList);
+
+            jobList.add(job);
+        }
+        return JsonUtil.toJson(jobList);
+    }
+
     private Response submitJob(
             String jobMode,
             GenericContainer<?> container,
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
index fdb2102581..60551202e8 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
@@ -24,6 +24,8 @@ public class Constant {
 
     public static final String DEFAULT_SEATUNNEL_CLUSTER_NAME = "seatunnel";
 
+    public static final String REST_SUBMIT_JOBS_PARAMS = "params";
+
     /**
      * The default port number for the cluster auto-discovery mechanism's 
multicast communication.
      */
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
index 7248773703..1d26a0b5fe 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
@@ -50,7 +50,6 @@ public class RestConstant {
     @Deprecated public static final String RUNNING_JOB_URL = 
"/hazelcast/rest/maps/running-job";
     public static final String JOB_INFO_URL = "/hazelcast/rest/maps/job-info";
     public static final String FINISHED_JOBS_INFO = 
"/hazelcast/rest/maps/finished-jobs";
-    public static final String SUBMIT_JOB_URL = 
"/hazelcast/rest/maps/submit-job";
     public static final String ENCRYPT_CONFIG = 
"/hazelcast/rest/maps/encrypt-config";
 
     // only for test use
@@ -59,5 +58,8 @@ public class RestConstant {
     public static final String SYSTEM_MONITORING_INFORMATION =
             "/hazelcast/rest/maps/system-monitoring-information";
 
+    public static final String SUBMIT_JOB_URL = 
"/hazelcast/rest/maps/submit-job";
+    public static final String SUBMIT_JOBS_URL = 
"/hazelcast/rest/maps/submit-jobs";
     public static final String STOP_JOB_URL = "/hazelcast/rest/maps/stop-job";
+    public static final String STOP_JOBS_URL = 
"/hazelcast/rest/maps/stop-jobs";
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
index 150aae54c1..9b8f0f8bca 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
@@ -44,17 +44,23 @@ import com.hazelcast.internal.ascii.TextCommandService;
 import com.hazelcast.internal.ascii.rest.HttpCommandProcessor;
 import com.hazelcast.internal.ascii.rest.HttpPostCommand;
 import com.hazelcast.internal.json.Json;
+import com.hazelcast.internal.json.JsonArray;
 import com.hazelcast.internal.json.JsonObject;
 import com.hazelcast.internal.serialization.Data;
+import scala.Tuple2;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_400;
 import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_500;
 import static 
org.apache.seatunnel.engine.server.rest.RestConstant.ENCRYPT_CONFIG;
+import static 
org.apache.seatunnel.engine.server.rest.RestConstant.STOP_JOBS_URL;
 import static 
org.apache.seatunnel.engine.server.rest.RestConstant.STOP_JOB_URL;
+import static 
org.apache.seatunnel.engine.server.rest.RestConstant.SUBMIT_JOBS_URL;
 import static 
org.apache.seatunnel.engine.server.rest.RestConstant.SUBMIT_JOB_URL;
 
 public class RestHttpPostCommandProcessor extends 
HttpCommandProcessor<HttpPostCommand> {
@@ -78,10 +84,14 @@ public class RestHttpPostCommandProcessor extends 
HttpCommandProcessor<HttpPostC
     public void handle(HttpPostCommand httpPostCommand) {
         String uri = httpPostCommand.getURI();
         try {
-            if (uri.startsWith(SUBMIT_JOB_URL)) {
+            if (uri.startsWith(SUBMIT_JOBS_URL)) {
+                handleSubmitJobs(httpPostCommand);
+            } else if (uri.startsWith(SUBMIT_JOB_URL)) {
                 handleSubmitJob(httpPostCommand, uri);
+            } else if (uri.startsWith(STOP_JOBS_URL)) {
+                handleStopJobs(httpPostCommand);
             } else if (uri.startsWith(STOP_JOB_URL)) {
-                handleStopJob(httpPostCommand, uri);
+                handleStopJob(httpPostCommand);
             } else if (uri.startsWith(ENCRYPT_CONFIG)) {
                 handleEncrypt(httpPostCommand);
             } else {
@@ -93,7 +103,6 @@ public class RestHttpPostCommandProcessor extends 
HttpCommandProcessor<HttpPostC
             logger.warning("An error occurred while handling request " + 
httpPostCommand, e);
             prepareResponse(SC_500, httpPostCommand, exceptionResponse(e));
         }
-
         this.textCommandService.sendResponse(httpPostCommand);
     }
 
@@ -103,11 +112,41 @@ public class RestHttpPostCommandProcessor extends 
HttpCommandProcessor<HttpPostC
         return (SeaTunnelServer) 
extensionServices.get(Constant.SEATUNNEL_SERVICE_NAME);
     }
 
+    private void handleSubmitJobs(HttpPostCommand httpPostCommand) throws 
IllegalArgumentException {
+        List<Tuple2<Map<String, String>, Config>> configTuples =
+                RestUtil.buildConfigList(requestHandle(httpPostCommand), 
false);
+
+        JsonArray jsonArray =
+                configTuples.stream()
+                        .map(
+                                tuple -> {
+                                    String urlParams = 
mapToUrlParams(tuple._1);
+                                    Map<String, String> requestParams = new 
HashMap<>();
+                                    RestUtil.buildRequestParams(requestParams, 
urlParams);
+                                    return submitJobInternal(tuple._2, 
requestParams);
+                                })
+                        .collect(JsonArray::new, JsonArray::add, 
JsonArray::add);
+
+        prepareResponse(httpPostCommand, jsonArray);
+    }
+
+    private String mapToUrlParams(Map<String, String> params) {
+        return params.entrySet().stream()
+                .map(entry -> entry.getKey() + "=" + entry.getValue())
+                .collect(Collectors.joining("&", "?", ""));
+    }
+
     private void handleSubmitJob(HttpPostCommand httpPostCommand, String uri)
             throws IllegalArgumentException {
         Map<String, String> requestParams = new HashMap<>();
         RestUtil.buildRequestParams(requestParams, uri);
         Config config = RestUtil.buildConfig(requestHandle(httpPostCommand), 
false);
+
+        JsonObject jsonObject = submitJobInternal(config, requestParams);
+        this.prepareResponse(httpPostCommand, jsonObject);
+    }
+
+    private JsonObject submitJobInternal(Config config, Map<String, String> 
requestParams) {
         ReadonlyConfig envOptions = 
ReadonlyConfig.fromConfig(config.getConfig("env"));
         String jobName = envOptions.get(EnvCommonOptions.JOB_NAME);
 
@@ -146,15 +185,35 @@ public class RestHttpPostCommandProcessor extends 
HttpCommandProcessor<HttpPostC
             submitJob(seaTunnelServer, jobImmutableInformation, jobConfig);
         }
 
+        return new JsonObject()
+                .add(RestConstant.JOB_ID, String.valueOf(jobId))
+                .add(RestConstant.JOB_NAME, jobConfig.getName());
+    }
+
+    private void handleStopJobs(HttpPostCommand command) {
+        List<Map> jobList = 
JsonUtils.toList(requestHandle(command).toString(), Map.class);
+        JsonArray jsonResponse = new JsonArray();
+
+        jobList.forEach(
+                job -> {
+                    handleStopJob(job);
+                    jsonResponse.add(
+                            new JsonObject()
+                                    .add(RestConstant.JOB_ID, (Long) 
job.get(RestConstant.JOB_ID)));
+                });
+
+        this.prepareResponse(command, jsonResponse);
+    }
+
+    private void handleStopJob(HttpPostCommand httpPostCommand) {
+        Map<String, Object> map = 
JsonUtils.toMap(requestHandle(httpPostCommand));
+        handleStopJob(map);
         this.prepareResponse(
                 httpPostCommand,
-                new JsonObject()
-                        .add(RestConstant.JOB_ID, String.valueOf(jobId))
-                        .add(RestConstant.JOB_NAME, jobConfig.getName()));
+                new JsonObject().add(RestConstant.JOB_ID, 
map.get(RestConstant.JOB_ID).toString()));
     }
 
-    private void handleStopJob(HttpPostCommand httpPostCommand, String uri) {
-        Map<String, Object> map = 
JsonUtils.toMap(requestHandle(httpPostCommand));
+    private void handleStopJob(Map<String, Object> map) {
         boolean isStopWithSavePoint = false;
         if (map.get(RestConstant.JOB_ID) == null) {
             throw new IllegalArgumentException("jobId cannot be empty.");
@@ -186,10 +245,7 @@ public class RestHttpPostCommandProcessor extends 
HttpCommandProcessor<HttpPostC
                 coordinatorService.cancelJob(jobId);
             }
         }
-
-        this.prepareResponse(
-                httpPostCommand,
-                new JsonObject().add(RestConstant.JOB_ID, 
map.get(RestConstant.JOB_ID).toString()));
+        logger.info("Stop job with jobId: " + jobId);
     }
 
     private void handleEncrypt(HttpPostCommand httpPostCommand) {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java
index 9aaa8cd595..b114073aad 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java
@@ -26,11 +26,17 @@ import 
org.apache.seatunnel.core.starter.utils.ConfigBuilder;
 import org.apache.seatunnel.engine.server.rest.RestConstant;
 
 import com.hazelcast.internal.util.StringUtil;
+import scala.Tuple2;
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.URLDecoder;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static 
org.apache.seatunnel.engine.common.Constant.REST_SUBMIT_JOBS_PARAMS;
 
 public class RestUtil {
     private RestUtil() {}
@@ -69,4 +75,19 @@ public class RestUtil {
         Map<String, Object> objectMap = JsonUtils.toMap(jsonNode);
         return ConfigBuilder.of(objectMap, isEncrypt, true);
     }
+
+    public static List<Tuple2<Map<String, String>, Config>> buildConfigList(
+            JsonNode jsonNode, boolean isEncrypt) {
+        return StreamSupport.stream(jsonNode.spliterator(), false)
+                .filter(JsonNode::isObject)
+                .map(
+                        node -> {
+                            Map<String, Object> nodeMap = 
JsonUtils.toMap(node);
+                            Map<String, String> params =
+                                    (Map<String, String>) 
nodeMap.remove(REST_SUBMIT_JOBS_PARAMS);
+                            Config config = ConfigBuilder.of(nodeMap, 
isEncrypt, true);
+                            return new Tuple2<>(params, config);
+                        })
+                .collect(Collectors.toList());
+    }
 }

Reply via email to