This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5bf6f7892a [Improve][Zeta] Add retry when request slot on resource manager (#7049)
5bf6f7892a is described below

commit 5bf6f7892a65ed26ef0bee3d6e21638bd7cab37b
Author: Jia Fan <fanjiaemi...@qq.com>
AuthorDate: Wed Jul 10 22:28:42 2024 +0800

    [Improve][Zeta] Add retry when request slot on resource manager (#7049)
---
 .../resourcemanager/AbstractResourceManager.java   |   4 +-
 .../resourcemanager/ResourceRequestHandler.java    | 118 +++++++++++++-----
 .../opeartion/ReleaseSlotOperation.java            |   8 ++
 .../resourcemanager/worker/WorkerProfile.java      |   4 +
 .../server/service/slot/DefaultSlotService.java    |   6 +
 .../server/service/slot/SlotAndWorkerProfile.java  |   1 +
 .../resourcemanager/FakeResourceManager.java       |  49 +++-----
 ...FakeResourceManagerForRequestSlotRetryTest.java | 133 +++++++++++++++++++++
 .../resourcemanager/ResourceManagerTest.java       | 115 ++++++++++++++++--
 9 files changed, 363 insertions(+), 75 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
index de93bf72ed..5fe29fa6f1 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
@@ -223,7 +223,9 @@ public abstract class AbstractResourceManager implements 
ResourceManager {
     @Override
     public CompletableFuture<Void> releaseResource(long jobId, SlotProfile 
profile) {
         if (nodeEngine.getClusterService().getMember(profile.getWorker()) != 
null) {
-            return sendToMember(new ReleaseSlotOperation(jobId, profile), 
profile.getWorker());
+            CompletableFuture<WorkerProfile> future =
+                    sendToMember(new ReleaseSlotOperation(jobId, profile), 
profile.getWorker());
+            return future.thenAccept(this::heartbeat);
         } else {
             return CompletableFuture.completedFuture(null);
         }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
index 0af3738a4a..0009357fcc 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
@@ -24,6 +24,7 @@ import 
org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
 import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
 import org.apache.seatunnel.engine.server.service.slot.SlotAndWorkerProfile;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.hazelcast.cluster.Address;
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
@@ -33,7 +34,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -54,6 +54,8 @@ public class ResourceRequestHandler {
     private final ConcurrentMap<Integer, SlotProfile> resultSlotProfiles;
     private final ConcurrentMap<Address, WorkerProfile> registerWorker;
 
+    private static final int MAX_RETRY_TIMES = 3;
+
     private final long jobId;
 
     private final List<ResourceProfile> resourceProfile;
@@ -74,49 +76,94 @@ public class ResourceRequestHandler {
     }
 
     public CompletableFuture<List<SlotProfile>> request(Map<String, String> 
tags) {
-        List<CompletableFuture<SlotAndWorkerProfile>> allRequestFuture = new 
ArrayList<>();
-        for (int i = 0; i < resourceProfile.size(); i++) {
-            ResourceProfile r = resourceProfile.get(i);
-            Optional<WorkerProfile> workerProfile = preCheckWorkerResource(r);
-            if (workerProfile.isPresent()) {
-                // request slot to member
-                CompletableFuture<SlotAndWorkerProfile> 
internalCompletableFuture =
-                        singleResourceRequestToMember(i, r, 
workerProfile.get());
-                allRequestFuture.add(internalCompletableFuture);
-            }
+        requestSlotWithRetry(resourceProfile, MAX_RETRY_TIMES, tags);
+        return completableFuture;
+    }
+
+    private CompletableFuture<SlotAndWorkerProfile> requestSlotWithRetry(
+            List<ResourceProfile> request, int retryTimes, Map<String, String> 
tags) {
+        if (retryTimes <= 0) {
+            LOGGER.fine("can't apply resource request with retry times: " + 
MAX_RETRY_TIMES);
+            return CompletableFuture.supplyAsync(
+                    () -> {
+                        throw new NoEnoughResourceException(
+                                "can't apply resource request with retry 
times: "
+                                        + MAX_RETRY_TIMES);
+                    });
         }
+        List<CompletableFuture<SlotAndWorkerProfile>> allRequestFuture = 
requestSlots(request);
         // all resource preCheck done, also had sent request to worker
-        getAllOfFuture(allRequestFuture)
+        return getAllOfFuture(allRequestFuture)
                 .whenComplete(
                         withTryCatch(
                                 LOGGER,
                                 (unused, error) -> {
                                     if (error != null) {
                                         completeRequestWithException(error);
-                                    }
-                                    if (resultSlotProfiles.size() < 
resourceProfile.size()) {
-                                        // meaning have some slot not request 
success
-                                        if 
(resourceManager.supportDynamicWorker()) {
-                                            applyByDynamicWorker(tags);
-                                        } else {
-                                            completeRequestWithException(
-                                                    new 
NoEnoughResourceException(
-                                                            "can't apply 
resource request: "
-                                                                    + 
resourceProfile.get(
-                                                                            
findNullIndexInResultSlotProfiles())));
+                                    } else {
+                                        List<ResourceProfile> 
needRequestResource =
+                                                stillNeedRequestResource();
+                                        if (!needRequestResource.isEmpty()) {
+                                            Exception 
requestSlotWithRetryError = null;
+                                            try {
+                                                requestSlotWithRetry(
+                                                                
needRequestResource,
+                                                                retryTimes - 1,
+                                                                tags)
+                                                        .get();
+                                            } catch (Exception e) {
+                                                LOGGER.warning(
+                                                        "request slot with 
retry error: "
+                                                                + 
e.getMessage());
+                                                requestSlotWithRetryError = e;
+                                            }
+                                            if (requestSlotWithRetryError != 
null) {
+                                                // meaning have some slot not 
request success
+                                                if 
(resourceManager.supportDynamicWorker()) {
+                                                    applyByDynamicWorker(tags);
+                                                } else {
+                                                    
completeRequestWithException(
+                                                            
requestSlotWithRetryError);
+                                                }
+                                            }
                                         }
                                     }
                                 }));
-        return completableFuture;
     }
 
-    private int findNullIndexInResultSlotProfiles() {
+    private List<ResourceProfile> stillNeedRequestResource() {
+        List<ResourceProfile> needRequestResource = new ArrayList<>();
         for (int i = 0; i < resourceProfile.size(); i++) {
             if (!resultSlotProfiles.containsKey(i)) {
-                return i;
+                needRequestResource.add(resourceProfile.get(i));
+            }
+        }
+        return needRequestResource;
+    }
+
+    private List<CompletableFuture<SlotAndWorkerProfile>> requestSlots(
+            List<ResourceProfile> requestProfile) {
+        List<CompletableFuture<SlotAndWorkerProfile>> allRequestFuture = new 
ArrayList<>();
+        for (int i = 0; i < requestProfile.size(); i++) {
+            ResourceProfile r = requestProfile.get(i);
+            Optional<WorkerProfile> workerProfile = preCheckWorkerResource(r);
+            if (workerProfile.isPresent()) {
+                // request slot to member
+                CompletableFuture<SlotAndWorkerProfile> 
internalCompletableFuture =
+                        singleResourceRequestToMember(i, r, 
workerProfile.get());
+                allRequestFuture.add(internalCompletableFuture);
+            } else {
+                // if no worker can provide the resource, we should return a 
failed future
+                LOGGER.fine("pre check worker resource failed, can't apply 
resource request: " + r);
+                allRequestFuture.add(
+                        CompletableFuture.supplyAsync(
+                                () -> {
+                                    throw new NoEnoughResourceException(
+                                            "can't apply resource request: " + 
r);
+                                }));
             }
         }
-        return -1;
+        return allRequestFuture;
     }
 
     private void completeRequestWithException(Throwable e) {
@@ -125,6 +172,7 @@ public class ResourceRequestHandler {
     }
 
     private void addSlotToCacheMap(int index, SlotProfile slotProfile) {
+        // null value means the slot request failed, no suitable slot found
         if (null != slotProfile) {
             resultSlotProfiles.put(index, slotProfile);
             if (resultSlotProfiles.size() == resourceProfile.size()) {
@@ -134,6 +182,8 @@ public class ResourceRequestHandler {
                 }
                 completableFuture.complete(value);
             }
+        } else {
+            LOGGER.fine("no suitable slot found for resource: " + 
resourceProfile.get(index));
         }
     }
 
@@ -155,7 +205,8 @@ public class ResourceRequestHandler {
                         }));
     }
 
-    private Optional<WorkerProfile> preCheckWorkerResource(ResourceProfile r) {
+    @VisibleForTesting
+    public Optional<WorkerProfile> preCheckWorkerResource(ResourceProfile r) {
         // Shuffle the order to ensure random selection of workers
         List<WorkerProfile> workerProfiles =
                 Arrays.asList(registerWorker.values().toArray(new 
WorkerProfile[0]));
@@ -176,6 +227,7 @@ public class ResourceRequestHandler {
             // Check if there are still unassigned resources
             workerProfile =
                     workerProfiles.stream()
+                            .filter(WorkerProfile::isDynamicSlot)
                             .filter(worker -> 
worker.getUnassignedResource().enoughThan(r))
                             .findAny();
         }
@@ -217,11 +269,13 @@ public class ResourceRequestHandler {
 
     private void releaseAllResourceInternal() {
         LOGGER.warning("apply resource not success, release all already 
applied resource");
-        resultSlotProfiles.values().stream()
-                .filter(Objects::nonNull)
+        new ArrayList<>(resultSlotProfiles.keySet())
                 .forEach(
-                        profile -> {
-                            resourceManager.releaseResource(jobId, profile);
+                        index -> {
+                            SlotProfile profile = 
resultSlotProfiles.remove(index);
+                            if (profile != null) {
+                                resourceManager.releaseResource(jobId, 
profile);
+                            }
                         });
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/ReleaseSlotOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/ReleaseSlotOperation.java
index e812815746..2d2a0d0416 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/ReleaseSlotOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/ReleaseSlotOperation.java
@@ -20,6 +20,7 @@ package 
org.apache.seatunnel.engine.server.resourcemanager.opeartion;
 import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
 import 
org.apache.seatunnel.engine.server.serializable.ResourceDataSerializerHook;
 import 
org.apache.seatunnel.engine.server.service.slot.WrongTargetSlotException;
 
@@ -36,6 +37,7 @@ public class ReleaseSlotOperation extends Operation 
implements IdentifiedDataSer
 
     private long jobID;
     private SlotProfile slotProfile;
+    private WorkerProfile result;
 
     public ReleaseSlotOperation() {}
 
@@ -56,6 +58,12 @@ public class ReleaseSlotOperation extends Operation 
implements IdentifiedDataSer
                     slotProfile,
                     ExceptionUtils.getMessage(ignore));
         }
+        result = server.getSlotService().getWorkerProfile();
+    }
+
+    @Override
+    public Object getResponse() {
+        return result;
     }
 
     @Override
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
index 291df1f1f8..0d0f8c8054 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
@@ -44,6 +44,8 @@ public class WorkerProfile implements 
IdentifiedDataSerializable {
 
     private ResourceProfile unassignedResource;
 
+    private boolean dynamicSlot;
+
     private SlotProfile[] assignedSlots;
 
     private SlotProfile[] unassignedSlots;
@@ -82,6 +84,7 @@ public class WorkerProfile implements 
IdentifiedDataSerializable {
         for (SlotProfile unassignedSlot : unassignedSlots) {
             out.writeObject(unassignedSlot);
         }
+        out.writeBoolean(dynamicSlot);
     }
 
     @Override
@@ -99,5 +102,6 @@ public class WorkerProfile implements 
IdentifiedDataSerializable {
         for (int i = 0; i < unassignedSlots.length; i++) {
             unassignedSlots[i] = in.readObject();
         }
+        dynamicSlot = in.readBoolean();
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
index 250a6f2eb4..7c0ae38bfd 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
@@ -196,6 +196,11 @@ public class DefaultSlotService implements SlotService {
         }
     }
 
+    /**
+     * Select the best match slot for the profile.
+     *
+     * @return the best match slot, null if no suitable slot found.
+     */
     private SlotProfile selectBestMatchSlot(ResourceProfile profile) {
         if (unassignedSlots.isEmpty() && !config.isDynamicSlot()) {
             return null;
@@ -259,6 +264,7 @@ public class DefaultSlotService implements SlotService {
         workerProfile.setUnassignedSlots(unassignedSlots.values().toArray(new 
SlotProfile[0]));
         workerProfile.setUnassignedResource(unassignedResource.get());
         
workerProfile.setAttributes(nodeEngine.getLocalMember().getAttributes());
+        workerProfile.setDynamicSlot(config.isDynamicSlot());
         return workerProfile;
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotAndWorkerProfile.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotAndWorkerProfile.java
index dc9a46d63c..a9168518e7 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotAndWorkerProfile.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotAndWorkerProfile.java
@@ -31,6 +31,7 @@ public class SlotAndWorkerProfile implements 
IdentifiedDataSerializable {
 
     private WorkerProfile workerProfile;
 
+    // null value means the slot request failed, no suitable slot found
     private SlotProfile slotProfile;
 
     public SlotAndWorkerProfile() {}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java
index 8d45ef2d49..0118c15879 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManager.java
@@ -42,42 +42,28 @@ public class FakeResourceManager extends 
AbstractResourceManager {
     @Override
     public void init() {
         try {
-            Address address1 = new Address("localhost", 5801);
-            WorkerProfile workerProfile1 =
-                    new WorkerProfile(
-                            address1,
-                            new ResourceProfile(),
-                            new ResourceProfile(),
-                            new SlotProfile[] {},
-                            new SlotProfile[] {},
-                            Collections.emptyMap());
-            this.registerWorker.put(address1, workerProfile1);
-
-            Address address2 = new Address("localhost", 5802);
-            WorkerProfile workerProfile2 =
-                    new WorkerProfile(
-                            address2,
-                            new ResourceProfile(),
-                            new ResourceProfile(),
-                            new SlotProfile[] {},
-                            new SlotProfile[] {},
-                            Collections.emptyMap());
-            this.registerWorker.put(address2, workerProfile2);
-            Address address3 = new Address("localhost", 5803);
-            WorkerProfile workerProfile3 =
-                    new WorkerProfile(
-                            address3,
-                            new ResourceProfile(),
-                            new ResourceProfile(),
-                            new SlotProfile[] {},
-                            new SlotProfile[] {},
-                            Collections.emptyMap());
-            this.registerWorker.put(address3, workerProfile3);
+            generateWorker(5801);
+            generateWorker(5802);
+            generateWorker(5803);
         } catch (UnknownHostException e) {
             throw new RuntimeException(e);
         }
     }
 
+    private void generateWorker(int port) throws UnknownHostException {
+        Address address = new Address("localhost", port);
+        WorkerProfile workerProfile =
+                new WorkerProfile(
+                        address,
+                        new ResourceProfile(),
+                        new ResourceProfile(),
+                        true,
+                        new SlotProfile[] {},
+                        new SlotProfile[] {},
+                        Collections.emptyMap());
+        this.registerWorker.put(address, workerProfile);
+    }
+
     @Override
     protected <E> CompletableFuture<E> sendToMember(Operation operation, 
Address address) {
         if (operation instanceof RequestSlotOperation) {
@@ -88,6 +74,7 @@ public class FakeResourceManager extends 
AbstractResourceManager {
                                             address,
                                             new ResourceProfile(),
                                             new ResourceProfile(),
+                                            true,
                                             new SlotProfile[] {},
                                             new SlotProfile[] {},
                                             Collections.emptyMap()),
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManagerForRequestSlotRetryTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManagerForRequestSlotRetryTest.java
new file mode 100644
index 0000000000..1dc427e4f8
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FakeResourceManagerForRequestSlotRetryTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager;
+
+import org.apache.seatunnel.engine.common.config.EngineConfig;
+import 
org.apache.seatunnel.engine.server.resourcemanager.opeartion.RequestSlotOperation;
+import 
org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
+import org.apache.seatunnel.engine.server.service.slot.SlotAndWorkerProfile;
+
+import com.hazelcast.cluster.Address;
+import com.hazelcast.spi.impl.NodeEngine;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Used to test ResourceManager, override init method to register more 
workers. */
+public class FakeResourceManagerForRequestSlotRetryTest extends 
AbstractResourceManager {
+
+    private final int newWorkerCount;
+    private final int noSlotWorkerCount;
+    private final AtomicInteger queryIndex = new AtomicInteger(0);
+
+    private final Set<Address> cannotRequestAddress = new HashSet<>();
+
+    public FakeResourceManagerForRequestSlotRetryTest(
+            NodeEngine nodeEngine, int newWorkerCount, int noSlotWorkerCount) {
+        super(nodeEngine, new EngineConfig());
+        this.newWorkerCount = newWorkerCount;
+        this.noSlotWorkerCount = noSlotWorkerCount;
+        init();
+    }
+
+    @Override
+    public void init() {
+        try {
+            for (int i = 0; i < newWorkerCount; i++) {
+                generateWorker(5801 + i);
+            }
+        } catch (UnknownHostException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void generateWorker(int port) throws UnknownHostException {
+        Address address = new Address("localhost", port);
+        WorkerProfile workerProfile =
+                new WorkerProfile(
+                        address,
+                        new ResourceProfile(),
+                        new ResourceProfile(),
+                        false,
+                        new SlotProfile[] {},
+                        new SlotProfile[] {
+                            new SlotProfile(address, 1, new ResourceProfile(), 
""),
+                            new SlotProfile(address, 2, new ResourceProfile(), 
"")
+                        },
+                        Collections.emptyMap());
+        this.registerWorker.put(address, workerProfile);
+    }
+
+    @Override
+    protected <E> CompletableFuture<E> sendToMember(Operation operation, 
Address address) {
+        if (operation instanceof RequestSlotOperation) {
+            if (cannotRequestAddress.contains(address)) {
+                throw new IllegalStateException("Cannot request slot for " + 
address);
+            }
+            if (queryIndex.getAndIncrement() < noSlotWorkerCount) {
+                cannotRequestAddress.add(address);
+                // query will return empty slot
+                return (CompletableFuture<E>)
+                        CompletableFuture.completedFuture(
+                                new SlotAndWorkerProfile(
+                                        new WorkerProfile(
+                                                address,
+                                                new ResourceProfile(),
+                                                new ResourceProfile(),
+                                                false,
+                                                new SlotProfile[] {
+                                                    new SlotProfile(
+                                                            address, 1, new 
ResourceProfile(), ""),
+                                                    new SlotProfile(
+                                                            address, 2, new 
ResourceProfile(), "")
+                                                },
+                                                // no unassigned slot
+                                                new SlotProfile[] {},
+                                                Collections.emptyMap()),
+                                        null));
+            }
+            return (CompletableFuture<E>)
+                    CompletableFuture.completedFuture(
+                            new SlotAndWorkerProfile(
+                                    new WorkerProfile(
+                                            address,
+                                            new ResourceProfile(),
+                                            new ResourceProfile(),
+                                            false,
+                                            new SlotProfile[] {
+                                                new SlotProfile(
+                                                        address, 1, new 
ResourceProfile(), "")
+                                            },
+                                            new SlotProfile[] {
+                                                new SlotProfile(
+                                                        address, 3, new 
ResourceProfile(), "")
+                                            },
+                                            Collections.emptyMap()),
+                                    new SlotProfile(address, 2, new 
ResourceProfile(), "")));
+        } else {
+            return super.sendToMember(operation, address);
+        }
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java
index 2589e6530c..abd4ccdc09 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java
@@ -22,6 +22,7 @@ import 
org.apache.seatunnel.engine.server.resourcemanager.resource.CPU;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.Memory;
 import 
org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
@@ -29,9 +30,14 @@ import org.junit.jupiter.api.Test;
 
 import com.hazelcast.cluster.Address;
 
+import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
@@ -87,23 +93,110 @@ public class ResourceManagerTest extends 
AbstractSeaTunnelServerTest<ResourceMan
     public void testApplyResourceWithRandomResult()
             throws ExecutionException, InterruptedException {
         FakeResourceManager resourceManager = new 
FakeResourceManager(nodeEngine);
+        boolean hasDifferentWorker = false;
+        for (int i = 0; i < 5; i++) {
+            List<ResourceProfile> resourceProfiles = new ArrayList<>();
+            resourceProfiles.add(new ResourceProfile());
+            resourceProfiles.add(new ResourceProfile());
+            resourceProfiles.add(new ResourceProfile());
+            resourceProfiles.add(new ResourceProfile());
+            resourceProfiles.add(new ResourceProfile());
+            List<SlotProfile> slotProfiles =
+                    resourceManager.applyResources(1L, resourceProfiles, 
null).get();
+            Assertions.assertEquals(slotProfiles.size(), 5);
+            Set<Address> addresses =
+                    
slotProfiles.stream().map(SlotProfile::getWorker).collect(Collectors.toSet());
+            hasDifferentWorker |= addresses.size() > 1;
+        }
+        Assertions.assertTrue(hasDifferentWorker, "should have different 
worker for each slot");
+    }
 
+    @Test
+    public void testApplyResourceWithRetryWhenSameNodeNoSlotSuited()
+            throws ExecutionException, InterruptedException {
+        // test a slot request that succeeds after retrying 1 time
+        FakeResourceManagerForRequestSlotRetryTest resourceManager =
+                new FakeResourceManagerForRequestSlotRetryTest(nodeEngine, 2, 
1);
         List<ResourceProfile> resourceProfiles = new ArrayList<>();
         resourceProfiles.add(new ResourceProfile());
         resourceProfiles.add(new ResourceProfile());
-        resourceProfiles.add(new ResourceProfile());
-        resourceProfiles.add(new ResourceProfile());
-        resourceProfiles.add(new ResourceProfile());
         List<SlotProfile> slotProfiles =
                 resourceManager.applyResources(1L, resourceProfiles, 
null).get();
-        Assertions.assertEquals(slotProfiles.size(), 5);
+        Assertions.assertEquals(slotProfiles.size(), 2);
 
-        boolean hasDifferentWorker = false;
-        for (int i = 0; i < 5; i++) {
-            Set<Address> addresses =
-                    
slotProfiles.stream().map(SlotProfile::getWorker).collect(Collectors.toSet());
-            hasDifferentWorker = addresses.size() > 1;
-        }
-        Assertions.assertTrue(hasDifferentWorker, "should have different 
worker for each slot");
+        // test retrying the slot request 2 times when the worker does not have enough slots
+        resourceManager = new 
FakeResourceManagerForRequestSlotRetryTest(nodeEngine, 2, 2);
+        FakeResourceManagerForRequestSlotRetryTest finalResourceManager = 
resourceManager;
+        List<ResourceProfile> finalResourceProfiles = resourceProfiles;
+        ExecutionException exception =
+                Assertions.assertThrows(
+                        ExecutionException.class,
+                        () ->
+                                finalResourceManager
+                                        .applyResources(1L, 
finalResourceProfiles, null)
+                                        .get());
+        Assertions.assertInstanceOf(NoEnoughResourceException.class, 
exception.getCause());
+
+        // test retrying the slot request 4 times, which exceeds the max retry times
+        resourceProfiles = new ArrayList<>();
+        resourceProfiles.add(new ResourceProfile());
+        resourceManager = new 
FakeResourceManagerForRequestSlotRetryTest(nodeEngine, 5, 4);
+        List<ResourceProfile> finalResourceProfiles2 = resourceProfiles;
+        FakeResourceManagerForRequestSlotRetryTest finalResourceManager2 = 
resourceManager;
+        ExecutionException exception2 =
+                Assertions.assertThrows(
+                        ExecutionException.class,
+                        () ->
+                                finalResourceManager2
+                                        .applyResources(1L, 
finalResourceProfiles2, null)
+                                        .get());
+        Assertions.assertInstanceOf(
+                NoEnoughResourceException.class, 
exception2.getCause().getCause());
+        Assertions.assertEquals(
+                "can't apply resource request with retry times: 3",
+                exception2.getCause().getCause().getMessage());
+    }
+
+    @Test
+    public void testPreCheckWorkerResourceWithDynamicSlot() throws 
UnknownHostException {
+        testPreCheckWorkerResource(true);
+        testPreCheckWorkerResource(false);
+    }
+
+    public void testPreCheckWorkerResource(boolean dynamicSlot) throws 
UnknownHostException {
+        List<ResourceProfile> resourceProfiles = new ArrayList<>();
+        resourceProfiles.add(new ResourceProfile());
+        ConcurrentMap<Address, WorkerProfile> registerWorker = new 
ConcurrentHashMap<>();
+        Address address1 = new Address("localhost", 5801);
+        WorkerProfile workerProfile1 =
+                new WorkerProfile(
+                        address1,
+                        new ResourceProfile(),
+                        new ResourceProfile(),
+                        dynamicSlot,
+                        new SlotProfile[] {},
+                        new SlotProfile[] {},
+                        Collections.emptyMap());
+        registerWorker.put(address1, workerProfile1);
+
+        Address address2 = new Address("localhost", 5802);
+        WorkerProfile workerProfile2 =
+                new WorkerProfile(
+                        address2,
+                        new ResourceProfile(),
+                        new ResourceProfile(),
+                        dynamicSlot,
+                        new SlotProfile[] {},
+                        new SlotProfile[] {},
+                        Collections.emptyMap());
+        registerWorker.put(address2, workerProfile2);
+        Optional<WorkerProfile> result =
+                new ResourceRequestHandler(
+                                jobId,
+                                resourceProfiles,
+                                registerWorker,
+                                (AbstractResourceManager) this.resourceManager)
+                        .preCheckWorkerResource(new ResourceProfile());
+        Assertions.assertEquals(result.isPresent(), dynamicSlot);
     }
 }


Reply via email to