dmvk commented on a change in pull request #18901:
URL: https://github.com/apache/flink/pull/18901#discussion_r814721306



##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -379,39 +456,51 @@ public StringResourceVersion exists(String key) throws 
Exception {
     public boolean releaseAndTryRemove(String key) throws Exception {
         checkNotNull(key, "Key in ConfigMap.");
         final AtomicReference<RetrievableStateHandle<T>> stateHandleRefer = 
new AtomicReference<>();
-
-        return kubeClient
-                .checkAndUpdateConfigMap(
-                        configMapName,
+        return updateConfigMap(
                         configMap -> {
-                            if (isValidOperation(configMap)) {
-                                final String content = 
configMap.getData().remove(key);
-                                if (content != null) {
-                                    try {
-                                        
stateHandleRefer.set(deserializeObject(content));
-                                    } catch (IOException e) {
-                                        LOG.warn(
-                                                "Could not retrieve the state 
handle of {} from ConfigMap {}.",
-                                                key,
-                                                configMapName,
-                                                e);
+                            final String content = 
configMap.getData().get(key);
+                            if (content != null) {
+                                try {
+                                    final StateHandleWithDeleteMarker<T> 
result =
+                                            deserializeStateHandle(content);
+                                    if (!result.isMarkedForDeletion()) {
+                                        configMap
+                                                .getData()
+                                                .put(
+                                                        key,
+                                                        encodeStateHandle(
+                                                                
InstantiationUtil.serializeObject(
+                                                                        
result.toDeleting())));
                                     }
+                                    stateHandleRefer.set(result.getInner());
+                                } catch (IOException e) {
+                                    LOG.warn(
+                                            "Could not retrieve the state 
handle of {} from ConfigMap {}.",
+                                            key,
+                                            configMapName,
+                                            e);
+                                    // TODO log / comment
+                                    
Objects.requireNonNull(configMap.getData().remove(key));
                                 }
-                                return Optional.of(configMap);
                             }
-                            return Optional.empty();
+                            return Optional.of(configMap);
                         })
-                .whenComplete(
-                        (succeed, ignore) -> {
-                            if (succeed) {
-                                if (stateHandleRefer.get() != null) {
-                                    try {
-                                        stateHandleRefer.get().discardState();
-                                    } catch (Exception e) {
-                                        throw new CompletionException(e);
-                                    }
+                .thenCompose(
+                        updated -> {
+                            // We don't care whether the configmap has been 
updated or not

Review comment:
       I think that's not enough, we should also provide an explanation why we 
can ignore this, because it's not simple to reason about (will add this)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to