Strange, if after seek I make several poll requests, eventually it will
read/return messages from offset that seek set.

On Thu, Jul 23, 2015 at 11:03 AM, Stevo Slavić <ssla...@gmail.com> wrote:

> Thanks Ewen for heads up.
>
> It's great that seek is not needed in between poll when business goes as
> usual.
>
> In edge case, when my logic detects it needs to go back and reread events
> from given position in history, I use seek. I found out that next poll
> after seek will not respect offset used in seek. It is strange that event
> Consumer.position returns same offset that seek has set for the consumer
> instance, but poll still does not return messages starting from that offset.
>
> E.g. say there are 5 messages published to a single partition of a single
> topic. Consumer subscribes to that topic partition, with smallest/earliest
> offset reset strategy configured, and consumer.position confirms that the
> consumer is at position 0.
> Then poll is issued and it returns all 5 messages. Logic processes
> messages, detects it needs to go back in history to position 0, it does not
> commit messages but calls seek to 0, position confirms consumer is at
> offset 0. Next poll does not return any messages.
>
> So seek is not really working what it should do, according to its javadoc:
>
> /**
>  * Overrides the fetch offsets that the consumer will use on the next
> {@link #poll(long) poll(timeout)}. If this API
>  * is invoked for the same partition more than once, the latest offset
> will be used on the next poll(). Note that
>  * you may lose data if this API is arbitrarily used in the middle of
> consumption, to reset the fetch offsets
>  */
>
> I've checked also, calling seek multiple times does not help to get poll
> to use offset set with last seek.
> Could be something is wrong with poll implementation, making it not
> respect offset set with seek.
>
> Kind regards,
> Stevo Slavic.
>
>
> On Wed, Jul 22, 2015 at 7:47 AM, Ewen Cheslack-Postava <e...@confluent.io>
> wrote:
>
>> It should just continue consuming using the existing offsets. It'll have
>> to
>> refresh metadata to pick up the leadership change, but once it does it can
>> just pick up where consumption from the previous leader stopped -- all the
>> ISRs should have the same data, so the new leader will have all the same
>> data the previous leader had (assuming unclean leader election is not
>> enabled).
>>
>> On Tue, Jul 21, 2015 at 9:11 PM, James Cheng <jch...@tivo.com> wrote:
>>
>> >
>> > > On Jul 21, 2015, at 9:15 AM, Ewen Cheslack-Postava <e...@confluent.io
>> >
>> > wrote:
>>
>> > >
>> > > On Tue, Jul 21, 2015 at 2:38 AM, Stevo Slavić <ssla...@gmail.com>
>> wrote:
>> > >
>> > >> Hello Apache Kafka community,
>> > >>
>> > >> I find new consumer poll/seek javadoc a bit confusing. Just by
>> reading
>> > docs
>> > >> I'm not sure what the outcome will be, what is expected in following
>> > >> scenario:
>> > >>
>> > >> - kafkaConsumer is instantiated with auto-commit off
>> > >> - kafkaConsumer.subscribe(someTopic)
>> > >> - kafkaConsumer.position is called for every TopicPartition HLC is
>> > actively
>> > >> subscribed on
>> > >>
>> > >> and then when doing multiple poll calls in succession (without
>> calling
>> > >> commit), does seek have to be called in between poll calls to
>> position
>> > HLC
>> > >> to skip what was read in previous poll, or does HLC keep that state
>> > >> (position after poll) in memory, so that next poll (without seek in
>> > between
>> > >> two poll calls) will continue from where last poll stopped?
>> > >>
>> > >
>> > > The position is tracked in-memory within the consumer, so as long as
>> > there
>> > > isn't a consumer rebalance, consumption will just proceed with
>> subsequent
>> > > messages (i.e. the behavior I think most people would find intuitive).
>> > > However, if a rebalance occurs (another consumer instance joins the
>> group
>> > > or some leave), then a partition may be assigned to an different
>> consumer
>> > > instance that has no idea about the current position and will restart
>> > based
>> > > on the offset reset setting (because attempting to fetch the committed
>> > > offset will fail since no offsets have been committed).
>> > >
>> >
>> > Ewen,
>> >
>> > What happens if there is a broker failure and a new broker becomes the
>> > partition leader? Does the high level consumer start listening to the
>> new
>> > partition leader at the in-memory position, or does it restart based on
>> > saved offsets?
>> >
>> > Thanks,
>> > -James
>> >
>> > > -Ewen
>> > >
>> > >
>> > >> Could be it's just me not understanding this from javadoc. If not,
>> maybe
>> > >> javadoc can be improved to make this (even) more obvious.
>> > >>
>> > >> Kind regards,
>> > >> Stevo Slavic.
>> > >>
>> > >
>> > >
>> > >
>> > > --
>> > > Thanks,
>> > > Ewen
>> >
>> >
>>
>>
>> --
>> Thanks,
>> Ewen
>>
>
>

Reply via email to