This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4ec25f345f [Fix][Zeta] Fix release slot resource twice (#7236)
4ec25f345f is described below

commit 4ec25f345f57debb1b48fcd96a723881161ff492
Author: Jia Fan <fanjiaemi...@qq.com>
AuthorDate: Mon Jul 22 12:01:36 2024 +0800

    [Fix][Zeta] Fix release slot resource twice (#7236)
---
 .../seatunnel/engine/e2e/JobClientJobProxyIT.java  |  9 ++++++++
 .../seatunnel/engine/server/master/JobMaster.java  | 25 +++++++++++++++++-----
 .../CheckTaskGroupIsExecutingOperation.java        |  3 ++-
 3 files changed, 31 insertions(+), 6 deletions(-)

diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
index 3d871adb5a..e6966875e6 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
@@ -63,6 +63,15 @@ public class JobClientJobProxyIT extends SeaTunnelContainer {
                                 "Restore time 3, pipeline Job 
stream_fake_to_inmemory_with_error.conf"));
     }
 
+    @Test
+    public void testNoDuplicatedReleaseSlot() throws IOException, 
InterruptedException {
+        Container.ExecResult execResult =
+                executeJob(server, "/savemode/fake_to_inmemory_savemode.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        Assertions.assertFalse(
+                server.getLogs().contains("wrong target release operation with 
job"));
+    }
+
     @Test
     public void testMultiTableSinkFailedWithThrowable() throws IOException, 
InterruptedException {
         Container.ExecResult execResult =
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 29d8611f13..e9928a018a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -70,7 +70,6 @@ import 
org.apache.seatunnel.engine.server.task.operation.CleanTaskGroupContextOp
 import 
org.apache.seatunnel.engine.server.task.operation.GetTaskGroupMetricsOperation;
 import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
 
-import com.google.common.collect.Lists;
 import com.hazelcast.cluster.Address;
 import com.hazelcast.core.HazelcastInstanceNotActiveException;
 import com.hazelcast.flakeidgen.FlakeIdGenerator;
@@ -92,6 +91,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
@@ -146,6 +147,8 @@ public class JobMaster {
 
     private Map<Integer, CheckpointPlan> checkpointPlanMap;
 
+    private final Map<Integer, List<SlotProfile>> 
releasedSlotWhenTaskGroupFinished;
+
     private final IMap<Long, JobInfo> runningJobInfoIMap;
 
     private final IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> 
metricsImap;
@@ -190,6 +193,7 @@ public class JobMaster {
         this.engineConfig = engineConfig;
         this.metricsImap = metricsImap;
         this.seaTunnelServer = seaTunnelServer;
+        this.releasedSlotWhenTaskGroupFinished = new ConcurrentHashMap<>();
     }
 
     public synchronized void init(long initializationTimestamp, boolean 
restart) throws Exception {
@@ -464,13 +468,17 @@ public class JobMaster {
                                         jobImmutableInformation.getJobId(),
                                         
Collections.singletonList(taskGroupSlotProfile))
                                 .join();
-
+                        releasedSlotWhenTaskGroupFinished
+                                .computeIfAbsent(
+                                        pipelineLocation.getPipelineId(),
+                                        k -> new CopyOnWriteArrayList<>())
+                                .add(taskGroupSlotProfile);
                         return null;
                     },
                     new RetryUtils.RetryMaterial(
                             Constant.OPERATION_RETRY_TIME,
                             true,
-                            exception -> 
ExceptionUtil.isOperationNeedRetryException(exception),
+                            ExceptionUtil::isOperationNeedRetryException,
                             Constant.OPERATION_RETRY_SLEEP));
         } catch (Exception e) {
             LOGGER.warning(
@@ -487,6 +495,11 @@ public class JobMaster {
             if (taskGroupLocationSlotProfileMap == null) {
                 return;
             }
+            List<SlotProfile> alreadyReleased = new ArrayList<>();
+            if 
(releasedSlotWhenTaskGroupFinished.containsKey(subPlan.getPipelineId())) {
+                alreadyReleased.addAll(
+                        
releasedSlotWhenTaskGroupFinished.get(subPlan.getPipelineId()));
+            }
 
             RetryUtils.retryWithException(
                     () -> {
@@ -497,10 +510,12 @@ public class JobMaster {
                         resourceManager
                                 .releaseResources(
                                         jobImmutableInformation.getJobId(),
-                                        Lists.newArrayList(
-                                                
taskGroupLocationSlotProfileMap.values()))
+                                        
taskGroupLocationSlotProfileMap.values().stream()
+                                                .filter(p -> 
!alreadyReleased.contains(p))
+                                                .collect(Collectors.toList()))
                                 .join();
                         
ownedSlotProfilesIMap.remove(subPlan.getPipelineLocation());
+                        
releasedSlotWhenTaskGroupFinished.remove(subPlan.getPipelineId());
                         return null;
                     },
                     new RetryUtils.RetryMaterial(
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/CheckTaskGroupIsExecutingOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/CheckTaskGroupIsExecutingOperation.java
index c43381b785..d4e158abdb 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/CheckTaskGroupIsExecutingOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/CheckTaskGroupIsExecutingOperation.java
@@ -46,7 +46,8 @@ public class CheckTaskGroupIsExecutingOperation extends 
Operation
         SeaTunnelServer server = getService();
         try {
             response =
-                    
server.getTaskExecutionService().getExecutionContext(taskGroupLocation) != null;
+                    
server.getTaskExecutionService().getActiveExecutionContext(taskGroupLocation)
+                            != null;
         } catch (TaskGroupContextNotFoundException e) {
             response = false;
         }

Reply via email to