Since the consumer returns the offset of each message, you can manage the
offset commit yourself in the application.
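
A minimal sketch of that idea, not a definitive implementation: the
application records the offset of each message it has finished processing,
and the first un-consumed offset for a partition is then the highest
processed offset plus one. The class OffsetTracker and its method names are
hypothetical, the map is kept in memory only, and the sketch assumes the
offset of each message is available from the consumer (for example from the
message metadata). A real application would also persist these values.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of Kafka: tracks, per partition, the offset
// of the next message the application has not yet processed.
final class OffsetTracker {
    // partition id -> first un-consumed offset (last processed offset + 1)
    private final Map<Integer, Long> nextOffsets = new ConcurrentHashMap<>();

    // Call after a message has been fully processed. Keeps the maximum, so a
    // late or repeated call with an older offset does not move the position
    // backwards.
    void markConsumed(int partition, long messageOffset) {
        nextOffsets.merge(partition, messageOffset + 1, Math::max);
    }

    // Returns the first un-consumed offset for the partition, or
    // defaultOffset if nothing has been recorded for it yet.
    long firstUnconsumedOffset(int partition, long defaultOffset) {
        return nextOffsets.getOrDefault(partition, defaultOffset);
    }
}
```

Each consumer thread would call markConsumed(partition, offset) after
handling a message, and the value from firstUnconsumedOffset(partition, ...)
could then serve as the start offset for a SimpleConsumer fetch.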

Thanks,

Jun


On Tue, Jun 17, 2014 at 2:24 AM, Achanta Vamsi Subhash <
achanta.va...@flipkart.com> wrote:

> Sorry. I want the first un-consumed message offset.
>
>
> On Tue, Jun 17, 2014 at 2:53 PM, Achanta Vamsi Subhash <
> achanta.va...@flipkart.com> wrote:
>
> > Hi,
> >
> > I have a consumer group with multiple threads (high-level consumers) which
> > read from a topic.
> >
> > I am also using a SimpleConsumer to read messages from a given start
> > offset. The code below gives me the offset of the last produced message.
> > How do I get the last un-consumed message?
> >
> >     public long getLastOffset(SimpleConsumer consumer, String topic,
> >                               int partition, long whichTime,
> >                               String clientName) {
> >         TopicAndPartition topicAndPartition =
> >                 new TopicAndPartition(topic, partition);
> >         Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
> >                 new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
> >         requestInfo.put(topicAndPartition,
> >                 new PartitionOffsetRequestInfo(whichTime, maxReads));
> >         kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
> >                 requestInfo, kafka.api.OffsetRequest.CurrentVersion(),
> >                 clientName);
> >         OffsetResponse response = consumer.getOffsetsBefore(request);
> >
> >         if (response.hasError()) {
> >             LOGGER.error("Error fetching data Offset Data the Broker. Reason: "
> >                     + response.errorCode(topic, partition));
> >             return 0;
> >         }
> >         long[] offsets = response.offsets(topic, partition);
> >         return offsets[0];
> >     }
> >
> >
> > --
> > Regards
> > Vamsi Subhash
> >
>
>
>
> --
> Regards
> Vamsi Subhash
>
