zhangshenghang commented on code in PR #8233: URL: https://github.com/apache/seatunnel/pull/8233#discussion_r1875853162
########## seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java: ########## @@ -98,9 +127,70 @@ private void initWorker() { })) .collect(Collectors.toList()); futures.forEach(CompletableFuture::join); + + scheduledExecutorService.scheduleAtFixedRate( + () -> { + try { + log.debug( + "start send system load to resource manager, this address: " + + nodeEngine.getClusterService().getThisAddress()); + updateWorkerSystemLoad(); + } catch (Exception e) { + log.warn( + "failed send system load to resource manager, will retry later. this address: " + + nodeEngine.getClusterService().getThisAddress()); + } + }, + 0, + DEFAULT_SYSTEM_LOAD_PERIOD, + TimeUnit.MILLISECONDS); log.info("registerWorker: {}", registerWorker); } + private void updateWorkerSystemLoad() { + nodeEngine.getClusterService().getMembers().stream() + .map(Member::getAddress) + .forEach(this::collectAndUpdateSystemLoad); + } + + private void collectAndUpdateSystemLoad(Address node) { + sendToMember(new WorkerSystemLoadOperation(), node) + .thenAccept( + systemLoad -> { + if (Objects.isNull(systemLoad)) { + return; + } + + SystemLoad currentSystemLoad = + workerLoadMap.computeIfAbsent(node, k -> new SystemLoad()); + + updateSystemLoadMetrics(currentSystemLoad, (SystemLoad) systemLoad); + + log.debug( + "received system load from worker: {}, system load: {}", + node, + workerLoadMap.get(node)); + }); + } + + private void updateSystemLoadMetrics(SystemLoad currentSystemLoad, SystemLoad newSystemLoad) { + LinkedHashMap<String, SystemLoad.SystemLoadInfo> metrics = currentSystemLoad.getMetrics(); + + if (metrics == null) { + metrics = new LinkedHashMap<>(); + currentSystemLoad.setMetrics(metrics); + } + + // Keep up to 5 historical records + while (metrics.size() >= 5) { Review Comment: Already modified ########## seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java: ########## @@ -222,7 +250,53 @@ public Optional<WorkerProfile> preCheckWorkerResource(ResourceProfile r) { slot -> slot.getResourceProfile() .enoughThan(r))) - .findAny(); + .collect(Collectors.toList()); + + Optional<WorkerProfile> workerProfile; + switch (allocateStrategy) { + case SYSTEM_LOAD: Review Comment: Already modified -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@seatunnel.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org