In this case I am managing offsets manually. When a message is done being
processed, I invoke "commitSync" passing it the map of commits to sync.

On Wed, Feb 24, 2016 at 1:35 PM, Jason Gustafson <ja...@confluent.io> wrote:

> I think the problem is the call to position() from within the callback.
> When onAssigned() gets invoked, we don't have a position yet, so calling
> position() forces the consumer to request the last committed offset which
> evidently causes an infinite loop. It might be worth opening a JIRA for
> this since we should definitely handle it more gracefully. That aside, if
> you are trying to seek to the last committed position, there is no need to
> do it manually since this is what the consumer does automatically by
> default. The only time you should need to use seek() in the onAssigned()
> callback is if you are managing the offsets yourself.
>
> -Jason
>
> On Wed, Feb 24, 2016 at 7:10 AM, craig w <codecr...@gmail.com> wrote:
>
> > I am using the Java Kafka 0.9 client. When I subscribe to a topic I
> provide
> > a ConsumerRebalanceListener. In the "onPartitionsAssigned" method I am
> > doing this:
> >
> > partitions.foreach( (tp: TopicPartition) => {
> >     consumer.seek(tp, consumer.position(tp))
> > })
> >
> > However, sometimes I end up an infinite loop with IllegalStateExceptions
> > being thrown [1]:
> >
> > No current assignment for partition <topic-partition>
> >
> > I thought it was safe to seek because the consumer should have been
> > assigned when this method is invoked. Am I missing something?
> >
> > For what it's worth, I am manually committing offsets (using commitSync).
> >
> > [1] -
> >
> >
> https://apache.googlesource.com/kafka/+/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#228
> >
>



-- 

https://github.com/mindscratch
https://www.google.com/+CraigWickesser
https://twitter.com/mind_scratch
https://twitter.com/craig_links

Reply via email to