rkhachatryan commented on a change in pull request #14814:
URL: https://github.com/apache/flink/pull/14814#discussion_r568846031



##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
##########
@@ -361,6 +374,171 @@ public void 
testTriggerSavepointWithCheckpointingDisabled() throws Exception {
         }
     }
 
+    static class BoundedPassThroughOperator<T> extends 
AbstractStreamOperator<T>
+            implements OneInputStreamOperator<T, T>, BoundedOneInput {
+        static volatile CountDownLatch progressLatch;
+        static volatile CountDownLatch snapshotLatch;
+        static volatile boolean inputEnded;
+
+        private transient boolean processed;
+
+        BoundedPassThroughOperator(ChainingStrategy chainingStrategy) {
+            this.chainingStrategy = chainingStrategy;
+        }
+
+        private static void allowSnapshots() {
+            snapshotLatch.countDown();
+        }
+
+        @Override
+        public void endInput() throws Exception {
+            inputEnded = true;
+        }
+
+        @Override
+        public void processElement(StreamRecord<T> element) throws Exception {
+            output.collect(element);
+            if (!processed) {
+                processed = true;
+                progressLatch.countDown();
+            }
+        }
+
+        @Override
+        public void snapshotState(StateSnapshotContext context) throws 
Exception {
+            snapshotLatch.await();
+            super.snapshotState(context);
+        }
+
+        // --------------------------------------------------------------------
+
+        static CountDownLatch getProgressLatch() {
+            return progressLatch;
+        }
+
+        static void resetForTest(int parallelism, boolean allowSnapshots) {
+            progressLatch = new CountDownLatch(parallelism);
+            snapshotLatch = new CountDownLatch(allowSnapshots ? 0 : 
parallelism);
+            inputEnded = false;
+        }
+    }
+
+    @Test
+    public void testStopSavepointWithBoundedInputConcurrently() throws 
Exception {
+        final int numTaskManagers = 2;
+        final int numSlotsPerTaskManager = 2;
+
+        while (true) {
+
+            final MiniClusterResourceFactory clusterFactory =
+                    new MiniClusterResourceFactory(
+                            numTaskManagers,
+                            numSlotsPerTaskManager,
+                            getFileBasedCheckpointsConfig());
+
+            StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment();
+            env.setParallelism(1);
+
+            // It's only possible to test this with chaining. Without it, JM 
fails the job before
+            // the downstream gets the abort notification
+            BoundedPassThroughOperator<Integer> operator =
+                    new BoundedPassThroughOperator<>(ChainingStrategy.ALWAYS);
+            InfiniteTestSource source = new InfiniteTestSource();
+            DataStream<Integer> stream =
+                    env.addSource(source)
+                            .transform("pass-through", 
BasicTypeInfo.INT_TYPE_INFO, operator);
+
+            stream.addSink(new DiscardingSink<>());
+
+            final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+            final JobID jobId = jobGraph.getJobID();
+
+            MiniClusterWithClientResource cluster = clusterFactory.get();
+            cluster.before();
+            ClusterClient<?> client = cluster.getClusterClient();
+
+            try {
+                BoundedPassThroughOperator.resetForTest(1, false);
+                InfiniteTestSource.resetForTest();
+
+                client.submitJob(jobGraph).get();
+
+                BoundedPassThroughOperator.getProgressLatch().await();
+                InfiniteTestSource.suspendAll(); // prevent deadlock in 
cancelAllAndAwait
+                CompletableFuture<String> stop = 
client.stopWithSavepoint(jobId, false, null);
+                // await checkpoint start (not explicit signals to avoid 
deadlocks)
+                Thread.sleep(500);
+                InfiniteTestSource.cancelAllAndAwait(); // emulate end of input

Review comment:
       Yes, I tried to avoid using `sleep` here but ended up with either a 
deadlock (because of chaining) or test failure (because  
`BoundedPassThroughOperator#snapshotState` is too late).
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to