I am reposting a question that I posted last week.

On startup or recovery we would like to read the latest message in each 
partition.
The getOffsetsBefore() below seems to return the offset of the next message 
that will be published to that partition.

The code below works correctly as required.
Is this the best way to get the offset and is it safe to decrement the offset 
returned as we do in the sample code below.

requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(-1, 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest( 
requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);

if(readOffset != 0)
readOffset--;
else
       handle this caseā€¦







From: Neha Narkhede [mailto:n...@confluent.io]
Sent: Monday, December 08, 2014 12:43 PM
To: Orelowitz, David
Cc: users@kafka.apache.org
Subject: Re: Reading only the latest message

The returned latest offset - 1 will be the offset of the last message. Sorry, 
should've made it clear in my last email. Let me know if that helps.

On Mon, Dec 8, 2014 at 8:32 AM, Orelowitz, David 
<david.orelow...@baml.com<mailto:david.orelow...@baml.com>> wrote:
Neha,

This seems to return the offset of the next message that will be published. If 
I fetch at that offset I will block until a new message is published to that 
partition.

I am actually trying to read the contents of the latest message in the 
partition, and based on info in the message resubscribe to the data source.

-----Original Message-----
From: Neha Narkhede [mailto:n...@confluent.io<mailto:n...@confluent.io>]
Sent: Friday, December 05, 2014 8:33 PM
To: users@kafka.apache.org<mailto:users@kafka.apache.org>
Subject: Re: Reading only the latest message

You can use the getOffsetsBefore() API and specify -1L to get the offset of the 
last committed message (at the time of the request) for that partition.

On Fri, Dec 5, 2014 at 12:42 PM, Orelowitz, David 
<david.orelow...@baml.com<mailto:david.orelow...@baml.com>>
wrote:

> What is the best mechanism to retrieve the latest message from a kafka
> partition.
>
> We intend for our producer, on startup or recovery, to read the
> upstream sequence number in the last message in the partition and
> request for the upstream system to start sending from that sequence number++.
>
> Currently we are creating a SimpleConsumer and then calling
> getOffsetBefore() using the current wall time. We then decrement the
> offset returned and retrieve the message at this offset. We do manage
> the case when the offset is zero.
>
> It seem to work!
>
> Is this the right approach.
>
> Thanks,
> David
>

--
Thanks,
Neha

----------------------------------------------------------------------
This message, and any attachments, is for the intended recipient(s) only, may 
contain information that is privileged, confidential and/or proprietary and 
subject to important terms and conditions available at 
http://www.bankofamerica.com/emaildisclaimer.   If you are not the intended 
recipient, please delete this message.



--
Thanks,
Neha

----------------------------------------------------------------------
This message, and any attachments, is for the intended recipient(s) only, may 
contain information that is privileged, confidential and/or proprietary and 
subject to important terms and conditions available at 
http://www.bankofamerica.com/emaildisclaimer.   If you are not the intended 
recipient, please delete this message.

Reply via email to