I am reposting a question that I posted last week. On startup or recovery we would like to read the latest message in each partition. The getOffsetsBefore() below seems to return the offset of the next message that will be published to that partition.
The code below works as required. Is this the best way to get the offset, and is it safe to decrement the returned offset as we do in the sample code below?

    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(-1, 1));
    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
        requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request);

    if (readOffset != 0) {
        readOffset--;
    } else {
        // handle this case…
    }

From: Neha Narkhede [mailto:n...@confluent.io]
Sent: Monday, December 08, 2014 12:43 PM
To: Orelowitz, David
Cc: users@kafka.apache.org
Subject: Re: Reading only the latest message

The returned latest offset - 1 will be the offset of the last message. Sorry, should've made it clear in my last email. Let me know if that helps.

On Mon, Dec 8, 2014 at 8:32 AM, Orelowitz, David <david.orelow...@baml.com> wrote:

Neha,

This seems to return the offset of the next message that will be published. If I fetch at that offset, I will block until a new message is published to that partition.

I am actually trying to read the contents of the latest message in the partition and, based on information in that message, resubscribe to the data source.

-----Original Message-----
From: Neha Narkhede [mailto:n...@confluent.io]
Sent: Friday, December 05, 2014 8:33 PM
To: users@kafka.apache.org
Subject: Re: Reading only the latest message

You can use the getOffsetsBefore() API and specify -1L to get the offset of the last committed message (at the time of the request) for that partition.

On Fri, Dec 5, 2014 at 12:42 PM, Orelowitz, David <david.orelow...@baml.com> wrote:

> What is the best mechanism to retrieve the latest message from a kafka
> partition?
>
> We intend for our producer, on startup or recovery, to read the
> upstream sequence number in the last message in the partition and
> request that the upstream system start sending from that sequence
> number + 1.
>
> Currently we are creating a SimpleConsumer and then calling
> getOffsetsBefore() using the current wall time. We then decrement the
> offset returned and retrieve the message at this offset. We do manage
> the case when the offset is zero.
>
> It seems to work!
>
> Is this the right approach?
>
> Thanks,
> David

--
Thanks,
Neha

----------------------------------------------------------------------
This message, and any attachments, is for the intended recipient(s) only, may contain information that is privileged, confidential and/or proprietary and subject to important terms and conditions available at http://www.bankofamerica.com/emaildisclaimer. If you are not the intended recipient, please delete this message.
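
For reference, the "latest offset - 1" rule discussed in this thread can be sketched as plain offset arithmetic. This is only a sketch under stated assumptions, not code from the thread: the class LastMessageOffset and the parameter names earliestOffset and latestOffset are hypothetical, and the two values are assumed to come from offset requests made with time -2 (earliest) and -1 (latest). It makes no Kafka calls. Comparing against the earliest offset rather than zero is a suggestion, since a partition whose old messages were removed by retention can be empty at a non-zero offset.

```java
import java.util.OptionalLong;

// Sketch only: pure offset arithmetic, no Kafka client calls.
// The class and parameter names are hypothetical, chosen for illustration.
final class LastMessageOffset {

    private LastMessageOffset() {}

    /**
     * Returns the offset of the last message in a partition, or empty when
     * the partition currently holds no readable messages.
     *
     * earliestOffset: assumed result of an offset request with time -2.
     * latestOffset:   assumed result of an offset request with time -1,
     *                 i.e. the offset the next published message will get.
     */
    static OptionalLong lastMessageOffset(long earliestOffset, long latestOffset) {
        if (earliestOffset < 0 || latestOffset < earliestOffset) {
            throw new IllegalArgumentException(
                "expected 0 <= earliestOffset <= latestOffset, got "
                    + earliestOffset + " and " + latestOffset);
        }
        // Equal offsets mean there is nothing to read; fetching at
        // latestOffset would wait for a new message instead.
        if (latestOffset == earliestOffset) {
            return OptionalLong.empty();
        }
        // Otherwise the last message sits one before the next write position.
        return OptionalLong.of(latestOffset - 1);
    }

    public static void main(String[] args) {
        System.out.println(lastMessageOffset(0L, 0L));     // new, empty partition
        System.out.println(lastMessageOffset(0L, 42L));    // 42 messages published
        System.out.println(lastMessageOffset(100L, 100L)); // emptied by retention
    }
}
```

An empty result corresponds to the "handle this case" branch in the sample code above: there is no last message to read, so the producer has no sequence number to resume from.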