This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new bfd70cd49b [Fix][Zeta] Avoid redundant checkpoint reads when disabled 
checkpoint (#9552)
bfd70cd49b is described below

commit bfd70cd49badc02749be9af65549c495f154ffc4
Author: chestnufang <[email protected]>
AuthorDate: Mon Jul 28 16:12:56 2025 +0800

    [Fix][Zeta] Avoid redundant checkpoint reads when disabled checkpoint 
(#9552)
---
 .../seatunnel/engine/server/CheckpointService.java |   3 +-
 .../server/checkpoint/CheckpointManager.java       |  27 ++--
 .../seatunnel/engine/server/master/JobMaster.java  |   4 +-
 .../checkpoint/CheckpointCoordinatorTest.java      |   3 +
 .../server/checkpoint/CheckpointManagerTest.java   |   1 +
 .../server/checkpoint/CheckpointStorageTest.java   | 138 ++++++++++++++++-----
 ...ake_to_console_without_checkpoint_interval.conf |  47 +++++++
 7 files changed, 174 insertions(+), 49 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CheckpointService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CheckpointService.java
index ce640f6369..5ec5895c8d 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CheckpointService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CheckpointService.java
@@ -30,6 +30,7 @@ import 
org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
 import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
 import org.apache.seatunnel.engine.server.checkpoint.CompletedCheckpoint;
 
+import lombok.Getter;
 import lombok.SneakyThrows;
 
 import java.io.IOException;
@@ -45,7 +46,7 @@ import java.util.stream.Collectors;
  * <p>The service provides the APIs to get the latest checkpoint data of a job.
  */
 public class CheckpointService {
-    private CheckpointStorage checkpointStorage;
+    @Getter private CheckpointStorage checkpointStorage;
     private Serializer serializer = new ProtoStuffSerializer();
 
     @SneakyThrows
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index 2cfef3fce3..707c7c6f0d 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -22,11 +22,8 @@ import 
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTestin
 import org.apache.seatunnel.api.tracing.MDCTracer;
 import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
 import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
-import 
org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorageFactory;
-import 
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
 import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
 import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
-import org.apache.seatunnel.engine.common.utils.FactoryUtil;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.core.checkpoint.CheckpointIDCounter;
@@ -81,9 +78,9 @@ public class CheckpointManager {
 
     private final CheckpointStorage checkpointStorage;
 
-    private final JobMaster jobMaster;
+    private final CheckpointConfig checkpointConfig;
 
-    private final ExecutorService executorService;
+    private final JobMaster jobMaster;
 
     public CheckpointManager(
             long jobId,
@@ -92,19 +89,15 @@ public class CheckpointManager {
             JobMaster jobMaster,
             Map<Integer, CheckpointPlan> checkpointPlanMap,
             CheckpointConfig checkpointConfig,
+            CheckpointStorage checkpointStorage,
             ExecutorService executorService,
-            IMap<Object, Object> runningJobStateIMap)
-            throws CheckpointStorageException {
-        this.executorService = executorService;
+            IMap<Object, Object> runningJobStateIMap) {
         this.jobId = jobId;
         this.nodeEngine = nodeEngine;
         this.jobMaster = jobMaster;
-        this.checkpointStorage =
-                FactoryUtil.discoverFactory(
-                                Thread.currentThread().getContextClassLoader(),
-                                CheckpointStorageFactory.class,
-                                checkpointConfig.getStorage().getStorage())
-                        
.create(checkpointConfig.getStorage().getStoragePluginConfig());
+        this.checkpointStorage = checkpointStorage;
+        this.checkpointConfig = checkpointConfig;
+
         this.coordinatorMap =
                 MDCTracer.tracing(checkpointPlanMap.values().parallelStream())
                         .map(
@@ -115,7 +108,8 @@ public class CheckpointManager {
                                     try {
                                         idCounter.start();
                                         PipelineState pipelineState = null;
-                                        if (isStartWithSavePoint) {
+                                        if 
(checkpointConfig.isCheckpointEnable()
+                                                && isStartWithSavePoint) {
                                             pipelineState =
                                                     checkpointStorage
                                                             
.getLatestCheckpointByJobIdAndPipelineId(
@@ -240,7 +234,8 @@ public class CheckpointManager {
      * Listen to the {@link JobStatus} of the {@link Job}.
      */
     public void clearCheckpointIfNeed(JobStatus jobStatus) {
-        if ((jobStatus == JobStatus.FINISHED || jobStatus == 
JobStatus.CANCELED)
+        if (checkpointConfig.isCheckpointEnable()
+                && (jobStatus == JobStatus.FINISHED || jobStatus == 
JobStatus.CANCELED)
                 && !isSavePointEnd()) {
             checkpointStorage.deleteCheckpoint(jobId + "");
         }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 3dab4a1231..273749e5be 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -32,7 +32,6 @@ import 
org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.common.utils.RetryUtils;
 import org.apache.seatunnel.common.utils.SeaTunnelException;
-import 
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.config.EngineConfig;
 import org.apache.seatunnel.engine.common.config.JobConfig;
@@ -317,7 +316,7 @@ public class JobMaster {
         }
     }
 
-    public void initCheckPointManager(boolean restart) throws 
CheckpointStorageException {
+    public void initCheckPointManager(boolean restart) {
         this.checkpointManager =
                 new CheckpointManager(
                         jobImmutableInformation.getJobId(),
@@ -326,6 +325,7 @@ public class JobMaster {
                         this,
                         checkpointPlanMap,
                         jobCheckpointConfig,
+                        
seaTunnelServer.getCheckpointService().getCheckpointStorage(),
                         executorService,
                         runningJobStateIMap);
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorTest.java
index 243361327a..f0fb67a664 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorTest.java
@@ -64,6 +64,7 @@ public class CheckpointCoordinatorTest
                         null,
                         planMap,
                         checkpointConfig,
+                        server.getCheckpointService().getCheckpointStorage(),
                         instance.getExecutorService("test"),
                         
nodeEngine.getHazelcastInstance().getMap(IMAP_RUNNING_JOB_STATE));
         checkpointManager.acknowledgeTask(
@@ -100,6 +101,7 @@ public class CheckpointCoordinatorTest
                             null,
                             planMap,
                             checkpointConfig,
+                            
server.getCheckpointService().getCheckpointStorage(),
                             executorService,
                             
nodeEngine.getHazelcastInstance().getMap(IMAP_RUNNING_JOB_STATE)) {
 
@@ -144,6 +146,7 @@ public class CheckpointCoordinatorTest
                             null,
                             planMap,
                             checkpointConfig,
+                            
server.getCheckpointService().getCheckpointStorage(),
                             executorService,
                             
nodeEngine.getHazelcastInstance().getMap(IMAP_RUNNING_JOB_STATE)) {
                         @Override
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
index ce5cca1780..664bc04395 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
@@ -88,6 +88,7 @@ public class CheckpointManagerTest extends 
AbstractSeaTunnelServerTest {
                         null,
                         planMap,
                         new CheckpointConfig(),
+                        server.getCheckpointService().getCheckpointStorage(),
                         instance.getExecutorService("test"),
                         
nodeEngine.getHazelcastInstance().getMap(IMAP_RUNNING_JOB_STATE));
         Assertions.assertTrue(checkpointManager.isCompletedPipeline(1));
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java
index 7b2c84655a..3b054ffb6d 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java
@@ -17,24 +17,26 @@
 
 package org.apache.seatunnel.engine.server.checkpoint;
 
+import org.apache.seatunnel.common.utils.ReflectionUtils;
 import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
 import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
-import 
org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorageFactory;
 import 
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
-import org.apache.seatunnel.engine.common.utils.FactoryUtil;
 import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
+import org.apache.seatunnel.engine.server.CheckpointService;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.condition.OS;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.awaitility.Awaitility.await;
 
@@ -45,6 +47,8 @@ public class CheckpointStorageTest extends 
AbstractSeaTunnelServerTest {
     public static String BATCH_CONF_PATH = "batch_fakesource_to_file.conf";
     public static String BATCH_CONF_WITH_CHECKPOINT_PATH =
             "batch_fakesource_to_file_with_checkpoint.conf";
+    public static String BATCH_CONF_WITHOUT_CHECKPOINT_INTERVAL_PATH =
+            "batch_fake_to_console_without_checkpoint_interval.conf";
 
     public static String STREAM_CONF_WITH_CHECKPOINT_PATH =
             "stream_fake_to_console_with_checkpoint.conf";
@@ -64,15 +68,8 @@ public class CheckpointStorageTest extends 
AbstractSeaTunnelServerTest {
     public void testGenerateFileWhenSavepoint()
             throws CheckpointStorageException, InterruptedException {
         long jobId = System.currentTimeMillis();
-        CheckpointConfig checkpointConfig =
-                
server.getSeaTunnelConfig().getEngineConfig().getCheckpointConfig();
 
-        CheckpointStorage checkpointStorage =
-                FactoryUtil.discoverFactory(
-                                Thread.currentThread().getContextClassLoader(),
-                                CheckpointStorageFactory.class,
-                                checkpointConfig.getStorage().getStorage())
-                        
.create(checkpointConfig.getStorage().getStoragePluginConfig());
+        CheckpointStorage checkpointStorage = 
server.getCheckpointService().getCheckpointStorage();
         startJob(jobId, STREAM_CONF_PATH, false);
         await().atMost(120000, TimeUnit.MILLISECONDS)
                 .untilAsserted(
@@ -98,15 +95,8 @@ public class CheckpointStorageTest extends 
AbstractSeaTunnelServerTest {
     @Test
     public void testBatchJob() throws CheckpointStorageException {
         long jobId = System.currentTimeMillis();
-        CheckpointConfig checkpointConfig =
-                
server.getSeaTunnelConfig().getEngineConfig().getCheckpointConfig();
 
-        CheckpointStorage checkpointStorage =
-                FactoryUtil.discoverFactory(
-                                Thread.currentThread().getContextClassLoader(),
-                                CheckpointStorageFactory.class,
-                                checkpointConfig.getStorage().getStorage())
-                        
.create(checkpointConfig.getStorage().getStoragePluginConfig());
+        CheckpointStorage checkpointStorage = 
server.getCheckpointService().getCheckpointStorage();
         startJob(jobId, BATCH_CONF_PATH, false);
         await().atMost(120000, TimeUnit.MILLISECONDS)
                 .untilAsserted(
@@ -126,12 +116,7 @@ public class CheckpointStorageTest extends 
AbstractSeaTunnelServerTest {
                 
server.getSeaTunnelConfig().getEngineConfig().getCheckpointConfig();
         
server.getSeaTunnelConfig().getEngineConfig().setCheckpointConfig(checkpointConfig);
 
-        CheckpointStorage checkpointStorage =
-                FactoryUtil.discoverFactory(
-                                Thread.currentThread().getContextClassLoader(),
-                                CheckpointStorageFactory.class,
-                                checkpointConfig.getStorage().getStorage())
-                        
.create(checkpointConfig.getStorage().getStoragePluginConfig());
+        CheckpointStorage checkpointStorage = 
server.getCheckpointService().getCheckpointStorage();
         startJob(jobId, BATCH_CONF_WITH_CHECKPOINT_PATH, false);
         await().atMost(120000, TimeUnit.MILLISECONDS)
                 .untilAsserted(
@@ -151,12 +136,7 @@ public class CheckpointStorageTest extends 
AbstractSeaTunnelServerTest {
                 
server.getSeaTunnelConfig().getEngineConfig().getCheckpointConfig();
         
server.getSeaTunnelConfig().getEngineConfig().setCheckpointConfig(checkpointConfig);
 
-        CheckpointStorage checkpointStorage =
-                FactoryUtil.discoverFactory(
-                                Thread.currentThread().getContextClassLoader(),
-                                CheckpointStorageFactory.class,
-                                checkpointConfig.getStorage().getStorage())
-                        
.create(checkpointConfig.getStorage().getStoragePluginConfig());
+        CheckpointStorage checkpointStorage = 
server.getCheckpointService().getCheckpointStorage();
         startJob(jobId, STREAM_CONF_WITH_CHECKPOINT_PATH, false);
         await().atMost(120000, TimeUnit.MILLISECONDS)
                 .untilAsserted(
@@ -177,4 +157,102 @@ public class CheckpointStorageTest extends 
AbstractSeaTunnelServerTest {
                 checkpointStorage.getAllCheckpoints(String.valueOf(jobId));
         Assertions.assertEquals(0, allCheckpoints.size());
     }
+
+    @Test
+    public void testBatchJobResetCheckpointStorage() throws 
CheckpointStorageException {
+        long jobId = System.currentTimeMillis();
+        CheckpointConfig checkpointConfig =
+                
server.getSeaTunnelConfig().getEngineConfig().getCheckpointConfig();
+        
server.getSeaTunnelConfig().getEngineConfig().setCheckpointConfig(checkpointConfig);
+        final CheckpointStorage originalCheckpointStorage =
+                server.getCheckpointService().getCheckpointStorage();
+
+        // access checkpoint storage counter
+        AtomicInteger accessCounter = new AtomicInteger(0);
+        CheckpointStorage checkpointStorage =
+                new CheckpointStorage() {
+                    @Override
+                    public String storeCheckPoint(PipelineState pipelineState)
+                            throws CheckpointStorageException {
+                        accessCounter.incrementAndGet();
+                        return "";
+                    }
+
+                    @Override
+                    public void asyncStoreCheckPoint(PipelineState 
pipelineState)
+                            throws CheckpointStorageException {
+                        accessCounter.incrementAndGet();
+                    }
+
+                    @Override
+                    public List<PipelineState> getAllCheckpoints(String s)
+                            throws CheckpointStorageException {
+                        accessCounter.incrementAndGet();
+                        return Collections.emptyList();
+                    }
+
+                    @Override
+                    public List<PipelineState> getLatestCheckpoint(String s)
+                            throws CheckpointStorageException {
+                        accessCounter.incrementAndGet();
+                        return Collections.emptyList();
+                    }
+
+                    @Override
+                    public PipelineState 
getLatestCheckpointByJobIdAndPipelineId(
+                            String s, String s1) throws 
CheckpointStorageException {
+                        accessCounter.incrementAndGet();
+                        return null;
+                    }
+
+                    @Override
+                    public List<PipelineState> 
getCheckpointsByJobIdAndPipelineId(
+                            String s, String s1) throws 
CheckpointStorageException {
+                        accessCounter.incrementAndGet();
+                        return Collections.emptyList();
+                    }
+
+                    @Override
+                    public void deleteCheckpoint(String s) {
+                        accessCounter.incrementAndGet();
+                    }
+
+                    @Override
+                    public PipelineState getCheckpoint(String s, String s1, 
String s2)
+                            throws CheckpointStorageException {
+                        accessCounter.incrementAndGet();
+                        return null;
+                    }
+
+                    @Override
+                    public void deleteCheckpoint(String s, String s1, String 
s2)
+                            throws CheckpointStorageException {
+                        accessCounter.incrementAndGet();
+                    }
+
+                    @Override
+                    public void deleteCheckpoint(String s, String s1, 
List<String> list)
+                            throws CheckpointStorageException {
+                        accessCounter.incrementAndGet();
+                    }
+                };
+
+        // replace the checkpoint storage reused by the system
+        CheckpointService checkpointService = server.getCheckpointService();
+        ReflectionUtils.setField(checkpointService, "checkpointStorage", 
checkpointStorage);
+
+        startJob(jobId, BATCH_CONF_WITHOUT_CHECKPOINT_INTERVAL_PATH, false);
+        await().atMost(120000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertEquals(
+                                        
server.getCoordinatorService().getJobStatus(jobId),
+                                        JobStatus.FINISHED));
+
+        checkpointStorage.getAllCheckpoints(String.valueOf(jobId));
+        Assertions.assertEquals(1, accessCounter.get());
+
+        // restore the server's checkpointStorage to avoid affecting other 
unit cases
+        ReflectionUtils.setField(checkpointService, "checkpointStorage", 
originalCheckpointStorage);
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fake_to_console_without_checkpoint_interval.conf
 
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fake_to_console_without_checkpoint_interval.conf
new file mode 100644
index 0000000000..48d51ef435
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fake_to_console_without_checkpoint_interval.conf
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in 
SeaTunnel config
+######
+
+env {
+  # You can set SeaTunnel environment configuration here
+  parallelism = 2
+  job.mode = "BATCH"
+  # remove `checkpoint.interval` config
+  # checkpoint.interval = 10000
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the 
feature source plugin**
+  FakeSource {
+    parallelism = 2
+    plugin_output = "fake"
+    row.num = 16
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+  }
+}
+
+sink {
+  Console {
+  }
+}

Reply via email to