[ https://issues.apache.org/jira/browse/FLINK-8487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16391171#comment-16391171 ]
ASF GitHub Bot commented on FLINK-8487: --------------------------------------- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5654#discussion_r173143151 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java --- @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.ApplyFunction; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.impl.FlinkFuture; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.TestBaseUtils; +import org.apache.flink.util.Preconditions; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import scala.concurrent.Await; +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils { + + private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + private static final int NUM_JMS = 1; + private static final int NUM_TMS = 1; + private static final int NUM_SLOTS_PER_TM = 1; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static File haStorageDir; + + private static TestingServer zkServer; + + private static LocalFlinkMiniCluster cluster = null; + + private static OneShotLatch waitForCheckpointLatch = new OneShotLatch(); + private static OneShotLatch failInCheckpointLatch = new OneShotLatch(); + private static OneShotLatch successfulRestoreLatch = new OneShotLatch(); + + @BeforeClass + public static void setup() throws Exception { + zkServer = new TestingServer(); + + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS); + config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS); + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM); + + haStorageDir = temporaryFolder.newFolder(); + + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haStorageDir.toString()); + config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, UUID.randomUUID().toString()); + config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString()); + config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); + + cluster = TestBaseUtils.startCluster(config, false); + } + + @AfterClass + public static void tearDown() throws Exception { + stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT); + + zkServer.stop(); + zkServer.close(); + } + + /** + * Verify that we don't start a job from scratch if we cannot restore any of the + * CompletedCheckpoints. + * + * <p>Synchronization for the different steps and things we want to observe happens via + * latches in the test method and the methods of {@link CheckpointBlockingFunction}. + * + * <p>The test follows these steps: + * <ol> + * <li>Start job and block on a latch until we have done some checkpoints + * <li>Block in the special function + * <li>Move away the contents of the ZooKeeper HA directory to make restoring from + * checkpoints impossible + * <li>Unblock the special function, which now induces a failure + * <li>Make sure that the job does not recover successfully + * <li>Move back the HA directory + * <li>Make sure that the job recovers, we use a latch to ensure that the operator + * restored successfully + * </ol> + */ + @Test(timeout = 120_000L) + public void testRestoreBehaviourWithFaultyStateHandles() throws Exception { + CheckpointBlockingFunction.allowedInitializeCallsWithoutRestore.set(1); + CheckpointBlockingFunction.successfulRestores.set(0); + CheckpointBlockingFunction.illegalRestores.set(0); + CheckpointBlockingFunction.afterMessWithZooKeeper.set(false); + CheckpointBlockingFunction.failedAlready.set(false); + + waitForCheckpointLatch = new OneShotLatch(); + failInCheckpointLatch = new OneShotLatch(); + successfulRestoreLatch = new OneShotLatch(); + + final Deadline deadline = TEST_TIMEOUT.fromNow(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0)); + env.enableCheckpointing(10); // Flink doesn't allow lower than 10 ms + + File checkpointLocation = temporaryFolder.newFolder(); + env.setStateBackend(new FsStateBackend(checkpointLocation.toURI())); + + DataStreamSource<String> source = env.addSource(new UnboundedSource()); + + source + .keyBy(new KeySelector<String, String>() { + @Override + public String getKey(String value) { + return value; + } + }) + .map(new CheckpointBlockingFunction()); + + JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + final JobID jobID = Preconditions.checkNotNull(jobGraph.getJobID()); + + // Retrieve the job manager + final ActorGateway jobManager = Await.result(cluster.leaderGateway().future(), deadline.timeLeft()); + + cluster.submitJobDetached(jobGraph); + + // wait until we did some checkpoints + waitForCheckpointLatch.await(); + + // mess with the HA directory so that the job cannot restore + File movedCheckpointLocation = temporaryFolder.newFolder(); + int numCheckpoints = 0; + File[] files = haStorageDir.listFiles(); + assertNotNull(files); + for (File file : files) { + if (file.getName().startsWith("completedCheckpoint")) { + assertTrue(file.renameTo(new File(movedCheckpointLocation, file.getName()))); + numCheckpoints++; + } + } + assertTrue(numCheckpoints > 0); + + failInCheckpointLatch.trigger(); + + // Ensure that we see at least one cycle where the job tries to restart and fails. + Future<JobStatus> jobStatusFuture = FutureUtils.retrySuccessful( + new Callable<Future<JobStatus>>() { + @Override + public Future<JobStatus> call(){ + return getJobStatus(jobManager, jobID, TEST_TIMEOUT); + } + }, + new FilterFunction<JobStatus>() { + @Override + public boolean filter(JobStatus jobStatus){ + return jobStatus == JobStatus.RESTARTING; + } + }, + deadline, + TestingUtils.defaultExecutor()); + assertEquals(JobStatus.RESTARTING, jobStatusFuture.get()); + + jobStatusFuture = FutureUtils.retrySuccessful( + new Callable<Future<JobStatus>>() { + @Override + public Future<JobStatus> call() { + return getJobStatus(jobManager, jobID, TEST_TIMEOUT); + } + }, + new FilterFunction<JobStatus>() { + @Override + public boolean filter(JobStatus jobStatus) { + return jobStatus == JobStatus.FAILING; + } + }, + deadline, + TestingUtils.defaultExecutor()); + assertEquals(JobStatus.FAILING, jobStatusFuture.get()); + + // move back the HA directory so that the job can restore + CheckpointBlockingFunction.afterMessWithZooKeeper.set(true); + + files = movedCheckpointLocation.listFiles(); + assertNotNull(files); + for (File file : files) { + if (file.getName().startsWith("completedCheckpoint")) { + assertTrue(file.renameTo(new File(haStorageDir, file.getName()))); + } + } + + // now the job should be able to go to RUNNING again and then eventually to FINISHED + jobStatusFuture = FutureUtils.retrySuccessful( + new Callable<Future<JobStatus>>() { + @Override + public Future<JobStatus> call() { + return getJobStatus(jobManager, jobID, TEST_TIMEOUT); + } + }, + new FilterFunction<JobStatus>() { + @Override + public boolean filter(JobStatus jobStatus) { + return jobStatus == JobStatus.FINISHED; + } + }, + deadline, + TestingUtils.defaultExecutor()); + assertEquals(JobStatus.FINISHED, jobStatusFuture.get()); + + // make sure we saw a successful restore + successfulRestoreLatch.await(); + + assertThat("We saw illegal restores.", CheckpointBlockingFunction.illegalRestores.get(), is(0)); --- End diff -- I think it is necessary, without more in-depth re-writing of the test, see comment above. Regarding `assertThat`, I like it because it reads more like a sentence but I can also change it. > State loss after multiple restart attempts > ------------------------------------------ > > Key: FLINK-8487 > URL: https://issues.apache.org/jira/browse/FLINK-8487 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing > Affects Versions: 1.3.2 > Reporter: Fabian Hueske > Priority: Blocker > Fix For: 1.5.0, 1.4.3 > > > A user [reported this > issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E] > on the user@f.a.o mailing list and analyzed the situation. > Scenario: > - A program that reads from Kafka and computes counts in a keyed 15 minute > tumbling window. StateBackend is RocksDB and checkpointing is enabled. > {code} > keyBy(0) > .timeWindow(Time.of(window_size, TimeUnit.MINUTES)) > .allowedLateness(Time.of(late_by, TimeUnit.SECONDS)) > .reduce(new ReduceFunction(), new WindowFunction()) > {code} > - At some point HDFS went into a safe mode due to NameNode issues > - The following exception was thrown > {code} > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): > Operation category WRITE is not supported in state standby. Visit > https://s.apache.org/sbnn-error > .................. > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453) > at > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132) > {code} > - The pipeline came back after a few restarts and checkpoint failures, after > the HDFS issues were resolved. > - It was evident that operator state was lost. Either it was the Kafka > consumer that kept on advancing it's offset between a start and the next > checkpoint failure (a minute's worth) or the the operator that had partial > aggregates was lost. > The user did some in-depth analysis (see [mail > thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]) > and might have (according to [~aljoscha]) identified the problem. > [~stefanrichte...@gmail.com], can you have a look at this issue and check if > it is relevant? -- This message was sent by Atlassian JIRA (v7.6.3#76005)