rkhachatryan commented on a change in pull request #14057: URL: https://github.com/apache/flink/pull/14057#discussion_r544876522
##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
##########
@@ -84,6 +84,24 @@ public void testChannelResetOnNewBarrier() throws Exception {
 		assertFalse(stateWriter.getAddedInput().isEmpty());
 	}
+	/**
+	 * If a checkpoint announcement was processed and then UC-barrier arrives (from the upstream)
+	 * then it should be processed by the UC controller.
+	 */
+	@Test
+	public void testSwitchToUnalignedByUpstream() throws Exception {
+		SingleInputGate inputGate = new SingleInputGateBuilder().setNumberOfChannels(2).build();
+		inputGate.setInputChannels(new TestInputChannel(inputGate, 0), new TestInputChannel(inputGate, 1));
+		ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
+		SingleCheckpointBarrierHandler barrierHandler = barrierHandler(inputGate, target);
+		CheckpointedInputGate gate = buildGate(target, 2);
+
+		CheckpointBarrier aligned = new CheckpointBarrier(1, System.currentTimeMillis(), alignedWithTimeout(getDefault(), Integer.MAX_VALUE));
+
+		send(toBuffer(new EventAnnouncement(aligned, 0), true), 0, gate); // process announcement but not the barrier
+		send(toBuffer(aligned.asUnaligned(), true), 1, gate); // pretend it came from upstream before the first (AC) barrier was picked up
+	}

Review comment:
The expectation is that it just won't fail (without the fix, it will).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org