This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 5bf6f7892a [Improve][Zeta] Add retry when request slot on resource manager (#7049) 5bf6f7892a is described below commit 5bf6f7892a65ed26ef0bee3d6e21638bd7cab37b Author: Jia Fan <fanjiaemi...@qq.com> AuthorDate: Wed Jul 10 22:28:42 2024 +0800 [Improve][Zeta] Add retry when request slot on resource manager (#7049) --- .../resourcemanager/AbstractResourceManager.java | 4 +- .../resourcemanager/ResourceRequestHandler.java | 118 +++++++++++++----- .../opeartion/ReleaseSlotOperation.java | 8 ++ .../resourcemanager/worker/WorkerProfile.java | 4 + .../server/service/slot/DefaultSlotService.java | 6 + .../server/service/slot/SlotAndWorkerProfile.java | 1 + .../resourcemanager/FakeResourceManager.java | 49 +++----- ...FakeResourceManagerForRequestSlotRetryTest.java | 133 +++++++++++++++++++++ .../resourcemanager/ResourceManagerTest.java | 115 ++++++++++++++++-- 9 files changed, 363 insertions(+), 75 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java index de93bf72ed..5fe29fa6f1 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java @@ -223,7 +223,9 @@ public abstract class AbstractResourceManager implements ResourceManager { @Override public CompletableFuture<Void> releaseResource(long jobId, SlotProfile profile) { if (nodeEngine.getClusterService().getMember(profile.getWorker()) != null) { - return sendToMember(new ReleaseSlotOperation(jobId, profile), profile.getWorker()); + CompletableFuture<WorkerProfile> future = + sendToMember(new ReleaseSlotOperation(jobId, profile), profile.getWorker()); + return future.thenAccept(this::heartbeat); } else { return CompletableFuture.completedFuture(null); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java index 0af3738a4a..0009357fcc 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java @@ -24,6 +24,7 @@ import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile; import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile; import org.apache.seatunnel.engine.server.service.slot.SlotAndWorkerProfile; +import com.google.common.annotations.VisibleForTesting; import com.hazelcast.cluster.Address; import com.hazelcast.logging.ILogger; import com.hazelcast.logging.Logger; @@ -33,7 +34,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -54,6 +54,8 @@ public class ResourceRequestHandler { private final ConcurrentMap<Integer, SlotProfile> resultSlotProfiles; private final ConcurrentMap<Address, WorkerProfile> registerWorker; + private static final int MAX_RETRY_TIMES = 3; + private final long jobId; private final List<ResourceProfile> resourceProfile; @@ -74,49 +76,94 @@ public class ResourceRequestHandler { } public CompletableFuture<List<SlotProfile>> request(Map<String, String> tags) { - List<CompletableFuture<SlotAndWorkerProfile>> allRequestFuture = new ArrayList<>(); - for (int i = 0; i < resourceProfile.size(); i++) { - ResourceProfile r = resourceProfile.get(i); - Optional<WorkerProfile> workerProfile = preCheckWorkerResource(r); - if (workerProfile.isPresent()) { - // request slot to member - CompletableFuture<SlotAndWorkerProfile> internalCompletableFuture = - singleResourceRequestToMember(i, r, workerProfile.get()); - allRequestFuture.add(internalCompletableFuture); - } + requestSlotWithRetry(resourceProfile, MAX_RETRY_TIMES, tags); + return completableFuture; + } + + private CompletableFuture<SlotAndWorkerProfile> requestSlotWithRetry( + List<ResourceProfile> request, int retryTimes, Map<String, String> tags) { + if (retryTimes <= 0) { + LOGGER.fine("can't apply resource request with retry times: " + MAX_RETRY_TIMES); + return CompletableFuture.supplyAsync( + () -> { + throw new NoEnoughResourceException( + "can't apply resource request with retry times: " + + MAX_RETRY_TIMES); + }); } + List<CompletableFuture<SlotAndWorkerProfile>> allRequestFuture = requestSlots(request); // all resource preCheck done, also had sent request to worker - getAllOfFuture(allRequestFuture) + return getAllOfFuture(allRequestFuture) .whenComplete( withTryCatch( LOGGER, (unused, error) -> { if (error != null) { completeRequestWithException(error); - } - if (resultSlotProfiles.size() < resourceProfile.size()) { - // meaning have some slot not request success - if (resourceManager.supportDynamicWorker()) { - applyByDynamicWorker(tags); - } else { - completeRequestWithException( - new NoEnoughResourceException( - "can't apply resource request: " - + resourceProfile.get( - findNullIndexInResultSlotProfiles()))); + } else { + List<ResourceProfile> needRequestResource = + stillNeedRequestResource(); + if (!needRequestResource.isEmpty()) { + Exception requestSlotWithRetryError = null; + try { + requestSlotWithRetry( + needRequestResource, + retryTimes - 1, + tags) + .get(); + } catch (Exception e) { + LOGGER.warning( + "request slot with retry error: " + + e.getMessage()); + requestSlotWithRetryError = e; + } + if (requestSlotWithRetryError != null) { + // meaning have some slot not request success + if (resourceManager.supportDynamicWorker()) { + applyByDynamicWorker(tags); + } else { + completeRequestWithException( + requestSlotWithRetryError); + } + } } } })); - return completableFuture; } - private int findNullIndexInResultSlotProfiles() { + private List<ResourceProfile> stillNeedRequestResource() { + List<ResourceProfile> needRequestResource = new ArrayList<>(); for (int i = 0; i < resourceProfile.size(); i++) { if (!resultSlotProfiles.containsKey(i)) { - return i; + needRequestResource.add(resourceProfile.get(i)); + } + } + return needRequestResource; + } + + private List<CompletableFuture<SlotAndWorkerProfile>> requestSlots( + List<ResourceProfile> requestProfile) { + List<CompletableFuture<SlotAndWorkerProfile>> allRequestFuture = new ArrayList<>(); + for (int i = 0; i < requestProfile.size(); i++) { + ResourceProfile r = requestProfile.get(i); + Optional<WorkerProfile> workerProfile = preCheckWorkerResource(r); + if (workerProfile.isPresent()) { + // request slot to member + CompletableFuture<SlotAndWorkerProfile> internalCompletableFuture = + singleResourceRequestToMember(i, r, workerProfile.get()); + allRequestFuture.add(internalCompletableFuture); + } else { + // if no worker can provide the resource, we should return a failed future + LOGGER.fine("pre check worker resource failed, can't apply resource request: " + r); + allRequestFuture.add( + CompletableFuture.supplyAsync( + () -> { + throw new NoEnoughResourceException( + "can't apply resource request: " + r); + })); } } - return -1; + return allRequestFuture; } private void completeRequestWithException(Throwable e) { @@ -125,6 +172,7 @@ public class ResourceRequestHandler { } private void addSlotToCacheMap(int index, SlotProfile slotProfile) { + // null value means the slot request failed, no suitable slot found if (null != slotProfile) { resultSlotProfiles.put(index, slotProfile); if (resultSlotProfiles.size() == resourceProfile.size()) { @@ -134,6 +182,8 @@ public class ResourceRequestHandler { } completableFuture.complete(value); } + } else { + LOGGER.fine("no suitable slot found for resource: " + resourceProfile.get(index)); } } @@ -155,7 +205,8 @@ public class ResourceRequestHandler { })); } - private Optional<WorkerProfile> preCheckWorkerResource(ResourceProfile r) { + @VisibleForTesting + public Optional<WorkerProfile> preCheckWorkerResource(ResourceProfile r) { // Shuffle the order to ensure random selection of workers List<WorkerProfile> workerProfiles = Arrays.asList(registerWorker.values().toArray(new WorkerProfile[0])); @@ -176,6 +227,7 @@ public class ResourceRequestHandler { // Check if there are still unassigned resources workerProfile = workerProfiles.stream() + .filter(WorkerProfile::isDynamicSlot) .filter(worker -> worker.getUnassignedResource().enoughThan(r)) .findAny(); } @@ -217,11 +269,13 @@ public class ResourceRequestHandler { private void releaseAllResourceInternal() { LOGGER.warning("apply resource not success, release all already applied resource"); - resultSlotProfiles.values().stream() - .filter(Objects::nonNull) + new ArrayList<>(resultSlotProfiles.keySet()) .forEach( - profile -> { - resourceManager.releaseResource(jobId, profile); + index -> { + SlotProfile profile = resultSlotProfiles.remove(index); + if (profile != null) { + resourceManager.releaseResource(jobId, profile); + } }); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/ReleaseSlotOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/ReleaseSlotOperation.java index e812815746..2d2a0d0416 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/ReleaseSlotOperation.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/ReleaseSlotOperation.java @@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.server.resourcemanager.opeartion; import org.apache.seatunnel.common.utils.ExceptionUtils; import org.apache.seatunnel.engine.server.SeaTunnelServer; import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile; +import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile; import org.apache.seatunnel.engine.server.serializable.ResourceDataSerializerHook; import org.apache.seatunnel.engine.server.service.slot.WrongTargetSlotException; @@ -36,6 +37,7 @@ public class ReleaseSlotOperation extends Operation implements IdentifiedDataSer private long jobID; private SlotProfile slotProfile; + private WorkerProfile result; public ReleaseSlotOperation() {} @@ -56,6 +58,12 @@ public class ReleaseSlotOperation extends Operation implements IdentifiedDataSer slotProfile, ExceptionUtils.getMessage(ignore)); } + result = server.getSlotService().getWorkerProfile(); + } + + @Override + public Object getResponse() { + return result; } @Override diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java index 291df1f1f8..0d0f8c8054 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java @@ -44,6 +44,8 @@ public class WorkerProfile implements IdentifiedDataSerializable { private ResourceProfile unassignedResource; + private boolean dynamicSlot; + private SlotProfile[] assignedSlots; private SlotProfile[] unassignedSlots; @@ -82,6 +84,7 @@ public class WorkerProfile implements IdentifiedDataSerializable { for (SlotProfile unassignedSlot : unassignedSlots) { out.writeObject(unassignedSlot); } + out.writeBoolean(dynamicSlot); } @Override @@ -99,5 +102,6 @@ public class WorkerProfile implements IdentifiedDataSerializable { for (int i = 0; i < unassignedSlots.length; i++) { unassignedSlots[i] = in.readObject(); } + dynamicSlot = in.readBoolean(); } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java index 250a6f2eb4..7c0ae38bfd 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java @@ -196,6 +196,11 @@ public class DefaultSlotService implements SlotService { } } + /** + * Select the best match slot for the profile. + * + * @return the best match slot, null if no suitable slot found. + */ private SlotProfile selectBestMatchSlot(ResourceProfile profile) { if (unassignedSlots.isEmpty() && !config.isDynamicSlot()) { return null; @@ -259,6 +264,7 @@ public class DefaultSlotService implements SlotService { workerProfile.setUnassignedSlots(unassignedSlots.values().toArray(new SlotProfile[0])); workerProfile.setUnassignedResource(unassignedResource.get()); workerProfile.setAttributes(nodeEngine.getLocalMember().getAttributes()); + workerProfile.setDynamicSlot(config.isDynamicSlot()); return workerProfile; } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotAndWorkerProfile.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotAndWorkerProfile.java index dc9a46d63c..a9168518e7 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotAndWorkerProfile.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotAndWorkerProfile.java @@ -31,6 +31,7 @@ public class SlotAndWorkerProfile implements IdentifiedDataSerializable { private WorkerProfile workerProfile; + // null value means the slot request failed, no suitable slot found private SlotProfile slotProfile; public SlotAndWorkerProfile() {} diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java index 8d45ef2d49..0118c15879 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java @@ -42,42 +42,28 @@ public class FakeResourceManager extends AbstractResourceManager { @Override public void init() { try { - Address address1 = new Address("localhost", 5801); - WorkerProfile workerProfile1 = - new WorkerProfile( - address1, - new ResourceProfile(), - new ResourceProfile(), - new SlotProfile[] {}, - new SlotProfile[] {}, - Collections.emptyMap()); - this.registerWorker.put(address1, workerProfile1); - - Address address2 = new Address("localhost", 5802); - WorkerProfile workerProfile2 = - new WorkerProfile( - address2, - new ResourceProfile(), - new ResourceProfile(), - new SlotProfile[] {}, - new SlotProfile[] {}, - Collections.emptyMap()); - this.registerWorker.put(address2, workerProfile2); - Address address3 = new Address("localhost", 5803); - WorkerProfile workerProfile3 = - new WorkerProfile( - address3, - new ResourceProfile(), - new ResourceProfile(), - new SlotProfile[] {}, - new SlotProfile[] {}, - Collections.emptyMap()); - this.registerWorker.put(address3, workerProfile3); + generateWorker(5801); + generateWorker(5802); + generateWorker(5803); } catch (UnknownHostException e) { throw new RuntimeException(e); } } + private void generateWorker(int port) throws UnknownHostException { + Address address = new Address("localhost", port); + WorkerProfile workerProfile = + new WorkerProfile( + address, + new ResourceProfile(), + new ResourceProfile(), + true, + new SlotProfile[] {}, + new SlotProfile[] {}, + Collections.emptyMap()); + this.registerWorker.put(address, workerProfile); + } + @Override protected <E> CompletableFuture<E> sendToMember(Operation operation, Address address) { if (operation instanceof RequestSlotOperation) { @@ -88,6 +74,7 @@ public class FakeResourceManager extends AbstractResourceManager { address, new ResourceProfile(), new ResourceProfile(), + true, new SlotProfile[] {}, new SlotProfile[] {}, Collections.emptyMap()), diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManagerForRequestSlotRetryTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManagerForRequestSlotRetryTest.java new file mode 100644 index 0000000000..1dc427e4f8 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManagerForRequestSlotRetryTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.resourcemanager; + +import org.apache.seatunnel.engine.common.config.EngineConfig; +import org.apache.seatunnel.engine.server.resourcemanager.opeartion.RequestSlotOperation; +import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile; +import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile; +import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile; +import org.apache.seatunnel.engine.server.service.slot.SlotAndWorkerProfile; + +import com.hazelcast.cluster.Address; +import com.hazelcast.spi.impl.NodeEngine; +import com.hazelcast.spi.impl.operationservice.Operation; + +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; + +/** Used to test ResourceManager, override init method to register more workers. */ +public class FakeResourceManagerForRequestSlotRetryTest extends AbstractResourceManager { + + private final int newWorkerCount; + private final int noSlotWorkerCount; + private final AtomicInteger queryIndex = new AtomicInteger(0); + + private final Set<Address> cannotRequestAddress = new HashSet<>(); + + public FakeResourceManagerForRequestSlotRetryTest( + NodeEngine nodeEngine, int newWorkerCount, int noSlotWorkerCount) { + super(nodeEngine, new EngineConfig()); + this.newWorkerCount = newWorkerCount; + this.noSlotWorkerCount = noSlotWorkerCount; + init(); + } + + @Override + public void init() { + try { + for (int i = 0; i < newWorkerCount; i++) { + generateWorker(5801 + i); + } + } catch (UnknownHostException e) { + throw new RuntimeException(e); + } + } + + private void generateWorker(int port) throws UnknownHostException { + Address address = new Address("localhost", port); + WorkerProfile workerProfile = + new WorkerProfile( + address, + new ResourceProfile(), + new ResourceProfile(), + false, + new SlotProfile[] {}, + new SlotProfile[] { + new SlotProfile(address, 1, new ResourceProfile(), ""), + new SlotProfile(address, 2, new ResourceProfile(), "") + }, + Collections.emptyMap()); + this.registerWorker.put(address, workerProfile); + } + + @Override + protected <E> CompletableFuture<E> sendToMember(Operation operation, Address address) { + if (operation instanceof RequestSlotOperation) { + if (cannotRequestAddress.contains(address)) { + throw new IllegalStateException("Cannot request slot for " + address); + } + if (queryIndex.getAndIncrement() < noSlotWorkerCount) { + cannotRequestAddress.add(address); + // query will return empty slot + return (CompletableFuture<E>) + CompletableFuture.completedFuture( + new SlotAndWorkerProfile( + new WorkerProfile( + address, + new ResourceProfile(), + new ResourceProfile(), + false, + new SlotProfile[] { + new SlotProfile( + address, 1, new ResourceProfile(), ""), + new SlotProfile( + address, 2, new ResourceProfile(), "") + }, + // no unassigned slot + new SlotProfile[] {}, + Collections.emptyMap()), + null)); + } + return (CompletableFuture<E>) + CompletableFuture.completedFuture( + new SlotAndWorkerProfile( + new WorkerProfile( + address, + new ResourceProfile(), + new ResourceProfile(), + false, + new SlotProfile[] { + new SlotProfile( + address, 1, new ResourceProfile(), "") + }, + new SlotProfile[] { + new SlotProfile( + address, 3, new ResourceProfile(), "") + }, + Collections.emptyMap()), + new SlotProfile(address, 2, new ResourceProfile(), ""))); + } else { + return super.sendToMember(operation, address); + } + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java index 2589e6530c..abd4ccdc09 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java @@ -22,6 +22,7 @@ import org.apache.seatunnel.engine.server.resourcemanager.resource.CPU; import org.apache.seatunnel.engine.server.resourcemanager.resource.Memory; import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile; import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile; +import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; @@ -29,9 +30,14 @@ import org.junit.jupiter.api.Test; import com.hazelcast.cluster.Address; +import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @@ -87,23 +93,110 @@ public class ResourceManagerTest extends AbstractSeaTunnelServerTest<ResourceMan public void testApplyResourceWithRandomResult() throws ExecutionException, InterruptedException { FakeResourceManager resourceManager = new FakeResourceManager(nodeEngine); + boolean hasDifferentWorker = false; + for (int i = 0; i < 5; i++) { + List<ResourceProfile> resourceProfiles = new ArrayList<>(); + resourceProfiles.add(new ResourceProfile()); + resourceProfiles.add(new ResourceProfile()); + resourceProfiles.add(new ResourceProfile()); + resourceProfiles.add(new ResourceProfile()); + resourceProfiles.add(new ResourceProfile()); + List<SlotProfile> slotProfiles = + resourceManager.applyResources(1L, resourceProfiles, null).get(); + Assertions.assertEquals(slotProfiles.size(), 5); + Set<Address> addresses = + slotProfiles.stream().map(SlotProfile::getWorker).collect(Collectors.toSet()); + hasDifferentWorker |= addresses.size() > 1; + } + Assertions.assertTrue(hasDifferentWorker, "should have different worker for each slot"); + } + @Test + public void testApplyResourceWithRetryWhenSameNodeNoSlotSuited() + throws ExecutionException, InterruptedException { + // test retry request slot times 1 + FakeResourceManagerForRequestSlotRetryTest resourceManager = + new FakeResourceManagerForRequestSlotRetryTest(nodeEngine, 2, 1); List<ResourceProfile> resourceProfiles = new ArrayList<>(); resourceProfiles.add(new ResourceProfile()); resourceProfiles.add(new ResourceProfile()); - resourceProfiles.add(new ResourceProfile()); - resourceProfiles.add(new ResourceProfile()); - resourceProfiles.add(new ResourceProfile()); List<SlotProfile> slotProfiles = resourceManager.applyResources(1L, resourceProfiles, null).get(); - Assertions.assertEquals(slotProfiles.size(), 5); + Assertions.assertEquals(slotProfiles.size(), 2); - boolean hasDifferentWorker = false; - for (int i = 0; i < 5; i++) { - Set<Address> addresses = - slotProfiles.stream().map(SlotProfile::getWorker).collect(Collectors.toSet()); - hasDifferentWorker = addresses.size() > 1; - } - Assertions.assertTrue(hasDifferentWorker, "should have different worker for each slot"); + // test retry request slot time 2 but no enough slot with worker + resourceManager = new FakeResourceManagerForRequestSlotRetryTest(nodeEngine, 2, 2); + FakeResourceManagerForRequestSlotRetryTest finalResourceManager = resourceManager; + List<ResourceProfile> finalResourceProfiles = resourceProfiles; + ExecutionException exception = + Assertions.assertThrows( + ExecutionException.class, + () -> + finalResourceManager + .applyResources(1L, finalResourceProfiles, null) + .get()); + Assertions.assertInstanceOf(NoEnoughResourceException.class, exception.getCause()); + + // test retry request slot time 4 so that more than max retry times + resourceProfiles = new ArrayList<>(); + resourceProfiles.add(new ResourceProfile()); + resourceManager = new FakeResourceManagerForRequestSlotRetryTest(nodeEngine, 5, 4); + List<ResourceProfile> finalResourceProfiles2 = resourceProfiles; + FakeResourceManagerForRequestSlotRetryTest finalResourceManager2 = resourceManager; + ExecutionException exception2 = + Assertions.assertThrows( + ExecutionException.class, + () -> + finalResourceManager2 + .applyResources(1L, finalResourceProfiles2, null) + .get()); + Assertions.assertInstanceOf( + NoEnoughResourceException.class, exception2.getCause().getCause()); + Assertions.assertEquals( + "can't apply resource request with retry times: 3", + exception2.getCause().getCause().getMessage()); + } + + @Test + public void testPreCheckWorkerResourceWithDynamicSlot() throws UnknownHostException { + testPreCheckWorkerResource(true); + testPreCheckWorkerResource(false); + } + + public void testPreCheckWorkerResource(boolean dynamicSlot) throws UnknownHostException { + List<ResourceProfile> resourceProfiles = new ArrayList<>(); + resourceProfiles.add(new ResourceProfile()); + ConcurrentMap<Address, WorkerProfile> registerWorker = new ConcurrentHashMap<>(); + Address address1 = new Address("localhost", 5801); + WorkerProfile workerProfile1 = + new WorkerProfile( + address1, + new ResourceProfile(), + new ResourceProfile(), + dynamicSlot, + new SlotProfile[] {}, + new SlotProfile[] {}, + Collections.emptyMap()); + registerWorker.put(address1, workerProfile1); + + Address address2 = new Address("localhost", 5802); + WorkerProfile workerProfile2 = + new WorkerProfile( + address2, + new ResourceProfile(), + new ResourceProfile(), + dynamicSlot, + new SlotProfile[] {}, + new SlotProfile[] {}, + Collections.emptyMap()); + registerWorker.put(address2, workerProfile2); + Optional<WorkerProfile> result = + new ResourceRequestHandler( + jobId, + resourceProfiles, + registerWorker, + (AbstractResourceManager) this.resourceManager) + .preCheckWorkerResource(new ResourceProfile()); + Assertions.assertEquals(result.isPresent(), dynamicSlot); } }