[ https://issues.apache.org/jira/browse/KAFKA-2500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14739059#comment-14739059 ]
Will Funnell edited comment on KAFKA-2500 at 9/10/15 4:26 PM:
--------------------------------------------------------------

If it helps understand our use case, this is the API we have implemented to consume messages, using reactive streams.

{code}
public interface KafkaClient<T> {

    /**
     * Obtain a finite stream for use with log compacted topic that is provider of sequenced stream of all the messages from the previously saved position,
     * publishing them according to the demand received from its Subscriber, finishing when each message has been read exactly once.
     */
    Publisher<T> retrieveAll();

    /**
     * Obtain an infinite stream that is provider of sequenced stream of all the messages from the previously saved position,
     * publishing them according to the demand received from its Subscriber.
     */
    Publisher<T> retrieve();

    /**
     * Resets position where to consume messages from to the beginning
     *
     * @throws ResetFailed
     */
    void reset() throws ResetFailed;
}
{code}


was (Author: willf):
If it helps understand our use case, this is the API we have implemented to consume messages, using reactive streams.

{code}
public interface KafkaClient<T> {

    /**
     * Obtain a finite stream for use with log compacted topic that is provider of sequenced stream of all the properties from the previously saved position,
     * publishing them according to the demand received from its Subscriber, finishing when each message has been read exactly once.
     */
    Publisher<T> retrieveAll();

    /**
     * Obtain an infinite stream that is provider of sequenced stream of all the properties from the previously saved position,
     * publishing them according to the demand received from its Subscriber.
     */
    Publisher<T> retrieve();

    /**
     * Resets position where to consume messages from to the beginning
     *
     * @throws ResetFailed
     */
    void reset() throws ResetFailed;
}
{code}

> Make logEndOffset available in the 0.8.3 Consumer
> -------------------------------------------------
>
>                 Key: KAFKA-2500
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2500
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: consumer
>    Affects Versions: 0.8.3
>            Reporter: Will Funnell
>            Assignee: Jason Gustafson
>            Priority: Critical
>             Fix For: 0.8.3
>
>
> Originally created in the old consumer here:
> https://issues.apache.org/jira/browse/KAFKA-1977
> The requirement is to create a snapshot from the Kafka topic but NOT do
> continual reads after that point. For example you might be creating a backup
> of the data to a file.
> This ticket covers the addition of the functionality to the new consumer.
> In order to achieve that, a recommended solution by Joel Koshy and Jay Kreps
> was to expose the high watermark, as maxEndOffset, from the FetchResponse
> object through to each MessageAndMetadata object in order to be aware when
> the consumer has reached the end of each partition.
> The submitted patch achieves this by adding the maxEndOffset to the
> PartitionTopicInfo, which is updated when a new message arrives in the
> ConsumerFetcherThread and then exposed in MessageAndMetadata.
> See here for discussion:
> http://search-hadoop.com/m/4TaT4TpJy71
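
To illustrate the snapshot requirement in the description, here is a minimal, Kafka-free sketch of a finite read: the end offset of each partition is captured once, and reading stops when every partition reaches its captured offset, so messages appended afterwards are not part of the snapshot. The class `SnapshotSketch`, its two methods, and the in-memory lists standing in for partition logs are hypothetical names chosen for this example. They are not part of the Kafka consumer API or of the submitted patch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only: each partition is modelled as an in-memory
// list, where the index of an element plays the role of the message offset.
final class SnapshotSketch {

    // Record the current end offset (number of messages) of every partition.
    static Map<Integer, Long> captureEndOffsets(Map<Integer, List<String>> logs) {
        Map<Integer, Long> endOffsets = new TreeMap<>();
        for (Map.Entry<Integer, List<String>> e : logs.entrySet()) {
            endOffsets.put(e.getKey(), (long) e.getValue().size());
        }
        return endOffsets;
    }

    // Read each partition from its saved position (0 if none is saved) up to,
    // but not including, the captured end offset, then stop.
    static List<String> readUpTo(Map<Integer, List<String>> logs,
                                 Map<Integer, Long> savedPositions,
                                 Map<Integer, Long> endOffsets) {
        List<String> snapshot = new ArrayList<>();
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            List<String> log = logs.get(e.getKey());
            long position = savedPositions.getOrDefault(e.getKey(), 0L);
            while (position < e.getValue()) {
                snapshot.add(log.get((int) position));
                position++;
            }
        }
        return snapshot;
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> logs = new TreeMap<>();
        logs.put(0, new ArrayList<>(Arrays.asList("a", "b")));
        logs.put(1, new ArrayList<>(Arrays.asList("c")));

        Map<Integer, Long> endOffsets = captureEndOffsets(logs);
        logs.get(0).add("late"); // arrives after the snapshot point

        // "late" is excluded because partition 0 was captured at offset 2.
        System.out.println(readUpTo(logs, new HashMap<>(), endOffsets)); // prints [a, b, c]
    }
}
```

With a real consumer the captured value would be the log end offset (high watermark) that this ticket asks to expose. The sketch only shows why a per-partition end offset is enough to tell a finite `retrieveAll()`-style read when to finish.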