Hey Stevo,

Thanks for the early testing on the new consumer! This might be a bug. I wonder if it could also be explained by partition rebalancing. In the current implementation, a rebalance will clear the old positions (including those that were seeked to). I think it's debatable whether this behavior is useful, but it may explain what you're seeing.
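To make the rebalance explanation concrete, here is a toy single-partition model in Java. It is a sketch under stated assumptions, not the new consumer's code: the class `RebalanceDemoConsumer`, its `rebalance()` method and the `onAssigned` hook are all hypothetical. It shows a seeked position being discarded by a rebalance and then re-initialised from the committed offset or, with nothing committed, from the reset policy. The hook shows one way an application could re-apply the seek it still wants after the partition is assigned back.

```java
import java.util.ArrayList;
import java.util.List;

// Toy single-partition model (hypothetical names, not Kafka's code) of the
// behaviour described above: a rebalance drops the in-memory position,
// including one set by seek(), and the next access re-initialises it.
class RebalanceDemoConsumer {
    enum Reset { EARLIEST, LATEST }

    private final List<String> log;          // one partition; index == offset
    private final Reset reset;               // offset reset setting
    private Long committed;                  // null: nothing committed (auto-commit off)
    private Long position;                   // null: no in-memory position yet
    private Runnable onAssigned = () -> { };  // hypothetical post-assignment hook

    RebalanceDemoConsumer(List<String> log, Reset reset) {
        this.log = log;
        this.reset = reset;
    }

    void setOnAssigned(Runnable hook) {
        this.onAssigned = hook;
    }

    void seek(long offset) {
        position = offset;
    }

    void commit() {
        committed = position();
    }

    long position() {
        if (position == null) {
            // Re-initialise from the committed offset, else from the reset policy.
            position = committed != null ? committed
                    : (reset == Reset.EARLIEST ? 0L : (long) log.size());
        }
        return position;
    }

    // Simulates a rebalance that hands the same partition back to this instance.
    void rebalance() {
        position = null;   // the seeked position is lost here
        onAssigned.run();  // e.g. re-apply the seek the application still wants
    }

    // Returns up to maxRecords messages and advances the position past them.
    List<String> poll(int maxRecords) {
        long pos = position();
        List<String> out = new ArrayList<>();
        while (out.size() < maxRecords && pos < log.size()) {
            out.add(log.get((int) pos));
            pos++;
        }
        position = pos;
        return out;
    }

    public static void main(String[] args) {
        RebalanceDemoConsumer c = new RebalanceDemoConsumer(
                List.of("m0", "m1", "m2", "m3", "m4"), Reset.EARLIEST);
        c.poll(10);                       // reads m0..m4, position is now 5
        c.seek(2);
        c.rebalance();
        System.out.println(c.position()); // 0: the seek to 2 was discarded
        c.setOnAssigned(() -> c.seek(2)); // re-apply the seek after assignment
        c.rebalance();
        System.out.println(c.position()); // 2
    }
}
```

In the released Java client, `KafkaConsumer.subscribe` accepts a `ConsumerRebalanceListener` whose `onPartitionsAssigned` callback is the usual place for such a re-seek; whether the pre-release consumer discussed in this thread already offered that is not established here.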
-Jason

On Thu, Jul 23, 2015 at 2:10 AM, Stevo Slavić <ssla...@gmail.com> wrote:
> Strange: if after seek I make several poll requests, eventually it will read/return messages from the offset that seek set.
>
> On Thu, Jul 23, 2015 at 11:03 AM, Stevo Slavić <ssla...@gmail.com> wrote:
>> Thanks Ewen for the heads-up.
>>
>> It's great that seek is not needed in between polls when business goes as usual.
>>
>> In the edge case, when my logic detects it needs to go back and reread events from a given position in history, I use seek. I found out that the next poll after seek will not respect the offset used in seek. It is strange that even Consumer.position returns the same offset that seek has set for the consumer instance, but poll still does not return messages starting from that offset.
>>
>> E.g. say there are 5 messages published to a single partition of a single topic. The consumer subscribes to that topic partition, with the smallest/earliest offset reset strategy configured, and consumer.position confirms that the consumer is at position 0.
>> Then poll is issued and it returns all 5 messages. The logic processes the messages and detects it needs to go back in history to position 0; it does not commit the messages but calls seek to 0, and position confirms the consumer is at offset 0. The next poll does not return any messages.
>>
>> So seek is not really doing what it should do, according to its javadoc:
>>
>> /**
>>  * Overrides the fetch offsets that the consumer will use on the next {@link #poll(long) poll(timeout)}. If this API
>>  * is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that
>>  * you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets
>>  */
>>
>> I've also checked that calling seek multiple times does not help to get poll to use the offset set with the last seek.
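For reference, the behaviour the quoted javadoc promises can be written down as a toy model. This is a hypothetical sketch, not Kafka's implementation: `SeekDemoConsumer` keeps one in-memory fetch position for a single partition, `seek()` overwrites it (so the latest call wins), and `poll()` reads from it and advances it. Under that contract, the five-message scenario above would re-deliver all five messages after `seek(0)`.

```java
import java.util.ArrayList;
import java.util.List;

// Toy single-partition model (hypothetical names, not Kafka's implementation)
// of the contract in the quoted seek() javadoc.
class SeekDemoConsumer {
    private final List<String> log; // one partition; index == offset
    private long position;          // in-memory fetch position

    SeekDemoConsumer(List<String> log, long initialPosition) {
        this.log = log;
        this.position = initialPosition;
    }

    // Overrides the offset the next poll() fetches from; if called several
    // times before polling, only the latest value matters.
    void seek(long offset) {
        position = offset;
    }

    long position() {
        return position;
    }

    // Returns up to maxRecords messages from the current position and
    // advances the position past them.
    List<String> poll(int maxRecords) {
        List<String> out = new ArrayList<>();
        while (out.size() < maxRecords && position < log.size()) {
            out.add(log.get((int) position));
            position++;
        }
        return out;
    }

    public static void main(String[] args) {
        SeekDemoConsumer c = new SeekDemoConsumer(List.of("m0", "m1", "m2", "m3", "m4"), 0);
        System.out.println(c.poll(10));   // all five messages
        c.seek(3);
        c.seek(0);                        // the latest seek wins
        System.out.println(c.position()); // 0
        System.out.println(c.poll(10));   // all five messages again
    }
}
```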
>> Could be something is wrong with the poll implementation, making it not respect the offset set with seek.
>>
>> Kind regards,
>> Stevo Slavic.
>>
>> On Wed, Jul 22, 2015 at 7:47 AM, Ewen Cheslack-Postava <e...@confluent.io> wrote:
>>> It should just continue consuming using the existing offsets. It'll have to refresh metadata to pick up the leadership change, but once it does it can just pick up where consumption from the previous leader stopped -- all the ISRs should have the same data, so the new leader will have all the same data the previous leader had (assuming unclean leader election is not enabled).
>>>
>>> On Tue, Jul 21, 2015 at 9:11 PM, James Cheng <jch...@tivo.com> wrote:
>>>>
>>>>> On Jul 21, 2015, at 9:15 AM, Ewen Cheslack-Postava <e...@confluent.io> wrote:
>>>>>
>>>>> On Tue, Jul 21, 2015 at 2:38 AM, Stevo Slavić <ssla...@gmail.com> wrote:
>>>>>
>>>>>> Hello Apache Kafka community,
>>>>>>
>>>>>> I find the new consumer poll/seek javadoc a bit confusing. Just by reading the docs I'm not sure what the outcome will be, or what is expected, in the following scenario:
>>>>>>
>>>>>> - kafkaConsumer is instantiated with auto-commit off
>>>>>> - kafkaConsumer.subscribe(someTopic)
>>>>>> - kafkaConsumer.position is called for every TopicPartition the HLC (high-level consumer) is actively subscribed on
>>>>>>
>>>>>> and then, when doing multiple poll calls in succession (without calling commit), does seek have to be called in between poll calls to position the HLC to skip what was read in the previous poll, or does the HLC keep that state (position after poll) in memory, so that the next poll (without seek in between the two poll calls) will continue from where the last poll stopped?
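Ewen's answer about a leadership change (quoted above) comes down to the fetch position being consumer-side state while the leader is only metadata. The toy model below is a sketch under that reading, not Kafka's code: `FailoverDemoConsumer`, `refreshMetadata()` and the broker ids are hypothetical, and it assumes unclean leader election is disabled, so every in-sync replica holds the messages the old leader served.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Toy model (hypothetical names): the position belongs to the consumer and is
// untouched when metadata points it at a new leader for the same partition.
class FailoverDemoConsumer {
    private final Map<Integer, List<String>> replicas; // broker id -> its copy of the partition
    private int leader;                                 // leader according to current metadata
    private long position;                              // survives a leader change

    FailoverDemoConsumer(Map<Integer, List<String>> replicas, int leader, long position) {
        this.replicas = replicas;
        this.leader = leader;
        this.position = position;
    }

    // Stand-in for the metadata refresh that discovers the new leader.
    void refreshMetadata(int newLeader) {
        leader = newLeader; // position is deliberately not reset
    }

    // Fetches from whichever broker is currently the leader.
    List<String> poll(int maxRecords) {
        List<String> log = replicas.get(leader);
        List<String> out = new ArrayList<>();
        while (out.size() < maxRecords && position < log.size()) {
            out.add(log.get((int) position));
            position++;
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> data = List.of("m0", "m1", "m2", "m3", "m4");
        // Both replicas are in sync, so they hold identical data.
        FailoverDemoConsumer c = new FailoverDemoConsumer(Map.of(1, data, 2, data), 1, 0);
        System.out.println(c.poll(2));  // [m0, m1] from broker 1
        c.refreshMetadata(2);           // broker 1 fails, broker 2 becomes leader
        System.out.println(c.poll(10)); // [m2, m3, m4] from broker 2
    }
}
```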
>>>>>
>>>>> The position is tracked in-memory within the consumer, so as long as there isn't a consumer rebalance, consumption will just proceed with subsequent messages (i.e. the behavior I think most people would find intuitive). However, if a rebalance occurs (another consumer instance joins the group or some leave), then a partition may be assigned to a different consumer instance that has no idea about the current position and will restart based on the offset reset setting (because attempting to fetch the committed offset will fail since no offsets have been committed).
>>>>>
>>>>
>>>> Ewen,
>>>>
>>>> What happens if there is a broker failure and a new broker becomes the partition leader? Does the high-level consumer start listening to the new partition leader at the in-memory position, or does it restart based on saved offsets?
>>>>
>>>> Thanks,
>>>> -James
>>>>
>>>>> -Ewen
>>>>>
>>>>>> Could be it's just me not understanding this from the javadoc. If not, maybe the javadoc can be improved to make this (even) more obvious.
>>>>>>
>>>>>> Kind regards,
>>>>>> Stevo Slavic.
>>>>>
>>>>> --
>>>>> Thanks,
>>>>> Ewen
>>>
>>> --
>>> Thanks,
>>> Ewen
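Ewen's explanation of in-memory positions and rebalances can be sketched with two toy group members sharing a committed-offset store. `CommitStore` and `MemberDemo` are hypothetical illustrative names, not Kafka classes, and the model covers a single partition. Successive polls on one member continue without any seek; a second member taking over the partition cannot see the first member's position, so it starts from the committed offset if one exists and otherwise from the reset setting.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the committed offset of one partition in a group.
class CommitStore {
    Long committed; // null until some member commits
}

// Toy group member (hypothetical, not Kafka's code) that owns one partition.
// Its fetch position exists only in this object's memory, so successive polls
// continue without seek, but a new owner cannot see it.
class MemberDemo {
    private final List<String> log;
    private final CommitStore store;
    private final boolean resetToEarliest; // offset reset setting
    private Long position;                 // null until first poll after assignment

    MemberDemo(List<String> log, CommitStore store, boolean resetToEarliest) {
        this.log = log;
        this.store = store;
        this.resetToEarliest = resetToEarliest;
    }

    List<String> poll(int maxRecords) {
        if (position == null) {
            // New owner: start from the committed offset, else from the reset setting.
            position = store.committed != null ? store.committed
                    : (resetToEarliest ? 0L : (long) log.size());
        }
        List<String> out = new ArrayList<>();
        while (out.size() < maxRecords && position < log.size()) {
            out.add(log.get(position.intValue()));
            position++;
        }
        return out;
    }

    void commit() {
        if (position != null) {
            store.committed = position;
        }
    }

    public static void main(String[] args) {
        List<String> log = List.of("m0", "m1", "m2", "m3", "m4");
        CommitStore store = new CommitStore();
        MemberDemo first = new MemberDemo(log, store, true);
        System.out.println(first.poll(2)); // [m0, m1]
        System.out.println(first.poll(2)); // [m2, m3]: continues, no seek needed
        // Rebalance: the partition moves to a new member; nothing was committed.
        MemberDemo second = new MemberDemo(log, store, true);
        System.out.println(second.poll(10)); // [m0, m1, m2, m3, m4]: restarts from earliest
    }
}
```

With auto-commit off and no explicit commit, as in the original scenario, the new owner in this model re-reads from the earliest offset; had the first member committed, the new owner would resume at the committed offset instead.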