rkhachatryan commented on a change in pull request #14057:
URL: https://github.com/apache/flink/pull/14057#discussion_r540438232
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
##########
@@ -193,6 +202,11 @@ private CheckpointBarrierBehaviourController chooseController(CheckpointBarrier
     private boolean canTimeout(CheckpointBarrier barrier) {
         return barrier.getCheckpointOptions().isTimeoutable() &&
-            barrier.getCheckpointOptions().getAlignmentTimeout() < (System.currentTimeMillis() - barrier.getTimestamp());
+            barrier.getId() <= lastSeenBarrier &&
+            barrier.getCheckpointOptions().getAlignmentTimeout() * 1_000_000 < (System.nanoTime() - firstBarrierArrivalTime);
+    }
+
+    private long getArrivalTime(CheckpointBarrier announcedBarrier) {
+        return announcedBarrier.getCheckpointOptions().isTimeoutable() ? System.nanoTime() : Long.MAX_VALUE;

Review comment:

> easier to understand

I agree that this might be true for some users, but not for all.
During the [previous discussion](https://github.com/apache/flink/pull/13827#discussion_r527794600), and also the one before it, the consensus was that it's **not** easier to understand. However, we can discuss it again.
(There are also some more technical advantages to "local" timeouts.)

> Secondly your proposed change will not work with single input tasks without active timeouts?

Why not? Could you explain?
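For readers following the thread, the "local" timeout check from the diff above can be sketched in isolation. This is a minimal illustration and not Flink code: the class `LocalAlignmentTimeout`, its methods, and the injectable clock are hypothetical names introduced here. It assumes, as the diff does, that the alignment timeout is given in milliseconds and that elapsed time is measured with a monotonic nanosecond clock starting from the arrival of the first barrier.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Hypothetical helper (not part of Flink): a timeout measured locally from first barrier arrival. */
final class LocalAlignmentTimeout {
    private final boolean timeoutable;      // mirrors CheckpointOptions#isTimeoutable in the diff
    private final long timeoutNanos;        // alignment timeout converted from ms to ns
    private final LongSupplier nanoClock;   // e.g. System::nanoTime; injectable so it can be tested
    private long firstBarrierArrivalNanos;
    private boolean started;

    LocalAlignmentTimeout(boolean timeoutable, long alignmentTimeoutMillis, LongSupplier nanoClock) {
        this.timeoutable = timeoutable;
        this.timeoutNanos = TimeUnit.MILLISECONDS.toNanos(alignmentTimeoutMillis);
        this.nanoClock = nanoClock;
    }

    /** Records the local arrival time of the first barrier; later calls are ignored. */
    void onFirstBarrier() {
        if (!started) {
            firstBarrierArrivalNanos = nanoClock.getAsLong();
            started = true;
        }
    }

    /** True only if timeouts are enabled and strictly more than the timeout has elapsed locally. */
    boolean hasTimedOut() {
        if (!timeoutable || !started) {
            return false;
        }
        // Subtracting two nanoTime readings stays correct even if the raw values are negative.
        return nanoClock.getAsLong() - firstBarrierArrivalNanos > timeoutNanos;
    }
}
```

In production the clock would be `System::nanoTime`. The sketch uses an explicit `timeoutable` flag rather than a `Long.MAX_VALUE` sentinel for the arrival time; that is a design choice made for this illustration only, not something the PR prescribes.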