This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 670bba0c41 [Improve][Zeta] Clean checkpoint file when job FINISHED/CANCELED (#6938) 670bba0c41 is described below commit 670bba0c412229743893fe3cc9a9cb28621df188 Author: Jia Fan <fanjiaemi...@qq.com> AuthorDate: Fri Jun 14 17:46:05 2024 +0800 [Improve][Zeta] Clean checkpoint file when job FINISHED/CANCELED (#6938) --- .../server/checkpoint/CheckpointManager.java | 3 +- .../seatunnel/engine/server/master/JobMaster.java | 1 + .../server/checkpoint/CheckpointManagerTest.java | 4 +- .../server/checkpoint/CheckpointStorageTest.java | 64 ++++++++++++++++++++++ .../batch_fakesource_to_file_with_checkpoint.conf | 62 +++++++++++++++++++++ .../stream_fake_to_console_with_checkpoint.conf | 52 ++++++++++++++++++ 6 files changed, 181 insertions(+), 5 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java index a38703dee8..0142d8a6a4 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java @@ -241,12 +241,11 @@ public class CheckpointManager { * Called by the JobMaster. <br> * Listen to the {@link JobStatus} of the {@link Job}. 
*/ - public CompletableFuture<Void> shutdown(JobStatus jobStatus) { + public void clearCheckpointIfNeed(JobStatus jobStatus) { if ((jobStatus == JobStatus.FINISHED || jobStatus == JobStatus.CANCELED) && !isSavePointEnd()) { checkpointStorage.deleteCheckpoint(jobId + ""); } - return CompletableFuture.completedFuture(null); } /** diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java index 8f54402d80..1b7bf6bdad 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java @@ -518,6 +518,7 @@ public class JobMaster { } public void cleanJob() { + checkpointManager.clearCheckpointIfNeed(physicalPlan.getJobStatus()); jobHistoryService.storeJobInfo(jobImmutableInformation.getJobId(), getJobDAGInfo()); jobHistoryService.storeFinishedJobState(this); removeJobIMap(); diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java index c4b7b8af7d..ce5cca1780 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java @@ -41,7 +41,6 @@ import com.hazelcast.map.IMap; import java.time.Instant; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.CompletableFuture; import static org.apache.seatunnel.engine.common.Constant.IMAP_CHECKPOINT_ID; import static 
org.apache.seatunnel.engine.common.Constant.IMAP_RUNNING_JOB_STATE; @@ -94,8 +93,7 @@ public class CheckpointManagerTest extends AbstractSeaTunnelServerTest { Assertions.assertTrue(checkpointManager.isCompletedPipeline(1)); checkpointManager.listenPipeline(1, PipelineStatus.FINISHED); Assertions.assertNull(checkpointIdMap.get(1)); - CompletableFuture<Void> future = checkpointManager.shutdown(JobStatus.FINISHED); - future.join(); + checkpointManager.clearCheckpointIfNeed(JobStatus.FINISHED); Assertions.assertTrue(checkpointStorage.getAllCheckpoints(jobId + "").isEmpty()); } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java index 2a334fe384..63e1277827 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java @@ -43,6 +43,11 @@ public class CheckpointStorageTest extends AbstractSeaTunnelServerTest { public static String STREAM_CONF_PATH = "stream_fake_to_console_biginterval.conf"; public static String BATCH_CONF_PATH = "batch_fakesource_to_file.conf"; + public static String BATCH_CONF_WITH_CHECKPOINT_PATH = + "batch_fakesource_to_file_with_checkpoint.conf"; + + public static String STREAM_CONF_WITH_CHECKPOINT_PATH = + "stream_fake_to_console_with_checkpoint.conf"; @Override public SeaTunnelConfig loadSeaTunnelConfig() { @@ -113,4 +118,63 @@ public class CheckpointStorageTest extends AbstractSeaTunnelServerTest { checkpointStorage.getAllCheckpoints(String.valueOf(jobId)); Assertions.assertEquals(0, allCheckpoints.size()); } + + @Test + public void testBatchJobWithCheckpoint() throws CheckpointStorageException { + long jobId = 
System.currentTimeMillis(); + CheckpointConfig checkpointConfig = + server.getSeaTunnelConfig().getEngineConfig().getCheckpointConfig(); + server.getSeaTunnelConfig().getEngineConfig().setCheckpointConfig(checkpointConfig); + + CheckpointStorage checkpointStorage = + FactoryUtil.discoverFactory( + Thread.currentThread().getContextClassLoader(), + CheckpointStorageFactory.class, + checkpointConfig.getStorage().getStorage()) + .create(checkpointConfig.getStorage().getStoragePluginConfig()); + startJob(jobId, BATCH_CONF_WITH_CHECKPOINT_PATH, false); + await().atMost(120000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertEquals( + server.getCoordinatorService().getJobStatus(jobId), + JobStatus.FINISHED)); + List<PipelineState> allCheckpoints = + checkpointStorage.getAllCheckpoints(String.valueOf(jobId)); + Assertions.assertEquals(0, allCheckpoints.size()); + } + + @Test + public void testStreamJobWithCancel() throws CheckpointStorageException, InterruptedException { + long jobId = System.currentTimeMillis(); + CheckpointConfig checkpointConfig = + server.getSeaTunnelConfig().getEngineConfig().getCheckpointConfig(); + server.getSeaTunnelConfig().getEngineConfig().setCheckpointConfig(checkpointConfig); + + CheckpointStorage checkpointStorage = + FactoryUtil.discoverFactory( + Thread.currentThread().getContextClassLoader(), + CheckpointStorageFactory.class, + checkpointConfig.getStorage().getStorage()) + .create(checkpointConfig.getStorage().getStoragePluginConfig()); + startJob(jobId, STREAM_CONF_WITH_CHECKPOINT_PATH, false); + await().atMost(120000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertEquals( + server.getCoordinatorService().getJobStatus(jobId), + JobStatus.RUNNING)); + // wait for checkpoint + Thread.sleep(10 * 1000); + server.getCoordinatorService().getJobMaster(jobId).cancelJob(); + await().atMost(120000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertEquals( + 
server.getCoordinatorService().getJobStatus(jobId), + JobStatus.CANCELED)); + List<PipelineState> allCheckpoints = + checkpointStorage.getAllCheckpoints(String.valueOf(jobId)); + Assertions.assertEquals(0, allCheckpoints.size()); + } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_file_with_checkpoint.conf b/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_file_with_checkpoint.conf new file mode 100644 index 0000000000..721f89fe94 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_file_with_checkpoint.conf @@ -0,0 +1,62 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +###### +###### This config file is a demonstration of batch processing in seatunnel config +###### + +env { + parallelism = 1 + job.mode = "BATCH" + checkpoint.interval = 1000 +} + +source { + FakeSource { + row.num = 100 + split.num = 5 + split.read-interval = 3000 + result_table_name = "fake" + schema = { + fields { + name = "string" + age = "int" + } + } + parallelism = 1 + } +} + +transform { +} + +sink { + LocalFile { + path="/tmp/hive/warehouse/test2" + field_delimiter="\t" + row_delimiter="\n" + partition_by=["age"] + partition_dir_expression="${k0}=${v0}" + is_partition_field_write_in_file=true + file_name_expression="${transactionId}_${now}" + file_format_type="text" + sink_columns=["name","age"] + filename_time_format="yyyy.MM.dd" + is_enable_transaction=true + save_mode="error" + + } +} \ No newline at end of file diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console_with_checkpoint.conf b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console_with_checkpoint.conf new file mode 100644 index 0000000000..de02ec9624 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console_with_checkpoint.conf @@ -0,0 +1,52 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in SeaTunnel config +###### + +env { + # You can set SeaTunnel environment configuration here + parallelism = 2 + job.mode = "STREAMING" + checkpoint.interval = 1000 +} + +source { + # This is an example source plugin **only for testing and demonstrating the source plugin feature** + FakeSource { + parallelism = 2 + result_table_name = "fake" + row.num = 16 + schema = { + fields { + name = "string" + age = "int" + } + } + } + + # If you would like to get more information about how to configure SeaTunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/category/source-v2 +} + +sink { + Console { + } + + # If you would like to get more information about how to configure SeaTunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/category/sink-v2 +}