This is an automated email from the ASF dual-hosted git repository.

wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new af38fac149 [Improve][Zeta] Remove misleading exception log when job be canceled (#8988)
af38fac149 is described below

commit af38fac149be4acabb2f63f4565b4442f6308b7d
Author: Jia Fan <fanjiaemi...@qq.com>
AuthorDate: Fri Mar 21 00:46:40 2025 +0800

    [Improve][Zeta] Remove misleading exception log when job be canceled (#8988)
---
 .../container/seatunnel/SeaTunnelContainer.java    |  4 +-
 .../e2e/sink/inmemory/InMemorySinkFactory.java     |  4 ++
 .../e2e/sink/inmemory/InMemorySinkWriter.java      |  8 ++++
 .../seatunnel/engine/e2e/JobClientJobProxyIT.java  | 44 ++++++++++++++++++++
 .../test/resources/seatunnel_fixed_slot_num.yaml   |  3 ++
 ...kesource_to_inmemory_pending_row_in_queue.conf} | 48 ++++++++++++++--------
 .../engine/server/TaskExecutionService.java        |  6 ++-
 7 files changed, 96 insertions(+), 21 deletions(-)

diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
index 254519a813..1be4cb4ca6 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -159,8 +159,8 @@ public class SeaTunnelContainer extends 
AbstractTestContainer {
                                                 "seatunnel-engine:" + 
JDK_DOCKER_IMAGE)))
                         .waitingFor(Wait.forLogMessage(".*received new worker 
register:.*", 1));
         copySeaTunnelStarterToContainer(server);
-        server.setPortBindings(Collections.singletonList("5801:5801"));
-        server.setExposedPorts(Collections.singletonList(5801));
+        server.setPortBindings(Arrays.asList("5801:5801", "8080:8080"));
+        server.setExposedPorts(Arrays.asList(5801, 8080));
 
         server.withCopyFileToContainer(
                 MountableFile.forHostPath(
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java
index 9ba1956dbe..e8722bb74b 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java
@@ -40,6 +40,9 @@ public class InMemorySinkFactory
     public static final Option<Boolean> THROW_EXCEPTION =
             Options.key("throw_exception").booleanType().defaultValue(false);
 
+    public static final Option<Boolean> WRITER_SLEEP =
+            Options.key("writer_sleep").booleanType().defaultValue(false);
+
     public static final Option<Boolean> THROW_OUT_OF_MEMORY =
             
Options.key("throw_out_of_memory").booleanType().defaultValue(false);
     public static final Option<Boolean> CHECKPOINT_SLEEP =
@@ -66,6 +69,7 @@ public class InMemorySinkFactory
                 .optional(
                         THROW_EXCEPTION,
                         THROW_OUT_OF_MEMORY,
+                        WRITER_SLEEP,
                         CHECKPOINT_SLEEP,
                         THROW_EXCEPTION_OF_COMMITTER,
                         ASSERT_OPTIONS_KEY,
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkWriter.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkWriter.java
index 81c8cf0af5..b5dc29e41c 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkWriter.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkWriter.java
@@ -79,6 +79,14 @@ public class InMemorySinkWriter
 
     @Override
     public void write(SeaTunnelRow element) throws IOException {
+        if (config.get(InMemorySinkFactory.WRITER_SLEEP)) {
+            try {
+                Thread.sleep(999999999L);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
         if (config.get(InMemorySinkFactory.THROW_OUT_OF_MEMORY)) {
             throw new OutOfMemoryError();
         }
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
index b7f2ea53f8..aa4d959383 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
@@ -24,10 +24,16 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.testcontainers.containers.Container;
 
+import lombok.extern.slf4j.Slf4j;
+
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PATH;
+import static org.testcontainers.shaded.org.awaitility.Awaitility.given;
 
+@Slf4j
 public class JobClientJobProxyIT extends SeaTunnelEngineContainer {
 
     @Override
@@ -73,6 +79,44 @@ public class JobClientJobProxyIT extends 
SeaTunnelEngineContainer {
                 server.getLogs().contains("wrong target release operation with 
job"));
     }
 
+    @Test
+    public void testNoExceptionLogWhenCancelJob() throws IOException, 
InterruptedException {
+        String jobId = String.valueOf(System.currentTimeMillis());
+        CompletableFuture.runAsync(
+                () -> {
+                    try {
+                        executeJob(
+                                
"/stream_fakesource_to_inmemory_pending_row_in_queue.conf", jobId);
+                    } catch (Exception e) {
+                        log.error("Commit task exception :" + e.getMessage());
+                        throw new RuntimeException();
+                    }
+                });
+
+        given().pollDelay(10, TimeUnit.SECONDS)
+                .await()
+                .pollDelay(5000L, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            Assertions.assertEquals("RUNNING", 
this.getJobStatus(jobId));
+                        });
+
+        String logBeforeCancel = this.getServerLogs();
+        cancelJob(jobId);
+        given().pollDelay(10, TimeUnit.SECONDS)
+                .await()
+                .pollDelay(5000L, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            Assertions.assertEquals("CANCELED", 
this.getJobStatus(jobId));
+                        });
+        String logAfterCancel = 
this.getServerLogs().substring(logBeforeCancel.length());
+        // in TaskExecutionService.BlockingWorker::run catch Throwable
+        Assertions.assertFalse(logAfterCancel.contains("Exception in"), 
logAfterCancel);
+        Assertions.assertEquals(
+                4, StringUtils.countMatches(logAfterCancel, "Interrupted 
task"), logAfterCancel);
+    }
+
     @Test
     public void testMultiTableSinkFailedWithThrowable() throws IOException, 
InterruptedException {
         Container.ExecResult execResult =
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml
index 4be4ec075c..146ffc0379 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml
@@ -33,3 +33,6 @@ seatunnel:
         max-retained: 3
         plugin-config:
           namespace: /tmp/seatunnel/checkpoint_snapshot/
+    http:
+      enable-http: true
+      port: 8080
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/stream_fakesource_to_inmemory_pending_row_in_queue.conf
similarity index 59%
copy from 
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml
copy to 
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/stream_fakesource_to_inmemory_pending_row_in_queue.conf
index 4be4ec075c..df3d922937 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/stream_fakesource_to_inmemory_pending_row_in_queue.conf
@@ -14,22 +14,34 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
 
-seatunnel:
-  engine:
-    history-job-expire-minutes: 1
-    classloader-cache-mode: false
-    backup-count: 2
-    queue-type: blockingqueue
-    print-execution-info-interval: 10
-    slot-service:
-      dynamic-slot: false
-      slot-num: 3
-    checkpoint:
-      interval: 300000
-      timeout: 100000
-      storage:
-        type: localfile
-        max-retained: 3
-        plugin-config:
-          namespace: /tmp/seatunnel/checkpoint_snapshot/
+env {
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the feature source plugin**
+  FakeSource {
+    # More than TaskGroupWithIntermediateBlockingQueue::QUEUE_SIZE
+    row.num = 9999
+    parallelism = 1
+    schema = {
+      fields {
+        c_int = int
+      }
+    }
+  }
+}
+
+transform {
+}
+
+sink {
+  InMemory {
+    writer_sleep = true
+  }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index d42e95d480..afd12cbc7b 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -702,7 +702,11 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
                     taskGroupExecutionTracker.exception(e);
                 }
             } catch (Throwable e) {
-                logger.warning("Exception in " + t, e);
+                if (taskGroupExecutionTracker.isCancel.get()) {
+                    logger.warning(String.format("Interrupted task %d - %s", 
t.getTaskID(), t));
+                } else {
+                    logger.warning("Exception in " + t, e);
+                }
                 taskGroupExecutionTracker.exception(e);
             } finally {
                 taskGroupExecutionTracker.taskDone(t);

Reply via email to