Strange: if I make several poll requests after seek, poll eventually returns messages starting from the offset that seek set.
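
A minimal sketch of the retry pattern this observation suggests: seek, then poll repeatedly until a non-empty batch comes back. To keep it self-contained it runs against a hypothetical `OffsetPoller` interface and an in-memory `FakePoller`, not the real KafkaConsumer API. All names here, and the number of empty polls the fake returns after a seek, are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for the consumer calls discussed here; NOT the real KafkaConsumer API. */
interface OffsetPoller {
    void seek(long offset);             // override the offset the next fetch should start from
    long position();                    // offset the next poll is expected to start from
    List<String> poll(long timeoutMs);  // may return an empty batch
}

final class SeekThenPoll {
    /** Seeks to the given offset, then polls until a non-empty batch arrives or maxPolls is used up. */
    static List<String> rereadFrom(OffsetPoller consumer, long offset, int maxPolls, long timeoutMs) {
        consumer.seek(offset);
        for (int attempt = 0; attempt < maxPolls; attempt++) {
            List<String> batch = consumer.poll(timeoutMs);
            if (!batch.isEmpty()) {
                return batch;
            }
        }
        return Collections.emptyList();
    }
}

/** In-memory fake that imitates the observed behavior: some empty polls right after a seek. */
final class FakePoller implements OffsetPoller {
    private final List<String> log;
    private final int emptyPollsAfterSeek; // assumed value, chosen only for the demo
    private long position = 0;
    private int pendingEmptyPolls = 0;

    FakePoller(List<String> log, int emptyPollsAfterSeek) {
        this.log = log;
        this.emptyPollsAfterSeek = emptyPollsAfterSeek;
    }

    @Override
    public void seek(long offset) {
        position = offset;
        pendingEmptyPolls = emptyPollsAfterSeek;
    }

    @Override
    public long position() {
        return position;
    }

    @Override
    public List<String> poll(long timeoutMs) {
        if (pendingEmptyPolls > 0) {
            pendingEmptyPolls--;
            return Collections.emptyList();
        }
        // Return everything from the current position to the end of the log.
        List<String> batch = new ArrayList<>(log.subList((int) position, log.size()));
        position = log.size();
        return batch;
    }
}
```

With the real consumer the same loop would call its seek and poll methods instead. Capping the number of attempts keeps the loop from spinning forever if the partition really has no data at or after the requested offset.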

On Thu, Jul 23, 2015 at 11:03 AM, Stevo Slavić <ssla...@gmail.com> wrote:

> Thanks Ewen for heads up.
>
> It's great that seek is not needed in between poll when business goes as
> usual.
>
> In edge case, when my logic detects it needs to go back and reread events
> from given position in history, I use seek. I found out that next poll
> after seek will not respect offset used in seek. It is strange that event
> Consumer.position returns same offset that seek has set for the consumer
> instance, but poll still does not return messages starting from that offset.
>
> E.g. say there are 5 messages published to a single partition of a single
> topic. Consumer subscribes to that topic partition, with smallest/earliest
> offset reset strategy configured, and consumer.position confirms that the
> consumer is at position 0.
> Then poll is issued and it returns all 5 messages. Logic processes
> messages, detects it needs to go back in history to position 0, it does not
> commit messages but calls seek to 0, position confirms consumer is at
> offset 0. Next poll does not return any messages.
>
> So seek is not really working what it should do, according to its javadoc:
>
>     /**
>      * Overrides the fetch offsets that the consumer will use on the next {@link #poll(long) poll(timeout)}. If this API
>      * is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that
>      * you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets
>      */
>
> I've checked also, calling seek multiple times does not help to get poll
> to use offset set with last seek.
> Could be something is wrong with poll implementation, making it not
> respect offset set with seek.
>
> Kind regards,
> Stevo Slavic.
>
>
> On Wed, Jul 22, 2015 at 7:47 AM, Ewen Cheslack-Postava <e...@confluent.io>
> wrote:
>
>> It should just continue consuming using the existing offsets. It'll have to
>> refresh metadata to pick up the leadership change, but once it does it can
>> just pick up where consumption from the previous leader stopped -- all the
>> ISRs should have the same data, so the new leader will have all the same
>> data the previous leader had (assuming unclean leader election is not
>> enabled).
>>
>> On Tue, Jul 21, 2015 at 9:11 PM, James Cheng <jch...@tivo.com> wrote:
>>
>> > > On Jul 21, 2015, at 9:15 AM, Ewen Cheslack-Postava <e...@confluent.io>
>> > > wrote:
>> > >
>> > > On Tue, Jul 21, 2015 at 2:38 AM, Stevo Slavić <ssla...@gmail.com>
>> > > wrote:
>> > >
>> > >> Hello Apache Kafka community,
>> > >>
>> > >> I find new consumer poll/seek javadoc a bit confusing. Just by reading docs
>> > >> I'm not sure what the outcome will be, what is expected in following
>> > >> scenario:
>> > >>
>> > >> - kafkaConsumer is instantiated with auto-commit off
>> > >> - kafkaConsumer.subscribe(someTopic)
>> > >> - kafkaConsumer.position is called for every TopicPartition HLC is actively
>> > >>   subscribed on
>> > >>
>> > >> and then when doing multiple poll calls in succession (without calling
>> > >> commit), does seek have to be called in between poll calls to position HLC
>> > >> to skip what was read in previous poll, or does HLC keep that state
>> > >> (position after poll) in memory, so that next poll (without seek in between
>> > >> two poll calls) will continue from where last poll stopped?
>> > >>
>> > >
>> > > The position is tracked in-memory within the consumer, so as long as there
>> > > isn't a consumer rebalance, consumption will just proceed with subsequent
>> > > messages (i.e. the behavior I think most people would find intuitive).
>> > > However, if a rebalance occurs (another consumer instance joins the group
>> > > or some leave), then a partition may be assigned to an different consumer
>> > > instance that has no idea about the current position and will restart based
>> > > on the offset reset setting (because attempting to fetch the committed
>> > > offset will fail since no offsets have been committed).
>> > >
>> >
>> > Ewen,
>> >
>> > What happens if there is a broker failure and a new broker becomes the
>> > partition leader? Does the high level consumer start listening to the new
>> > partition leader at the in-memory position, or does it restart based on
>> > saved offsets?
>> >
>> > Thanks,
>> > -James
>> >
>> > > -Ewen
>> > >
>> > >> Could be it's just me not understanding this from javadoc. If not, maybe
>> > >> javadoc can be improved to make this (even) more obvious.
>> > >>
>> > >> Kind regards,
>> > >> Stevo Slavic.
>> > >>
>> > >
>> > > --
>> > > Thanks,
>> > > Ewen
>> >
>>
>> --
>> Thanks,
>> Ewen