This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 77f61408b7 [Improve][Engine] Support custom job id for rest-api named 
submit-job (#7053)
77f61408b7 is described below

commit 77f61408b7e45d18979f6db410c64162514a218d
Author: dailai <dai...@chinatelecom.cn>
AuthorDate: Tue Jun 25 13:35:52 2024 +0800

    [Improve][Engine] Support custom job id for rest-api named submit-job 
(#7053)
---
 .../engine/e2e/ClusterSeaTunnelContainer.java      | 189 ++++++++++-----------
 .../server/rest/RestHttpPostCommandProcessor.java  |  12 +-
 .../server/rest/RestJobExecutionEnvironment.java   |   3 +-
 3 files changed, 100 insertions(+), 104 deletions(-)

diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java
index f3b5922b3b..2967d4227f 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java
@@ -66,6 +66,8 @@ public class ClusterSeaTunnelContainer extends 
SeaTunnelContainer {
     private static final Path hadoopJar =
             Paths.get(SEATUNNEL_HOME, "lib/seatunnel-hadoop3-3.1.4-uber.jar");
 
+    private static final long CUSTOM_JOB_ID = 123456789;
+
     @Override
     @BeforeEach
     public void startUp() throws Exception {
@@ -101,106 +103,24 @@ public class ClusterSeaTunnelContainer extends 
SeaTunnelContainer {
     }
 
     @Test
-    public void testSubmitJob() {
+    public void testSubmitJobWithCustomJobId() {
         AtomicInteger i = new AtomicInteger();
-
         Arrays.asList(server, secondServer)
                 .forEach(
-                        container -> {
-                            Response response =
-                                    i.get() == 0
-                                            ? submitJob(container, "BATCH", 
jobName, paramJobName)
-                                            : submitJob(container, "BATCH", 
jobName, null);
-                            if (i.get() == 0) {
-                                response.then()
-                                        .statusCode(200)
-                                        .body("jobName", 
equalTo(paramJobName));
-                            } else {
-                                
response.then().statusCode(200).body("jobName", equalTo(jobName));
-                            }
-                            String jobId = 
response.getBody().jsonPath().getString("jobId");
+                        container ->
+                                submitJobAndAssertResponse(
+                                        container,
+                                        i,
+                                        paramJobName + "&jobId=" + 
CUSTOM_JOB_ID,
+                                        true));
+    }
 
-                            Awaitility.await()
-                                    .atMost(2, TimeUnit.MINUTES)
-                                    .untilAsserted(
-                                            () -> {
-                                                given().get(
-                                                                http
-                                                                        + 
container.getHost()
-                                                                        + colon
-                                                                        + 
container
-                                                                               
 .getFirstMappedPort()
-                                                                        + 
RestConstant
-                                                                               
 .FINISHED_JOBS_INFO
-                                                                        + 
"/FINISHED")
-                                                        .then()
-                                                        .statusCode(200)
-                                                        .body(
-                                                                "[" + i.get() 
+ "].jobName",
-                                                                equalTo(
-                                                                        
i.get() == 0
-                                                                               
 ? paramJobName
-                                                                               
 : jobName))
-                                                        .body(
-                                                                "[" + i.get() 
+ "].errorMsg",
-                                                                equalTo(null))
-                                                        .body(
-                                                                "[" + i.get() 
+ "].jobDag.jobId",
-                                                                
equalTo(Long.parseLong(jobId)))
-                                                        .body(
-                                                                "["
-                                                                        + 
i.get()
-                                                                        + 
"].metrics.SourceReceivedCount",
-                                                                equalTo("100"))
-                                                        .body(
-                                                                "["
-                                                                        + 
i.get()
-                                                                        + 
"].metrics.SinkWriteCount",
-                                                                equalTo("100"))
-                                                        .body(
-                                                                "[" + i.get() 
+ "].jobStatus",
-                                                                
equalTo("FINISHED"));
-
-                                                // test for without status 
parameter.
-                                                given().get(
-                                                                http
-                                                                        + 
container.getHost()
-                                                                        + colon
-                                                                        + 
container
-                                                                               
 .getFirstMappedPort()
-                                                                        + 
RestConstant
-                                                                               
 .FINISHED_JOBS_INFO)
-                                                        .then()
-                                                        .statusCode(200)
-                                                        .body(
-                                                                "[" + i.get() 
+ "].jobName",
-                                                                equalTo(
-                                                                        
i.get() == 0
-                                                                               
 ? paramJobName
-                                                                               
 : jobName))
-                                                        .body(
-                                                                "[" + i.get() 
+ "].errorMsg",
-                                                                equalTo(null))
-                                                        .body(
-                                                                "[" + i.get() 
+ "].jobDag.jobId",
-                                                                
equalTo(Long.parseLong(jobId)))
-                                                        .body(
-                                                                "["
-                                                                        + 
i.get()
-                                                                        + 
"].metrics.SourceReceivedCount",
-                                                                equalTo("100"))
-                                                        .body(
-                                                                "["
-                                                                        + 
i.get()
-                                                                        + 
"].metrics.SinkWriteCount",
-                                                                equalTo("100"))
-                                                        .body(
-                                                                "[" + i.get() 
+ "].jobStatus",
-                                                                
equalTo("FINISHED"));
-                                            });
-
-                            i.getAndIncrement();
-                        });
+    @Test
+    public void testSubmitJobWithoutCustomJobId() {
+        AtomicInteger i = new AtomicInteger();
+        Arrays.asList(server, secondServer)
+                .forEach(
+                        container -> submitJobAndAssertResponse(container, i, 
paramJobName, false));
     }
 
     @Test
@@ -459,4 +379,81 @@ public class ClusterSeaTunnelContainer extends 
SeaTunnelContainer {
 
         return server;
     }
+
+    private void submitJobAndAssertResponse(
+            GenericContainer<? extends GenericContainer<?>> container,
+            AtomicInteger i,
+            String customParam,
+            boolean isCustomJobId) {
+        Response response = submitJobAndResponse(container, i, customParam);
+        String jobId = response.getBody().jsonPath().getString("jobId");
+        assertResponse(container, i, jobId, isCustomJobId);
+        i.getAndIncrement();
+    }
+
+    private Response submitJobAndResponse(
+            GenericContainer<? extends GenericContainer<?>> container,
+            AtomicInteger i,
+            String customParam) {
+        Response response =
+                i.get() == 0
+                        ? submitJob(container, "BATCH", jobName, customParam)
+                        : submitJob(container, "BATCH", jobName, null);
+        if (i.get() == 0) {
+            response.then().statusCode(200).body("jobName", 
equalTo(paramJobName));
+        } else {
+            response.then().statusCode(200).body("jobName", equalTo(jobName));
+        }
+        return response;
+    }
+
+    private void assertResponse(
+            GenericContainer<? extends GenericContainer<?>> container,
+            AtomicInteger i,
+            String jobId,
+            boolean isCustomJobId) {
+        Awaitility.await()
+                .atMost(2, TimeUnit.MINUTES)
+                .untilAsserted(
+                        () -> {
+                            assertWithStatusParameterOrNot(
+                                    container, i, jobId, isCustomJobId, true);
+
+                            // test for without status parameter.
+                            assertWithStatusParameterOrNot(
+                                    container, i, jobId, isCustomJobId, false);
+                        });
+    }
+
+    private void assertWithStatusParameterOrNot(
+            GenericContainer<? extends GenericContainer<?>> container,
+            AtomicInteger i,
+            String jobId,
+            boolean isCustomJobId,
+            boolean isStatusWithSubmitJob) {
+        String baseRestUrl = getBaseRestUrl(container);
+        String restUrl = isStatusWithSubmitJob ? baseRestUrl + "/FINISHED" : 
baseRestUrl;
+        given().get(restUrl)
+                .then()
+                .statusCode(200)
+                .body("[" + i.get() + "].jobName", equalTo(i.get() == 0 ? 
paramJobName : jobName))
+                .body("[" + i.get() + "].errorMsg", equalTo(null))
+                .body(
+                        "[" + i.get() + "].jobId",
+                        equalTo(
+                                i.get() == 0 && isCustomJobId
+                                        ? Long.toString(CUSTOM_JOB_ID)
+                                        : jobId))
+                .body("[" + i.get() + "].metrics.SourceReceivedCount", 
equalTo("100"))
+                .body("[" + i.get() + "].metrics.SinkWriteCount", 
equalTo("100"))
+                .body("[" + i.get() + "].jobStatus", equalTo("FINISHED"));
+    }
+
+    private String getBaseRestUrl(GenericContainer<? extends 
GenericContainer<?>> container) {
+        return http
+                + container.getHost()
+                + colon
+                + container.getFirstMappedPort()
+                + RestConstant.FINISHED_JOBS_INFO;
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
index 6b822520c3..e250fdf936 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
@@ -38,7 +38,7 @@ import 
org.apache.seatunnel.engine.server.operation.SubmitJobOperation;
 import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
 import org.apache.seatunnel.engine.server.utils.RestUtil;
 
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
 
 import com.hazelcast.internal.ascii.TextCommandService;
 import com.hazelcast.internal.ascii.rest.HttpCommandProcessor;
@@ -119,6 +119,8 @@ public class RestHttpPostCommandProcessor extends 
HttpCommandProcessor<HttpPostC
 
         boolean startWithSavePoint =
                 
Boolean.parseBoolean(requestParams.get(RestConstant.IS_START_WITH_SAVE_POINT));
+        String jobIdStr = requestParams.get(RestConstant.JOB_ID);
+        Long finalJobId = StringUtils.isNotBlank(jobIdStr) ? 
Long.parseLong(jobIdStr) : null;
         SeaTunnelServer seaTunnelServer = getSeaTunnelServer();
         RestJobExecutionEnvironment restJobExecutionEnvironment =
                 new RestJobExecutionEnvironment(
@@ -127,9 +129,7 @@ public class RestHttpPostCommandProcessor extends 
HttpCommandProcessor<HttpPostC
                         config,
                         textCommandService.getNode(),
                         startWithSavePoint,
-                        startWithSavePoint
-                                ? 
Long.parseLong(requestParams.get(RestConstant.JOB_ID))
-                                : null);
+                        finalJobId);
         JobImmutableInformation jobImmutableInformation = 
restJobExecutionEnvironment.build();
         Long jobId = jobImmutableInformation.getJobId();
         if (!seaTunnelServer.isMasterNode()) {
@@ -137,12 +137,10 @@ public class RestHttpPostCommandProcessor extends 
HttpCommandProcessor<HttpPostC
             NodeEngineUtil.sendOperationToMasterNode(
                             getNode().nodeEngine,
                             new SubmitJobOperation(
-                                    jobImmutableInformation.getJobId(),
-                                    
getNode().nodeEngine.toData(jobImmutableInformation)))
+                                    jobId, 
getNode().nodeEngine.toData(jobImmutableInformation)))
                     .join();
 
         } else {
-
             submitJob(seaTunnelServer, jobImmutableInformation, jobConfig);
         }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestJobExecutionEnvironment.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestJobExecutionEnvironment.java
index a166d0a4d5..d13f1a49d8 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestJobExecutionEnvironment.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestJobExecutionEnvironment.java
@@ -39,6 +39,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 
 public class RestJobExecutionEnvironment extends AbstractJobEnvironment {
@@ -63,7 +64,7 @@ public class RestJobExecutionEnvironment extends 
AbstractJobEnvironment {
         this.nodeEngine = node.getNodeEngine();
         this.jobConfig.setJobContext(
                 new JobContext(
-                        isStartWithSavePoint
+                        Objects.nonNull(jobId)
                                 ? jobId
                                 : nodeEngine
                                         .getHazelcastInstance()

Reply via email to