This is an automated email from the ASF dual-hosted git repository. zhouyao2023 pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 56d99562a1 [Improve] Add `SavePointFailedException` for savepoint failed (#5994) 56d99562a1 is described below commit 56d99562a1640d6ab2f322c7758dda10bbb96c12 Author: Jia Fan <fanjiaemi...@qq.com> AuthorDate: Mon Dec 18 23:11:48 2023 +0800 [Improve] Add `SavePointFailedException` for savepoint failed (#5994) * [Improve] Add `SavePointFailedException` for savepoint failed * update --- .../common/exception/SavePointFailedException.java | 34 ++++++++++++++++++++++ .../engine/server/CoordinatorService.java | 10 ++++--- .../engine/server/checkpoint/SavePointTest.java | 14 +++++++++ 3 files changed, 54 insertions(+), 4 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/SavePointFailedException.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/SavePointFailedException.java new file mode 100644 index 0000000000..06b9cc0d87 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/exception/SavePointFailedException.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.common.exception; + +public class SavePointFailedException extends SeaTunnelEngineException { + + public SavePointFailedException(String message) { + super(message); + } + + public SavePointFailedException(String message, Throwable throwable) { + super(message, throwable); + } + + @Override + public Throwable createException(String s, Throwable throwable) { + return new SavePointFailedException(s, throwable); + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java index 98b5f03b3a..a5028f3597 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java @@ -27,6 +27,7 @@ import org.apache.seatunnel.engine.common.config.EngineConfig; import org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageConfig; import org.apache.seatunnel.engine.common.exception.JobException; import org.apache.seatunnel.engine.common.exception.JobNotFoundException; +import org.apache.seatunnel.engine.common.exception.SavePointFailedException; import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException; import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture; import org.apache.seatunnel.engine.core.job.JobDAGInfo; @@ -448,10 +449,11 @@ public class CoordinatorService { public PassiveCompletableFuture<Void> savePoint(long jobId) { CompletableFuture<Void> voidCompletableFuture = new CompletableFuture<>(); if (!runningJobMasterMap.containsKey(jobId)) { - Throwable throwable = - new Throwable("The jobId: " + jobId + "of savePoint does not exist"); - logger.warning(throwable); - voidCompletableFuture.completeExceptionally(throwable); + SavePointFailedException exception = + new SavePointFailedException( + "The job with id '" + jobId + "' not running, save point failed"); + logger.warning(exception); + voidCompletableFuture.completeExceptionally(exception); } else { voidCompletableFuture = new PassiveCompletableFuture<>( diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java index 43fc98cd7b..23b22b183c 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.engine.server.checkpoint; import org.apache.seatunnel.common.utils.FileUtils; +import org.apache.seatunnel.engine.common.exception.SavePointFailedException; import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture; import org.apache.seatunnel.engine.core.dag.logical.LogicalDag; import org.apache.seatunnel.engine.core.job.JobImmutableInformation; @@ -34,6 +35,7 @@ import org.junit.jupiter.api.condition.OS; import com.hazelcast.internal.serialization.Data; import java.util.Collections; +import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; import static org.awaitility.Awaitility.await; @@ -49,6 +51,18 @@ public class SavePointTest extends AbstractSeaTunnelServerTest { savePointAndRestore(false); } + @Test + public void testSavePointWithNotExistedJob() { + CompletionException exception = + Assertions.assertThrows( + CompletionException.class, + () -> server.getCoordinatorService().savePoint(1L).join()); + Assertions.assertTrue(exception.getCause() instanceof SavePointFailedException); + Assertions.assertEquals( + "The job with id '1' not running, save point failed", + exception.getCause().getMessage()); + } + @Test @Disabled() public void testSavePointOnServerRestart() throws InterruptedException {