In this case I am managing offsets manually. When a message is done being processed, I invoke "commitSync" passing it the map of commits to sync.
On Wed, Feb 24, 2016 at 1:35 PM, Jason Gustafson <ja...@confluent.io> wrote: > I think the problem is the call to position() from within the callback. > When onAssigned() gets invoked, we don't have a position yet, so calling > position() forces the consumer to request the last committed offset which > evidently causes an infinite loop. It might be worth opening a JIRA for > this since we should definitely handle it more gracefully. That aside, if > you are trying to seek to the last committed position, there is no need to > do it manually since this is what the consumer does automatically by > default. The only time you should need to use seek() in the onAssigned() > callback is if you are managing the offsets yourself. > > -Jason > > On Wed, Feb 24, 2016 at 7:10 AM, craig w <codecr...@gmail.com> wrote: > > > I am using the Java Kafka 0.9 client. When I subscribe to a topic I > provide > > a ConsumerRebalanceListener. In the "onPartitionsAssigned" method I am > > doing this: > > > > partitions.foreach( (tp: TopicPartition) => { > > consumer.seek(tp, consumer.position(tp)) > > }) > > > > However, sometimes I end up an infinite loop with IllegalStateExceptions > > being thrown [1]: > > > > No current assignment for partition <topic-partition> > > > > I thought it was safe to seek because the consumer should have been > > assigned when this method is invoked. Am I missing something? > > > > For what it's worth, I am manually committing offsets (using commitSync). > > > > [1] - > > > > > https://apache.googlesource.com/kafka/+/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#228 > > > -- https://github.com/mindscratch https://www.google.com/+CraigWickesser https://twitter.com/mind_scratch https://twitter.com/craig_links