This is an automated email from the ASF dual-hosted git repository. liugddx pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 7f0ab4fee1 [Feature][Core] Added rest-api for batch start and stop (#7522) 7f0ab4fee1 is described below commit 7f0ab4fee1f6f3e5d65be3644fedd9db9e782515 Author: Jast <ad...@hadoop.wiki> AuthorDate: Fri Aug 30 22:20:00 2024 +0800 [Feature][Core] Added rest-api for batch start and stop (#7522) * [feature]add rest api batch submit and stop * Update seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java Co-authored-by: Jia Fan <fanjiaemi...@qq.com> * [feature]delete disable engine * Revert "[feature]delete disable engine" This reverts commit 40df87ebe93fcfa23ff3b9954712db27aef0ef74. * [feature]fix some problem --------- Co-authored-by: Jia Fan <fanjiaemi...@qq.com> --- docs/en/seatunnel-engine/rest-api.md | 135 +++++++++++++++++ docs/zh/seatunnel-engine/rest-api.md | 154 ++++++++++++++++++- .../engine/e2e/ClusterSeaTunnelContainer.java | 167 +++++++++++++++++++++ .../apache/seatunnel/engine/common/Constant.java | 2 + .../seatunnel/engine/server/rest/RestConstant.java | 4 +- .../server/rest/RestHttpPostCommandProcessor.java | 80 ++++++++-- .../seatunnel/engine/server/utils/RestUtil.java | 21 +++ 7 files changed, 543 insertions(+), 20 deletions(-) diff --git a/docs/en/seatunnel-engine/rest-api.md b/docs/en/seatunnel-engine/rest-api.md index 8b6cdb231b..c2c3e06b79 100644 --- a/docs/en/seatunnel-engine/rest-api.md +++ b/docs/en/seatunnel-engine/rest-api.md @@ -389,6 +389,106 @@ When we can't get the job info, the response will be: ------------------------------------------------------------------------------------------ +### Batch Submit Jobs + +<details> +<summary><code>POST</code> <code><b>/hazelcast/rest/maps/submit-jobs</b></code> <code>(Returns jobId and jobName if the job is successfully submitted.)</code></summary> + +#### Parameters (add in the `params` field in the request body) + +> | Parameter Name | Required | Type | Description | +> |----------------------|--------------|---------|---------------------------------------| +> | jobId | optional | string | job id | +> | jobName | optional | string | job name | +> | isStartWithSavePoint | optional | string | if the job is started with save point | + +#### Request Body + +```json +[ + { + "params":{ + "jobId":"123456", + "jobName":"SeaTunnel-01" + }, + "env": { + "job.mode": "batch" + }, + "source": [ + { + "plugin_name": "FakeSource", + "result_table_name": "fake", + "row.num": 1000, + "schema": { + "fields": { + "name": "string", + "age": "int", + "card": "int" + } + } + } + ], + "transform": [ + ], + "sink": [ + { + "plugin_name": "Console", + "source_table_name": ["fake"] + } + ] + }, + { + "params":{ + "jobId":"1234567", + "jobName":"SeaTunnel-02" + }, + "env": { + "job.mode": "batch" + }, + "source": [ + { + "plugin_name": "FakeSource", + "result_table_name": "fake", + "row.num": 1000, + "schema": { + "fields": { + "name": "string", + "age": "int", + "card": "int" + } + } + } + ], + "transform": [ + ], + "sink": [ + { + "plugin_name": "Console", + "source_table_name": ["fake"] + } + ] + } +] +``` + +#### Response + +```json +[ + { + "jobId": "123456", + "jobName": "SeaTunnel-01" + },{ + "jobId": "1234567", + "jobName": "SeaTunnel-02" + } +] +``` + +</details> + +------------------------------------------------------------------------------------------ + ### Stop A Job <details> @@ -414,7 +514,42 @@ When we can't get the job info, the response will be: </details> ------------------------------------------------------------------------------------------ +### Batch Stop Jobs + +<details> +<summary><code>POST</code> <code><b>/hazelcast/rest/maps/stop-jobs</b></code> <code>(Returns jobId if the job is successfully stopped.)</code></summary> + +#### Request Body + +```json +[ + { + "jobId": 881432421482889220, + "isStopWithSavePoint": false + }, + { + "jobId": 881432456517910529, + "isStopWithSavePoint": false + } +] +``` + +#### Response + +```json +[ + { + "jobId": 881432421482889220 + }, + { + "jobId": 881432456517910529 + } +] +``` + +</details> +------------------------------------------------------------------------------------------ ### Encrypt Config <details> diff --git a/docs/zh/seatunnel-engine/rest-api.md b/docs/zh/seatunnel-engine/rest-api.md index 69199cccc2..d472885b99 100644 --- a/docs/zh/seatunnel-engine/rest-api.md +++ b/docs/zh/seatunnel-engine/rest-api.md @@ -68,7 +68,7 @@ network: ------------------------------------------------------------------------------------------ -### 返回所有作业及其当前状态的概览。 +### 返回所有作业及其当前状态的概览 <details> <summary><code>GET</code> <code><b>/hazelcast/rest/maps/running-jobs</b></code> <code>(返回所有作业及其当前状态的概览。)</code></summary> @@ -107,7 +107,7 @@ network: ------------------------------------------------------------------------------------------ -### 返回作业的详细信息。 +### 返回作业的详细信息 <details> <summary><code>GET</code> <code><b>/hazelcast/rest/maps/job-info/:jobId</b></code> <code>(返回作业的详细信息。)</code></summary> @@ -233,7 +233,7 @@ network: ------------------------------------------------------------------------------------------ -### 返回所有已完成的作业信息。 +### 返回所有已完成的作业信息 <details> <summary><code>GET</code> <code><b>/hazelcast/rest/maps/finished-jobs/:state</b></code> <code>(返回所有已完成的作业信息。)</code></summary> @@ -265,7 +265,7 @@ network: ------------------------------------------------------------------------------------------ -### 返回系统监控信息。 +### 返回系统监控信息 <details> <summary><code>GET</code> <code><b>/hazelcast/rest/maps/system-monitoring-information</b></code> <code>(返回系统监控信息。)</code></summary> @@ -330,7 +330,7 @@ network: ------------------------------------------------------------------------------------------ -### 提交作业。 +### 提交作业 <details> <summary><code>POST</code> <code><b>/hazelcast/rest/maps/submit-job</b></code> <code>(如果作业提交成功,返回jobId和jobName。)</code></summary> @@ -388,7 +388,110 @@ network: ------------------------------------------------------------------------------------------ -### 停止作业。 + +### 批量提交作业 + +<details> +<summary><code>POST</code> <code><b>/hazelcast/rest/maps/submit-jobs</b></code> <code>(如果作业提交成功,返回jobId和jobName。)</code></summary> + +#### 参数(在请求体中params字段中添加) + +> | 参数名称 | 是否必传 | 参数类型 | 参数描述 | +> |----------------------|----------|--------|-----------------------------------| +> | jobId | optional | string | job id | +> | jobName | optional | string | job name | +> | isStartWithSavePoint | optional | string | if job is started with save point | + + + +#### 请求体 + +```json +[ + { + "params":{ + "jobId":"123456", + "jobName":"SeaTunnel-01" + }, + "env": { + "job.mode": "batch" + }, + "source": [ + { + "plugin_name": "FakeSource", + "result_table_name": "fake", + "row.num": 1000, + "schema": { + "fields": { + "name": "string", + "age": "int", + "card": "int" + } + } + } + ], + "transform": [ + ], + "sink": [ + { + "plugin_name": "Console", + "source_table_name": ["fake"] + } + ] + }, + { + "params":{ + "jobId":"1234567", + "jobName":"SeaTunnel-02" + }, + "env": { + "job.mode": "batch" + }, + "source": [ + { + "plugin_name": "FakeSource", + "result_table_name": "fake", + "row.num": 1000, + "schema": { + "fields": { + "name": "string", + "age": "int", + "card": "int" + } + } + } + ], + "transform": [ + ], + "sink": [ + { + "plugin_name": "Console", + "source_table_name": ["fake"] + } + ] + } +] +``` + +#### 响应 + +```json +[ + { + "jobId": "123456", + "jobName": "SeaTunnel-01" + },{ + "jobId": "1234567", + "jobName": "SeaTunnel-02" + } +] +``` + +</details> + +------------------------------------------------------------------------------------------ + +### 停止作业 <details> <summary><code>POST</code> <code><b>/hazelcast/rest/maps/stop-job</b></code> <code>(如果作业成功停止,返回jobId。)</code></summary> @@ -412,9 +515,46 @@ network: </details> + +------------------------------------------------------------------------------------------ + +### 批量停止作业 + +<details> +<summary><code>POST</code> <code><b>/hazelcast/rest/maps/stop-jobs</b></code> <code>(如果作业成功停止,返回jobId。)</code></summary> + +#### 请求体 + +```json +[ + { + "jobId": 881432421482889220, + "isStopWithSavePoint": false + }, + { + "jobId": 881432456517910529, + "isStopWithSavePoint": false + } +] +``` + +#### 响应 + +```json +[ + { + "jobId": 881432421482889220 + }, + { + "jobId": 881432456517910529 + } +] +``` + +</details> ------------------------------------------------------------------------------------------ -### 加密配置。 +### 加密配置 <details> <summary><code>POST</code> <code><b>/hazelcast/rest/maps/encrypt-config</b></code> <code>(如果配置加密成功,则返回加密后的配置。)</code></summary> diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java index bf16cab75d..422735a3ec 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java @@ -21,6 +21,7 @@ import org.apache.seatunnel.e2e.common.util.ContainerUtil; import org.apache.seatunnel.engine.server.rest.RestConstant; import org.awaitility.Awaitility; +import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -32,13 +33,18 @@ import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.utility.DockerLoggerFactory; import org.testcontainers.utility.MountableFile; +import com.hazelcast.jet.json.JsonUtil; import io.restassured.response.Response; import java.io.IOException; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -269,6 +275,167 @@ public class ClusterSeaTunnelContainer extends SeaTunnelContainer { return submitJob(jobMode, container, false, jobName, paramJobName); } + @Test + public void testStopJobs() { + Arrays.asList(server) + .forEach( + container -> { + try { + submitJobs("STREAMING", container, false, CUSTOM_JOB_ID); + + String parameters = + "[{\"jobId\":" + + CUSTOM_JOB_ID + + ",\"isStopWithSavePoint\":false},{\"jobId\":" + + (CUSTOM_JOB_ID - 1) + + ",\"isStopWithSavePoint\":false}]"; + + given().body(parameters) + .post( + http + + container.getHost() + + colon + + container.getFirstMappedPort() + + RestConstant.STOP_JOBS_URL) + .then() + .statusCode(200) + .body("[0].jobId", equalTo(CUSTOM_JOB_ID)) + .body("[1].jobId", equalTo(CUSTOM_JOB_ID - 1)); + + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> + given().get( + http + + container + .getHost() + + colon + + container + .getFirstMappedPort() + + RestConstant + .FINISHED_JOBS_INFO + + "/CANCELED") + .then() + .statusCode(200) + .body( + "[0].jobId", + equalTo( + String.valueOf( + CUSTOM_JOB_ID))) + .body( + "[0].jobId", + equalTo( + String.valueOf( + CUSTOM_JOB_ID + - 1)))); + + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + + @Test + public void testSubmitJobs() { + AtomicInteger i = new AtomicInteger(); + Arrays.asList(server, secondServer) + .forEach( + container -> { + try { + submitJobs("BATCH", container, false, CUSTOM_JOB_ID); + submitJobs("BATCH", container, true, CUSTOM_JOB_ID); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + + private void submitJobs( + String jobMode, GenericContainer<?> container, boolean isStartWithSavePoint, Long jobId) + throws IOException { + + String requestBody = getJobJson(jobMode, isStartWithSavePoint, jobId); + + Response response = + given().body(requestBody) + .header("Content-Type", "application/json; charset=utf-8") + .post( + http + + container.getHost() + + colon + + container.getFirstMappedPort() + + RestConstant.SUBMIT_JOBS_URL); + + response.then() + .statusCode(200) + .body("[0].jobId", equalTo(String.valueOf(jobId))) + .body("[1].jobId", equalTo(String.valueOf(jobId - 1))); + + Response jobInfoResponse = + given().header("Content-Type", "application/json; charset=utf-8") + .get( + http + + container.getHost() + + colon + + container.getFirstMappedPort() + + RestConstant.JOB_INFO_URL + + "/" + + jobId); + jobInfoResponse.then().statusCode(200).body("jobStatus", equalTo("RUNNING")); + } + + private static @NotNull String getJobJson( + String jobMode, boolean isStartWithSavePoint, Long jobId) throws IOException { + List<Map<String, Object>> jobList = new ArrayList<>(); + for (int i = 0; i < 2; i++) { + Map<String, Object> job = new HashMap<>(); + Map<String, String> params = new HashMap<>(); + params.put("jobId", String.valueOf(jobId - i)); + if (isStartWithSavePoint) { + params.put("isStartWithSavePoint", "true"); + } + job.put("params", params); + + Map<String, String> env = new HashMap<>(); + env.put("job.mode", jobMode); + job.put("env", env); + + List<Map<String, Object>> sourceList = new ArrayList<>(); + Map<String, Object> source = new HashMap<>(); + source.put("plugin_name", "FakeSource"); + source.put("result_table_name", "fake"); + source.put("row.num", 1000); + + Map<String, Object> schema = new HashMap<>(); + Map<String, String> fields = new HashMap<>(); + fields.put("name", "string"); + fields.put("age", "int"); + fields.put("card", "int"); + schema.put("fields", fields); + source.put("schema", schema); + + sourceList.add(source); + job.put("source", sourceList); + + List<Map<String, Object>> transformList = new ArrayList<>(); + job.put("transform", transformList); + + List<Map<String, Object>> sinkList = new ArrayList<>(); + Map<String, Object> sink = new HashMap<>(); + sink.put("plugin_name", "Console"); + List<String> sourceTableName = new ArrayList<>(); + sourceTableName.add("fake"); + sink.put("source_table_name", sourceTableName); + + sinkList.add(sink); + job.put("sink", sinkList); + + jobList.add(job); + } + return JsonUtil.toJson(jobList); + } + private Response submitJob( String jobMode, GenericContainer<?> container, diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java index fdb2102581..60551202e8 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java @@ -24,6 +24,8 @@ public class Constant { public static final String DEFAULT_SEATUNNEL_CLUSTER_NAME = "seatunnel"; + public static final String REST_SUBMIT_JOBS_PARAMS = "params"; + /** * The default port number for the cluster auto-discovery mechanism's multicast communication. */ diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java index 7248773703..1d26a0b5fe 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java @@ -50,7 +50,6 @@ public class RestConstant { @Deprecated public static final String RUNNING_JOB_URL = "/hazelcast/rest/maps/running-job"; public static final String JOB_INFO_URL = "/hazelcast/rest/maps/job-info"; public static final String FINISHED_JOBS_INFO = "/hazelcast/rest/maps/finished-jobs"; - public static final String SUBMIT_JOB_URL = "/hazelcast/rest/maps/submit-job"; public static final String ENCRYPT_CONFIG = "/hazelcast/rest/maps/encrypt-config"; // only for test use @@ -59,5 +58,8 @@ public class RestConstant { public static final String SYSTEM_MONITORING_INFORMATION = "/hazelcast/rest/maps/system-monitoring-information"; + public static final String SUBMIT_JOB_URL = "/hazelcast/rest/maps/submit-job"; + public static final String SUBMIT_JOBS_URL = "/hazelcast/rest/maps/submit-jobs"; public static final String STOP_JOB_URL = "/hazelcast/rest/maps/stop-job"; + public static final String STOP_JOBS_URL = "/hazelcast/rest/maps/stop-jobs"; } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java index 150aae54c1..9b8f0f8bca 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java @@ -44,17 +44,23 @@ import com.hazelcast.internal.ascii.TextCommandService; import com.hazelcast.internal.ascii.rest.HttpCommandProcessor; import com.hazelcast.internal.ascii.rest.HttpPostCommand; import com.hazelcast.internal.json.Json; +import com.hazelcast.internal.json.JsonArray; import com.hazelcast.internal.json.JsonObject; import com.hazelcast.internal.serialization.Data; +import scala.Tuple2; import java.io.IOException; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_400; import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_500; import static org.apache.seatunnel.engine.server.rest.RestConstant.ENCRYPT_CONFIG; +import static org.apache.seatunnel.engine.server.rest.RestConstant.STOP_JOBS_URL; import static org.apache.seatunnel.engine.server.rest.RestConstant.STOP_JOB_URL; +import static org.apache.seatunnel.engine.server.rest.RestConstant.SUBMIT_JOBS_URL; import static org.apache.seatunnel.engine.server.rest.RestConstant.SUBMIT_JOB_URL; public class RestHttpPostCommandProcessor extends HttpCommandProcessor<HttpPostCommand> { @@ -78,10 +84,14 @@ public class RestHttpPostCommandProcessor extends HttpCommandProcessor<HttpPostC public void handle(HttpPostCommand httpPostCommand) { String uri = httpPostCommand.getURI(); try { - if (uri.startsWith(SUBMIT_JOB_URL)) { + if (uri.startsWith(SUBMIT_JOBS_URL)) { + handleSubmitJobs(httpPostCommand); + } else if (uri.startsWith(SUBMIT_JOB_URL)) { handleSubmitJob(httpPostCommand, uri); + } else if (uri.startsWith(STOP_JOBS_URL)) { + handleStopJobs(httpPostCommand); } else if (uri.startsWith(STOP_JOB_URL)) { - handleStopJob(httpPostCommand, uri); + handleStopJob(httpPostCommand); } else if (uri.startsWith(ENCRYPT_CONFIG)) { handleEncrypt(httpPostCommand); } else { @@ -93,7 +103,6 @@ public class RestHttpPostCommandProcessor extends HttpCommandProcessor<HttpPostC logger.warning("An error occurred while handling request " + httpPostCommand, e); prepareResponse(SC_500, httpPostCommand, exceptionResponse(e)); } - this.textCommandService.sendResponse(httpPostCommand); } @@ -103,11 +112,41 @@ public class RestHttpPostCommandProcessor extends HttpCommandProcessor<HttpPostC return (SeaTunnelServer) extensionServices.get(Constant.SEATUNNEL_SERVICE_NAME); } + private void handleSubmitJobs(HttpPostCommand httpPostCommand) throws IllegalArgumentException { + List<Tuple2<Map<String, String>, Config>> configTuples = + RestUtil.buildConfigList(requestHandle(httpPostCommand), false); + + JsonArray jsonArray = + configTuples.stream() + .map( + tuple -> { + String urlParams = mapToUrlParams(tuple._1); + Map<String, String> requestParams = new HashMap<>(); + RestUtil.buildRequestParams(requestParams, urlParams); + return submitJobInternal(tuple._2, requestParams); + }) + .collect(JsonArray::new, JsonArray::add, JsonArray::add); + + prepareResponse(httpPostCommand, jsonArray); + } + + private String mapToUrlParams(Map<String, String> params) { + return params.entrySet().stream() + .map(entry -> entry.getKey() + "=" + entry.getValue()) + .collect(Collectors.joining("&", "?", "")); + } + private void handleSubmitJob(HttpPostCommand httpPostCommand, String uri) throws IllegalArgumentException { Map<String, String> requestParams = new HashMap<>(); RestUtil.buildRequestParams(requestParams, uri); Config config = RestUtil.buildConfig(requestHandle(httpPostCommand), false); + + JsonObject jsonObject = submitJobInternal(config, requestParams); + this.prepareResponse(httpPostCommand, jsonObject); + } + + private JsonObject submitJobInternal(Config config, Map<String, String> requestParams) { ReadonlyConfig envOptions = ReadonlyConfig.fromConfig(config.getConfig("env")); String jobName = envOptions.get(EnvCommonOptions.JOB_NAME); @@ -146,15 +185,35 @@ public class RestHttpPostCommandProcessor extends HttpCommandProcessor<HttpPostC submitJob(seaTunnelServer, jobImmutableInformation, jobConfig); } + return new JsonObject() + .add(RestConstant.JOB_ID, String.valueOf(jobId)) + .add(RestConstant.JOB_NAME, jobConfig.getName()); + } + + private void handleStopJobs(HttpPostCommand command) { + List<Map> jobList = JsonUtils.toList(requestHandle(command).toString(), Map.class); + JsonArray jsonResponse = new JsonArray(); + + jobList.forEach( + job -> { + handleStopJob(job); + jsonResponse.add( + new JsonObject() + .add(RestConstant.JOB_ID, (Long) job.get(RestConstant.JOB_ID))); + }); + + this.prepareResponse(command, jsonResponse); + } + + private void handleStopJob(HttpPostCommand httpPostCommand) { + Map<String, Object> map = JsonUtils.toMap(requestHandle(httpPostCommand)); + handleStopJob(map); this.prepareResponse( httpPostCommand, - new JsonObject() - .add(RestConstant.JOB_ID, String.valueOf(jobId)) - .add(RestConstant.JOB_NAME, jobConfig.getName())); + new JsonObject().add(RestConstant.JOB_ID, map.get(RestConstant.JOB_ID).toString())); } - private void handleStopJob(HttpPostCommand httpPostCommand, String uri) { - Map<String, Object> map = JsonUtils.toMap(requestHandle(httpPostCommand)); + private void handleStopJob(Map<String, Object> map) { boolean isStopWithSavePoint = false; if (map.get(RestConstant.JOB_ID) == null) { throw new IllegalArgumentException("jobId cannot be empty."); @@ -186,10 +245,7 @@ public class RestHttpPostCommandProcessor extends HttpCommandProcessor<HttpPostC coordinatorService.cancelJob(jobId); } } - - this.prepareResponse( - httpPostCommand, - new JsonObject().add(RestConstant.JOB_ID, map.get(RestConstant.JOB_ID).toString())); + logger.info("Stop job with jobId: " + jobId); } private void handleEncrypt(HttpPostCommand httpPostCommand) { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java index 9aaa8cd595..b114073aad 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java @@ -26,11 +26,17 @@ import org.apache.seatunnel.core.starter.utils.ConfigBuilder; import org.apache.seatunnel.engine.server.rest.RestConstant; import com.hazelcast.internal.util.StringUtil; +import scala.Tuple2; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.URLDecoder; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import static org.apache.seatunnel.engine.common.Constant.REST_SUBMIT_JOBS_PARAMS; public class RestUtil { private RestUtil() {} @@ -69,4 +75,19 @@ public class RestUtil { Map<String, Object> objectMap = JsonUtils.toMap(jsonNode); return ConfigBuilder.of(objectMap, isEncrypt, true); } + + public static List<Tuple2<Map<String, String>, Config>> buildConfigList( + JsonNode jsonNode, boolean isEncrypt) { + return StreamSupport.stream(jsonNode.spliterator(), false) + .filter(JsonNode::isObject) + .map( + node -> { + Map<String, Object> nodeMap = JsonUtils.toMap(node); + Map<String, String> params = + (Map<String, String>) nodeMap.remove(REST_SUBMIT_JOBS_PARAMS); + Config config = ConfigBuilder.of(nodeMap, isEncrypt, true); + return new Tuple2<>(params, config); + }) + .collect(Collectors.toList()); + } }