Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/6376#discussion_r203973793 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/FailingSource.java --- @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing.utils; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; + +import java.io.Serializable; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import static org.junit.Assert.assertEquals; + +/** + * Source for window checkpointing IT cases that can introduce artificial failures. 
+ */ +public class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>> + implements ListCheckpointed<Integer>, CheckpointListener { + + /** + * Function to generate and emit the test events (and watermarks if required). + */ + @FunctionalInterface + public interface EventEmittingGenerator extends Serializable { + void emitEvent(SourceContext<Tuple2<Long, IntType>> ctx, int eventSequenceNo); + } + + private static final long INITIAL = Long.MIN_VALUE; + private static final long STATEFUL_CHECKPOINT_COMPLETED = Long.MIN_VALUE; + + @Nonnull + private final EventEmittingGenerator eventEmittingGenerator; + private final int expectedEmitCalls; + private final int failureAfterNumElements; + private final boolean usingProcessingTime; + private final AtomicLong checkpointStatus; + + private int emitCallCount; + private volatile boolean running; + + public FailingSource( + @Nonnull EventEmittingGenerator eventEmittingGenerator, + @Nonnegative int numberOfGeneratorInvocations) { + this(eventEmittingGenerator, numberOfGeneratorInvocations, TimeCharacteristic.EventTime); + } + + public FailingSource( + @Nonnull EventEmittingGenerator eventEmittingGenerator, + @Nonnegative int numberOfGeneratorInvocations, + @Nonnull TimeCharacteristic timeCharacteristic) { + this.eventEmittingGenerator = eventEmittingGenerator; + this.running = true; + this.emitCallCount = 0; + this.expectedEmitCalls = numberOfGeneratorInvocations; + this.failureAfterNumElements = numberOfGeneratorInvocations / 2; + this.checkpointStatus = new AtomicLong(INITIAL); + this.usingProcessingTime = timeCharacteristic == TimeCharacteristic.ProcessingTime; + } + + @Override + public void open(Configuration parameters) { + // non-parallel source + assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks()); + } + + @Override + public void run(SourceContext<Tuple2<Long, IntType>> ctx) throws Exception { + + final RuntimeContext runtimeContext = getRuntimeContext(); + // detect if this task is "the 
chosen one" and should fail (via subtaskidx), if it did not fail before (via attempt) + final boolean failThisTask = + runtimeContext.getAttemptNumber() == 0 && runtimeContext.getIndexOfThisSubtask() == 0; + + // we loop longer than we have elements, to permit delayed checkpoints + // to still cause a failure + while (running) { + + // the function failed before, or we are in the elements before the failure + synchronized (ctx.getCheckpointLock()) { + eventEmittingGenerator.emitEvent(ctx, emitCallCount++); + running &= (emitCallCount < expectedEmitCalls); + } + + if (emitCallCount < failureAfterNumElements) { + Thread.sleep(1); + } else if (failThisTask && emitCallCount == failureAfterNumElements) { + // wait for a pending checkpoint that fulfills our requirements if needed + while (checkpointStatus.get() != STATEFUL_CHECKPOINT_COMPLETED) { + Thread.sleep(1); + } + throw new Exception("Artificial Failure"); + } + } + + if (usingProcessingTime) { + while (true) { --- End diff -- Maybe use `running` here?
---