dmvk commented on a change in pull request #18901:
URL: https://github.com/apache/flink/pull/18901#discussion_r815896349



##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
##########
@@ -725,4 +856,176 @@ public void testRemoveAllHandles() throws Exception {
             }
         };
     }
+
+    @Test
+    public void testReleaseAndTryRemoveIsIdempotent() throws Exception {
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            leaderCallbackGrantLeadership();
+
+                            final KubernetesStateHandleStore<
+                                            
TestingLongStateHandleHelper.LongStateHandle>
+                                    store =
+                                            new KubernetesStateHandleStore<>(
+                                                    flinkKubeClient,
+                                                    LEADER_CONFIGMAP_NAME,
+                                                    longStateStorage,
+                                                    filter,
+                                                    LOCK_IDENTITY);
+
+                            final RuntimeException discardException =
+                                    new RuntimeException("Test exception.");
+                            store.addAndLock(
+                                    key,
+                                    new 
TestingLongStateHandleHelper.LongStateHandle(2L) {
+
+                                        final AtomicBoolean thrown = new 
AtomicBoolean(false);
+
+                                        @Override
+                                        public void discardState() {
+                                            if (!thrown.getAndSet(true)) {
+                                                throw discardException;
+                                            }
+                                            super.discardState();
+                                        }
+                                    });
+
+                            assertThat(store.getAllAndLock().size(), is(1));
+                            assertThat(
+                                    store.getAndLock(key),
+                                    
is(notNullValue(RetrievableStateHandle.class)));
+
+                            // First remove attempt should fail when we're 
discarding the underlying
+                            // state.
+                            final Exception exception =
+                                    assertThrows(
+                                            Exception.class, () -> 
store.releaseAndTryRemove(key));
+                            assertThat(exception, 
FlinkMatchers.containsCause(discardException));
+
+                            // Now we should see that the node is 
"soft-deleted". This means it can
+                            // no longer be accessed by the get methods, but 
the underlying state
+                            // still exists.
+                            assertThat(store.getAllAndLock().size(), is(0));
+                            assertThrows(Exception.class, () -> 
store.getAndLock(key));
+                            
assertThat(TestingLongStateHandleHelper.getGlobalDiscardCount(), is(0));
+                            
assertThat(getLeaderConfigMap().getData().containsKey(key), is(true));
+
+                            // Second retry should succeed and remove the 
underlying state and the
+                            // reference in config map.
+                            assertThat(store.releaseAndTryRemove(key), 
is(true));
+                            assertThat(store.getAllAndLock().size(), is(0));
+                            assertThrows(Exception.class, () -> 
store.getAndLock(key));
+                            
assertThat(TestingLongStateHandleHelper.getGlobalDiscardCount(), is(1));
+                            
assertThat(getLeaderConfigMap().getData().containsKey(key), is(false));
+                        });
+            }
+        };
+    }
+
+    @Test
+    public void testReleaseAndTryRemoveAllIsIdempotent() throws Exception {
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            leaderCallbackGrantLeadership();
+
+                            final KubernetesStateHandleStore<
+                                            
TestingLongStateHandleHelper.LongStateHandle>
+                                    store =
+                                            new KubernetesStateHandleStore<>(
+                                                    flinkKubeClient,
+                                                    LEADER_CONFIGMAP_NAME,
+                                                    longStateStorage,
+                                                    filter,
+                                                    LOCK_IDENTITY);
+                            final int numKeys = 10;
+
+                            final RuntimeException discardException =
+                                    new RuntimeException("Test exception.");
+                            for (int idx = 0; idx < numKeys; idx++) {

Review comment:
       I've made every odd state fail.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to