    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.test.checkpointing;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.RichMapFunction;
    +import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeutils.base.StringSerializer;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.HighAvailabilityOptions;
    +import org.apache.flink.core.testutils.OneShotLatch;
    +import org.apache.flink.runtime.concurrent.ApplyFunction;
    +import org.apache.flink.runtime.concurrent.Future;
    +import org.apache.flink.runtime.concurrent.FutureUtils;
    +import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
    +import org.apache.flink.runtime.state.FunctionInitializationContext;
    +import org.apache.flink.runtime.state.FunctionSnapshotContext;
    +import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    +import org.apache.flink.runtime.testingUtils.TestingUtils;
    +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    +import org.apache.flink.streaming.api.datastream.DataStreamSource;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.test.util.TestBaseUtils;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.curator.test.TestingServer;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +import java.io.File;
    +import java.util.UUID;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicInteger;
    +import scala.concurrent.Await;
    +import scala.concurrent.duration.Deadline;
    +import scala.concurrent.duration.FiniteDuration;
    +import static org.hamcrest.core.Is.is;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotNull;
    +import static org.junit.Assert.assertThat;
    +import static org.junit.Assert.assertTrue;
    + * Integration tests for {@link 
    + */
    +public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils {
    +   private static final FiniteDuration TEST_TIMEOUT = new 
FiniteDuration(5, TimeUnit.MINUTES);
    +   private static final int NUM_JMS = 1;
    +   private static final int NUM_TMS = 1;
    +   private static final int NUM_SLOTS_PER_TM = 1;
    +   @ClassRule
    +   public static final TemporaryFolder temporaryFolder = new 
    +   private static File haStorageDir;
    +   private static TestingServer zkServer;
    +   private static LocalFlinkMiniCluster cluster = null;
    +   private static OneShotLatch waitForCheckpointLatch = new OneShotLatch();
    +   private static OneShotLatch failInCheckpointLatch = new OneShotLatch();
    +   private static OneShotLatch successfulRestoreLatch = new OneShotLatch();
    +   @BeforeClass
    +   public static void setup() throws Exception {
    +           zkServer = new TestingServer();
    +           Configuration config = new Configuration();
    +           config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 
    +           config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
    +           config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
    +           haStorageDir = temporaryFolder.newFolder();
    +           config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
    +           config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, 
    +           config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
    +           config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
    +           cluster = TestBaseUtils.startCluster(config, false);
    +   }
    +   /**
    +    * Shuts down the shared test infrastructure: first the Flink mini cluster, then the
    +    * ZooKeeper {@link TestingServer}. The ZooKeeper shutdown lives in a finally block so
    +    * that the server is still released when stopping the cluster throws.
    +    */
    +   @AfterClass
    +   public static void tearDown() throws Exception {
    +           try {
    +                   stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
    +           } finally {
    +                   // zkServer stays null if setup() failed while constructing the TestingServer
    +                   if (zkServer != null) {
    +                           zkServer.stop();
    +                           zkServer.close();
    +                   }
    +           }
    +   }
    +   /**
    +    * Verify that we don't start a job from scratch if we cannot restore 
any of the
    +    * CompletedCheckpoints.
    +    *
    +    * <p>Synchronization for the different steps and things we want to 
observe happens via
    +    * latches in the test method and the methods of {@link 
    +    *
    +    * <p>The test follows these steps:
    +    * <ol>
    +    *     <li>Start job and block on a latch until we have done some 
    +    *     <li>Block in the special function
    +    *     <li>Move away the contents of the ZooKeeper HA directory to make 
restoring from
    +    *       checkpoints impossible
    +    *     <li>Unblock the special function, which now induces a failure
    +    *     <li>Make sure that the job does not recover successfully
    +    *     <li>Move back the HA directory
    +    *     <li>Make sure that the job recovers, we use a latch to ensure 
that the operator
    +    *       restored successfully
    +    * </ol>
    +    */
    +   @Test(timeout = 120_000L)
    +   public void testRestoreBehaviourWithFaultyStateHandles() throws 
Exception {
    +           CheckpointBlockingFunction.successfulRestores.set(0);
    +           CheckpointBlockingFunction.illegalRestores.set(0);
    +           CheckpointBlockingFunction.afterMessWithZooKeeper.set(false);
    +           CheckpointBlockingFunction.failedAlready.set(false);
    +           waitForCheckpointLatch = new OneShotLatch();
    +           failInCheckpointLatch = new OneShotLatch();
    +           successfulRestoreLatch = new OneShotLatch();
    +           final Deadline deadline = TEST_TIMEOUT.fromNow();
    +           StreamExecutionEnvironment env = 
    +           env.setParallelism(1);
    +           env.enableCheckpointing(10); // Flink doesn't allow lower than 
10 ms
    +           File checkpointLocation = temporaryFolder.newFolder();
    +           env.setStateBackend(new 
    +           DataStreamSource<String> source = env.addSource(new 
    +           source
    +                   .keyBy(new KeySelector<String, String>() {
    +                           @Override
    +                           public String getKey(String value) {
    +                                   return value;
    +                           }
    +                   })
    +                   .map(new CheckpointBlockingFunction());
    +           JobGraph jobGraph = env.getStreamGraph().getJobGraph();
    +           final JobID jobID = 
    +           // Retrieve the job manager
    +           final ActorGateway jobManager = 
Await.result(cluster.leaderGateway().future(), deadline.timeLeft());
    +           cluster.submitJobDetached(jobGraph);
    +           // wait until we did some checkpoints
    +           waitForCheckpointLatch.await();
    +           // mess with the HA directory so that the job cannot restore
    +           File movedCheckpointLocation = temporaryFolder.newFolder();
    +           int numCheckpoints = 0;
    +           File[] files = haStorageDir.listFiles();
    +           assertNotNull(files);
    +           for (File file : files) {
    +                   if (file.getName().startsWith("completedCheckpoint")) {
    +                           assertTrue(file.renameTo(new 
File(movedCheckpointLocation, file.getName())));
    +                           numCheckpoints++;
    +                   }
    +           }
    +           assertTrue(numCheckpoints > 0);
    +           failInCheckpointLatch.trigger();
    +           // Ensure that we see at least one cycle where the job tries to 
restart and fails.
    +           Future<JobStatus> jobStatusFuture = FutureUtils.retrySuccessful(
    +                   new Callable<Future<JobStatus>>() {
    +                           @Override
    +                           public Future<JobStatus> call(){
    +                                   return getJobStatus(jobManager, jobID, 
    +                           }
    +                   },
    +                   new FilterFunction<JobStatus>() {
    +                           @Override
    +                           public boolean filter(JobStatus jobStatus){
    +                                   return jobStatus == 
    +                           }
    +                   },
    +                   deadline,
    +                   TestingUtils.defaultExecutor());
    +           assertEquals(JobStatus.RESTARTING, jobStatusFuture.get());
    +           jobStatusFuture = FutureUtils.retrySuccessful(
    +                   new Callable<Future<JobStatus>>() {
    +                           @Override
    +                           public Future<JobStatus> call() {
    +                                   return getJobStatus(jobManager, jobID, 
    +                           }
    +                   },
    +                   new FilterFunction<JobStatus>() {
    +                           @Override
    +                           public boolean filter(JobStatus jobStatus) {
    +                                   return jobStatus == JobStatus.FAILING;
    +                           }
    +                   },
    +                   deadline,
    +                   TestingUtils.defaultExecutor());
    +           assertEquals(JobStatus.FAILING, jobStatusFuture.get());
    +           // move back the HA directory so that the job can restore
    +           CheckpointBlockingFunction.afterMessWithZooKeeper.set(true);
    +           files = movedCheckpointLocation.listFiles();
    +           assertNotNull(files);
    +           for (File file : files) {
    +                   if (file.getName().startsWith("completedCheckpoint")) {
    +                           assertTrue(file.renameTo(new File(haStorageDir, 
    +                   }
    +           }
    +           // now the job should be able to go to RUNNING again and then 
eventually to FINISHED
    +           jobStatusFuture = FutureUtils.retrySuccessful(
    +                   new Callable<Future<JobStatus>>() {
    +                           @Override
    +                           public Future<JobStatus> call() {
    +                                   return getJobStatus(jobManager, jobID, 
    +                           }
    +                   },
    +                   new FilterFunction<JobStatus>() {
    +                           @Override
    +                           public boolean filter(JobStatus jobStatus) {
    +                                   return jobStatus == JobStatus.FINISHED;
    +                           }
    +                   },
    +                   deadline,
    +                   TestingUtils.defaultExecutor());
    +           assertEquals(JobStatus.FINISHED, jobStatusFuture.get());
    +           // make sure we saw a successful restore
    +           successfulRestoreLatch.await();
    +           assertThat("We saw illegal restores.", 
CheckpointBlockingFunction.illegalRestores.get(), is(0));
    +   }
    +   /**
    +    * Requests the {@link JobStatus} of the job with the given {@link 
    +    */
    +   private Future<JobStatus> getJobStatus(
    +           final ActorGateway jobManager,
    +           final JobID jobId,
    +           final FiniteDuration timeout) {
    +           scala.concurrent.Future<Object> response =
jobManager.ask(JobManagerMessages.getRequestJobStatus(jobId), timeout);
    +           FlinkFuture<Object> flinkFuture = new FlinkFuture<>(response);
    +           return flinkFuture.thenApply(new ApplyFunction<Object, 
JobStatus>() {
    +                   @Override
    +                   public JobStatus apply(Object value) {
    +                           if (value instanceof 
JobManagerMessages.CurrentJobStatus) {
    +                                   return 
((JobManagerMessages.CurrentJobStatus) value).status();
    +                           } else if (value instanceof 
JobManagerMessages.JobNotFound) {
    +                                   throw new RuntimeException(
    +                                           new 
IllegalStateException("Could not find job with JobId " + jobId));
    +                           } else {
    +                                   throw new RuntimeException(
    +                                           new 
IllegalStateException("Unknown JobManager response of type " + 
    +                           }
    +                   }
    +           });
    +   }
    +   /**
    +    * Source that repeatedly emits {@code "hello"} (sleeping 50 ms between records) until
    +    * it is cancelled or until {@code CheckpointBlockingFunction.afterMessWithZooKeeper}
    +    * has been set, at which point {@link #run} returns.
    +    */
    +   private static class UnboundedSource implements SourceFunction<String> {
    +           // volatile: cancel() writes this flag while run() is looping on it, so the
    +           // write must be visible to the emitting thread (see SourceFunction contract).
    +           private volatile boolean running = true;
    +           @Override
    +           public void run(SourceContext<String> ctx) throws Exception {
    +                   while (running) {
    +                           ctx.collect("hello");
    +                           // throttle emission so the test does not flood the pipeline
    +                           Thread.sleep(50);
    +                           // once the HA directory has been restored, stop emitting
    +                           if (CheckpointBlockingFunction.afterMessWithZooKeeper.get()) {
    +                                   break;
    +                           }
    +                   }
    +           }
    +           @Override
    +           public void cancel() {
    +                   running = false;
    +           }
    +   }
    +   private static class CheckpointBlockingFunction
    +                   extends RichMapFunction<String, String>
    +                   implements CheckpointedFunction {
    +           // verify that we only call initializeState()
    +           // once with isRestored() == false. All other invocations must 
have isRestored() == true. This
    +           // verifies that we don't restart a job from scratch in case 
the CompletedCheckpoints can't
    +           // be read.
    +           static AtomicInteger allowedInitializeCallsWithoutRestore = new 
    +           // we count when we see restores that are not allowed. We only
    +           // allow restores once we messed with the HA directory and 
moved it back again
    +           static AtomicInteger illegalRestores = new AtomicInteger(0);
    +           static AtomicInteger successfulRestores = new AtomicInteger(0);
    +           // whether we are after the phase where we messed with the 
ZooKeeper HA directory, i.e.
    +           // whether it's now ok for a restore to happen
    +           static AtomicBoolean afterMessWithZooKeeper = new 
    +           static AtomicBoolean failedAlready = new AtomicBoolean(false);
    +           // also have some state to write to the checkpoint
    +           private final ValueStateDescriptor<String> stateDescriptor =
    +                   new ValueStateDescriptor<>("state", 
    +           @Override
    +           public String map(String value) throws Exception {
    +                   return value;
    +           }
    +           @Override
    +           public void snapshotState(FunctionSnapshotContext context) 
throws Exception {
    +                   if (context.getCheckpointId() > 5) {
    +                           waitForCheckpointLatch.trigger();
    +                           failInCheckpointLatch.await();
    +                           if (!failedAlready.getAndSet(true)) {
    +                                   throw new RuntimeException("Failing on 
    +                           }
    +                   }
    +           }
    +           @Override
    +           public void initializeState(FunctionInitializationContext 
context) {
    +                   if (!context.isRestored()) {
    No, this is exactly the thing we want to test. If we didn't have this check, we 
would allow the case where ZooKeeper cannot read any of the state handles and the 
job is silently started from scratch.
    There might be other ways around it but I like this explicit way. What do 
you think?

> State loss after multiple restart attempts
> ------------------------------------------
>                 Key: FLINK-8487
>                 URL: https://issues.apache.org/jira/browse/FLINK-8487
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.2
>            Reporter: Fabian Hueske
>            Priority: Blocker
>             Fix For: 1.5.0, 1.4.3
> A user [reported this 
> issue|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E]
>  on the user@f.a.o mailing list and analyzed the situation.
> Scenario:
> - A program that reads from Kafka and computes counts in a keyed 15 minute 
> tumbling window.  StateBackend is RocksDB and checkpointing is enabled.
> {code}
> keyBy(0)
>         .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
>         .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
>         .reduce(new ReduceFunction(), new WindowFunction())
> {code}
> - At some point HDFS went into a safe mode due to NameNode issues
> - The following exception was thrown
> {code}
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>  Operation category WRITE is not supported in state standby. Visit 
> https://s.apache.org/sbnn-error
>     ..................
>     at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
>         at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
>         at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
> {code}
> - The pipeline came back after a few restarts and checkpoint failures, after 
> the HDFS issues were resolved.
> - It was evident that operator state was lost. Either it was the Kafka 
> consumer that kept on advancing its offset between a start and the next 
> checkpoint failure (a minute's worth) or the operator that had partial 
> aggregates was lost. 
> The user did some in-depth analysis (see [mail 
> thread|https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E])
>  and might have (according to [~aljoscha]) identified the problem.
> [~stefanrichte...@gmail.com], can you have a look at this issue and check if 
> it is relevant?

