This is an automated email from the ASF dual-hosted git repository. fanjia pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new fc914759ff [Feature][Zeta] In batch mode, checkpoint can be disabled (#5914) fc914759ff is described below commit fc914759ff938130a906d9fa0e814b78dbd44614 Author: happyboy1024 <137260654+happyboy1...@users.noreply.github.com> AuthorDate: Wed Nov 29 09:14:42 2023 +0800 [Feature][Zeta] In batch mode, checkpoint can be disabled (#5914) --------- Co-authored-by: dengjunjie <296442...@qq.com> --- docs/en/concept/JobEnvConfig.md | 2 + .../src/test/resources/conf/getCatalogTable.conf | 1 - .../flink/execution/FlinkRuntimeEnvironment.java | 5 +- .../test/resources/args/user_defined_params.conf | 1 - .../resources/jdbc_mysql_source_and_sink_xa.conf | 1 - .../resources/jdbc_hive_source_and_assert.conf | 1 - .../jdbc_postgres_source_and_sink_xa.conf | 1 - .../resources/neo4j/fake_to_neo4j_batch_write.conf | 1 - .../test/resources/batch_pulsar_to_console.conf | 1 - .../flink/AbstractTestFlinkContainer.java | 5 + .../connector-seatunnel-e2e-base/pom.xml | 30 +++ .../seatunnel/engine/e2e/CheckpointEnableIT.java | 279 +++++++++++++++++++++ .../test/resources/batch_fakesource_to_file.conf | 1 - .../batch_fakesource_to_file_complex.conf | 1 - ...fakesource_to_localfile_checkpoint_disable.conf | 26 +- .../sink_file_text_to_assert.conf | 37 +-- ..._fakesource_to_localfile_checkpoint_enable.conf | 27 +- .../sink_file_text_to_assert.conf | 37 +-- .../sink_file_text_to_assert.conf | 37 +-- .../stream_fakesource_to_localfile.conf | 18 +- .../stream_fakesource_to_localfile_interval.conf | 19 +- .../src/test/resources/log4j2.properties | 4 + .../test/resources/batch_fakesource_to_file.conf | 1 - .../batch_fakesource_to_file_complex.conf | 1 - .../resources/batch_fakesource_to_two_file.conf | 1 - .../src/test/resources/client_test.conf | 1 - .../common/config/server/CheckpointConfig.java | 2 + .../server/checkpoint/CheckpointCoordinator.java | 8 +- .../seatunnel/engine/server/master/JobMaster.java | 12 +- 
.../test/resources/batch_fakesource_to_file.conf | 1 - .../batch_fakesource_to_file_complex.conf | 1 - .../src/test/resources/fake_to_console.conf | 1 - .../resources/fake_to_console_job_metrics.conf | 1 - .../main/resources/examples/fake_to_console.conf | 1 - 34 files changed, 444 insertions(+), 122 deletions(-) diff --git a/docs/en/concept/JobEnvConfig.md b/docs/en/concept/JobEnvConfig.md index 7272c90fcc..ce1716e3c0 100644 --- a/docs/en/concept/JobEnvConfig.md +++ b/docs/en/concept/JobEnvConfig.md @@ -18,6 +18,8 @@ You can configure whether the task is in batch mode or stream mode through `job. Gets the interval in which checkpoints are periodically scheduled. +In `STREAMING` mode, checkpointing is required; if you do not set this parameter, the interval is obtained from the application configuration file `seatunnel.yaml`. In `BATCH` mode, you can disable checkpointing by not setting this parameter. + ## parallelism This parameter configures the parallelism of source and sink. diff --git a/seatunnel-api/src/test/resources/conf/getCatalogTable.conf b/seatunnel-api/src/test/resources/conf/getCatalogTable.conf index 485f026a0d..63cf0f6cf8 100644 --- a/seatunnel-api/src/test/resources/conf/getCatalogTable.conf +++ b/seatunnel-api/src/test/resources/conf/getCatalogTable.conf @@ -17,7 +17,6 @@ env { job.mode = "BATCH" - checkpoint.interval = 5000 } source { diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java index 12168921d8..1f01687a1a 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java +++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java @@ -236,7 +236,10 @@ public class FlinkRuntimeEnvironment implements RuntimeEnvironment { } private void setCheckpoint() { - + if (jobMode == JobMode.BATCH) { + log.warn( + "Disabled Checkpointing. In flink execution environment, checkpointing is not supported and not needed when executing jobs in BATCH mode"); + } long interval = 0; if (config.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) { interval = config.getLong(EnvCommonOptions.CHECKPOINT_INTERVAL.key()); diff --git a/seatunnel-core/seatunnel-starter/src/test/resources/args/user_defined_params.conf b/seatunnel-core/seatunnel-starter/src/test/resources/args/user_defined_params.conf index bc2114443f..2ca5c56c19 100644 --- a/seatunnel-core/seatunnel-starter/src/test/resources/args/user_defined_params.conf +++ b/seatunnel-core/seatunnel-starter/src/test/resources/args/user_defined_params.conf @@ -22,7 +22,6 @@ env { # You can set engine configuration here execution.parallelism = 1 job.mode = "BATCH" - checkpoint.interval = 5000 #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_xa.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_xa.conf index cfdf7691d0..f70cc87dce 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_xa.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_xa.conf @@ -18,7 +18,6 @@ env { execution.parallelism = 1 job.mode = "BATCH" - execution.checkpoint.interval = 1000 } source { diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_hive_source_and_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_hive_source_and_assert.conf index cd1b32c14e..75802d482c 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_hive_source_and_assert.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_hive_source_and_assert.conf @@ -22,7 +22,6 @@ env { # You can set engine configuration here execution.parallelism = 1 job.mode = "BATCH" - checkpoint.interval = 5000 #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_xa.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_xa.conf index 144f11da9b..75e2979b2e 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_xa.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_xa.conf @@ -18,7 +18,6 @@ env { execution.parallelism = 1 job.mode = "BATCH" - execution.checkpoint.interval = 1000 } source { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e/src/test/resources/neo4j/fake_to_neo4j_batch_write.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e/src/test/resources/neo4j/fake_to_neo4j_batch_write.conf index e1d9fed6f8..8fd4eab745 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e/src/test/resources/neo4j/fake_to_neo4j_batch_write.conf +++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e/src/test/resources/neo4j/fake_to_neo4j_batch_write.conf @@ -22,7 +22,6 @@ env { # You can set engine configuration here execution.parallelism = 1 job.mode = "BATCH" - checkpoint.interval = 5000 #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/batch_pulsar_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/batch_pulsar_to_console.conf index 59d2efc862..563ee34165 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/batch_pulsar_to_console.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/batch_pulsar_to_console.conf @@ -21,7 +21,6 @@ env { execution.parallelism = 1 job.mode = "BATCH" - checkpoint.interval = 5000 #spark config spark.app.name = "SeaTunnel" spark.executor.instances = 1 diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java index c72fad44ee..9a0b353785 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java @@ -154,4 +154,9 @@ public abstract class AbstractTestFlinkContainer extends AbstractTestContainer { public String getServerLogs() { return jobManager.getLogs(); } + + public String executeJobManagerInnerCommand(String command) + throws IOException, InterruptedException { + return jobManager.execInContainer("bash", "-c", command).getStdout(); + } } diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml index 20a2e612a6..973fe6434c 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml @@ -52,6 +52,12 @@ <version>${project.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.seatunnel</groupId> + <artifactId>connector-assert</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> <dependency> <groupId>org.apache.seatunnel</groupId> <artifactId>imap-storage-file</artifactId> @@ -71,6 +77,30 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.apache.seatunnel</groupId> + <artifactId>seatunnel-flink-13-starter</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.seatunnel</groupId> + <artifactId>seatunnel-flink-15-starter</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.seatunnel</groupId> + <artifactId>seatunnel-spark-2-starter</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.seatunnel</groupId> + <artifactId>seatunnel-spark-3-starter</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> <dependency> <groupId>org.apache.hadoop</groupId> diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java new file mode 100644 index 0000000000..398ecdf0b9 --- /dev/null +++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.e2e; + +import org.apache.seatunnel.common.utils.JsonUtils; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.EngineType; +import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.container.flink.AbstractTestFlinkContainer; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.awaitility.Awaitility.await; + +@Slf4j +public class CheckpointEnableIT extends TestSuiteBase { + + @TestTemplate + @DisabledOnContainer( + value 
= {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = + "depending on the engine, the logic for determining whether a checkpoint is enabled is different") + public void testZetaBatchCheckpointEnable(TestContainer container) + throws IOException, InterruptedException { + // checkpoint disabled, log contains 'checkpoint is disabled' + Container.ExecResult disableExecResult = + container.executeJob( + "/checkpoint-batch-disable-test-resources/batch_fakesource_to_localfile_checkpoint_disable.conf"); + Assertions.assertTrue(container.getServerLogs().contains("checkpoint is disabled")); + Assertions.assertEquals(0, disableExecResult.getExitCode()); + // check sink file is right + Container.ExecResult disableSinkFileExecResult = + container.executeJob( + "/checkpoint-batch-disable-test-resources/sink_file_text_to_assert.conf"); + Assertions.assertEquals(0, disableSinkFileExecResult.getExitCode()); + + // checkpoint enable, log contains 'checkpoint is enabled' + Container.ExecResult enableExecResult = + container.executeJob( + "/checkpoint-batch-enable-test-resources/batch_fakesource_to_localfile_checkpoint_enable.conf"); + Assertions.assertTrue(container.getServerLogs().contains("checkpoint is enabled")); + Assertions.assertEquals(0, enableExecResult.getExitCode()); + // check sink file is right + Container.ExecResult enableSinkFileExecResult = + container.executeJob( + "/checkpoint-batch-enable-test-resources/sink_file_text_to_assert.conf"); + Assertions.assertEquals(0, enableSinkFileExecResult.getExitCode()); + } + + @TestTemplate + @DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = + "depending on the engine, the logic for determining whether a checkpoint is enabled is different") + public void testZetaStreamingCheckpointInterval(TestContainer container) + throws IOException, InterruptedException { + // start job + CompletableFuture.supplyAsync( + () -> { + try { + return container.executeJob( + "
"/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile_interval.conf"); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + }); + + // wait obtain job id + AtomicReference<String> jobId = new AtomicReference<>(); + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Pattern jobIdPattern = + Pattern.compile( + ".*Init JobMaster for Job SeaTunnel_Job \\(([0-9]*)\\).*", + Pattern.DOTALL); + Matcher matcher = jobIdPattern.matcher(container.getServerLogs()); + if (matcher.matches()) { + jobId.set(matcher.group(1)); + } + Assertions.assertNotNull(jobId.get()); + }); + + Thread.sleep(15000); + Assertions.assertTrue(container.getServerLogs().contains("checkpoint is enabled")); + Assertions.assertEquals(0, container.savepointJob(jobId.get()).getExitCode()); + + // restore job + CompletableFuture.supplyAsync( + () -> { + try { + return container + .restoreJob( + "/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile_interval.conf", + jobId.get()) + .getExitCode(); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + }); + + // check sink file is right + AtomicReference<Boolean> checkSinkFile = new AtomicReference<>(false); + await().atMost(300000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Container.ExecResult disableSinkFileExecResult = + container.executeJob( + "/checkpoint-streaming-enable-test-resources/sink_file_text_to_assert.conf"); + checkSinkFile.set(0 == disableSinkFileExecResult.getExitCode()); + Assertions.assertEquals(0, disableSinkFileExecResult.getExitCode()); + }); + Assertions.assertTrue(checkSinkFile.get()); + } + + @TestTemplate + @DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = + "depending on the engine, the logic for determining whether a checkpoint is enabled is different") + public void 
testZetaStreamingCheckpointNoInterval(TestContainer container) + throws IOException, InterruptedException { + // start job + CompletableFuture.supplyAsync( + () -> { + try { + return container.executeJob( + "/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile.conf"); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + }); + + // wait obtain job id + AtomicReference<String> jobId = new AtomicReference<>(); + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Pattern jobIdPattern = + Pattern.compile( + ".*Init JobMaster for Job SeaTunnel_Job \\(([0-9]*)\\).*", + Pattern.DOTALL); + Matcher matcher = jobIdPattern.matcher(container.getServerLogs()); + if (matcher.matches()) { + jobId.set(matcher.group(1)); + } + Assertions.assertNotNull(jobId.get()); + }); + + Thread.sleep(15000); + Assertions.assertTrue(container.getServerLogs().contains("checkpoint is enabled")); + Assertions.assertEquals(0, container.savepointJob(jobId.get()).getExitCode()); + + // restore job + CompletableFuture.supplyAsync( + () -> { + try { + return container + .restoreJob( + "/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile.conf", + jobId.get()) + .getExitCode(); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + }); + + // check sink file is right + AtomicReference<Boolean> checkSinkFile = new AtomicReference<>(false); + await().atMost(300000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Container.ExecResult disableSinkFileExecResult = + container.executeJob( + "/checkpoint-streaming-enable-test-resources/sink_file_text_to_assert.conf"); + checkSinkFile.set(0 == disableSinkFileExecResult.getExitCode()); + Assertions.assertEquals(0, disableSinkFileExecResult.getExitCode()); + }); + Assertions.assertTrue(checkSinkFile.get()); + } + + @TestTemplate + @DisabledOnContainer( + value 
= {}, + type = {EngineType.SEATUNNEL, EngineType.SPARK}, + disabledReason = + "depending on the engine, the logic for determining whether a checkpoint is enabled is different") + public void testFlinkCheckpointEnable(AbstractTestFlinkContainer container) + throws IOException, InterruptedException { + /** + * In flink execution environment, checkpoint is not supported and not needed when executing + * jobs in BATCH mode. So it is only necessary to determine whether flink has enabled + * checkpoint by configuring tasks with 'checkpoint.interval'. + */ + Container.ExecResult enableExecResult = + container.executeJob( + "/checkpoint-batch-enable-test-resources/batch_fakesource_to_localfile_checkpoint_enable.conf"); + // obtain flink job configuration + Matcher matcher = + Pattern.compile("JobID\\s([a-fA-F0-9]+)").matcher(enableExecResult.getStdout()); + Assertions.assertTrue(matcher.find()); + String jobId = matcher.group(1); + Map<String, Object> jobConfig = + JsonUtils.toMap( + container.executeJobManagerInnerCommand( + String.format( + "curl http://localhost:8081/jobs/%s/checkpoints/config", + jobId)), + String.class, + Object.class); + /** + * when the checkpoint interval is 0x7fffffffffffffff, indicates that checkpoint is + * disabled. reference {@link + * org.apache.flink.runtime.jobgraph.JobGraph#isCheckpointingEnabled()} + */ + Assertions.assertEquals(Long.MAX_VALUE, jobConfig.getOrDefault("interval", 0L)); + Assertions.assertEquals(0, enableExecResult.getExitCode()); + } + + @TestTemplate + @DisabledOnContainer( + value = {}, + type = {EngineType.SEATUNNEL, EngineType.FLINK}, + disabledReason = + "depending on the engine, the logic for determining whether a checkpoint is enabled is different") + public void testSparkCheckpointEnable(TestContainer container) + throws IOException, InterruptedException { + /** + * In spark execution environment, checkpoint is not supported and not needed when executing + * jobs in BATCH mode. 
So it is only necessary to determine whether spark has enabled + * checkpoint by configuring tasks with 'checkpoint.interval'. + */ + Container.ExecResult enableExecResult = + container.executeJob( + "/checkpoint-batch-enable-test-resources/batch_fakesource_to_localfile_checkpoint_enable.conf"); + // according to logs, if checkpoint.interval is configured, spark also ignores this + // configuration + Assertions.assertTrue( + enableExecResult + .getStderr() + .contains("Ignoring non-Spark config property: checkpoint.interval")); + Assertions.assertEquals(0, enableExecResult.getExitCode()); + } +} diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf index bc61e06312..10efe3a5e9 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf @@ -21,7 +21,6 @@ env { # You can set flink configuration here job.mode = "BATCH" - execution.checkpoint.interval = 5000 #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" } diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf index f6fda45b1f..48173bb786 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf @@ -21,7 +21,6 @@ env { # You can set flink configuration here job.mode = "BATCH" - execution.checkpoint.interval = 5000 
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" } diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-batch-disable-test-resources/batch_fakesource_to_localfile_checkpoint_disable.conf similarity index 72% copy from seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf copy to seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-batch-disable-test-resources/batch_fakesource_to_localfile_checkpoint_disable.conf index 181a9fc1ad..80bd1ba374 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-batch-disable-test-resources/batch_fakesource_to_localfile_checkpoint_disable.conf @@ -15,48 +15,44 @@ # limitations under the License. 
# ###### -###### This config file is a demonstration of streaming processing in seatunnel config +###### This config file is a demonstration of disabled checkpoint in batch mode ###### env { - # You can set flink configuration here execution.parallelism = 1 + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local + job.name = "DISABLE_CHECKPOINT" job.mode = "BATCH" - execution.checkpoint.interval = 5000 - #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" } source { # This is a example source plugin **only for test and demonstrate the feature source plugin** FakeSource { result_table_name = "fake" + row.num = 100 + split.num = 5 + split.read-interval = 3000 schema = { fields { + id = "int" name = "string" age = "int" } } - parallelism = 3 } } - transform { } - sink { LocalFile { - path = "/tmp/hive/warehouse/test2" - field_delimiter = "\t" + path = "/tmp/seatunnel/config/checkpoint-batch-disable-test-resources/sinkfile/" row_delimiter = "\n" - partition_by = ["age"] - partition_dir_expression = "${k0}=${v0}" - is_partition_field_write_in_file = true file_name_expression = "${transactionId}_${now}" file_format_type = "text" - sink_columns = ["name", "age"] filename_time_format = "yyyy.MM.dd" is_enable_transaction = true - save_mode = "error" - } } \ No newline at end of file diff --git a/seatunnel-api/src/test/resources/conf/getCatalogTable.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-batch-disable-test-resources/sink_file_text_to_assert.conf similarity index 65% copy from seatunnel-api/src/test/resources/conf/getCatalogTable.conf copy to seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-batch-disable-test-resources/sink_file_text_to_assert.conf index 485f026a0d..a30a538425 100644 --- a/seatunnel-api/src/test/resources/conf/getCatalogTable.conf +++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-batch-disable-test-resources/sink_file_text_to_assert.conf @@ -16,29 +16,36 @@ # env { + job.name = "DISABLE_CHECKPOINT_ASSERT" job.mode = "BATCH" - checkpoint.interval = 5000 } source { - InMemory { + LocalFile { + path = "/tmp/seatunnel/config/checkpoint-batch-disable-test-resources/sinkfile" + file_format_type = "text" + schema = { + fields { + c_string = string + } + } result_table_name = "fake" - username = "st" - password = "stpassword" - table-names = ["st.public.table1", "st.public.table2"] - parallelism = 3 } } -transform { -} - sink { - InMemory { - source_table_name = "fake" - username = "st" - password = "stpassword" - address = "localhost" - port = 1234 + Assert { + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 100 + }, + { + rule_type = MIN_ROW + rule_value = 100 + } + ] + } } } \ No newline at end of file diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-batch-enable-test-resources/batch_fakesource_to_localfile_checkpoint_enable.conf similarity index 72% copy from seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf copy to seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-batch-enable-test-resources/batch_fakesource_to_localfile_checkpoint_enable.conf index 181a9fc1ad..b3b5afec9e 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-batch-enable-test-resources/batch_fakesource_to_localfile_checkpoint_enable.conf @@ -15,48 +15,45 @@ # limitations under the License. 
# ###### -###### This config file is a demonstration of streaming processing in seatunnel config +###### This config file is a demonstration of enabled checkpoint in batch mode ###### env { - # You can set flink configuration here execution.parallelism = 1 + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local + job.name = "ENABLE_CHECKPOINT" job.mode = "BATCH" - execution.checkpoint.interval = 5000 - #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" + checkpoint.interval = 1000 } source { # This is a example source plugin **only for test and demonstrate the feature source plugin** FakeSource { result_table_name = "fake" + row.num = 100 + split.num = 5 + split.read-interval = 3000 schema = { fields { + id = "int" name = "string" age = "int" } } - parallelism = 3 } } - transform { } - sink { LocalFile { - path = "/tmp/hive/warehouse/test2" - field_delimiter = "\t" + path = "/tmp/seatunnel/config/checkpoint-batch-enable-test-resources/sinkfile/" row_delimiter = "\n" - partition_by = ["age"] - partition_dir_expression = "${k0}=${v0}" - is_partition_field_write_in_file = true file_name_expression = "${transactionId}_${now}" file_format_type = "text" - sink_columns = ["name", "age"] filename_time_format = "yyyy.MM.dd" is_enable_transaction = true - save_mode = "error" - } } \ No newline at end of file diff --git a/seatunnel-api/src/test/resources/conf/getCatalogTable.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-batch-enable-test-resources/sink_file_text_to_assert.conf similarity index 65% copy from seatunnel-api/src/test/resources/conf/getCatalogTable.conf copy to seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-batch-enable-test-resources/sink_file_text_to_assert.conf index 485f026a0d..a951c1e002 100644 --- a/seatunnel-api/src/test/resources/conf/getCatalogTable.conf +++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-batch-enable-test-resources/sink_file_text_to_assert.conf @@ -16,29 +16,36 @@ # env { + job.name = "ENABLE_CHECKPOINT_ASSERT" job.mode = "BATCH" - checkpoint.interval = 5000 } source { - InMemory { + LocalFile { + path = "/tmp/seatunnel/config/checkpoint-batch-enable-test-resources/sinkfile" + file_format_type = "text" + schema = { + fields { + c_string = string + } + } result_table_name = "fake" - username = "st" - password = "stpassword" - table-names = ["st.public.table1", "st.public.table2"] - parallelism = 3 } } -transform { -} - sink { - InMemory { - source_table_name = "fake" - username = "st" - password = "stpassword" - address = "localhost" - port = 1234 + Assert { + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 100 + }, + { + rule_type = MIN_ROW + rule_value = 100 + } + ] + } } } \ No newline at end of file diff --git a/seatunnel-api/src/test/resources/conf/getCatalogTable.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-streaming-enable-test-resources/sink_file_text_to_assert.conf similarity index 66% copy from seatunnel-api/src/test/resources/conf/getCatalogTable.conf copy to seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-streaming-enable-test-resources/sink_file_text_to_assert.conf index 485f026a0d..65883ccd68 100644 --- a/seatunnel-api/src/test/resources/conf/getCatalogTable.conf +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-streaming-enable-test-resources/sink_file_text_to_assert.conf @@ -16,29 +16,36 @@ # env { + job.name = "STREAM_JOB_ASSERT" job.mode = "BATCH" - checkpoint.interval = 5000 } source { - InMemory { + LocalFile { + path = "/tmp/seatunnel/config/checkpoint-streaming-enable-test-resources/sinkfile" + file_format_type = "text" + schema = { + fields { + c_string = string + } 
+ } result_table_name = "fake" - username = "st" - password = "stpassword" - table-names = ["st.public.table1", "st.public.table2"] - parallelism = 3 } } -transform { -} - sink { - InMemory { - source_table_name = "fake" - username = "st" - password = "stpassword" - address = "localhost" - port = 1234 + Assert { + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 100 + }, + { + rule_type = MIN_ROW + rule_value = 100 + } + ] + } } } \ No newline at end of file diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile.conf similarity index 78% copy from seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf copy to seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile.conf index 181a9fc1ad..fa278ae2e0 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile.conf @@ -21,22 +21,23 @@ env { # You can set flink configuration here execution.parallelism = 1 - job.mode = "BATCH" - execution.checkpoint.interval = 5000 - #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" + job.mode = "STREAMING" } source { # This is a example source plugin **only for test and demonstrate the feature source plugin** FakeSource { result_table_name = "fake" + row.num = 100 + split.num = 5 + split.read-interval = 3000 schema = { fields { + id = "int" name = "string" age = "int" } } - parallelism = 3 } } @@ -45,18 +46,11 @@ transform { sink { LocalFile { - path = "/tmp/hive/warehouse/test2" - field_delimiter = "\t" + path = 
"/tmp/seatunnel/config/checkpoint-streaming-enable-test-resources/sinkfile/" row_delimiter = "\n" - partition_by = ["age"] - partition_dir_expression = "${k0}=${v0}" - is_partition_field_write_in_file = true file_name_expression = "${transactionId}_${now}" file_format_type = "text" - sink_columns = ["name", "age"] filename_time_format = "yyyy.MM.dd" is_enable_transaction = true - save_mode = "error" - } } \ No newline at end of file diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile_interval.conf similarity index 78% copy from seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf copy to seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile_interval.conf index 181a9fc1ad..288b369e4c 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile_interval.conf @@ -21,22 +21,24 @@ env { # You can set flink configuration here execution.parallelism = 1 - job.mode = "BATCH" - execution.checkpoint.interval = 5000 - #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" + job.mode = "STREAMING" + checkpoint.interval = 3000 } source { # This is a example source plugin **only for test and demonstrate the feature source plugin** FakeSource { result_table_name = "fake" + row.num = 100 + split.num = 5 + split.read-interval = 3000 schema = { fields { + id = "int" name = "string" age = "int" } } - parallelism = 3 } } @@ -45,18 +47,11 @@ transform { sink { LocalFile { - path = "/tmp/hive/warehouse/test2" - field_delimiter = 
"\t" + path = "/tmp/seatunnel/config/checkpoint-streaming-enable-test-resources/sinkfile/" row_delimiter = "\n" - partition_by = ["age"] - partition_dir_expression = "${k0}=${v0}" - is_partition_field_write_in_file = true file_name_expression = "${transactionId}_${now}" file_format_type = "text" - sink_columns = ["name", "age"] filename_time_format = "yyyy.MM.dd" is_enable_transaction = true - save_mode = "error" - } } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2.properties b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2.properties index 051f67d12d..bfcd94a55a 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2.properties +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2.properties @@ -25,6 +25,10 @@ logger.zeta.level=WARN logger.zetaMaster.name=org.apache.seatunnel.engine.server.master logger.zetaMaster.level=INFO +# For print checkpoint info +logger.checkpoint.name=org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator +logger.checkpoint.level=INFO + logger.debezium.name=io.debezium.connector logger.debezium.level=WARN diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf index 181a9fc1ad..ae9e400fc5 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf +++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf @@ -22,7 +22,6 @@ env { # You can set flink configuration here execution.parallelism = 1 job.mode = "BATCH" - execution.checkpoint.interval = 5000 #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" } diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file_complex.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file_complex.conf index 3a44886274..2ecc1d36dd 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file_complex.conf +++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file_complex.conf @@ -22,7 +22,6 @@ env { # You can set flink configuration here execution.parallelism = 1 job.mode = "BATCH" - execution.checkpoint.interval = 5000 #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" } diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_two_file.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_two_file.conf index 7ff3c21f78..254737fcfc 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_two_file.conf +++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_two_file.conf @@ -22,7 +22,6 @@ env { # You can set flink configuration here execution.parallelism = 1 job.mode = "BATCH" - execution.checkpoint.interval = 5000 #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" } diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf index a4404b9f91..2b44e9c757 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf +++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf @@ -22,7 +22,6 @@ env { # You can set flink configuration here execution.parallelism = 1 job.mode = "BATCH" - execution.checkpoint.interval = 5000 #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" } diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java index 78add9c883..76fb99f4d8 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java @@ -35,6 +35,8 @@ public class CheckpointConfig implements Serializable { private CheckpointStorageConfig storage = ServerConfigOptions.CHECKPOINT_STORAGE.defaultValue(); + private boolean checkpointEnable = true; + public void setCheckpointInterval(long checkpointInterval) { checkArgument( checkpointInterval >= MINIMAL_CHECKPOINT_TIME, diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java index 60459eaff5..09bf416f6a 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java @@ -327,7 +327,13 @@ public class CheckpointCoordinator { InvocationFuture<?>[] futures = notifyTaskStart(); CompletableFuture.allOf(futures).join(); notifyCompleted(latestCompletedCheckpoint); - scheduleTriggerPendingCheckpoint(coordinatorConfig.getCheckpointInterval()); + if (coordinatorConfig.isCheckpointEnable()) { + LOG.info("checkpoint is enabled, start schedule trigger pending checkpoint."); + scheduleTriggerPendingCheckpoint(coordinatorConfig.getCheckpointInterval()); + } else { + LOG.info( + "checkpoint is 
disabled, because in batch mode and 'checkpoint.interval' of env is missing."); + } } private void notifyCompleted(CompletedCheckpoint completedCheckpoint) { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java index 6c527732d6..9bd40804fb 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java @@ -26,6 +26,7 @@ import org.apache.seatunnel.common.utils.SeaTunnelException; import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException; import org.apache.seatunnel.engine.common.Constant; import org.apache.seatunnel.engine.common.config.EngineConfig; +import org.apache.seatunnel.engine.common.config.JobConfig; import org.apache.seatunnel.engine.common.config.server.CheckpointConfig; import org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig; import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException; @@ -85,6 +86,7 @@ import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch; +import static org.apache.seatunnel.common.constants.JobMode.BATCH; public class JobMaster { private static final ILogger LOGGER = Logger.getLogger(JobMaster.class); @@ -188,8 +190,7 @@ public class JobMaster { nodeEngine.getSerializationService().toObject(jobImmutableInformationData); jobCheckpointConfig = createJobCheckpointConfig( - engineConfig.getCheckpointConfig(), - jobImmutableInformation.getJobConfig().getEnvOptions()); + engineConfig.getCheckpointConfig(), jobImmutableInformation.getJobConfig()); LOGGER.info( String.format( @@ -257,7 +258,8 @@ public class 
JobMaster { // TODO replace it after ReadableConfig Support parse yaml format, then use only one config to // read engine and env config. private CheckpointConfig createJobCheckpointConfig( - CheckpointConfig defaultCheckpointConfig, Map<String, Object> jobEnv) { + CheckpointConfig defaultCheckpointConfig, JobConfig jobConfig) { + Map<String, Object> jobEnv = jobConfig.getEnvOptions(); CheckpointConfig jobCheckpointConfig = new CheckpointConfig(); jobCheckpointConfig.setCheckpointTimeout(defaultCheckpointConfig.getCheckpointTimeout()); jobCheckpointConfig.setCheckpointInterval(defaultCheckpointConfig.getCheckpointInterval()); @@ -274,6 +276,10 @@ public class JobMaster { jobCheckpointConfig.setCheckpointInterval( Long.parseLong( jobEnv.get(EnvCommonOptions.CHECKPOINT_INTERVAL.key()).toString())); + } else if (jobConfig.getJobContext().getJobMode() == BATCH) { + LOGGER.info( + "in batch mode, the 'checkpoint.interval' configuration of env is missing, so checkpoint will be disabled"); + jobCheckpointConfig.setCheckpointEnable(false); } if (jobEnv.containsKey(EnvCommonOptions.CHECKPOINT_TIMEOUT.key())) { jobCheckpointConfig.setCheckpointTimeout( diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_file.conf b/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_file.conf index 24339945e7..382ba3db75 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_file.conf +++ b/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_file.conf @@ -22,7 +22,6 @@ env { # You can set flink configuration here execution.parallelism = 1 job.mode = "BATCH" - execution.checkpoint.interval = 5000 #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" } diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_file_complex.conf 
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_file_complex.conf index e3e0e00d9b..30c0242137 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_file_complex.conf +++ b/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_file_complex.conf @@ -22,7 +22,6 @@ env { # You can set flink configuration here execution.parallelism = 1 job.mode = "BATCH" - execution.checkpoint.interval = 5000 #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" } diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console.conf b/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console.conf index 9e04be8bec..c59738b46f 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console.conf +++ b/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console.conf @@ -22,7 +22,6 @@ env { # You can set engine configuration here execution.parallelism = 1 job.mode = "BATCH" - execution.checkpoint.interval = 5000 #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" } diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console_job_metrics.conf b/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console_job_metrics.conf index 7edeb4ef71..68f70e5178 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console_job_metrics.conf +++ b/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console_job_metrics.conf @@ -22,7 +22,6 @@ env { # You can set engine configuration here execution.parallelism = 1 job.mode = "BATCH" - execution.checkpoint.interval = 5000 #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" } diff --git a/seatunnel-examples/seatunnel-engine-examples/src/main/resources/examples/fake_to_console.conf 
b/seatunnel-examples/seatunnel-engine-examples/src/main/resources/examples/fake_to_console.conf index f6482a5475..aa0d49c034 100644 --- a/seatunnel-examples/seatunnel-engine-examples/src/main/resources/examples/fake_to_console.conf +++ b/seatunnel-examples/seatunnel-engine-examples/src/main/resources/examples/fake_to_console.conf @@ -22,7 +22,6 @@ env { # You can set engine configuration here execution.parallelism = 1 job.mode = "BATCH" - checkpoint.interval = 5000 #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" }