AHeise commented on code in PR #25456: URL: https://github.com/apache/flink/pull/25456#discussion_r1806181486
########## flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java: ########## @@ -127,38 +165,67 @@ public void initializeState(StateInitializationContext context) throws Exception }); lastCompletedCheckpointId = context.getRestoredCheckpointId().getAsLong(); // try to re-commit recovered transactions as quickly as possible - commit(lastCompletedCheckpointId); + commit(); } } + private SimpleVersionedSerializer<GlobalCommittableWrapper<CommT, GlobalCommT>> + getCommitterStateSerializer() { + final CommittableCollectorSerializer<CommT> committableCollectorSerializer = + new CommittableCollectorSerializer<>( + committableSerializer, + getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), + getRuntimeContext().getTaskInfo().getMaxNumberOfParallelSubtasks(), + metricGroup); + return new GlobalCommitterSerializer<>( + committableCollectorSerializer, globalCommittableSerializer, metricGroup); + } + @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { super.notifyCheckpointComplete(checkpointId); - lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId); - commit(lastCompletedCheckpointId); - } - - private void commit(long checkpointId) throws IOException, InterruptedException { - for (CheckpointCommittableManager<CommT> checkpoint : - committableCollector.getCheckpointCommittablesUpTo(checkpointId)) { - checkpoint.commit(committer); + if (!commitOnInput) { + lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId); + commit(); } - committableCollector.compact(); } - @Override - public void endInput() throws Exception { - final CheckpointCommittableManager<CommT> endOfInputCommittable = - committableCollector.getEndOfInputCommittable(); - if (endOfInputCommittable != null) { - do { - endOfInputCommittable.commit(committer); - } while (!committableCollector.isFinished()); - } + private void commit() throws IOException, InterruptedException { + // 
this is true for the last commit and we need to make sure that all committables are + // indeed committed as this function will never be invoked again + boolean waitForAllCommitted = + lastCompletedCheckpointId == EOI Review Comment: Yes, that is a likely scenario. Hence the second part of this statement, which will only be true iff there is an EOI committable and it has received all messages. Then, the only way that could turn into an infinite loop is when committables appear out of order (with respect to the checkpointId), which is possible with async retries. However, this also applies to the old version that loops on `endInput` (remember that we need to emit committables post EOI for the final checkpoint). It will be solved by the next PR that removes async retries. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org