Hisoka-X commented on code in PR #9696:
URL: https://github.com/apache/seatunnel/pull/9696#discussion_r2269627977


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -1078,4 +1095,69 @@ public ConnectorPackageService getConnectorPackageService() {
         }
         return connectorPackageService;
     }
+
+    private void startMetricsCleanupWorker() {
+        Runnable cleanupTask =
+                () -> {
+                    Thread.currentThread().setName("metrics-cleanup-worker");

Review Comment:
   ```suggestion
                       Thread.currentThread().setName("metrics-cleanup-runner");
   ```



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -1078,4 +1095,69 @@ public ConnectorPackageService getConnectorPackageService() {
         }
         return connectorPackageService;
     }
+
+    private void startMetricsCleanupWorker() {
+        Runnable cleanupTask =
+                () -> {
+                    Thread.currentThread().setName("metrics-cleanup-worker");
+                    while (!Thread.currentThread().isInterrupted()
+                            && !metricsCleanupRetryQueue.isEmpty()) {
+                        try {
+                            PipelineLocation pipelineLocation =
+                                    metricsCleanupRetryQueue.poll(
+                                            cleanUpRetryInterval, TimeUnit.SECONDS);
+
+                            if (pipelineLocation != null) {
+                                JobMaster jobMaster = getJobMaster(pipelineLocation.getJobId());
+                                if (jobMaster != null) {
+                                    jobMaster.removeMetricsContext(
+                                            pipelineLocation,
+                                            (PipelineStatus)
+                                                    runningJobStateIMap.get(pipelineLocation));
+                                } else {
+                                    retryRemoveMetricsContext(pipelineLocation);
+                                }
+                            }
+
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                            logger.info("Metrics cleanup worker interrupted, exiting...");
+                        } catch (Exception e) {
+                            logger.warning(
+                                    String.format(
+                                            "Metrics cleanup retry failed: %s", e.getMessage()));
+                        }
+                    }
+                };
+
+        executorService.submit(cleanupTask);
+    }
+
+    private void retryRemoveMetricsContext(PipelineLocation pipelineLocation) {
+        try {
+            metricsImap.compute(
+                    Constant.IMAP_RUNNING_JOB_METRICS_KEY,
+                    (key, centralMap) -> {
+                        if (centralMap == null) {
+                            return null;
+                        }
+                        List<TaskLocation> collect =
+                                centralMap.keySet().stream()
+                                        .filter(
+                                                taskLocation ->
+                                                        taskLocation
+                                                                .getTaskGroupLocation()
+                                                                .getPipelineLocation()
+                                                                .equals(pipelineLocation))
+                                        .collect(Collectors.toList());
+                        collect.forEach(centralMap::remove);
+                        return centralMap;
+                    });

Review Comment:
   This block is redundant with existing code. Please refactor to remove the duplication.
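
A minimal sketch of one way to remove the duplication, assuming the filter-and-remove logic can live in a single helper that both the normal cleanup path and the retry path call. `MetricsCleanupSketch` and `removeByPipeline` are hypothetical names, the string keys are made up for illustration, and a plain `ConcurrentHashMap` stands in for the map stored in the IMap so the example runs on its own.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Hypothetical helper: one shared place for "drop every entry that belongs to a pipeline". */
public class MetricsCleanupSketch {

    /**
     * Removes all entries whose key belongs to the given pipeline.
     *
     * @param centralMap map to clean, may be null
     * @param pipelineOf extracts the pipeline identifier from a key
     * @param pipeline the pipeline whose entries should be removed
     * @return the number of removed entries
     */
    public static <K, V, P> int removeByPipeline(
            Map<K, V> centralMap, Function<K, P> pipelineOf, P pipeline) {
        if (centralMap == null) {
            return 0;
        }
        // Collect first, then remove, mirroring the structure of the code under review.
        List<K> toRemove =
                centralMap.keySet().stream()
                        .filter(key -> Objects.equals(pipelineOf.apply(key), pipeline))
                        .collect(Collectors.toList());
        toRemove.forEach(centralMap::remove);
        return toRemove.size();
    }

    public static void main(String[] args) {
        // Made-up key format "pipeline:task", used only for this illustration.
        Map<String, Long> metrics = new ConcurrentHashMap<>();
        metrics.put("pipeline-1:task-1", 10L);
        metrics.put("pipeline-1:task-2", 20L);
        metrics.put("pipeline-2:task-1", 30L);

        int removed =
                removeByPipeline(
                        metrics, key -> key.substring(0, key.indexOf(':')), "pipeline-1");
        System.out.println("removed=" + removed + ", remaining=" + metrics.keySet());
    }
}
```

With a helper like this, both call sites would pass their own key-to-pipeline function instead of repeating the stream pipeline.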



##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java:
##########
@@ -183,4 +183,22 @@ public void testJobFailedWillThrowException() throws IOException, InterruptedExc
                                 .contains(
                                         "org.apache.seatunnel.engine.server.resourcemanager.NoEnoughResourceException"));
     }
+
+    @Test
+    public void testCleanupPendingJobMasterMapAfterJobFailed()
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult = executeJob(server, "/batch_slot_not_enough.conf");
+        Assertions.assertNotEquals(0, execResult.getExitCode());
+        String serverLogs = server.getLogs();
+        Assertions.assertTrue(serverLogs.contains("PendingJobMasterMap size after cleanup: 0"));
+    }
+
+    @Test
+    public void testCleanupCheckPointCoordinatorStatus() throws IOException, InterruptedException {
+        Container.ExecResult execResult = executeJob(server, "/batch_slot_enough.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        String serverLogs = server.getLogs();
+        Assertions.assertTrue(
+                serverLogs.contains("Successfully removed checkpoint coordinator state:"));
+    }

Review Comment:
   The current test case is very fragile because it only checks log output. If
we add new map keys later, it will not detect a new memory leak, which will
cause a regression. Instead, we can access the IMap objects directly in
`CoordinatorServiceTest` and check that our data is deleted when the job is
cancelled or finished.
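
A minimal sketch of the kind of check I mean, assuming the test can reach every map the job writes to and scan it for leftover keys rather than matching log lines. All names here (`ImapLeakCheckSketch`, `leftoverKeys`, the map names, and the `jobId:pipelineId` key format) are hypothetical, and plain Java maps stand in for the real IMap objects so the example is self-contained.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical leak check: scan every map for keys that still belong to a finished job. */
public class ImapLeakCheckSketch {

    /**
     * Returns "mapName/key" for every key across the given maps that still belongs to the job.
     * An empty result means the job left nothing behind in any of the maps.
     */
    public static List<String> leftoverKeys(
            Map<String, Map<String, Object>> maps, String jobId) {
        List<String> leftovers = new ArrayList<>();
        maps.forEach(
                (mapName, map) ->
                        map.keySet().stream()
                                // Made-up key format: "<jobId>:<rest>".
                                .filter(key -> key.startsWith(jobId + ":"))
                                .forEach(key -> leftovers.add(mapName + "/" + key)));
        return leftovers;
    }

    public static void main(String[] args) {
        Map<String, Map<String, Object>> maps = new LinkedHashMap<>();
        maps.put("runningJobState", new ConcurrentHashMap<>());
        maps.put("checkpointState", new ConcurrentHashMap<>());

        // The job writes to both maps while it runs.
        maps.get("runningJobState").put("job-1:pipeline-1", "FINISHED");
        maps.get("checkpointState").put("job-1:pipeline-1", "COMPLETED");

        // Simulated cleanup that forgets the second map.
        maps.get("runningJobState").remove("job-1:pipeline-1");

        List<String> leftovers = leftoverKeys(maps, "job-1");
        if (!leftovers.isEmpty()) {
            System.out.println("Leak detected: " + leftovers);
        }
    }
}
```

Because the check iterates over all registered maps, a map added later is covered as long as it is included in the scan, which a log-matching assertion cannot guarantee.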



##########
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java:
##########
@@ -363,6 +363,14 @@ public static class MasterServerConfigOptions {
                         .defaultValue(Integer.MAX_VALUE)
                         .withDescription("The max thread num of coordinator service");
 
+        public static final Option<Integer> CLEANUP_RETRY_INTERVAL =
+                Options.key("cleanup-retry-interval")
+                        .intType()
+                        .defaultValue(10)

Review Comment:
   Let's set the default value to 60s.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java:
##########
@@ -623,6 +629,18 @@ private void removeJobIMap() {
                                                 runningJobStateTimestampsIMap.remove(
                                                         task.getTaskGroupLocation());
                                             });
+
+                            String checkpointStateImapKey =
+                                    CheckpointCoordinator.getCheckpointStateImapKey(
+                                            jobId, pipeline.getPipelineId());
+                            Object removedState =
+                                    runningJobStateIMap.remove(checkpointStateImapKey);
+                            if (removedState != null) {
+                                LOGGER.info(
+                                        String.format(
+                                                "Successfully removed checkpoint coordinator state: %s",
+                                                checkpointStateImapKey));
+                            }

Review Comment:
   As with the other remove calls, do not print a log here.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java:
##########
@@ -623,6 +629,18 @@ private void removeJobIMap() {
                                                 runningJobStateTimestampsIMap.remove(
                                                         task.getTaskGroupLocation());
                                             });
+
+                            String checkpointStateImapKey =
+                                    CheckpointCoordinator.getCheckpointStateImapKey(

Review Comment:
   We can reuse the code at
https://github.com/apache/seatunnel/pull/9696/files#diff-1abd8bd9a8f72a40ac2f8606a67279bdc7d2d638c0bc415639cea42c61b6a4d8R174



##########
docs/en/seatunnel-engine/hybrid-cluster-deployment.md:
##########
@@ -168,12 +168,18 @@ The corePoolSize of seatunnel coordinator job's executor cached thread pool
 
 The max job count can be executed at same time
 
+**cleanup-retry-interval**

Review Comment:
   Please update `separated-cluster-deployment.md` and zh docs too.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -287,6 +295,11 @@ private void pendingJobSchedule() throws InterruptedException {
             } else {
                 queueRemove(jobMaster);
                 completeFailJob(jobMaster);
+                pendingJobMasterMap.remove(jobId);
+                logger.info(
+                        String.format(
+                                "PendingJobMasterMap size after cleanup: %d",
+                                pendingJobMasterMap.size()));

Review Comment:
   We don't need this log.


