Hello Kafka Dev Team,

With the new Consumer API redesign (
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
), is there a capability to consume a given topic and partition from a start
offset to an end offset?  How would I achieve the following use case of
range consumption with fail-over?


Use Case:
Ability to reload data for a given topic and its partitions, each with an
offset start/end, using the High Level Consumer with fail-over.  Basically,
high-level range consumption that keeps working when a consumer in the
group dies.


Suppose you have a topic called "test-topic" with begin and end offsets
for each of its partitions:

{

  topic: test-topic,

  [
    { partition id: 1, offset start: 100,     offset end: 500,000 },

    { partition id: 2, offset start: 200,000, offset end: 500,000 },

    ….. for n partitions
  ]

}
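As a concrete, broker-free illustration, the range spec above could be
modeled in plain Java like this; OffsetRange and its fields are names I
made up for this sketch, not part of any Kafka API:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical holder for one partition's offset range (start/end inclusive).
class OffsetRange {
    final long start;
    final long end;

    OffsetRange(long start, long end) {
        this.start = start;
        this.end = end;
    }

    boolean contains(long offset) {
        return offset >= start && offset <= end;
    }
}

public class RangeSpec {
    public static void main(String[] args) {
        // topic "test-topic": partition id -> offset range, as in the spec above
        Map<Integer, OffsetRange> ranges = new HashMap<>();
        ranges.put(1, new OffsetRange(100L, 500_000L));
        ranges.put(2, new OffsetRange(200_000L, 500_000L));

        System.out.println(ranges.get(1).contains(99));      // false: before start
        System.out.println(ranges.get(2).contains(250_000)); // true: inside range
    }
}
```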

Then you create a consumer group "Range-Consumer" and use the seek method
for each partition.   Your feedback is greatly appreciated.


In each JVM, for each consumption thread:


Consumer consumer = new KafkaConsumer( { group.id="Range-consumer" } … );

Map<Integer, Long> partitionToEndOffsetMapping = …;

for (TopicPartition tp : topicPartitionList) {

    consumer.seek(tp, startOffsetForPartition);

}


while (true) {

    ConsumerRecords records = consumer.poll(10000);

    // for each record, check the offset

    record = records.iterator().next();

    if (record.offset() <= partitionToEndOffsetMapping.get(record.partition())) {

        // consume record

        // commit offset

        consumer.commit(CommitType.SYNC);

    } else {

        // Should I unsubscribe it now for this partition?

        consumer.unsubscribe(record.partition());

    }

}
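To make the consume/stop decision above concrete without a broker, here is
a sketch of just the bookkeeping: a record should be consumed while its
offset is still at or below the end offset for its partition, and once
every partition has passed its end the thread can close the consumer.
RangeTracker and its methods are my own invented names, not a Kafka API;
the end-offset map mirrors partitionToEndOffsetMapping from the pseudocode:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical tracker for range consumption: decides per record whether a
// partition should still be consumed, and reports when all ranges are done.
public class RangeTracker {
    private final Map<Integer, Long> partitionToEndOffset = new HashMap<>();
    private final Map<Integer, Boolean> done = new HashMap<>();

    public RangeTracker(Map<Integer, Long> endOffsets) {
        partitionToEndOffset.putAll(endOffsets);
        for (Integer p : endOffsets.keySet()) {
            done.put(p, false);
        }
    }

    // Consume only while the record's offset has not passed the partition's end.
    public boolean shouldConsume(int partition, long offset) {
        if (offset <= partitionToEndOffset.get(partition)) {
            return true;
        }
        done.put(partition, true); // past the end: stop (pause/unsubscribe) this one
        return false;
    }

    // True once every partition has gone past its end offset.
    public boolean allDone() {
        return !done.containsValue(false);
    }
}
```

In the poll loop, shouldConsume(record.partition(), record.offset())
replaces the comparison above, and allDone() tells the thread when to stop
polling.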




Please let me know if the above approach is valid:

1) How will fail-over work?

2) How does rebooting the entire consumer group impact the offset seek,
since offsets are stored by Kafka itself?

Thanks ,

Bhavesh
