dmvk commented on a change in pull request #18901: URL: https://github.com/apache/flink/pull/18901#discussion_r815862606
########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java ########## @@ -509,17 +619,88 @@ public String toString() { return this.getClass().getSimpleName() + "{configMapName='" + configMapName + "'}"; } - private RetrievableStateHandle<T> deserializeObject(String content) throws IOException { - checkNotNull(content, "Content should not be null."); + private boolean isValidOperation(KubernetesConfigMap c) { + return lockIdentity == null || KubernetesLeaderElector.hasLeadership(c, lockIdentity); + } - final byte[] data = Base64.getDecoder().decode(content); + @VisibleForTesting + CompletableFuture<Boolean> updateConfigMap( + Function<KubernetesConfigMap, Optional<KubernetesConfigMap>> updateFn) { + return kubeClient.checkAndUpdateConfigMap( + configMapName, + configMap -> { + if (isValidOperation(configMap)) { + return updateFn.apply(configMap); + } + return Optional.empty(); + }); + } - try { - return deserialize(data); - } catch (IOException | ClassNotFoundException e) { - throw new IOException( - "Failed to deserialize state handle from ConfigMap data " + content + '.', e); + /** + * Adds entry into the ConfigMap. If the entry already exists and contains delete marker, the + * try finish the removal before the actual update. + */ + private Optional<KubernetesConfigMap> addEntry( + KubernetesConfigMap configMap, String key, byte[] serializedStateHandle) + throws Exception { + final String content = configMap.getData().get(key); + if (content != null) { + try { + final StateHandleWithDeleteMarker<T> stateHandle = deserializeStateHandle(content); + if (stateHandle.isMarkedForDeletion()) { + // This might be a left-over after the fail-over. As the remove operation is + // idempotent let's try to finish it. 
+                    if (!releaseAndTryRemove(key)) { + throw new IllegalStateException( + "Unable to remove the marked as deleting entry."); + } + } else { + throw getKeyAlreadyExistException(key); + } + } catch (IOException e) { + // Just log the invalid entry, it will be overridden + // by the update code path below. + logInvalidEntry(key, configMapName, e); + } } + configMap.getData().put(key, toBase64(serializedStateHandle)); + return Optional.of(configMap); + } + + /** + * Replace the entry in the ConfigMap. If the entry already exists and contains delete marker, + * we treat is as non-existent and perform the best effort removal. + */ + private Optional<KubernetesConfigMap> replaceEntry( + KubernetesConfigMap configMap, + String key, + byte[] serializedStateHandle, + AtomicReference<RetrievableStateHandle<T>> oldStateHandleRef) + throws NotExistException { + final String content = configMap.getData().get(key); + if (content != null) { + try { + final StateHandleWithDeleteMarker<T> stateHandle = deserializeStateHandle(content); + oldStateHandleRef.set(stateHandle.getInner()); + if (stateHandle.isMarkedForDeletion()) { + final NotExistException exception = getKeyNotExistException(key); + try { + // Try to finish the removal. We don't really care whether this succeeds or + // not, from the "replace" point of view, the entry doesn't exist. + releaseAndTryRemove(key); Review comment: We do, in testReplaceWithDeletingKey, though only for the "happy case" scenario; I'll try to cover the case where the removal fails as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org