dmvk commented on a change in pull request #18901: URL: https://github.com/apache/flink/pull/18901#discussion_r817459723
########## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java ########## @@ -272,6 +327,101 @@ public void testReplaceWithKeyNotExist() throws Exception { }; } + @Test + public void testReplaceWithDeletingKey() throws Exception { + new Context() { + { + runTest( + () -> { + leaderCallbackGrantLeadership(); + + final TestingLongStateHandleHelper.LongStateHandle oldState = + addDeletingEntry(getLeaderConfigMap(), key, 1337); + + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); + final TestingLongStateHandleHelper.LongStateHandle newState = + new TestingLongStateHandleHelper.LongStateHandle(23456L); + + assertThat(store.exists(key), is(StringResourceVersion.notExisting())); + assertThrows( + StateHandleStore.NotExistException.class, + () -> + store.replace( + key, + StringResourceVersion.notExisting(), + newState)); + + // Both handles should have been stored and discarded. + assertThat(TestingLongStateHandleHelper.getGlobalStorageSize(), is(2)); + assertThat(oldState.isDiscarded(), is(true)); + assertThat(newState.isDiscarded(), is(true)); + }); + } + }; + } + + @Test + public void testReplaceWithDeletingKeyWithFailingDiscard() throws Exception { + new Context() { + { + runTest( + () -> { + leaderCallbackGrantLeadership(); + + final Exception discardException = new Exception("Unable to discard."); + final TestingLongStateHandleHelper.LongStateHandle oldState = + addDeletingEntry( + getLeaderConfigMap(), + key, + new TestingLongStateHandleHelper.LongStateHandle( + 1337, + discardIdx -> { + throw discardException; + })); + + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); + final TestingLongStateHandleHelper.LongStateHandle newState = + new TestingLongStateHandleHelper.LongStateHandle(23456L); + + assertThat(store.exists(key), is(StringResourceVersion.notExisting())); + final StateHandleStore.NotExistException exception = + assertThrows( + StateHandleStore.NotExistException.class, + () -> + store.replace( + key, + StringResourceVersion.notExisting(), + newState)); + assertThat(exception.getSuppressed().length, is(1)); + assertThat( + exception.getSuppressed()[0], + FlinkMatchers.containsCause(discardException)); + + // Only the new handle should have been stored and discarded. Review comment: đź‘Ť -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org