Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6376#discussion_r203973793
  
    --- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/FailingSource.java
 ---
    @@ -0,0 +1,155 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.checkpointing.utils;
    +
    +import org.apache.flink.api.common.functions.RuntimeContext;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.state.CheckpointListener;
    +import org.apache.flink.streaming.api.TimeCharacteristic;
    +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
    +import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    +
    +import javax.annotation.Nonnegative;
    +import javax.annotation.Nonnull;
    +
    +import java.io.Serializable;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +import static org.junit.Assert.assertEquals;
    +
    +/**
    + * Source for window checkpointing IT cases that can introduce artificial 
failures.
    + */
    +public class FailingSource extends RichSourceFunction<Tuple2<Long, 
IntType>>
    +   implements ListCheckpointed<Integer>, CheckpointListener {
    +
    +   /**
    +    * Function to generate and emit the test events (and watermarks if 
required).
    +    */
    +   @FunctionalInterface
    +   public interface EventEmittingGenerator extends Serializable {
    +           void emitEvent(SourceContext<Tuple2<Long, IntType>> ctx, int 
eventSequenceNo);
    +   }
    +
    +   private static final long INITIAL = Long.MIN_VALUE;
    +   private static final long STATEFUL_CHECKPOINT_COMPLETED = 
Long.MIN_VALUE;
    +
    +   @Nonnull
    +   private final EventEmittingGenerator eventEmittingGenerator;
    +   private final int expectedEmitCalls;
    +   private final int failureAfterNumElements;
    +   private final boolean usingProcessingTime;
    +   private final AtomicLong checkpointStatus;
    +
    +   private int emitCallCount;
    +   private volatile boolean running;
    +
    +   public FailingSource(
    +           @Nonnull EventEmittingGenerator eventEmittingGenerator,
    +           @Nonnegative int numberOfGeneratorInvocations) {
    +           this(eventEmittingGenerator, numberOfGeneratorInvocations, 
TimeCharacteristic.EventTime);
    +   }
    +
    +   public FailingSource(
    +           @Nonnull EventEmittingGenerator eventEmittingGenerator,
    +           @Nonnegative int numberOfGeneratorInvocations,
    +           @Nonnull TimeCharacteristic timeCharacteristic) {
    +           this.eventEmittingGenerator = eventEmittingGenerator;
    +           this.running = true;
    +           this.emitCallCount = 0;
    +           this.expectedEmitCalls = numberOfGeneratorInvocations;
    +           this.failureAfterNumElements = numberOfGeneratorInvocations / 2;
    +           this.checkpointStatus = new AtomicLong(INITIAL);
    +           this.usingProcessingTime = timeCharacteristic == 
TimeCharacteristic.ProcessingTime;
    +   }
    +
    +   @Override
    +   public void open(Configuration parameters) {
    +           // non-parallel source
    +           assertEquals(1, 
getRuntimeContext().getNumberOfParallelSubtasks());
    +   }
    +
    +   @Override
    +   public void run(SourceContext<Tuple2<Long, IntType>> ctx) throws 
Exception {
    +
    +           final RuntimeContext runtimeContext = getRuntimeContext();
    +           // detect if this task is "the chosen one" and should fail (via 
subtaskidx), if it did not fail before (via attempt)
    +           final boolean failThisTask =
    +                   runtimeContext.getAttemptNumber() == 0 && 
runtimeContext.getIndexOfThisSubtask() == 0;
    +
    +           // we loop longer than we have elements, to permit delayed 
checkpoints
    +           // to still cause a failure
    +           while (running) {
    +
    +                   // the function failed before, or we are in the 
elements before the failure
    +                   synchronized (ctx.getCheckpointLock()) {
    +                           eventEmittingGenerator.emitEvent(ctx, 
emitCallCount++);
    +                           running &= (emitCallCount < expectedEmitCalls);
    +                   }
    +
    +                   if (emitCallCount < failureAfterNumElements) {
    +                           Thread.sleep(1);
    +                   } else if (failThisTask && emitCallCount == 
failureAfterNumElements) {
    +                           // wait for a pending checkpoint that fulfills 
our requirements if needed
    +                           while (checkpointStatus.get() != 
STATEFUL_CHECKPOINT_COMPLETED) {
    +                                   Thread.sleep(1);
    +                           }
    +                           throw new Exception("Artificial Failure");
    +                   }
    +           }
    +
    +           if (usingProcessingTime) {
    +                   while (true) {
    --- End diff --
    
    Maybe us `running` here?


---

Reply via email to