This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new bfd70cd49b [Fix][Zeta] Avoid redundant checkpoint reads when disabled
checkpoint (#9552)
bfd70cd49b is described below
commit bfd70cd49badc02749be9af65549c495f154ffc4
Author: chestnufang <[email protected]>
AuthorDate: Mon Jul 28 16:12:56 2025 +0800
[Fix][Zeta] Avoid redundant checkpoint reads when disabled checkpoint
(#9552)
---
.../seatunnel/engine/server/CheckpointService.java | 3 +-
.../server/checkpoint/CheckpointManager.java | 27 ++--
.../seatunnel/engine/server/master/JobMaster.java | 4 +-
.../checkpoint/CheckpointCoordinatorTest.java | 3 +
.../server/checkpoint/CheckpointManagerTest.java | 1 +
.../server/checkpoint/CheckpointStorageTest.java | 138 ++++++++++++++++-----
...ake_to_console_without_checkpoint_interval.conf | 47 +++++++
7 files changed, 174 insertions(+), 49 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CheckpointService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CheckpointService.java
index ce640f6369..5ec5895c8d 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CheckpointService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CheckpointService.java
@@ -30,6 +30,7 @@ import
org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
import org.apache.seatunnel.engine.server.checkpoint.CompletedCheckpoint;
+import lombok.Getter;
import lombok.SneakyThrows;
import java.io.IOException;
@@ -45,7 +46,7 @@ import java.util.stream.Collectors;
* <p>The service provides the APIs to get the latest checkpoint data of a job.
*/
public class CheckpointService {
- private CheckpointStorage checkpointStorage;
+ @Getter private CheckpointStorage checkpointStorage;
private Serializer serializer = new ProtoStuffSerializer();
@SneakyThrows
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index 2cfef3fce3..707c7c6f0d 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -22,11 +22,8 @@ import
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTestin
import org.apache.seatunnel.api.tracing.MDCTracer;
import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
-import
org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorageFactory;
-import
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
-import org.apache.seatunnel.engine.common.utils.FactoryUtil;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.checkpoint.CheckpointIDCounter;
@@ -81,9 +78,9 @@ public class CheckpointManager {
private final CheckpointStorage checkpointStorage;
- private final JobMaster jobMaster;
+ private final CheckpointConfig checkpointConfig;
- private final ExecutorService executorService;
+ private final JobMaster jobMaster;
public CheckpointManager(
long jobId,
@@ -92,19 +89,15 @@ public class CheckpointManager {
JobMaster jobMaster,
Map<Integer, CheckpointPlan> checkpointPlanMap,
CheckpointConfig checkpointConfig,
+ CheckpointStorage checkpointStorage,
ExecutorService executorService,
- IMap<Object, Object> runningJobStateIMap)
- throws CheckpointStorageException {
- this.executorService = executorService;
+ IMap<Object, Object> runningJobStateIMap) {
this.jobId = jobId;
this.nodeEngine = nodeEngine;
this.jobMaster = jobMaster;
- this.checkpointStorage =
- FactoryUtil.discoverFactory(
- Thread.currentThread().getContextClassLoader(),
- CheckpointStorageFactory.class,
- checkpointConfig.getStorage().getStorage())
-
.create(checkpointConfig.getStorage().getStoragePluginConfig());
+ this.checkpointStorage = checkpointStorage;
+ this.checkpointConfig = checkpointConfig;
+
this.coordinatorMap =
MDCTracer.tracing(checkpointPlanMap.values().parallelStream())
.map(
@@ -115,7 +108,8 @@ public class CheckpointManager {
try {
idCounter.start();
PipelineState pipelineState = null;
- if (isStartWithSavePoint) {
+ if
(checkpointConfig.isCheckpointEnable()
+ && isStartWithSavePoint) {
pipelineState =
checkpointStorage
.getLatestCheckpointByJobIdAndPipelineId(
@@ -240,7 +234,8 @@ public class CheckpointManager {
* Listen to the {@link JobStatus} of the {@link Job}.
*/
public void clearCheckpointIfNeed(JobStatus jobStatus) {
- if ((jobStatus == JobStatus.FINISHED || jobStatus ==
JobStatus.CANCELED)
+ if (checkpointConfig.isCheckpointEnable()
+ && (jobStatus == JobStatus.FINISHED || jobStatus ==
JobStatus.CANCELED)
&& !isSavePointEnd()) {
checkpointStorage.deleteCheckpoint(jobId + "");
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 3dab4a1231..273749e5be 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -32,7 +32,6 @@ import
org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.common.utils.SeaTunnelException;
-import
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.EngineConfig;
import org.apache.seatunnel.engine.common.config.JobConfig;
@@ -317,7 +316,7 @@ public class JobMaster {
}
}
- public void initCheckPointManager(boolean restart) throws
CheckpointStorageException {
+ public void initCheckPointManager(boolean restart) {
this.checkpointManager =
new CheckpointManager(
jobImmutableInformation.getJobId(),
@@ -326,6 +325,7 @@ public class JobMaster {
this,
checkpointPlanMap,
jobCheckpointConfig,
+
seaTunnelServer.getCheckpointService().getCheckpointStorage(),
executorService,
runningJobStateIMap);
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorTest.java
index 243361327a..f0fb67a664 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorTest.java
@@ -64,6 +64,7 @@ public class CheckpointCoordinatorTest
null,
planMap,
checkpointConfig,
+ server.getCheckpointService().getCheckpointStorage(),
instance.getExecutorService("test"),
nodeEngine.getHazelcastInstance().getMap(IMAP_RUNNING_JOB_STATE));
checkpointManager.acknowledgeTask(
@@ -100,6 +101,7 @@ public class CheckpointCoordinatorTest
null,
planMap,
checkpointConfig,
+
server.getCheckpointService().getCheckpointStorage(),
executorService,
nodeEngine.getHazelcastInstance().getMap(IMAP_RUNNING_JOB_STATE)) {
@@ -144,6 +146,7 @@ public class CheckpointCoordinatorTest
null,
planMap,
checkpointConfig,
+
server.getCheckpointService().getCheckpointStorage(),
executorService,
nodeEngine.getHazelcastInstance().getMap(IMAP_RUNNING_JOB_STATE)) {
@Override
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
index ce5cca1780..664bc04395 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
@@ -88,6 +88,7 @@ public class CheckpointManagerTest extends
AbstractSeaTunnelServerTest {
null,
planMap,
new CheckpointConfig(),
+ server.getCheckpointService().getCheckpointStorage(),
instance.getExecutorService("test"),
nodeEngine.getHazelcastInstance().getMap(IMAP_RUNNING_JOB_STATE));
Assertions.assertTrue(checkpointManager.isCompletedPipeline(1));
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java
index 7b2c84655a..3b054ffb6d 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java
@@ -17,24 +17,26 @@
package org.apache.seatunnel.engine.server.checkpoint;
+import org.apache.seatunnel.common.utils.ReflectionUtils;
import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
-import
org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorageFactory;
import
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
-import org.apache.seatunnel.engine.common.utils.FactoryUtil;
import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
+import org.apache.seatunnel.engine.server.CheckpointService;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.awaitility.Awaitility.await;
@@ -45,6 +47,8 @@ public class CheckpointStorageTest extends
AbstractSeaTunnelServerTest {
public static String BATCH_CONF_PATH = "batch_fakesource_to_file.conf";
public static String BATCH_CONF_WITH_CHECKPOINT_PATH =
"batch_fakesource_to_file_with_checkpoint.conf";
+ public static String BATCH_CONF_WITHOUT_CHECKPOINT_INTERVAL_PATH =
+ "batch_fake_to_console_without_checkpoint_interval.conf";
public static String STREAM_CONF_WITH_CHECKPOINT_PATH =
"stream_fake_to_console_with_checkpoint.conf";
@@ -64,15 +68,8 @@ public class CheckpointStorageTest extends
AbstractSeaTunnelServerTest {
public void testGenerateFileWhenSavepoint()
throws CheckpointStorageException, InterruptedException {
long jobId = System.currentTimeMillis();
- CheckpointConfig checkpointConfig =
-
server.getSeaTunnelConfig().getEngineConfig().getCheckpointConfig();
- CheckpointStorage checkpointStorage =
- FactoryUtil.discoverFactory(
- Thread.currentThread().getContextClassLoader(),
- CheckpointStorageFactory.class,
- checkpointConfig.getStorage().getStorage())
-
.create(checkpointConfig.getStorage().getStoragePluginConfig());
+ CheckpointStorage checkpointStorage =
server.getCheckpointService().getCheckpointStorage();
startJob(jobId, STREAM_CONF_PATH, false);
await().atMost(120000, TimeUnit.MILLISECONDS)
.untilAsserted(
@@ -98,15 +95,8 @@ public class CheckpointStorageTest extends
AbstractSeaTunnelServerTest {
@Test
public void testBatchJob() throws CheckpointStorageException {
long jobId = System.currentTimeMillis();
- CheckpointConfig checkpointConfig =
-
server.getSeaTunnelConfig().getEngineConfig().getCheckpointConfig();
- CheckpointStorage checkpointStorage =
- FactoryUtil.discoverFactory(
- Thread.currentThread().getContextClassLoader(),
- CheckpointStorageFactory.class,
- checkpointConfig.getStorage().getStorage())
-
.create(checkpointConfig.getStorage().getStoragePluginConfig());
+ CheckpointStorage checkpointStorage =
server.getCheckpointService().getCheckpointStorage();
startJob(jobId, BATCH_CONF_PATH, false);
await().atMost(120000, TimeUnit.MILLISECONDS)
.untilAsserted(
@@ -126,12 +116,7 @@ public class CheckpointStorageTest extends
AbstractSeaTunnelServerTest {
server.getSeaTunnelConfig().getEngineConfig().getCheckpointConfig();
server.getSeaTunnelConfig().getEngineConfig().setCheckpointConfig(checkpointConfig);
- CheckpointStorage checkpointStorage =
- FactoryUtil.discoverFactory(
- Thread.currentThread().getContextClassLoader(),
- CheckpointStorageFactory.class,
- checkpointConfig.getStorage().getStorage())
-
.create(checkpointConfig.getStorage().getStoragePluginConfig());
+ CheckpointStorage checkpointStorage =
server.getCheckpointService().getCheckpointStorage();
startJob(jobId, BATCH_CONF_WITH_CHECKPOINT_PATH, false);
await().atMost(120000, TimeUnit.MILLISECONDS)
.untilAsserted(
@@ -151,12 +136,7 @@ public class CheckpointStorageTest extends
AbstractSeaTunnelServerTest {
server.getSeaTunnelConfig().getEngineConfig().getCheckpointConfig();
server.getSeaTunnelConfig().getEngineConfig().setCheckpointConfig(checkpointConfig);
- CheckpointStorage checkpointStorage =
- FactoryUtil.discoverFactory(
- Thread.currentThread().getContextClassLoader(),
- CheckpointStorageFactory.class,
- checkpointConfig.getStorage().getStorage())
-
.create(checkpointConfig.getStorage().getStoragePluginConfig());
+ CheckpointStorage checkpointStorage =
server.getCheckpointService().getCheckpointStorage();
startJob(jobId, STREAM_CONF_WITH_CHECKPOINT_PATH, false);
await().atMost(120000, TimeUnit.MILLISECONDS)
.untilAsserted(
@@ -177,4 +157,102 @@ public class CheckpointStorageTest extends
AbstractSeaTunnelServerTest {
checkpointStorage.getAllCheckpoints(String.valueOf(jobId));
Assertions.assertEquals(0, allCheckpoints.size());
}
+
+    /**
+     * Verifies that a batch job whose config omits {@code checkpoint.interval} (checkpointing
+     * disabled) never touches the checkpoint storage shared through {@link CheckpointService}: it
+     * must neither read the latest checkpoint when the job starts nor delete checkpoints when it
+     * finishes.
+     *
+     * <p>The shared storage is replaced via reflection with a stub that counts every call. The
+     * original storage is restored in a {@code finally} block so that a failing assertion or an
+     * await timeout cannot leak the stub into other test cases that use the same server.
+     */
+    @Test
+    public void testBatchJobResetCheckpointStorage() throws CheckpointStorageException {
+        long jobId = System.currentTimeMillis();
+
+        // Stub storage: every method just records that it was called.
+        AtomicInteger accessCounter = new AtomicInteger(0);
+        CheckpointStorage countingStorage =
+                new CheckpointStorage() {
+                    @Override
+                    public String storeCheckPoint(PipelineState pipelineState)
+                            throws CheckpointStorageException {
+                        accessCounter.incrementAndGet();
+                        return "";
+                    }
+
+                    @Override
+                    public void asyncStoreCheckPoint(PipelineState pipelineState)
+                            throws CheckpointStorageException {
+                        accessCounter.incrementAndGet();
+                    }
+
+                    @Override
+                    public List<PipelineState> getAllCheckpoints(String jobIdArg)
+                            throws CheckpointStorageException {
+                        accessCounter.incrementAndGet();
+                        return Collections.emptyList();
+                    }
+
+                    @Override
+                    public List<PipelineState> getLatestCheckpoint(String jobIdArg)
+                            throws CheckpointStorageException {
+                        accessCounter.incrementAndGet();
+                        return Collections.emptyList();
+                    }
+
+                    @Override
+                    public PipelineState getLatestCheckpointByJobIdAndPipelineId(
+                            String jobIdArg, String pipelineId)
+                            throws CheckpointStorageException {
+                        accessCounter.incrementAndGet();
+                        return null;
+                    }
+
+                    @Override
+                    public List<PipelineState> getCheckpointsByJobIdAndPipelineId(
+                            String jobIdArg, String pipelineId)
+                            throws CheckpointStorageException {
+                        accessCounter.incrementAndGet();
+                        return Collections.emptyList();
+                    }
+
+                    @Override
+                    public void deleteCheckpoint(String jobIdArg) {
+                        accessCounter.incrementAndGet();
+                    }
+
+                    @Override
+                    public PipelineState getCheckpoint(
+                            String jobIdArg, String pipelineId, String checkpointId)
+                            throws CheckpointStorageException {
+                        accessCounter.incrementAndGet();
+                        return null;
+                    }
+
+                    @Override
+                    public void deleteCheckpoint(
+                            String jobIdArg, String pipelineId, String checkpointId)
+                            throws CheckpointStorageException {
+                        accessCounter.incrementAndGet();
+                    }
+
+                    @Override
+                    public void deleteCheckpoint(
+                            String jobIdArg, String pipelineId, List<String> checkpointIdList)
+                            throws CheckpointStorageException {
+                        accessCounter.incrementAndGet();
+                    }
+                };
+
+        CheckpointService checkpointService = server.getCheckpointService();
+        final CheckpointStorage originalCheckpointStorage =
+                checkpointService.getCheckpointStorage();
+        // JobMaster passes this shared instance to CheckpointManager, so swapping it here makes
+        // the job under test use the counting stub.
+        ReflectionUtils.setField(checkpointService, "checkpointStorage", countingStorage);
+        try {
+            startJob(jobId, BATCH_CONF_WITHOUT_CHECKPOINT_INTERVAL_PATH, false);
+            await().atMost(120000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () ->
+                                    Assertions.assertEquals(
+                                            JobStatus.FINISHED,
+                                            server.getCoordinatorService().getJobStatus(jobId)));
+
+            // With checkpointing disabled, no storage method may have been called at all.
+            Assertions.assertEquals(
+                    0,
+                    accessCounter.get(),
+                    "checkpoint storage must not be accessed when checkpoint is disabled");
+        } finally {
+            // Always restore the shared storage so other test cases are unaffected,
+            // even if the job times out or the assertion above fails.
+            ReflectionUtils.setField(
+                    checkpointService, "checkpointStorage", originalCheckpointStorage);
+        }
+    }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fake_to_console_without_checkpoint_interval.conf
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fake_to_console_without_checkpoint_interval.conf
new file mode 100644
index 0000000000..48d51ef435
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fake_to_console_without_checkpoint_interval.conf
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in SeaTunnel config
+######
+
+env {
+ # You can set SeaTunnel environment configuration here
+ parallelism = 2
+ job.mode = "BATCH"
+ # `checkpoint.interval` is intentionally omitted so that checkpointing is disabled for this job
+ # checkpoint.interval = 10000
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+ FakeSource {
+ parallelism = 2
+ plugin_output = "fake"
+ row.num = 16
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ }
+}
+
+sink {
+ Console {
+ }
+}