So would I seekToEnd in that case? I can't seek to the last offset I
manually committed, since I may not be assigned the same partitions as before.
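
For reference, committed offsets are stored in Kafka per consumer group and
partition, not per consumer instance, so a newly assigned partition can still
have a committed offset left by its previous owner. Below is a toy sketch of
that lookup with no Kafka dependency. The names `ResumeSketch`, `Partition`
and `resumeOffset` are hypothetical, and falling back to the log end assumes
a "latest"-style reset policy when nothing has been committed.

```scala
// Toy model only: no Kafka classes are used here.
object ResumeSketch {
  // Stands in for TopicPartition: (topic, partition number).
  type Partition = (String, Int)

  // Hypothetical helper: the offset to resume from after a rebalance.
  // `committed` models the group's offsets stored via commitSync;
  // `logEnd` models where seekToEnd would land (assumed fallback).
  def resumeOffset(committed: Map[Partition, Long], tp: Partition, logEnd: Long): Long =
    committed.getOrElse(tp, logEnd)

  def main(args: Array[String]): Unit = {
    // Another consumer in the group committed offset 42 for events-0.
    val committed: Map[Partition, Long] = Map(("events", 0) -> 42L)

    // This consumer is newly assigned events-0 and events-1.
    println(resumeOffset(committed, ("events", 0), logEnd = 100L)) // committed offset is used
    println(resumeOffset(committed, ("events", 1), logEnd = 7L))   // nothing committed: fallback
  }
}
```

The lookup is keyed only by partition, which is why it does not matter which
consumer held the partition before the rebalance.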

On Wed, Feb 24, 2016 at 1:44 PM, Jason Gustafson <ja...@confluent.io> wrote:

> Sure, but in that case, the commits are still being stored in Kafka, so
> resetting to the last committed position seems like what you want.
>
> -Jason
>
> On Wed, Feb 24, 2016 at 10:42 AM, craig w <codecr...@gmail.com> wrote:
>
> > In this case I am managing offsets manually. When a message is done being
> > processed, I invoke "commitSync" passing it the map of commits to sync.
> >
> > On Wed, Feb 24, 2016 at 1:35 PM, Jason Gustafson <ja...@confluent.io>
> > wrote:
> >
> > > I think the problem is the call to position() from within the callback.
> > > When onAssigned() gets invoked, we don't have a position yet, so calling
> > > position() forces the consumer to request the last committed offset which
> > > evidently causes an infinite loop. It might be worth opening a JIRA for
> > > this since we should definitely handle it more gracefully. That aside, if
> > > you are trying to seek to the last committed position, there is no need to
> > > do it manually since this is what the consumer does automatically by
> > > default. The only time you should need to use seek() in the onAssigned()
> > > callback is if you are managing the offsets yourself.
> > >
> > > -Jason
> > >
> > > On Wed, Feb 24, 2016 at 7:10 AM, craig w <codecr...@gmail.com> wrote:
> > >
> > > > I am using the Java Kafka 0.9 client. When I subscribe to a topic I provide
> > > > a ConsumerRebalanceListener. In the "onPartitionsAssigned" method I am
> > > > doing this:
> > > >
> > > > partitions.foreach( (tp: TopicPartition) => {
> > > >     consumer.seek(tp, consumer.position(tp))
> > > > })
> > > >
> > > > However, sometimes I end up in an infinite loop with IllegalStateExceptions
> > > > being thrown [1]:
> > > >
> > > > No current assignment for partition <topic-partition>
> > > >
> > > > I thought it was safe to seek because the consumer should have been
> > > > assigned when this method is invoked. Am I missing something?
> > > >
> > > > For what it's worth, I am manually committing offsets (using commitSync).
> > > >
> > > > [1] - https://apache.googlesource.com/kafka/+/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#228
> > > >
> > >
> >
> >
> >
> > --
> >
> > https://github.com/mindscratch
> > https://www.google.com/+CraigWickesser
> > https://twitter.com/mind_scratch
> > https://twitter.com/craig_links
> >
>



-- 

https://github.com/mindscratch
https://www.google.com/+CraigWickesser
https://twitter.com/mind_scratch
https://twitter.com/craig_links
