[ https://issues.apache.org/jira/browse/KAFKA-1977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14715430#comment-14715430 ]

Will Funnell commented on KAFKA-1977:
-------------------------------------

I would still definitely like to see this in the new consumer. I think it's a 
small thing to include, but very useful, especially for determining when you 
have reached the end of a log-compacted topic.

Our implementation is as follows:

{code}
        Iterator<ConsumerMessage> iterator = new Iterator<ConsumerMessage>() {

            public boolean finished;
            private Integer partition;
            private ConsumerIterator<byte[], byte[]> it = stream.iterator();

            private long count;

            @Override
            public boolean hasNext() {

                if (finished) {
                    return false;
                } else {
                    try {
                        return it.hasNext();
                    } catch (Exception e) {

                        if (hasBeenForciblyShutdownByClient(e)) {

                            consumer.shutdown();

                            return false;
                        }
                        LOG.error("partition={} description=\"Error while fetching from Kafka\"", partition, e);

                        throw e;
                    }
                }
            }

            @Override
            public ConsumerMessage next() {

                MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
                count++;

                if (partition == null) {
                    partition = messageAndMetadata.partition();
                }

                if (LOG.isDebugEnabled()) {
                    LOG.debug("count={} partition={} description=\"Messages read from Kafka\"", count, messageAndMetadata.partition());
                }

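                // logEndOffset() (exposed by the patch) is the offset of the next message to be
                // appended, so offset == logEndOffset() - 1 means this is the last message
                // currently in the partition.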
                if (messageAndMetadata.offset() == messageAndMetadata.logEndOffset() - 1) {

                    finished = true;
                    LOG.info("partition=\"{}\", description=\"Finished with partition\"", messageAndMetadata.partition());
                }

                return toConsumedMessage(messageAndMetadata);
            }

            @Override
            public void remove() {
                it.remove();
            }

            private boolean hasBeenForciblyShutdownByClient(Exception e) {
                return e instanceof InterruptedException;
            }

        };

{code}

Not quite sure how this translates to the new Consumer yet.
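
As a starting point, here is a rough, untested sketch of how the same end-of-partition detection might look with the new KafkaConsumer. It assumes a client version that provides endOffsets() and poll(Duration), and that partitions were assigned via assign() (with subscribe(), assignment() stays empty until the first poll()). The handle() method is just a stand-in for whatever replaces toConsumedMessage():

{code}
import java.time.Duration;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Rough sketch only: read every assigned partition up to the end offsets
// captured at the start, then stop; this plays the role of the
// logEndOffset() check in the old-consumer code above.
void consumeToEnd(KafkaConsumer<byte[], byte[]> consumer) {
    Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumer.assignment());
    Set<TopicPartition> finishedPartitions = new HashSet<>();

    while (finishedPartitions.size() < endOffsets.size()) {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<byte[], byte[]> record : records) {
            handle(record); // stand-in for toConsumedMessage(...) / writing the snapshot
        }
        for (Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
            TopicPartition tp = entry.getKey();
            // position() is the offset of the next record to fetch, so reaching the
            // captured end offset means this partition has been read to its end.
            if (!finishedPartitions.contains(tp) && consumer.position(tp) >= entry.getValue()) {
                finishedPartitions.add(tp);
            }
        }
    }
}
{code}

Capturing endOffsets() once up front also handles the snapshot case cleanly: anything produced after that point is simply ignored rather than chased.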

> Make logEndOffset available in the Zookeeper consumer
> -----------------------------------------------------
>
>                 Key: KAFKA-1977
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1977
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core
>            Reporter: Will Funnell
>            Priority: Minor
>         Attachments: 
> Make_logEndOffset_available_in_the_Zookeeper_consumer.patch
>
>
> The requirement is to create a snapshot from the Kafka topic but NOT do 
> continual reads after that point. For example you might be creating a backup 
> of the data to a file.
> In order to achieve that, a recommended solution by Joel Koshy and Jay Kreps 
> was to expose the high watermark, as maxEndOffset, from the FetchResponse 
> object through to each MessageAndMetadata object in order to be aware when 
> the consumer has reached the end of each partition.
> The submitted patch achieves this by adding the maxEndOffset to the 
> PartitionTopicInfo, which is updated when a new message arrives in the 
> ConsumerFetcherThread and then exposed in MessageAndMetadata.
> See here for discussion:
> http://search-hadoop.com/m/4TaT4TpJy71



