Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2559

Just had a look at the API of `commitAsync`, and it seems the offsets committed back to Kafka through this API (and likewise through `commitSync`) need to be `lastProcessedMessageOffset + 1` ([Javadoc](https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitAsync(java.util.Map,%20org.apache.kafka.clients.consumer.OffsetCommitCallback))).

The main effect is that, when starting from group offsets in Kafka, `FlinkKafkaConsumer09` currently starts from the wrong offset. There's a separate JIRA for this bug: [FLINK-4618](https://issues.apache.org/jira/browse/FLINK-4618). Another contributor has already picked up FLINK-4618, so I'd say it's OK to leave this PR as it is. I'll help check on FLINK-4618's progress and make sure it gets merged after this PR.

Apart from the above, this looks good to me. +1
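For reference, a minimal sketch of the offset convention mentioned above, assuming the consumer tracks the last processed offset per partition. To keep it dependency-free, plain `String` keys stand in for Kafka's `TopicPartition` and plain `Long` values for `OffsetAndMetadata`; the class and the helper `toCommitOffsets` are hypothetical names for illustration, not code from this PR or from Flink.

```java
import java.util.HashMap;
import java.util.Map;

public class CommitOffsets {

    // Hypothetical helper: converts "last processed" offsets into the values
    // that should be committed, i.e. the offset of the next record to read.
    static Map<String, Long> toCommitOffsets(Map<String, Long> lastProcessed) {
        Map<String, Long> toCommit = new HashMap<>();
        for (Map.Entry<String, Long> entry : lastProcessed.entrySet()) {
            // Kafka expects lastProcessedMessageOffset + 1 as the committed offset.
            toCommit.put(entry.getKey(), entry.getValue() + 1);
        }
        return toCommit;
    }

    public static void main(String[] args) {
        Map<String, Long> lastProcessed = new HashMap<>();
        // Assumed example state: offset 41 was the last record processed.
        lastProcessed.put("topic-0", 41L);
        System.out.println(toCommitOffsets(lastProcessed));
    }
}
```

With the real client, the resulting map would be passed to `commitAsync` or `commitSync` after wrapping each value in an `OffsetAndMetadata`. Committing the raw last processed offset instead would cause that record to be read again when restarting from group offsets.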