[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16390205#comment-16390205 ]
ASF GitHub Bot commented on FLINK-8487:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5656#discussion_r172980021

    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java ---
    @@ -0,0 +1,333 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.checkpointing;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.functions.RichMapFunction;
    +import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.time.Deadline;
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.api.common.typeutils.base.StringSerializer;
    +import org.apache.flink.client.program.ClusterClient;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.HighAvailabilityOptions;
    +import org.apache.flink.configuration.TaskManagerOptions;
    +import org.apache.flink.core.testutils.OneShotLatch;
    +import org.apache.flink.runtime.concurrent.FutureUtils;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.state.FunctionInitializationContext;
    +import org.apache.flink.runtime.state.FunctionSnapshotContext;
    +import org.apache.flink.runtime.state.StateBackend;
    +import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    +import org.apache.flink.runtime.testingUtils.TestingUtils;
    +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    +import org.apache.flink.streaming.api.datastream.DataStreamSource;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.test.util.MiniClusterResource;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.TestLogger;
    +
    +import org.apache.curator.test.TestingServer;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.File;
    +import java.time.Duration;
    +import java.util.UUID;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import static org.hamcrest.core.Is.is;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotNull;
    +import static org.junit.Assert.assertThat;
    +import static org.junit.Assert.assertTrue;
    +
    +/**
    + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
    + */
    +public class ZooKeeperHighAvailabilityITCase extends TestLogger {
    +
    +    private static final Duration TEST_TIMEOUT = Duration.ofSeconds(10000L);
    +
    +    private static final int NUM_JMS = 1;
    +    private static final int NUM_TMS = 1;
    +    private static final int NUM_SLOTS_PER_TM = 1;
    +
    +    @ClassRule
    +    public static final TemporaryFolder temporaryFolder = new TemporaryFolder();
    +
    +    private static File haStorageDir;
    +
    +    private static TestingServer zkServer;
    +
    +    private static MiniClusterResource miniClusterResource;
    +
    +    private static OneShotLatch waitForCheckpointLatch = new OneShotLatch();
    +    private static OneShotLatch failInCheckpointLatch = new OneShotLatch();
    +    private static OneShotLatch successfulRestoreLatch = new OneShotLatch();
    +
    +    @BeforeClass
    +    public static void setup() throws Exception {
    +        zkServer = new TestingServer();
    +
    +        Configuration config = new Configuration();
    +        config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
    +        config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
    +        config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
    +
    +        haStorageDir = temporaryFolder.newFolder();
    +
    +        config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haStorageDir.toString());
    +        config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, UUID.randomUUID().toString());
    +        config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
    +        config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
    +
    +        // we have to manage this manually because we have to create the ZooKeeper server
    +        // ahead of this
    +        miniClusterResource = new MiniClusterResource(
    +            new MiniClusterResource.MiniClusterResourceConfiguration(
    +                config,
    +                NUM_TMS,
    +                NUM_SLOTS_PER_TM));
    +
    +        miniClusterResource.before();
    +    }
    +
    +    @AfterClass
    +    public static void tearDown() throws Exception {
    +        miniClusterResource.after();
    +
    +        zkServer.stop();
    +        zkServer.close();
    +    }
    +
    +    /**
    +     * Verify that we don't start a job from scratch if we cannot restore any of the
    +     * CompletedCheckpoints.
    +     *
    +     * <p>Synchronization for the different steps and things we want to observe happens via
    +     * latches in the test method and the methods of {@link CheckpointBlockingFunction}.
    +     *
    +     * <p>The test follows these steps:
    +     * <ol>
    +     * <li>Start job and block on a latch until we have done some checkpoints
    +     * <li>Block in the special function
    +     * <li>Move away the contents of the ZooKeeper HA directory to make restoring from
    +     * checkpoints impossible
    +     * <li>Unblock the special function, which now induces a failure
    +     * <li>Make sure that the job does not recover successfully
    +     * <li>Move back the HA directory
    +     * <li>Make sure that the job recovers, we use a latch to ensure that the operator
    +     * restored successfully
    +     * </ol>
    +     */
    +    @Test(timeout = 120_000L)
    +    public void testRestoreBehaviourWithFaultyStateHandles() throws Exception {
    +        CheckpointBlockingFunction.allowedInitializeCallsWithoutRestore.set(1);
    +        CheckpointBlockingFunction.successfulRestores.set(0);
    +        CheckpointBlockingFunction.illegalRestores.set(0);
    +        CheckpointBlockingFunction.afterMessWithZooKeeper.set(false);
    +        CheckpointBlockingFunction.failedAlready.set(false);
    +
    +        waitForCheckpointLatch = new OneShotLatch();
    +        failInCheckpointLatch = new OneShotLatch();
    +        successfulRestoreLatch = new OneShotLatch();
    +
    +        ClusterClient<?> clusterClient = miniClusterResource.getClusterClient();
    +        final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
    +
    +        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +        env.setParallelism(1);
    +        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0));
    +        env.enableCheckpointing(10); // Flink doesn't allow lower than 10 ms
    +
    +        File checkpointLocation = temporaryFolder.newFolder();
    +        env.setStateBackend((StateBackend) new FsStateBackend(checkpointLocation.toURI()));
    +
    +        DataStreamSource<String> source = env.addSource(new UnboundedSource());
    +
    +        source
    +            .keyBy((str) -> str)
    +            .map(new CheckpointBlockingFunction());
    +
    +        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
    +        JobID jobID = Preconditions.checkNotNull(jobGraph.getJobID());
    +
    +        clusterClient.setDetached(true);
    +        clusterClient.submitJob(jobGraph, ZooKeeperHighAvailabilityITCase.class.getClassLoader());
    +
    +        // wait until we did some checkpoints
    +        waitForCheckpointLatch.await();
    +
    +        // mess with the HA directory so that the job cannot restore
    +        File movedCheckpointLocation = temporaryFolder.newFolder();
    +        int numCheckpoints = 0;
    +        File[] files = haStorageDir.listFiles();
    +        assertNotNull(files);
    +        for (File file : files) {
    +            if (file.getName().startsWith("completedCheckpoint")) {
    +                assertTrue(file.renameTo(new File(movedCheckpointLocation, file.getName())));
    +                numCheckpoints++;
    +            }
    +        }
    +        // Note to future developers: This will break when we change Flink to not put the
    +        // checkpoint metadata into the HA directory but instead rely on the fact that the
    +        // actual checkpoint directory on DFS contains the checkpoint metadata. In this case,
    +        // ZooKeeper will only contain a "handle" (read: String) that points to the metadata
    +        // in DFS. The likely solution will be that we have to go directly to ZooKeeper, find
    +        // out where the checkpoint is stored and mess with that.
    --- End diff --

    The solution would be to rename the checkpoint directory directly.
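A hedged sketch of that suggestion, assuming the directory to rename is the checkpoint directory the FsStateBackend writes to (checkpointLocation in the test above); the helper class and method names below are illustrative only and not part of the pull request:

{code}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/**
 * Illustrative helper (assumed names, not from the PR): instead of renaming the
 * individual completedCheckpoint* metadata files in the HA storage directory,
 * move the checkpoint directory itself. The state handles referenced from
 * ZooKeeper then point to paths that no longer exist, so every restore attempt
 * fails until the directory is moved back.
 */
final class CheckpointDirectoryMover {

    private CheckpointDirectoryMover() {
    }

    /** Moves the checkpoint directory aside and returns its new location. */
    static Path hideCheckpointDirectory(Path checkpointLocation, Path scratchDir) throws IOException {
        Path hidden = scratchDir.resolve("hidden-checkpoints");
        // after this move, restoring from any completed checkpoint must fail
        return Files.move(checkpointLocation, hidden);
    }

    /** Moves the checkpoint directory back so that a later restore attempt can succeed. */
    static void restoreCheckpointDirectory(Path hidden, Path checkpointLocation) throws IOException {
        Files.move(hidden, checkpointLocation);
    }
}
{code}

In the test this would replace the loop over the completedCheckpoint* files; the rest of the latch-based choreography could stay the same.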
> State loss after multiple restart attempts
> ------------------------------------------
>
>                 Key: FLINK-8487
>                 URL: https://issues.apache.org/jira/browse/FLINK-8487
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.2
>            Reporter: Fabian Hueske
>            Priority: Blocker
>             Fix For: 1.5.0, 1.4.3
>
>
> A user [reported this issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E] on the user@f.a.o mailing list and analyzed the situation.
> Scenario:
> - A program that reads from Kafka and computes counts in a keyed 15-minute tumbling window. The StateBackend is RocksDB and checkpointing is enabled.
> {code}
> keyBy(0)
>     .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
>     .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
>     .reduce(new ReduceFunction(), new WindowFunction())
> {code}
> - At some point HDFS went into safe mode due to NameNode issues.
> - The following exception was thrown:
> {code}
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category WRITE is not supported in state standby. Visit https://s.apache.org/sbnn-error
> ..................
>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
>     at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
> {code}
> - The pipeline came back after a few restarts and checkpoint failures, once the HDFS issues were resolved.
> - It was evident that operator state was lost. Either it was the Kafka consumer that kept on advancing its offset between a start and the next checkpoint failure (a minute's worth), or the operator that held the partial aggregates was lost.
> The user did some in-depth analysis (see the [mail thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]) and might have (according to [~aljoscha]) identified the problem.
> [~stefanrichte...@gmail.com], can you have a look at this issue and check if it is relevant?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
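For illustration, a minimal sketch of the windowed count described in the quoted issue; the element type (Tuple2<String, Long>) and the function bodies are assumptions, not taken from the user's actual job:

{code}
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/** Sketch of the keyed tumbling-window count from the issue description (assumed types). */
public class WindowedCountSketch {

    public static SingleOutputStreamOperator<Tuple2<String, Long>> countPerKey(
            DataStream<Tuple2<String, Long>> events, long windowSize, long lateBy) {

        return events
            .keyBy(0)
            .timeWindow(Time.of(windowSize, TimeUnit.MINUTES))
            .allowedLateness(Time.of(lateBy, TimeUnit.SECONDS))
            .reduce(
                // the incrementally reduced partial count lives in window state between checkpoints
                new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> a, Tuple2<String, Long> b) {
                        return Tuple2.of(a.f0, a.f1 + b.f1);
                    }
                },
                // emits the reduced count per key once the window fires
                new WindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple, TimeWindow>() {
                    @Override
                    public void apply(
                            Tuple key,
                            TimeWindow window,
                            Iterable<Tuple2<String, Long>> reduced,
                            Collector<Tuple2<String, Long>> out) {
                        for (Tuple2<String, Long> count : reduced) {
                            out.collect(count);
                        }
                    }
                });
    }
}
{code}

The per-window partial aggregates held by the reduce are the kind of operator state the reporter suspects was lost when the checkpoints could not be restored.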