This is an automated email from the ASF dual-hosted git repository.
xiaochenzhou pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e400c2ca62 [Fix][Zeta] make the job failed when triggering checkpoint
fails (apache#10442) (#10448)
e400c2ca62 is described below
commit e400c2ca62c3e7ad2f4919f50b0c2e1912ba1ea2
Author: Sephiroth <[email protected]>
AuthorDate: Mon Mar 23 20:13:05 2026 +0800
[Fix][Zeta] make the job failed when triggering checkpoint fails
(apache#10442) (#10448)
---
.../server/checkpoint/CheckpointCoordinator.java | 11 +-
.../CheckpointBarrierTriggerErrorTest.java | 125 +++++++++++++++++++++
...o_console_checkpoint_barrier_trigger_error.conf | 53 +++++++++
3 files changed, 187 insertions(+), 2 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 17985fc9c9..726b7c9d97 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -683,9 +683,16 @@ public class CheckpointCoordinator {
try {
CompletableFuture.allOf(completableFutureArray).get();
} catch (InterruptedException e) {
- throw new RuntimeException(e);
+ handleCoordinatorError(
+ "triggering checkpoint barrier has been
interrupted",
+ e,
+ CheckpointCloseReason.CHECKPOINT_INSIDE_ERROR);
+ return;
} catch (Exception e) {
- LOG.error(ExceptionUtils.getMessage(e));
+ handleCoordinatorError(
+ "triggering checkpoint barrier failed",
+ e,
+ CheckpointCloseReason.CHECKPOINT_INSIDE_ERROR);
return;
}
if (coordinatorConfig.isCheckpointEnable()) {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointBarrierTriggerErrorTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointBarrierTriggerErrorTest.java
new file mode 100644
index 0000000000..ebcf282724
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointBarrierTriggerErrorTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.checkpoint;
+
+import org.apache.seatunnel.engine.common.job.JobStatus;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
+import org.apache.seatunnel.engine.server.TestUtils;
+import
org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointBarrierTriggerOperation;
+import org.apache.seatunnel.engine.server.master.JobMaster;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+
+import com.hazelcast.internal.serialization.Data;
+
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.awaitility.Awaitility.await;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
+public class CheckpointBarrierTriggerErrorTest extends
AbstractSeaTunnelServerTest {
+
+ private static final String CONF_PATH =
+ "stream_fake_to_console_checkpoint_barrier_trigger_error.conf";
+ private static final AtomicInteger COUNTER = new AtomicInteger(0);
+
+ @Test
+ public void testCheckpointBarrierTriggerError()
+ throws NoSuchFieldException, IllegalAccessException {
+ long jobId = System.currentTimeMillis();
+ startJob(jobId, CONF_PATH);
+
+ await().atMost(120000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+
server.getCoordinatorService().getJobStatus(jobId),
+ JobStatus.RUNNING));
+
+ CheckpointManager spiedCheckpointManager =
spy(getCheckpointManager(jobId));
+ setCheckpointManager(spiedCheckpointManager);
+
+ doAnswer(this::mockException)
+ .when(spiedCheckpointManager)
+
.sendOperationToMemberNode(Mockito.any(CheckpointBarrierTriggerOperation.class));
+
+ await().atMost(120000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Assertions.assertEquals(
+
server.getCoordinatorService().getJobStatus(jobId),
+ JobStatus.FAILED);
+ });
+ }
+
+ private void startJob(Long jobid, String path) {
+ LogicalDag testLogicalDag = TestUtils.createTestLogicalPlan(path,
jobid.toString(), jobid);
+
+ JobImmutableInformation jobImmutableInformation =
+ new JobImmutableInformation(
+ jobid,
+ "Test",
+ false,
+ nodeEngine.getSerializationService(),
+ testLogicalDag,
+ Collections.emptyList(),
+ Collections.emptyList());
+
+ Data data =
nodeEngine.getSerializationService().toData(jobImmutableInformation);
+
+ PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
+ server.getCoordinatorService()
+ .submitJob(jobid, data,
jobImmutableInformation.isStartWithSavePoint());
+ voidPassiveCompletableFuture.join();
+ }
+
+ private Object mockException(InvocationOnMock invocation) throws Throwable
{
+ if (COUNTER.incrementAndGet() == 1) {
+ throw new RuntimeException(
+ "An exception occurred while sending
CheckpointBarrierTriggerOperation.");
+ }
+ return invocation.callRealMethod();
+ }
+
+ private CheckpointManager getCheckpointManager(Long jobId)
+ throws NoSuchFieldException, IllegalAccessException {
+ JobMaster jobMaster =
server.getCoordinatorService().getJobMaster(jobId);
+ Field checkpointManagerField =
JobMaster.class.getDeclaredField("checkpointManager");
+ checkpointManagerField.setAccessible(true);
+ return (CheckpointManager) checkpointManagerField.get(jobMaster);
+ }
+
+ private void setCheckpointManager(CheckpointManager checkpointManager)
+ throws NoSuchFieldException, IllegalAccessException {
+ CheckpointCoordinator checkpointCoordinator =
checkpointManager.getCheckpointCoordinator(1);
+ Field checkpointManagerField =
+
CheckpointCoordinator.class.getDeclaredField("checkpointManager");
+ checkpointManagerField.setAccessible(true);
+ checkpointManagerField.set(checkpointCoordinator, checkpointManager);
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console_checkpoint_barrier_trigger_error.conf
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console_checkpoint_barrier_trigger_error.conf
new file mode 100644
index 0000000000..b9f00bbaf4
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console_checkpoint_barrier_trigger_error.conf
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+ job.retry.times = 0
+ checkpoint.interval = 1000
+ checkpoint.timeout = 60000
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the
feature source plugin**
+ FakeSource {
+ plugin_output = "fake1"
+ row.num = 1000
+ split.num = 100
+ split.read-interval = 3000
+ parallelism = 1
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ }
+}
+
+transform {
+}
+
+sink {
+ console {
+
+ }
+}