Sure, but in that case, the commits are still being stored in Kafka, so
resetting to the last committed position seems like what you want.
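
For reference, here is a rough sketch of what I mean, written in Scala against
the 0.9 Java client. It is only an illustration under a few assumptions: the
broker address, group id, topic name, and the process() helper are all
placeholders, and poll(long) is the 0.9-era signature. The point is that the
rebalance callbacks neither seek() nor call position(); the consumer picks up
from the offsets passed to commitSync on its own.

```scala
import java.util.{Arrays, Collection => JCollection, Collections, Properties}

import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

import scala.collection.JavaConverters._

object CommittedOffsetConsumer {

  // Hypothetical stand-in for the application's message handling.
  def process(value: String): Unit = println(value)

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // assumed broker address
    props.put("group.id", "example-group")           // hypothetical group id
    props.put("enable.auto.commit", "false")         // offsets are committed manually below
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)

    val listener = new ConsumerRebalanceListener {
      override def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit = ()

      // Deliberately empty: no seek() and no position() here. Because the
      // offsets are committed to Kafka, the consumer resumes from the last
      // committed offset of each assigned partition by itself.
      override def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit = ()
    }

    consumer.subscribe(Arrays.asList("example-topic"), listener) // hypothetical topic name

    try {
      while (true) {
        val records = consumer.poll(100L) // 0.9-era poll(long timeoutMs)
        for (record <- records.asScala) {
          process(record.value())
          val tp = new TopicPartition(record.topic(), record.partition())
          // The committed offset is the next offset to read, hence + 1.
          consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(record.offset() + 1)))
        }
      }
    } finally {
      consumer.close()
    }
  }
}
```

Committing after every record is the simplest way to show the commitSync call;
committing once per poll() batch would mean fewer round trips.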

-Jason

On Wed, Feb 24, 2016 at 10:42 AM, craig w <codecr...@gmail.com> wrote:

> In this case I am managing offsets manually. When a message is done being
> processed, I invoke "commitSync" passing it the map of commits to sync.
>
> On Wed, Feb 24, 2016 at 1:35 PM, Jason Gustafson <ja...@confluent.io>
> wrote:
>
> > I think the problem is the call to position() from within the callback.
> > When onPartitionsAssigned() gets invoked, we don't have a position yet, so
> > calling position() forces the consumer to request the last committed offset,
> > which evidently causes an infinite loop. It might be worth opening a JIRA for
> > this since we should definitely handle it more gracefully. That aside, if
> > you are trying to seek to the last committed position, there is no need to
> > do it manually since this is what the consumer does automatically by
> > default. The only time you should need to use seek() in the
> > onPartitionsAssigned() callback is if you are managing the offsets yourself.
> >
> > -Jason
> >
> > On Wed, Feb 24, 2016 at 7:10 AM, craig w <codecr...@gmail.com> wrote:
> >
> > > I am using the Java Kafka 0.9 client. When I subscribe to a topic I
> > > provide a ConsumerRebalanceListener. In the "onPartitionsAssigned"
> > > method I am doing this:
> > >
> > > partitions.foreach( (tp: TopicPartition) => {
> > >     consumer.seek(tp, consumer.position(tp))
> > > })
> > >
> > > However, sometimes I end up in an infinite loop with
> > > IllegalStateExceptions being thrown [1]:
> > >
> > > No current assignment for partition <topic-partition>
> > >
> > > I thought it was safe to seek because the consumer should have been
> > > assigned when this method is invoked. Am I missing something?
> > >
> > > For what it's worth, I am manually committing offsets (using
> > > commitSync).
> > >
> > > [1] https://apache.googlesource.com/kafka/+/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#228
> > >
> >
>
>
>
> --
>
> https://github.com/mindscratch
> https://www.google.com/+CraigWickesser
> https://twitter.com/mind_scratch
> https://twitter.com/craig_links
>
