XComp commented on a change in pull request #18901: URL: https://github.com/apache/flink/pull/18901#discussion_r813145701
########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java ########## @@ -132,31 +197,27 @@ public KubernetesStateHandleStore( final RetrievableStateHandle<T> storeHandle = storage.store(state); - final byte[] serializedStoreHandle = serializeOrDiscard(storeHandle); + final byte[] serializedStoreHandle = + serializeOrDiscard(new StateHandleWithDeleteMarker<>(storeHandle)); // initialize flag to serve the failure case boolean discardState = true; try { // a successful operation will result in the state not being discarded discardState = - !kubeClient - .checkAndUpdateConfigMap( - configMapName, - c -> { - if (isValidOperation(c)) { - if (!c.getData().containsKey(key)) { - c.getData() - .put( - key, - encodeStateHandle( - serializedStoreHandle)); - return Optional.of(c); - } else { - throw new CompletionException( - getKeyAlreadyExistException(key)); - } + !updateConfigMap( + configMap -> { + if (!configMap.getData().containsKey(key)) { + configMap + .getData() + .put( + key, + encodeStateHandle( + serializedStoreHandle)); + return Optional.of(configMap); } - return Optional.empty(); + throw new CompletionException( Review comment: Shouldn't we overwrite the old value when calling `addAndLock` on a marked-for-deletion entry instead of throwing the `AlreadyExistException`? 
########## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java ########## @@ -36,19 +38,34 @@ import java.util.Arrays; import java.util.Comparator; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Predicate; import java.util.stream.Collectors; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.fail; /** Tests for {@link KubernetesStateHandleStore} operations. */ public class KubernetesStateHandleStoreTest extends KubernetesHighAvailabilityTestBase { private static final String PREFIX = "test-prefix-"; + + private static class FailingStateHandle implements StateObject { + + @Override + public void discardState() throws Exception {} + + @Override + public long getStateSize() { + return 0; + } + } + Review comment: unused ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java ########## @@ -379,39 +456,51 @@ public StringResourceVersion exists(String key) throws Exception { public boolean releaseAndTryRemove(String key) throws Exception { checkNotNull(key, "Key in ConfigMap."); final AtomicReference<RetrievableStateHandle<T>> stateHandleRefer = new AtomicReference<>(); - - return kubeClient - .checkAndUpdateConfigMap( - configMapName, + return updateConfigMap( configMap -> { - if (isValidOperation(configMap)) { - final String content = configMap.getData().remove(key); - if (content != null) { - try { - stateHandleRefer.set(deserializeObject(content)); - } catch (IOException e) { - LOG.warn( - "Could not retrieve the state handle of {} from ConfigMap {}.", - key, - configMapName, - e); + final String content = 
configMap.getData().get(key); + if (content != null) { + try { + final StateHandleWithDeleteMarker<T> result = + deserializeStateHandle(content); + if (!result.isMarkedForDeletion()) { + configMap + .getData() + .put( + key, + encodeStateHandle( + InstantiationUtil.serializeObject( + result.toDeleting()))); } + stateHandleRefer.set(result.getInner()); + } catch (IOException e) { + LOG.warn( + "Could not retrieve the state handle of {} from ConfigMap {}.", + key, + configMapName, + e); + // TODO log / comment + Objects.requireNonNull(configMap.getData().remove(key)); } - return Optional.of(configMap); } - return Optional.empty(); + return Optional.of(configMap); }) - .whenComplete( - (succeed, ignore) -> { - if (succeed) { - if (stateHandleRefer.get() != null) { - try { - stateHandleRefer.get().discardState(); - } catch (Exception e) { - throw new CompletionException(e); - } + .thenCompose( + updated -> { + // We don't care whether the configmap has been updated or not Review comment: nit: this comment can be avoided by just renaming the `updated` parameter of this callback into something like `ignoredUpdatedResult` ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java ########## @@ -423,38 +512,45 @@ public boolean releaseAndTryRemove(String key) throws Exception { */ @Override public void releaseAndTryRemoveAll() throws Exception { + final List<String> validKeys = new ArrayList<>(); final List<RetrievableStateHandle<T>> validStateHandles = new ArrayList<>(); - kubeClient - .checkAndUpdateConfigMap( - configMapName, - c -> { - if (isValidOperation(c)) { - final Map<String, String> updateData = new HashMap<>(c.getData()); - c.getData().entrySet().stream() - .filter(entry -> configMapKeyFilter.test(entry.getKey())) - .forEach( - entry -> { - try { - validStateHandles.add( - deserializeObject( - entry.getValue())); - updateData.remove(entry.getKey()); - } catch (IOException e) { - LOG.warn( - 
"ConfigMap {} contained corrupted data. Ignoring the key {}.", - configMapName, - entry.getKey()); - } - }); - c.getData().clear(); - c.getData().putAll(updateData); - return Optional.of(c); - } - return Optional.empty(); + updateConfigMap( + configMap -> { + final Map<String, String> updateData = + new HashMap<>(configMap.getData()); + configMap.getData().entrySet().stream() + .filter(entry -> configMapKeyFilter.test(entry.getKey())) + .forEach( + entry -> { + try { + final StateHandleWithDeleteMarker<T> result = + deserializeStateHandle( + entry.getValue()); + validKeys.add(entry.getKey()); + validStateHandles.add(result.getInner()); + updateData.put( + entry.getKey(), + encodeStateHandle( + InstantiationUtil + .serializeObject( + result + .toDeleting()))); + } catch (IOException e) { + LOG.warn( + "ConfigMap {} contained corrupted data. Ignoring the key {}.", Review comment: I guess, we could clean up the entry here as well, couldn't we? ...to be consistent with the `releaseAndTryRemove` implementation? ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java ########## @@ -69,13 +75,72 @@ * the leader could update the store. Then we will completely get rid of the lock-and-release in * Zookeeper implementation. * - * @param <T> Type of state + * @param <T> Type of the state we're storing. */ public class KubernetesStateHandleStore<T extends Serializable> implements StateHandleStore<T, StringResourceVersion> { private static final Logger LOG = LoggerFactory.getLogger(KubernetesStateHandleStore.class); + private static <T extends Serializable> StateHandleWithDeleteMarker<T> deserializeStateHandle( Review comment: Reading this I was wondering whether we could just add a prefix to the Base64 content. This would enable us to filter rightaway without serializing/deserializing the data. Or is this "too hacky"? 
🤔 ########## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java ########## @@ -725,4 +742,157 @@ public void testRemoveAllHandles() throws Exception { } }; } + + @Test + public void testReleaseAndTryRemoveIsIdempotent() throws Exception { + new Context() { + { + runTest( + () -> { + leaderCallbackGrantLeadership(); + + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); + + final RuntimeException discardException = + new RuntimeException("Test exception."); + store.addAndLock( + key, + new TestingLongStateHandleHelper.LongStateHandle(2L) { + + final AtomicBoolean thrown = new AtomicBoolean(false); + + @Override + public void discardState() { + if (!thrown.getAndSet(true)) { + throw discardException; + } + super.discardState(); + } + }); + + assertThat(store.getAllAndLock().size(), is(1)); + assertThat( + store.getAndLock(key), + is(notNullValue(RetrievableStateHandle.class))); + + // First remove attempt should fail when we're discarding the underlying + // state. + final Exception exception = + assertThrows( + Exception.class, + () -> { + store.releaseAndTryRemove(key); + }); + assertThat(exception, FlinkMatchers.containsCause(discardException)); + + // Now we should see that the node is "soft-deleted". This means it can + // no longer be accessed by the get methods, but the underlying state + // still exists. + assertThat(store.getAllAndLock().size(), is(0)); + assertThrows(Exception.class, () -> store.getAndLock(key)); + assertThat(TestingLongStateHandleHelper.getGlobalDiscardCount(), is(0)); + assertThat(getLeaderConfigMap().getData().containsKey(key), is(true)); + + // Second retry should succeed and remove the underlying state and the + // reference in config map. 
+ assertThat(store.releaseAndTryRemove(key), is(true)); + assertThat(store.getAllAndLock().size(), is(0)); + assertThrows(Exception.class, () -> store.getAndLock(key)); + assertThat(TestingLongStateHandleHelper.getGlobalDiscardCount(), is(1)); + assertThat(getLeaderConfigMap().getData().containsKey(key), is(false)); + }); + } + }; + } + + @Test + public void testReleaseAndTryRemoveAllIsIdempotent() throws Exception { + new Context() { + { + runTest( + () -> { + leaderCallbackGrantLeadership(); + + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); + final int numKeys = 10; + + final RuntimeException discardException = + new RuntimeException("Test exception."); + for (int idx = 0; idx < numKeys; idx++) { + store.addAndLock( + key + "_" + idx, + new TestingLongStateHandleHelper.LongStateHandle(idx + 1) { Review comment: FYI: As already mentioned, I extended `LongStateHandle` to cover this functionality in PR #18869. ########## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java ########## @@ -725,4 +742,157 @@ public void testRemoveAllHandles() throws Exception { } }; } + + @Test + public void testReleaseAndTryRemoveIsIdempotent() throws Exception { Review comment: Shouldn't we at least test `addAndLock` on a marked-for-deletion entry?
🤔 ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java ########## @@ -379,39 +456,51 @@ public StringResourceVersion exists(String key) throws Exception { public boolean releaseAndTryRemove(String key) throws Exception { checkNotNull(key, "Key in ConfigMap."); final AtomicReference<RetrievableStateHandle<T>> stateHandleRefer = new AtomicReference<>(); - - return kubeClient - .checkAndUpdateConfigMap( - configMapName, + return updateConfigMap( configMap -> { - if (isValidOperation(configMap)) { - final String content = configMap.getData().remove(key); - if (content != null) { - try { - stateHandleRefer.set(deserializeObject(content)); - } catch (IOException e) { - LOG.warn( - "Could not retrieve the state handle of {} from ConfigMap {}.", - key, - configMapName, - e); + final String content = configMap.getData().get(key); + if (content != null) { + try { + final StateHandleWithDeleteMarker<T> result = + deserializeStateHandle(content); + if (!result.isMarkedForDeletion()) { + configMap + .getData() + .put( + key, + encodeStateHandle( + InstantiationUtil.serializeObject( + result.toDeleting()))); } + stateHandleRefer.set(result.getInner()); + } catch (IOException e) { + LOG.warn( + "Could not retrieve the state handle of {} from ConfigMap {}.", Review comment: Shall we add a sentence to the comment that we're going to remove the entry? 
########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java ########## @@ -208,30 +265,29 @@ public void replace(String key, StringResourceVersion resourceVersion, T state) final RetrievableStateHandle<T> newStateHandle = storage.store(state); - final byte[] serializedStateHandle = serializeOrDiscard(newStateHandle); + final byte[] serializedStateHandle = + serializeOrDiscard(new StateHandleWithDeleteMarker<>(newStateHandle)); // initialize flags to serve the failure case boolean discardOldState = false; boolean discardNewState = true; try { boolean success = - kubeClient - .checkAndUpdateConfigMap( - configMapName, - c -> { - if (isValidOperation(c)) { + updateConfigMap( + configMap -> { + if (isValidOperation(configMap)) { Review comment: ```suggestion ``` The `isValidOperation` check is already performed in `updateConfigMap`. ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java ########## @@ -208,30 +265,29 @@ public void replace(String key, StringResourceVersion resourceVersion, T state) final RetrievableStateHandle<T> newStateHandle = storage.store(state); - final byte[] serializedStateHandle = serializeOrDiscard(newStateHandle); + final byte[] serializedStateHandle = + serializeOrDiscard(new StateHandleWithDeleteMarker<>(newStateHandle)); // initialize flags to serve the failure case boolean discardOldState = false; boolean discardNewState = true; try { boolean success = - kubeClient - .checkAndUpdateConfigMap( - configMapName, - c -> { - if (isValidOperation(c)) { + updateConfigMap( + configMap -> { + if (isValidOperation(configMap)) { // Check the existence - if (c.getData().containsKey(key)) { - c.getData() + if (configMap.getData().containsKey(key)) { Review comment: Just thinking out loud here: this will overwrite a marked-for-deletion entry. But that's what we want here, I guess.
########## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java ########## @@ -725,4 +742,157 @@ public void testRemoveAllHandles() throws Exception { } }; } + + @Test + public void testReleaseAndTryRemoveIsIdempotent() throws Exception { + new Context() { + { + runTest( + () -> { + leaderCallbackGrantLeadership(); + + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); + + final RuntimeException discardException = + new RuntimeException("Test exception."); + store.addAndLock( + key, + new TestingLongStateHandleHelper.LongStateHandle(2L) { + + final AtomicBoolean thrown = new AtomicBoolean(false); + + @Override + public void discardState() { + if (!thrown.getAndSet(true)) { + throw discardException; + } + super.discardState(); + } + }); + + assertThat(store.getAllAndLock().size(), is(1)); + assertThat( + store.getAndLock(key), + is(notNullValue(RetrievableStateHandle.class))); + + // First remove attempt should fail when we're discarding the underlying + // state. + final Exception exception = + assertThrows( + Exception.class, + () -> { + store.releaseAndTryRemove(key); + }); Review comment: ```suggestion final Exception exception = assertThrows( Exception.class, () -> store.releaseAndTryRemove(key)); ``` nit: can be shortened ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java ########## @@ -303,7 +359,12 @@ public StringResourceVersion exists(String key) throws Exception { if (optional.isPresent()) { Review comment: I couldn't annotate the location directly in the Github UI. 
But we should also return `notExisting()` when the key exists but is already marked for deletion. ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java ########## @@ -379,39 +456,51 @@ public StringResourceVersion exists(String key) throws Exception { public boolean releaseAndTryRemove(String key) throws Exception { checkNotNull(key, "Key in ConfigMap."); final AtomicReference<RetrievableStateHandle<T>> stateHandleRefer = new AtomicReference<>(); - - return kubeClient - .checkAndUpdateConfigMap( - configMapName, + return updateConfigMap( configMap -> { - if (isValidOperation(configMap)) { - final String content = configMap.getData().remove(key); - if (content != null) { - try { - stateHandleRefer.set(deserializeObject(content)); - } catch (IOException e) { - LOG.warn( - "Could not retrieve the state handle of {} from ConfigMap {}.", - key, - configMapName, - e); + final String content = configMap.getData().get(key); + if (content != null) { Review comment: Inverting the condition (i.e. returning early when `content` is null) would remove one level of indentation and might improve readability. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org