dmvk commented on a change in pull request #18901:
URL: https://github.com/apache/flink/pull/18901#discussion_r817458916



##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -509,17 +619,88 @@ public String toString() {
         return this.getClass().getSimpleName() + "{configMapName='" + 
configMapName + "'}";
     }
 
-    private RetrievableStateHandle<T> deserializeObject(String content) throws 
IOException {
-        checkNotNull(content, "Content should not be null.");
+    private boolean isValidOperation(KubernetesConfigMap c) {
+        return lockIdentity == null || 
KubernetesLeaderElector.hasLeadership(c, lockIdentity);
+    }
 
-        final byte[] data = Base64.getDecoder().decode(content);
+    @VisibleForTesting
+    CompletableFuture<Boolean> updateConfigMap(
+            Function<KubernetesConfigMap, Optional<KubernetesConfigMap>> 
updateFn) {
+        return kubeClient.checkAndUpdateConfigMap(
+                configMapName,
+                configMap -> {
+                    if (isValidOperation(configMap)) {
+                        return updateFn.apply(configMap);
+                    }
+                    return Optional.empty();
+                });
+    }
 
-        try {
-            return deserialize(data);
-        } catch (IOException | ClassNotFoundException e) {
-            throw new IOException(
-                    "Failed to deserialize state handle from ConfigMap data " 
+ content + '.', e);
+    /**
+     * Adds entry into the ConfigMap. If the entry already exists and contains 
delete marker, the
+     * try finish the removal before the actual update.
+     */
+    private Optional<KubernetesConfigMap> addEntry(
+            KubernetesConfigMap configMap, String key, byte[] 
serializedStateHandle)
+            throws Exception {
+        final String content = configMap.getData().get(key);
+        if (content != null) {
+            try {
+                final StateHandleWithDeleteMarker<T> stateHandle = 
deserializeStateHandle(content);
+                if (stateHandle.isMarkedForDeletion()) {
+                    // This might be a left-over after the fail-over. As the 
remove operation is
+                    // idempotent let's try to finish it.
+                    if (!releaseAndTryRemove(key)) {
+                        throw new IllegalStateException(
+                                "Unable to remove the marked as deleting 
entry.");
+                    }
+                } else {
+                    throw getKeyAlreadyExistException(key);
+                }
+            } catch (IOException e) {
+                // Just log the invalid entry, it will be overridden
+                // by the update code path below.
+                logInvalidEntry(key, configMapName, e);
+            }
         }
+        configMap.getData().put(key, toBase64(serializedStateHandle));
+        return Optional.of(configMap);
+    }
+
+    /**
+     * Replace the entry in the ConfigMap. If the entry already exists and 
contains delete marker,
+     * we treat is as non-existent and perform the best effort removal.
+     */
+    private Optional<KubernetesConfigMap> replaceEntry(
+            KubernetesConfigMap configMap,
+            String key,
+            byte[] serializedStateHandle,
+            AtomicReference<RetrievableStateHandle<T>> oldStateHandleRef)
+            throws NotExistException {
+        final String content = configMap.getData().get(key);
+        if (content != null) {
+            try {
+                final StateHandleWithDeleteMarker<T> stateHandle = 
deserializeStateHandle(content);
+                oldStateHandleRef.set(stateHandle.getInner());
+                if (stateHandle.isMarkedForDeletion()) {
+                    final NotExistException exception = 
getKeyNotExistException(key);
+                    try {
+                        // Try to finish the removal. We don't really care 
whether this succeeds or
+                        // not, from the "replace" point of view, the entry 
doesn't exist.
+                        releaseAndTryRemove(key);
+                    } catch (Exception e) {
+                        exception.addSuppressed(e);
+                    }
+                    throw exception;
+                }
+            } catch (IOException e) {
+                // Just log the invalid entry, it will be logged by the update 
code path below.

Review comment:
       👍




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to