Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2559 @StephanEwen On a second look, I think the `commitSpecificOffsetsToKafka` method was designed to commit synchronously in the first place. `AbstractFetcher` holds a Map of all current pending offsets for committing by checkpointID, and on every `notifyCheckpointComplete` the offsets are removed from the Map before `commitSpecificOffsetsToKafka` is called. So, for async committing, I think we need to remove cleaning up the offsets in `AbstractFetcher#notifyCheckpointComplete()` and instead clean them up in a new separate callback handle method in `AbstractFetcher`.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---