GitHub user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2559
  
    @StephanEwen 
    On a second look, I think the `commitSpecificOffsetsToKafka` method was 
designed to commit synchronously in the first place. `AbstractFetcher` holds a 
Map of all offsets still pending commit, keyed by checkpoint ID, and on every 
`notifyCheckpointComplete` the offsets for that checkpoint are removed from the 
Map before `commitSpecificOffsetsToKafka` is called.
    
    So, for async committing, I think we need to stop cleaning up the offsets 
in `AbstractFetcher#notifyCheckpointComplete()` and instead clean them up in a 
new, separate callback handler method in `AbstractFetcher`.
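
    A rough sketch of the change proposed above, only to illustrate the idea. It is not the actual `AbstractFetcher` code: the class `AsyncCommitSketch`, the `AsyncCommitter` interface, the `onCommitFinished` callback, and the `String`-keyed offsets map are all hypothetical stand-ins, and the real types in the PR will differ.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch; names and types are assumptions, not Flink's real API. */
public class AsyncCommitSketch {

    /** Hypothetical stand-in for an asynchronous offset commit. */
    public interface AsyncCommitter {
        CompletableFuture<Void> commitAsync(Map<String, Long> offsets);
    }

    // Offsets waiting to be committed, keyed by checkpoint ID.
    private final Map<Long, Map<String, Long>> pendingOffsets = new ConcurrentHashMap<>();
    private final AsyncCommitter committer;

    public AsyncCommitSketch(AsyncCommitter committer) {
        this.committer = committer;
    }

    /** Remember the offsets that belong to a checkpoint. */
    public void snapshotState(long checkpointId, Map<String, Long> offsets) {
        pendingOffsets.put(checkpointId, new HashMap<>(offsets));
    }

    /** Start the commit but do NOT remove the pending entry here. */
    public void notifyCheckpointComplete(long checkpointId) {
        Map<String, Long> offsets = pendingOffsets.get(checkpointId);
        if (offsets == null) {
            return; // nothing pending for this checkpoint
        }
        committer.commitAsync(offsets)
                .whenComplete((ignored, error) -> onCommitFinished(checkpointId, error));
    }

    /** Separate callback: clean up only once the commit has actually finished. */
    void onCommitFinished(long checkpointId, Throwable error) {
        if (error == null) {
            pendingOffsets.remove(checkpointId);
        }
        // On failure the entry is kept in this sketch; what to do with it
        // (retry, drop, log) is left open.
    }

    public int pendingCount() {
        return pendingOffsets.size();
    }
}
```

    With this shape, the entry for a checkpoint stays in the Map while the commit is in flight and disappears only after the callback reports success.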

