Hisoka-X commented on code in PR #9696:
URL: https://github.com/apache/seatunnel/pull/9696#discussion_r2271956698


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -1078,4 +1092,57 @@ public ConnectorPackageService getConnectorPackageService() {
         }
         return connectorPackageService;
     }
+
+    private void startMetricsCleanupWorker() {
+        Runnable cleanupTask =
+                () -> {
+                    Thread.currentThread().setName("metrics-cleanup-runner");
+                    while (!Thread.currentThread().isInterrupted()
+                            && !metricsCleanupRetryQueue.isEmpty()) {
+                        try {
+                            PipelineLocation pipelineLocation =
+                                    metricsCleanupRetryQueue.poll(
+                                            cleanUpRetryInterval, TimeUnit.SECONDS);

Review Comment:
   So this doesn't behave like an interval; it is more like a waiting time (the
poll timeout). A key that could not be released normally is put into the queue
and then read out and reprocessed almost immediately, because the effective
interval is very short. The success rate of this approach will not be very high.
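
One way to get an actual interval between retries is to make each queued key invisible until its delay has expired. The sketch below uses the JDK's `java.util.concurrent.DelayQueue` for this. It is only an illustration under stated assumptions, not the PR's code: `DelayedRetry` and `RetryIntervalSketch` are hypothetical names, and a plain `String` stands in for `PipelineLocation`.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical wrapper: the key only becomes pollable once its delay has expired.
final class DelayedRetry<T> implements Delayed {
    private final T key;
    private final long readyAtNanos;

    DelayedRetry(T key, long delay, TimeUnit unit) {
        this.key = key;
        this.readyAtNanos = System.nanoTime() + unit.toNanos(delay);
    }

    T key() {
        return key;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Remaining time until the entry may be retried (negative once expired).
        return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(
                getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

final class RetryIntervalSketch {
    // Waits up to `wait` for an entry whose delay has expired; returns null otherwise.
    // Here the timeout is only a waiting time, and the delay on the entry is the interval.
    static <T> T pollReady(DelayQueue<DelayedRetry<T>> queue, long wait, TimeUnit unit) {
        try {
            DelayedRetry<T> entry = queue.poll(wait, unit);
            return entry == null ? null : entry.key();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

With this shape, a key that fails cleanup can be re-offered with a fresh delay, so the worker does not spin on it.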



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -1078,4 +1092,57 @@ public ConnectorPackageService getConnectorPackageService() {
         }
         return connectorPackageService;
     }
+
+    private void startMetricsCleanupWorker() {
+        Runnable cleanupTask =
+                () -> {
+                    Thread.currentThread().setName("metrics-cleanup-runner");
+                    while (!Thread.currentThread().isInterrupted()
+                            && !metricsCleanupRetryQueue.isEmpty()) {
+                        try {
+                            PipelineLocation pipelineLocation =
+                                    metricsCleanupRetryQueue.poll(
+                                            cleanUpRetryInterval, TimeUnit.SECONDS);
+
+                            if (pipelineLocation != null) {
+                                JobMaster jobMaster = getJobMaster(pipelineLocation.getJobId());
+                                if (jobMaster != null) {
+                                    jobMaster.removeMetricsContext(
+                                            pipelineLocation,
+                                            (PipelineStatus)
+                                                    runningJobStateIMap.get(pipelineLocation));
+                                } else {
+                                    retryRemoveMetricsContext(pipelineLocation);

Review Comment:
   We should acquire the lock again when we remove it from the IMap.
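
A minimal sketch of the "lock, then remove" shape, under loud assumptions: a `ReentrantLock` and a plain `HashMap` stand in for the IMap key lock and the central metrics map, string keys stand in for `TaskLocation`, and `LockedCleanupSketch` is a hypothetical name. It is meant only to show the control flow, not the actual fix.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

final class LockedCleanupSketch {
    // Stand-in for the lock on the metrics key (assumption, not the PR's code).
    private final ReentrantLock metricsLock = new ReentrantLock();
    // Stand-in for the central metrics map; keys mimic "job/pipeline/task" locations.
    private final Map<String, String> centralMap = new HashMap<>();

    void put(String taskLocation, String metrics) {
        metricsLock.lock();
        try {
            centralMap.put(taskLocation, metrics);
        } finally {
            metricsLock.unlock();
        }
    }

    int size() {
        metricsLock.lock();
        try {
            return centralMap.size();
        } finally {
            metricsLock.unlock();
        }
    }

    // Returns true if the entries were removed while holding the lock,
    // false if the lock could not be acquired and the caller should retry later.
    boolean removeUnderLock(String pipelinePrefix, long wait, TimeUnit unit) {
        boolean locked = false;
        try {
            locked = metricsLock.tryLock(wait, unit);
            if (!locked) {
                return false;
            }
            centralMap.keySet().removeIf(k -> k.startsWith(pipelinePrefix));
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            if (locked) {
                metricsLock.unlock();
            }
        }
    }
}
```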



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -1078,4 +1092,57 @@ public ConnectorPackageService getConnectorPackageService() {
         }
         return connectorPackageService;
     }
+
+    private void startMetricsCleanupWorker() {
+        Runnable cleanupTask =
+                () -> {
+                    Thread.currentThread().setName("metrics-cleanup-runner");
+                    while (!Thread.currentThread().isInterrupted()
+                            && !metricsCleanupRetryQueue.isEmpty()) {
+                        try {
+                            PipelineLocation pipelineLocation =
+                                    metricsCleanupRetryQueue.poll(
+                                            cleanUpRetryInterval, TimeUnit.SECONDS);
+
+                            if (pipelineLocation != null) {
+                                JobMaster jobMaster = getJobMaster(pipelineLocation.getJobId());
+                                if (jobMaster != null) {
+                                    jobMaster.removeMetricsContext(
+                                            pipelineLocation,
+                                            (PipelineStatus)
+                                                    runningJobStateIMap.get(pipelineLocation));
+                                } else {
+                                    retryRemoveMetricsContext(pipelineLocation);
+                                }
+                            }
+
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                            logger.info("Metrics cleanup worker interrupted, exiting...");
+                        } catch (Exception e) {
+                            logger.warning(
+                                    String.format(
+                                            "Metrics cleanup retry failed: %s", e.getMessage()));
+                        }
+                    }
+                };
+
+        executorService.submit(cleanupTask);
+    }
+
+    private void retryRemoveMetricsContext(PipelineLocation pipelineLocation) {
+        try {
+            metricsImap.compute(
+                    Constant.IMAP_RUNNING_JOB_METRICS_KEY,
+                    (key, centralMap) -> {
+                        MetricsCleanupUtils.removeMetricsEntries(pipelineLocation, centralMap);
+                        return centralMap;
+                    });
+            logger.info(
+                    String.format(
+                            "Metrics cleanup via compute for pipeline: %s", pipelineLocation));
+        } catch (Exception e) {

Review Comment:
   If the removal fails, I think we should put it back into the queue?
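
Roughly, the idea is that the `catch` branch re-offers the key instead of only logging. A minimal sketch, assuming a generic key type and a `Consumer` as the cleanup action; `RequeueOnFailureSketch`, `cleanupOrRequeue` and `demo` are hypothetical names, not part of the PR.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

final class RequeueOnFailureSketch {
    // Runs the cleanup; if it throws, the key goes back into the retry queue.
    // Returns true only when the cleanup completed without an exception.
    static <T> boolean cleanupOrRequeue(
            T key, Consumer<T> cleanup, BlockingQueue<T> retryQueue) {
        try {
            cleanup.accept(key);
            return true;
        } catch (RuntimeException e) {
            if (!retryQueue.offer(key)) {
                // A bounded queue may be full; the key is dropped in that case.
                System.err.println("failed to re-add key to retry queue: " + key);
            }
            return false;
        }
    }

    // Small usage example: one cleanup succeeds, one fails and is queued again.
    static int demo() {
        BlockingQueue<String> retryQueue = new LinkedBlockingQueue<>();
        cleanupOrRequeue("pipeline-1", k -> {}, retryQueue);
        cleanupOrRequeue(
                "pipeline-2",
                k -> {
                    throw new IllegalStateException("simulated failure");
                },
                retryQueue);
        return retryQueue.size();
    }
}
```

Re-offering without any delay would bring back the immediate-reprocessing problem mentioned in the first comment, so this probably wants to be combined with a real retry interval.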



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java:
##########
@@ -892,25 +906,18 @@ public void removeMetricsContext(
                                 Constant.IMAP_RUNNING_JOB_METRICS_KEY, 5, TimeUnit.SECONDS);
                 if (!lockedIMap) {
                     LOGGER.severe("lock imap failed in update metrics");
+                    boolean offer = metricsCleanupRetryQueue.offer(pipelineLocation);
+                    if (!offer) {
+                        LOGGER.warning("failed to add pipelineLocation to retry queue");
+                    }
                     return;
                 }
 
                 HashMap<TaskLocation, SeaTunnelMetricsContext> centralMap =
                         metricsImap.get(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
-                if (centralMap != null) {
-                    List<TaskLocation> collect =
-                            centralMap.keySet().stream()
-                                    .filter(
-                                            taskLocation -> {
-                                                return taskLocation
-                                                        .getTaskGroupLocation()
-                                                        .getPipelineLocation()
-                                                        .equals(pipelineLocation);
-                                            })
-                                    .collect(Collectors.toList());
-                    collect.forEach(centralMap::remove);
-                    metricsImap.put(Constant.IMAP_RUNNING_JOB_METRICS_KEY, centralMap);
-                }
+                MetricsCleanupUtils.removeMetricsEntries(pipelineLocation, centralMap);
+                metricsImap.put(Constant.IMAP_RUNNING_JOB_METRICS_KEY, centralMap);

Review Comment:
   You missed the `centralMap != null` check before putting the map back into
the IMap.
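
A sketch of the guard, with assumptions stated up front: a `ConcurrentHashMap` stands in for the IMap (it also rejects `null` values on `put`), string keys stand in for `TaskLocation`, and `NullSafeCleanupSketch`, `METRICS_KEY` and `removePipelineEntries` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class NullSafeCleanupSketch {
    // Hypothetical key, standing in for Constant.IMAP_RUNNING_JOB_METRICS_KEY.
    static final String METRICS_KEY = "running-job-metrics";

    // Removes all entries of one pipeline and writes the map back.
    // Returns false, without touching the store, when no central map exists yet.
    static boolean removePipelineEntries(
            Map<String, HashMap<String, String>> store, String pipelinePrefix) {
        HashMap<String, String> centralMap = store.get(METRICS_KEY);
        if (centralMap == null) {
            // Nothing to clean up; putting null back would fail on a null-hostile map.
            return false;
        }
        centralMap.keySet().removeIf(k -> k.startsWith(pipelinePrefix));
        store.put(METRICS_KEY, centralMap);
        return true;
    }

    // Small usage example: returns the number of entries left after cleanup.
    static int demo() {
        Map<String, HashMap<String, String>> store = new ConcurrentHashMap<>();
        HashMap<String, String> metrics = new HashMap<>();
        metrics.put("job1/pipeline1/task1", "a");
        metrics.put("job1/pipeline2/task1", "b");
        store.put(METRICS_KEY, metrics);
        removePipelineEntries(store, "job1/pipeline1/");
        return store.get(METRICS_KEY).size();
    }
}
```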


