[ https://issues.apache.org/jira/browse/FLINK-3294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15355119#comment-15355119 ]
Robert Metzger commented on FLINK-3294:
---------------------------------------

Hi Jens,

My code is certainly outdated by now, because the Kafka Consumer has been refactored in the meantime. Still, it shouldn't be too hard to add it to the current codebase again.
Also, the code I've posted didn't work yet (I think I was unable to retrieve committed offsets from the broker).
I think it's probably easier to re-implement it from scratch, using my old code as a reference.

How important is the feature for you?
If you want to have it included in a release, we have to hurry up, because the Flink community is currently fixing the last bugs before the 1.1.0 release, and it'll soon be out (2-4 weeks). The next release will need ~3 months.
If a snapshot version is okay for you, we can do it independently of the release.

> KafkaConsumer (0.8) commit offsets using SimpleConsumer.commitOffsets()
> -----------------------------------------------------------------------
>
>                 Key: FLINK-3294
>                 URL: https://issues.apache.org/jira/browse/FLINK-3294
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>            Reporter: Robert Metzger
>            Assignee: Robert Metzger
>
> Currently, the 0.8 consumer for Kafka is committing the offsets manually into
> Zookeeper so that users can track the lag using external tools.
> The 0.8 consumer has a pluggable design, and this component is easily
> pluggable.
> Since OffsetCommitRequest version=1 (supported in 0.8.2 or later), users can
> choose between two offset commit modes:
> a) Let the broker commit into ZK (this is what we are doing from the consumer)
> b) Let the broker commit the offset into a special topic.
> By adding a different "OffsetHandler" backend, users can commit offsets from
> the brokers (reducing the total number of ZK connections) or into the
> broker's offset topic.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)