This is an automated email from the ASF dual-hosted git repository. wuchunfu pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new af38fac149 [Improve][Zeta] Remove misleading exception log when job be canceled (#8988) af38fac149 is described below commit af38fac149be4acabb2f63f4565b4442f6308b7d Author: Jia Fan <fanjiaemi...@qq.com> AuthorDate: Fri Mar 21 00:46:40 2025 +0800 [Improve][Zeta] Remove misleading exception log when job be canceled (#8988) --- .../container/seatunnel/SeaTunnelContainer.java | 4 +- .../e2e/sink/inmemory/InMemorySinkFactory.java | 4 ++ .../e2e/sink/inmemory/InMemorySinkWriter.java | 8 ++++ .../seatunnel/engine/e2e/JobClientJobProxyIT.java | 44 ++++++++++++++++++++ .../test/resources/seatunnel_fixed_slot_num.yaml | 3 ++ ...kesource_to_inmemory_pending_row_in_queue.conf} | 48 ++++++++++++++-------- .../engine/server/TaskExecutionService.java | 6 ++- 7 files changed, 96 insertions(+), 21 deletions(-) diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java index 254519a813..1be4cb4ca6 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java @@ -159,8 +159,8 @@ public class SeaTunnelContainer extends AbstractTestContainer { "seatunnel-engine:" + JDK_DOCKER_IMAGE))) .waitingFor(Wait.forLogMessage(".*received new worker register:.*", 1)); copySeaTunnelStarterToContainer(server); - server.setPortBindings(Collections.singletonList("5801:5801")); - server.setExposedPorts(Collections.singletonList(5801)); + server.setPortBindings(Arrays.asList("5801:5801", "8080:8080")); + server.setExposedPorts(Arrays.asList(5801, 8080)); server.withCopyFileToContainer( MountableFile.forHostPath( diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java index 9ba1956dbe..e8722bb74b 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java @@ -40,6 +40,9 @@ public class InMemorySinkFactory public static final Option<Boolean> THROW_EXCEPTION = Options.key("throw_exception").booleanType().defaultValue(false); + public static final Option<Boolean> WRITER_SLEEP = + Options.key("writer_sleep").booleanType().defaultValue(false); + public static final Option<Boolean> THROW_OUT_OF_MEMORY = Options.key("throw_out_of_memory").booleanType().defaultValue(false); public static final Option<Boolean> CHECKPOINT_SLEEP = @@ -66,6 +69,7 @@ public class InMemorySinkFactory .optional( THROW_EXCEPTION, THROW_OUT_OF_MEMORY, + WRITER_SLEEP, CHECKPOINT_SLEEP, THROW_EXCEPTION_OF_COMMITTER, ASSERT_OPTIONS_KEY, diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkWriter.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkWriter.java index 81c8cf0af5..b5dc29e41c 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkWriter.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkWriter.java @@ -79,6 +79,14 @@ public class InMemorySinkWriter @Override public void write(SeaTunnelRow element) throws IOException { + if (config.get(InMemorySinkFactory.WRITER_SLEEP)) { + try { + Thread.sleep(999999999L); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + if (config.get(InMemorySinkFactory.THROW_OUT_OF_MEMORY)) { throw new OutOfMemoryError(); } diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java index b7f2ea53f8..aa4d959383 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java @@ -24,10 +24,16 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.testcontainers.containers.Container; +import lombok.extern.slf4j.Slf4j; + import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import static org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PATH; +import static org.testcontainers.shaded.org.awaitility.Awaitility.given; +@Slf4j public class JobClientJobProxyIT extends SeaTunnelEngineContainer { @Override @@ -73,6 +79,44 @@ public class JobClientJobProxyIT extends SeaTunnelEngineContainer { server.getLogs().contains("wrong target release operation with job")); } + @Test + public void testNoExceptionLogWhenCancelJob() throws IOException, InterruptedException { + String jobId = String.valueOf(System.currentTimeMillis()); + CompletableFuture.runAsync( + () -> { + try { + executeJob( + "/stream_fakesource_to_inmemory_pending_row_in_queue.conf", jobId); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(); + } + }); + + given().pollDelay(10, TimeUnit.SECONDS) + .await() + .pollDelay(5000L, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Assertions.assertEquals("RUNNING", this.getJobStatus(jobId)); + }); + + String logBeforeCancel = this.getServerLogs(); + cancelJob(jobId); + given().pollDelay(10, TimeUnit.SECONDS) + .await() + .pollDelay(5000L, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Assertions.assertEquals("CANCELED", this.getJobStatus(jobId)); + }); + String logAfterCancel = this.getServerLogs().substring(logBeforeCancel.length()); + // in TaskExecutionService.BlockingWorker::run catch Throwable + Assertions.assertFalse(logAfterCancel.contains("Exception in"), logAfterCancel); + Assertions.assertEquals( + 4, StringUtils.countMatches(logAfterCancel, "Interrupted task"), logAfterCancel); + } + @Test public void testMultiTableSinkFailedWithThrowable() throws IOException, InterruptedException { Container.ExecResult execResult = diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml index 4be4ec075c..146ffc0379 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml @@ -33,3 +33,6 @@ seatunnel: max-retained: 3 plugin-config: namespace: /tmp/seatunnel/checkpoint_snapshot/ + http: + enable-http: true + port: 8080 \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/stream_fakesource_to_inmemory_pending_row_in_queue.conf similarity index 59% copy from seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml copy to seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/stream_fakesource_to_inmemory_pending_row_in_queue.conf index 4be4ec075c..df3d922937 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/stream_fakesource_to_inmemory_pending_row_in_queue.conf @@ -14,22 +14,34 @@ # See the License for the specific language governing permissions and # limitations under the License. # +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### -seatunnel: - engine: - history-job-expire-minutes: 1 - classloader-cache-mode: false - backup-count: 2 - queue-type: blockingqueue - print-execution-info-interval: 10 - slot-service: - dynamic-slot: false - slot-num: 3 - checkpoint: - interval: 300000 - timeout: 100000 - storage: - type: localfile - max-retained: 3 - plugin-config: - namespace: /tmp/seatunnel/checkpoint_snapshot/ +env { + job.mode = "STREAMING" + checkpoint.interval = 5000 +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + FakeSource { + # More than TaskGroupWithIntermediateBlockingQueue::QUEUE_SIZE + row.num = 9999 + parallelism = 1 + schema = { + fields { + c_int = int + } + } + } +} + +transform { +} + +sink { + InMemory { + writer_sleep = true + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java index d42e95d480..afd12cbc7b 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java @@ -702,7 +702,11 @@ public class TaskExecutionService implements DynamicMetricsProvider { taskGroupExecutionTracker.exception(e); } } catch (Throwable e) { - logger.warning("Exception in " + t, e); + if (taskGroupExecutionTracker.isCancel.get()) { + logger.warning(String.format("Interrupted task %d - %s", t.getTaskID(), t)); + } else { + logger.warning("Exception in " + t, e); + } taskGroupExecutionTracker.exception(e); } finally { taskGroupExecutionTracker.taskDone(t);