Hi,

I have a consumer group with multiple threads (high-level consumers) which
read from a topic.

I am also using a SimpleConsumer to read messages from a given start offset.
With the code below, the offset I get is that of the last produced message.
How can I get the offset of the last un-consumed message instead?

    public long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                              long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        // The second argument is the maximum number of offsets to return.
        requestInfo.put(topicAndPartition,
                new PartitionOffsetRequestInfo(whichTime, maxReads));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            LOGGER.error("Error fetching offset data from the broker. Reason: "
                    + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }
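
To make the distinction in the question concrete: as far as I understand, an
offset request with kafka.api.OffsetRequest.LatestTime() reports the end of the
partition's log, which does not depend on any consumer group, whereas the
"last un-consumed message" is a per-group position. The sketch below is only a
toy, in-memory model of that difference. It does not call Kafka, and the class
and method names (OffsetModel, produce, commit, committedOffset, lag) are
hypothetical, chosen just for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy in-memory model of a single partition; it does not talk to Kafka. */
public class OffsetModel {
    // Offset that the next produced message would receive (the log-end offset).
    private long logEndOffset = 0L;
    // Per group: offset of the first message that group has not yet consumed.
    private final Map<String, Long> committed = new HashMap<String, Long>();

    /** Appends one message and returns the offset assigned to it. */
    public long produce() {
        return logEndOffset++;
    }

    /** Records that the group has consumed every message before nextOffset. */
    public void commit(String group, long nextOffset) {
        if (nextOffset < 0 || nextOffset > logEndOffset) {
            throw new IllegalArgumentException("offset out of range: " + nextOffset);
        }
        committed.put(group, nextOffset);
    }

    /** Same for every group: where the producer side currently ends. */
    public long logEndOffset() {
        return logEndOffset;
    }

    /**
     * First un-consumed offset for the group. Assumption for this sketch:
     * a group that never committed starts from offset 0.
     */
    public long committedOffset(String group) {
        Long offset = committed.get(group);
        return offset == null ? 0L : offset;
    }

    /** Number of messages produced but not yet consumed by the group. */
    public long lag(String group) {
        return logEndOffset - committedOffset(group);
    }
}
```

In this model, after five calls to produce() and commit("group-a", 3), the
log-end offset is 5 while the first un-consumed offset for group-a is 3. The
second value is the one I am trying to obtain from Kafka.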


-- 
Regards
Vamsi Subhash
