[ https://issues.apache.org/jira/browse/KAFKA-2500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14739059#comment-14739059 ]

Will Funnell edited comment on KAFKA-2500 at 9/10/15 4:26 PM:
--------------------------------------------------------------

If it helps to understand our use case, this is the API we have implemented for
consuming messages, using Reactive Streams:

{code}
public interface KafkaClient<T> {

    /**
     * Obtains a finite stream, for use with a log-compacted topic, that provides
     * a sequenced stream of all the messages from the previously saved position,
     * publishing them according to the demand received from its Subscriber and
     * finishing when each message has been read exactly once.
     */
    Publisher<T> retrieveAll();

    /**
     * Obtains an infinite stream that provides a sequenced stream of all the
     * messages from the previously saved position, publishing them according
     * to the demand received from its Subscriber.
     */
    Publisher<T> retrieve();

    /**
     * Resets the position to consume messages from back to the beginning.
     *
     * @throws ResetFailed if the position could not be reset
     */
    void reset() throws ResetFailed;
}
{code}
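
For reference, a caller attaches a Reactive Streams Subscriber to the returned
Publisher. Below is a minimal sketch of the snapshot use case, assuming an
existing KafkaClient<String> implementation named client; the SnapshotSubscriber
class is purely illustrative, not part of the API:

{code}
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

// Illustrative subscriber that drains the finite stream from retrieveAll(),
// e.g. to write a snapshot of a log-compacted topic to a file.
public class SnapshotSubscriber implements Subscriber<String> {

    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription s) {
        this.subscription = s;
        s.request(1); // signal demand for the first message
    }

    @Override
    public void onNext(String message) {
        // ... append the message to the snapshot file ...
        subscription.request(1); // demand the next message
    }

    @Override
    public void onError(Throwable t) {
        // ... abort and clean up the partial snapshot ...
    }

    @Override
    public void onComplete() {
        // retrieveAll() completes once every message has been read exactly
        // once, so the snapshot is finished at this point.
    }
}

// Usage (client is an assumed KafkaClient<String> implementation):
// client.retrieveAll().subscribe(new SnapshotSubscriber());
{code}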


was (Author: willf):
If it helps to understand our use case, this is the API we have implemented for
consuming messages, using Reactive Streams:

{code}
public interface KafkaClient<T> {

    /**
     * Obtains a finite stream, for use with a log-compacted topic, that provides
     * a sequenced stream of all the properties from the previously saved position,
     * publishing them according to the demand received from its Subscriber and
     * finishing when each message has been read exactly once.
     */
    Publisher<T> retrieveAll();

    /**
     * Obtains an infinite stream that provides a sequenced stream of all the
     * properties from the previously saved position, publishing them according
     * to the demand received from its Subscriber.
     */
    Publisher<T> retrieve();

    /**
     * Resets the position to consume messages from back to the beginning.
     *
     * @throws ResetFailed if the position could not be reset
     */
    void reset() throws ResetFailed;
}
{code}

> Make logEndOffset available in the 0.8.3 Consumer
> -------------------------------------------------
>
>                 Key: KAFKA-2500
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2500
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: consumer
>    Affects Versions: 0.8.3
>            Reporter: Will Funnell
>            Assignee: Jason Gustafson
>            Priority: Critical
>             Fix For: 0.8.3
>
>
> Originally created in the old consumer here: 
> https://issues.apache.org/jira/browse/KAFKA-1977
> The requirement is to create a snapshot from the Kafka topic but NOT do 
> continual reads after that point. For example, you might be creating a backup 
> of the data to a file.
> This ticket covers the addition of the functionality to the new consumer.
> In order to achieve that, the solution recommended by Joel Koshy and Jay 
> Kreps was to expose the high watermark, as maxEndOffset, from the 
> FetchResponse object through to each MessageAndMetadata object, so that the 
> consumer can tell when it has reached the end of each partition.
> The submitted patch achieves this by adding the maxEndOffset to the 
> PartitionTopicInfo, which is updated when a new message arrives in the 
> ConsumerFetcherThread and then exposed in MessageAndMetadata.
> See here for discussion:
> http://search-hadoop.com/m/4TaT4TpJy71
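
To illustrate, with the high watermark exposed on each message, a consumer can
stop as soon as every partition has been read up to the watermark captured at
fetch time. A minimal sketch of that termination check follows; the RecordView
interface is a hypothetical stand-in for the fields the patch adds to
MessageAndMetadata:

{code}
import java.util.HashSet;
import java.util.Set;

public class SnapshotLoop {

    // Hypothetical view of the fields the patch exposes on each consumed
    // message (the real class is MessageAndMetadata).
    interface RecordView {
        int partition();
        long offset();
        long maxEndOffset(); // high watermark at fetch time, added by the patch
    }

    // Reads until every partition has reached its high watermark, then stops
    // instead of blocking on further reads.
    static void snapshot(Iterable<RecordView> stream, int partitionCount) {
        Set<Integer> finished = new HashSet<>();
        for (RecordView m : stream) {
            // ... write the message to the backup file ...

            // The high watermark is the offset of the next message to be
            // written, so the last existing message has offset hw - 1.
            if (m.offset() >= m.maxEndOffset() - 1) {
                finished.add(m.partition());
            }
            if (finished.size() == partitionCount) {
                return; // snapshot complete: all partitions fully read
            }
        }
    }
}
{code}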


