davidzollo commented on code in PR #10418:
URL: https://github.com/apache/seatunnel/pull/10418#discussion_r2853141291


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -446,6 +464,168 @@ private void initCoordinatorService() {
                                 
this::restoreAllRunningJobFromMasterNodeSwitch, executorService));
     }
 
+    private void cleanupPendingPipelines() {
+        if (!isActive) {
+            return;
+        }
+        IMap<PipelineLocation, PipelineCleanupRecord> pendingCleanupIMap =
+                this.pendingPipelineCleanupIMap;
+        if (pendingCleanupIMap == null || pendingCleanupIMap.isEmpty()) {
+            return;
+        }
+
+        try {
+            for (Map.Entry<PipelineLocation, PipelineCleanupRecord> entry :
+                    pendingCleanupIMap.entrySet()) {
+                processPendingPipelineCleanup(entry.getKey(), 
entry.getValue());
+            }
+        } catch (HazelcastInstanceNotActiveException e) {
+            logger.warning(
+                    String.format(
+                            "Skip pending pipeline cleanup: hazelcast not 
active: %s",
+                            ExceptionUtils.getMessage(e)));
+        } catch (Throwable t) {
+            logger.warning(
+                    String.format(
+                            "Unexpected exception in pending pipeline cleanup: 
%s",
+                            ExceptionUtils.getMessage(t)),
+                    t);
+        }
+    }
+
+    private void processPendingPipelineCleanup(
+            PipelineLocation pipelineLocation, PipelineCleanupRecord record) {
+        if (pipelineLocation == null || record == null) {
+            return;
+        }
+        if (!shouldCleanup(record)) {
+            removePendingCleanupRecord(pipelineLocation, record);
+            return;
+        }
+
+        PipelineStatus currentStatus = 
getPipelineStatusFromIMap(pipelineLocation);
+        if (currentStatus != null && !currentStatus.isEndState()) {
+            return;
+        }
+
+        long now = System.currentTimeMillis();
+        PipelineCleanupRecord updated = copy(record);
+        updated.setLastAttemptTimeMillis(now);
+        updated.setAttemptCount(record.getAttemptCount() + 1);
+
+        if (!updated.isMetricsImapCleaned() && 
cleanupPipelineMetrics(pipelineLocation)) {
+            updated.setMetricsImapCleaned(true);
+        }
+
+        Map<TaskGroupLocation, Address> taskGroups = updated.getTaskGroups();
+        if (taskGroups != null && !taskGroups.isEmpty()) {
+            for (Map.Entry<TaskGroupLocation, Address> taskGroup : 
taskGroups.entrySet()) {
+                TaskGroupLocation taskGroupLocation = taskGroup.getKey();
+                if (updated.getCleanedTaskGroups() != null
+                        && 
updated.getCleanedTaskGroups().contains(taskGroupLocation)) {
+                    continue;
+                }
+                Address workerAddress = taskGroup.getValue();
+                if (workerAddress == null
+                        || 
nodeEngine.getClusterService().getMember(workerAddress) == null) {
+                    continue;

Review Comment:
   If a worker is offline (or `workerAddress == null`), `CoordinatorService` skips that task group via `continue` and never marks it as cleaned.
   However, `PipelineCleanupRecord.isCleaned()` returns true only when `cleanedTaskGroups` contains every key of `taskGroups`:
   ```
       public boolean isCleaned() {
           return metricsImapCleaned
                   && taskGroups != null
                   && cleanedTaskGroups != null
                   && cleanedTaskGroups.containsAll(taskGroups.keySet());
       }
   ```
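   As a result, a record with a task group whose worker is offline can never satisfy `isCleaned()`. That pending cleanup record may never be removed, and such records can accumulate in `IMAP_PENDING_PIPELINE_CLEANUP`.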
   
   Suggestion:
   When `workerAddress == null` or `getMember(workerAddress) == null`, treat that task group as “no longer needs cleanup” and add it to `cleanedTaskGroups` (or remove it from `taskGroups`). The record can then reach `isCleaned()` and be removed.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to