[ https://issues.apache.org/jira/browse/FLINK-4618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529466#comment-15529466 ]
Tzu-Li (Gordon) Tai edited comment on FLINK-4618 at 9/28/16 12:43 PM:
----------------------------------------------------------------------

Hi [~melmoth],

I just had a look at the Kafka 0.9 API, and it seems that when committing offsets using the new {{KafkaConsumer}} API, the correct value to commit back to Kafka is {{lastProcessedOffset + 1}}, i.e. the offset of the next record to read (https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync(java.util.Map)).

I believe correcting this should fix the issue :)
Let me know if you bump into any other problems.

> FlinkKafkaConsumer09 should start from the next record on startup from offsets in Kafka
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-4618
>                 URL: https://issues.apache.org/jira/browse/FLINK-4618
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.1.2
>         Environment: Flink 1.1.2
> Kafka Broker 0.10.0
> Hadoop 2.7.0
>            Reporter: Melmoth
>             Fix For: 1.2.0, 1.1.3
>
>
> **Original reported ticket title: Last kafka message gets consumed twice when restarting job**
> There seems to be an issue with the offset management in Flink. When a job is stopped and started again, a message from the previous offset is read again.
> I enabled checkpoints (EXACTLY_ONCE) and FsStateBackend. I started with a new consumer group and emitted one record.
> You can clearly see that the consumer waits for a new record at offset 4848911, which is correct.
> After restarting, it consumes a record at 4848910, causing the record to be consumed more than once.
> I checked the offset with the Kafka command-line tools; the committed offset in ZooKeeper is 4848910.
> Here is my log output:
> {code}
> 10:29:24,225 DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node 2147482646 at hdp1:6667.
> 10:29:24,225 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Fetching committed offsets for partitions: [myTopic-0]
> 10:29:24,228 DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node 2147482646
> 10:29:24,234 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - No committed offset for partition myTopic-0
> 10:29:24,238 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition myTopic-0 to latest offset.
> 10:29:24,244 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetched offset 4848910 for partition myTopic-0
> 10:29:24,245 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848910
> 10:29:24,773 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848910
> 10:29:25,276 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848910
> -- Inserting a new event here
> 10:30:22,447 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Adding fetched record for partition myTopic-0 with offset 4848910 to buffered record list
> 10:30:22,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Returning fetched records at offset 4848910 for assigned partition myTopic-0 and update position to 4848911
> 10:30:22,451 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:22,953 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,456 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,887 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 6 @ 1473841823887
> 10:30:23,957 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,996 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 6 (in 96 ms)
> 10:30:24,196 TRACE org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Sending offset-commit request with {myTopic-0=OffsetAndMetadata{offset=4848910, metadata=''}} to Node(2147482646, hdp1, 6667)
> 10:30:24,204 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Committed offset 4848910 for partition myTopic-0
> 10:30:24,460 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:24,963 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:48,057 INFO org.apache.flink.runtime.blob.BlobServer - Stopped BLOB server at 0.0.0.0:2946
> -- Restarting job
> 10:32:01,672 DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node 2147482646 at hdp1:6667.
> 10:32:01,673 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Fetching committed offsets for partitions: [myTopic-0]
> 10:32:01,677 DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node 2147482646
> // See below! Shouldn't the offset be 4848911?
> 10:32:01,682 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition myTopic-0 to the committed offset 4848910
> 10:32:01,683 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848910
> 10:32:01,685 DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node 1001 at hdp1:6667.
> 10:32:01,687 DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node 1001
> // Here record 4848910 gets consumed again!
> 10:32:01,707 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Adding fetched record for partition myTopic-0 with offset 4848910 to buffered record list
> 10:32:01,708 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Returning fetched records at offset 4848910 for assigned partition myTopic-0 and update position to 4848911
> 10:32:03,721 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:32:04,224 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:32:04,726 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:32:04,894 INFO org.apache.flink.runtime.blob.BlobCache - Shutting down BlobCache
> 10:32:04,903 INFO org.apache.flink.runtime.blob.BlobServer - Stopped BLOB server at 0.0.0.0:3079
> {code}
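
For illustration, the {{lastProcessedOffset + 1}} rule from the comment can be sketched without a Kafka dependency. This is only a rough sketch under assumptions, not the connector's actual fix: the class {{OffsetCommitSketch}}, the method {{offsetsToCommit}}, and the use of plain strings as partition keys are all hypothetical. With the real client, the resulting values would presumably be wrapped in {{OffsetAndMetadata}} and passed to {{KafkaConsumer#commitSync(Map)}}, keyed by {{TopicPartition}}.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch only: models the "commit the next offset" rule with plain collections.
 * Partition names are plain strings here (an assumption for illustration);
 * a real consumer would use TopicPartition keys and OffsetAndMetadata values.
 */
final class OffsetCommitSketch {

    /** Maps each partition's last processed offset to the offset that should be committed. */
    static Map<String, Long> offsetsToCommit(Map<String, Long> lastProcessedOffsets) {
        Map<String, Long> toCommit = new HashMap<>();
        for (Map.Entry<String, Long> entry : lastProcessedOffsets.entrySet()) {
            // Kafka expects the offset of the next record to read, not the last one processed.
            toCommit.put(entry.getKey(), entry.getValue() + 1);
        }
        return toCommit;
    }

    public static void main(String[] args) {
        Map<String, Long> lastProcessed = new HashMap<>();
        // Values taken from the log in the report: record 4848910 was the last one processed.
        lastProcessed.put("myTopic-0", 4848910L);
        System.out.println(offsetsToCommit(lastProcessed)); // prints {myTopic-0=4848911}
    }
}
```

With the values from the reported log, committing 4848911 instead of 4848910 would make the restarted consumer resume at the next unread record rather than re-reading the last one.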