Hisoka-X commented on code in PR #8233: URL: https://github.com/apache/seatunnel/pull/8233#discussion_r1877265332
########## seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/allocation/strategy/SlotAllocationStrategy.java: ########## @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.resourcemanager.allocation.strategy; + +import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile; + +import org.apache.commons.lang3.tuple.ImmutableTriple; + +import com.hazelcast.cluster.Address; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +// Java Review Comment: Please add some comment. ########## seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java: ########## @@ -206,14 +232,20 @@ private CompletableFuture<SlotAndWorkerProfile> singleResourceRequestToMember( })); } + public Double calculateWeight( Review Comment: we can move this method into `SystemLoadStrategy`. 
########## seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java: ########## @@ -236,6 +288,31 @@ public Optional<WorkerProfile> preCheckWorkerResource(ResourceProfile r) { return workerProfile; } + /** + * Calculate the slot usage rate of the worker + * + * @param worker WorkerProfile + * @return slot usage rate, range 0.0-1.0 + */ + private double calculateSlotUsage(WorkerProfile worker) { Review Comment: Is this method useless now? ########## seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/utils/SystemLoadCalculateTest.java: ########## @@ -0,0 +1,575 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.engine.server.utils; + +import org.apache.seatunnel.shade.com.google.common.collect.EvictingQueue; + +import org.apache.seatunnel.engine.common.config.EngineConfig; +import org.apache.seatunnel.engine.common.config.server.ServerConfigOptions; +import org.apache.seatunnel.engine.common.config.server.SlotServiceConfig; +import org.apache.seatunnel.engine.server.resourcemanager.AbstractResourceManager; +import org.apache.seatunnel.engine.server.resourcemanager.ResourceRequestHandler; +import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile; +import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile; +import org.apache.seatunnel.engine.server.resourcemanager.resource.SystemLoadInfo; +import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile; + +import org.apache.commons.lang3.tuple.ImmutableTriple; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import com.hazelcast.cluster.Address; + +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.when; + +public class SystemLoadCalculateTest { + + private SystemLoadCalculate systemLoadCalculate; + + @BeforeEach + void setUp() { + systemLoadCalculate = new SystemLoadCalculate(); + } + + @Test + @DisplayName("Step0: A newly created LoadBalancer should return the highest priority of 1.0") + void newLoadBalancerShouldReturnMaxPriority() { + Assertions.assertEquals(1.0, systemLoadCalculate.calculateSchedulingPriority()); + } + + @Test + @DisplayName("Step1-3: Adding invalid utilization data should throw an exception") + void shouldThrowExceptionForInvalidUtilizationData() { + 
Assertions.assertAll( + () -> + assertThrows( + IllegalArgumentException.class, + () -> systemLoadCalculate.addUtilizationData(-0.1, 0.5)), + () -> + assertThrows( + IllegalArgumentException.class, + () -> systemLoadCalculate.addUtilizationData(0.5, 1.1)), + () -> + assertThrows( + IllegalArgumentException.class, + () -> systemLoadCalculate.addUtilizationData(1.1, 0.5))); + } + + @Test + @DisplayName("Step1-3: Test weight calculation for 3 records") + void shouldCalculateCorrectPriorityForThreeRecords() { + // Add 3 records + // Oldest record + systemLoadCalculate.addUtilizationData(0.5, 0.4); // CPU: 50%, Memory: 40% + systemLoadCalculate.addUtilizationData(0.7, 0.6); // CPU: 70%, Memory: 60% + // Newest record + systemLoadCalculate.addUtilizationData(0.6, 0.5); // CPU: 60%, Memory: 50% + + double priority = systemLoadCalculate.calculateSchedulingPriority(); + + // Manually calculate the expected result + // Weight distribution should be [4/8, 2/8, 2/8] + double expectedPriority = + // Newest record (1-0.6)*0.5 + (1-0.5)*0.5 * (4/8) + ((((1.0 - 0.6) * 0.5) + ((1.0 - 0.5) * 0.5)) * (4.0 / 8.0)) + + + // Second record (1-0.7)*0.5 + (1-0.6)*0.5 * (2/8) + ((((1.0 - 0.7) * 0.5) + ((1.0 - 0.6) * 0.5)) * (2.0 / 8.0)) + + + // Oldest record (1-0.5)*0.5 + (1-0.4)*0.5 * (2/8) + ((((1.0 - 0.5) * 0.5) + ((1.0 - 0.4) * 0.5)) * (2.0 / 8.0)); + + Assertions.assertEquals(expectedPriority, priority); + } + + @Test + @DisplayName("Step1-3: Test weight calculation for 5 records") + void shouldCalculateCorrectPriorityForFiveRecords() { + // Add 5 records, from oldest to newest + systemLoadCalculate.addUtilizationData(0.3, 0.2); + systemLoadCalculate.addUtilizationData(0.4, 0.3); + systemLoadCalculate.addUtilizationData(0.5, 0.4); + systemLoadCalculate.addUtilizationData(0.7, 0.6); + systemLoadCalculate.addUtilizationData(0.6, 0.5); + + double priority = systemLoadCalculate.calculateSchedulingPriority(); + + // Manually calculate the expected result + // Weight distribution should be 
[4/10, 2/10, 2/10, 1/10, 1/10] + double expectedPriority = + // Newest record: (1-0.6)*0.5 + (1-0.5)*0.5 * (4/10) + ((((1.0 - 0.6) * 0.5) + ((1.0 - 0.5) * 0.5)) * (4.0 / 10.0)) + + + // Second record: (1-0.7)*0.5 + (1-0.6)*0.5 * (2/10) + ((((1.0 - 0.7) * 0.5) + ((1.0 - 0.6) * 0.5)) * (2.0 / 10.0)) + + + // Third record: (1-0.5)*0.5 + (1-0.4)*0.5 * (2/10) + ((((1.0 - 0.5) * 0.5) + ((1.0 - 0.4) * 0.5)) * (2.0 / 10.0)) + + + // Fourth record: (1-0.4)*0.5 + (1-0.3)*0.5 * (1/10) + ((((1.0 - 0.4) * 0.5) + ((1.0 - 0.3) * 0.5)) * (1.0 / 10.0)) + + + // Oldest record: (1-0.3)*0.5 + (1-0.2)*0.5 * (1/10) + ((((1.0 - 0.3) * 0.5) + ((1.0 - 0.2) * 0.5)) * (1.0 / 10.0)); + + Assertions.assertEquals(expectedPriority, priority); + } + + @Test + @DisplayName( + "Step1-3: Detailed verification of adding 6 records (verifying the maximum window limit of 5)") + void detailedCalculationForSixRecords() { + SystemLoadCalculate systemLoadCalculate = new SystemLoadCalculate(); + + // Add 6 records in chronological order (from oldest to newest) + // The first record will be discarded because it exceeds the window limit of 5 + systemLoadCalculate.addUtilizationData(0.2, 0.1); // Oldest record (will be discarded) + systemLoadCalculate.addUtilizationData(0.3, 0.2); // Now the oldest record + systemLoadCalculate.addUtilizationData(0.4, 0.3); // Fourth record + systemLoadCalculate.addUtilizationData(0.5, 0.4); // Third record + systemLoadCalculate.addUtilizationData(0.7, 0.6); // Second record + systemLoadCalculate.addUtilizationData(0.6, 0.5); // Newest record + + double expectedPriority = + // Newest record: (1-0.6)*0.5 + (1-0.5)*0.5 * (4/10) + ((((1.0 - 0.6) * 0.5) + ((1.0 - 0.5) * 0.5)) * (4.0 / 10.0)) + + + // Second record: (1-0.7)*0.5 + (1-0.6)*0.5 * (2/10) + ((((1.0 - 0.7) * 0.5) + ((1.0 - 0.6) * 0.5)) * (2.0 / 10.0)) + + + // Third record: (1-0.5)*0.5 + (1-0.4)*0.5 * (2/10) + ((((1.0 - 0.5) * 0.5) + ((1.0 - 0.4) * 0.5)) * (2.0 / 10.0)) + + + // Fourth record: (1-0.4)*0.5 + (1-0.3)*0.5 * 
(1/10) + ((((1.0 - 0.4) * 0.5) + ((1.0 - 0.3) * 0.5)) * (1.0 / 10.0)) + + + // Oldest record: (1-0.3)*0.5 + (1-0.2)*0.5 * (1/10) + ((((1.0 - 0.3) * 0.5) + ((1.0 - 0.2) * 0.5)) * (1.0 / 10.0)); + + double actualPriority = systemLoadCalculate.calculateSchedulingPriority(); + + Assertions.assertEquals(expectedPriority, actualPriority); + } + + @Test + @DisplayName("Step4: Test calculateComprehensiveResourceAvailability method") + void testCalculateComprehensiveResourceAvailability() throws UnknownHostException { + // Assume that the overall resource idle rate is 0.8, and the Worker node has been + // continuously allocated 3 slots. This value is calculated based on the actual memory and + // CPU. + double comprehensiveResourceAvailability = 0.8; + + SystemLoadCalculate systemLoadCalculate = new SystemLoadCalculate(); + WorkerProfile workerProfile = Mockito.mock(WorkerProfile.class); + Address address = new Address("127.0.0.1", 5701); + when(workerProfile.getAddress()).thenReturn(address); + when(workerProfile.getAssignedSlots()).thenReturn(new SlotProfile[5]); + when(workerProfile.getUnassignedSlots()).thenReturn(new SlotProfile[3]); + Map<Address, ImmutableTriple<Double, Integer, Integer>> workerAssignedSlots = + new ConcurrentHashMap<>(); + + // Each task has a fixed slot resource + double singleSlotResource = + Math.round(((1 - comprehensiveResourceAvailability) / 5) * 100.0) / 100.0; + int times = 0; + + // When the worker has not been assigned, the overall resource idle rate remains unchanged + double result = + systemLoadCalculate.calculateComprehensiveResourceAvailability( + comprehensiveResourceAvailability, workerProfile, workerAssignedSlots); + double expected = comprehensiveResourceAvailability - (singleSlotResource * times); + Assertions.assertEquals(expected, result, 0.01); + Assertions.assertEquals( + comprehensiveResourceAvailability - (singleSlotResource * times), result, 0.01); + Assertions.assertEquals(0.8, result, 0.01); + + // The worker has been 
assigned 1 slot + times = 1; + workerAssignedSlots.put(address, new ImmutableTriple<>(singleSlotResource, 1, 0)); + when(workerProfile.getAssignedSlots()).thenReturn(new SlotProfile[6]); + when(workerProfile.getUnassignedSlots()).thenReturn(new SlotProfile[2]); + result = + systemLoadCalculate.calculateComprehensiveResourceAvailability( + comprehensiveResourceAvailability, workerProfile, workerAssignedSlots); + expected = comprehensiveResourceAvailability - (singleSlotResource * times); + Assertions.assertEquals(expected, result, 0.01); + Assertions.assertEquals( + comprehensiveResourceAvailability - (singleSlotResource * times), result, 0.01); + Assertions.assertEquals(0.76, result, 0.01); + + // The worker has been assigned 2 slots + times = 2; + workerAssignedSlots.put(address, new ImmutableTriple<>(singleSlotResource, 2, 0)); + when(workerProfile.getAssignedSlots()).thenReturn(new SlotProfile[7]); + when(workerProfile.getUnassignedSlots()).thenReturn(new SlotProfile[1]); + result = + systemLoadCalculate.calculateComprehensiveResourceAvailability( + comprehensiveResourceAvailability, workerProfile, workerAssignedSlots); + expected = comprehensiveResourceAvailability - (singleSlotResource * times); + Assertions.assertEquals(expected, result, 0.01); + Assertions.assertEquals( + comprehensiveResourceAvailability - (singleSlotResource * times), result, 0.01); + Assertions.assertEquals(0.72, result, 0.01); + + // If there is no unassigned slot, it will not be executed. 
+ + } + + @Test + @DisplayName("Step5: Test balanceFactor method") + void testBalanceFactor() { + WorkerProfile workerProfile = Mockito.mock(WorkerProfile.class); + when(workerProfile.getAssignedSlots()).thenReturn(new SlotProfile[3]); + when(workerProfile.getUnassignedSlots()).thenReturn(new SlotProfile[7]); + double balanceFactor = systemLoadCalculate.balanceFactor(workerProfile, 3); + Assertions.assertEquals(0.7, balanceFactor, 0.01); + } + + @Test + @DisplayName("All: Test the overall calculation logic") + void testLoadBalancer() throws UnknownHostException { + + // Verification plan 1: Split each step and verify whether the settlement indicators of each + // link are accurate + SystemLoadCalculate systemLoadCalculate = new SystemLoadCalculate(); + + // Add 6 records in chronological order (from oldest to newest) + // The first record will be discarded because it exceeds the window limit of 5 + systemLoadCalculate.addUtilizationData(0.2, 0.1); // Oldest record (will be discarded) + systemLoadCalculate.addUtilizationData(0.3, 0.2); // Now the oldest record + systemLoadCalculate.addUtilizationData(0.4, 0.3); // Fourth record + systemLoadCalculate.addUtilizationData(0.5, 0.4); // Third record + systemLoadCalculate.addUtilizationData(0.7, 0.6); // Second record + systemLoadCalculate.addUtilizationData(0.6, 0.5); // Newest record + double comprehensiveResourceAvailability = + systemLoadCalculate.calculateSchedulingPriority(); + Address address = new Address("127.0.0.1", 5701); + WorkerProfile workerProfile = Mockito.mock(WorkerProfile.class); + when(workerProfile.getAddress()).thenReturn(address); + when(workerProfile.getAssignedSlots()).thenReturn(new SlotProfile[5]); + when(workerProfile.getUnassignedSlots()).thenReturn(new SlotProfile[3]); + Map<Address, ImmutableTriple<Double, Integer, Integer>> workerAssignedSlots = + new ConcurrentHashMap<>(); + + // Each task has a fixed Slot resource + double singleSlotResource = + Math.round(((1 - 
comprehensiveResourceAvailability) / 5) * 100.0) / 100.0; + int times = 0; + + // When the worker has not been assigned, the overall resource idle rate remains unchanged + double result = + systemLoadCalculate.calculateComprehensiveResourceAvailability( + comprehensiveResourceAvailability, workerProfile, workerAssignedSlots); + double expected = comprehensiveResourceAvailability - (singleSlotResource * times); + Assertions.assertEquals(expected, result, 0.01); + Assertions.assertEquals( + comprehensiveResourceAvailability - (singleSlotResource * times), result, 0.01); + Assertions.assertEquals(0.5, result, 0.01); + + // The worker has been assigned 1 slot + times = 1; + workerAssignedSlots.put(address, new ImmutableTriple<>(singleSlotResource, 1, 0)); + when(workerProfile.getAssignedSlots()).thenReturn(new SlotProfile[6]); + when(workerProfile.getUnassignedSlots()).thenReturn(new SlotProfile[2]); + result = + systemLoadCalculate.calculateComprehensiveResourceAvailability( + comprehensiveResourceAvailability, workerProfile, workerAssignedSlots); + expected = comprehensiveResourceAvailability - (singleSlotResource * times); + Assertions.assertEquals(expected, result, 0.01); + Assertions.assertEquals( + comprehensiveResourceAvailability - (singleSlotResource * times), result, 0.01); + Assertions.assertEquals(0.4, result, 0.01); + + workerAssignedSlots.put(address, new ImmutableTriple<>(singleSlotResource, 2, 0)); + when(workerProfile.getAssignedSlots()).thenReturn(new SlotProfile[7]); + when(workerProfile.getUnassignedSlots()).thenReturn(new SlotProfile[1]); + result = + systemLoadCalculate.calculateComprehensiveResourceAvailability( + comprehensiveResourceAvailability, workerProfile, workerAssignedSlots); + double balanceFactor = systemLoadCalculate.balanceFactor(workerProfile, 7); + Assertions.assertEquals(0.12, balanceFactor, 0.01); + + double finalResult = 0.7 * 0.3 + 0.125 * 0.3; + Assertions.assertEquals( + finalResult, + 
systemLoadCalculate.calculateResourceAvailability(result, balanceFactor), + 0.01); + + // Verification plan 2: simulate the actual scenario and call the calculateWeight method to + // verify the final result and whether it is consistent with the result of step 1 + Map<Address, EvictingQueue<SystemLoadInfo>> workerLoadMap = new ConcurrentHashMap<>(); + workerLoadMap + .computeIfAbsent(address, v -> EvictingQueue.create(5)) + .offer(new SystemLoadInfo(0.3, 0.2)); + workerLoadMap + .computeIfAbsent(address, v -> EvictingQueue.create(5)) + .offer(new SystemLoadInfo(0.4, 0.3)); + workerLoadMap + .computeIfAbsent(address, v -> EvictingQueue.create(5)) + .offer(new SystemLoadInfo(0.5, 0.4)); + workerLoadMap + .computeIfAbsent(address, v -> EvictingQueue.create(5)) + .offer(new SystemLoadInfo(0.7, 0.6)); + workerLoadMap + .computeIfAbsent(address, v -> EvictingQueue.create(5)) + .offer(new SystemLoadInfo(0.6, 0.5)); + + // Mock current node resources + WorkerProfile workerProfile2 = Mockito.mock(WorkerProfile.class); + when(workerProfile2.getAssignedSlots()).thenReturn(new SlotProfile[5]); + when(workerProfile2.getUnassignedSlots()).thenReturn(new SlotProfile[3]); + when(workerProfile2.getAddress()).thenReturn(address); + + Map<Address, ImmutableTriple<Double, Integer, Integer>> workerAssignedSlots2 = + new ConcurrentHashMap<>(); + List<ResourceProfile> resourceProfiles = new ArrayList<>(); + resourceProfiles.add(new ResourceProfile()); + // Mock ResourceManager + AbstractResourceManager rm = Mockito.mock(AbstractResourceManager.class); + when(rm.getEngineConfig()).thenReturn(Mockito.mock(EngineConfig.class)); + when(rm.getEngineConfig().getSlotServiceConfig()) + .thenReturn(Mockito.mock(SlotServiceConfig.class)); + when(rm.getEngineConfig().getSlotServiceConfig().getAllocateStrategy()) + .thenReturn(ServerConfigOptions.SLOT_ALLOCATE_STRATEGY.defaultValue()); + // Simulate ResourceRequestHandler to call calculateWeight to calculate weight + ResourceRequestHandler 
resourceRequestHandler = + new ResourceRequestHandler(1L, resourceProfiles, null, rm, workerLoadMap); + resourceRequestHandler.calculateWeight(workerProfile2, workerAssignedSlots2); + // Mock Application Resources + workerAssignedSlots2.put(address, new ImmutableTriple<>(singleSlotResource, 1, 5)); + when(workerProfile2.getAssignedSlots()).thenReturn(new SlotProfile[6]); + when(workerProfile2.getUnassignedSlots()).thenReturn(new SlotProfile[2]); + resourceRequestHandler.calculateWeight(workerProfile2, workerAssignedSlots2); + + workerAssignedSlots2.put(address, new ImmutableTriple<>(singleSlotResource, 2, 5)); + when(workerProfile2.getAssignedSlots()).thenReturn(new SlotProfile[7]); + when(workerProfile2.getUnassignedSlots()).thenReturn(new SlotProfile[1]); + // Verity + Assertions.assertEquals( + resourceRequestHandler.calculateWeight(workerProfile2, workerAssignedSlots2), + finalResult); + } + + /** + * Test Multi-Node System Load Balancing: + * + * <p>This test simulates the load distribution between two nodes, gradually increasing each + * node's load to verify the system's load balancing algorithm. The main steps include creating + * nodes, adding load information, configuring resource management components, calculating node + * weights, and finally allocating slots based on these weights. + * + * <p>Specific Process: <br> + * - Initialize two nodes (address1 and address2), each with a pre-added 5 load entries. <br> + * - Configure ResourceManager and ResourceRequestHandler for handling resource requests and + * calculating weights. <br> + * - Create workerProfile1 and workerProfile2, representing two worker nodes, and set their + * allocated and unallocated slots. <br> + * - Initially, it is expected that the first node has a higher weight (0.78 vs the second + * node's 0.41), leading to the preference of the first node for allocation. 
<br> + * - Gradually allocate slots to the first node (from 1 to 4), recalculating weights after each + * allocation and noting changes: <br> + * - After allocating 1 slot: the first node's weight drops to 0.68; <br> + * - After allocating 2 slots: the first node's weight drops to 0.58; <br> + * - After allocating 3 slots: the first node's weight drops to 0.48; <br> + * - After allocating 4 slots: the first node's weight drops to 0.38, at which point the second + * node has a higher weight (0.41), switching preference to the second node. <br> + * - Finally, allocate one slot to the second node, updating its weight to 0.31, and again + * choosing the first node for allocation. <br> + * <br> + * Each slot consumes a fixed amount of resources, set to 0.1 in this test case. This test + * ensures that the load balancing algorithm can make reasonable resource allocation decisions + * based on the current load situation of the nodes. <br> + */ + @Test + @DisplayName("All: Test multiple node system load") + void testMultipleNodeSystemLoad() throws UnknownHostException { + Address address1 = new Address("127.0.0.1", 5701); + Address address2 = new Address("127.0.0.1", 5702); + + // Simulate the actual scenario and call the calculateWeight method to verify the final + // result + Map<Address, EvictingQueue<SystemLoadInfo>> workerLoadMap = new ConcurrentHashMap<>(); + workerLoadMap + .computeIfAbsent(address1, v -> EvictingQueue.create(5)) + .offer(new SystemLoadInfo(0.2, 0.1)); + workerLoadMap + .computeIfAbsent(address1, v -> EvictingQueue.create(5)) + .offer(new SystemLoadInfo(0.3, 0.2)); + workerLoadMap + .computeIfAbsent(address1, v -> EvictingQueue.create(5)) + .offer(new SystemLoadInfo(0.4, 0.3)); + workerLoadMap + .computeIfAbsent(address1, v -> EvictingQueue.create(5)) + .offer(new SystemLoadInfo(0.4, 0.4)); + workerLoadMap + .computeIfAbsent(address1, v -> EvictingQueue.create(5)) + .offer(new SystemLoadInfo(0.3, 0.3)); + + workerLoadMap + .computeIfAbsent(address2, 
v -> EvictingQueue.create(5)) + .offer(new SystemLoadInfo(0.8, 0.7)); + workerLoadMap + .computeIfAbsent(address2, v -> EvictingQueue.create(5)) + .offer(new SystemLoadInfo(0.9, 0.8)); + workerLoadMap + .computeIfAbsent(address2, v -> EvictingQueue.create(5)) + .offer(new SystemLoadInfo(0.85, 0.75)); + workerLoadMap + .computeIfAbsent(address2, v -> EvictingQueue.create(5)) + .offer(new SystemLoadInfo(0.9, 0.85)); + workerLoadMap + .computeIfAbsent(address2, v -> EvictingQueue.create(5)) + .offer(new SystemLoadInfo(0.88, 0.8)); + + // Mock current node resources + WorkerProfile workerProfile2 = Mockito.mock(WorkerProfile.class); + when(workerProfile2.getAssignedSlots()).thenReturn(new SlotProfile[0]); + when(workerProfile2.getUnassignedSlots()).thenReturn(new SlotProfile[10]); + when(workerProfile2.getAddress()).thenReturn(address2); + + List<ResourceProfile> resourceProfiles = new ArrayList<>(); + resourceProfiles.add(new ResourceProfile()); + + // Mock ResourceManager + AbstractResourceManager rm = Mockito.mock(AbstractResourceManager.class); + when(rm.getEngineConfig()).thenReturn(Mockito.mock(EngineConfig.class)); + when(rm.getEngineConfig().getSlotServiceConfig()) + .thenReturn(Mockito.mock(SlotServiceConfig.class)); + when(rm.getEngineConfig().getSlotServiceConfig().getAllocateStrategy()) + .thenReturn(ServerConfigOptions.SLOT_ALLOCATE_STRATEGY.defaultValue()); + + WorkerProfile workerProfile1 = Mockito.mock(WorkerProfile.class); + when(workerProfile1.getAssignedSlots()).thenReturn(new SlotProfile[0]); + when(workerProfile1.getUnassignedSlots()).thenReturn(new SlotProfile[10]); + when(workerProfile1.getAddress()).thenReturn(address1); + // Simulate ResourceRequestHandler to call calculateWeight to calculate weight + ResourceRequestHandler resourceRequestHandler = + new ResourceRequestHandler(1L, resourceProfiles, null, rm, workerLoadMap); + + Map<Address, ImmutableTriple<Double, Integer, Integer>> workerAssignedSlots1 = + new ConcurrentHashMap<>(); + Double 
calculateWeight1 = + resourceRequestHandler.calculateWeight(workerProfile1, workerAssignedSlots1); + System.out.println("Node1 initialization weight: " + calculateWeight1); + + Map<Address, ImmutableTriple<Double, Integer, Integer>> workerAssignedSlots2 = + new ConcurrentHashMap<>(); + Double calculateWeight2 = + resourceRequestHandler.calculateWeight(workerProfile2, workerAssignedSlots2); + System.out.println("Node2 initialization weight: " + calculateWeight2); + + // First node load is low, second node load is high, first node weight should be greater + // than second node + Assertions.assertTrue(calculateWeight1 > calculateWeight2); + + // Tip: Here, we default to singleSlotUseResource=0.1 for easy verification of the accuracy + // of the results. The singleSlotUseResource for the load can refer to the class: + // org.apache.setannel.engine.E2e.allocatestgy SystemLoadAllocateStrategyIT + double singleSlotUseResource = 0.1; + + // First node is assigned a slot + workerAssignedSlots1.put(address1, new ImmutableTriple<>(singleSlotUseResource, 1, 0)); + when(workerProfile1.getAssignedSlots()).thenReturn(new SlotProfile[1]); + when(workerProfile1.getUnassignedSlots()).thenReturn(new SlotProfile[9]); + calculateWeight1 = + resourceRequestHandler.calculateWeight(workerProfile1, workerAssignedSlots1); + calculateWeight2 = + resourceRequestHandler.calculateWeight(workerProfile2, workerAssignedSlots2); + System.out.println( Review Comment: please do not use System.out.println. ########## seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/allocation/strategy/SlotRatioStrategy.java: ########## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.resourcemanager.allocation.strategy; + +import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile; + +import org.apache.commons.lang3.tuple.ImmutableTriple; + +import com.hazelcast.cluster.Address; + +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** SlotRatioStrategy is a strategy that selects the worker with the lowest slot usage rate. */ +public class SlotRatioStrategy implements SlotAllocationStrategy { + + private final Map<Address, ImmutableTriple<Double, Integer, Integer>> workerAssignedSlots; + + public SlotRatioStrategy( + Map<Address, ImmutableTriple<Double, Integer, Integer>> workerAssignedSlots) { Review Comment: I can't tell what the ImmutableTriple values are used for. How about using an object to store them? ########## seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/allocation/strategy/SystemLoadStrategy.java: ########## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.resourcemanager.allocation.strategy; + +import org.apache.seatunnel.shade.com.google.common.collect.EvictingQueue; + +import org.apache.seatunnel.engine.server.resourcemanager.resource.SystemLoadInfo; +import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile; +import org.apache.seatunnel.engine.server.utils.SystemLoadCalculate; + +import org.apache.commons.lang3.tuple.ImmutableTriple; + +import com.hazelcast.cluster.Address; + +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** SystemLoadStrategy is a strategy that selects the worker with the lowest system load. 
*/ +public class SystemLoadStrategy implements SlotAllocationStrategy { + private final Map<Address, EvictingQueue<SystemLoadInfo>> workerLoadMap; + + public SystemLoadStrategy(Map<Address, EvictingQueue<SystemLoadInfo>> workerLoadMap) { + this.workerLoadMap = workerLoadMap; + } + + @Override + public Optional<WorkerProfile> selectWorker( + List<WorkerProfile> availableWorkers, + Map<Address, ImmutableTriple<Double, Integer, Integer>> workerAssignedSlots) { + Optional<WorkerProfile> workerProfile = + availableWorkers.stream() + .max( + Comparator.comparingDouble( + w -> calculateWeight(w, workerAssignedSlots))); + + workerProfile.ifPresent( + profile -> { + workerAssignedSlots.merge( + profile.getAddress(), + new ImmutableTriple<>(0.0, 1, profile.getAssignedSlots().length), Review Comment: ditto ########## seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java: ########## @@ -49,21 +57,27 @@ public abstract class AbstractResourceManager implements ResourceManager { private static final long DEFAULT_WORKER_CHECK_INTERVAL = 500; - protected final ConcurrentMap<Address, WorkerProfile> registerWorker; + @Getter public final ConcurrentMap<Address, WorkerProfile> registerWorker; private final NodeEngine nodeEngine; private final ExecutionMode mode; - private final EngineConfig engineConfig; + @Getter private final EngineConfig engineConfig; private volatile boolean isRunning = true; + private Map<Address, EvictingQueue<SystemLoadInfo>> workerLoadMap; + + @Setter @Getter + private Map<Address, ImmutableTriple<Double, Integer, Integer>> workerAssignedSlots; Review Comment: These fields only work when a specific strategy is set. Can we move them into the Strategy class? We can skip storing them when the strategy doesn't need them. 
########## seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/allocation/strategy/SlotRatioStrategy.java: ########## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.resourcemanager.allocation.strategy; + +import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile; + +import org.apache.commons.lang3.tuple.ImmutableTriple; + +import com.hazelcast.cluster.Address; + +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** SlotRatioStrategy is a strategy that selects the worker with the lowest slot usage rate. 
*/ +public class SlotRatioStrategy implements SlotAllocationStrategy { + + private final Map<Address, ImmutableTriple<Double, Integer, Integer>> workerAssignedSlots; + + public SlotRatioStrategy( + Map<Address, ImmutableTriple<Double, Integer, Integer>> workerAssignedSlots) { + this.workerAssignedSlots = workerAssignedSlots; + } + + @Override + public Optional<WorkerProfile> selectWorker( + List<WorkerProfile> availableWorkers, + Map<Address, ImmutableTriple<Double, Integer, Integer>> workerAssignedSlots) { + + Optional<WorkerProfile> workerProfile = + availableWorkers.stream().min(Comparator.comparingDouble(this::calculateSlotUsage)); + workerProfile.ifPresent( + profile -> { + workerAssignedSlots.merge( + profile.getAddress(), + new ImmutableTriple<>(0.0, 1, profile.getAssignedSlots().length), + (oldVal, newVal) -> + new ImmutableTriple<>(0.0, oldVal.middle + 1, oldVal.right)); + }); + return workerProfile; + } + + /** + * Calculate the slot usage rate of the worker + * + * @param worker WorkerProfile + * @return slot usage rate, range 0.0-1.0 + */ + private double calculateSlotUsage(WorkerProfile worker) { + ImmutableTriple<Double, Integer, Integer> immutableTriple = + workerAssignedSlots.get(worker.getAddress()); + // If we manually record the number of assigned slots, we use that number, since + // worker.getAssignedSlots is not updated in real time. + int assignedSlots = + (immutableTriple != null) + ? immutableTriple.middle + : worker.getAssignedSlots().length; + workerAssignedSlots.put(worker.getAddress(), new ImmutableTriple<>(0.0, assignedSlots, 0)); + + int totalSlots = worker.getUnassignedSlots().length + worker.getAssignedSlots().length; + if (totalSlots == 0) { Review Comment: If the worker uses dynamic slots, totalSlots is 0. But we should still give a dynamic worker a chance to execute tasks. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@seatunnel.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org