carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334190093
 
 

 ##########
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##########
 @@ -472,6 +397,74 @@ S putNode(ByteBuffer keyByteBuffer, int keyOffset, int 
keyLen, byte[] value, boo
         * @return the old state. Null will be returned if key does not exist 
or returnOldState is false.
         */
        private S removeNode(ByteBuffer keyByteBuffer, int keyOffset, int 
keyLen, boolean returnOldState) {
+               Tuple4<Long, Long, Boolean, S> result = 
iterateAndProcess(keyByteBuffer, keyOffset, keyLen,
+                       (tuple3, isRemoved) -> {
+                               long prevNode = tuple3.f0;
+                               long currentNode = tuple3.f1;
+                               long nextNode = tuple3.f2;
+                               // if the node has been logically removed, and 
can not be physically
+                               // removed here, just return null
+                               if (isRemoved && highestRequiredSnapshotVersion 
!= 0) {
+                                       return null;
+                               }
+
+                               long oldValuePointer;
+                               boolean oldValueNeedFree;
+
+                               if (highestRequiredSnapshotVersion == 0) {
+                                       // do physically remove only when there 
is no snapshot running
+                                       oldValuePointer = 
doPhysicalRemoveAndGetValue(currentNode, prevNode, nextNode);
+                                       // the node has been logically removed, 
and remove it from the set
+                                       if (isRemoved) {
+                                               
logicallyRemovedNodes.remove(currentNode);
+                                       }
+                                       oldValueNeedFree = true;
+                               } else {
+                                       int version = 
SkipListUtils.helpGetNodeLatestVersion(currentNode, spaceAllocator);
+                                       if (version < 
highestRequiredSnapshotVersion) {
+                                               // the newest-version value may 
be used by snapshots, and update it with copy-on-write
+                                               oldValuePointer = 
updateValueWithCopyOnWrite(currentNode, null);
+                                               oldValueNeedFree = false;
+                                       } else {
+                                               // replace the newest-version 
value.
+                                               oldValuePointer = 
updateValueWithReplace(currentNode, null);
+                                               oldValueNeedFree = true;
+                                       }
+
+                                       helpSetNodeStatus(currentNode, 
NodeStatus.REMOVE);
+                                       logicallyRemovedNodes.add(currentNode);
+                               }
+
+                               S oldState = null;
+                               if (returnOldState) {
+                                       oldState = 
helpGetState(oldValuePointer);
+                               }
+
+                               if (oldValueNeedFree) {
+                                       spaceAllocator.free(oldValuePointer);
+                               }
+
+                               return oldState;
+                       });
+               return result.f2 ? result.f3 : null;
+       }
+
+       /**
+        * Iterates over the skip list and applies the given function.
+        *
+        * @param keyByteBuffer byte buffer storing the key.
+        * @param keyOffset offset of the key.
+        * @param keyLen length of the key.
+        * @param function the function to apply when the skip list contains 
the given key, which accepts two
+        *                 parameters: a tuple3 of [previous_node, 
current_node, next_node] and a boolean indicating
+        *                 whether the node with same key has been logically 
removed, and returns a state.
+        * @return a tuple4 of [previous_node, current_node, key_found, 
state_by_applying_function]
+        */
+       private Tuple4<Long, Long, Boolean, S> iterateAndProcess(
+                       ByteBuffer keyByteBuffer,
+                       int keyOffset,
+                       int keyLen,
+                       BiFunction<Tuple3<Long, Long, Long>, Boolean, S> 
function) {
 
 Review comment:
   Maybe leaving the boolean out is better; please check the updated code and 
let me know how it looks.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to