This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 5430ca962 Fix Zeta UT error (#4664) 5430ca962 is described below commit 5430ca9621ffdd7f7ec183235eed29764ecd6205 Author: Eric <gaojun2...@gmail.com> AuthorDate: Thu Apr 27 17:02:26 2023 +0800 Fix Zeta UT error (#4664) --- .../engine/client/SeaTunnelClientTest.java | 223 ++++++++++++--------- .../src/test/resources/hazelcast.yaml | 8 +- 2 files changed, 140 insertions(+), 91 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java index 554a1e782..a15dad5e3 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java +++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java @@ -23,6 +23,7 @@ import org.apache.seatunnel.common.config.Common; import org.apache.seatunnel.common.config.DeployMode; import org.apache.seatunnel.common.utils.RetryUtils; import org.apache.seatunnel.engine.client.job.ClientJobProxy; +import org.apache.seatunnel.engine.client.job.JobClient; import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment; import org.apache.seatunnel.engine.common.Constant; import org.apache.seatunnel.engine.common.config.ConfigProvider; @@ -35,10 +36,8 @@ import org.apache.seatunnel.engine.server.SeaTunnelNodeContext; import org.apache.commons.lang3.StringUtils; import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.DisabledOnOs; import org.junit.jupiter.api.condition.OS; @@ -63,7 +62,6 @@ public class SeaTunnelClientTest { private static 
SeaTunnelConfig SEATUNNEL_CONFIG = ConfigProvider.locateAndGetSeaTunnelConfig(); private static HazelcastInstance INSTANCE; - private static SeaTunnelClient CLIENT; @BeforeAll public static void beforeClass() throws Exception { @@ -77,17 +75,17 @@ public class SeaTunnelClientTest { new SeaTunnelNodeContext(ConfigProvider.locateAndGetSeaTunnelConfig())); } - @BeforeEach - void setUp() { + private SeaTunnelClient createSeaTunnelClient() { ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); clientConfig.setClusterName(TestUtils.getClusterName("SeaTunnelClientTest")); - CLIENT = new SeaTunnelClient(clientConfig); + return new SeaTunnelClient(clientConfig); } @Test public void testSayHello() { String msg = "Hello world"; - String s = CLIENT.printMessageToMaster(msg); + SeaTunnelClient seaTunnelClient = createSeaTunnelClient(); + String s = seaTunnelClient.printMessageToMaster(msg); Assertions.assertEquals(msg, s); } @@ -96,12 +94,13 @@ public class SeaTunnelClientTest { Common.setDeployMode(DeployMode.CLIENT); String filePath = TestUtils.getResource("/client_test.conf"); JobConfig jobConfig = new JobConfig(); - jobConfig.setName("fake_to_file"); + jobConfig.setName("testExecuteJob"); - JobExecutionEnvironment jobExecutionEnv = - CLIENT.createExecutionContext(filePath, jobConfig); + SeaTunnelClient seaTunnelClient = createSeaTunnelClient(); try { + JobExecutionEnvironment jobExecutionEnv = + seaTunnelClient.createExecutionContext(filePath, jobConfig); final ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync( @@ -119,6 +118,8 @@ public class SeaTunnelClientTest { } catch (ExecutionException | InterruptedException e) { throw new RuntimeException(e); + } finally { + seaTunnelClient.close(); } } @@ -127,12 +128,14 @@ public class SeaTunnelClientTest { Common.setDeployMode(DeployMode.CLIENT); String filePath = TestUtils.getResource("/client_test.conf"); JobConfig 
jobConfig = new JobConfig(); - jobConfig.setName("fake_to_console"); + jobConfig.setName("testGetJobState"); - JobExecutionEnvironment jobExecutionEnv = - CLIENT.createExecutionContext(filePath, jobConfig); + SeaTunnelClient seaTunnelClient = createSeaTunnelClient(); + JobClient jobClient = seaTunnelClient.getJobClient(); try { + JobExecutionEnvironment jobExecutionEnv = + seaTunnelClient.createExecutionContext(filePath, jobConfig); final ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync( @@ -145,19 +148,24 @@ public class SeaTunnelClientTest { .untilAsserted( () -> Assertions.assertTrue( - CLIENT.getJobDetailStatus(jobId).contains("RUNNING") - && CLIENT.listJobStatus().contains("RUNNING"))); + jobClient.getJobDetailStatus(jobId).contains("RUNNING") + && jobClient + .listJobStatus(true) + .contains("RUNNING"))); await().atMost(30000, TimeUnit.MILLISECONDS) .untilAsserted( () -> Assertions.assertTrue( - CLIENT.getJobDetailStatus(jobId).contains("FINISHED") - && CLIENT.listJobStatus() + jobClient.getJobDetailStatus(jobId).contains("FINISHED") + && jobClient + .listJobStatus(true) .contains("FINISHED"))); } catch (ExecutionException | InterruptedException e) { throw new RuntimeException(e); + } finally { + seaTunnelClient.close(); } } @@ -166,12 +174,15 @@ public class SeaTunnelClientTest { Common.setDeployMode(DeployMode.CLIENT); String filePath = TestUtils.getResource("/client_test.conf"); JobConfig jobConfig = new JobConfig(); - jobConfig.setName("fake_to_console"); + jobConfig.setName("testGetJobMetrics"); - JobExecutionEnvironment jobExecutionEnv = - CLIENT.createExecutionContext(filePath, jobConfig); + SeaTunnelClient seaTunnelClient = createSeaTunnelClient(); + JobClient jobClient = seaTunnelClient.getJobClient(); try { + JobExecutionEnvironment jobExecutionEnv = + seaTunnelClient.createExecutionContext(filePath, jobConfig); + final ClientJobProxy clientJobProxy = 
jobExecutionEnv.execute(); CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync( @@ -184,11 +195,12 @@ public class SeaTunnelClientTest { .untilAsserted( () -> Assertions.assertTrue( - CLIENT.getJobDetailStatus(jobId).contains("FINISHED") - && CLIENT.listJobStatus() + jobClient.getJobDetailStatus(jobId).contains("FINISHED") + && jobClient + .listJobStatus(true) .contains("FINISHED"))); - String jobMetrics = CLIENT.getJobMetrics(jobId); + String jobMetrics = jobClient.getJobMetrics(jobId); Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_COUNT)); Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_QPS)); @@ -197,6 +209,8 @@ public class SeaTunnelClientTest { } catch (ExecutionException | InterruptedException e) { throw new RuntimeException(e); + } finally { + seaTunnelClient.close(); } } @@ -205,24 +219,36 @@ public class SeaTunnelClientTest { Common.setDeployMode(DeployMode.CLIENT); String filePath = TestUtils.getResource("/streaming_fake_to_console.conf"); JobConfig jobConfig = new JobConfig(); - jobConfig.setName("streaming_fake_to_console"); + jobConfig.setName("testCancelJob"); - JobExecutionEnvironment jobExecutionEnv = - CLIENT.createExecutionContext(filePath, jobConfig); + SeaTunnelClient seaTunnelClient = createSeaTunnelClient(); + JobClient jobClient = seaTunnelClient.getJobClient(); + try { + JobExecutionEnvironment jobExecutionEnv = + seaTunnelClient.createExecutionContext(filePath, jobConfig); - final ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); + final ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); - long jobId = clientJobProxy.getJobId(); + long jobId = clientJobProxy.getJobId(); - await().atMost(30000, TimeUnit.MILLISECONDS) - .untilAsserted( - () -> Assertions.assertEquals("RUNNING", CLIENT.getJobStatus(jobId))); + await().atMost(30000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertEquals( + "RUNNING", jobClient.getJobStatus(jobId))); - 
CLIENT.cancelJob(jobId); + jobClient.cancelJob(jobId); - await().atMost(30000, TimeUnit.MILLISECONDS) - .untilAsserted( - () -> Assertions.assertEquals("CANCELED", CLIENT.getJobStatus(jobId))); + await().atMost(30000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertEquals( + "CANCELED", jobClient.getJobStatus(jobId))); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + seaTunnelClient.close(); + } } @Test @@ -232,31 +258,36 @@ public class SeaTunnelClientTest { JobConfig jobConfig = new JobConfig(); jobConfig.setName("fake_to_console"); - JobExecutionEnvironment jobExecutionEnv = - CLIENT.createExecutionContext(filePath, jobConfig); + SeaTunnelClient seaTunnelClient = createSeaTunnelClient(); + JobClient jobClient = seaTunnelClient.getJobClient(); try { + JobExecutionEnvironment jobExecutionEnv = + seaTunnelClient.createExecutionContext(filePath, jobConfig); final ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete); long jobId = clientJobProxy.getJobId(); // Running - Assertions.assertNotNull(CLIENT.getJobInfo(jobId)); + Assertions.assertNotNull(jobClient.getJobInfo(jobId)); await().atMost(180000, TimeUnit.MILLISECONDS) .untilAsserted( () -> Assertions.assertTrue( - CLIENT.getJobDetailStatus(jobId).contains("FINISHED") - && CLIENT.listJobStatus() + jobClient.getJobDetailStatus(jobId).contains("FINISHED") + && jobClient + .listJobStatus(true) .contains("FINISHED"))); // Finished - JobDAGInfo jobInfo = CLIENT.getJobInfo(jobId); + JobDAGInfo jobInfo = jobClient.getJobInfo(jobId); Assertions.assertTrue( StringUtils.isNotEmpty(new ObjectMapper().writeValueAsString(jobInfo))); } catch (Exception e) { throw new RuntimeException(e); + } finally { + seaTunnelClient.close(); } } @@ -267,56 +298,68 @@ public class SeaTunnelClientTest { JobConfig jobConfig = new JobConfig(); jobConfig.setName("streaming_fake_to_console.conf"); - JobExecutionEnvironment 
jobExecutionEnv = - CLIENT.createExecutionContext(filePath, jobConfig); - final ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); - CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete); - long jobId = clientJobProxy.getJobId(); - - await().atMost(30000, TimeUnit.MILLISECONDS) - .untilAsserted( - () -> Assertions.assertEquals("RUNNING", CLIENT.getJobStatus(jobId))); - - RetryUtils.retryWithException( - () -> { - CLIENT.savePointJob(jobId); - return null; - }, - new RetryUtils.RetryMaterial( - Constant.OPERATION_RETRY_TIME, - true, - exception -> { - // If we do savepoint for a Job which initialization has not been - // completed yet, we will get an error. - // In this test case, we need retry savepoint. - return exception - .getCause() - .getMessage() - .contains("Task not all ready, savepoint error"); - }, - Constant.OPERATION_RETRY_SLEEP)); - - await().atMost(30000, TimeUnit.MILLISECONDS) - .untilAsserted( - () -> Assertions.assertEquals("FINISHED", CLIENT.getJobStatus(jobId))); - - Thread.sleep(1000); - CLIENT.restoreExecutionContext(filePath, jobConfig, jobId).execute(); - - await().atMost(30000, TimeUnit.MILLISECONDS) - .untilAsserted( - () -> Assertions.assertEquals("RUNNING", CLIENT.getJobStatus(jobId))); - - CLIENT.cancelJob(jobId); - - await().atMost(30000, TimeUnit.MILLISECONDS) - .untilAsserted( - () -> Assertions.assertEquals("CANCELED", CLIENT.getJobStatus(jobId))); - } + SeaTunnelClient seaTunnelClient = createSeaTunnelClient(); + JobClient jobClient = seaTunnelClient.getJobClient(); + + try { + JobExecutionEnvironment jobExecutionEnv = + seaTunnelClient.createExecutionContext(filePath, jobConfig); + final ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); + CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete); + long jobId = clientJobProxy.getJobId(); - @AfterEach - void tearDown() { - CLIENT.close(); + await().atMost(30000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertEquals( + 
"RUNNING", jobClient.getJobStatus(jobId))); + + RetryUtils.retryWithException( + () -> { + jobClient.savePointJob(jobId); + return null; + }, + new RetryUtils.RetryMaterial( + Constant.OPERATION_RETRY_TIME, + true, + exception -> { + // If we do savepoint for a Job which initialization has not been + // completed yet, we will get an error. + // In this test case, we need retry savepoint. + return exception + .getCause() + .getMessage() + .contains("Task not all ready, savepoint error"); + }, + Constant.OPERATION_RETRY_SLEEP)); + + await().atMost(30000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertEquals( + "FINISHED", jobClient.getJobStatus(jobId))); + + Thread.sleep(1000); + seaTunnelClient.restoreExecutionContext(filePath, jobConfig, jobId).execute(); + + await().atMost(30000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertEquals( + "RUNNING", jobClient.getJobStatus(jobId))); + + jobClient.cancelJob(jobId); + + await().atMost(30000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertEquals( + "CANCELED", jobClient.getJobStatus(jobId))); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + seaTunnelClient.close(); + } } @AfterAll diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/hazelcast.yaml b/seatunnel-engine/seatunnel-engine-client/src/test/resources/hazelcast.yaml index 21f4d544d..6e76442a3 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/resources/hazelcast.yaml +++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/hazelcast.yaml @@ -34,4 +34,10 @@ hazelcast: initial-mode: EAGER class-name: org.apache.seatunnel.engine.server.persistence.FileMapStore properties: - path: /tmp/file-store-map \ No newline at end of file + path: /tmp/file-store-map + + properties: + hazelcast.invocation.max.retry.count: 20 + hazelcast.tcp.join.port.try.count: 30 + hazelcast.logging.type: log4j2 + hazelcast.operation.generic.thread.count: 200 \ No 
newline at end of file