Hello all,

I have updated the KIP now. The diff is visible here:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=165225765&selectedPageVersions=13&selectedPageVersions=12

The PR https://github.com/apache/kafka/pull/10137 is now
available for reviews.

Thanks so much to you all for your help on this!
John

On Mon, 2021-02-22 at 10:25 -0600, John Roesler wrote:
> Thanks, Bill,
> 
> I was waiting for feedback on the "currentLag" proposal
> before updating the KIP. Since there haven't been any
> objections to my new proposal, I'm in the process of
> updating the KIP document right now.
> 
> Personally, I still like the original proposal to expose the
> metadata from the fetch responses as well, but there were
> too many side-effects of going that route.
> 
> Once I finish updating the wiki page, I'll ping here again
> to check for objections.
> 
> Thank you for taking another look!
> -John
> 
> On Mon, 2021-02-22 at 11:19 -0500, Bill Bejeck wrote:
> > Thanks for the KIP update John,
> > 
> > The KIP as it stands LGTM.
> > 
> > One question: in your previous comment you stated that the KIP would
> > introduce the `currentLag()` method, but the KIP seems to show the
> > `metadata()` implementation.
> > FWIW I like the metadata approach as it seems to provide information the
> > consumer has access to when it receives the fetch response.
> > 
> > Thanks again.
> > 
> > Bill
> > 
> > On Mon, Feb 22, 2021 at 10:51 AM John Roesler <vvcep...@apache.org> wrote:
> > 
> > > Hello all,
> > > 
> > > Since there haven't been any big objections to my proposed
> > > design fix, I'm updating the KIP now and opening the PR for
> > > reviews. Hopefully, we can get this merged quickly and
> > > unblock the system tests.
> > > 
> > > Thanks,
> > > John
> > > 
> > > On Mon, 2021-02-22 at 09:49 -0600, John Roesler wrote:
> > > > Thanks for that idea, Chia-Ping,
> > > > 
> > > > I agree that it would be nice for users to have a single API
> > > > to get all kinds of metadata, but I'd rather introduce it in
> > > > a KIP that would expose more than one piece of metadata. At
> > > > the moment, I am motivated to keep this as simple as
> > > > possible, which means only exposing the lag, so it seems
> > > > better to introduce just "currentLag".
> > > > 
> > > > A later KIP could introduce some kind of "local metadata"
> > > > API and deprecate this method without harm. That later KIP
> > > > would be in a better position to take its time to find a
> > > > good name, consider which pieces of metadata would be nice
> > > > to have, etc.
> > > > 
> > > > Thanks sincerely for all your reviews,
> > > > John
> > > > 
> > > > On Wed, 2021-02-17 at 11:54 +0000, Chia-Ping Tsai wrote:
> > > > > That new solution looks good to me, as it brings fewer changes than
> > > > > either the options or the config approach.
> > > > > 
> > > > > One minor question: could we return a composite object rather than an
> > > > > OptionalLong? For example:
> > > > > 
> > > > > class Metadata {
> > > > >   long lag;
> > > > > }
> > > > > 
> > > > > There are many pieces of metadata for a partition, and we may want to
> > > > > expose more information about a partition in the future. A composite
> > > > > object leaves room to carry more information without adding more
> > > > > public APIs to Consumer.
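A minimal sketch of the composite-object idea, assuming a hypothetical `PartitionMetadata` class (not a Kafka API) that wraps the lag as an `OptionalLong`, so further fields could be added later without new `Consumer` methods:

```java
import java.util.OptionalLong;

// Hypothetical composite holder for per-partition metadata (not a Kafka API).
// Today it carries only the lag; more fields could be added later without
// adding new methods to the Consumer interface.
final class PartitionMetadata {
    private final OptionalLong lag;

    PartitionMetadata(OptionalLong lag) {
        this.lag = lag;
    }

    // Convenience factory for "nothing is known yet".
    static PartitionMetadata unknown() {
        return new PartitionMetadata(OptionalLong.empty());
    }

    // Empty when the lag is not known yet.
    OptionalLong lag() {
        return lag;
    }
}
```

The cost of this design is that a one-field wrapper adds API surface now in exchange for flexibility that may or may not be needed later.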
> > > > > 
> > > > > On 2021/02/17 05:21:06, John Roesler <vvcep...@apache.org> wrote:
> > > > > > Hello again, all,
> > > > > > 
> > > > > > Thank you for your feedback and patience. I am hopeful that
> > > > > > I have been able to come up with a solution that will
> > > > > > satisfy everyone.
> > > > > > 
> > > > > > Under a previous design iteration of the desired task idling
> > > > > > semantics in KIP-695, we did indeed require the behavior
> > > > > > change, but while I was considering all of your feedback, I
> > > > > > realized that that requirement is no longer present.
> > > > > > 
> > > > > > Just a little more detail in case you are curious: I had
> > > > > > initially wanted to make the task idling semantics
> > > > > > absolutely free of weird timing effects based on how
> > > > > > frequently Streams happens to call poll, so I built in a
> > > > > > mechanism that would force Streams to get back a _fresh_ lag
> > > > > > response from poll before proceeding to enforce processing.
> > > > > > However, this resulted in a severe performance degradation,
> > > > > > so I backed off to use a cache of the lag metadata.
> > > > > > 
> > > > > > What I realized just now is that under this change, there's
> > > > > > no longer a need to return metadata (or change behavior) in
> > > > > > Consumer#poll at all. The lag cache in Streams would always
> > > > > > be identical to the one inside the Consumer's
> > > > > > SubscriptionState. Therefore, I can instead just expose the
> > > > > > Consumer's lag in a new API. Here is what I propose:
> > > > > > 
> > > > > > /**
> > > > > >  * Get the consumer's current lag on the partition. Returns an
> > > > > >  * "empty" {@link OptionalLong} if the lag is not known, for
> > > > > >  * example if there is no position yet, or if the end offset is
> > > > > >  * not known yet.
> > > > > >  *
> > > > > >  * <p>
> > > > > >  * This method uses locally cached metadata and never makes a
> > > > > >  * remote call.
> > > > > >  *
> > > > > >  * @param topicPartition The partition to get the lag for.
> > > > > >  *
> > > > > >  * @return This {@code Consumer} instance's current lag for the
> > > > > >  *         given partition.
> > > > > >  *
> > > > > >  * @throws IllegalStateException if the {@code topicPartition}
> > > > > >  *         is not assigned
> > > > > >  */
> > > > > > OptionalLong currentLag(TopicPartition topicPartition);
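To illustrate the documented contract, here is a sketch of how a lag could be derived purely from locally cached state. It assumes lag = cached end offset minus current position, and it uses plain `String` partition names in place of `TopicPartition` so it compiles without kafka-clients. `LagCache` and its methods are hypothetical, not the Consumer's internals.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;

// Hypothetical local cache: nothing here performs a remote call.
final class LagCache {
    private final Set<String> assigned = new HashSet<>();
    private final Map<String, Long> positions = new HashMap<>();   // next offset to consume
    private final Map<String, Long> endOffsets = new HashMap<>();  // last end offset seen in a fetch

    void assign(String partition) {
        assigned.add(partition);
    }

    void updatePosition(String partition, long position) {
        positions.put(partition, position);
    }

    void updateEndOffset(String partition, long endOffset) {
        endOffsets.put(partition, endOffset);
    }

    OptionalLong currentLag(String partition) {
        if (!assigned.contains(partition)) {
            throw new IllegalStateException("Partition is not assigned: " + partition);
        }
        Long position = positions.get(partition);
        Long endOffset = endOffsets.get(partition);
        // No position yet, or end offset not known yet: the lag is unknown.
        if (position == null || endOffset == null) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(endOffset - position);
    }
}
```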
> > > > > > 
> > > > > > 
> > > > > > 
> > > > > > With this new API, we have a handy way to find out the lag
> > > > > > of the consumer without ever incurring a remote call. There
> > > > > > is no unnecessarily low-level config option to confuse
> > > > > > users. And there is no change in the behavior of any
> > > > > > existing API to break users' programs.
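As a hedged illustration of how a caller such as Streams might use the cached lag, here is a deliberately simplified, hypothetical decision rule for one partition (not the actual KIP-695 implementation, which also involves max.task.idle.ms): buffered records or a known lag of zero mean the partition need not hold up processing, while a positive or unknown lag means more data may still arrive.

```java
import java.util.OptionalLong;

enum IdleDecision { PROCESS, WAIT }

// Hypothetical, simplified rule; not Kafka Streams code.
final class IdlingRule {
    static IdleDecision decide(boolean hasBufferedRecords, OptionalLong lag) {
        if (hasBufferedRecords) {
            return IdleDecision.PROCESS;   // data is already local
        }
        if (lag.isPresent() && lag.getAsLong() == 0L) {
            return IdleDecision.PROCESS;   // caught up: nothing more to fetch right now
        }
        return IdleDecision.WAIT;          // lag > 0 or unknown: data may still be fetched
    }
}
```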
> > > > > > 
> > > > > > I have implemented this end-to-end in a preview PR:
> > > > > > https://github.com/apache/kafka/pull/10137
> > > > > > 
> > > > > > If this proposal sounds good to all of you, then I will go
> > > > > > ahead and update the KIP.
> > > > > > 
> > > > > > Sincerely yours,
> > > > > > -John
> > > > > > 
> > > > > > On Thu, 2021-02-11 at 12:16 +0000, Chia-Ping Tsai wrote:
> > > > > > > Here are my two cents. If the behavior eventually gets changed
> > > > > > > (return on response), the config is more suitable, as it is easier
> > > > > > > to deprecate (fewer changes). For example, we can introduce the
> > > > > > > config in 2.8 and then deprecate it in 2.9. 3.0 would then remove
> > > > > > > the config and support only return-on-response.
> > > > > > > 
> > > > > > > 
> > > > > > > 
> > > > > > > On 2021/02/10 19:59:39 Sophie Blee-Goldman wrote:
> > > > > > > > Hey John, I know I'm a bit late to this party, but just for the
> > > > > > > > record, I don't think it's *totally* unreasonable for a user to take
> > > > > > > > the "poll on max timeout and assume some records will be returned"
> > > > > > > > approach. And I can also imagine plenty of manually-assigned
> > > > > > > > consumers implemented doing exactly that.
> > > > > > > > 
> > > > > > > > It's too bad that we're hitting this during the 2.8 release. If we
> > > > > > > > were having this discussion in the context of 3.0, then I'd say go
> > > > > > > > for it, since it's a breaking change that would just require some
> > > > > > > > modification to the applications' poll loop.
> > > > > > > > 
> > > > > > > > A good analogy here seems to be spurious wakeups: you generally
> > > > > > > > assume that a waiting thread has woken up due to a notify event in
> > > > > > > > another thread, but the docs always make it very clear up front that
> > > > > > > > this can happen "spuriously", and therefore you need to recheck
> > > > > > > > whatever condition you were waiting on before assuming the thread
> > > > > > > > should proceed.
> > > > > > > > 
> > > > > > > > Since we *didn't* document this possibility up front in the case of
> > > > > > > > poll(), it seems unfair to suddenly change the behavior in a
> > > > > > > > supposedly non-breaking release. Imagine how many programs would
> > > > > > > > break if spurious wakeups were suddenly introduced in a release,
> > > > > > > > rather than warned about from the get-go (not a perfect analogy, as
> > > > > > > > far more programs rely on wait/notify than on poll() returning
> > > > > > > > records, but I think the point still stands).
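For readers less familiar with the analogy, the standard Java idiom for coping with spurious wakeups re-checks the awaited condition in a loop around `Object.wait()`. A minimal sketch, using a hypothetical `ReadyFlag` class:

```java
// Hypothetical one-shot flag illustrating the guarded-wait idiom.
final class ReadyFlag {
    private boolean ready = false;

    synchronized void await() throws InterruptedException {
        // A "while", not an "if": wait() may return without a matching
        // notify, so the condition must be re-checked after every wakeup.
        while (!ready) {
            wait();
        }
    }

    synchronized void set() {
        ready = true;
        notifyAll();
    }

    synchronized boolean isReady() {
        return ready;
    }
}
```

The poll() analogue would be a loop that keeps polling until the records it needs actually arrive, rather than treating any early return as proof that they did.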
> > > > > > > > 
> > > > > > > > For the record, I also agree with Ismael that a config doesn't feel
> > > > > > > > ideal. There are already enough configs to present a steep learning
> > > > > > > > curve, so I would avoid adding one more wherever possible. And it
> > > > > > > > does indeed seem possible to avoid here, since it's really just a
> > > > > > > > boolean flag (rather than a semi-unbounded space, e.g.
> > > > > > > > max.poll.interval.ms, or a constant value, e.g. group.id, where a
> > > > > > > > config does feel appropriate).
> > > > > > > > 
> > > > > > > > Given all that, I would personally advocate for the pollOptions
> > > > > > > > overload. The obvious advantages here are:
> > > > > > > > 1) it's more future-proof, in that we can avoid having a similar
> > > > > > > > discussion if/when we want to consider other semantic changes to
> > > > > > > > poll which some users may want while others would not
> > > > > > > > 2) it leaves the door open to using poll with either semantics in a
> > > > > > > > single consumer. I doubt that's going to be very common in terms of
> > > > > > > > the specific option we're discussing here, but it may be more useful
> > > > > > > > for other options we may add in the future
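A sketch of what the per-call options object could look like. `PollOptions`, `withTimeout`, and `returnOnEmptyResponse` are hypothetical names for illustration, not an existing Kafka API; the point is that each call can choose its own semantics, and future options become new builder methods rather than new `poll` overloads.

```java
import java.time.Duration;

// Hypothetical immutable options object for a poll(PollOptions) overload.
final class PollOptions {
    private final Duration timeout;
    private final boolean returnOnEmptyResponse;

    private PollOptions(Duration timeout, boolean returnOnEmptyResponse) {
        this.timeout = timeout;
        this.returnOnEmptyResponse = returnOnEmptyResponse;
    }

    // Defaults to the existing semantics: return early only when records arrive.
    static PollOptions withTimeout(Duration timeout) {
        return new PollOptions(timeout, false);
    }

    // Returns a copy, so a shared instance is never mutated by one caller.
    PollOptions returnOnEmptyResponse(boolean enabled) {
        return new PollOptions(timeout, enabled);
    }

    Duration timeout() {
        return timeout;
    }

    boolean returnOnEmptyResponse() {
        return returnOnEmptyResponse;
    }
}
```

With such an object, one consumer could pass `PollOptions.withTimeout(d)` in one place and `PollOptions.withTimeout(d).returnOnEmptyResponse(true)` in another, which is advantage 2) above.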
> > > > > > > > 
> > > > > > > > Just my 2 cents. But if the pollOptions proposal would really add so
> > > > > > > > much additional work that it would cause the 2.8 release to be
> > > > > > > > significantly delayed, then that's worth taking into account as well.
> > > > > > > > 
> > > > > > > > On Wed, Feb 10, 2021 at 9:35 AM John Roesler <vvcep...@apache.org> wrote:
> > > > > > > > 
> > > > > > > > > Hello again, all.
> > > > > > > > > 
> > > > > > > > > I have submitted the PR:
> > > > > > > > > https://github.com/apache/kafka/pull/10096
> > > > > > > > > 
> > > > > > > > > Ismael chimed in on the PR review to indicate that the
> > > > > > > > > config approach may not be desirable.
> > > > > > > > > 
> > > > > > > > > How strongly do we feel that the behavior change is
> > > > > > > > > unacceptable? It seems like most of the people involved felt
> > > > > > > > > the behavior change is ok (although the docs were wrong).
> > > > > > > > > 
> > > > > > > > > The arguments against the behavior change were plausible,
> > > > > > > > > but hypothetical.
> > > > > > > > > 
> > > > > > > > > Can everyone take a look at the PR and weigh in on whether
> > > > > > > > > the complexity of an extra config option is really worth it
> > > > > > > > > in this case?
> > > > > > > > > 
> > > > > > > > > I have to confess I'm currently leaning more toward dropping
> > > > > > > > > the config and going back to the behavior change, while
> > > > > > > > > correcting the docs and the system test.
> > > > > > > > > 
> > > > > > > > > While we are wavering on this point, the system tests
> > > > > > > > > continue to fail, and the 2.8.0 release is blocked. We
> > > > > > > > > should aim to make a call today.
> > > > > > > > > 
> > > > > > > > > Thanks all,
> > > > > > > > > -John
> > > > > > > > > 
> > > > > > > > > On Fri, 2021-02-05 at 15:31 -0800, Guozhang Wang wrote:
> > > > > > > > > > Thanks everyone for chiming in here! I'd also prefer the config
> > > > > > > > > > approach over API changes.
> > > > > > > > > > 
> > > > > > > > > > On Fri, Feb 5, 2021 at 3:18 PM Bill Bejeck <bbej...@gmail.com> wrote:
> > > > > > > > > > 
> > > > > > > > > > > I meant to chime in earlier.
> > > > > > > > > > > 
> > > > > > > > > > > I also like the `PollOptions` idea, but I have to agree that the
> > > > > > > > > > > config option would be the least disruptive approach.
> > > > > > > > > > > 
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Bill
> > > > > > > > > > > 
> > > > > > > > > > > On Fri, Feb 5, 2021 at 6:12 PM John Roesler <vvcep...@apache.org> wrote:
> > > > > > > > > > > 
> > > > > > > > > > > > Thanks, all!
> > > > > > > > > > > > 
> > > > > > > > > > > > It seems that the config I proposed is a solution that
> > > > > > > > > > > > everyone can be happy with, so I will go ahead with a PR to
> > > > > > > > > > > > fix that.
> > > > > > > > > > > > 
> > > > > > > > > > > > I'll update the KIP after a round of PR reviews, in case
> > > > > > > > > > > > there are new concerns that arise.
> > > > > > > > > > > > 
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > -John
> > > > > > > > > > > > 
> > > > > > > > > > > > On Fri, 2021-02-05 at 15:07 -0800, Matthias J. Sax wrote:
> > > > > > > > > > > > > Thanks for providing more details.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Adding a config might be the path of least resistance... I am
> > > > > > > > > > > > > fine with that.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > -Matthias
> > > > > > > > > > > > > 
> > > > > > > > > > > > > On 2/4/21 9:42 AM, Chia-Ping Tsai wrote:
> > > > > > > > > > > > > > > vvvvvvvvvvvvvvvvvvv
> > > > > > > > > > > > > > > long_poll.mode: return_on_records|return_on_response
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > This idea LGTM. It not only makes minimal changes to the current
> > > > > > > > > > > > > > behavior but also works for KIP-695.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > On 2021/02/04 16:07:11, John Roesler <vvcep...@apache.org> wrote:
> > > > > > > > > > > > > > > Hi Matthias, Chia-Ping, and Tom,
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > Thanks for the thoughtful replies!
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > Re: poll(~forever~) to block indefinitely on records:
> > > > > > > > > > > > > > > Thanks for your diligence, Chia-Ping. While I wouldn't
> > > > > > > > > > > > > > > personally recommend that anyone write code that blocks
> > > > > > > > > > > > > > > forever on I/O, I do agree this is something that "real
> > > > > > > > > > > > > > > people" may want to do.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > Just a note for the record: this approach should only be
> > > > > > > > > > > > > > > used in conjunction with a manual assignment. If people are
> > > > > > > > > > > > > > > using a group subscription, they're setting themselves up to
> > > > > > > > > > > > > > > get kicked out of the group when there is a low volume of
> > > > > > > > > > > > > > > updates on the topic. And then, when they get kicked out,
> > > > > > > > > > > > > > > they will never know it because they're just going to be
> > > > > > > > > > > > > > > blocked in `poll()` the whole time.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > However, if you don't participate in a group and just:
> > > > > > > > > > > > > > > 1. assign(partitions)
> > > > > > > > > > > > > > > 2. poll(forever)
> > > > > > > > > > > > > > > you should indeed expect to return from poll only when you
> > > > > > > > > > > > > > > have records.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > This possibility is the turning point for me. I'd like to
> > > > > > > > > > > > > > > alter my proposal to an opt-in config, detailed below.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > Re: Javadoc:
> > > > > > > > > > > > > > > Thanks for pointing that out. It does seem like, if we do
> > > > > > > > > > > > > > > decide to change behavior, we should adjust the Javadoc to
> > > > > > > > > > > > > > > say so. That was an oversight on my part, and I daresay that
> > > > > > > > > > > > > > > if I had done that initially, it would have saved Rajini
> > > > > > > > > > > > > > > from having to dig into the code to pinpoint the cause of
> > > > > > > > > > > > > > > those test failures.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > Re: PollOptions:
> > > > > > > > > > > > > > > I actually like this option quite a bit. It seems like this
> > > > > > > > > > > > > > > would be warranted if we expect someone to want to use the
> > > > > > > > > > > > > > > same Consumer instance in both "return on metadata or
> > > > > > > > > > > > > > > records" and "return on only records" mode. Otherwise, we
> > > > > > > > > > > > > > > might as well introduce a new config.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > It also seems like the behavior I proposed in this KIP is
> > > > > > > > > > > > > > > somewhat "advanced", so I could certainly see leaving it off
> > > > > > > > > > > > > > > by default and offering an opt-in config.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > How does everyone feel about this opt-in config:
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > vvvvvvvvvvvvvvvvvvv
> > > > > > > > > > > > > > > long_poll.mode: return_on_records|return_on_response
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > doc:
> > > > > > > > > > > > > > > * return_on_records: (default) a call to
> > > > > > > > > > > > > > >   Consumer#poll(timeout) will block up to the timeout and
> > > > > > > > > > > > > > >   return early if records are received.
> > > > > > > > > > > > > > > * return_on_response: a call to Consumer#poll(timeout) will
> > > > > > > > > > > > > > >   block up to the timeout and return early if any fetch
> > > > > > > > > > > > > > >   response is received. Use this option to get updates from
> > > > > > > > > > > > > > >   Consumer#metadata() even if Consumer#records() is empty.
> > > > > > > > > > > > > > > ^^^^^^^^^^^^^^^^^^^
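The difference between the two proposed modes can be modeled with a small simulation. Everything here is hypothetical: fetch responses are reduced to an arrival time and a record count, and the enum constants simply mirror the proposed `long_poll.mode` values.

```java
import java.util.List;
import java.util.Locale;

// Hypothetical model of the proposed long_poll.mode values.
enum LongPollMode {
    RETURN_ON_RECORDS, RETURN_ON_RESPONSE;

    // Maps "return_on_records" / "return_on_response" to the enum constants.
    static LongPollMode parse(String value) {
        return valueOf(value.trim().toUpperCase(Locale.ROOT));
    }
}

// A simulated fetch response: when it arrives and how many records it carries.
final class SimFetchResponse {
    final long arrivalMs;
    final int recordCount;

    SimFetchResponse(long arrivalMs, int recordCount) {
        this.arrivalMs = arrivalMs;
        this.recordCount = recordCount;
    }
}

final class SimPoll {
    // Returns {returnTimeMs, recordsReturned}; responses must be sorted by arrival time.
    static long[] poll(List<SimFetchResponse> responses, long timeoutMs, LongPollMode mode) {
        for (SimFetchResponse r : responses) {
            if (r.arrivalMs > timeoutMs) {
                break;                                // arrives after the deadline
            }
            boolean returnNow = mode == LongPollMode.RETURN_ON_RESPONSE || r.recordCount > 0;
            if (returnNow) {
                return new long[] {r.arrivalMs, r.recordCount};
            }
        }
        return new long[] {timeoutMs, 0};             // timed out with no records
    }
}
```

For example, with an empty response at 10 ms and a response carrying three records at 50 ms, a 100 ms poll returns at 50 ms with three records in return_on_records mode, but at 10 ms with none in return_on_response mode.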
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > John
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > On Thu, 2021-02-04 at 08:44 +0000, Tom Bentley wrote:
> > > > > > > > > > > > > > > > Hi,
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > The Javadoc for KafkaConsumer#poll() includes the following:
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > * This method returns immediately if there are records available.
> > > > > > > > > > > > > > > > > * *Otherwise, it will await the passed timeout.*
> > > > > > > > > > > > > > > > > * If the timeout expires, an empty record set will be returned.
> > > > > > > > > > > > > > > > > * Note that this method may block beyond the timeout in order to
> > > > > > > > > > > > > > > > > * execute custom {@link ConsumerRebalanceListener} callbacks.
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > In other words: if the method returns before the timeout, there
> > > > > > > > > > > > > > > > must be records in the method result. After the timeout has
> > > > > > > > > > > > > > > > passed, there may be no records. It might block for longer than
> > > > > > > > > > > > > > > > the timeout. So I think returning with empty records before at
> > > > > > > > > > > > > > > > least the given timeout has passed breaks that contract.
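Tom's reading of the contract can be written as a predicate over a single call. `PollContract` is a hypothetical helper, not a Kafka class; it deliberately does not flag returns later than the timeout, since the quoted Javadoc allows blocking beyond it for rebalance callbacks.

```java
// Hypothetical check of one poll() call against the contract as read above.
final class PollContract {
    static boolean consistentWithJavadoc(long elapsedMs, long timeoutMs, int recordCount) {
        boolean returnedEarly = elapsedMs < timeoutMs;
        // An early return must carry records; at or after the timeout, empty is fine.
        return !returnedEarly || recordCount > 0;
    }
}
```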
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > A not-much-prettier alternative to adding a new
> > > > > > > > > > > > > > > > pollForRecordsOrMetadata(Duration) method could be overloading
> > > > > > > > > > > > > > > > poll() to take an additional parameter which controlled whether
> > > > > > > > > > > > > > > > an early return with empty records was allowed. Or a
> > > > > > > > > > > > > > > > `poll(PollOptions)`. In the long run it could be a mistake to
> > > > > > > > > > > > > > > > include in the method name exactly what might cause an early
> > > > > > > > > > > > > > > > empty return.
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > Kind regards,
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > Tom
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > On Thu, Feb 4, 2021 at 5:08 AM Chia-Ping Tsai <chia7...@apache.org> wrote:
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > Thanks for sharing, Matthias. I agree that it is indeed an
> > > > > > > > > > > > > > > > > anti-pattern to assume that poll() does or does not return data.
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > However, I checked all usages of poll() in the code base. There
> > > > > > > > > > > > > > > > > is an interesting use case, poll(a bigger timeout), which implies
> > > > > > > > > > > > > > > > > that callers want to block in poll() (forever) unless there is
> > > > > > > > > > > > > > > > > available data.
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > [1]
> > > > > > > > > > > > > > > > > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsoleConsumer.scala#L443
> > > > > > > > > > > > > > > > > [2]
> > > > > > > > > > > > > > > > > https://github.com/apache/kafka/blob/trunk/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java#L232
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > Hence, I am starting to worry that client code like the
> > > > > > > > > > > > > > > > > aforementioned cases will get broken by the behavior change :(
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > On 2021/02/03 22:59:09, "Matthias J. Sax" <mj...@apache.org> wrote:
> > > > > > > > > > > > > > > > > > Thanks for your email John.
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > I agree that it seems to be an anti-pattern to write code that
> > > > > > > > > > > > > > > > > > makes assumptions about whether poll() returns data or not.
> > > > > > > > > > > > > > > > > > Thus, from my point of view, we should fix-forward the system
> > > > > > > > > > > > > > > > > > test.
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > From my understanding, the impact of KIP-695 is that we might
> > > > > > > > > > > > > > > > > > return early from poll() (i.e., before the timeout passed) with
> > > > > > > > > > > > > > > > > > no data, only if an empty fetch response comes back and there is
> > > > > > > > > > > > > > > > > > no other fetch response that did return data. Thus, for most
> > > > > > > > > > > > > > > > > > cases, poll() should still return early and provide data, so I
> > > > > > > > > > > > > > > > > > have no concerns with the slight behavior change.
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > It would be good to get input from others about this question,
> > > > > > > > > > > > > > > > > > though.
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > -Matthias
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > On 2/3/21 10:06 AM, John Roesler wrote:
> > > > > > > > > > > > > > > > > > > Hello again all,
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > I'm resurrecting this thread to discuss an issue that has
> > > > > > > > > > > > > > > > > > > come up after merging the code for this KIP.
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > The issue is that some of the system tests need to be
> > > > > > > > > > > > > > > > > > > updated in the same way that this integration test needed to
> > > > > > > > > > > > > > > > > > > be updated:
> > > > > > > > > > > > > > > > > > > https://github.com/apache/kafka/pull/9836/files#diff-735dcc2179315ebd78a7c75fd21b70b0ae81b90f3d5ec761740bc80abeae891fR1875-R1888
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > This issue was reported here:
> > > > > > > > > > > > > > > > > > > https://issues.apache.org/jira/browse/KAFKA-12268
> > > > > > > > > > > > > > > > > > > and there is some preliminary discussion here:
> > > > > > > > > > > > > > > > > > > https://github.com/apache/kafka/pull/10022
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > First, let me offer my apologies for failing to catch this
> > > > > > > > > > > > > > > > > > > before the merge. I'm sorry that it became Rajini's work to
> > > > > > > > > > > > > > > > > > > track down the cause of the failure, when it was my
> > > > > > > > > > > > > > > > > > > responsibility to ensure the feature was merged safely.
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > To recap the situation:
> > > > > > > > > > > > > > > > > > > Consumer#poll(Duration) will now return before the duration
> > > > > > > > > > > > > > > > > > > expires, even if no records are returned, as long as some
> > > > > > > > > > > > > > > > > > > metadata is returned.
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > This behavior was important for KIP-695. In the situation
> > > > > > > > > > > > > > > > > > > where we get no records back for some partition, Streams
> > > > > > > > > > > > > > > > > > > needs to have the freshest possible information about
> > > > > > > > > > > > > > > > > > > whether there are no new records on the broker, or whether
> > > > > > > > > > > > > > > > > > > there are records on the broker that we still need to fetch.
> > > > > > > > > > > > > > > > > > > If that's not clear, the KIP contains the full story.
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > It's definitely a behavior change, but our rationale was
> > > > > > > > > > > > > > > > > > > that it's an acceptable behavior change. Our big alternative
> > > > > > > > > > > > > > > > > > > is to add a _new_ method to Consumer, such as
> > > > > > > > > > > > > > > > > > > pollForRecordsOrMetadata(Duration) or something similar.
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > It seems unreliable to expect the broker to return a
> > > > > > > > > > > > > > > > > > > particular record within a particular timeout in general,
> > > > > > > > > > > > > > > > > > > which is what these tests are doing. The broker can decide,
> > > > > > > > > > > > > > > > > > > for several reasons, not to return data for a partition, but
> > > > > > > > > > > > > > > > > > > to return data for another partition instead.
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > It seems like the only case where you might reasonably try
> > > > > > > > > > > > > > > > > > > to rely on that is in a test, where you first write a record
> > > > > > > > > > > > > > > > > > > to a partition, then you assign only that one partition to a
> > > > > > > > > > > > > > > > > > > consumer, then you poll on the consumer, expecting it to
> > > > > > > > > > > > > > > > > > > return the data you just wrote.
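A common way to make such a test independent of whether any single poll() returns early and empty is to poll in a loop until the expected record shows up or an overall deadline passes. A minimal sketch: the poll function and the clock are passed in as plain functional interfaces (hypothetical stand-ins for `consumer.poll(...)` and a wall clock), so it runs without a broker. `PollUntil` is not an existing Kafka test utility.

```java
import java.util.List;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical test helper.
final class PollUntil {
    static boolean pollUntilFound(Supplier<List<String>> poll, String expected,
                                  long deadlineMs, LongSupplier clockMs) {
        while (clockMs.getAsLong() < deadlineMs) {
            // An empty batch is not a failure; just poll again.
            List<String> batch = poll.get();
            if (batch.contains(expected)) {
                return true;
            }
        }
        return false;   // deadline passed without seeing the expected value
    }
}
```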
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > So the $10 question here is whether we should support this
> > > > > > > > > > > > > > > > > > > apparently artificial (testing-only) use case to the point
> > > > > > > > > > > > > > > > > > > where it's worth adding a whole new method to the Consumer
> > > > > > > > > > > > > > > > > > > interface.
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > Thanks all,
> > > > > > > > > > > > > > > > > > > John
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > On Thu, 2020-12-17 at 13:18 -0600, John Roesler wrote:
> > > > > > > > > > > > > > > > > > > > Thanks Jason,
> > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > We would only return the metadata for the latest fetches.
> > > > > > > > > > > > > > > > > > > > So, if someone wanted to use this to lazily maintain a
> > > > > > > > > > > > > > > > > > > > client-side metadata map for all partitions, they'd have to
> > > > > > > > > > > > > > > > > > > > store it separately and merge in new updates as they arrive.
> > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > This way:
> > > > > > > > > > > > > > > > > > > > 1. We don't need to increase the complexity of the client by
> > > > > > > > > > > > > > > > > > > > storing that metadata.
> > > > > > > > > > > > > > > > > > > > 2. Users will be able to treat all returned metadata as
> > > > > > > > > > > > > > > > > > > > "fresh" without having to reason about the timestamps.
> > > > > > > > > > > > > > > > > > > > 3. All parts of the returned ConsumerRecords object have the
> > > > > > > > > > > > > > > > > > > > same lifecycle: all the data and metadata are the results of
> > > > > > > > > > > > > > > > > > > > the most recent round of fetch responses that had not been
> > > > > > > > > > > > > > > > > > > > previously polled.
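The "store it separately and merge" approach could look like the following sketch. It assumes, for illustration only, that each poll carries a map from partition name to a single metadata value (an end offset here); `EndOffsetStore` is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical client-side store for metadata that arrives only for the
// partitions fetched in the most recent round.
final class EndOffsetStore {
    private final Map<String, Long> latest = new HashMap<>();

    // Partitions in this round overwrite older values; partitions absent from
    // this round keep whatever was last seen for them.
    void merge(Map<String, Long> roundMetadata) {
        latest.putAll(roundMetadata);
    }

    OptionalLong endOffset(String partition) {
        Long value = latest.get(partition);
        return value == null ? OptionalLong.empty() : OptionalLong.of(value);
    }
}
```

Keeping this map in user code, rather than in the client, is what lets the client treat each round of returned metadata as fresh, per point 2 above.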
> > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > Does that seem sensible to you? I'll update the KIP to
> > > > > > > > > > > > > > > > > > > > > clarify this.
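A minimal sketch of the client-side merging described above, assuming hypothetical stand-in types (`TopicPartitionKey`, `FetchMetadata`, `MetadataCache`) rather than the real Kafka client classes. Because each poll would carry metadata only for the partitions in the latest fetch responses, the caller overwrites older entries and keeps everything else. The field names, including `lastUpdateTimestamp` (borrowed from Guozhang's naming suggestion in this thread), are illustrative only.

```java
import java.util.HashMap;
import java.util.Map;

class MetadataCacheDemo {
    public static void main(String[] args) {
        TopicPartitionKey left = new TopicPartitionKey("left", 0);
        TopicPartitionKey right = new TopicPartitionKey("right", 0);
        MetadataCache cache = new MetadataCache();

        // First poll: fetch responses arrived for both partitions.
        cache.merge(Map.of(left, new FetchMetadata(10, 0, 15, 1000L),
                           right, new FetchMetadata(4, 0, 4, 1000L)));
        // Second poll: only "left" was fetched again.
        cache.merge(Map.of(left, new FetchMetadata(15, 0, 20, 2000L)));

        System.out.println(cache.size());                           // 2
        System.out.println(cache.get(left).position());             // 15
        System.out.println(cache.get(right).lastUpdateTimestamp()); // 1000
    }
}

// Hypothetical stand-ins for the types discussed in this thread;
// they are not the real Kafka client classes.
record TopicPartitionKey(String topic, int partition) {}

record FetchMetadata(long position, long beginningOffset, long endOffset,
                     long lastUpdateTimestamp) {}

// Lazily accumulates per-partition metadata across polls. Newer entries
// overwrite older ones; partitions absent from the latest poll keep their
// last cached (possibly stale) values.
class MetadataCache {
    private final Map<TopicPartitionKey, FetchMetadata> latest = new HashMap<>();

    void merge(Map<TopicPartitionKey, FetchMetadata> fromLatestPoll) {
        latest.putAll(fromLatestPoll);
    }

    FetchMetadata get(TopicPartitionKey tp) {
        return latest.get(tp);
    }

    int size() {
        return latest.size();
    }
}
```

Keeping this map outside the consumer is exactly the trade-off in point 1: the client stays simple, and only callers who want a full view pay for it.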
> > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > > -John
> > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > On Wed, 2020-12-16 at 10:29 -0800, Jason Gustafson wrote:
> > > > > > > > > > > > > > > > > > > > > Hi John,
> > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > Just one question. It wasn't very clear to me exactly when the
> > > > > > > > > > > > > > > > > > > > > > metadata would be returned in `ConsumerRecords`. Would we
> > > > > > > > > > > > > > > > > > > > > > /always/ include the metadata for all partitions that are
> > > > > > > > > > > > > > > > > > > > > > assigned, or would it be based on the latest fetches?
> > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > > > Jason
> > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > On Fri, Dec 11, 2020 at 4:07 PM John Roesler <vvcep...@apache.org> wrote:
> > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > Thanks, Guozhang!
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > All of your feedback sounds good to me. I’ll update the KIP
> > > > > > > > > > > > > > > > > > > > > > > when I am able.
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > 3) I believe it is the position after the fetch, but I will
> > > > > > > > > > > > > > > > > > > > > > > confirm. I think omitting position may render beginning and
> > > > > > > > > > > > > > > > > > > > > > > end offsets useless as well, which leaves only lag. That would
> > > > > > > > > > > > > > > > > > > > > > > be fine with me, but it also seems nice to supply this extra
> > > > > > > > > > > > > > > > > > > > > > > metadata since it is well defined and probably handy for
> > > > > > > > > > > > > > > > > > > > > > > others. Therefore, I’d go the route of specifying the exact
> > > > > > > > > > > > > > > > > > > > > > > semantics and keeping it.
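To illustrate why the semantics of position matter here, a small sketch that derives lag from the same fetch under both readings. It assumes lag is computed as `endOffset - position`, where the end offset is the offset the next produced record would receive; `PositionSemanticsDemo` and its numbers are hypothetical.

```java
class PositionSemanticsDemo {
    // Assumed definition: records between the consumer's position and the end offset.
    static long lag(long endOffset, long position) {
        return endOffset - position;
    }

    public static void main(String[] args) {
        long firstReturnedOffset = 10; // this poll returned offsets 10..14
        long recordsReturned = 5;
        long endOffset = 20;

        // "AFTER" reading: position is the offset of the next record to fetch.
        long positionAfter = firstReturnedOffset + recordsReturned; // 15
        // "BEFORE" reading: position is where the returned batch started.
        long positionBefore = firstReturnedOffset;                   // 10

        System.out.println(lag(endOffset, positionAfter));  // 5: records not yet fetched
        System.out.println(lag(endOffset, positionBefore)); // 10: also counts the 5 just returned
    }
}
```

The two readings differ by exactly the size of the returned batch, which is why the exact semantics need to be pinned down in the javadoc if position is kept.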
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > Thanks for the review,
> > > > > > > > > > > > > > > > > > > > > > John
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > On Fri, Dec 11, 2020, at 17:36, Guozhang Wang wrote:
> > > > > > > > > > > > > > > > > > > > > > > Hello John,
> > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > Thanks for the updates! I've made a pass on the KIP and also
> > > > > > > > > > > > > > > > > > > > > > > > the POC PR; here are some minor comments:
> > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > 1) nit: "receivedTimestamp" -> it seems the metadata keeps
> > > > > > > > > > > > > > > > > > > > > > > > getting updated, and we do not create a new object but just
> > > > > > > > > > > > > > > > > > > > > > > > update the values in place, so maybe calling it
> > > > > > > > > > > > > > > > > > > > > > > > `lastUpdateTimestamp` is better?
> > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > 2) It would be great to state in the javadocs that the new API
> > > > > > > > > > > > > > > > > > > > > > > > "ConsumerRecords#metadata(): Map<TopicPartition, Metadata>" may
> > > > > > > > > > > > > > > > > > > > > > > > return a superset of the TopicPartitions returned by the existing
> > > > > > > > > > > > > > > > > > > > > > > > API that returns the data by partition, in case users assume the
> > > > > > > > > > > > > > > > > > > > > > > > key sets of the two maps are always the same.
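A minimal sketch of the pitfall raised in point 2, using hypothetical string partition names in place of Kafka's `TopicPartition` and a single `long` in place of the metadata object: a poll can carry metadata for a partition whose fetch returned no records, so code that walks only the record map's keys will miss it.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class KeySetCheck {
    /** Partitions that have metadata in this poll but no records. */
    static Set<String> metadataOnly(Map<String, List<String>> recordsByPartition,
                                    Map<String, Long> metadataByPartition) {
        Set<String> result = new TreeSet<>(metadataByPartition.keySet());
        result.removeAll(recordsByPartition.keySet());
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> records = Map.of("left-0", List.of("a", "b"));
        // Metadata also arrived for "right-0", whose fetch returned no records.
        Map<String, Long> metadata = Map.of("left-0", 42L, "right-0", 7L);

        // Walk the metadata keys, not the record keys, to see every partition.
        System.out.println(metadataOnly(records, metadata)); // [right-0]
    }
}
```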
> > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > 3) The "position()" API needs better clarification: is it the
> > > > > > > > > > > > > > > > > > > > > > > > current position AFTER the records are returned, or BEFORE the
> > > > > > > > > > > > > > > > > > > > > > > > records are returned? Personally, I'd suggest we not include it
> > > > > > > > > > > > > > > > > > > > > > > > if it is not used anywhere yet, just to avoid possible misuse,
> > > > > > > > > > > > > > > > > > > > > > > > but I'm fine if you'd like to keep it; in that case, just
> > > > > > > > > > > > > > > > > > > > > > > > clarify its semantics.
> > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > Other than that, I'm +1 on the KIP as well!
> > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > Guozhang
> > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > On Fri, Dec 11, 2020 at 8:15 AM Walker Carlson <wcarl...@confluent.io> wrote:
> > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > Thanks for the KIP!
> > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > +1 (non-binding)
> > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > walker
> > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > On Wed, Dec 9, 2020 at 11:40 AM Bruno Cadonna <br...@confluent.io> wrote:
> > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > Thanks for the KIP, John!
> > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > +1 (non-binding)
> > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > > > > > > > > > > > > Bruno
> > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > On 08.12.20 18:03, John Roesler wrote:
> > > > > > > > > > > > > > > > > > > > > > > > > > Hello all,
> > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > There hasn't been much discussion on KIP-695 so far, so I'd
> > > > > > > > > > > > > > > > > > > > > > > > > > > like to go ahead and call for a vote.
> > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > As a reminder, the purpose of KIP-695 is to improve on the
> > > > > > > > > > > > > > > > > > > > > > > > > > > "task idling" feature we introduced in KIP-353. This KIP
> > > > > > > > > > > > > > > > > > > > > > > > > > > will allow Streams to offer deterministic time semantics in
> > > > > > > > > > > > > > > > > > > > > > > > > > > join-type topologies. For example, it makes sure that when
> > > > > > > > > > > > > > > > > > > > > > > > > > > you join two topics, we collate the topics by timestamp.
> > > > > > > > > > > > > > > > > > > > > > > > > > > That was always the intent with task idling (KIP-353), but it
> > > > > > > > > > > > > > > > > > > > > > > > > > > turns out the previous mechanism couldn't provide the desired
> > > > > > > > > > > > > > > > > > > > > > > > > > > semantics.
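As a rough sketch of collating two inputs by timestamp, here is a hypothetical two-input buffer that always emits the record with the smallest buffered timestamp and idles when an empty input might still have earlier data on the broker. The rule that an unknown or positive lag means "wait" is a simplification assumed for illustration, and the sketch ignores the idle timeout (`max.task.idle.ms` in Streams); `Rec`, `TwoWayCollator`, and `CollatorDemo` are not Kafka Streams APIs.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.OptionalLong;

class CollatorDemo {
    public static void main(String[] args) {
        TwoWayCollator c = new TwoWayCollator();
        c.left.add(new Rec("left", 5));
        c.left.add(new Rec("left", 20));

        // "right" is empty and its lag is unknown, so the task idles.
        System.out.println(c.next()); // null
        // Once "right" is known to be caught up, "left" may proceed.
        c.rightLag = OptionalLong.of(0);
        System.out.println(c.next()); // Rec[source=left, timestamp=5]
        c.right.add(new Rec("right", 10));
        System.out.println(c.next()); // Rec[source=right, timestamp=10]
    }
}

// Hypothetical record type: which input it came from and its timestamp.
record Rec(String source, long timestamp) {}

// Hypothetical two-input collator, not a Kafka Streams API.
class TwoWayCollator {
    final Deque<Rec> left = new ArrayDeque<>();
    final Deque<Rec> right = new ArrayDeque<>();
    OptionalLong leftLag = OptionalLong.empty();  // empty = lag unknown
    OptionalLong rightLag = OptionalLong.empty();

    /** Returns the next record in timestamp order, or null if the task should idle. */
    Rec next() {
        // An empty input that may still have data on the broker could hold an
        // earlier timestamp, so processing now could break timestamp order.
        if (left.isEmpty() && mayHaveMore(leftLag)) return null;
        if (right.isEmpty() && mayHaveMore(rightLag)) return null;
        if (left.isEmpty()) return right.poll(); // poll() returns null if both are empty
        if (right.isEmpty()) return left.poll();
        // Both inputs buffered: emit the smaller timestamp (ties go to left).
        return left.peek().timestamp() <= right.peek().timestamp() ? left.poll() : right.poll();
    }

    private static boolean mayHaveMore(OptionalLong lag) {
        return lag.isEmpty() || lag.getAsLong() > 0;
    }
}
```

This is why per-partition lag matters for the feature: without it, an empty buffer is ambiguous between "caught up" and "data not fetched yet".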
> > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > The details are here:
> > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/x/JSXZCQ
> > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > > > > > > > > -John
> > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > --
> > > > > > > > > > > > > > > > > > > > > > > -- Guozhang
> > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > 
> > > > > > > > > > > > 
> > > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > 
> > > > > > > > > > 
> > > > > > > > > 
> > > > > > > > > 
> > > > > > > > > 
> > > > > > > > 
> > > > > > 
> > > > > > 
> > > > > > 
> > > > 
> > > > 
> > > 
> > > 
> > > 
> 
> 

