This is an automated email from the ASF dual-hosted git repository. gaojun2048 pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 77f61408b7 [Improve][Engine] Support custom job id for rest-api named submit-job (#7053) 77f61408b7 is described below commit 77f61408b7e45d18979f6db410c64162514a218d Author: dailai <dai...@chinatelecom.cn> AuthorDate: Tue Jun 25 13:35:52 2024 +0800 [Improve][Engine] Support custom job id for rest-api named submit-job (#7053) --- .../engine/e2e/ClusterSeaTunnelContainer.java | 189 ++++++++++----------- .../server/rest/RestHttpPostCommandProcessor.java | 12 +- .../server/rest/RestJobExecutionEnvironment.java | 3 +- 3 files changed, 100 insertions(+), 104 deletions(-) diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java index f3b5922b3b..2967d4227f 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java @@ -66,6 +66,8 @@ public class ClusterSeaTunnelContainer extends SeaTunnelContainer { private static final Path hadoopJar = Paths.get(SEATUNNEL_HOME, "lib/seatunnel-hadoop3-3.1.4-uber.jar"); + private static final long CUSTOM_JOB_ID = 123456789; + @Override @BeforeEach public void startUp() throws Exception { @@ -101,106 +103,24 @@ public class ClusterSeaTunnelContainer extends SeaTunnelContainer { } @Test - public void testSubmitJob() { + public void testSubmitJobWithCustomJobId() { AtomicInteger i = new AtomicInteger(); - Arrays.asList(server, secondServer) .forEach( - container -> { - Response response = - i.get() == 0 - ? submitJob(container, "BATCH", jobName, paramJobName) - : submitJob(container, "BATCH", jobName, null); - if (i.get() == 0) { - response.then() - .statusCode(200) - .body("jobName", equalTo(paramJobName)); - } else { - response.then().statusCode(200).body("jobName", equalTo(jobName)); - } - String jobId = response.getBody().jsonPath().getString("jobId"); + container -> + submitJobAndAssertResponse( + container, + i, + paramJobName + "&jobId=" + CUSTOM_JOB_ID, + true)); + } - Awaitility.await() - .atMost(2, TimeUnit.MINUTES) - .untilAsserted( - () -> { - given().get( - http - + container.getHost() - + colon - + container - .getFirstMappedPort() - + RestConstant - .FINISHED_JOBS_INFO - + "/FINISHED") - .then() - .statusCode(200) - .body( - "[" + i.get() + "].jobName", - equalTo( - i.get() == 0 - ? paramJobName - : jobName)) - .body( - "[" + i.get() + "].errorMsg", - equalTo(null)) - .body( - "[" + i.get() + "].jobDag.jobId", - equalTo(Long.parseLong(jobId))) - .body( - "[" - + i.get() - + "].metrics.SourceReceivedCount", - equalTo("100")) - .body( - "[" - + i.get() - + "].metrics.SinkWriteCount", - equalTo("100")) - .body( - "[" + i.get() + "].jobStatus", - equalTo("FINISHED")); - - // test for without status parameter. - given().get( - http - + container.getHost() - + colon - + container - .getFirstMappedPort() - + RestConstant - .FINISHED_JOBS_INFO) - .then() - .statusCode(200) - .body( - "[" + i.get() + "].jobName", - equalTo( - i.get() == 0 - ? paramJobName - : jobName)) - .body( - "[" + i.get() + "].errorMsg", - equalTo(null)) - .body( - "[" + i.get() + "].jobDag.jobId", - equalTo(Long.parseLong(jobId))) - .body( - "[" - + i.get() - + "].metrics.SourceReceivedCount", - equalTo("100")) - .body( - "[" - + i.get() - + "].metrics.SinkWriteCount", - equalTo("100")) - .body( - "[" + i.get() + "].jobStatus", - equalTo("FINISHED")); - }); - - i.getAndIncrement(); - }); + @Test + public void testSubmitJobWithoutCustomJobId() { + AtomicInteger i = new AtomicInteger(); + Arrays.asList(server, secondServer) + .forEach( + container -> submitJobAndAssertResponse(container, i, paramJobName, false)); } @Test @@ -459,4 +379,81 @@ public class ClusterSeaTunnelContainer extends SeaTunnelContainer { return server; } + + private void submitJobAndAssertResponse( + GenericContainer<? extends GenericContainer<?>> container, + AtomicInteger i, + String customParam, + boolean isCustomJobId) { + Response response = submitJobAndResponse(container, i, customParam); + String jobId = response.getBody().jsonPath().getString("jobId"); + assertResponse(container, i, jobId, isCustomJobId); + i.getAndIncrement(); + } + + private Response submitJobAndResponse( + GenericContainer<? extends GenericContainer<?>> container, + AtomicInteger i, + String customParam) { + Response response = + i.get() == 0 + ? submitJob(container, "BATCH", jobName, customParam) + : submitJob(container, "BATCH", jobName, null); + if (i.get() == 0) { + response.then().statusCode(200).body("jobName", equalTo(paramJobName)); + } else { + response.then().statusCode(200).body("jobName", equalTo(jobName)); + } + return response; + } + + private void assertResponse( + GenericContainer<? extends GenericContainer<?>> container, + AtomicInteger i, + String jobId, + boolean isCustomJobId) { + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> { + assertWithStatusParameterOrNot( + container, i, jobId, isCustomJobId, true); + + // test for without status parameter. + assertWithStatusParameterOrNot( + container, i, jobId, isCustomJobId, false); + }); + } + + private void assertWithStatusParameterOrNot( + GenericContainer<? extends GenericContainer<?>> container, + AtomicInteger i, + String jobId, + boolean isCustomJobId, + boolean isStatusWithSubmitJob) { + String baseRestUrl = getBaseRestUrl(container); + String restUrl = isStatusWithSubmitJob ? baseRestUrl + "/FINISHED" : baseRestUrl; + given().get(restUrl) + .then() + .statusCode(200) + .body("[" + i.get() + "].jobName", equalTo(i.get() == 0 ? paramJobName : jobName)) + .body("[" + i.get() + "].errorMsg", equalTo(null)) + .body( + "[" + i.get() + "].jobId", + equalTo( + i.get() == 0 && isCustomJobId + ? Long.toString(CUSTOM_JOB_ID) + : jobId)) + .body("[" + i.get() + "].metrics.SourceReceivedCount", equalTo("100")) + .body("[" + i.get() + "].metrics.SinkWriteCount", equalTo("100")) + .body("[" + i.get() + "].jobStatus", equalTo("FINISHED")); + } + + private String getBaseRestUrl(GenericContainer<? extends GenericContainer<?>> container) { + return http + + container.getHost() + + colon + + container.getFirstMappedPort() + + RestConstant.FINISHED_JOBS_INFO; + } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java index 6b822520c3..e250fdf936 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java @@ -38,7 +38,7 @@ import org.apache.seatunnel.engine.server.operation.SubmitJobOperation; import org.apache.seatunnel.engine.server.utils.NodeEngineUtil; import org.apache.seatunnel.engine.server.utils.RestUtil; -import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.StringUtils; import com.hazelcast.internal.ascii.TextCommandService; import com.hazelcast.internal.ascii.rest.HttpCommandProcessor; @@ -119,6 +119,8 @@ public class RestHttpPostCommandProcessor extends HttpCommandProcessor<HttpPostC boolean startWithSavePoint = Boolean.parseBoolean(requestParams.get(RestConstant.IS_START_WITH_SAVE_POINT)); + String jobIdStr = requestParams.get(RestConstant.JOB_ID); + Long finalJobId = StringUtils.isNotBlank(jobIdStr) ? Long.parseLong(jobIdStr) : null; SeaTunnelServer seaTunnelServer = getSeaTunnelServer(); RestJobExecutionEnvironment restJobExecutionEnvironment = new RestJobExecutionEnvironment( @@ -127,9 +129,7 @@ public class RestHttpPostCommandProcessor extends HttpCommandProcessor<HttpPostC config, textCommandService.getNode(), startWithSavePoint, - startWithSavePoint - ? Long.parseLong(requestParams.get(RestConstant.JOB_ID)) - : null); + finalJobId); JobImmutableInformation jobImmutableInformation = restJobExecutionEnvironment.build(); Long jobId = jobImmutableInformation.getJobId(); if (!seaTunnelServer.isMasterNode()) { @@ -137,12 +137,10 @@ public class RestHttpPostCommandProcessor extends HttpCommandProcessor<HttpPostC NodeEngineUtil.sendOperationToMasterNode( getNode().nodeEngine, new SubmitJobOperation( - jobImmutableInformation.getJobId(), - getNode().nodeEngine.toData(jobImmutableInformation))) + jobId, getNode().nodeEngine.toData(jobImmutableInformation))) .join(); } else { - submitJob(seaTunnelServer, jobImmutableInformation, jobConfig); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestJobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestJobExecutionEnvironment.java index a166d0a4d5..d13f1a49d8 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestJobExecutionEnvironment.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestJobExecutionEnvironment.java @@ -39,6 +39,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Objects; import java.util.Set; public class RestJobExecutionEnvironment extends AbstractJobEnvironment { @@ -63,7 +64,7 @@ public class RestJobExecutionEnvironment extends AbstractJobEnvironment { this.nodeEngine = node.getNodeEngine(); this.jobConfig.setJobContext( new JobContext( - isStartWithSavePoint + Objects.nonNull(jobId) ? jobId : nodeEngine .getHazelcastInstance()