GitHub user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5654#discussion_r173142759
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java ---
    @@ -0,0 +1,387 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.checkpointing;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.RichMapFunction;
    +import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeutils.base.StringSerializer;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.HighAvailabilityOptions;
    +import org.apache.flink.core.testutils.OneShotLatch;
    +import org.apache.flink.runtime.concurrent.ApplyFunction;
    +import org.apache.flink.runtime.concurrent.Future;
    +import org.apache.flink.runtime.concurrent.FutureUtils;
    +import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
    +import org.apache.flink.runtime.state.FunctionInitializationContext;
    +import org.apache.flink.runtime.state.FunctionSnapshotContext;
    +import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    +import org.apache.flink.runtime.testingUtils.TestingUtils;
    +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    +import org.apache.flink.streaming.api.datastream.DataStreamSource;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.test.util.TestBaseUtils;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.curator.test.TestingServer;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.File;
    +import java.util.UUID;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import scala.concurrent.Await;
    +import scala.concurrent.duration.Deadline;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import static org.hamcrest.core.Is.is;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotNull;
    +import static org.junit.Assert.assertThat;
    +import static org.junit.Assert.assertTrue;
    +
    +/**
    + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
    + */
    +public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils {
    +
    +   private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
    +
    +   private static final int NUM_JMS = 1;
    +   private static final int NUM_TMS = 1;
    +   private static final int NUM_SLOTS_PER_TM = 1;
    +
    +   @ClassRule
    +   public static final TemporaryFolder temporaryFolder = new TemporaryFolder();
    +
    +   private static File haStorageDir;
    +
    +   private static TestingServer zkServer;
    +
    +   private static LocalFlinkMiniCluster cluster = null;
    +
    +   private static OneShotLatch waitForCheckpointLatch = new OneShotLatch();
    +   private static OneShotLatch failInCheckpointLatch = new OneShotLatch();
    +   private static OneShotLatch successfulRestoreLatch = new OneShotLatch();
    +
    +   @BeforeClass
    +   public static void setup() throws Exception {
    +           zkServer = new TestingServer();
    +
    +           Configuration config = new Configuration();
    +           config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
    +           config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
    +           config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
    +
    +           haStorageDir = temporaryFolder.newFolder();
    +
    +           config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haStorageDir.toString());
    +           config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, UUID.randomUUID().toString());
    +           config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
    +           config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
    +
    +           cluster = TestBaseUtils.startCluster(config, false);
    +   }
    +
    +   @AfterClass
    +   public static void tearDown() throws Exception {
    +           stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
    +
    +           zkServer.stop();
    +           zkServer.close();
    +   }
    +
    +   /**
    +    * Verify that we don't start a job from scratch if we cannot restore any of the
    +    * CompletedCheckpoints.
    +    *
    +    * <p>Synchronization for the different steps and things we want to observe happens via
    +    * latches in the test method and the methods of {@link CheckpointBlockingFunction}.
    +    *
    +    * <p>The test follows these steps:
    +    * <ol>
    +    *     <li>Start job and block on a latch until we have done some checkpoints
    +    *     <li>Block in the special function
    +    *     <li>Move away the contents of the ZooKeeper HA directory to make restoring from
    +    *       checkpoints impossible
    +    *     <li>Unblock the special function, which now induces a failure
    +    *     <li>Make sure that the job does not recover successfully
    +    *     <li>Move back the HA directory
    +    *     <li>Make sure that the job recovers; we use a latch to ensure that the operator
    +    *       restored successfully
    +    * </ol>
    +    */
    +   @Test(timeout = 120_000L)
    +   public void testRestoreBehaviourWithFaultyStateHandles() throws Exception {
    +           CheckpointBlockingFunction.allowedInitializeCallsWithoutRestore.set(1);
    +           CheckpointBlockingFunction.successfulRestores.set(0);
    +           CheckpointBlockingFunction.illegalRestores.set(0);
    +           CheckpointBlockingFunction.afterMessWithZooKeeper.set(false);
    +           CheckpointBlockingFunction.failedAlready.set(false);
    +
    +           waitForCheckpointLatch = new OneShotLatch();
    +           failInCheckpointLatch = new OneShotLatch();
    +           successfulRestoreLatch = new OneShotLatch();
    +
    +           final Deadline deadline = TEST_TIMEOUT.fromNow();
    +
    +           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +           env.setParallelism(1);
    +           env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0));
    +           env.enableCheckpointing(10); // Flink doesn't allow lower than 10 ms
    +
    +           File checkpointLocation = temporaryFolder.newFolder();
    +           env.setStateBackend(new FsStateBackend(checkpointLocation.toURI()));
    +
    +           DataStreamSource<String> source = env.addSource(new UnboundedSource());
    +
    +           source
    +                   .keyBy(new KeySelector<String, String>() {
    +                           @Override
    +                           public String getKey(String value) {
    +                                   return value;
    +                           }
    +                   })
    +                   .map(new CheckpointBlockingFunction());
    +
    +           JobGraph jobGraph = env.getStreamGraph().getJobGraph();
    +           final JobID jobID = Preconditions.checkNotNull(jobGraph.getJobID());
    +
    +           // Retrieve the job manager
    +           final ActorGateway jobManager = Await.result(cluster.leaderGateway().future(), deadline.timeLeft());
    +
    +           cluster.submitJobDetached(jobGraph);
    +
    +           // wait until we did some checkpoints
    +           waitForCheckpointLatch.await();
    +
    +           // mess with the HA directory so that the job cannot restore
    +           File movedCheckpointLocation = temporaryFolder.newFolder();
    +           int numCheckpoints = 0;
    +           File[] files = haStorageDir.listFiles();
    +           assertNotNull(files);
    +           for (File file : files) {
    +                   if (file.getName().startsWith("completedCheckpoint")) {
    +                           assertTrue(file.renameTo(new File(movedCheckpointLocation, file.getName())));
    +                           numCheckpoints++;
    +                   }
    +           }
    +           assertTrue(numCheckpoints > 0);
    +
    +           failInCheckpointLatch.trigger();
    +
    +           // Ensure that we see at least one cycle where the job tries to restart and fails.
    +           Future<JobStatus> jobStatusFuture = FutureUtils.retrySuccessful(
    +                   new Callable<Future<JobStatus>>() {
    +                           @Override
    +                           public Future<JobStatus> call() {
    +                                   return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
    +                           }
    +                   },
    +                   new FilterFunction<JobStatus>() {
    +                           @Override
    +                           public boolean filter(JobStatus jobStatus) {
    +                                   return jobStatus == JobStatus.RESTARTING;
    +                           }
    +                   },
    +                   deadline,
    +                   TestingUtils.defaultExecutor());
    +           assertEquals(JobStatus.RESTARTING, jobStatusFuture.get());
    +
    +           jobStatusFuture = FutureUtils.retrySuccessful(
    +                   new Callable<Future<JobStatus>>() {
    +                           @Override
    +                           public Future<JobStatus> call() {
    +                                   return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
    +                           }
    +                   },
    +                   new FilterFunction<JobStatus>() {
    +                           @Override
    +                           public boolean filter(JobStatus jobStatus) {
    +                                   return jobStatus == JobStatus.FAILING;
    +                           }
    +                   },
    +                   deadline,
    +                   TestingUtils.defaultExecutor());
    +           assertEquals(JobStatus.FAILING, jobStatusFuture.get());
    +
    +           // move back the HA directory so that the job can restore
    +           CheckpointBlockingFunction.afterMessWithZooKeeper.set(true);
    +
    +           files = movedCheckpointLocation.listFiles();
    +           assertNotNull(files);
    +           for (File file : files) {
    +                   if (file.getName().startsWith("completedCheckpoint")) {
    +                           assertTrue(file.renameTo(new File(haStorageDir, file.getName())));
    +                   }
    +           }
    +
    +           // now the job should be able to go to RUNNING again and then eventually to FINISHED
    +           jobStatusFuture = FutureUtils.retrySuccessful(
    +                   new Callable<Future<JobStatus>>() {
    +                           @Override
    +                           public Future<JobStatus> call() {
    +                                   return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
    +                           }
    +                   },
    +                   new FilterFunction<JobStatus>() {
    +                           @Override
    +                           public boolean filter(JobStatus jobStatus) {
    +                                   return jobStatus == JobStatus.FINISHED;
    +                           }
    +                   },
    +                   deadline,
    +                   TestingUtils.defaultExecutor());
    +           assertEquals(JobStatus.FINISHED, jobStatusFuture.get());
    +
    +           // make sure we saw a successful restore
    +           successfulRestoreLatch.await();
    +
    +           assertThat("We saw illegal restores.", CheckpointBlockingFunction.illegalRestores.get(), is(0));
    +   }
    +
    +   /**
    +    * Requests the {@link JobStatus} of the job with the given {@link JobID}.
    +    */
    +   private Future<JobStatus> getJobStatus(
    +           final ActorGateway jobManager,
    +           final JobID jobId,
    +           final FiniteDuration timeout) {
    +
    +           scala.concurrent.Future<Object> response =
    +                   jobManager.ask(JobManagerMessages.getRequestJobStatus(jobId), timeout);
    +
    +           FlinkFuture<Object> flinkFuture = new FlinkFuture<>(response);
    +
    +           return flinkFuture.thenApply(new ApplyFunction<Object, JobStatus>() {
    +                   @Override
    +                   public JobStatus apply(Object value) {
    +                           if (value instanceof JobManagerMessages.CurrentJobStatus) {
    +                                   return ((JobManagerMessages.CurrentJobStatus) value).status();
    +                           } else if (value instanceof JobManagerMessages.JobNotFound) {
    +                                   throw new RuntimeException(
    +                                           new IllegalStateException("Could not find job with JobId " + jobId));
    +                           } else {
    +                                   throw new RuntimeException(
    +                                           new IllegalStateException("Unknown JobManager response of type " + value.getClass()));
    +                           }
    +                   }
    +           });
    +   }
    +
    +   private static class UnboundedSource implements SourceFunction<String> {
    +           private boolean running = true;
    +
    +           @Override
    +           public void run(SourceContext<String> ctx) throws Exception {
    +                   while (running) {
    +                           ctx.collect("hello");
    +                           // don't overdo it ... ;-)
    +                           Thread.sleep(50);
    +                           if (CheckpointBlockingFunction.afterMessWithZooKeeper.get()) {
    +                                   break;
    +                           }
    +                   }
    +           }
    +
    +           @Override
    +           public void cancel() {
    +                   running = false;
    +           }
    +   }
    +
    +   private static class CheckpointBlockingFunction
    +                   extends RichMapFunction<String, String>
    +                   implements CheckpointedFunction {
    +
    +           // verify that we only call initializeState()
    +           // once with isRestored() == false. All other invocations must have isRestored() == true. This
    +           // verifies that we don't restart a job from scratch in case the CompletedCheckpoints can't
    +           // be read.
    +           static AtomicInteger allowedInitializeCallsWithoutRestore = new AtomicInteger(1);
    +
    +           // we count when we see restores that are not allowed. We only
    +           // allow restores once we messed with the HA directory and moved it back again
    +           static AtomicInteger illegalRestores = new AtomicInteger(0);
    +           static AtomicInteger successfulRestores = new AtomicInteger(0);
    +
    +           // whether we are after the phase where we messed with the ZooKeeper HA directory, i.e.
    +           // whether it's now ok for a restore to happen
    +           static AtomicBoolean afterMessWithZooKeeper = new AtomicBoolean(false);
    +
    +           static AtomicBoolean failedAlready = new AtomicBoolean(false);
    +
    +           // also have some state to write to the checkpoint
    +           private final ValueStateDescriptor<String> stateDescriptor =
    +                   new ValueStateDescriptor<>("state", StringSerializer.INSTANCE);
    +
    +           @Override
    +           public String map(String value) throws Exception {
    +                   getRuntimeContext().getState(stateDescriptor).update("42");
    +                   return value;
    +           }
    +
    +           @Override
    +           public void snapshotState(FunctionSnapshotContext context) throws Exception {
    +                   if (context.getCheckpointId() > 5) {
    +                           waitForCheckpointLatch.trigger();
    +                           failInCheckpointLatch.await();
    +                           if (!failedAlready.getAndSet(true)) {
    +                                   throw new RuntimeException("Failing on purpose.");
    +                           }
    +                   }
    +           }
    +
    +           @Override
    +           public void initializeState(FunctionInitializationContext context) {
    +                   if (!context.isRestored()) {
    --- End diff --
    
    No, this is exactly the thing we want to test. If we didn't have this check, we would allow the case where ZooKeeper cannot read any of the state handles and the job is silently started from scratch.
    
    There might be other ways around it, but I like this explicit way. What do you think?
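    
    For readers following along: the body of that guard is cut off in the diff above, so the snippet below is only a hypothetical sketch of what such a check could look like. It assumes the static counters and flags declared in CheckpointBlockingFunction (allowedInitializeCallsWithoutRestore, illegalRestores, successfulRestores, afterMessWithZooKeeper) and the successfulRestoreLatch from the test class; the actual implementation in the PR may differ.
    
        // hypothetical sketch -- not the code from the PR
        @Override
        public void initializeState(FunctionInitializationContext context) {
            if (!context.isRestored()) {
                // only the very first execution attempt may start without restored state
                if (allowedInitializeCallsWithoutRestore.decrementAndGet() < 0) {
                    illegalRestores.incrementAndGet();
                    throw new RuntimeException("initializeState() was called without restored state more than once.");
                }
            } else if (!afterMessWithZooKeeper.get()) {
                // a restore before the HA directory was moved back means we read
                // state handles that should have been unreadable
                illegalRestores.incrementAndGet();
            } else {
                successfulRestores.incrementAndGet();
                successfulRestoreLatch.trigger();
            }
        }
    
    The point of such an explicit guard is that, from the job's perspective, a silent start from scratch would otherwise look exactly like a legal restore, so the test has to count the initializeState() invocations itself.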

