I am using the Java Kafka 0.9 client. When I subscribe to a topic I provide
a ConsumerRebalanceListener. In the "onPartitionsAssigned" method I am
doing this:

partitions.foreach( (tp: TopicPartition) => {
    consumer.seek(tp, consumer.position(tp))
})

However, sometimes I end up in an infinite loop with IllegalStateExceptions
being thrown [1]:

No current assignment for partition <topic-partition>

I thought it was safe to seek because the partitions should already have
been assigned to the consumer when this method is invoked. Am I missing
something?

For what it's worth, I am manually committing offsets (using commitSync).
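
For context, here is a minimal sketch of the kind of setup described above.
It is an illustration, not the actual application code: the broker address,
group id, topic name and String deserializers are placeholders, and it
assumes the 0.9 consumer API (subscribe with a listener, poll(long),
commitSync()).

```scala
import java.util.{Collection => JCollection, Collections, Properties}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

object SeekOnAssignSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Placeholder connection settings; adjust for a real cluster.
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("group.id", "example-group")
    // Offsets are committed manually with commitSync() below.
    props.setProperty("enable.auto.commit", "false")
    props.setProperty("key.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("value.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)

    val listener = new ConsumerRebalanceListener {
      // Nothing to do on revocation in this sketch.
      override def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit = ()

      // Same logic as the snippet above: seek each newly assigned
      // partition to the position the consumer currently reports for it.
      override def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit =
        partitions.asScala.foreach { tp =>
          consumer.seek(tp, consumer.position(tp))
        }
    }

    // "example-topic" is a placeholder topic name.
    consumer.subscribe(Collections.singletonList("example-topic"), listener)

    while (true) {
      // The rebalance callbacks run inside poll().
      val records = consumer.poll(1000L)
      records.asScala.foreach(r => println(s"${r.topic}-${r.partition}@${r.offset}"))
      consumer.commitSync()
    }
  }
}
```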

[1] -
https://apache.googlesource.com/kafka/+/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#228
