Hello All,

I am new to Kafka and confused about the offset values that we set for the
partitions of a specific topic using the SimpleConsumer class.
It would be great if someone could show me, with an example, how to set an
offset for a specific partition: for reading from the beginning, from the
last offset, or from any specific offset in between.

My scenario is:
A producer generated data for a topic named amit-topic, which has only one
partition, i.e. partition 0.
The sample messages I currently have in this topic are -

Hi amit
welcome
to kafka
world
Thanks
Bye

Now, using the SimpleConsumer class, I am trying to handle 3 scenarios.
1 - Reading from the beginning, for which I used the following Java statements -

        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(-2, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

where the 1st argument to PartitionOffsetRequestInfo is
kafka.api.OffsetRequest.EarliestTime: Long = -2


2 - Reading only new messages from the queue (starting at the last offset),
for which I used the statement -
        ..
        ..
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(-1, 1));
        ..

where the 1st argument to PartitionOffsetRequestInfo is
kafka.api.OffsetRequest.LatestTime: Long = -1

3 - The question is: if I want to start reading from the offset of the
"to kafka" message in the above queue, what value should I set in the
statement below?

        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(?, 1));

I want to get "to kafka" as the first message, followed by the next 3
messages from the queue, which are "world", "Thanks" and "Bye".
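
To make the three scenarios concrete, here is how I currently picture the
offsets, written as a toy model in plain Java. It does not use the Kafka API
at all: the class OffsetModel and its methods are made up for illustration.
It assumes offsets in a fresh partition start at 0 and go up by one per
message, that -2 (EarliestTime) resolves to the offset of the oldest message,
and that -1 (LatestTime) resolves to the offset the next produced message
would get. Please correct me if any of these assumptions is wrong.

```java
import java.util.Arrays;
import java.util.List;

// Toy model only: a hypothetical stand-in for partition 0 of amit-topic.
final class OffsetModel {
    // The list index plays the role of the message offset (assumption: offsets start at 0).
    static final List<String> LOG = Arrays.asList(
            "Hi amit", "welcome", "to kafka", "world", "Thanks", "Bye");

    // Offset of the oldest message, assuming nothing has been deleted yet.
    static long earliest() {
        return 0L;
    }

    // Offset that the next produced message would receive (one past the last message).
    static long latest() {
        return LOG.size();
    }

    // Returns every message from startOffset (inclusive) to the end of the log.
    static List<String> readFrom(long startOffset) {
        if (startOffset < earliest() || startOffset > latest()) {
            throw new IllegalArgumentException("offset out of range: " + startOffset);
        }
        return LOG.subList((int) startOffset, LOG.size());
    }

    public static void main(String[] args) {
        System.out.println(readFrom(earliest())); // scenario 1: all six messages
        System.out.println(readFrom(latest()));   // scenario 2: nothing until new messages arrive
        System.out.println(readFrom(2));          // scenario 3: starts at "to kafka"
    }
}
```

Under this model, "to kafka" sits at offset 2, so scenario 3 would mean
starting the read at offset 2. What I cannot tell is whether that number
belongs in PartitionOffsetRequestInfo or somewhere else in the request.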

It would help if someone could explain how these offsets work with respect
to Kafka partitions, so that I can save each consumer's offset to either
ZooKeeper or some other persistent storage and later use it as the starting
point whenever the same consumer comes back to consume.
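
For the "other persistent storage" case, this is roughly what I have in
mind: a minimal sketch that keeps the next offset to read in a local
properties file, keyed by topic and partition. The class FileOffsetStore, its
method names and the key format are my own invention, not part of Kafka, and
it ignores concurrent writers.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

// Hypothetical helper: stores the next offset to read, one entry per topic-partition.
final class FileOffsetStore {
    private final Path file;

    FileOffsetStore(Path file) {
        this.file = file;
    }

    private static String key(String topic, int partition) {
        return topic + "-" + partition;
    }

    // Returns the saved offset, or the fallback when nothing has been saved yet.
    long load(String topic, int partition, long fallback) throws IOException {
        String value = read().getProperty(key(topic, partition));
        return value == null ? fallback : Long.parseLong(value);
    }

    // Records the offset the consumer should start from next time.
    void save(String topic, int partition, long nextOffset) throws IOException {
        Properties props = read();
        props.setProperty(key(topic, partition), Long.toString(nextOffset));
        try (OutputStream out = Files.newOutputStream(file)) {
            props.store(out, "next offset to read, per topic-partition");
        }
    }

    // Loads all saved entries; an absent file is treated as "nothing saved".
    private Properties read() throws IOException {
        Properties props = new Properties();
        if (Files.exists(file)) {
            try (InputStream in = Files.newInputStream(file)) {
                props.load(in);
            }
        }
        return props;
    }

    public static void main(String[] args) throws IOException {
        FileOffsetStore store = new FileOffsetStore(Files.createTempFile("offsets", ".properties"));
        System.out.println(store.load("amit-topic", 0, 0L)); // prints 0: nothing saved, fallback used
        store.save("amit-topic", 0, 3L);                      // consumed offsets 0..2, resume at 3
        System.out.println(store.load("amit-topic", 0, 0L)); // prints 3
    }
}
```

The idea would be to call save after each batch is processed and load when
the consumer starts, then use the loaded value as the starting offset.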


Thanks
Amit
