xintongsong commented on a change in pull request #14560:
URL: https://github.com/apache/flink/pull/14560#discussion_r552509039



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##########
@@ -463,7 +473,7 @@ public boolean isAllocated(int index, JobID jobId, 
AllocationID allocationId) {
         TaskSlot<T> taskSlot = taskSlots.get(index);
         if (taskSlot != null) {
             return taskSlot.isAllocated(jobId, allocationId);
-        } else if (index < 0) {
+        } else if (index >= numberSlots) {

Review comment:
       If we also insert dynamic slots into `taskSlots`, we won't need this 
`else-if` branch anymore.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##########
@@ -95,6 +96,9 @@
     /** The table state. */
     private volatile State state;
 
+    /** Current index for dynamic slot, should always not less than 
numberSlots */
+    private AtomicInteger dynamicSlotIndex;

Review comment:
       I think `TaskSlotTableImpl` is not designed to be thread-safe and 
should always be accessed from the RPC main thread, so we should not need 
`AtomicInteger` here.
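
A minimal sketch of this suggestion, assuming the counter is only ever touched from a single thread. The class name `DynamicSlotIndexCounter` is hypothetical and not part of Flink; only `numberSlots` and `nextDynamicSlotIndex` are taken from the diff.

```java
/** Hypothetical helper (not Flink code): hands out indexes for dynamic slots. */
final class DynamicSlotIndexCounter {
    // Assumption: accessed only from the RPC main thread, so a plain int
    // is enough and no AtomicInteger is required.
    private int nextIndex;

    DynamicSlotIndexCounter(int numberSlots) {
        // Dynamic indexes start right after the statically numbered slots.
        this.nextIndex = numberSlots;
    }

    /** Returns the next unused dynamic slot index, never less than numberSlots. */
    int nextDynamicSlotIndex() {
        return nextIndex++;
    }
}
```

If the table were ever accessed from more than one thread, this plain counter would no longer be safe, which is the trade-off the comment above is pointing at.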

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##########
@@ -321,6 +325,12 @@ public boolean allocateSlot(
             return false;
         }
 
+        // The negative index indicate that the SlotManger allocate a dynamic 
slot, we transfer the
+        // index to an increasing number not less than the numberSlots.
+        if (index < 0) {
+            index = nextDynamicSlotIndex();
+        }

Review comment:
       It's easy to miss that the method argument is overwritten in the 
middle of the method body.
   
   I would suggest converting `index` into an `effectiveIndex` at the 
beginning of this method (or maybe renaming the argument to `requestedIndex` 
and converting it to `index`), then using the effective index for the rest of 
the method.
   
   That also means all the `index < 0` checks should be replaced with `index >= 
numberSlots`. Maybe introduce a util method `isDynamicIndex`.
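
A rough sketch of the index conversion described above. The class `SlotIndexResolver` and the method `toEffectiveIndex` are hypothetical names used for illustration; `isDynamicIndex`, `requestedIndex` and `numberSlots` come from the comment and the diff.

```java
/** Hypothetical sketch (not Flink code) of resolving a requested slot index once, up front. */
final class SlotIndexResolver {
    private final int numberSlots;
    private int nextDynamicIndex;

    SlotIndexResolver(int numberSlots) {
        this.numberSlots = numberSlots;
        this.nextDynamicIndex = numberSlots;
    }

    /** Dynamic slots are assumed to occupy the indexes at or above numberSlots. */
    boolean isDynamicIndex(int index) {
        return index >= numberSlots;
    }

    /**
     * A negative requested index is taken to mean "allocate a dynamic slot";
     * any other value is used as is.
     */
    int toEffectiveIndex(int requestedIndex) {
        return requestedIndex < 0 ? nextDynamicIndex++ : requestedIndex;
    }
}
```

With this shape, the caller would compute the effective index on the first line of `allocateSlot` and never reassign the parameter afterwards.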

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##########
@@ -288,6 +288,11 @@ public boolean allocateSlot(
 
         TaskSlot<T> taskSlot = allocatedSlots.get(allocationId);
         if (taskSlot != null) {
+            if (index < 0 && taskSlot.isAllocated(jobId, allocationId)) {
+                // If the slot is a dynamic slot with expected jobId and 
allocationId, it should be
+                // treated as duplicate allocate request.
+                return true;
+            }

Review comment:
       These boolean expressions in the `if` and `return` statements have 
become quite hard to understand.
   Maybe we can wrap them into separate methods with meaningful names.
   Something like:
   ```
   if (isAllocationIdExist()) {
     return isDuplicateSlot();
   } else if (isSlotIndexTaken()) {
     return false;
   }
   ```
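
To make the shape above concrete, here is a self-contained sketch with simplified, hypothetical types (`AllocationCheckSketch`, `Slot`, string IDs instead of `JobID` and `AllocationID`). The exact conditions inside each helper are assumptions, and the dynamic-slot case is left out for brevity.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch (not Flink code) of the named-predicate structure. */
final class AllocationCheckSketch {
    static final class Slot {
        final int index;
        final String jobId;

        Slot(int index, String jobId) {
            this.index = index;
            this.jobId = jobId;
        }
    }

    private final Map<String, Slot> allocatedSlots = new HashMap<>(); // allocationId -> slot
    private final Map<Integer, Slot> taskSlots = new HashMap<>();     // index -> slot

    boolean allocateSlot(int index, String jobId, String allocationId) {
        if (isAllocationIdExist(allocationId)) {
            return isDuplicateSlot(index, jobId, allocationId);
        } else if (isSlotIndexTaken(index)) {
            return false;
        }
        Slot slot = new Slot(index, jobId);
        allocatedSlots.put(allocationId, slot);
        taskSlots.put(index, slot);
        return true;
    }

    private boolean isAllocationIdExist(String allocationId) {
        return allocatedSlots.containsKey(allocationId);
    }

    // Assumption: a request is a duplicate if it names the same job and index
    // as the slot already registered under this allocation ID.
    private boolean isDuplicateSlot(int index, String jobId, String allocationId) {
        Slot existing = allocatedSlots.get(allocationId);
        return existing.index == index && existing.jobId.equals(jobId);
    }

    private boolean isSlotIndexTaken(int index) {
        return taskSlots.containsKey(index);
    }
}
```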

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##########
@@ -288,6 +288,11 @@ public boolean allocateSlot(
 
         TaskSlot<T> taskSlot = allocatedSlots.get(allocationId);
         if (taskSlot != null) {
+            if (index < 0 && taskSlot.isAllocated(jobId, allocationId)) {
+                // If the slot is a dynamic slot with expected jobId and 
allocationId, it should be
+                // treated as duplicate allocate request.
+                return true;
+            }

Review comment:
       I think this is a reported issue, FLINK-15660.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##########
@@ -329,7 +339,7 @@ public boolean allocateSlot(
                         jobId,
                         allocationId,
                         memoryVerificationExecutor);
-        if (index >= 0) {
+        if (index < numberSlots) {

Review comment:
       Now that the dynamic slots also have unique indexes, we can insert 
them into `taskSlots` as well.
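
A small sketch of what that could look like, assuming dynamic slots get indexes starting at `numberSlots`. `UnifiedSlotTableSketch` and its methods are hypothetical and use a string allocation ID in place of the real `TaskSlot`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch (not Flink code): static and dynamic slots share one index map. */
final class UnifiedSlotTableSketch {
    private final Map<Integer, String> taskSlots = new HashMap<>(); // index -> allocationId
    private int nextDynamicIndex;

    UnifiedSlotTableSketch(int numberSlots) {
        this.nextDynamicIndex = numberSlots;
    }

    /** Registers the slot under its effective index and returns that index. */
    int allocate(int requestedIndex, String allocationId) {
        int index = requestedIndex < 0 ? nextDynamicIndex++ : requestedIndex;
        taskSlots.put(index, allocationId);
        return index;
    }

    /** One lookup covers both kinds of slot; no special case for dynamic indexes. */
    boolean isAllocated(int index, String allocationId) {
        return allocationId.equals(taskSlots.get(index));
    }
}
```

Because every slot is in the same map, a lookup such as `isAllocated` no longer needs a separate branch for dynamic indexes.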





