This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 670bba0c41 [Improve][Zeta] Clean checkpoint file when job 
FINISHED/CANCELED (#6938)
670bba0c41 is described below

commit 670bba0c412229743893fe3cc9a9cb28621df188
Author: Jia Fan <fanjiaemi...@qq.com>
AuthorDate: Fri Jun 14 17:46:05 2024 +0800

    [Improve][Zeta] Clean checkpoint file when job FINISHED/CANCELED (#6938)
---
 .../server/checkpoint/CheckpointManager.java       |  3 +-
 .../seatunnel/engine/server/master/JobMaster.java  |  1 +
 .../server/checkpoint/CheckpointManagerTest.java   |  4 +-
 .../server/checkpoint/CheckpointStorageTest.java   | 64 ++++++++++++++++++++++
 .../batch_fakesource_to_file_with_checkpoint.conf  | 62 +++++++++++++++++++++
 .../stream_fake_to_console_with_checkpoint.conf    | 52 ++++++++++++++++++
 6 files changed, 181 insertions(+), 5 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index a38703dee8..0142d8a6a4 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -241,12 +241,11 @@ public class CheckpointManager {
      * Called by the JobMaster. <br>
      * Listen to the {@link JobStatus} of the {@link Job}.
      */
-    public CompletableFuture<Void> shutdown(JobStatus jobStatus) {
+    public void clearCheckpointIfNeed(JobStatus jobStatus) {
         if ((jobStatus == JobStatus.FINISHED || jobStatus == 
JobStatus.CANCELED)
                 && !isSavePointEnd()) {
             checkpointStorage.deleteCheckpoint(jobId + "");
         }
-        return CompletableFuture.completedFuture(null);
     }
 
     /**
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 8f54402d80..1b7bf6bdad 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -518,6 +518,7 @@ public class JobMaster {
     }
 
     public void cleanJob() {
+        checkpointManager.clearCheckpointIfNeed(physicalPlan.getJobStatus());
         jobHistoryService.storeJobInfo(jobImmutableInformation.getJobId(), 
getJobDAGInfo());
         jobHistoryService.storeFinishedJobState(this);
         removeJobIMap();
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
index c4b7b8af7d..ce5cca1780 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
@@ -41,7 +41,6 @@ import com.hazelcast.map.IMap;
 import java.time.Instant;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 
 import static org.apache.seatunnel.engine.common.Constant.IMAP_CHECKPOINT_ID;
 import static 
org.apache.seatunnel.engine.common.Constant.IMAP_RUNNING_JOB_STATE;
@@ -94,8 +93,7 @@ public class CheckpointManagerTest extends 
AbstractSeaTunnelServerTest {
         Assertions.assertTrue(checkpointManager.isCompletedPipeline(1));
         checkpointManager.listenPipeline(1, PipelineStatus.FINISHED);
         Assertions.assertNull(checkpointIdMap.get(1));
-        CompletableFuture<Void> future = 
checkpointManager.shutdown(JobStatus.FINISHED);
-        future.join();
+        checkpointManager.clearCheckpointIfNeed(JobStatus.FINISHED);
         Assertions.assertTrue(checkpointStorage.getAllCheckpoints(jobId + 
"").isEmpty());
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java
index 2a334fe384..63e1277827 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java
@@ -43,6 +43,11 @@ public class CheckpointStorageTest extends 
AbstractSeaTunnelServerTest {
 
     public static String STREAM_CONF_PATH = 
"stream_fake_to_console_biginterval.conf";
     public static String BATCH_CONF_PATH = "batch_fakesource_to_file.conf";
+    public static String BATCH_CONF_WITH_CHECKPOINT_PATH =
+            "batch_fakesource_to_file_with_checkpoint.conf";
+
+    public static String STREAM_CONF_WITH_CHECKPOINT_PATH =
+            "stream_fake_to_console_with_checkpoint.conf";
 
     @Override
     public SeaTunnelConfig loadSeaTunnelConfig() {
@@ -113,4 +118,63 @@ public class CheckpointStorageTest extends 
AbstractSeaTunnelServerTest {
                 checkpointStorage.getAllCheckpoints(String.valueOf(jobId));
         Assertions.assertEquals(0, allCheckpoints.size());
     }
+
+    @Test
+    public void testBatchJobWithCheckpoint() throws CheckpointStorageException 
{
+        long jobId = System.currentTimeMillis();
+        CheckpointConfig checkpointConfig =
+                
server.getSeaTunnelConfig().getEngineConfig().getCheckpointConfig();
+        
server.getSeaTunnelConfig().getEngineConfig().setCheckpointConfig(checkpointConfig);
+
+        CheckpointStorage checkpointStorage =
+                FactoryUtil.discoverFactory(
+                                Thread.currentThread().getContextClassLoader(),
+                                CheckpointStorageFactory.class,
+                                checkpointConfig.getStorage().getStorage())
+                        
.create(checkpointConfig.getStorage().getStoragePluginConfig());
+        startJob(jobId, BATCH_CONF_WITH_CHECKPOINT_PATH, false);
+        await().atMost(120000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertEquals(
+                                        
server.getCoordinatorService().getJobStatus(jobId),
+                                        JobStatus.FINISHED));
+        List<PipelineState> allCheckpoints =
+                checkpointStorage.getAllCheckpoints(String.valueOf(jobId));
+        Assertions.assertEquals(0, allCheckpoints.size());
+    }
+
+    @Test
+    public void testStreamJobWithCancel() throws CheckpointStorageException, 
InterruptedException {
+        long jobId = System.currentTimeMillis();
+        CheckpointConfig checkpointConfig =
+                
server.getSeaTunnelConfig().getEngineConfig().getCheckpointConfig();
+        
server.getSeaTunnelConfig().getEngineConfig().setCheckpointConfig(checkpointConfig);
+
+        CheckpointStorage checkpointStorage =
+                FactoryUtil.discoverFactory(
+                                Thread.currentThread().getContextClassLoader(),
+                                CheckpointStorageFactory.class,
+                                checkpointConfig.getStorage().getStorage())
+                        
.create(checkpointConfig.getStorage().getStoragePluginConfig());
+        startJob(jobId, STREAM_CONF_WITH_CHECKPOINT_PATH, false);
+        await().atMost(120000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertEquals(
+                                        
server.getCoordinatorService().getJobStatus(jobId),
+                                        JobStatus.RUNNING));
+        // wait for checkpoint
+        Thread.sleep(10 * 1000);
+        server.getCoordinatorService().getJobMaster(jobId).cancelJob();
+        await().atMost(120000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertEquals(
+                                        
server.getCoordinatorService().getJobStatus(jobId),
+                                        JobStatus.CANCELED));
+        List<PipelineState> allCheckpoints =
+                checkpointStorage.getAllCheckpoints(String.valueOf(jobId));
+        Assertions.assertEquals(0, allCheckpoints.size());
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_file_with_checkpoint.conf
 
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_file_with_checkpoint.conf
new file mode 100644
index 0000000000..721f89fe94
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_file_with_checkpoint.conf
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in 
SeaTunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  checkpoint.interval = 1000
+}
+
+source {
+    FakeSource {
+      row.num = 100
+      split.num = 5
+      split.read-interval = 3000
+      result_table_name = "fake"
+      schema = {
+        fields {
+          name = "string"
+          age = "int"
+        }
+      }
+      parallelism = 1
+    }
+}
+
+transform {
+}
+
+sink {
+  LocalFile {
+    path="/tmp/hive/warehouse/test2"
+    field_delimiter="\t"
+    row_delimiter="\n"
+    partition_by=["age"]
+    partition_dir_expression="${k0}=${v0}"
+    is_partition_field_write_in_file=true
+    file_name_expression="${transactionId}_${now}"
+    file_format_type="text"
+    sink_columns=["name","age"]
+    filename_time_format="yyyy.MM.dd"
+    is_enable_transaction=true
+    save_mode="error"
+
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console_with_checkpoint.conf
 
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console_with_checkpoint.conf
new file mode 100644
index 0000000000..de02ec9624
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console_with_checkpoint.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
SeaTunnel config
+######
+
+env {
+  # You can set SeaTunnel environment configuration here
+  parallelism = 2
+  job.mode = "STREAMING"
+  checkpoint.interval = 1000
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating 
source plugin features**
+  FakeSource {
+    parallelism = 2
+    result_table_name = "fake"
+    row.num = 16
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+  }
+
+  # If you would like more information about how to configure SeaTunnel 
and to see the full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/category/source-v2
+}
+
+sink {
+  Console {
+  }
+
+  # If you would like more information about how to configure SeaTunnel 
and to see the full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/category/sink-v2
+}

Reply via email to