This is an automated email from the ASF dual-hosted git repository. gaojun2048 pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 4ec25f345f [Fix][Zeta] Fix release slot resource twice (#7236) 4ec25f345f is described below commit 4ec25f345f57debb1b48fcd96a723881161ff492 Author: Jia Fan <fanjiaemi...@qq.com> AuthorDate: Mon Jul 22 12:01:36 2024 +0800 [Fix][Zeta] Fix release slot resource twice (#7236) --- .../seatunnel/engine/e2e/JobClientJobProxyIT.java | 9 ++++++++ .../seatunnel/engine/server/master/JobMaster.java | 25 +++++++++++++++++----- .../CheckTaskGroupIsExecutingOperation.java | 3 ++- 3 files changed, 31 insertions(+), 6 deletions(-) diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java index 3d871adb5a..e6966875e6 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java @@ -63,6 +63,15 @@ public class JobClientJobProxyIT extends SeaTunnelContainer { "Restore time 3, pipeline Job stream_fake_to_inmemory_with_error.conf")); } + @Test + public void testNoDuplicatedReleaseSlot() throws IOException, InterruptedException { + Container.ExecResult execResult = + executeJob(server, "/savemode/fake_to_inmemory_savemode.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + Assertions.assertFalse( + server.getLogs().contains("wrong target release operation with job")); + } + @Test public void testMultiTableSinkFailedWithThrowable() throws IOException, InterruptedException { Container.ExecResult execResult = diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java index 29d8611f13..e9928a018a 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java @@ -70,7 +70,6 @@ import org.apache.seatunnel.engine.server.task.operation.CleanTaskGroupContextOp import org.apache.seatunnel.engine.server.task.operation.GetTaskGroupMetricsOperation; import org.apache.seatunnel.engine.server.utils.NodeEngineUtil; -import com.google.common.collect.Lists; import com.hazelcast.cluster.Address; import com.hazelcast.core.HazelcastInstanceNotActiveException; import com.hazelcast.flakeidgen.FlakeIdGenerator; @@ -92,6 +91,8 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; @@ -146,6 +147,8 @@ public class JobMaster { private Map<Integer, CheckpointPlan> checkpointPlanMap; + private final Map<Integer, List<SlotProfile>> releasedSlotWhenTaskGroupFinished; + private final IMap<Long, JobInfo> runningJobInfoIMap; private final IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> metricsImap; @@ -190,6 +193,7 @@ public class JobMaster { this.engineConfig = engineConfig; this.metricsImap = metricsImap; this.seaTunnelServer = seaTunnelServer; + this.releasedSlotWhenTaskGroupFinished = new ConcurrentHashMap<>(); } public synchronized void init(long initializationTimestamp, boolean restart) throws Exception { @@ -464,13 +468,17 @@ public class JobMaster { jobImmutableInformation.getJobId(), Collections.singletonList(taskGroupSlotProfile)) .join(); - + releasedSlotWhenTaskGroupFinished + .computeIfAbsent( + pipelineLocation.getPipelineId(), + k -> new CopyOnWriteArrayList<>()) + .add(taskGroupSlotProfile); return null; }, new RetryUtils.RetryMaterial( Constant.OPERATION_RETRY_TIME, true, - exception -> ExceptionUtil.isOperationNeedRetryException(exception), + ExceptionUtil::isOperationNeedRetryException, Constant.OPERATION_RETRY_SLEEP)); } catch (Exception e) { LOGGER.warning( @@ -487,6 +495,11 @@ public class JobMaster { if (taskGroupLocationSlotProfileMap == null) { return; } + List<SlotProfile> alreadyReleased = new ArrayList<>(); + if (releasedSlotWhenTaskGroupFinished.containsKey(subPlan.getPipelineId())) { + alreadyReleased.addAll( + releasedSlotWhenTaskGroupFinished.get(subPlan.getPipelineId())); + } RetryUtils.retryWithException( () -> { @@ -497,10 +510,12 @@ public class JobMaster { resourceManager .releaseResources( jobImmutableInformation.getJobId(), - Lists.newArrayList( - taskGroupLocationSlotProfileMap.values())) + taskGroupLocationSlotProfileMap.values().stream() + .filter(p -> !alreadyReleased.contains(p)) + .collect(Collectors.toList())) .join(); ownedSlotProfilesIMap.remove(subPlan.getPipelineLocation()); + releasedSlotWhenTaskGroupFinished.remove(subPlan.getPipelineId()); return null; }, new RetryUtils.RetryMaterial( diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/CheckTaskGroupIsExecutingOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/CheckTaskGroupIsExecutingOperation.java index c43381b785..d4e158abdb 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/CheckTaskGroupIsExecutingOperation.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/CheckTaskGroupIsExecutingOperation.java @@ -46,7 +46,8 @@ public class CheckTaskGroupIsExecutingOperation extends Operation SeaTunnelServer server = getService(); try { response = - server.getTaskExecutionService().getExecutionContext(taskGroupLocation) != null; + server.getTaskExecutionService().getActiveExecutionContext(taskGroupLocation) + != null; } catch (TaskGroupContextNotFoundException e) { response = false; }