Hey Bhavesh,

I think your use case can be handled with the new consumer API in roughly
the manner I suggested previously. It might be a little easier if we added
the ability to set an end offset for consumption. Perhaps something like
this:

// stop consumption from the partition when the offset is reached
void limit(TopicPartition partition, long offset)

My guess is that we'd have a bit of an uphill battle to get this into the
first release, but it may be possible if the use case is common enough. In
any case, I think consuming to the limit offset and manually pausing the
partition is a viable alternative.
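
Roughly, that could look like the following (untested sketch: pause() is
not in trunk yet, and loadEndOffsets() and process() stand in for your own
code):

// Consume each partition of the range up to its end offset, then pause it.
Map<TopicPartition, Long> endOffsets = loadEndOffsets(); // e.g. from your offset store
Set<TopicPartition> remaining = new HashSet<>(endOffsets.keySet());

while (!remaining.isEmpty()) {
  ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
  for (ConsumerRecord<byte[], byte[]> record : records) {
    TopicPartition tp = new TopicPartition(record.topic(), record.partition());
    if (!remaining.contains(tp))
      continue; // this partition's range is already finished
    if (record.offset() <= endOffsets.get(tp))
      process(record);
    if (record.offset() >= endOffsets.get(tp)) {
      consumer.commit(CommitType.SYNC); // record progress for fail-over
      consumer.pause(tp);               // stop fetching, keep the assignment
      remaining.remove(tp);
    }
  }
}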

As for your question about fail-over, the new consumer provides a similar
capability to the old high-level consumer.
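
In short: all consumers that subscribe to a topic with the same group.id
split its partitions among themselves, and if a member dies its partitions
are rebalanced to the survivors. A rough sketch (signatures on trunk may
still change, and running/process() are your own code):

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092"); // placeholder address
props.put("group.id", "range-consumer");
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe("test-topic"); // group-managed: assignment is coordinated for you

while (running) {
  ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
  for (ConsumerRecord<byte[], byte[]> record : records)
    process(record);
}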

Here is a link to the wiki which describes its design:

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design

-Jason

On Tue, Aug 4, 2015 at 12:01 AM, Bhavesh Mistry <mistry.p.bhav...@gmail.com>
wrote:

> Hi Jason and Kafka Dev Team,
>
> First of all, thanks for responding; I think you have understood the
> expected behavior correctly.
>
> The use case is offset-range consumption. We store each minute's highest
> offset for each topic partition in a time-series database. So if we need
> to reload or re-consume yesterday's data, say from 8AM to noon, we would
> have the 8AM offset mapping as the start and the noon mapping as the end.
>
> I was trying to implement this use case with the new consumer API. Do you
> or the Kafka dev team agree with the request to have an API that takes a
> topic and its start/end offsets per partition for a high-level consumer
> group? (With the older consumer API we used the simple consumer, without
> fail-over.) Also, each range consumption would use a different group id,
> and the group id would not be reused. The main purpose is to reload or
> re-process past data (occasionally, due to production bugs or downtime
> etc.) while the main consumer group continues to consume the latest
> records.
>
> void subscribe(TopicPartition[] startOffsetPartitions, TopicPartition[]
> endOffsetPartitions)
>
> or something similar which would allow the following:
>
> 1) When the consumer group already exists (meaning data has been consumed
> and offsets committed to either Kafka or ZK), ignore the start offset
> positions and use the committed offsets; if nothing is committed, use the
> start offsets.
>
> 2) When consumption of a partition reaches its end offset, pausing is
> fine, or the assigned thread becomes fail-over capacity or waits for
> reassignment.
>
> 3) When the consumer group is done consuming all partitions' offset
> ranges (start to end), gracefully shut down the entire consumer group.
>
> 4) While consuming records, if one of the nodes or consuming threads goes
> down, automatically fail over to the others (similar to the high-level
> consumer in the old consumer API; I am not sure whether high-level and/or
> simple consumer concepts exist for the new API).
>
> I hope the above explanation clarifies the use case and intended
> behavior. Thanks for the clarifications, and you are correct: we need
> pause(TopicPartition tp), resume(TopicPartition tp), and/or an API to set
> the end offset for each partition.
>
> Please do let us know your preference for supporting the above simple use
> case.
>
> Thanks,
>
> Bhavesh
>
> On Thu, Jul 30, 2015 at 1:23 PM, Jason Gustafson <ja...@confluent.io>
> wrote:
>
> > Hi Bhavesh,
> >
> > I'm not totally sure I understand the expected behavior, but I think
> > this can work. Instead of seeking to the start of the range before the
> > poll loop, you should probably provide a ConsumerRebalanceCallback to
> > get notifications when the group assignment has changed (e.g. when one
> > of your nodes dies). When a new partition is assigned, the callback
> > will be invoked by the consumer, and you can use it to check whether
> > there's a committed position in the range or whether you need to seek
> > to the beginning of the range. For example:
> >
> > void onPartitionsAssigned(consumer, partitions) {
> >   for (partition : partitions) {
> >     try {
> >       offset = consumer.committed(partition)
> >       consumer.seek(partition, offset)
> >     } catch (NoOffsetForPartition) {
> >       consumer.seek(partition, rangeStart)
> >     }
> >   }
> > }
> >
> > If a failure occurs, then the partitions will be rebalanced across
> > whichever consumers are still active. The case of the entire cluster
> > being rebooted is not really different: when the consumers come back,
> > they check the committed position and resume where they left off. Does
> > that make sense?
> >
> > After you are finished consuming a partition's range, you can use
> > KafkaConsumer.pause(partition) to prevent further fetches from being
> > initiated while still maintaining the current assignment. The patch to
> > add pause() is not in trunk yet, but it probably will be before too
> > long.
> >
> > One potential problem is that you wouldn't be able to reuse the same
> > group to consume a different range because of the way it depends on the
> > committed offsets. Kafka's commit API actually allows some additional
> > metadata to go along with a committed offset, and that could
> > potentially be used to tie the commit to the range, but it's not yet
> > exposed in KafkaConsumer. I assume it will be eventually, but I'm not
> > sure whether that will be part of the initial release.
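> >
> > If it is exposed, I'd expect usage roughly along these lines (purely
> > hypothetical sketch: the OffsetAndMetadata class and this commit
> > signature are made up, since the client doesn't support it yet):
> >
> > // The commit protocol already carries a metadata string per partition
> > // offset, so a future API could tag each commit with its range.
> > Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
> > offsets.put(partition, new OffsetAndMetadata(lastConsumed + 1, "range=100-500000"));
> > consumer.commit(offsets, CommitType.SYNC);
> > // On startup, read the metadata back; if it was written for a
> > // different range, seek to this range's start instead of resuming.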
> >
> > Hope that helps!
> >
> > Jason
> >
> > On Thu, Jul 30, 2015 at 7:54 AM, Bhavesh Mistry <
> > mistry.p.bhav...@gmail.com> wrote:
> >
> > > Hello Kafka Dev Team,
> > >
> > > With the new consumer API redesign (
> > > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
> > > ), is there a capability to consume a given topic and partition from
> > > a start position to an end position? How would I achieve the
> > > following use case of range consumption with fail-over?
> > >
> > > Use Case:
> > > The ability to reload data given a topic and its partitions' start/end
> > > offsets using a high-level consumer with fail-over; basically,
> > > high-level range consumption that keeps going when a consumer in the
> > > group dies, while the main consumer group continues separately.
> > >
> > > Suppose you have a topic called "test-topic" and its partitions'
> > > begin and end offsets:
> > >
> > > {
> > >   topic: test-topic,
> > >   [ { partition id: 1, offset start: 100,     offset end: 500,000 },
> > >     { partition id: 2, offset start: 200,000, offset end: 500,000 },
> > >     ... for n partitions
> > >   ]
> > > }
> > >
> > > Then you create a consumer group, "Range-Consumer", and use the seek
> > > method for each partition. Your feedback is greatly appreciated.
> > >
> > > In each JVM, for each consumption thread:
> > >
> > > Consumer consumer = new KafkaConsumer({ group.id = "Range-Consumer" } ...)
> > >
> > > Map<Integer, Long> partitionToEndOffsetMapping = ...
> > >
> > > for (TopicPartition tp : topicPartitionList) {
> > >   consumer.seek(tp, startOffset(tp));
> > > }
> > >
> > > while (true) {
> > >   ConsumerRecords records = consumer.poll(10000);
> > >   // for each record, check the offset against the end of the range
> > >   for (ConsumerRecord record : records) {
> > >     if (record.offset() <= partitionToEndOffsetMapping.get(record.partition())) {
> > >       // consume record
> > >       // commit offset
> > >       consumer.commit(CommitType.SYNC);
> > >     } else {
> > >       // Should I unsubscribe from this partition now?
> > >       consumer.unsubscribe(record.partition());
> > >     }
> > >   }
> > > }
> > >
> > > Please let me know if the above approach is valid:
> > >
> > > 1) How will fail-over work?
> > >
> > > 2) How does rebooting the entire consumer group impact offset seeks,
> > > since offsets are stored by Kafka itself?
> > >
> > > Thanks,
> > >
> > > Bhavesh