I think the problem is the call to position() from within the callback.
When onPartitionsAssigned() is invoked, the consumer doesn't have a
position for the newly assigned partitions yet, so calling position()
forces it to fetch the last committed offset, which evidently causes the
infinite loop. It might be worth opening a JIRA for this, since we should
definitely handle it more gracefully. That aside, if you are trying to
seek to the last committed position, there is no need to do it manually:
the consumer already does this by default. The only time you should need
to call seek() in the onPartitionsAssigned() callback is if you are
managing the offsets yourself, i.e. storing them somewhere other than
Kafka.
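
For the self-managed case, a rough sketch of what I mean is below, in
Scala to match your snippet. The ExternalOffsetListener class and the
offsetStore map are hypothetical names standing in for wherever you keep
your offsets (a database, a file, etc.); only seek() and the
ConsumerRebalanceListener methods come from the client itself. Treat it as
an illustration under those assumptions rather than a tested
implementation.

```scala
import java.util.{Collection => JCollection}
import scala.collection.JavaConverters._
import scala.collection.concurrent.TrieMap

import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

// Hypothetical listener for the case where offsets are stored outside Kafka.
class ExternalOffsetListener(
    consumer: KafkaConsumer[String, String],
    offsetStore: TrieMap[TopicPartition, Long] // assumption: stand-in for your own store
) extends ConsumerRebalanceListener {

  // Nothing to do here in this sketch; a real listener might persist offsets.
  override def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit = ()

  override def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit =
    partitions.asScala.foreach { tp =>
      // Note: no consumer.position(tp) call. Seek only when we have our own
      // stored offset; otherwise leave the partition alone so the consumer
      // falls back to its default (the last committed offset).
      offsetStore.get(tp).foreach(offset => consumer.seek(tp, offset))
    }
}
```

You would pass it when subscribing, e.g.
consumer.subscribe(java.util.Arrays.asList("my-topic"), new ExternalOffsetListener(consumer, store)),
where "my-topic" and store are placeholders. If you are committing with
commitSync(), as you mention, you can drop the seek() entirely.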

-Jason

On Wed, Feb 24, 2016 at 7:10 AM, craig w <codecr...@gmail.com> wrote:

> I am using the Java Kafka 0.9 client. When I subscribe to a topic I provide
> a ConsumerRebalanceListener. In the "onPartitionsAssigned" method I am
> doing this:
>
> partitions.foreach( (tp: TopicPartition) => {
>     consumer.seek(tp, consumer.position(tp))
> })
>
> However, sometimes I end up in an infinite loop with IllegalStateExceptions
> being thrown [1]:
>
> No current assignment for partition <topic-partition>
>
> I thought it was safe to seek because the consumer should have been
> assigned when this method is invoked. Am I missing something?
>
> For what it's worth, I am manually committing offsets (using commitSync).
>
> [1] -
>
> https://apache.googlesource.com/kafka/+/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#228
>
