Hey Stevo,

Thanks for the early testing on the new consumer! This might be a bug, but I
wonder if it could also be explained by partition rebalancing. In the
current implementation, a rebalance will clear the old positions (including
positions set via seek). I think it's debatable whether this behavior is
useful, but it may explain what you're seeing.
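To make the rebalance explanation concrete, here is a minimal toy model in Java. It is hypothetical and is not the Kafka consumer implementation: RebalanceModel, its methods, and the single-partition in-memory "log" are illustrative assumptions. It shows a seek being discarded when a rebalance clears the in-memory position, after which the position falls back to the committed offset (or to the reset policy if nothing was committed).

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical toy model, not Kafka code: one consumer instance's fetch
 * position for a single partition, showing that a rebalance which clears
 * in-memory positions also discards an earlier seek.
 */
class RebalanceModel {
    private final List<String> log;        // partition contents; list index == offset
    private final Long committed;          // committed offset, or null if nothing committed
    private final boolean resetToEarliest; // offset reset policy: earliest (true) or latest (false)
    private Long position;                 // in-memory fetch position; null means unknown

    RebalanceModel(List<String> log, Long committed, boolean resetToEarliest) {
        this.log = log;
        this.committed = committed;
        this.resetToEarliest = resetToEarliest;
    }

    /** Overrides the in-memory position used by the next poll. */
    void seek(long offset) {
        position = offset;
    }

    /** Simulated rebalance: the in-memory position is dropped, seeked or not. */
    void rebalance() {
        position = null;
    }

    /** Returns the fetch position, initializing it from the committed offset or reset policy. */
    long position() {
        if (position == null) {
            if (committed != null) {
                position = committed;
            } else {
                position = resetToEarliest ? 0L : (long) log.size();
            }
        }
        return position;
    }

    /** Returns every message from the current position to the end and advances past them. */
    List<String> poll() {
        int start = (int) position();
        List<String> batch = new ArrayList<>(log.subList(start, log.size()));
        position = (long) log.size();
        return batch;
    }
}
```

In the sketch, removing the rebalance() call makes the poll after seek(0) return all five messages, which is what the seek javadoc leads one to expect; with the rebalance, the seek is lost and the poll resumes from the committed offset 3.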

-Jason

On Thu, Jul 23, 2015 at 2:10 AM, Stevo Slavić <ssla...@gmail.com> wrote:

> Strange: if I make several poll requests after seek, eventually it does
> return messages starting from the offset that seek set.
>
> On Thu, Jul 23, 2015 at 11:03 AM, Stevo Slavić <ssla...@gmail.com> wrote:
>
> > Thanks, Ewen, for the heads-up.
> >
> > It's great that seek is not needed between poll calls when business goes
> > on as usual.
> >
> > In an edge case, when my logic detects it needs to go back and reread
> > events from a given position in history, I use seek. I found that the
> > next poll after seek does not respect the offset used in seek. It is
> > strange that even Consumer.position returns the same offset that seek
> > set for the consumer instance, but poll still does not return messages
> > starting from that offset.
> >
> > E.g. say there are 5 messages published to a single partition of a
> > single topic. The consumer subscribes to that topic partition with the
> > smallest/earliest offset reset strategy configured, and consumer.position
> > confirms that the consumer is at position 0.
> > Then poll is issued and it returns all 5 messages. The logic processes the
> > messages and detects it needs to go back in history to position 0; it does
> > not commit the messages but calls seek to 0, and position confirms the
> > consumer is at offset 0. The next poll does not return any messages.
> >
> > So seek is not really doing what it should, according to its javadoc:
> >
> > /**
> >  * Overrides the fetch offsets that the consumer will use on the next {@link #poll(long) poll(timeout)}. If this API
> >  * is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that
> >  * you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets
> >  */
> >
> > I've also checked that calling seek multiple times does not help get poll
> > to use the offset set by the last seek.
> > It could be that something is wrong with the poll implementation, making
> > it not respect the offset set with seek.
> >
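The behavior expected from the quoted seek javadoc can be captured in a small toy model. This is a hypothetical sketch, not Kafka's implementation or API: SeekContractModel, its single-partition list "log", and the starting position of 0 (standing in for an earliest reset with nothing committed) are assumptions for illustration. Following the javadoc, the latest seek for a partition sets the offset the next poll uses, so the poll after seek(0) rereads all five messages.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical toy model, not Kafka code: the seek contract from the quoted
 * javadoc, where the latest seek for a partition sets the offset used by the
 * next poll().
 */
class SeekContractModel {
    private final List<String> log; // partition contents; list index == offset
    private long position;          // in-memory fetch position

    /** Starts at offset 0, mirroring an earliest reset with nothing committed. */
    SeekContractModel(List<String> log) {
        this.log = log;
        this.position = 0L;
    }

    long position() {
        return position;
    }

    /** Overrides the fetch offset; if called repeatedly, the last call wins. */
    void seek(long offset) {
        if (offset < 0 || offset > log.size()) {
            throw new IllegalArgumentException("offset out of range: " + offset);
        }
        position = offset;
    }

    /** Returns all messages from the current position and advances past them. */
    List<String> poll() {
        List<String> batch = new ArrayList<>(log.subList((int) position, log.size()));
        position = log.size();
        return batch;
    }
}
```

Walking through the five-message scenario with this model: the first poll returns all 5, a second poll returns nothing, and after seek(3) followed by seek(0) the position is 0 and the next poll returns all 5 again.
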
> > Kind regards,
> > Stevo Slavic.
> >
> >
> > On Wed, Jul 22, 2015 at 7:47 AM, Ewen Cheslack-Postava <e...@confluent.io>
> > wrote:
> >
> >> It should just continue consuming using the existing offsets. It'll have
> >> to refresh metadata to pick up the leadership change, but once it does it
> >> can just pick up where consumption from the previous leader stopped. All
> >> the ISRs should have the same data, so the new leader will have all the
> >> same data the previous leader had (assuming unclean leader election is not
> >> enabled).
> >>
> >> On Tue, Jul 21, 2015 at 9:11 PM, James Cheng <jch...@tivo.com> wrote:
> >>
> >> >
> >> > > On Jul 21, 2015, at 9:15 AM, Ewen Cheslack-Postava <e...@confluent.io>
> >> > > wrote:
> >> > >
> >> > > On Tue, Jul 21, 2015 at 2:38 AM, Stevo Slavić <ssla...@gmail.com> wrote:
> >> > >
> >> > >> Hello Apache Kafka community,
> >> > >>
> >> > >> I find the new consumer's poll/seek javadoc a bit confusing. Just by
> >> > >> reading the docs I'm not sure what the expected outcome is in the
> >> > >> following scenario:
> >> > >>
> >> > >> - kafkaConsumer is instantiated with auto-commit off
> >> > >> - kafkaConsumer.subscribe(someTopic)
> >> > >> - kafkaConsumer.position is called for every TopicPartition the HLC
> >> > >>   (high-level consumer) is actively subscribed to
> >> > >>
> >> > >> and then, when doing multiple poll calls in succession (without
> >> > >> calling commit), does seek have to be called between poll calls to
> >> > >> position the HLC past what was read in the previous poll, or does the
> >> > >> HLC keep that state (the position after poll) in memory, so that the
> >> > >> next poll (without a seek between the two poll calls) continues from
> >> > >> where the last poll stopped?
> >> > >>
> >> > >
> >> > > The position is tracked in-memory within the consumer, so as long as
> >> > > there isn't a consumer rebalance, consumption will just proceed with
> >> > > subsequent messages (i.e. the behavior I think most people would find
> >> > > intuitive). However, if a rebalance occurs (another consumer instance
> >> > > joins the group or some leave), then a partition may be assigned to a
> >> > > different consumer instance that has no idea about the current position
> >> > > and will restart based on the offset reset setting (because attempting
> >> > > to fetch the committed offset will fail since no offsets have been
> >> > > committed).
> >> > >
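The in-memory position tracking described above can be sketched as a toy model. It is hypothetical and not the Kafka consumer: InMemoryPositionModel, the per-poll maxRecords limit, and modeling the instance that receives a reassigned partition as a brand-new object are assumptions for illustration. Consecutive polls on one instance continue without any seek, while a new owner with no committed offset starts from the reset policy.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical toy model, not Kafka code: a consumer instance that tracks its
 * position in memory, so consecutive polls continue without any seek. A fresh
 * instance that takes over the partition starts from the committed offset or,
 * when nothing was committed, from the offset reset policy.
 */
class InMemoryPositionModel {
    private final List<String> log; // partition contents; list index == offset
    private final int maxRecords;   // assumed per-poll batch limit, for illustration
    private long position;          // in-memory position, lost when the instance goes away

    InMemoryPositionModel(List<String> log, Long committed, boolean resetToEarliest, int maxRecords) {
        this.log = log;
        this.maxRecords = maxRecords;
        if (committed != null) {
            this.position = committed;
        } else {
            // No committed offset: fall back to the reset policy.
            this.position = resetToEarliest ? 0L : log.size();
        }
    }

    /** Returns up to maxRecords messages and advances the in-memory position. */
    List<String> poll() {
        int start = (int) position;
        int end = Math.min(log.size(), start + maxRecords);
        List<String> batch = new ArrayList<>(log.subList(start, end));
        position = end;
        return batch;
    }
}
```

In the sketch, an instance created with no committed offset, an earliest reset, and maxRecords of 2 reads m0 and m1, then m2 and m3, with no seek in between. A second instance, standing in for the consumer that receives the partition after a rebalance, starts again at m0 because nothing was committed; with a latest reset it would instead see no existing messages.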
> >> >
> >> > Ewen,
> >> >
> >> > What happens if there is a broker failure and a new broker becomes the
> >> > partition leader? Does the high-level consumer start listening to the
> >> > new partition leader at the in-memory position, or does it restart based
> >> > on saved offsets?
> >> >
> >> > Thanks,
> >> > -James
> >> >
> >> > > -Ewen
> >> > >
> >> > >
> >> > >> It could be just me not understanding this from the javadoc. If not,
> >> > >> maybe the javadoc can be improved to make this (even) more obvious.
> >> > >>
> >> > >> Kind regards,
> >> > >> Stevo Slavic.
> >> > >>
> >> > >
> >> > >
> >> > >
> >> > > --
> >> > > Thanks,
> >> > > Ewen
> >> >
> >> >
> >>
> >>
> >> --
> >> Thanks,
> >> Ewen
> >>
> >
> >
>
