dybyte commented on code in PR #9833:
URL: https://github.com/apache/seatunnel/pull/9833#discussion_r2334309508
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java:
##########
@@ -343,49 +343,79 @@ private void printExecutionInfo() {
}
}
- public synchronized void updateMetrics(Map<TaskLocation,
SeaTunnelMetricsContext> localMap) {
+ public void updateMetrics(Map<TaskLocation, SeaTunnelMetricsContext>
localMap) {
if (localMap == null || localMap.isEmpty()) {
return;
}
- IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> metricsImap
=
-
getNodeEngine().getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
+ int partitionCount =
seaTunnelConfig.getEngineConfig().getJobMetricsPartitionCount();
- HashMap<TaskLocation, SeaTunnelMetricsContext> centralMap =
- metricsImap.get(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
+ IMap<Long, Map<TaskLocation, SeaTunnelMetricsContext>> metricsImap =
+
getNodeEngine().getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
- if (centralMap == null) {
- centralMap = new HashMap<>();
- }
- centralMap.putAll(localMap);
- metricsImap.put(Constant.IMAP_RUNNING_JOB_METRICS_KEY, centralMap);
+ Map<Long, Map<TaskLocation, SeaTunnelMetricsContext>> partitioned =
new HashMap<>();
+ localMap.forEach(
+ (key, value) -> {
+ long partition = getMetricsImapPartition(key,
partitionCount);
+ partitioned.computeIfAbsent(partition, k -> new
HashMap<>()).put(key, value);
+ });
+
+ partitioned
+ .entrySet()
+ .parallelStream()
+ .forEach(
+ entry -> {
+ metricsImap.compute(
+ entry.getKey(),
+ (k, oldVal) -> {
+ if (oldVal == null) oldVal = new
HashMap<>();
+ oldVal.putAll(entry.getValue());
+ return oldVal;
+ });
+ });
}
- public synchronized void removeMetrics(PipelineLocation pipelineLocation) {
- IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> metricsImap
=
+ public void removeMetrics(PipelineLocation pipelineLocation) {
+ IMap<Long, Map<TaskLocation, SeaTunnelMetricsContext>> metricsImap =
getNodeEngine().getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
- HashMap<TaskLocation, SeaTunnelMetricsContext> centralMap =
- metricsImap.get(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
- if (centralMap == null) {
- return;
+ Map<Long, List<TaskLocation>> partitionedTasks = new HashMap<>();
+ for (Map.Entry<Long, Map<TaskLocation, SeaTunnelMetricsContext>> entry
:
+ metricsImap.entrySet()) {
+ long partition = entry.getKey();
+ List<TaskLocation> tasksToRemove =
+ entry.getValue().keySet().stream()
+ .filter(
+ t ->
+ t.getTaskGroupLocation()
+ .getPipelineLocation()
+ .equals(pipelineLocation))
+ .collect(Collectors.toList());
+ if (!tasksToRemove.isEmpty()) {
+ partitionedTasks.put(partition, tasksToRemove);
+ }
}
- List<TaskLocation> taskLocations = getTaskLocations(pipelineLocation,
centralMap);
- taskLocations.forEach(centralMap::remove);
- metricsImap.put(Constant.IMAP_RUNNING_JOB_METRICS_KEY, centralMap);
+ partitionedTasks
+ .entrySet()
+ .parallelStream()
+ .forEach(
Review Comment:
I intentionally separated the logic into two iterations.
The first pass filters tasks outside of `metricsImap` to minimize the time its
internal resources are held,
and the second pass performs only the minimal compute operations inside
`metricsImap`.
Since write operations are frequent, I believe this approach can help reduce
contention.
I’d be happy to adjust the implementation if you feel a single iteration
would be more appropriate.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]