rkhachatryan commented on a change in pull request #14814:
URL: https://github.com/apache/flink/pull/14814#discussion_r568846031
##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
##########
@@ -361,6 +374,171 @@ public void testTriggerSavepointWithCheckpointingDisabled() throws Exception {
         }
     }

+    static class BoundedPassThroughOperator<T> extends AbstractStreamOperator<T>
+            implements OneInputStreamOperator<T, T>, BoundedOneInput {
+        static volatile CountDownLatch progressLatch;
+        static volatile CountDownLatch snapshotLatch;
+        static volatile boolean inputEnded;
+
+        private transient boolean processed;
+
+        BoundedPassThroughOperator(ChainingStrategy chainingStrategy) {
+            this.chainingStrategy = chainingStrategy;
+        }
+
+        private static void allowSnapshots() {
+            snapshotLatch.countDown();
+        }
+
+        @Override
+        public void endInput() throws Exception {
+            inputEnded = true;
+        }
+
+        @Override
+        public void processElement(StreamRecord<T> element) throws Exception {
+            output.collect(element);
+            if (!processed) {
+                processed = true;
+                progressLatch.countDown();
+            }
+        }
+
+        @Override
+        public void snapshotState(StateSnapshotContext context) throws Exception {
+            snapshotLatch.await();
+            super.snapshotState(context);
+        }
+
+        // --------------------------------------------------------------------
+
+        static CountDownLatch getProgressLatch() {
+            return progressLatch;
+        }
+
+        static void resetForTest(int parallelism, boolean allowSnapshots) {
+            progressLatch = new CountDownLatch(parallelism);
+            snapshotLatch = new CountDownLatch(allowSnapshots ? 0 : parallelism);
+            inputEnded = false;
+        }
+    }
+
+    @Test
+    public void testStopSavepointWithBoundedInputConcurrently() throws Exception {
+        final int numTaskManagers = 2;
+        final int numSlotsPerTaskManager = 2;
+
+        while (true) {
+
+            final MiniClusterResourceFactory clusterFactory =
+                    new MiniClusterResourceFactory(
+                            numTaskManagers,
+                            numSlotsPerTaskManager,
+                            getFileBasedCheckpointsConfig());
+
+            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+            env.setParallelism(1);
+
+            // It's only possible to test this with chaining. Without it, the JM fails the job
+            // before the downstream gets the abort notification.
+            BoundedPassThroughOperator<Integer> operator =
+                    new BoundedPassThroughOperator<>(ChainingStrategy.ALWAYS);
+            InfiniteTestSource source = new InfiniteTestSource();
+            DataStream<Integer> stream =
+                    env.addSource(source)
+                            .transform("pass-through", BasicTypeInfo.INT_TYPE_INFO, operator);
+
+            stream.addSink(new DiscardingSink<>());
+
+            final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+            final JobID jobId = jobGraph.getJobID();
+
+            MiniClusterWithClientResource cluster = clusterFactory.get();
+            cluster.before();
+            ClusterClient<?> client = cluster.getClusterClient();
+
+            try {
+                BoundedPassThroughOperator.resetForTest(1, false);
+                InfiniteTestSource.resetForTest();
+
+                client.submitJob(jobGraph).get();
+
+                BoundedPassThroughOperator.getProgressLatch().await();
+                InfiniteTestSource.suspendAll(); // prevent deadlock in cancelAllAndAwait
+                CompletableFuture<String> stop = client.stopWithSavepoint(jobId, false, null);
+                // await checkpoint start (no explicit signals, to avoid deadlocks)
+                Thread.sleep(500);
+                InfiniteTestSource.cancelAllAndAwait(); // emulate end of input

Review comment:
       Yes, I tried to avoid using `sleep` here, but I ended up with either a deadlock (because of chaining) or a test failure (because `BoundedPassThroughOperator#snapshotState` is too late).
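       For reference, a minimal sketch (editor's illustration, not code from this PR) of the kind of explicit signalling the comment refers to: a hypothetical `checkpointStartedLatch` counted down at the start of `snapshotState` and awaited by the test instead of `Thread.sleep(500)`. As noted above, a variant along these lines was reported to either deadlock under chaining or signal too late, which is why the sleep-based wait was kept.

```java
// Hypothetical sketch only -- not part of the PR. It makes the trade-off in the
// comment concrete: signal "checkpoint started" from the operator instead of
// sleeping in the test.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import org.apache.flink.runtime.state.StateSnapshotContext;

// Inside BoundedPassThroughOperator (checkpointStartedLatch is an assumption for illustration):
static volatile CountDownLatch checkpointStartedLatch = new CountDownLatch(1);

@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
    checkpointStartedLatch.countDown(); // tell the test the snapshot has started
    snapshotLatch.await();              // existing gate from the operator above
    super.snapshotState(context);
}

// Inside the test, replacing the sleep:
CompletableFuture<String> stop = client.stopWithSavepoint(jobId, false, null);
BoundedPassThroughOperator.checkpointStartedLatch.await(); // instead of Thread.sleep(500)
InfiniteTestSource.cancelAllAndAwait(); // emulate end of input
```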
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org