This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new fc914759ff [Feature][Zeta] In batch mode, checkpoint can be disabled 
(#5914)
fc914759ff is described below

commit fc914759ff938130a906d9fa0e814b78dbd44614
Author: happyboy1024 <137260654+happyboy1...@users.noreply.github.com>
AuthorDate: Wed Nov 29 09:14:42 2023 +0800

    [Feature][Zeta] In batch mode, checkpoint can be disabled (#5914)
    
    
    
    ---------
    
    Co-authored-by: dengjunjie <296442...@qq.com>
---
 docs/en/concept/JobEnvConfig.md                    |   2 +
 .../src/test/resources/conf/getCatalogTable.conf   |   1 -
 .../flink/execution/FlinkRuntimeEnvironment.java   |   5 +-
 .../test/resources/args/user_defined_params.conf   |   1 -
 .../resources/jdbc_mysql_source_and_sink_xa.conf   |   1 -
 .../resources/jdbc_hive_source_and_assert.conf     |   1 -
 .../jdbc_postgres_source_and_sink_xa.conf          |   1 -
 .../resources/neo4j/fake_to_neo4j_batch_write.conf |   1 -
 .../test/resources/batch_pulsar_to_console.conf    |   1 -
 .../flink/AbstractTestFlinkContainer.java          |   5 +
 .../connector-seatunnel-e2e-base/pom.xml           |  30 +++
 .../seatunnel/engine/e2e/CheckpointEnableIT.java   | 279 +++++++++++++++++++++
 .../test/resources/batch_fakesource_to_file.conf   |   1 -
 .../batch_fakesource_to_file_complex.conf          |   1 -
 ...fakesource_to_localfile_checkpoint_disable.conf |  26 +-
 .../sink_file_text_to_assert.conf                  |  37 +--
 ..._fakesource_to_localfile_checkpoint_enable.conf |  27 +-
 .../sink_file_text_to_assert.conf                  |  37 +--
 .../sink_file_text_to_assert.conf                  |  37 +--
 .../stream_fakesource_to_localfile.conf            |  18 +-
 .../stream_fakesource_to_localfile_interval.conf   |  19 +-
 .../src/test/resources/log4j2.properties           |   4 +
 .../test/resources/batch_fakesource_to_file.conf   |   1 -
 .../batch_fakesource_to_file_complex.conf          |   1 -
 .../resources/batch_fakesource_to_two_file.conf    |   1 -
 .../src/test/resources/client_test.conf            |   1 -
 .../common/config/server/CheckpointConfig.java     |   2 +
 .../server/checkpoint/CheckpointCoordinator.java   |   8 +-
 .../seatunnel/engine/server/master/JobMaster.java  |  12 +-
 .../test/resources/batch_fakesource_to_file.conf   |   1 -
 .../batch_fakesource_to_file_complex.conf          |   1 -
 .../src/test/resources/fake_to_console.conf        |   1 -
 .../resources/fake_to_console_job_metrics.conf     |   1 -
 .../main/resources/examples/fake_to_console.conf   |   1 -
 34 files changed, 444 insertions(+), 122 deletions(-)

diff --git a/docs/en/concept/JobEnvConfig.md b/docs/en/concept/JobEnvConfig.md
index 7272c90fcc..ce1716e3c0 100644
--- a/docs/en/concept/JobEnvConfig.md
+++ b/docs/en/concept/JobEnvConfig.md
@@ -18,6 +18,8 @@ You can configure whether the task is in batch mode or stream 
mode through `job.
 
 Gets the interval in which checkpoints are periodically scheduled.
 
+In `STREAMING` mode, checkpoints is required, if you do not set it, it will be 
obtained from the application configuration file `seatunnel.yaml`. In `BATCH` 
mode, you can disable checkpoints by not setting this parameter.
+
 ## parallelism
 
 This parameter configures the parallelism of source and sink.
diff --git a/seatunnel-api/src/test/resources/conf/getCatalogTable.conf 
b/seatunnel-api/src/test/resources/conf/getCatalogTable.conf
index 485f026a0d..63cf0f6cf8 100644
--- a/seatunnel-api/src/test/resources/conf/getCatalogTable.conf
+++ b/seatunnel-api/src/test/resources/conf/getCatalogTable.conf
@@ -17,7 +17,6 @@
 
 env {
   job.mode = "BATCH"
-  checkpoint.interval = 5000
 }
 
 source {
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
index 12168921d8..1f01687a1a 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
@@ -236,7 +236,10 @@ public class FlinkRuntimeEnvironment implements 
RuntimeEnvironment {
     }
 
     private void setCheckpoint() {
-
+        if (jobMode == JobMode.BATCH) {
+            log.warn(
+                    "Disabled Checkpointing. In flink execution environment, 
checkpointing is not supported and not needed when executing jobs in BATCH 
mode");
+        }
         long interval = 0;
         if (config.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
             interval = 
config.getLong(EnvCommonOptions.CHECKPOINT_INTERVAL.key());
diff --git 
a/seatunnel-core/seatunnel-starter/src/test/resources/args/user_defined_params.conf
 
b/seatunnel-core/seatunnel-starter/src/test/resources/args/user_defined_params.conf
index bc2114443f..2ca5c56c19 100644
--- 
a/seatunnel-core/seatunnel-starter/src/test/resources/args/user_defined_params.conf
+++ 
b/seatunnel-core/seatunnel-starter/src/test/resources/args/user_defined_params.conf
@@ -22,7 +22,6 @@ env {
   # You can set engine configuration here
   execution.parallelism = 1
   job.mode = "BATCH"
-  checkpoint.interval = 5000
   #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
 }
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_xa.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_xa.conf
index cfdf7691d0..f70cc87dce 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_xa.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_xa.conf
@@ -18,7 +18,6 @@
 env {
   execution.parallelism = 1
   job.mode = "BATCH"
-  execution.checkpoint.interval = 1000
 }
 
 source {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_hive_source_and_assert.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_hive_source_and_assert.conf
index cd1b32c14e..75802d482c 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_hive_source_and_assert.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_hive_source_and_assert.conf
@@ -22,7 +22,6 @@ env {
   # You can set engine configuration here
   execution.parallelism = 1
   job.mode = "BATCH"
-  checkpoint.interval = 5000
   #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
 }
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_xa.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_xa.conf
index 144f11da9b..75e2979b2e 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_xa.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_xa.conf
@@ -18,7 +18,6 @@
 env {
   execution.parallelism = 1
   job.mode = "BATCH"
-  execution.checkpoint.interval = 1000
 }
 
 source {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e/src/test/resources/neo4j/fake_to_neo4j_batch_write.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e/src/test/resources/neo4j/fake_to_neo4j_batch_write.conf
index e1d9fed6f8..8fd4eab745 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e/src/test/resources/neo4j/fake_to_neo4j_batch_write.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e/src/test/resources/neo4j/fake_to_neo4j_batch_write.conf
@@ -22,7 +22,6 @@ env {
   # You can set engine configuration here
   execution.parallelism = 1
   job.mode = "BATCH"
-  checkpoint.interval = 5000
   #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
 }
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/batch_pulsar_to_console.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/batch_pulsar_to_console.conf
index 59d2efc862..563ee34165 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/batch_pulsar_to_console.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/batch_pulsar_to_console.conf
@@ -21,7 +21,6 @@
 env {
   execution.parallelism = 1
   job.mode = "BATCH"
-  checkpoint.interval = 5000
   #spark config
   spark.app.name = "SeaTunnel"
   spark.executor.instances = 1
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
index c72fad44ee..9a0b353785 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
@@ -154,4 +154,9 @@ public abstract class AbstractTestFlinkContainer extends 
AbstractTestContainer {
     public String getServerLogs() {
         return jobManager.getLogs();
     }
+
+    public String executeJobManagerInnerCommand(String command)
+            throws IOException, InterruptedException {
+        return jobManager.execInContainer("bash", "-c", command).getStdout();
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
index 20a2e612a6..973fe6434c 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
@@ -52,6 +52,12 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-assert</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>imap-storage-file</artifactId>
@@ -71,6 +77,30 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-flink-13-starter</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-flink-15-starter</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-spark-2-starter</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-spark-3-starter</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
 
         <dependency>
             <groupId>org.apache.hadoop</groupId>
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
new file mode 100644
index 0000000000..398ecdf0b9
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.e2e;
+
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import 
org.apache.seatunnel.e2e.common.container.flink.AbstractTestFlinkContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.awaitility.Awaitility.await;
+
+@Slf4j
+public class CheckpointEnableIT extends TestSuiteBase {
+
+    @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason =
+                    "depending on the engine, the logic for determining 
whether a checkpoint is enabled is different")
+    public void testZetaBatchCheckpointEnable(TestContainer container)
+            throws IOException, InterruptedException {
+        // checkpoint disable, log don't contains 'checkpoint is disabled'
+        Container.ExecResult disableExecResult =
+                container.executeJob(
+                        
"/checkpoint-batch-disable-test-resources/batch_fakesource_to_localfile_checkpoint_disable.conf");
+        Assertions.assertTrue(container.getServerLogs().contains("checkpoint 
is disabled"));
+        Assertions.assertEquals(0, disableExecResult.getExitCode());
+        // check sink file is right
+        Container.ExecResult disableSinkFileExecResult =
+                container.executeJob(
+                        
"/checkpoint-batch-disable-test-resources/sink_file_text_to_assert.conf");
+        Assertions.assertEquals(0, disableSinkFileExecResult.getExitCode());
+
+        // checkpoint enable, log contains 'checkpoint is enabled'
+        Container.ExecResult enableExecResult =
+                container.executeJob(
+                        
"/checkpoint-batch-enable-test-resources/batch_fakesource_to_localfile_checkpoint_enable.conf");
+        Assertions.assertTrue(container.getServerLogs().contains("checkpoint 
is enabled"));
+        Assertions.assertEquals(0, enableExecResult.getExitCode());
+        // check sink file is right
+        Container.ExecResult enableSinkFileExecResult =
+                container.executeJob(
+                        
"/checkpoint-batch-enable-test-resources/sink_file_text_to_assert.conf");
+        Assertions.assertEquals(0, enableSinkFileExecResult.getExitCode());
+    }
+
+    @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason =
+                    "depending on the engine, the logic for determining 
whether a checkpoint is enabled is different")
+    public void testZetaStreamingCheckpointInterval(TestContainer container)
+            throws IOException, InterruptedException {
+        // start job
+        CompletableFuture.supplyAsync(
+                () -> {
+                    try {
+                        return container.executeJob(
+                                
"/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile_interval.conf");
+                    } catch (Exception e) {
+                        log.error("Commit task exception :" + e.getMessage());
+                        throw new RuntimeException(e);
+                    }
+                });
+
+        // wait obtain job id
+        AtomicReference<String> jobId = new AtomicReference<>();
+        await().atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            Pattern jobIdPattern =
+                                    Pattern.compile(
+                                            ".*Init JobMaster for Job 
SeaTunnel_Job \\(([0-9]*)\\).*",
+                                            Pattern.DOTALL);
+                            Matcher matcher = 
jobIdPattern.matcher(container.getServerLogs());
+                            if (matcher.matches()) {
+                                jobId.set(matcher.group(1));
+                            }
+                            Assertions.assertNotNull(jobId.get());
+                        });
+
+        Thread.sleep(15000);
+        Assertions.assertTrue(container.getServerLogs().contains("checkpoint 
is enabled"));
+        Assertions.assertEquals(0, 
container.savepointJob(jobId.get()).getExitCode());
+
+        // restore job
+        CompletableFuture.supplyAsync(
+                () -> {
+                    try {
+                        return container
+                                .restoreJob(
+                                        
"/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile_interval.conf",
+                                        jobId.get())
+                                .getExitCode();
+                    } catch (Exception e) {
+                        log.error("Commit task exception :" + e.getMessage());
+                        throw new RuntimeException(e);
+                    }
+                });
+
+        // check sink file is right
+        AtomicReference<Boolean> checkSinkFile = new AtomicReference<>(false);
+        await().atMost(300000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            Container.ExecResult disableSinkFileExecResult =
+                                    container.executeJob(
+                                            
"/checkpoint-streaming-enable-test-resources/sink_file_text_to_assert.conf");
+                            checkSinkFile.set(0 == 
disableSinkFileExecResult.getExitCode());
+                            Assertions.assertEquals(0, 
disableSinkFileExecResult.getExitCode());
+                        });
+        Assertions.assertTrue(checkSinkFile.get());
+    }
+
+    @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason =
+                    "depending on the engine, the logic for determining 
whether a checkpoint is enabled is different")
+    public void testZetaStreamingCheckpointNoInterval(TestContainer container)
+            throws IOException, InterruptedException {
+        // start job
+        CompletableFuture.supplyAsync(
+                () -> {
+                    try {
+                        return container.executeJob(
+                                
"/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile.conf");
+                    } catch (Exception e) {
+                        log.error("Commit task exception :" + e.getMessage());
+                        throw new RuntimeException(e);
+                    }
+                });
+
+        // wait obtain job id
+        AtomicReference<String> jobId = new AtomicReference<>();
+        await().atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            Pattern jobIdPattern =
+                                    Pattern.compile(
+                                            ".*Init JobMaster for Job 
SeaTunnel_Job \\(([0-9]*)\\).*",
+                                            Pattern.DOTALL);
+                            Matcher matcher = 
jobIdPattern.matcher(container.getServerLogs());
+                            if (matcher.matches()) {
+                                jobId.set(matcher.group(1));
+                            }
+                            Assertions.assertNotNull(jobId.get());
+                        });
+
+        Thread.sleep(15000);
+        Assertions.assertTrue(container.getServerLogs().contains("checkpoint 
is enabled"));
+        Assertions.assertEquals(0, 
container.savepointJob(jobId.get()).getExitCode());
+
+        // restore job
+        CompletableFuture.supplyAsync(
+                () -> {
+                    try {
+                        return container
+                                .restoreJob(
+                                        
"/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile.conf",
+                                        jobId.get())
+                                .getExitCode();
+                    } catch (Exception e) {
+                        log.error("Commit task exception :" + e.getMessage());
+                        throw new RuntimeException(e);
+                    }
+                });
+
+        // check sink file is right
+        AtomicReference<Boolean> checkSinkFile = new AtomicReference<>(false);
+        await().atMost(300000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            Container.ExecResult disableSinkFileExecResult =
+                                    container.executeJob(
+                                            
"/checkpoint-streaming-enable-test-resources/sink_file_text_to_assert.conf");
+                            checkSinkFile.set(0 == 
disableSinkFileExecResult.getExitCode());
+                            Assertions.assertEquals(0, 
disableSinkFileExecResult.getExitCode());
+                        });
+        Assertions.assertTrue(checkSinkFile.get());
+    }
+
+    @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SEATUNNEL, EngineType.SPARK},
+            disabledReason =
+                    "depending on the engine, the logic for determining 
whether a checkpoint is enabled is different")
+    public void testFlinkCheckpointEnable(AbstractTestFlinkContainer container)
+            throws IOException, InterruptedException {
+        /**
+         * In flink execution environment, checkpoint is not supported and not 
needed when executing
+         * jobs in BATCH mode. So it is only necessary to determine whether 
flink has enabled
+         * checkpoint by configuring tasks with 'checkpoint.interval'.
+         */
+        Container.ExecResult enableExecResult =
+                container.executeJob(
+                        
"/checkpoint-batch-enable-test-resources/batch_fakesource_to_localfile_checkpoint_enable.conf");
+        // obtain flink job configuration
+        Matcher matcher =
+                
Pattern.compile("JobID\\s([a-fA-F0-9]+)").matcher(enableExecResult.getStdout());
+        Assertions.assertTrue(matcher.find());
+        String jobId = matcher.group(1);
+        Map<String, Object> jobConfig =
+                JsonUtils.toMap(
+                        container.executeJobManagerInnerCommand(
+                                String.format(
+                                        "curl 
http://localhost:8081/jobs/%s/checkpoints/config";,
+                                        jobId)),
+                        String.class,
+                        Object.class);
+        /**
+         * when the checkpoint interval is 0x7fffffffffffffff, indicates that 
checkpoint is
+         * disabled. reference {@link
+         * org.apache.flink.runtime.jobgraph.JobGraph#isCheckpointingEnabled()}
+         */
+        Assertions.assertEquals(Long.MAX_VALUE, 
jobConfig.getOrDefault("interval", 0L));
+        Assertions.assertEquals(0, enableExecResult.getExitCode());
+    }
+
+    @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SEATUNNEL, EngineType.FLINK},
+            disabledReason =
+                    "depending on the engine, the logic for determining 
whether a checkpoint is enabled is different")
+    public void testSparkCheckpointEnable(TestContainer container)
+            throws IOException, InterruptedException {
+        /**
+         * In spark execution environment, checkpoint is not supported and not 
needed when executing
+         * jobs in BATCH mode. So it is only necessary to determine whether 
spark has enabled
+         * checkpoint by configuring tasks with 'checkpoint.interval'.
+         */
+        Container.ExecResult enableExecResult =
+                container.executeJob(
+                        
"/checkpoint-batch-enable-test-resources/batch_fakesource_to_localfile_checkpoint_enable.conf");
+        // according to logs, if checkpoint.interval is configured, spark also 
ignores this
+        // configuration
+        Assertions.assertTrue(
+                enableExecResult
+                        .getStderr()
+                        .contains("Ignoring non-Spark config property: 
checkpoint.interval"));
+        Assertions.assertEquals(0, enableExecResult.getExitCode());
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf
index bc61e06312..10efe3a5e9 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf
@@ -21,7 +21,6 @@
 env {
   # You can set flink configuration here
   job.mode = "BATCH"
-  execution.checkpoint.interval = 5000
   #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
 }
 
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
index f6fda45b1f..48173bb786 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
@@ -21,7 +21,6 @@
 env {
   # You can set flink configuration here
   job.mode = "BATCH"
-  execution.checkpoint.interval = 5000
   #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
 }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-batch-disable-test-resources/batch_fakesource_to_localfile_checkpoint_disable.conf
similarity index 72%
copy from 
seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
copy to 
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-batch-disable-test-resources/batch_fakesource_to_localfile_checkpoint_disable.conf
index 181a9fc1ad..80bd1ba374 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-batch-disable-test-resources/batch_fakesource_to_localfile_checkpoint_disable.conf
@@ -15,48 +15,44 @@
 # limitations under the License.
 #
 ######
-###### This config file is a demonstration of streaming processing in 
seatunnel config
+###### This config file is a demonstration of disabled checkpoint in batch mode
 ######
 
 env {
-  # You can set flink configuration here
   execution.parallelism = 1
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.name = "DISABLE_CHECKPOINT"
   job.mode = "BATCH"
-  execution.checkpoint.interval = 5000
-  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
 }
 
 source {
   # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
   FakeSource {
     result_table_name = "fake"
+    row.num = 100
+    split.num = 5
+    split.read-interval = 3000
     schema = {
       fields {
+        id = "int"
         name = "string"
         age = "int"
       }
     }
-    parallelism = 3
   }
 }
-
 transform {
 }
-
 sink {
   LocalFile {
-    path = "/tmp/hive/warehouse/test2"
-    field_delimiter = "\t"
+    path = 
"/tmp/seatunnel/config/checkpoint-batch-disable-test-resources/sinkfile/"
     row_delimiter = "\n"
-    partition_by = ["age"]
-    partition_dir_expression = "${k0}=${v0}"
-    is_partition_field_write_in_file = true
     file_name_expression = "${transactionId}_${now}"
     file_format_type = "text"
-    sink_columns = ["name", "age"]
     filename_time_format = "yyyy.MM.dd"
     is_enable_transaction = true
-    save_mode = "error"
-
   }
 }
\ No newline at end of file
diff --git a/seatunnel-api/src/test/resources/conf/getCatalogTable.conf 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-batch-disable-test-resources/sink_file_text_to_assert.conf
similarity index 65%
copy from seatunnel-api/src/test/resources/conf/getCatalogTable.conf
copy to 
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-batch-disable-test-resources/sink_file_text_to_assert.conf
index 485f026a0d..a30a538425 100644
--- a/seatunnel-api/src/test/resources/conf/getCatalogTable.conf
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-batch-disable-test-resources/sink_file_text_to_assert.conf
@@ -16,29 +16,36 @@
 #
 
 env {
+  job.name = "DISABLE_CHECKPOINT_ASSERT"
   job.mode = "BATCH"
-  checkpoint.interval = 5000
 }
 
 source {
-  InMemory {
+  LocalFile {
+    path = 
"/tmp/seatunnel/config/checkpoint-batch-disable-test-resources/sinkfile"
+    file_format_type = "text"
+    schema = {
+      fields {
+        c_string = string
+      }
+    }
     result_table_name = "fake"
-    username = "st"
-    password = "stpassword"
-    table-names = ["st.public.table1", "st.public.table2"]
-    parallelism = 3
   }
 }
 
-transform {
-}
-
 sink {
-  InMemory {
-    source_table_name = "fake"
-    username = "st"
-    password = "stpassword"
-    address = "localhost"
-    port = 1234
+  Assert {
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 100
+        },
+        {
+          rule_type = MIN_ROW
+          rule_value = 100
+        }
+      ]
+    }
   }
 }
\ No newline at end of file
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-batch-enable-test-resources/batch_fakesource_to_localfile_checkpoint_enable.conf
similarity index 72%
copy from 
seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
copy to 
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-batch-enable-test-resources/batch_fakesource_to_localfile_checkpoint_enable.conf
index 181a9fc1ad..b3b5afec9e 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-batch-enable-test-resources/batch_fakesource_to_localfile_checkpoint_enable.conf
@@ -15,48 +15,45 @@
 # limitations under the License.
 #
 ######
-###### This config file is a demonstration of streaming processing in 
seatunnel config
+###### This config file is a demonstration of enabled checkpoint in batch mode
 ######
 
 env {
-  # You can set flink configuration here
   execution.parallelism = 1
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.name = "ENABLE_CHECKPOINT"
   job.mode = "BATCH"
-  execution.checkpoint.interval = 5000
-  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+  checkpoint.interval = 1000
 }
 
 source {
   # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
   FakeSource {
     result_table_name = "fake"
+    row.num = 100
+    split.num = 5
+    split.read-interval = 3000
     schema = {
       fields {
+        id = "int"
         name = "string"
         age = "int"
       }
     }
-    parallelism = 3
   }
 }
-
 transform {
 }
-
 sink {
   LocalFile {
-    path = "/tmp/hive/warehouse/test2"
-    field_delimiter = "\t"
+    path = 
"/tmp/seatunnel/config/checkpoint-batch-enable-test-resources/sinkfile/"
     row_delimiter = "\n"
-    partition_by = ["age"]
-    partition_dir_expression = "${k0}=${v0}"
-    is_partition_field_write_in_file = true
     file_name_expression = "${transactionId}_${now}"
     file_format_type = "text"
-    sink_columns = ["name", "age"]
     filename_time_format = "yyyy.MM.dd"
     is_enable_transaction = true
-    save_mode = "error"
-
   }
 }
\ No newline at end of file
diff --git a/seatunnel-api/src/test/resources/conf/getCatalogTable.conf 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-batch-enable-test-resources/sink_file_text_to_assert.conf
similarity index 65%
copy from seatunnel-api/src/test/resources/conf/getCatalogTable.conf
copy to 
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-batch-enable-test-resources/sink_file_text_to_assert.conf
index 485f026a0d..a951c1e002 100644
--- a/seatunnel-api/src/test/resources/conf/getCatalogTable.conf
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-batch-enable-test-resources/sink_file_text_to_assert.conf
@@ -16,29 +16,36 @@
 #
 
 env {
+  job.name = "ENABLE_CHECKPOINT_ASSERT"
   job.mode = "BATCH"
-  checkpoint.interval = 5000
 }
 
 source {
-  InMemory {
+  LocalFile {
+    path = 
"/tmp/seatunnel/config/checkpoint-batch-enable-test-resources/sinkfile"
+    file_format_type = "text"
+    schema = {
+      fields {
+        c_string = string
+      }
+    }
     result_table_name = "fake"
-    username = "st"
-    password = "stpassword"
-    table-names = ["st.public.table1", "st.public.table2"]
-    parallelism = 3
   }
 }
 
-transform {
-}
-
 sink {
-  InMemory {
-    source_table_name = "fake"
-    username = "st"
-    password = "stpassword"
-    address = "localhost"
-    port = 1234
+  Assert {
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 100
+        },
+        {
+          rule_type = MIN_ROW
+          rule_value = 100
+        }
+      ]
+    }
   }
 }
\ No newline at end of file
diff --git a/seatunnel-api/src/test/resources/conf/getCatalogTable.conf 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-streaming-enable-test-resources/sink_file_text_to_assert.conf
similarity index 66%
copy from seatunnel-api/src/test/resources/conf/getCatalogTable.conf
copy to 
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-streaming-enable-test-resources/sink_file_text_to_assert.conf
index 485f026a0d..65883ccd68 100644
--- a/seatunnel-api/src/test/resources/conf/getCatalogTable.conf
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-streaming-enable-test-resources/sink_file_text_to_assert.conf
@@ -16,29 +16,36 @@
 #
 
 env {
+  job.name = "STREAM_JOB_ASSERT"
   job.mode = "BATCH"
-  checkpoint.interval = 5000
 }
 
 source {
-  InMemory {
+  LocalFile {
+    path = 
"/tmp/seatunnel/config/checkpoint-streaming-enable-test-resources/sinkfile"
+    file_format_type = "text"
+    schema = {
+      fields {
+        c_string = string
+      }
+    }
     result_table_name = "fake"
-    username = "st"
-    password = "stpassword"
-    table-names = ["st.public.table1", "st.public.table2"]
-    parallelism = 3
   }
 }
 
-transform {
-}
-
 sink {
-  InMemory {
-    source_table_name = "fake"
-    username = "st"
-    password = "stpassword"
-    address = "localhost"
-    port = 1234
+  Assert {
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 100
+        },
+        {
+          rule_type = MIN_ROW
+          rule_value = 100
+        }
+      ]
+    }
   }
 }
\ No newline at end of file
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile.conf
similarity index 78%
copy from 
seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
copy to 
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile.conf
index 181a9fc1ad..fa278ae2e0 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile.conf
@@ -21,22 +21,23 @@
 env {
   # You can set flink configuration here
   execution.parallelism = 1
-  job.mode = "BATCH"
-  execution.checkpoint.interval = 5000
-  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+  job.mode = "STREAMING"
 }
 
 source {
   # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
   FakeSource {
     result_table_name = "fake"
+    row.num = 100
+    split.num = 5
+    split.read-interval = 3000
     schema = {
       fields {
+        id = "int"
         name = "string"
         age = "int"
       }
     }
-    parallelism = 3
   }
 }
 
@@ -45,18 +46,11 @@ transform {
 
 sink {
   LocalFile {
-    path = "/tmp/hive/warehouse/test2"
-    field_delimiter = "\t"
+    path = 
"/tmp/seatunnel/config/checkpoint-streaming-enable-test-resources/sinkfile/"
     row_delimiter = "\n"
-    partition_by = ["age"]
-    partition_dir_expression = "${k0}=${v0}"
-    is_partition_field_write_in_file = true
     file_name_expression = "${transactionId}_${now}"
     file_format_type = "text"
-    sink_columns = ["name", "age"]
     filename_time_format = "yyyy.MM.dd"
     is_enable_transaction = true
-    save_mode = "error"
-
   }
 }
\ No newline at end of file
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile_interval.conf
similarity index 78%
copy from 
seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
copy to 
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile_interval.conf
index 181a9fc1ad..288b369e4c 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile_interval.conf
@@ -21,22 +21,24 @@
 env {
   # You can set flink configuration here
   execution.parallelism = 1
-  job.mode = "BATCH"
-  execution.checkpoint.interval = 5000
-  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+  job.mode = "STREAMING"
+  checkpoint.interval = 3000
 }
 
 source {
   # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
   FakeSource {
     result_table_name = "fake"
+    row.num = 100
+    split.num = 5
+    split.read-interval = 3000
     schema = {
       fields {
+        id = "int"
         name = "string"
         age = "int"
       }
     }
-    parallelism = 3
   }
 }
 
@@ -45,18 +47,11 @@ transform {
 
 sink {
   LocalFile {
-    path = "/tmp/hive/warehouse/test2"
-    field_delimiter = "\t"
+    path = 
"/tmp/seatunnel/config/checkpoint-streaming-enable-test-resources/sinkfile/"
     row_delimiter = "\n"
-    partition_by = ["age"]
-    partition_dir_expression = "${k0}=${v0}"
-    is_partition_field_write_in_file = true
     file_name_expression = "${transactionId}_${now}"
     file_format_type = "text"
-    sink_columns = ["name", "age"]
     filename_time_format = "yyyy.MM.dd"
     is_enable_transaction = true
-    save_mode = "error"
-
   }
 }
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2.properties
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2.properties
index 051f67d12d..bfcd94a55a 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2.properties
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2.properties
@@ -25,6 +25,10 @@ logger.zeta.level=WARN
 logger.zetaMaster.name=org.apache.seatunnel.engine.server.master
 logger.zetaMaster.level=INFO
 
+# For print checkpoint info
+logger.checkpoint.name=org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator
+logger.checkpoint.level=INFO
+
 logger.debezium.name=io.debezium.connector
 logger.debezium.level=WARN
 
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
 
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
index 181a9fc1ad..ae9e400fc5 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
@@ -22,7 +22,6 @@ env {
   # You can set flink configuration here
   execution.parallelism = 1
   job.mode = "BATCH"
-  execution.checkpoint.interval = 5000
   #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
 }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file_complex.conf
 
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file_complex.conf
index 3a44886274..2ecc1d36dd 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file_complex.conf
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file_complex.conf
@@ -22,7 +22,6 @@ env {
   # You can set flink configuration here
   execution.parallelism = 1
   job.mode = "BATCH"
-  execution.checkpoint.interval = 5000
   #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
 }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_two_file.conf
 
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_two_file.conf
index 7ff3c21f78..254737fcfc 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_two_file.conf
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_two_file.conf
@@ -22,7 +22,6 @@ env {
   # You can set flink configuration here
   execution.parallelism = 1
   job.mode = "BATCH"
-  execution.checkpoint.interval = 5000
   #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
 }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf 
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf
index a4404b9f91..2b44e9c757 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf
@@ -22,7 +22,6 @@ env {
   # You can set flink configuration here
   execution.parallelism = 1
   job.mode = "BATCH"
-  execution.checkpoint.interval = 5000
   #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
 }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java
index 78add9c883..76fb99f4d8 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java
@@ -35,6 +35,8 @@ public class CheckpointConfig implements Serializable {
 
     private CheckpointStorageConfig storage = 
ServerConfigOptions.CHECKPOINT_STORAGE.defaultValue();
 
+    private boolean checkpointEnable = true;
+
     public void setCheckpointInterval(long checkpointInterval) {
         checkArgument(
                 checkpointInterval >= MINIMAL_CHECKPOINT_TIME,
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 60459eaff5..09bf416f6a 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -327,7 +327,13 @@ public class CheckpointCoordinator {
         InvocationFuture<?>[] futures = notifyTaskStart();
         CompletableFuture.allOf(futures).join();
         notifyCompleted(latestCompletedCheckpoint);
-        
scheduleTriggerPendingCheckpoint(coordinatorConfig.getCheckpointInterval());
+        if (coordinatorConfig.isCheckpointEnable()) {
+            LOG.info("checkpoint is enabled, start schedule trigger pending 
checkpoint.");
+            
scheduleTriggerPendingCheckpoint(coordinatorConfig.getCheckpointInterval());
+        } else {
+            LOG.info(
+                    "checkpoint is disabled, because in batch mode and 
'checkpoint.interval' of env is missing.");
+        }
     }
 
     private void notifyCompleted(CompletedCheckpoint completedCheckpoint) {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 6c527732d6..9bd40804fb 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.common.utils.SeaTunnelException;
 import 
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.config.EngineConfig;
+import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
 import 
org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig;
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
@@ -85,6 +86,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
 import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch;
+import static org.apache.seatunnel.common.constants.JobMode.BATCH;
 
 public class JobMaster {
     private static final ILogger LOGGER = Logger.getLogger(JobMaster.class);
@@ -188,8 +190,7 @@ public class JobMaster {
                 
nodeEngine.getSerializationService().toObject(jobImmutableInformationData);
         jobCheckpointConfig =
                 createJobCheckpointConfig(
-                        engineConfig.getCheckpointConfig(),
-                        
jobImmutableInformation.getJobConfig().getEnvOptions());
+                        engineConfig.getCheckpointConfig(), 
jobImmutableInformation.getJobConfig());
 
         LOGGER.info(
                 String.format(
@@ -257,7 +258,8 @@ public class JobMaster {
     // TODO replace it after ReadableConfig Support parse yaml format, then 
use only one config to
     // read engine and env config.
     private CheckpointConfig createJobCheckpointConfig(
-            CheckpointConfig defaultCheckpointConfig, Map<String, Object> 
jobEnv) {
+            CheckpointConfig defaultCheckpointConfig, JobConfig jobConfig) {
+        Map<String, Object> jobEnv = jobConfig.getEnvOptions();
         CheckpointConfig jobCheckpointConfig = new CheckpointConfig();
         
jobCheckpointConfig.setCheckpointTimeout(defaultCheckpointConfig.getCheckpointTimeout());
         
jobCheckpointConfig.setCheckpointInterval(defaultCheckpointConfig.getCheckpointInterval());
@@ -274,6 +276,10 @@ public class JobMaster {
             jobCheckpointConfig.setCheckpointInterval(
                     Long.parseLong(
                             
jobEnv.get(EnvCommonOptions.CHECKPOINT_INTERVAL.key()).toString()));
+        } else if (jobConfig.getJobContext().getJobMode() == BATCH) {
+            LOGGER.info(
+                    "in batch mode, the 'checkpoint.interval' configuration of 
env is missing, so checkpoint will be disabled");
+            jobCheckpointConfig.setCheckpointEnable(false);
         }
         if (jobEnv.containsKey(EnvCommonOptions.CHECKPOINT_TIMEOUT.key())) {
             jobCheckpointConfig.setCheckpointTimeout(
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_file.conf
 
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_file.conf
index 24339945e7..382ba3db75 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_file.conf
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_file.conf
@@ -22,7 +22,6 @@ env {
   # You can set flink configuration here
   execution.parallelism = 1
   job.mode = "BATCH"
-  execution.checkpoint.interval = 5000
   #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
 }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_file_complex.conf
 
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_file_complex.conf
index e3e0e00d9b..30c0242137 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_file_complex.conf
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_file_complex.conf
@@ -22,7 +22,6 @@ env {
   # You can set flink configuration here
   execution.parallelism = 1
   job.mode = "BATCH"
-  execution.checkpoint.interval = 5000
   #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
 }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console.conf
 
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console.conf
index 9e04be8bec..c59738b46f 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console.conf
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console.conf
@@ -22,7 +22,6 @@ env {
   # You can set engine configuration here
   execution.parallelism = 1
   job.mode = "BATCH"
-  execution.checkpoint.interval = 5000
   #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
 }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console_job_metrics.conf
 
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console_job_metrics.conf
index 7edeb4ef71..68f70e5178 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console_job_metrics.conf
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console_job_metrics.conf
@@ -22,7 +22,6 @@ env {
   # You can set engine configuration here
   execution.parallelism = 1
   job.mode = "BATCH"
-  execution.checkpoint.interval = 5000
   #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
 }
 
diff --git 
a/seatunnel-examples/seatunnel-engine-examples/src/main/resources/examples/fake_to_console.conf
 
b/seatunnel-examples/seatunnel-engine-examples/src/main/resources/examples/fake_to_console.conf
index f6482a5475..aa0d49c034 100644
--- 
a/seatunnel-examples/seatunnel-engine-examples/src/main/resources/examples/fake_to_console.conf
+++ 
b/seatunnel-examples/seatunnel-engine-examples/src/main/resources/examples/fake_to_console.conf
@@ -22,7 +22,6 @@ env {
   # You can set engine configuration here
   execution.parallelism = 1
   job.mode = "BATCH"
-  checkpoint.interval = 5000
   #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
 }
 

Reply via email to