Hello again, all. I have submitted the PR: https://github.com/apache/kafka/pull/10096

Ismael chimed in on the PR review to indicate that the config
approach may not be desirable. How strongly do we feel that
the behavior change is unacceptable?

It seems like most of the people involved felt the behavior
change is ok (although the docs were wrong). The arguments
against the behavior change were plausible, but hypothetical.

Can everyone take a look at the PR and weigh in on whether
the complexity of an extra config option is really worth it
in this case? I have to confess I'm currently leaning more
toward dropping the config and going back to the behavior
change, while correcting the docs and the system test.

While we are wavering on this point, the system tests
continue to fail, and the 2.8.0 release is blocked. We should
aim to make a call today.

Thanks all,
-John

On Fri, 2021-02-05 at 15:31 -0800, Guozhang Wang wrote:
> Thanks everyone for chiming in here! I'd also prefer the config approach if
> compared with API changes.
>
> On Fri, Feb 5, 2021 at 3:18 PM Bill Bejeck <bbej...@gmail.com> wrote:
>
> > I meant to chime in earlier.
> >
> > I also like the `PollOptions` idea, but I have to agree that the config
> > option would be the least disruptive approach.
> >
> > Thanks,
> > Bill
> >
> > On Fri, Feb 5, 2021 at 6:12 PM John Roesler <vvcep...@apache.org> wrote:
> >
> > > Thanks, all!
> > >
> > > It seems that the config I proposed is a solution that
> > > everyone can be happy with, so I will go ahead with a PR to
> > > fix that.
> > >
> > > I'll update the KIP after a round of PR reviews, in case
> > > there are new concerns that arise.
> > >
> > > Thanks,
> > > -John
> > >
> > > On Fri, 2021-02-05 at 15:07 -0800, Matthias J. Sax wrote:
> > > > Thanks for providing more details.
> > > >
> > > > Adding a config might be the way of least resistance... I am fine with
> > > > that.
> > > >
> > > > -Matthias
> > > >
> > > > On 2/4/21 9:42 AM, Chia-Ping Tsai wrote:
> > > > > > vvvvvvvvvvvvvvvvvvv
> > > > > > long_poll.mode: return_on_records|return_on_response
> > > > >
> > > > > This idea LGTM. It not only makes minimal changes to current behavior
> > > > > but also works for KIP-695.
> > > > >
> > > > > On 2021/02/04 16:07:11, John Roesler <vvcep...@apache.org> wrote:
> > > > > > Hi Matthias, Chia-Ping, and Tom,
> > > > > >
> > > > > > Thanks for the thoughtful replies!
> > > > > >
> > > > > > Re: poll(~forever~) to block indefinitely on records:
> > > > > > Thanks for your diligence, Chia-Ping. While I wouldn't
> > > > > > personally recommend for anyone to write code that blocks
> > > > > > forever on I/O, I do agree this is something that "real
> > > > > > people" may want to do.
> > > > > >
> > > > > > Just a note for the record, this approach should only be
> > > > > > used in conjunction with a manual assignment. If people are
> > > > > > using a group subscription, they're setting themselves up to
> > > > > > get kicked out of the group when there is low volume of
> > > > > > updates on the topic. And then, when they get kicked out,
> > > > > > they will never know it because they're just going to be
> > > > > > blocked in `poll()` the whole time.
> > > > > >
> > > > > > However, if you don't participate in a group and just:
> > > > > > 1 assign(partitions)
> > > > > > 2 poll(forever),
> > > > > > you should indeed expect to return from poll only when you
> > > > > > have records.
> > > > > >
> > > > > > This possibility is the turning point for me. I'd like to
> > > > > > alter my proposal to an opt-in config, detailed below.
> > > > > >
> > > > > > Re: Javadoc:
> > > > > > Thanks for pointing that out. It does seem like, if we do
> > > > > > decide to change behavior, we should adjust the Javadoc to
> > > > > > say so.
> > > > > > That was an oversight on my part, and I daresay that
> > > > > > if I had done that initially, it would have saved Rajini
> > > > > > from having to dig into the code to pinpoint the cause of
> > > > > > those test failures.
> > > > > >
> > > > > > Re: PollOptions:
> > > > > > I actually like this option quite a bit. It seems like this
> > > > > > would be warranted if we expect someone to want to use the
> > > > > > same Consumer instance in both "return on metadata or
> > > > > > records" and "return on only records" mode. Otherwise, we
> > > > > > might as well introduce a new config.
> > > > > >
> > > > > > It also seems like the behavior I proposed in this KIP is
> > > > > > somewhat "advanced", so I could certainly see leaving it off
> > > > > > by default and offering an opt-in config.
> > > > > >
> > > > > > How does everyone feel about this opt-in config:
> > > > > >
> > > > > > vvvvvvvvvvvvvvvvvvv
> > > > > > long_poll.mode: return_on_records|return_on_response
> > > > > >
> > > > > > doc:
> > > > > > * return_on_records: (default) a call to
> > > > > > Consumer#poll(timeout) will block up to the timeout and
> > > > > > return early if records are received.
> > > > > > * return_on_response: a call to Consumer#poll(timeout) will
> > > > > > block up to the timeout and return early if any fetch
> > > > > > response is received. Use this option to get updates from
> > > > > > Consumer#metadata() even if Consumer#records() is empty.
> > > > > > ^^^^^^^^^^^^^^^^^^^
> > > > > >
> > > > > > Thanks,
> > > > > > John
> > > > > >
> > > > > > On Thu, 2021-02-04 at 08:44 +0000, Tom Bentley wrote:
> > > > > > > Hi,
> > > > > > >
> > > > > > > The Javadoc for KafkaConsumer#poll() includes the following:
> > > > > > >
> > > > > > > > * This method returns immediately if there are records available.
> > > > > > > > *Otherwise, it will await the passed timeout.*
> > > > > > > > * If the timeout expires, an empty record set will be returned.
> > > > > > > > Note that this method may block beyond the
> > > > > > > > * timeout in order to execute custom {@link ConsumerRebalanceListener}
> > > > > > > > callbacks.
> > > > > > >
> > > > > > > In other words: If the method returns before the timeout there must be
> > > > > > > records in the method result. After the timeout has passed there may be no
> > > > > > > records. It might block for longer than the timeout. So I think returning
> > > > > > > with empty records before at least the given timeout has passed breaks that
> > > > > > > contract.
> > > > > > >
> > > > > > > A not-much-prettier alternative to adding a new
> > > > > > > pollForRecordsOrMetadata(Duration) method could be overloading poll() to
> > > > > > > take an additional parameter which controlled whether an early return with
> > > > > > > empty records was allowed. Or a `poll(PollOptions)`. In the long run it
> > > > > > > could be a mistake to include in the method name exactly what might cause
> > > > > > > an early empty return.
> > > > > > >
> > > > > > > Kind regards,
> > > > > > >
> > > > > > > Tom
> > > > > > >
> > > > > > > On Thu, Feb 4, 2021 at 5:08 AM Chia-Ping Tsai <chia7...@apache.org> wrote:
> > > > > > >
> > > > > > > > Thanks for sharing, Matthias. I agree that it is indeed an anti-pattern
> > > > > > > > to assume poll() returns data or not.
> > > > > > > >
> > > > > > > > However, I checked all usages of poll() in the code base. There is an
> > > > > > > > interesting use case - poll(a bigger timeout) - it implies that callers
> > > > > > > > want to block in poll() (forever) unless data is available.
> > > > > > > >
> > > > > > > > [1]
> > > > > > > > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsoleConsumer.scala#L443
> > > > > > > > [2]
> > > > > > > > https://github.com/apache/kafka/blob/trunk/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java#L232
> > > > > > > >
> > > > > > > > Hence, I am starting to worry that client code like the aforementioned
> > > > > > > > cases will be broken by the behavior change :(
> > > > > > > >
> > > > > > > > On 2021/02/03 22:59:09, "Matthias J. Sax" <mj...@apache.org> wrote:
> > > > > > > > > Thanks for your email John.
> > > > > > > > >
> > > > > > > > > I agree that it seems to be an anti-pattern to write code that makes
> > > > > > > > > assumptions about whether poll() returns data or not. Thus, we should
> > > > > > > > > fix-forward the system test from my point of view.
> > > > > > > > >
> > > > > > > > > From my understanding, the impact of KIP-695 is that we might return
> > > > > > > > > early from poll() (ie, before the timeout passed) with no data, only if
> > > > > > > > > an empty fetch response comes back and there is no other fetch response
> > > > > > > > > that did return data. Thus, for most cases, poll() should still return
> > > > > > > > > early and provide data. -- Thus, I have no concerns with the slight
> > > > > > > > > behavior change.
> > > > > > > > >
> > > > > > > > > Would be good to get input from others about this question though.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > -Matthias
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On 2/3/21 10:06 AM, John Roesler wrote:
> > > > > > > > > > Hello again all,
> > > > > > > > > >
> > > > > > > > > > I'm resurrecting this thread to discuss an issue that has
> > > > > > > > > > come up after merging the code for this KIP.
> > > > > > > > > >
> > > > > > > > > > The issue is that some of the system tests need to be
> > > > > > > > > > updated in the same way that this integration test needed to
> > > > > > > > > > be updated:
> > > > > > > > > >
> > > > > > > > > > https://github.com/apache/kafka/pull/9836/files#diff-735dcc2179315ebd78a7c75fd21b70b0ae81b90f3d5ec761740bc80abeae891fR1875-R1888
> > > > > > > > > >
> > > > > > > > > > This issue was reported here:
> > > > > > > > > > https://issues.apache.org/jira/browse/KAFKA-12268
> > > > > > > > > > and there is some preliminary discussion here:
> > > > > > > > > > https://github.com/apache/kafka/pull/10022
> > > > > > > > > >
> > > > > > > > > > First, let me offer my apologies for failing to catch this
> > > > > > > > > > before the merge. I'm sorry that it became Rajini's work to
> > > > > > > > > > track down the cause of the failure, when it was my
> > > > > > > > > > responsibility to ensure the feature was merged safely.
> > > > > > > > > >
> > > > > > > > > > To recap the situation:
> > > > > > > > > > Consumer#poll(Duration) will now return before the duration
> > > > > > > > > > expires, even if no records are returned, as long as some
> > > > > > > > > > metadata is returned.
> > > > > > > > > >
> > > > > > > > > > This behavior was important for KIP-695. In the situation
> > > > > > > > > > where we get no records back for some partition, Streams
> > > > > > > > > > needs to have the freshest possible information about
> > > > > > > > > > whether there are no new records on the broker, or whether
> > > > > > > > > > there are records on the broker that we still need to fetch.
> > > > > > > > > > If that's not clear, the KIP contains the full story.
> > > > > > > > > >
> > > > > > > > > > It's definitely a behavior change, but our rationale was
> > > > > > > > > > that it's an acceptable behavior change.
> > > > > > > > > > Our big alternative
> > > > > > > > > > is to add a _new_ method to Consumer to
> > > > > > > > > > pollForRecordsOrMetadata(Duration) or something.
> > > > > > > > > >
> > > > > > > > > > It seems unreliable to expect the broker to return a
> > > > > > > > > > particular record within a particular timeout in general,
> > > > > > > > > > which is what these tests are doing. The broker can decide
> > > > > > > > > > for several reasons not to return data for a partition, but
> > > > > > > > > > return data for another partition instead.
> > > > > > > > > >
> > > > > > > > > > It seems like the only case where you might reasonably try
> > > > > > > > > > to rely on that is in a test, where you first write a record
> > > > > > > > > > to a partition, then you assign only that one partition to a
> > > > > > > > > > consumer, then you poll on the consumer, expecting it to
> > > > > > > > > > return the data you just wrote.
> > > > > > > > > >
> > > > > > > > > > So the $10 question here is whether we should support this
> > > > > > > > > > apparently artificial (testing-only) use case to the point
> > > > > > > > > > where it's worth adding a whole new method to the Consumer
> > > > > > > > > > interface.
> > > > > > > > > >
> > > > > > > > > > Thanks all,
> > > > > > > > > > John
> > > > > > > > > >
> > > > > > > > > > On Thu, 2020-12-17 at 13:18 -0600, John Roesler wrote:
> > > > > > > > > > > Thanks Jason,
> > > > > > > > > > >
> > > > > > > > > > > We would only return the metadata for the latest fetches.
> > > > > > > > > > > So, if someone wanted to use this to lazily maintain a
> > > > > > > > > > > client-side metadata map for all partitions, they'd have to
> > > > > > > > > > > store it separately and merge in new updates as they arrive.
> > > > > > > > > > >
> > > > > > > > > > > This way:
> > > > > > > > > > > 1. We don't need to increase the complexity of the client by
> > > > > > > > > > > storing that metadata
> > > > > > > > > > > 2. Users will be able to treat all returned metadata as
> > > > > > > > > > > "fresh" without having to reason about the timestamps.
> > > > > > > > > > > 3. All parts of the returned ConsumerRecords object have the
> > > > > > > > > > > same lifecycle: all the data and metadata are the results of
> > > > > > > > > > > the most recent round of fetch responses that had not been
> > > > > > > > > > > previously polled.
> > > > > > > > > > >
> > > > > > > > > > > Does that seem sensible to you? I'll update the KIP to
> > > > > > > > > > > clarify this.
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > -John
> > > > > > > > > > >
> > > > > > > > > > > On Wed, 2020-12-16 at 10:29 -0800, Jason Gustafson wrote:
> > > > > > > > > > > > Hi John,
> > > > > > > > > > > >
> > > > > > > > > > > > Just one question. It wasn't very clear to me exactly when the metadata
> > > > > > > > > > > > would be returned in `ConsumerRecords`. Would we /always/ include the
> > > > > > > > > > > > metadata for all partitions that are assigned, or would it be based on the
> > > > > > > > > > > > latest fetches?
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > Jason
> > > > > > > > > > > >
> > > > > > > > > > > > On Fri, Dec 11, 2020 at 4:07 PM John Roesler <vvcep...@apache.org> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Thanks, Guozhang!
> > > > > > > > > > > > >
> > > > > > > > > > > > > All of your feedback sounds good to me. I’ll update the KIP when I am able.
> > > > > > > > > > > > >
> > > > > > > > > > > > > 3) I believe it is the position after the fetch, but I will confirm.
> > > > > > > > > > > > > I think omitting position may render beginning and end offsets useless as
> > > > > > > > > > > > > well, which leaves only lag. That would be fine with me, but it also seems
> > > > > > > > > > > > > nice to supply this extra metadata since it is well defined and probably
> > > > > > > > > > > > > handy for others. Therefore, I’d go the route of specifying the exact
> > > > > > > > > > > > > semantics and keeping it.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks for the review,
> > > > > > > > > > > > > John
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Fri, Dec 11, 2020, at 17:36, Guozhang Wang wrote:
> > > > > > > > > > > > > > Hello John,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks for the updates! I've made a pass on the KIP and also the POC PR,
> > > > > > > > > > > > > > here are some minor comments:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 1) nit: "receivedTimestamp" -> it seems the metadata keeps getting updated,
> > > > > > > > > > > > > > and we do not create a new object but just update the values in-place, so
> > > > > > > > > > > > > > maybe calling it `lastUpdateTimstamp` is better?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 2) It would be great to clarify in the Javadoc that the new API
> > > > > > > > > > > > > > "ConsumerRecords#metadata(): Map<TopicPartition, Metadata>" may return a
> > > > > > > > > > > > > > superset of the TopicPartitions returned by the existing API that returns
> > > > > > > > > > > > > > the data by partitions, in case users assume their map key-entries would
> > > > > > > > > > > > > > always be the same.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 3) The "position()" API of the call needs better clarification: is it the
> > > > > > > > > > > > > > current position AFTER the records are returned, or is it BEFORE the
> > > > > > > > > > > > > > records are returned? Personally I'd suggest we do not include it if it is
> > > > > > > > > > > > > > not used anywhere yet just to avoid possible misuse, but I'm fine if you
> > > > > > > > > > > > > > like to keep it still; in that case just clarify its semantics.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Other than that, I'm +1 on the KIP as well!
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Guozhang
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Fri, Dec 11, 2020 at 8:15 AM Walker Carlson <wcarl...@confluent.io> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks for the KIP!
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > +1 (non-binding)
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > walker
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Wed, Dec 9, 2020 at 11:40 AM Bruno Cadonna <br...@confluent.io> wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Thanks for the KIP, John!
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > +1 (non-binding)
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > > > Bruno
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On 08.12.20 18:03, John Roesler wrote:
> > > > > > > > > > > > > > > > > Hello all,
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > There hasn't been much discussion on KIP-695 so far, so I'd
> > > > > > > > > > > > > > > > > like to go ahead and call for a vote.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > As a reminder, the purpose of KIP-695 is to improve on the
> > > > > > > > > > > > > > > > > "task idling" feature we introduced in KIP-353. This KIP
> > > > > > > > > > > > > > > > > will allow Streams to offer deterministic time semantics in
> > > > > > > > > > > > > > > > > join-type topologies. For example, it makes sure that
> > > > > > > > > > > > > > > > > when you join two topics, we collate the topics by
> > > > > > > > > > > > > > > > > timestamp. That was always the intent with task idling
> > > > > > > > > > > > > > > > > (KIP-353), but it turns out the previous mechanism couldn't
> > > > > > > > > > > > > > > > > provide the desired semantics.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > The details are here:
> > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/x/JSXZCQ
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > -John
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > --
> > > > > > > > > > > > > > -- Guozhang
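
For the test pattern discussed in this thread (write a record, assign one partition, poll, and expect the record back), the following is a minimal Java sketch of a helper that keeps polling until records arrive or an overall deadline passes. It gives the same result whether or not a single poll returns early with no records. This is only a sketch under assumptions: `PollUntilRecords`, `pollUntilRecords`, and the `Function<Duration, List<T>>` parameter are hypothetical stand-ins for a call to `Consumer#poll(Duration)`, not Kafka APIs, and the responses in `main` are simulated rather than fetched from a broker.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class PollUntilRecords {

    /**
     * Calls poll repeatedly until at least one record has been collected or
     * the overall deadline passes. The poll function is a hypothetical
     * stand-in for Consumer#poll(Duration); it is allowed to return an empty
     * list before its timeout expires.
     */
    public static <T> List<T> pollUntilRecords(Function<Duration, List<T>> poll,
                                               Duration deadline) {
        long endNanos = System.nanoTime() + deadline.toNanos();
        List<T> collected = new ArrayList<>();
        while (collected.isEmpty()) {
            long remainingNanos = endNanos - System.nanoTime();
            if (remainingNanos <= 0) {
                break; // deadline reached without any records
            }
            // Hand the remaining time to each poll so the total wait stays bounded.
            collected.addAll(poll.apply(Duration.ofNanos(remainingNanos)));
        }
        return collected;
    }

    public static void main(String[] args) {
        // Simulated responses: two empty (metadata-only) results, then one record.
        Iterator<List<String>> responses = List.of(
                List.<String>of(), List.<String>of(), List.of("record-1")).iterator();

        List<String> got = pollUntilRecords(
                timeout -> responses.hasNext() ? responses.next() : List.<String>of(),
                Duration.ofSeconds(5));

        System.out.println(got); // prints [record-1]
    }
}
```

With a real consumer, the lambda would wrap the actual poll call and unpack the returned records. The design point is that the caller's own deadline, rather than the timeout of any single poll, bounds how long the test waits.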