So would I call seekToEnd() in that case? I can't seek to the last offset I manually committed, since I may not be assigned the same partitions as before.
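
For concreteness, here is a rough sketch of what resetting to the last committed position could look like when done by hand. It rests on the point made in the reply quoted below: commits made with commitSync are stored in Kafka, so they can be looked up for whichever partitions this consumer is handed after a rebalance. The class name ResumeFromCommitted is hypothetical, and the sketch is untested against the 0.9 client. In particular, committed() may need to fetch from the coordinator, so it could run into the same problem as position() inside the callback.

```scala
import java.util.{Collection => JCollection}

import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

import scala.collection.JavaConverters._

// Hypothetical listener (name not from the thread): on assignment, resume each
// partition from the group's last committed offset, or from the end of the log
// when nothing has been committed for that partition yet.
class ResumeFromCommitted[K, V](consumer: KafkaConsumer[K, V])
    extends ConsumerRebalanceListener {

  // Nothing to do on revocation in this sketch; offsets are assumed to be
  // committed with commitSync as each message finishes processing.
  override def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit = ()

  override def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit = {
    partitions.asScala.foreach { tp =>
      // committed() returns null when no offset has been committed for tp.
      Option(consumer.committed(tp)) match {
        case Some(meta) => consumer.seek(tp, meta.offset())
        // 0.9 signature is seekToEnd(TopicPartition...); later clients take a
        // Collection[TopicPartition] instead.
        case None       => consumer.seekToEnd(tp)
      }
    }
  }
}
```

It would be passed as the second argument to consumer.subscribe(topics, listener). The seek() branch only repeats what the consumer is described as doing by default, so the one real difference here is the explicit seekToEnd fallback.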

On Wed, Feb 24, 2016 at 1:44 PM, Jason Gustafson <ja...@confluent.io> wrote:

> Sure, but in that case, the commits are still being stored in Kafka, so
> resetting to the last committed position seems like what you want.
>
> -Jason
>
> On Wed, Feb 24, 2016 at 10:42 AM, craig w <codecr...@gmail.com> wrote:
>
> > In this case I am managing offsets manually. When a message is done being
> > processed, I invoke "commitSync" passing it the map of commits to sync.
> >
> > On Wed, Feb 24, 2016 at 1:35 PM, Jason Gustafson <ja...@confluent.io>
> > wrote:
> >
> > > I think the problem is the call to position() from within the callback.
> > > When onAssigned() gets invoked, we don't have a position yet, so calling
> > > position() forces the consumer to request the last committed offset which
> > > evidently causes an infinite loop. It might be worth opening a JIRA for
> > > this since we should definitely handle it more gracefully. That aside, if
> > > you are trying to seek to the last committed position, there is no need to
> > > do it manually since this is what the consumer does automatically by
> > > default. The only time you should need to use seek() in the onAssigned()
> > > callback is if you are managing the offsets yourself.
> > >
> > > -Jason
> > >
> > > On Wed, Feb 24, 2016 at 7:10 AM, craig w <codecr...@gmail.com> wrote:
> > >
> > > > I am using the Java Kafka 0.9 client. When I subscribe to a topic I
> > > > provide a ConsumerRebalanceListener. In the "onPartitionsAssigned"
> > > > method I am doing this:
> > > >
> > > > partitions.foreach( (tp: TopicPartition) => {
> > > >   consumer.seek(tp, consumer.position(tp))
> > > > })
> > > >
> > > > However, sometimes I end up in an infinite loop with
> > > > IllegalStateExceptions being thrown [1]:
> > > >
> > > > No current assignment for partition <topic-partition>
> > > >
> > > > I thought it was safe to seek because the consumer should have been
> > > > assigned when this method is invoked. Am I missing something?
> > > >
> > > > For what it's worth, I am manually committing offsets (using
> > > > commitSync).
> > > >
> > > > [1] -
> > > > https://apache.googlesource.com/kafka/+/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#228
> >
> > --
> > https://github.com/mindscratch
> > https://www.google.com/+CraigWickesser
> > https://twitter.com/mind_scratch
> > https://twitter.com/craig_links

--
https://github.com/mindscratch
https://www.google.com/+CraigWickesser
https://twitter.com/mind_scratch
https://twitter.com/craig_links