Since the consumer returns the offset of each message, you can manage the offset commit yourself in the application.
Thanks, Jun On Tue, Jun 17, 2014 at 2:24 AM, Achanta Vamsi Subhash < achanta.va...@flipkart.com> wrote: > Sorry. I want the first un-consumed message offset. > > > On Tue, Jun 17, 2014 at 2:53 PM, Achanta Vamsi Subhash < > achanta.va...@flipkart.com> wrote: > > > Hi, > > > > I have a consumer group with multiple threads (high-level consumers) > which > > read from a topic. > > > > I am also using a SimpleConsumer to read messages given a start offset. I > > am getting the offset as the last produced message using the below code. > > How to get the last un-consumed message? > > > > public long getLastOffset(SimpleConsumer consumer, String topic, int > > partition, > > long whichTime, String clientName) { > > TopicAndPartition topicAndPartition = new > TopicAndPartition(topic, > > partition); > > Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = > > new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); > > requestInfo.put(topicAndPartition, new > > PartitionOffsetRequestInfo(whichTime, maxReads)); > > kafka.javaapi.OffsetRequest request = new > > kafka.javaapi.OffsetRequest( > > requestInfo, kafka.api.OffsetRequest.CurrentVersion(), > > clientName); > > OffsetResponse response = consumer.getOffsetsBefore(request); > > > > if (response.hasError()) { > > LOGGER.error("Error fetching data Offset Data the Broker. > > Reason: " + response.errorCode(topic, partition) ); > > return 0; > > } > > long[] offsets = response.offsets(topic, partition); > > return offsets[0]; > > } > > > > > > -- > > Regards > > Vamsi Subhash > > > > > > -- > Regards > Vamsi Subhash >