[ https://issues.apache.org/jira/browse/KAFKA-1977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14715430#comment-14715430 ]
Will Funnell commented on KAFKA-1977:
-------------------------------------

I would still definitely like to see this in the new consumer. I think it's a small thing to include, but very useful, especially when determining that you have reached the end of your log-compacted topics. Our implementation is as follows:

{code}
Iterator<ConsumerMessage> iterator = new Iterator<ConsumerMessage>() {

    public boolean finished;
    private Integer partition;
    private ConsumerIterator<byte[], byte[]> it = stream.iterator();
    private long count;

    @Override
    public boolean hasNext() {
        if (finished) {
            return false;
        }
        try {
            return it.hasNext();
        } catch (Exception e) {
            if (hasBeenForciblyShutdownByClient(e)) {
                consumer.shutdown();
                return false;
            }
            LOG.error("partition={} description=\"Error while fetching from Kafka\"", partition, e);
            throw e;
        }
    }

    @Override
    public ConsumerMessage next() {
        MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
        count++;
        if (partition == null) {
            partition = messageAndMetadata.partition();
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("count={} partition={} description=\"Messages read from Kafka\"", count, messageAndMetadata.partition());
        }
        // logEndOffset is the offset of the next message to be appended,
        // so the last available message sits at logEndOffset - 1.
        if (messageAndMetadata.offset() == messageAndMetadata.logEndOffset() - 1) {
            finished = true;
            LOG.info("partition={} description=\"Finished with partition\"", messageAndMetadata.partition());
        }
        return toConsumedMessage(messageAndMetadata);
    }

    @Override
    public void remove() {
        it.remove();
    }

    private boolean hasBeenForciblyShutdownByClient(Exception e) {
        return e instanceof InterruptedException;
    }
};
{code}

Not quite sure how this translates to the new consumer yet.
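For what it's worth, the new consumer exposes the high watermark per partition via KafkaConsumer#endOffsets (later Kafka client versions; this may postdate the discussion here), so the same "finished" check could be driven from that lookup instead of per-message metadata. A minimal, broker-free sketch of just the offset comparison — the PartitionEndTracker class and its method names are illustrative, not part of any Kafka API:

{code}
import java.util.HashMap;
import java.util.Map;

public class PartitionEndTracker {

    // End offset per partition number, as would be returned by an
    // endOffsets-style call on the new consumer before starting to poll.
    private final Map<Integer, Long> endOffsets = new HashMap<>();

    public void setEndOffset(int partition, long endOffset) {
        endOffsets.put(partition, endOffset);
    }

    // The end offset is the offset of the *next* message to be appended,
    // so a partition is finished once we have consumed offset endOffset - 1,
    // mirroring the logEndOffset() - 1 check in the iterator above.
    public boolean isFinished(int partition, long recordOffset) {
        Long end = endOffsets.get(partition);
        return end != null && recordOffset >= end - 1;
    }

    public static void main(String[] args) {
        PartitionEndTracker tracker = new PartitionEndTracker();
        tracker.setEndOffset(0, 10L); // e.g. captured at snapshot start
        System.out.println(tracker.isFinished(0, 8L)); // false: offset 9 still unread
        System.out.println(tracker.isFinished(0, 9L)); // true: 9 == endOffset - 1
    }
}
{code}

In a real consumer loop you would capture the end offsets once at snapshot time, then stop polling a partition as soon as a returned record satisfies the check, which gives the "snapshot, then stop" behaviour the issue describes.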
> Make logEndOffset available in the Zookeeper consumer
> -----------------------------------------------------
>
>                 Key: KAFKA-1977
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1977
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core
>            Reporter: Will Funnell
>            Priority: Minor
>         Attachments: Make_logEndOffset_available_in_the_Zookeeper_consumer.patch
>
>
> The requirement is to create a snapshot from the Kafka topic but NOT do
> continual reads after that point. For example, you might be creating a backup
> of the data to a file.
> In order to achieve that, a solution recommended by Joel Koshy and Jay Kreps
> was to expose the high watermark, as maxEndOffset, from the FetchResponse
> object through to each MessageAndMetadata object, in order to be aware when
> the consumer has reached the end of each partition.
> The submitted patch achieves this by adding the maxEndOffset to the
> PartitionTopicInfo, which is updated when a new message arrives in the
> ConsumerFetcherThread, and then exposed in MessageAndMetadata.
> See here for discussion:
> http://search-hadoop.com/m/4TaT4TpJy71

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)