Sure, but in that case, the commits are still being stored in Kafka, so resetting to the last committed position seems like what you want.
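
For illustration, here is a minimal sketch of that approach, written against the 0.9 Java client from Scala. The broker address, group id, and topic name are placeholders I made up, and the println stands in for whatever processing you do. The listener callbacks are deliberately empty: with offsets committed via commitSync, the consumer should resume from the last committed position after a rebalance without any seek() call.

```scala
import java.util.{Arrays, Collection => JCollection, Collections, Properties}

import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

import scala.collection.JavaConverters._

object CommittedOffsetConsumer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092") // assumed broker address
    props.setProperty("group.id", "example-group")           // hypothetical group id
    props.setProperty("enable.auto.commit", "false")         // offsets are committed manually below
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)

    // No seek() or position() in the callbacks: the consumer looks up the
    // last committed offset for each assigned partition on its own.
    consumer.subscribe(Arrays.asList("example-topic"), new ConsumerRebalanceListener {
      override def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit = ()
      override def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit = ()
    })

    try {
      while (true) {
        val records = consumer.poll(1000L) // poll(long) as in the 0.9 client
        for (record <- records.asScala) {
          println(s"${record.topic}-${record.partition}@${record.offset}: ${record.value}")

          // Commit the offset of the NEXT message to read, i.e. processed offset + 1.
          val tp = new TopicPartition(record.topic, record.partition)
          consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(record.offset + 1)))
        }
      }
    } finally {
      consumer.close()
    }
  }
}
```

Committing after every record keeps the sketch short; batching the commit map per poll would be the more usual choice if throughput matters.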

-Jason

On Wed, Feb 24, 2016 at 10:42 AM, craig w <codecr...@gmail.com> wrote:

> In this case I am managing offsets manually. When a message is done being
> processed, I invoke "commitSync" passing it the map of commits to sync.
>
> On Wed, Feb 24, 2016 at 1:35 PM, Jason Gustafson <ja...@confluent.io>
> wrote:
>
> > I think the problem is the call to position() from within the callback.
> > When onAssigned() gets invoked, we don't have a position yet, so calling
> > position() forces the consumer to request the last committed offset which
> > evidently causes an infinite loop. It might be worth opening a JIRA for
> > this since we should definitely handle it more gracefully. That aside, if
> > you are trying to seek to the last committed position, there is no need to
> > do it manually since this is what the consumer does automatically by
> > default. The only time you should need to use seek() in the onAssigned()
> > callback is if you are managing the offsets yourself.
> >
> > -Jason
> >
> > On Wed, Feb 24, 2016 at 7:10 AM, craig w <codecr...@gmail.com> wrote:
> >
> > > I am using the Java Kafka 0.9 client. When I subscribe to a topic I provide
> > > a ConsumerRebalanceListener. In the "onPartitionsAssigned" method I am
> > > doing this:
> > >
> > > partitions.foreach( (tp: TopicPartition) => {
> > >   consumer.seek(tp, consumer.position(tp))
> > > })
> > >
> > > However, sometimes I end up an infinite loop with IllegalStateExceptions
> > > being thrown [1]:
> > >
> > > No current assignment for partition <topic-partition>
> > >
> > > I thought it was safe to seek because the consumer should have been
> > > assigned when this method is invoked. Am I missing something?
> > >
> > > For what it's worth, I am manually committing offsets (using commitSync).
> > >
> > > [1] -
> > > https://apache.googlesource.com/kafka/+/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#228
>
> --
>
> https://github.com/mindscratch
> https://www.google.com/+CraigWickesser
> https://twitter.com/mind_scratch
> https://twitter.com/craig_links