carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334180018
########## File path: flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java ########## @@ -115,577 +144,783 @@ public void testInitStateMap() { assertFalse(stateMap.getStateIncrementalVisitor(100).hasNext()); stateMap.close(); - assertEquals(0, stateMap.size()); - assertEquals(0, stateMap.totalSize()); - assertTrue(stateMap.isClosed()); } /** - * Test basic operations. + * Test state put operation. */ @Test - public void testBasicOperations() throws Exception { - TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE; - TypeSerializer<Long> namespaceSerializer = LongSerializer.INSTANCE; - TypeSerializer<String> stateSerializer = StringSerializer.INSTANCE; - CopyOnWriteSkipListStateMap<Integer, Long, String> stateMap = new CopyOnWriteSkipListStateMap<>( - keySerializer, namespaceSerializer, stateSerializer, spaceAllocator); + public void testPutState() { + testWithFunction((totalSize, stateMap, referenceStates) -> getDefaultSizes(totalSize)); + } - ThreadLocalRandom random = ThreadLocalRandom.current(); - // map to store expected states, namespace -> key -> state - Map<Long, Map<Integer, String>> referenceStates = new HashMap<>(); - int totalSize = 0; + /** + * Test remove existing state. + */ + @Test + public void testRemoveExistingState() { + testRemoveState(false, false); + } - // put some states - for (long namespace = 0; namespace < 10; namespace++) { - for (int key = 0; key < 100; key++) { - totalSize++; - String state = String.valueOf(key * namespace); - if (random.nextBoolean()) { - stateMap.put(key, namespace, state); - } else { - assertNull(stateMap.putAndGetOld(key, namespace, state)); + /** + * Test remove and get existing state. + */ + @Test + public void testRemoveAndGetExistingState() { + testRemoveState(false, true); + } + + /** + * Test remove absent state. + */ + @Test + public void testRemoveAbsentState() { + testRemoveState(true, true); + } + + /** + * Test remove previously removed state. + */ + @Test + public void testPutPreviouslyRemovedState() { + testWithFunction( + (totalSize, stateMap, referenceStates) -> applyFunctionAfterRemove(stateMap, referenceStates, + (removedCnt, removedStates) -> { + int size = totalSize - removedCnt; + for (Map.Entry<Long, Set<Integer>> entry : removedStates.entrySet()) { + long namespace = entry.getKey(); + for (int key : entry.getValue()) { + size++; + String state = String.valueOf(key * namespace); + assertNull(stateMap.putAndGetOld(key, namespace, state)); + referenceStates.computeIfAbsent(namespace, (none) -> new HashMap<>()).put(key, String.valueOf(state)); + } + } + return getDefaultSizes(size); } - referenceStates.computeIfAbsent(namespace, (none) -> new HashMap<>()).put(key, state); - assertEquals(totalSize, stateMap.size()); - assertEquals(totalSize, stateMap.totalSize()); - } - } + ) + ); Review comment: Let me try to improve... It seems sometimes we suggest to use functions to reduce duplicated codes and the other cases we are against it. Hopefully I could get better known about the standard/convention. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services