dmvk commented on a change in pull request #18901: URL: https://github.com/apache/flink/pull/18901#discussion_r814721306
########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java ########## @@ -379,39 +456,51 @@ public StringResourceVersion exists(String key) throws Exception { public boolean releaseAndTryRemove(String key) throws Exception { checkNotNull(key, "Key in ConfigMap."); final AtomicReference<RetrievableStateHandle<T>> stateHandleRefer = new AtomicReference<>(); - - return kubeClient - .checkAndUpdateConfigMap( - configMapName, + return updateConfigMap( configMap -> { - if (isValidOperation(configMap)) { - final String content = configMap.getData().remove(key); - if (content != null) { - try { - stateHandleRefer.set(deserializeObject(content)); - } catch (IOException e) { - LOG.warn( - "Could not retrieve the state handle of {} from ConfigMap {}.", - key, - configMapName, - e); + final String content = configMap.getData().get(key); + if (content != null) { + try { + final StateHandleWithDeleteMarker<T> result = + deserializeStateHandle(content); + if (!result.isMarkedForDeletion()) { + configMap + .getData() + .put( + key, + encodeStateHandle( + InstantiationUtil.serializeObject( + result.toDeleting()))); } + stateHandleRefer.set(result.getInner()); + } catch (IOException e) { + LOG.warn( + "Could not retrieve the state handle of {} from ConfigMap {}.", + key, + configMapName, + e); + // TODO log / comment + Objects.requireNonNull(configMap.getData().remove(key)); } - return Optional.of(configMap); } - return Optional.empty(); + return Optional.of(configMap); }) - .whenComplete( - (succeed, ignore) -> { - if (succeed) { - if (stateHandleRefer.get() != null) { - try { - stateHandleRefer.get().discardState(); - } catch (Exception e) { - throw new CompletionException(e); - } + .thenCompose( + updated -> { + // We don't care whether the configmap has been updated or not Review comment: I think that's not enough, we should also provide an explanation why we can ignore this, because it's not simple to reason 
about (I will add this). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org