Hisoka-X commented on code in PR #9696:
URL: https://github.com/apache/seatunnel/pull/9696#discussion_r2269627977
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -1078,4 +1095,69 @@ public ConnectorPackageService getConnectorPackageService() {
}
return connectorPackageService;
}
+
+ private void startMetricsCleanupWorker() {
+ Runnable cleanupTask =
+ () -> {
+ Thread.currentThread().setName("metrics-cleanup-worker");
Review Comment:
```suggestion
Thread.currentThread().setName("metrics-cleanup-runner");
```
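The suggestion only changes the name. Note that calling `setName` inside a submitted task renames whichever thread of `executorService` happens to run it; assuming that executor is a reusable pool (the excerpt does not show how it is built), the new name can stay attached to the pooled thread after the worker exits. A minimal sketch of one defensive option, using a hypothetical helper `withThreadName`, applies the suggested name only for the duration of the task and then restores the previous one:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class NamedTaskSketch {
    /** Hypothetical wrapper: runs body under a temporary thread name, then restores the original name. */
    static Runnable withThreadName(String name, Runnable body) {
        return () -> {
            Thread current = Thread.currentThread();
            String previous = current.getName();
            current.setName(name);
            try {
                body.run();
            } finally {
                // Restore the pooled thread's name even if the body throws.
                current.setName(previous);
            }
        };
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        String[] seen = new String[2];
        // First task runs under the temporary name.
        executor.submit(withThreadName("metrics-cleanup-runner",
                () -> seen[0] = Thread.currentThread().getName())).get();
        // Second task reuses the same pooled thread, which has its old name back.
        executor.submit(() -> seen[1] = Thread.currentThread().getName()).get();
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(seen[0]); // metrics-cleanup-runner
        System.out.println(seen[1].equals("metrics-cleanup-runner")); // false
    }
}
```

Wrapping the cleanup body as `withThreadName("metrics-cleanup-runner", body)` keeps the name visible in thread dumps while the worker runs, without carrying it over to unrelated tasks later scheduled on the same thread.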
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -1078,4 +1095,69 @@ public ConnectorPackageService getConnectorPackageService() {
}
return connectorPackageService;
}
+
+ private void startMetricsCleanupWorker() {
+ Runnable cleanupTask =
+ () -> {
+ Thread.currentThread().setName("metrics-cleanup-worker");
+ while (!Thread.currentThread().isInterrupted()
+ && !metricsCleanupRetryQueue.isEmpty()) {
+ try {
+ PipelineLocation pipelineLocation =
+ metricsCleanupRetryQueue.poll(
+ cleanUpRetryInterval, TimeUnit.SECONDS);
+
+ if (pipelineLocation != null) {
+ JobMaster jobMaster = getJobMaster(pipelineLocation.getJobId());
+ if (jobMaster != null) {
+ jobMaster.removeMetricsContext(
+ pipelineLocation,
+ (PipelineStatus)
+ runningJobStateIMap.get(pipelineLocation));
+ } else {
+ retryRemoveMetricsContext(pipelineLocation);
+ }
+ }
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.info("Metrics cleanup worker interrupted, exiting...");
+ } catch (Exception e) {
+ logger.warning(
+ String.format(
+ "Metrics cleanup retry failed: %s", e.getMessage()));
+ }
+ }
+ };
+
+ executorService.submit(cleanupTask);
+ }
+
+ private void retryRemoveMetricsContext(PipelineLocation pipelineLocation) {
+ try {
+ metricsImap.compute(
+ Constant.IMAP_RUNNING_JOB_METRICS_KEY,
+ (key, centralMap) -> {
+ if (centralMap == null) {
+ return null;
+ }
+ List<TaskLocation> collect =
+ centralMap.keySet().stream()
+ .filter(
+ taskLocation ->
+ taskLocation
+ .getTaskGroupLocation()
+ .getPipelineLocation()
+ .equals(pipelineLocation))
+ .collect(Collectors.toList());
+ collect.forEach(centralMap::remove);
+ return centralMap;
+ });
Review Comment:
Redundant code was found, please refactor.
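The comment does not say which code is duplicated. Assuming it is the pipeline-filtering logic inside the `metricsImap.compute(...)` remapping function (which presumably mirrors what `JobMaster.removeMetricsContext` already does), one option is to extract that logic into a single helper that both call sites reuse. The sketch below uses simplified stand-in records for `PipelineLocation` and `TaskLocation`, collapses `getTaskGroupLocation().getPipelineLocation()` into a direct field, and names the helper `removePipelineMetrics`; all of these are hypothetical. It also swaps the collect-then-remove pattern for `keySet().removeIf(...)`, which drops matching entries in one pass.

```java
import java.util.HashMap;
import java.util.Map;

public class PipelineMetricsCleanup {
    // Simplified stand-ins for SeaTunnel's PipelineLocation and TaskLocation (assumed shapes).
    record PipelineLocation(long jobId, int pipelineId) {}

    record TaskLocation(PipelineLocation pipelineLocation, long taskId) {}

    /**
     * Hypothetical shared helper: removes every metrics entry that belongs to the given pipeline.
     * Returns the same map (or null if there was none) so it can serve directly as a compute() result.
     */
    static <V> Map<TaskLocation, V> removePipelineMetrics(
            Map<TaskLocation, V> centralMap, PipelineLocation pipelineLocation) {
        if (centralMap == null) {
            return null;
        }
        centralMap.keySet().removeIf(
                taskLocation -> taskLocation.pipelineLocation().equals(pipelineLocation));
        return centralMap;
    }

    public static void main(String[] args) {
        PipelineLocation p1 = new PipelineLocation(1L, 1);
        PipelineLocation p2 = new PipelineLocation(1L, 2);
        Map<TaskLocation, String> central = new HashMap<>();
        central.put(new TaskLocation(p1, 10L), "a");
        central.put(new TaskLocation(p1, 11L), "b");
        central.put(new TaskLocation(p2, 20L), "c");

        // "metrics" is a placeholder for Constant.IMAP_RUNNING_JOB_METRICS_KEY.
        Map<String, Map<TaskLocation, String>> metricsImap = new HashMap<>();
        metricsImap.put("metrics", central);
        metricsImap.compute("metrics", (key, map) -> removePipelineMetrics(map, p1));

        System.out.println(metricsImap.get("metrics").size()); // 1
    }
}
```

Because the helper returns the mutated map, or `null` when no map exists, it slots straight into `compute` with the same semantics as the original lambda.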
##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java:
##########
@@ -183,4 +183,22 @@ public void testJobFailedWillThrowException() throws IOException, InterruptedExc
.contains(
"org.apache.seatunnel.engine.server.resourcemanager.NoEnoughResourceException"));
}
+
+ @Test
+ public void testCleanupPendingJobMasterMapAfterJobFailed()
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult = executeJob(server, "/batch_slot_not_enough.conf");
+ Assertions.assertNotEquals(0, execResult.getExitCode());
+ String serverLogs = server.getLogs();
+ Assertions.assertTrue(serverLogs.contains("PendingJobMasterMap size after cleanup: 0"));
+ }
+
+ @Test
+ public void testCleanupCheckPointCoordinatorStatus() throws IOException, InterruptedException {
+ Container.ExecResult execResult = executeJob(server, "/batch_slot_enough.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ String serverLogs = server.getLogs();
+ Assertions.assertTrue(
+ serverLogs.contains("Successfully removed checkpoint coordinator state:"));
+ }
Review Comment:
The current test case is very fragile: it only checks log output, so if we add new map keys later,
it will not detect the resulting memory leak and a regression can slip through. Instead, we can
access the IMap objects directly in `CoordinatorServiceTest` and check whether our data is deleted
when the job is cancelled or finished.
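A minimal sketch of the direct check the comment proposes, written against plain `java.util.Map`s so it runs standalone. In `CoordinatorServiceTest` the maps would instead be the Hazelcast `IMap`s (obtained, for example, via `HazelcastInstance.getMap(name)`); the helper name `findLeakedKeys`, the map names, and the `belongsToJob` predicate are all hypothetical, and the predicate would have to understand each map's real key type.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class ImapLeakCheck {
    /**
     * Hypothetical test helper: returns "mapName -> key" for every entry, in any of the
     * supplied maps, whose key still belongs to the finished job.
     */
    static List<String> findLeakedKeys(Map<String, Map<?, ?>> mapsByName, Predicate<Object> belongsToJob) {
        List<String> leaks = new ArrayList<>();
        for (Map.Entry<String, Map<?, ?>> named : mapsByName.entrySet()) {
            for (Object key : named.getValue().keySet()) {
                if (belongsToJob.test(key)) {
                    leaks.add(named.getKey() + " -> " + key);
                }
            }
        }
        return leaks;
    }

    public static void main(String[] args) {
        long finishedJobId = 42L;
        // Stand-ins for the coordinator's IMaps after the job has finished.
        Map<String, Map<?, ?>> maps = new LinkedHashMap<>();
        maps.put("runningJobState", Map.of(7L, "RUNNING"));
        maps.put("pendingJobMaster", Map.of(42L, "leaked"));
        // Assumed key convention for this sketch: keys are plain Long job ids.
        Predicate<Object> belongsToJob = key -> Long.valueOf(finishedJobId).equals(key);
        System.out.println(findLeakedKeys(maps, belongsToJob)); // [pendingJobMaster -> 42]
    }
}
```

A test would then assert that the returned list is empty. Because the helper scans every map it is handed rather than a fixed list of log messages, a new map only needs to be registered once to be covered; enumerating `HazelcastInstance.getDistributedObjects()` could avoid even that step.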
##########
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java:
##########
@@ -363,6 +363,14 @@ public static class MasterServerConfigOptions {
.defaultValue(Integer.MAX_VALUE)
.withDescription("The max thread num of coordinator service");
+ public static final Option<Integer> CLEANUP_RETRY_INTERVAL =
+ Options.key("cleanup-retry-interval")
+ .intType()
+ .defaultValue(10)
Review Comment:
Let's set the default value to 60s.
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java:
##########
@@ -623,6 +629,18 @@ private void removeJobIMap() {
runningJobStateTimestampsIMap.remove(
task.getTaskGroupLocation());
});
+
+ String checkpointStateImapKey =
+ CheckpointCoordinator.getCheckpointStateImapKey(
+ jobId, pipeline.getPipelineId());
+ Object removedState =
+ runningJobStateIMap.remove(checkpointStateImapKey);
+ if (removedState != null) {
+ LOGGER.info(
+ String.format(
+ "Successfully removed checkpoint coordinator state: %s",
+ checkpointStateImapKey));
+ }
Review Comment:
Just like the other remove calls, do not print any log.
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java:
##########
@@ -623,6 +629,18 @@ private void removeJobIMap() {
runningJobStateTimestampsIMap.remove(
task.getTaskGroupLocation());
});
+
+ String checkpointStateImapKey =
+ CheckpointCoordinator.getCheckpointStateImapKey(
Review Comment:
We can reuse
https://github.com/apache/seatunnel/pull/9696/files#diff-1abd8bd9a8f72a40ac2f8606a67279bdc7d2d638c0bc415639cea42c61b6a4d8R174
##########
docs/en/seatunnel-engine/hybrid-cluster-deployment.md:
##########
@@ -168,12 +168,18 @@ The corePoolSize of seatunnel coordinator job's executor cached thread pool
The max job count can be executed at same time
+**cleanup-retry-interval**
Review Comment:
Please update `separated-cluster-deployment.md` and zh docs too.
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -287,6 +295,11 @@ private void pendingJobSchedule() throws InterruptedException {
} else {
queueRemove(jobMaster);
completeFailJob(jobMaster);
+ pendingJobMasterMap.remove(jobId);
+ logger.info(
+ String.format(
+ "PendingJobMasterMap size after cleanup: %d",
+ pendingJobMasterMap.size()));
Review Comment:
No need for logs here.
--
This is an automated message from the Apache Git Service.