Sorry, I meant the offset of the first unconsumed message.

On Tue, Jun 17, 2014 at 2:53 PM, Achanta Vamsi Subhash <
achanta.va...@flipkart.com> wrote:

> Hi,
>
> I have a consumer group with multiple threads (high-level consumers) which
> read from a topic.
>
> I am also using a SimpleConsumer to read messages from a given start offset.
> The code below gives me the offset of the last produced message.
> How do I get the last unconsumed message?
>
>     public long getLastOffset(SimpleConsumer consumer, String topic, int partition,
>                               long whichTime, String clientName) {
>         TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
>         Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
>                 new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
>         requestInfo.put(topicAndPartition,
>                 new PartitionOffsetRequestInfo(whichTime, maxReads));
>         kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
>                 requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
>         OffsetResponse response = consumer.getOffsetsBefore(request);
>
>         if (response.hasError()) {
>             LOGGER.error("Error fetching offset data from the broker. Reason: "
>                     + response.errorCode(topic, partition));
>             return 0;
>         }
>         long[] offsets = response.offsets(topic, partition);
>         return offsets[0];
>     }
>
>
> --
> Regards
> Vamsi Subhash
>



-- 
Regards
Vamsi Subhash
