dybyte commented on code in PR #9696:
URL: https://github.com/apache/seatunnel/pull/9696#discussion_r2277321084


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -1078,4 +1092,57 @@ public ConnectorPackageService getConnectorPackageService() {
         }
         return connectorPackageService;
     }
+
+    private void startMetricsCleanupWorker() {
+        Runnable cleanupTask =
+                () -> {
+                    Thread.currentThread().setName("metrics-cleanup-runner");
+                    while (!Thread.currentThread().isInterrupted()
+                            && !metricsCleanupRetryQueue.isEmpty()) {
+                        try {
+                            PipelineLocation pipelineLocation =
+                                    metricsCleanupRetryQueue.poll(
+                                            cleanUpRetryInterval, TimeUnit.SECONDS);
+
+                            if (pipelineLocation != null) {
+                                JobMaster jobMaster = getJobMaster(pipelineLocation.getJobId());
+                                if (jobMaster != null) {
+                                    jobMaster.removeMetricsContext(
+                                            pipelineLocation,
+                                            (PipelineStatus)
+                                                    runningJobStateIMap.get(pipelineLocation));
+                                } else {
+                                    retryRemoveMetricsContext(pipelineLocation);
+                                }
+                            }
+
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                            logger.info("Metrics cleanup worker interrupted, exiting...");
+                        } catch (Exception e) {
+                            logger.warning(
+                                    String.format(
+                                            "Metrics cleanup retry failed: %s", e.getMessage()));
+                        }
+                    }
+                };
+
+        executorService.submit(cleanupTask);
+    }
+
+    private void retryRemoveMetricsContext(PipelineLocation pipelineLocation) {
+        try {
+            metricsImap.compute(
+                    Constant.IMAP_RUNNING_JOB_METRICS_KEY,
+                    (key, centralMap) -> {
+                        MetricsCleanupUtils.removeMetricsEntries(pipelineLocation, centralMap);
+                        return centralMap;
+                    });
+            logger.info(
+                    String.format(
+                            "Metrics cleanup via compute for pipeline: %s", pipelineLocation));
+        } catch (Exception e) {

Review Comment:
   I've set both the retry queue capacity and the batch size to 100, which should be sufficient in most cases.
   Both values are also exposed as configuration options, so they can be tuned for environments that need larger or smaller capacities.
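
   The bounded-queue-plus-batch approach described above can be sketched in plain Java. This is a minimal, hypothetical sketch, not the PR's code: the class `BoundedRetryQueue`, its methods, and the use of `ArrayBlockingQueue.drainTo` are illustrative assumptions, and the actual SeaTunnel option names are not shown because the comment does not give them. The defaults of 100 mirror the values stated in the comment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * Hypothetical sketch (not SeaTunnel's implementation): a bounded retry queue
 * drained in fixed-size batches. Capacity and batch size are constructor
 * parameters so they could be wired to configuration options.
 */
class BoundedRetryQueue<T> {
    // Defaults mirror the values mentioned in the review comment.
    static final int DEFAULT_CAPACITY = 100;
    static final int DEFAULT_BATCH_SIZE = 100;

    private final BlockingQueue<T> queue;
    private final int batchSize;

    BoundedRetryQueue() {
        this(DEFAULT_CAPACITY, DEFAULT_BATCH_SIZE);
    }

    BoundedRetryQueue(int capacity, int batchSize) {
        if (capacity <= 0 || batchSize <= 0) {
            throw new IllegalArgumentException("capacity and batchSize must be positive");
        }
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.batchSize = batchSize;
    }

    /** Enqueues without blocking; returns false when the queue is already full. */
    boolean offer(T item) {
        return queue.offer(item);
    }

    /**
     * Waits up to the timeout for the first item, then drains at most
     * batchSize - 1 more items without blocking and passes each to the handler.
     * Returns the number of items processed (0 if the wait timed out).
     */
    int processBatch(long timeout, TimeUnit unit, Consumer<T> handler)
            throws InterruptedException {
        T first = queue.poll(timeout, unit);
        if (first == null) {
            return 0;
        }
        List<T> batch = new ArrayList<>(batchSize);
        batch.add(first);
        queue.drainTo(batch, batchSize - 1);
        for (T item : batch) {
            handler.accept(item);
        }
        return batch.size();
    }

    int size() {
        return queue.size();
    }

    public static void main(String[] args) throws InterruptedException {
        // Small capacity and batch size so the bounds are easy to observe.
        BoundedRetryQueue<String> retries = new BoundedRetryQueue<>(3, 2);
        System.out.println(retries.offer("p1")); // true
        System.out.println(retries.offer("p2")); // true
        System.out.println(retries.offer("p3")); // true
        System.out.println(retries.offer("p4")); // false: queue is full
        int n = retries.processBatch(1, TimeUnit.SECONDS, p -> System.out.println("retry " + p));
        System.out.println(n); // 2
        System.out.println(retries.size()); // 1
    }
}
```

   In this sketch, rejecting new entries when the queue is full (`offer` returning `false`) keeps memory bounded, and the caller decides whether to log or drop the entry. Draining in batches lets a single worker wake-up handle up to `batchSize` entries instead of one.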



-- 
This is an automated message from the Apache Git Service.