You shouldn't need to seek at all. When you commit offsets for a partition,
they are stored in Kafka and become available to any member which is
assigned that partition. The default behavior after every rebalance is to
look up the last committed offset for each assigned partition and seek to
it. This allows whichever consumer is assigned the partition to pick up
from wherever the last consumer left off.
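
As a rough illustration of the above, here is a minimal Scala sketch
(Scala only because the snippet quoted below is Scala). It assumes the
kafka-clients library is on the classpath. The names ManualCommitSketch,
noSeekListener, offsetToCommit and commitProcessed are made up for this
example and are not part of the Kafka API. The listener does nothing on
assignment, so the consumer's default reset to the last committed offset
applies, and offsets are committed by hand after each record is processed.

```scala
import java.util.{Collection => JCollection, Collections, Map => JMap}

import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, ConsumerRecord, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

// Hypothetical helper object; none of these names come from Kafka itself.
object ManualCommitSketch {

  // A listener that deliberately calls neither seek() nor position().
  // After a rebalance the consumer looks up the last committed offset
  // for each assigned partition on its own.
  val noSeekListener: ConsumerRebalanceListener = new ConsumerRebalanceListener {
    override def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit = ()
    override def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit = ()
  }

  // Builds the map passed to commitSync for one processed record.
  // The committed offset is the offset of the next record to read,
  // hence record.offset() + 1.
  def offsetToCommit[K, V](record: ConsumerRecord[K, V]): JMap[TopicPartition, OffsetAndMetadata] =
    Collections.singletonMap(
      new TopicPartition(record.topic(), record.partition()),
      new OffsetAndMetadata(record.offset() + 1))

  // Call this once a record has been fully processed.
  def commitProcessed[K, V](consumer: KafkaConsumer[K, V], record: ConsumerRecord[K, V]): Unit =
    consumer.commitSync(offsetToCommit(record))
}
```

Subscribing would then look something like
consumer.subscribe(Collections.singletonList("my-topic"),
ManualCommitSketch.noSeekListener), where "my-topic" is a placeholder
topic name.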

-Jason

On Wed, Feb 24, 2016 at 10:46 AM, craig w <codecr...@gmail.com> wrote:

> So would I seekToEnd in that case? I can't seek to the last commit I
> manually committed since I may not be assigned the same partitions as
> before.
>
> On Wed, Feb 24, 2016 at 1:44 PM, Jason Gustafson <ja...@confluent.io>
> wrote:
>
> > Sure, but in that case, the commits are still being stored in Kafka, so
> > resetting to the last committed position seems like what you want.
> >
> > -Jason
> >
> > On Wed, Feb 24, 2016 at 10:42 AM, craig w <codecr...@gmail.com> wrote:
> >
> > > In this case I am managing offsets manually. When a message is done
> > > being processed, I invoke "commitSync", passing it the map of commits
> > > to sync.
> > >
> > > On Wed, Feb 24, 2016 at 1:35 PM, Jason Gustafson <ja...@confluent.io>
> > > wrote:
> > >
> > > > I think the problem is the call to position() from within the callback.
> > > > When onPartitionsAssigned() gets invoked, we don't have a position yet,
> > > > so calling position() forces the consumer to request the last committed
> > > > offset, which evidently causes an infinite loop. It might be worth
> > > > opening a JIRA for this since we should definitely handle it more
> > > > gracefully. That aside, if you are trying to seek to the last committed
> > > > position, there is no need to do it manually since this is what the
> > > > consumer does automatically by default. The only time you should need
> > > > to use seek() in the onPartitionsAssigned() callback is if you are
> > > > managing the offsets yourself.
> > > >
> > > > -Jason
> > > >
> > > > On Wed, Feb 24, 2016 at 7:10 AM, craig w <codecr...@gmail.com> wrote:
> > > >
> > > > > I am using the Java Kafka 0.9 client. When I subscribe to a topic I
> > > > > provide a ConsumerRebalanceListener. In the "onPartitionsAssigned"
> > > > > method I am doing this:
> > > > >
> > > > > partitions.foreach( (tp: TopicPartition) => {
> > > > >     consumer.seek(tp, consumer.position(tp))
> > > > > })
> > > > >
> > > > > However, sometimes I end up in an infinite loop with
> > > > > IllegalStateExceptions being thrown [1]:
> > > > >
> > > > > No current assignment for partition <topic-partition>
> > > > >
> > > > > I thought it was safe to seek because the consumer should have been
> > > > > assigned when this method is invoked. Am I missing something?
> > > > >
> > > > > For what it's worth, I am manually committing offsets (using
> > > > > commitSync).
> > > > >
> > > > > [1] -
> > > > > https://apache.googlesource.com/kafka/+/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#228
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > >
> > > https://github.com/mindscratch
> > > https://www.google.com/+CraigWickesser
> > > https://twitter.com/mind_scratch
> > > https://twitter.com/craig_links
> > >
> >
>
