Hi Jason,

Thanks for the info.  I will implement (by the end of next week) what you
have proposed.  If I encounter any issues, I will let you know.

Indeed, adding a new API would be an uphill battle.  I did follow the
email chain "Re: Kafka Consumer thoughts".

Thanks,

Bhavesh

On Wed, Aug 5, 2015 at 10:03 AM, Jason Gustafson <ja...@confluent.io> wrote:

> Hey Bhavesh,
>
> I think your use case can be handled with the new consumer API in roughly
> the manner I suggested previously. It might be a little easier if we added
> the ability to set the end offset for consumption. Perhaps something like
> this:
>
> // stop consumption from the partition when offset is reached
> void limit(TopicPartition partition, long offset)
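>
> A hypothetical usage of that method (the topic, partition, and offset
> below are made up for illustration):
>
> // stop fetching test-topic partition 0 once offset 500000 is reached
> consumer.limit(new TopicPartition("test-topic", 0), 500000L);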
>
> My guess is that we'd have a bit of an uphill battle to get this into the
> first release, but it may be possible if the use case is common enough. In
> any case, I think consuming to the limit offset and manually pausing the
> partition is a viable alternative.
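>
> For reference, a rough sketch of that alternative (the endOffsets map,
> the process() step, and the running flag are assumptions, not API):
>
> while (running) {
>   ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
>   for (ConsumerRecord<byte[], byte[]> record : records) {
>     TopicPartition tp = new TopicPartition(record.topic(), record.partition());
>     if (record.offset() < endOffsets.get(tp)) {
>       process(record);     // still inside the requested range
>     } else {
>       consumer.pause(tp);  // limit reached: stop fetching, keep the assignment
>     }
>   }
> }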
>
> As for your question about fail-over, the new consumer provides a similar
> capability to the old high-level consumer. Here is a link to the wiki which
> describes its design:
>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
>
> -Jason
>
> On Tue, Aug 4, 2015 at 12:01 AM, Bhavesh Mistry <
> mistry.p.bhav...@gmail.com>
> wrote:
>
> > Hi Jason and Kafka Dev Team,
> >
> >
> >
> > First of all, thanks for responding.  I think you understood the
> > expected behavior correctly.
> >
> >
> >
> > The use case is offset-range consumption.  Every minute, we store the
> > highest offset for each topic and partition.  So if we need to reload or
> > re-consume yesterday's data from, say, 8 AM to noon, we would have the
> > start offset mapped at 8 AM and the end offset mapped at noon in a
> > time-series database.
> >
> >
> >
> > I was trying to implement this use case with the new consumer API.  Do
> > you or the Kafka dev team agree with the request to have an API that
> > takes a topic and its start/end offsets for a high-level consumer group?
> > (With the older consumer API we used the simple consumer, without
> > fail-over.)  Also, each range consumption will have a different group
> > id, and the group id will not be reused.  The main purpose is to reload
> > or reprocess past data (occasionally, due to production bugs or downtime,
> > etc.) while letting the main consumer group continue to consume the
> > latest records.
> >
> >
> > void subscribe(TopicPartition[] startOffsetPartitions,
> >                TopicPartition[] endOffsetPartitions)
> >
> >
> >
> > or something similar that will allow the following:
> >
> >
> >
> > 1)   When the consumer group already exists (meaning it has consumed
> > data and committed offsets to the storage system, either Kafka or ZK),
> > ignore the start offset positions and use the committed offsets.  If
> > nothing has been committed, use the start offset for the partition.
> >
> > 2)   When consumption of a partition has reached its end offset, pausing
> > is fine, or the assigned thread becomes fail-over capacity or waits for
> > reassignment.
> >
> > 3)   When the consumer group is done consuming all partitions' offset
> > ranges (start to end), gracefully shut down the entire consumer group.
> >
> > 4)   While consuming records, if a node or consuming thread goes down,
> > automatically fail over to the others (similar to the high-level
> > consumer in the old consumer API; I am not sure whether high-level
> > and/or simple consumer concepts exist in the new API).
> >
> >
> >
> > I hope the above explanation clarifies the use case and intended
> > behavior.  Thanks for the clarifications; you are correct that we need
> > pause(TopicPartition tp), resume(TopicPartition tp), and/or an API to
> > set the end offset for each partition.
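> >
> > As an illustration of the "fresh group id per reload" point above, the
> > configuration might look like this (broker address and naming scheme
> > are made up):
> >
> > Properties props = new Properties();
> > props.put("bootstrap.servers", "broker1:9092");
> > props.put("group.id", "range-reload-20150804-0800-1200");  // never reused
> > props.put("enable.auto.commit", "false");  // commit only inside the range
> > props.put("key.deserializer",
> >     "org.apache.kafka.common.serialization.ByteArrayDeserializer");
> > props.put("value.deserializer",
> >     "org.apache.kafka.common.serialization.ByteArrayDeserializer");
> > KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);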
> >
> >
> >
> > Please do let us know your preference for supporting the above simple
> > use case.
> >
> >
> > Thanks,
> >
> >
> > Bhavesh
> >
> > On Thu, Jul 30, 2015 at 1:23 PM, Jason Gustafson <ja...@confluent.io>
> > wrote:
> >
> > > Hi Bhavesh,
> > >
> > > I'm not totally sure I understand the expected behavior, but I think
> > > this can work. Instead of seeking to the start of the range before the
> > > poll loop, you should probably provide a ConsumerRebalanceCallback to
> > > get notifications when group assignment has changed (e.g. when one of
> > > your nodes dies). When a new partition is assigned, the callback will
> > > be invoked by the consumer and you can use it to check if there's a
> > > committed position in the range or if you need to seek to the
> > > beginning of the range. For example:
> > >
> > > @Override
> > > public void onPartitionsAssigned(Consumer<?, ?> consumer,
> > >                                  Collection<TopicPartition> partitions) {
> > >   for (TopicPartition partition : partitions) {
> > >     try {
> > >       // resume from the last committed position, if the group has one
> > >       long offset = consumer.committed(partition);
> > >       consumer.seek(partition, offset);
> > >     } catch (NoOffsetForPartitionException e) {
> > >       // no commit for this group yet: start at the beginning of the range
> > >       consumer.seek(partition, rangeStart);
> > >     }
> > >   }
> > > }
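> > >
> > > (How the callback is registered depends on the API revision; assuming
> > > a subscribe overload that accepts it, it would look something like
> > > consumer.subscribe(topics, rebalanceCallback).)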
> > >
> > > If a failure occurs, then the partitions will be rebalanced across
> > > whichever consumers are still active. The case of the entire cluster
> > > being rebooted is not really different. When the consumers come back,
> > > they check the committed position and resume where they left off. Does
> > > that make sense?
> > >
> > > After you are finished consuming a partition's range, you can use
> > > KafkaConsumer.pause(partition) to prevent further fetches from being
> > > initiated while still maintaining the current assignment. The patch to
> > > add pause() is not in trunk yet, but it probably will be before too
> > > long.
> > >
> > > One potential problem is that you wouldn't be able to reuse the same
> > > group to consume a different range because of the way it depends on
> > > the committed offsets. Kafka's commit API actually allows some
> > > additional metadata to go along with a committed offset and that could
> > > potentially be used to tie the commit to the range, but it's not yet
> > > exposed in KafkaConsumer. I assume it will be eventually, but I'm not
> > > sure whether that will be part of the initial release.
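> > >
> > > If that metadata were exposed, tying a commit to a range might look
> > > roughly like this (speculative sketch, not a current API; rangeId is a
> > > made-up string identifying the range):
> > >
> > > consumer.commitSync(Collections.singletonMap(partition,
> > >     new OffsetAndMetadata(record.offset() + 1, rangeId)));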
> > >
> > >
> > > Hope that helps!
> > >
> > > Jason
> > >
> > > On Thu, Jul 30, 2015 at 7:54 AM, Bhavesh Mistry <
> > > mistry.p.bhav...@gmail.com>
> > > wrote:
> > >
> > > > Hello Kafka Dev Team,
> > > >
> > > >
> > > > With the new consumer API redesign (
> > > > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
> > > > ), is there a capability to consume from a given topic and partition
> > > > between a start and an end position?  How would I achieve the
> > > > following use case of range consumption with fail-over?
> > > >
> > > >
> > > > Use Case:
> > > > Ability to reload data given a topic and its partitions' start/end
> > > > offsets, using a high-level consumer with fail-over; basically,
> > > > high-level range consumption that survives a consumer in the group
> > > > dying, while the main consumer group keeps running.
> > > >
> > > >
> > > > Suppose you have a topic called "test-topic" and its per-partition
> > > > begin and end offsets:
> > > >
> > > > {
> > > >   topic: test-topic,
> > > >   partitions: [
> > > >     { partition id: 1, offset start: 100,     offset end: 500,000 },
> > > >     { partition id: 2, offset start: 200,000, offset end: 500,000 },
> > > >     ... for n partitions
> > > >   ]
> > > > }
> > > >
> > > > You create a consumer group, "Range-Consumer", and use the seek
> > > > method for each partition.  Your feedback is greatly appreciated.
> > > >
> > > >
> > > > In each JVM, for each consumption thread:
> > > >
> > > >
> > > > // consumer that joins the shared range group; props carry
> > > > // group.id="Range-Consumer" plus the usual configs
> > > > KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
> > > >
> > > > // start/end offsets per partition, loaded from the time-series database
> > > > Map<TopicPartition, Long> partitionToStartOffset = ...;
> > > > Map<TopicPartition, Long> partitionToEndOffset = ...;
> > > >
> > > > for (TopicPartition tp : topicPartitionList) {
> > > >   consumer.seek(tp, partitionToStartOffset.get(tp));
> > > > }
> > > >
> > > > while (true) {
> > > >   ConsumerRecords<byte[], byte[]> records = consumer.poll(10000);
> > > >   // for each record, check the offset against the end of the range
> > > >   for (ConsumerRecord<byte[], byte[]> record : records) {
> > > >     TopicPartition tp = new TopicPartition(record.topic(), record.partition());
> > > >     if (record.offset() <= partitionToEndOffset.get(tp)) {
> > > >       // consume the record, then commit the offset
> > > >       consumer.commit(CommitType.SYNC);
> > > >     } else {
> > > >       // Should I unsubscribe it now for this partition?
> > > >       consumer.unsubscribe(tp);
> > > >     }
> > > >   }
> > > > }
> > > >
> > > >
> > > >
> > > >
> > > > Please let me know if the above approach is valid:
> > > >
> > > > 1) How will fail-over work?
> > > >
> > > > 2) How does rebooting the entire consumer group impact offset seeks,
> > > > given that offsets are stored by Kafka itself?
> > > >
> > > > Thanks,
> > > >
> > > > Bhavesh
> > > >
> > >
> >
>
