Thanks, Bill,

I was waiting for feedback on the "currentLag" proposal
before updating the KIP. Since there haven't been any
objections to my new proposal, I'm in the process of
updating the KIP document right now.

Personally, I still like the original proposal to expose the
metadata from the fetch responses as well, but there were
too many side-effects of going that route.

Once I finish updating the wiki page, I'll ping here again
to check for objections.

Thank you for taking another look!
-John

On Mon, 2021-02-22 at 11:19 -0500, Bill Bejeck wrote:
> Thanks for the KIP update John,
> 
> The KIP as it stands LGTM.
> 
> One question: in your previous comment you stated that the KIP would
> introduce the `currentLag()` method, but the KIP seems to show the
> `metadata()` implementation.
> FWIW I like the metadata approach as it seems to provide information the
> consumer has access to when it receives the fetch response.
> 
> Thanks again.
> 
> Bill
> 
> On Mon, Feb 22, 2021 at 10:51 AM John Roesler <vvcep...@apache.org> wrote:
> 
> > Hello all,
> > 
> > Since there haven't been any big objections to my proposed
> > design fix, I'm updating the KIP now and opening the PR for
> > reviews. Hopefully, we can get this merged quickly and
> > unblock the system tests.
> > 
> > Thanks,
> > John
> > 
> > On Mon, 2021-02-22 at 09:49 -0600, John Roesler wrote:
> > > Thanks for that idea, Chia-Ping,
> > > 
> > > I agree that it would be nice for users to have a single API
> > > to get all kinds of metadata, but I'd rather introduce it in
> > > a KIP that would expose more than one piece of metadata. At
> > > the moment, I am motivated to keep this as simple as
> > > possible, which means only exposing the lag, so it seems
> > > better to introduce just "currentLag".
> > > 
> > > A later KIP could introduce some kind of "local metadata"
> > > API and deprecate this method without harm. That later KIP
> > > would be in a better position to take its time to find a
> > > good name, consider which pieces of metadata would be nice
> > > to have, etc.
> > > 
> > > Thanks sincerely for all your reviews,
> > > John
> > > 
> > > On Wed, 2021-02-17 at 11:54 +0000, Chia-Ping Tsai wrote:
> > > > That new solution looks good to me, as it brings fewer changes than either the options approach or the config approach.
> > > > 
> > > > One minor question: could we return a composite object rather than an OptionalLong? For example:
> > > > 
> > > > class Metadata {
> > > >   long lag;
> > > > }
> > > > 
> > > > A partition has several pieces of metadata, and we may want to expose more information about a partition in the future. A composite object leaves room to carry more information without adding more public APIs to Consumer.
> > > > 
> > > > On 2021/02/17 05:21:06, John Roesler <vvcep...@apache.org> wrote:
> > > > > Hello again, all,
> > > > > 
> > > > > Thank you for your feedback and patience. I am hopeful that
> > > > > I have been able to come up with a solution that will
> > > > > satisfy everyone.
> > > > > 
> > > > > Under a previous design iteration of the desired task idling
> > > > > semantics in KIP-695, we did indeed require the behavior
> > > > > change, but while I was considering all of your feedback, I
> > > > > realized that that requirement is no longer present.
> > > > > 
> > > > > Just a little more detail in case you are curious: I had
> > > > > initially wanted to make the task idling semantics
> > > > > absolutely free of weird timing effects based on how
> > > > > frequently Streams happens to call poll, so I built in a
> > > > > mechanism that would force Streams to get back a _fresh_ lag
> > > > > response from poll before proceeding to enforce processing.
> > > > > However, this resulted in a severe performance degradation,
> > > > > so I backed off to use a cache of the lag metadata.
> > > > > 
> > > > > What I realized just now is that under this change, there's
> > > > > no longer a need to return metadata (or change behavior) in
> > > > > Consumer#poll at all. The lag cache in Streams would always
> > > > > be identical to the one inside the Consumer's
> > > > > SubscriptionState. Therefore, I can instead just expose the
> > > > > Consumer's lag in a new API. Here is what I propose:
> > > > > 
> > > > > /**
> > > > >  * Get the consumer's current lag on the partition. Returns an "empty" {@link OptionalLong} if the lag is not known,
> > > > >  * for example if there is no position yet, or if the end offset is not known yet.
> > > > >  *
> > > > >  * <p>
> > > > >  * This method uses locally cached metadata and never makes a remote call.
> > > > >  *
> > > > >  * @param topicPartition The partition to get the lag for.
> > > > >  *
> > > > >  * @return This {@code Consumer} instance's current lag for the given partition.
> > > > >  *
> > > > >  * @throws IllegalStateException if the {@code topicPartition} is not assigned
> > > > >  */
> > > > > public OptionalLong currentLag(
> > > > >   TopicPartition topicPartition
> > > > > );
> > > > > 
> > > > > 
> > > > > 
> > > > > With this new API, we have a handy way to find out the lag
> > > > > of the consumer without ever incurring a remote call. There
> > > > > is no unnecessarily low-level config option to confuse
> > > > > users. And there is no change in the behavior of any
> > > > > existing API to break users' programs.
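To make the proposed semantics concrete, here is a minimal, self-contained model of what the Javadoc above describes, assuming the lag is the locally cached end offset minus the locally cached position. `LocalLagModel` and its methods are hypothetical stand-ins for the consumer's internal bookkeeping, not the real `KafkaConsumer` code, and partitions are plain strings so the sketch runs without `kafka-clients`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;

// Hypothetical, simplified model of the locally cached state behind currentLag().
// Partitions are plain strings here; the proposed API takes a TopicPartition.
final class LocalLagModel {
    private final Set<String> assigned = new HashSet<>();
    private final Map<String, Long> positions = new HashMap<>();
    private final Map<String, Long> endOffsets = new HashMap<>();

    void assign(String partition) {
        assigned.add(partition);
    }

    // Assumed to be updated as records are consumed (next offset to fetch).
    void updatePosition(String partition, long position) {
        positions.put(partition, position);
    }

    // Assumed to be updated from metadata carried on fetch responses.
    void updateEndOffset(String partition, long endOffset) {
        endOffsets.put(partition, endOffset);
    }

    // Reads only local state; never makes a remote call.
    OptionalLong currentLag(String partition) {
        if (!assigned.contains(partition)) {
            throw new IllegalStateException("No current assignment for partition " + partition);
        }
        Long position = positions.get(partition);
        Long endOffset = endOffsets.get(partition);
        if (position == null || endOffset == null) {
            return OptionalLong.empty();  // lag not known yet
        }
        return OptionalLong.of(endOffset - position);
    }
}
```

Returning an empty `OptionalLong` rather than 0 keeps "not known yet" distinct from "caught up", which is the distinction a caller needs when deciding whether to wait for more data.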
> > > > > 
> > > > > I have implemented this end-to-end in a preview PR:
> > > > > https://github.com/apache/kafka/pull/10137
> > > > > 
> > > > > If this proposal sounds good to all of you, then I will go
> > > > > ahead and update the KIP.
> > > > > 
> > > > > Sincerely yours,
> > > > > -John
> > > > > 
> > > > > On Thu, 2021-02-11 at 12:16 +0000, Chia-Ping Tsai wrote:
> > > > > > Here are my two cents. If the behavior eventually gets changed (return on response), the config is more suitable, as it is easier to deprecate (fewer changes). For example, we could introduce the config in 2.8 and deprecate it in 2.9; 3.0 would then remove the config and support only return-on-response.
> > > > > > 
> > > > > > 
> > > > > > 
> > > > > > On 2021/02/10 19:59:39 Sophie Blee-Goldman wrote:
> > > > > > > Hey John, I know I'm a bit late to this party, but just for the record,
> > > > > > > I don't think it's *totally* unreasonable for a user to take the "poll
> > > > > > > on max timeout and assume some records will be returned" approach.
> > > > > > > And I can also imagine plenty of manually-assigned consumers
> > > > > > > implemented doing exactly that.
> > > > > > > 
> > > > > > > It's too bad that we're hitting this during the 2.8 release. If we were
> > > > > > > having this discussion in the context of 3.0, then I'd say go for it, since
> > > > > > > it's a breaking change that would just require some modification to the
> > > > > > > applications' poll loop.
> > > > > > > 
> > > > > > > A good analogy here seems to be spurious wakeups -- you generally
> > > > > > > assume that a waiting thread has woken up due to a notify event in
> > > > > > > another thread, but the docs always make it very clear up front that
> > > > > > > this can happen "spuriously" and therefore you need to recheck whatever
> > > > > > > condition you were waiting on before assuming the thread should proceed.
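The idiom Sophie is alluding to is the guarded-wait loop: the waiting thread rechecks its condition after every wakeup instead of assuming that a wakeup means the condition holds. A minimal sketch using `Object.wait`/`notifyAll` (through `TimeUnit.timedWait`); `GuardedFlag` is a hypothetical name used only for illustration.

```java
import java.util.concurrent.TimeUnit;

// Guarded wait: the condition is rechecked after every wakeup, so a spurious
// (or otherwise unexpected) wakeup just sends the thread back to waiting.
final class GuardedFlag {
    private boolean ready = false;

    synchronized void set() {
        ready = true;
        notifyAll();
    }

    // Waits until ready is true or the timeout elapses; returns whether it became ready.
    synchronized boolean awaitReady(long timeoutMs) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (!ready) {
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return false;  // timed out with the condition still false
            }
            // May return early or spuriously; the loop re-tests the condition.
            TimeUnit.NANOSECONDS.timedWait(this, remainingNanos);
        }
        return true;
    }
}
```

Applied to poll(), the same discipline means treating an early empty return as "check again", not as "nothing will arrive".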
> > > > > > > 
> > > > > > > Since we *didn't* document this possibility up front in the case of poll(), it
> > > > > > > seems unfair to suddenly change the behavior in a supposedly non-breaking
> > > > > > > release. Imagine how many programs would break if spurious wakeups
> > > > > > > were suddenly introduced in a release, rather than warned about from
> > > > > > > the get-go (not a perfect analogy, since far more programs rely on wait/notify
> > > > > > > than on poll() returning records, but I think the point still stands).
> > > > > > > 
> > > > > > > For the record, I also agree with Ismael that a config doesn't feel ideal.
> > > > > > > There are already enough configs to present a steep learning curve, so
> > > > > > > I would avoid adding one more wherever possible. And it does indeed
> > > > > > > seem possible to avoid here, since it's really just a boolean flag (rather
> > > > > > > than a semi-unbounded space, e.g. max.poll.interval.ms, or a constant
> > > > > > > value, e.g. group.id, where a config does feel appropriate).
> > > > > > > 
> > > > > > > Given all that, I would personally advocate for the pollOptions overload.
> > > > > > > The obvious advantages here are:
> > > > > > > 1) it's more future-proof, in that we can avoid having a similar discussion
> > > > > > > if/when we want to consider other semantic changes to poll which some
> > > > > > > users may want while others would not.
> > > > > > > 2) it leaves the door open to using poll with either semantics in a single
> > > > > > > consumer. I doubt that's going to be very common in terms of the specific
> > > > > > > option we're discussing here, but it may be more useful for other options
> > > > > > > we may add in the future.
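A rough sketch of how a `poll(PollOptions)` overload might carry per-call semantics. `PollOptions`, `of`, `withReturnOnResponse`, and `mayReturnEarly` are hypothetical names; this overload was only discussed in this thread and is not part of the Kafka `Consumer` API.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical per-call options for an imagined Consumer#poll(PollOptions) overload.
final class PollOptions {
    private final Duration timeout;
    private final boolean returnOnResponse;

    private PollOptions(Duration timeout, boolean returnOnResponse) {
        this.timeout = Objects.requireNonNull(timeout, "timeout");
        this.returnOnResponse = returnOnResponse;
    }

    // Defaults to the currently documented behavior: return early only on records.
    static PollOptions of(Duration timeout) {
        return new PollOptions(timeout, false);
    }

    // Returns a modified copy; the original is untouched, so one consumer
    // can mix both semantics from call to call.
    PollOptions withReturnOnResponse(boolean enabled) {
        return new PollOptions(timeout, enabled);
    }

    Duration timeout() {
        return timeout;
    }

    boolean isReturnOnResponse() {
        return returnOnResponse;
    }

    // Whether a poll using these options may return before its timeout.
    boolean mayReturnEarly(boolean gotRecords, boolean gotFetchResponse) {
        return gotRecords || (returnOnResponse && gotFetchResponse);
    }
}
```

Because each call would take its own immutable options object, one consumer could use both semantics (advantage 2), and new fields could be added later without another `poll()` overload (advantage 1).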
> > > > > > > 
> > > > > > > Just my 2 cents. But if the pollOptions proposal would really add so much
> > > > > > > additional work that it would cause the 2.8 release to be significantly
> > > > > > > delayed, then that's worth taking into account as well.
> > > > > > > 
> > > > > > > On Wed, Feb 10, 2021 at 9:35 AM John Roesler <vvcep...@apache.org> wrote:
> > > > > > > 
> > > > > > > > Hello again, all.
> > > > > > > > 
> > > > > > > > I have submitted the PR:
> > > > > > > > https://github.com/apache/kafka/pull/10096
> > > > > > > > 
> > > > > > > > Ismael chimed in on the PR review to indicate that the
> > > > > > > > config approach may not be desirable.
> > > > > > > > 
> > > > > > > > How strongly do we feel that the behavior change is
> > > > > > > > unacceptable? It seems like most of the people involved felt
> > > > > > > > the behavior change is ok (although the docs were wrong).
> > > > > > > > 
> > > > > > > > The arguments against the behavior change were plausible,
> > > > > > > > but hypothetical.
> > > > > > > > 
> > > > > > > > Can everyone take a look at the PR and weigh in on whether
> > > > > > > > the complexity of an extra config option is really worth it
> > > > > > > > in this case?
> > > > > > > > 
> > > > > > > > I have to confess I'm currently leaning more toward dropping
> > > > > > > > the config and going back to the behavior change, while
> > > > > > > > correcting the docs and the system test.
> > > > > > > > 
> > > > > > > > While we are wavering on this point, the system tests
> > > > > > > > continue to fail, and the 2.8.0 release is blocked. We
> > > > > > > > should aim to make a call today.
> > > > > > > > 
> > > > > > > > Thanks all,
> > > > > > > > -John
> > > > > > > > 
> > > > > > > > On Fri, 2021-02-05 at 15:31 -0800, Guozhang Wang wrote:
> > > > > > > > > Thanks everyone for chiming in here! I'd also prefer the config approach if
> > > > > > > > > compared with API changes.
> > > > > > > > > 
> > > > > > > > > On Fri, Feb 5, 2021 at 3:18 PM Bill Bejeck <bbej...@gmail.com> wrote:
> > > > > > > > > 
> > > > > > > > > > I meant to chime in earlier.
> > > > > > > > > > 
> > > > > > > > > > I also like the `PollOptions` idea, but I have to agree that the config
> > > > > > > > > > option would be the least disruptive approach.
> > > > > > > > > > 
> > > > > > > > > > Thanks,
> > > > > > > > > > Bill
> > > > > > > > > > 
> > > > > > > > > > On Fri, Feb 5, 2021 at 6:12 PM John Roesler <vvcep...@apache.org> wrote:
> > > > > > > > > > 
> > > > > > > > > > > Thanks, all!
> > > > > > > > > > > 
> > > > > > > > > > > It seems that the config I proposed is a solution that
> > > > > > > > > > > everyone can be happy with, so I will go ahead with a PR to
> > > > > > > > > > > fix that.
> > > > > > > > > > > 
> > > > > > > > > > > I'll update the KIP after a round of PR reviews, in case
> > > > > > > > > > > there are new concerns that arise.
> > > > > > > > > > > 
> > > > > > > > > > > Thanks,
> > > > > > > > > > > -John
> > > > > > > > > > > 
> > > > > > > > > > > On Fri, 2021-02-05 at 15:07 -0800, Matthias J. Sax wrote:
> > > > > > > > > > > > Thanks for providing more details.
> > > > > > > > > > > > 
> > > > > > > > > > > > Adding a config might be the path of least resistance... I am fine with
> > > > > > > > > > > > that.
> > > > > > > > > > > > 
> > > > > > > > > > > > -Matthias
> > > > > > > > > > > > 
> > > > > > > > > > > > On 2/4/21 9:42 AM, Chia-Ping Tsai wrote:
> > > > > > > > > > > > > > vvvvvvvvvvvvvvvvvvv
> > > > > > > > > > > > > > long_poll.mode: return_on_records|return_on_response
> > > > > > > > > > > > >
> > > > > > > > > > > > > This idea LGTM. It not only makes minimal changes to the current behavior
> > > > > > > > > > > > > but also works for KIP-695.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > On 2021/02/04 16:07:11, John Roesler <vvcep...@apache.org> wrote:
> > > > > > > > > > > > > > Hi Matthias, Chia-Ping, and Tom,
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > Thanks for the thoughtful replies!
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > Re: poll(~forever~) to block indefinitely on records:
> > > > > > > > > > > > > > Thanks for your diligence, Chia-Ping. While I wouldn't
> > > > > > > > > > > > > > personally recommend that anyone write code that blocks
> > > > > > > > > > > > > > forever on I/O, I do agree this is something that "real
> > > > > > > > > > > > > > people" may want to do.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > Just a note for the record: this approach should only be
> > > > > > > > > > > > > > used in conjunction with a manual assignment. If people are
> > > > > > > > > > > > > > using a group subscription, they're setting themselves up to
> > > > > > > > > > > > > > get kicked out of the group when there is a low volume of
> > > > > > > > > > > > > > updates on the topic. And then, when they get kicked out,
> > > > > > > > > > > > > > they will never know it because they're just going to be
> > > > > > > > > > > > > > blocked in `poll()` the whole time.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > However, if you don't participate in a group and just:
> > > > > > > > > > > > > > 1. assign(partitions)
> > > > > > > > > > > > > > 2. poll(forever),
> > > > > > > > > > > > > > you should indeed expect to return from poll only when you
> > > > > > > > > > > > > > have records.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > This possibility is the turning point for me. I'd like to
> > > > > > > > > > > > > > alter my proposal to an opt-in config, detailed below.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > Re: Javadoc:
> > > > > > > > > > > > > > Thanks for pointing that out. It does seem like, if we do
> > > > > > > > > > > > > > decide to change behavior, we should adjust the Javadoc to
> > > > > > > > > > > > > > say so. That was an oversight on my part, and I daresay that
> > > > > > > > > > > > > > if I had done that initially, it would have saved Rajini
> > > > > > > > > > > > > > from having to dig into the code to pinpoint the cause of
> > > > > > > > > > > > > > those test failures.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > Re: PollOptions:
> > > > > > > > > > > > > > I actually like this option quite a bit. It seems like this
> > > > > > > > > > > > > > would be warranted if we expect someone to want to use the
> > > > > > > > > > > > > > same Consumer instance in both "return on metadata or
> > > > > > > > > > > > > > records" and "return on only records" mode. Otherwise, we
> > > > > > > > > > > > > > might as well introduce a new config.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > It also seems like the behavior I proposed in this KIP is
> > > > > > > > > > > > > > somewhat "advanced", so I could certainly see leaving it off
> > > > > > > > > > > > > > by default and offering an opt-in config.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > How does everyone feel about this opt-in config:
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > vvvvvvvvvvvvvvvvvvv
> > > > > > > > > > > > > > long_poll.mode: return_on_records|return_on_response
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > doc:
> > > > > > > > > > > > > > * return_on_records: (default) a call to
> > > > > > > > > > > > > > Consumer#poll(timeout) will block up to the timeout and
> > > > > > > > > > > > > > return early if records are received.
> > > > > > > > > > > > > > * return_on_response: a call to Consumer#poll(timeout) will
> > > > > > > > > > > > > > block up to the timeout and return early if any fetch
> > > > > > > > > > > > > > response is received. Use this option to get updates from
> > > > > > > > > > > > > > Consumer#metadata() even if Consumer#records() is empty.
> > > > > > > > > > > > > > ^^^^^^^^^^^^^^^^^^^
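For comparison with a per-call option, the proposed config could be modeled as an enum read once from the consumer's configs. This is a sketch only: `LongPollMode` and `fromConfig` are hypothetical, and `long_poll.mode` was a proposal in this thread rather than an existing Kafka consumer config.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical model of the proposed (never adopted) long_poll.mode config.
enum LongPollMode {
    RETURN_ON_RECORDS,   // default: return before the timeout only if records arrived
    RETURN_ON_RESPONSE;  // return before the timeout on any fetch response

    static final String CONFIG = "long_poll.mode";

    // Reads the mode from a config map; a missing key falls back to the default.
    static LongPollMode fromConfig(Map<String, String> configs) {
        String value = configs.get(CONFIG);
        if (value == null) {
            return RETURN_ON_RECORDS;
        }
        // "return_on_response" -> RETURN_ON_RESPONSE; unknown values throw IllegalArgumentException.
        return valueOf(value.trim().toUpperCase(Locale.ROOT));
    }

    // Whether poll(timeout) may return before the timeout under this mode.
    boolean returnsEarly(boolean recordsReceived, boolean responseReceived) {
        return recordsReceived || (this == RETURN_ON_RESPONSE && responseReceived);
    }
}
```

Unlike a per-call option, a config fixes the mode for the lifetime of the consumer instance.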
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > John
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > On Thu, 2021-02-04 at 08:44 +0000, Tom Bentley wrote:
> > > > > > > > > > > > > > > Hi,
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > The Javadoc for KafkaConsumer#poll() includes the following:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > * This method returns immediately if there are records available. *Otherwise, it will await the passed timeout.*
> > > > > > > > > > > > > > > > * If the timeout expires, an empty record set will be returned. Note that this method may block beyond the
> > > > > > > > > > > > > > > > * timeout in order to execute custom {@link ConsumerRebalanceListener} callbacks.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > In other words: if the method returns before the timeout, there must be
> > > > > > > > > > > > > > > records in the method result. After the timeout has passed, there may be no
> > > > > > > > > > > > > > > records. It might block for longer than the timeout. So I think returning
> > > > > > > > > > > > > > > with empty records before at least the given timeout has passed breaks that
> > > > > > > > > > > > > > > contract.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > A not-much-prettier alternative to adding a new
> > > > > > > > > > > > > > > pollForRecordsOrMetadata(Duration) method could be overloading poll() to
> > > > > > > > > > > > > > > take an additional parameter that controls whether an early return with
> > > > > > > > > > > > > > > empty records is allowed, or a `poll(PollOptions)`. In the long run it
> > > > > > > > > > > > > > > could be a mistake to include in the method name exactly what might cause
> > > > > > > > > > > > > > > an early empty return.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > Kind regards,
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > Tom
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > On Thu, Feb 4, 2021 at 5:08 AM Chia-Ping Tsai <chia7...@apache.org> wrote:
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > Thanks for sharing, Matthias. I agree that it is indeed an anti-pattern
> > > > > > > > > > > > > > > > to assume whether or not poll() returns data.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > However, I checked all usages of poll() in the code base. There is an
> > > > > > > > > > > > > > > > interesting use case, poll(a bigger timeout), which implies that callers
> > > > > > > > > > > > > > > > want poll() to block (forever) unless there is data available.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > [1]
> > > > > > > > > > > > > > > > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsoleConsumer.scala#L443
> > > > > > > > > > > > > > > > [2]
> > > > > > > > > > > > > > > > https://github.com/apache/kafka/blob/trunk/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java#L232
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Hence, I started to worry that client code like the aforementioned cases
> > > > > > > > > > > > > > > > would get broken by the behavior change :(
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > On 2021/02/03 22:59:09, "Matthias J. Sax" <mj...@apache.org> wrote:
> > > > > > > > > > > > > > > > > Thanks for your email John.
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > I agree that it seems to be an anti-pattern to write code that makes
> > > > > > > > > > > > > > > > > assumptions about whether poll() returns data or not. Thus, we should fix-forward
> > > > > > > > > > > > > > > > > the system test from my point of view.
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > From my understanding, the impact of KIP-695 is that we might return
> > > > > > > > > > > > > > > > > early from poll() (i.e., before the timeout has passed) with no data, only if
> > > > > > > > > > > > > > > > > an empty fetch request comes back and there is no other fetch request
> > > > > > > > > > > > > > > > > that did return data. Thus, for most cases, poll() should still return
> > > > > > > > > > > > > > > > > early and provide data. -- Thus, I have no concerns with the slight
> > > > > > > > > > > > > > > > > behavior change.
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > It would be good to get input from others about this question, though.
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > -Matthias
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > On 2/3/21 10:06 AM, John Roesler wrote:
> > > > > > > > > > > > > > > > > > Hello again all,
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > I'm resurrecting this thread to discuss an issue that has
> > > > > > > > > > > > > > > > > > come up after merging the code for this KIP.
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > The issue is that some of the system tests need to be
> > > > > > > > > > > > > > > > > > updated in the same way that this integration test needed to
> > > > > > > > > > > > > > > > > > be updated:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > https://github.com/apache/kafka/pull/9836/files#diff-735dcc2179315ebd78a7c75fd21b70b0ae81b90f3d5ec761740bc80abeae891fR1875-R1888
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > This issue was reported here:
> > > > > > > > > > > > > > > > > > https://issues.apache.org/jira/browse/KAFKA-12268
> > > > > > > > > > > > > > > > > > and there is some preliminary discussion here:
> > > > > > > > > > > > > > > > > > https://github.com/apache/kafka/pull/10022
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > First, let me offer my apologies for failing to catch this
> > > > > > > > > > > > > > > > > > before the merge. I'm sorry that it became Rajini's work to
> > > > > > > > > > > > > > > > > > track down the cause of the failure, when it was my
> > > > > > > > > > > > > > > > > > responsibility to ensure the feature was merged safely.
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > To recap the situation:
> > > > > > > > > > > > > > > > > > Consumer#poll(Duration) will now return before the duration
> > > > > > > > > > > > > > > > > > expires, even if no records are returned, as long as there is
> > > > > > > > > > > > > > > > > > some returned metadata.
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > This behavior was important for KIP-695. In the situation
> > > > > > > > > > > > > > > > > > where we get no records back for some partition, Streams
> > > > > > > > > > > > > > > > > > needs to have the freshest possible information about
> > > > > > > > > > > > > > > > > > whether there are no new records on the broker, or whether
> > > > > > > > > > > > > > > > > > there are records on the broker that we still need to fetch.
> > > > > > > > > > > > > > > > > > If that's not clear, the KIP contains the full story.
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > It's definitely a behavior change, but our rationale was
> > > > > > > > > > > > > > > > > > that it's an acceptable behavior change. Our big alternative
> > > > > > > > > > > > > > > > > > is to add a _new_ method to Consumer, such as
> > > > > > > > > > > > > > > > > > pollForRecordsOrMetadata(Duration).
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > It seems unreliable to expect the broker to return a
> > > > > > > > > > > > > > > > > > particular record within a particular timeout in general,
> > > > > > > > > > > > > > > > > > which is what these tests are doing. The broker can decide
> > > > > > > > > > > > > > > > > > for several reasons not to return data for a partition, but
> > > > > > > > > > > > > > > > > > return data for another partition instead.
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > It seems like the only case where you might reasonably try
> > > > > > > > > > > > > > > > > > to rely on that is in a test, where you first write a record
> > > > > > > > > > > > > > > > > > to a partition, then you assign only that one partition to a
> > > > > > > > > > > > > > > > > > consumer, then you poll on the consumer, expecting it to
> > > > > > > > > > > > > > > > > > return the data you just wrote.
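One way such a test can avoid relying on a single poll() (the fix-forward Matthias suggests) is to poll in a loop until the expected records arrive or an overall deadline passes. A minimal sketch: `pollUntil` is a hypothetical helper, and the `Supplier<List<T>>` stands in for a call like `consumer.poll(Duration.ofMillis(100))` so the example runs without `kafka-clients`.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class PollUntil {
    private PollUntil() {}

    // Polls repeatedly, accumulating results, until at least `expected` items have
    // arrived or the deadline passes. An empty poll is simply retried, so an early
    // return with no records does not fail the test by itself.
    static <T> List<T> pollUntil(Supplier<List<T>> poll, int expected, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        List<T> received = new ArrayList<>();
        while (received.size() < expected) {
            if (System.nanoTime() - deadline >= 0) {
                throw new AssertionError("Expected " + expected + " records but got "
                        + received.size() + " within " + timeout);
            }
            received.addAll(poll.get());
        }
        return received;
    }
}
```

The test then fails only if the records never show up within the overall deadline, not because one particular poll came back empty.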
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > So the $10 question here is whether we should support this
> > > > > > > > > > > > > > > > > > apparently artificial (testing-only) use case to the point
> > > > > > > > > > > > > > > > > > where it's worth adding a whole new method to the Consumer
> > > > > > > > > > > > > > > > > > interface.
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > Thanks all,
> > > > > > > > > > > > > > > > > > John
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > On Thu, 2020-12-17 at 13:18 -0600, John Roesler wrote:
> > > > > > > > > > > > > > > > > > > Thanks Jason,
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > We would only return the metadata for the latest fetches.
> > > > > > > > > > > > > > > > > > > So, if someone wanted to use this to lazily maintain a
> > > > > > > > > > > > > > > > > > > client-side metadata map for all partitions, they'd have to
> > > > > > > > > > > > > > > > > > > store it separately and merge in new updates as they arrive.
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > This way:
> > > > > > > > > > > > > > > > > > > 1. We don't need to increase the complexity of the client by
> > > > > > > > > > > > > > > > > > > storing that metadata.
> > > > > > > > > > > > > > > > > > > 2. Users will be able to treat all returned metadata as
> > > > > > > > > > > > > > > > > > > "fresh" without having to reason about the timestamps.
> > > > > > > > > > > > > > > > > > > 3. All parts of the returned ConsumerRecords object have the
> > > > > > > > > > > > > > > > > > > same lifecycle: all the data and metadata are the results of
> > > > > > > > > > > > > > > > > > > the most recent round of fetch responses that had not been
> > > > > > > > > > > > > > > > > > > previously polled.
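Under that design, a caller wanting a view of all partitions would merge each poll's partial metadata into its own map, with newer values replacing older ones. The minimal sketch below assumes per-partition lag values keyed by plain strings; `MergedLagCache` is a hypothetical helper, not part of Kafka. Once values are merged across polls, the caller has to track freshness again, which is exactly the reasoning that point 2 spares users of the returned metadata itself.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical client-side cache that merges the partial metadata returned
// by each poll into a view covering every partition seen so far.
final class MergedLagCache {
    private static final class Entry {
        final long lag;
        final long updatedAtMs;

        Entry(long lag, long updatedAtMs) {
            this.lag = lag;
            this.updatedAtMs = updatedAtMs;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();

    // Merges the lags carried by one poll; partitions absent from the latest
    // fetches keep their older values. nowMs is when that poll returned.
    void merge(Map<String, Long> latestFetchLags, long nowMs) {
        latestFetchLags.forEach((partition, lag) -> entries.put(partition, new Entry(lag, nowMs)));
    }

    OptionalLong lag(String partition) {
        Entry e = entries.get(partition);
        return e == null ? OptionalLong.empty() : OptionalLong.of(e.lag);
    }

    // How old the cached value is; the caller must now reason about staleness.
    OptionalLong ageMs(String partition, long nowMs) {
        Entry e = entries.get(partition);
        return e == null ? OptionalLong.empty() : OptionalLong.of(nowMs - e.updatedAtMs);
    }
}
```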
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > Does that seem sensible to you? I'll update the KIP to
> > > > > > > > > > > > > > > > > > > clarify this.
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > -John
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > On Wed, 2020-12-16 at 10:29 -0800, Jason Gustafson wrote:
> > > > > > > > > > > > > > > > > > > > Hi John,
> > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > Just one question. It wasn't very clear to me exactly when the metadata
> > > > > > > > > > > > > > > > > > > > would be returned in `ConsumerRecords`. Would we /always/ include the
> > > > > > > > > > > > > > > > > > > > metadata for all partitions that are assigned, or would it be based on the
> > > > > > > > > > > > > > > > > > > > latest fetches?
> > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > > Jason
> > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > On Fri, Dec 11, 2020 at 4:07 PM John Roesler <vvcep...@apache.org> wrote:
> > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > Thanks, Guozhang!
> > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > All of your feedback sounds good to me. I’ll update the KIP when I
> > > > > > > > > > > > > > > > > > > > > am able.
> > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > 3) I believe it is the position after the fetch, but I will confirm. I
> > > > > > > > > > > > > > > > > > > > > think omitting position may render beginning and end offsets useless as
> > > > > > > > > > > > > > > > > > > > > well, which leaves only lag. That would be fine with me, but it also seems
> > > > > > > > > > > > > > > > > > > > > nice to supply this extra metadata since it is well defined and probably
> > > > > > > > > > > > > > > > > > > > > handy for others. Therefore, I’d go the route of specifying the exact
> > > > > > > > > > > > > > > > > > > > > semantics and keeping it.
> > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > Thanks for the review,
> > > > > > > > > > > > > > > > > > > > > John
> > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > On Fri, Dec 11, 2020, at 17:36, Guozhang Wang wrote:
> > > > > > > > > > > > > > > > > > > > > > Hello John,
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > Thanks for the updates! I've made a pass on the KIP and also the POC PR;
> > > > > > > > > > > > > > > > > > > > > > here are some minor comments:
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > 1) nit: "receivedTimestamp" -> it seems the metadata keeps getting updated,
> > > > > > > > > > > > > > > > > > > > > > and we do not create a new object but just update the values in-place, so
> > > > > > > > > > > > > > > > > > > > > > maybe calling it `lastUpdateTimestamp` is better?
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > 2) It would be great to clarify in the javadocs that the new API
> > > > > > > > > > > > > > > > > > > > > > "ConsumerRecords#metadata(): Map<TopicPartition, Metadata>" may return a
> > > > > > > > > > > > > > > > > > > > > > superset of the TopicPartitions returned by the existing API that returns
> > > > > > > > > > > > > > > > > > > > > > the data by partition, in case users assume the key sets of both maps are
> > > > > > > > > > > > > > > > > > > > > > always the same.
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > 3) The "position()" API of the call needs better clarification: is it the
> > > > > > > > > > > > > > > > > > > > > > current position AFTER the records are returned, or is it BEFORE the
> > > > > > > > > > > > > > > > > > > > > > records are returned? Personally, I'd suggest we not include it if it is
> > > > > > > > > > > > > > > > > > > > > > not used anywhere yet, just to avoid possible misuse, but I'm fine if you'd
> > > > > > > > > > > > > > > > > > > > > > like to keep it; in that case, just clarify its semantics.
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > Other than that, I'm +1 on the KIP as well!
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > Guozhang
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > On Fri, Dec 11, 2020 at 8:15 AM Walker Carlson <wcarl...@confluent.io> wrote:
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > Thanks for the KIP!
> > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > +1 (non-binding)
> > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > walker
> > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > On Wed, Dec 9, 2020 at 11:40 AM Bruno Cadonna <br...@confluent.io> wrote:
> > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > Thanks for the KIP, John!
> > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > +1 (non-binding)
> > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > > > > > > > > > > > Bruno
> > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > On 08.12.20 18:03, John Roesler wrote:
> > > > > > > > > > > > > > > > > > > > > > > > > Hello all,
> > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > There hasn't been much discussion on KIP-695 so far, so I'd
> > > > > > > > > > > > > > > > > > > > > > > > > like to go ahead and call for a vote.
> > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > As a reminder, the purpose of KIP-695 is to improve on the
> > > > > > > > > > > > > > > > > > > > > > > > > "task idling" feature we introduced in KIP-353. This KIP
> > > > > > > > > > > > > > > > > > > > > > > > > will allow Streams to offer deterministic time semantics in
> > > > > > > > > > > > > > > > > > > > > > > > > join-type topologies. For example, it makes sure that when
> > > > > > > > > > > > > > > > > > > > > > > > > you join two topics, we collate the topics by timestamp.
> > > > > > > > > > > > > > > > > > > > > > > > > That was always the intent with task idling (KIP-353), but
> > > > > > > > > > > > > > > > > > > > > > > > > it turns out the previous mechanism couldn't provide the
> > > > > > > > > > > > > > > > > > > > > > > > > desired semantics.
> > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > The details are here:
> > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/x/JSXZCQ
> > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > > > > > > > -John
> > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > --
> > > > > > > > > > > > > > > > > > > > > > -- Guozhang
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > 
> > > > > > > > > 
> > > > > > > > > 
> > > > > > > > 
> > > > > > > > 
> > > > > > > > 
> > > > > > > 
> > > > > 
> > > > > 
> > > > > 
> > > 
> > > 
> > 
> > 
> > 

