dmvk commented on a change in pull request #18901: URL: https://github.com/apache/flink/pull/18901#discussion_r815896349
########## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java ########## @@ -725,4 +856,176 @@ public void testRemoveAllHandles() throws Exception { } }; } + + @Test + public void testReleaseAndTryRemoveIsIdempotent() throws Exception { + new Context() { + { + runTest( + () -> { + leaderCallbackGrantLeadership(); + + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); + + final RuntimeException discardException = + new RuntimeException("Test exception."); + store.addAndLock( + key, + new TestingLongStateHandleHelper.LongStateHandle(2L) { + + final AtomicBoolean thrown = new AtomicBoolean(false); + + @Override + public void discardState() { + if (!thrown.getAndSet(true)) { + throw discardException; + } + super.discardState(); + } + }); + + assertThat(store.getAllAndLock().size(), is(1)); + assertThat( + store.getAndLock(key), + is(notNullValue(RetrievableStateHandle.class))); + + // First remove attempt should fail when we're discarding the underlying + // state. + final Exception exception = + assertThrows( + Exception.class, () -> store.releaseAndTryRemove(key)); + assertThat(exception, FlinkMatchers.containsCause(discardException)); + + // Now we should see that the node is "soft-deleted". This means it can + // no longer be accessed by the get methods, but the underlying state + // still exists. + assertThat(store.getAllAndLock().size(), is(0)); + assertThrows(Exception.class, () -> store.getAndLock(key)); + assertThat(TestingLongStateHandleHelper.getGlobalDiscardCount(), is(0)); + assertThat(getLeaderConfigMap().getData().containsKey(key), is(true)); + + // Second retry should succeed and remove the underlying state and the + // reference in config map. 
+ assertThat(store.releaseAndTryRemove(key), is(true)); + assertThat(store.getAllAndLock().size(), is(0)); + assertThrows(Exception.class, () -> store.getAndLock(key)); + assertThat(TestingLongStateHandleHelper.getGlobalDiscardCount(), is(1)); + assertThat(getLeaderConfigMap().getData().containsKey(key), is(false)); + }); + } + }; + } + + @Test + public void testReleaseAndTryRemoveAllIsIdempotent() throws Exception { + new Context() { + { + runTest( + () -> { + leaderCallbackGrantLeadership(); + + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); + final int numKeys = 10; + + final RuntimeException discardException = + new RuntimeException("Test exception."); + for (int idx = 0; idx < numKeys; idx++) { Review comment: I've made every odd state fail. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org