xintongsong commented on a change in pull request #13871:
URL: https://github.com/apache/flink/pull/13871#discussion_r518506150
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointStoreUtil.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link CheckpointStoreUtil} implementation for ZooKeeper.
+ *
+ */
+public class ZooKeeperCheckpointStoreUtil implements CheckpointStoreUtil {

Review comment:
   The class could be a singleton.

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointStoreUtil.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.runtime.checkpoint.CheckpointStoreUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.kubernetes.utils.Constants.CHECKPOINT_ID_KEY_PREFIX;
+
+/**
+ * {@link CheckpointStoreUtil} implementation for Kubernetes.
+ *
+ */
+public class KubernetesCheckpointStoreUtil implements CheckpointStoreUtil {

Review comment:
   The class could be a singleton.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingRetrievableStateStorageHelper.java
##########
@@ -41,22 +64,43 @@
         private final T state;
-        private TestingRetrievableStateHandle(T state) {
+        private FunctionWithException<T, T, IOException> retrieveStateFunction;
+
+        private RunnableWithException discardStateRunnable;
+
+        private Function<T, Long> getStateSizeFunction;
+
+        private TestingRetrievableStateHandle(
+                T state,
+                FunctionWithException<T, T, IOException> retrieveStateFunction,
+                RunnableWithException discardStateRunnable,
+                Function<T, Long> getStateSizeFunction) {
             this.state = state;
+            this.retrieveStateFunction = retrieveStateFunction;

Review comment:
   I think we can combine these two arguments. The `state` can be covered by a `retrieveStateFunction` that always returns `state`.
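On the two "could be a singleton" comments: since both utility classes appear to hold no state, one common way to make such a class a singleton in Java is an enum with a single constant. The sketch below is illustrative only. It assumes a simplified `CheckpointStoreUtil` interface with hypothetical `checkpointIdToName`/`nameToCheckpointId` methods and a hypothetical `INVALID_CHECKPOINT_ID` constant, plus an assumed zero-padded `"/%019d"` naming scheme. The real interface and naming in the PR may differ.

```java
// Hypothetical, simplified stand-in for the CheckpointStoreUtil interface in the PR;
// the method names and the invalid-id constant are assumptions for illustration.
interface CheckpointStoreUtil {
    long INVALID_CHECKPOINT_ID = -1L;

    String checkpointIdToName(long checkpointId);

    long nameToCheckpointId(String name);
}

// Enum-based singleton: the JVM creates exactly one INSTANCE, and since the
// enum holds no mutable state it can be shared freely across threads.
enum ZooKeeperCheckpointStoreUtil implements CheckpointStoreUtil {
    INSTANCE;

    // Assumed naming scheme: '/' followed by the id zero-padded to 19 digits, so that
    // for non-negative ids the lexicographic order of names matches numeric order.
    @Override
    public String checkpointIdToName(long checkpointId) {
        return String.format("/%019d", checkpointId);
    }

    @Override
    public long nameToCheckpointId(String name) {
        // Reject anything that does not follow the assumed "/<digits>" shape.
        if (name == null || !name.startsWith("/")) {
            return INVALID_CHECKPOINT_ID;
        }
        try {
            return Long.parseLong(name.substring(1));
        } catch (NumberFormatException e) {
            return INVALID_CHECKPOINT_ID;
        }
    }
}
```

Callers would then use `ZooKeeperCheckpointStoreUtil.INSTANCE` instead of `new ZooKeeperCheckpointStoreUtil()`. A private constructor plus a `static final INSTANCE` field would work as well; the enum form additionally prevents serialization or reflection from creating a second instance.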
##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingRetrievableStateStorageHelper.java
##########
@@ -41,22 +64,43 @@
         private final T state;
-        private TestingRetrievableStateHandle(T state) {
+        private FunctionWithException<T, T, IOException> retrieveStateFunction;
+
+        private RunnableWithException discardStateRunnable;
+
+        private Function<T, Long> getStateSizeFunction;

Review comment:
   I would vote for not allowing null values. We can provide no-op functions as defaults and do null checks in the setters in `TestingRetrievableStateStorageHelper`.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingRetrievableStateStorageHelper.java
##########
@@ -41,22 +64,43 @@
         private final T state;
-        private TestingRetrievableStateHandle(T state) {
+        private FunctionWithException<T, T, IOException> retrieveStateFunction;
+
+        private RunnableWithException discardStateRunnable;
+
+        private Function<T, Long> getStateSizeFunction;

Review comment:
   And +1 for making them `final` here.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java
##########
@@ -118,33 +112,22 @@ public boolean requiresExternalizedCheckpoints() {
     }
     /**
-     * Gets the latest checkpoint from ZooKeeper and removes all others.
-     *
-     * <p><strong>Important</strong>: Even if there are more than one checkpoint in ZooKeeper,
-     * this will only recover the latest and discard the others. Otherwise, there is no guarantee
-     * that the history of checkpoints is consistent.
+     * Recover all the valid checkpoints from state handle store. All the successfully recovered checkpoints will
+     * be added to {@link #completedCheckpoints} sorted by checkpoint id.
      */
     @Override
     public void recover() throws Exception {
-        LOG.info("Recovering checkpoints from ZooKeeper.");
+        LOG.info("Recovering checkpoints from {}.", checkpointStateHandleStore);
         // Get all there is first
-        List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> initialCheckpoints;
-        while (true) {
-            try {
-                initialCheckpoints = checkpointsInZooKeeper.getAll();
-                break;
-            }
-            catch (ConcurrentModificationException e) {
-                LOG.warn("Concurrent modification while reading from ZooKeeper. Retrying.");
-            }
-        }
+        final List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> initialCheckpoints =
+                checkpointStateHandleStore.getAll();

Review comment:
   I'm trying to understand why the retry loop was needed before but is not needed now.
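Taken together, the three `TestingRetrievableStateHandle` comments (fold `state` into the retrieve function, use non-null no-op defaults with null-checked setters, make the fields `final`) could look roughly like the following. This is a hypothetical, simplified sketch, not the PR's code. `FunctionWithException`, `SupplierWithException` and `RunnableWithException` are minimal stand-ins for Flink's `org.apache.flink.util.function` types. The handle here does not implement Flink's `RetrievableStateHandle`, and the default state size of `0L` is an assumption.

```java
import java.io.IOException;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Supplier;

// Minimal stand-ins for Flink's org.apache.flink.util.function types (assumed shapes).
@FunctionalInterface
interface FunctionWithException<T, R, E extends Throwable> {
    R apply(T value) throws E;
}

@FunctionalInterface
interface SupplierWithException<R, E extends Throwable> {
    R get() throws E;
}

@FunctionalInterface
interface RunnableWithException {
    void run() throws Exception;
}

final class TestingRetrievableStateStorageHelper<T> {

    // No-op defaults, so these fields are never null.
    private FunctionWithException<T, T, IOException> retrieveStateFunction = state -> state;
    private RunnableWithException discardStateRunnable = () -> {};
    private Function<T, Long> getStateSizeFunction = state -> 0L;

    // Setters reject null up front instead of failing later inside a handle.
    void setRetrieveStateFunction(FunctionWithException<T, T, IOException> function) {
        this.retrieveStateFunction = Objects.requireNonNull(function);
    }

    void setDiscardStateRunnable(RunnableWithException runnable) {
        this.discardStateRunnable = Objects.requireNonNull(runnable);
    }

    void setGetStateSizeFunction(Function<T, Long> function) {
        this.getStateSizeFunction = Objects.requireNonNull(function);
    }

    TestingRetrievableStateHandle<T> store(T state) {
        // Snapshot the current functions; `state` is captured by the suppliers
        // instead of being passed to the handle as a separate argument.
        final FunctionWithException<T, T, IOException> retrieve = retrieveStateFunction;
        final Function<T, Long> stateSize = getStateSizeFunction;
        return new TestingRetrievableStateHandle<T>(
                () -> retrieve.apply(state), discardStateRunnable, () -> stateSize.apply(state));
    }

    static final class TestingRetrievableStateHandle<T> {
        // All fields are final and non-null.
        private final SupplierWithException<T, IOException> retrieveStateSupplier;
        private final RunnableWithException discardStateRunnable;
        private final Supplier<Long> stateSizeSupplier;

        private TestingRetrievableStateHandle(
                SupplierWithException<T, IOException> retrieveStateSupplier,
                RunnableWithException discardStateRunnable,
                Supplier<Long> stateSizeSupplier) {
            this.retrieveStateSupplier = Objects.requireNonNull(retrieveStateSupplier);
            this.discardStateRunnable = Objects.requireNonNull(discardStateRunnable);
            this.stateSizeSupplier = Objects.requireNonNull(stateSizeSupplier);
        }

        T retrieveState() throws IOException {
            return retrieveStateSupplier.get();
        }

        void discardState() throws Exception {
            discardStateRunnable.run();
        }

        long getStateSize() {
            return stateSizeSupplier.get();
        }
    }
}
```

With this shape a test only overrides the behavior it cares about, for example `helper.setGetStateSizeFunction(s -> (long) s.length())`, and every handle created afterwards by `store(...)` picks that up, while handles created earlier keep the functions that were current when they were stored.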