Hello John, Thanks for the new PR, and sorry for zig-zagging this KIP design. I made a pass on the PR and it looks good to me overall. Left some minor comments.
Guozhang On Mon, Feb 22, 2021 at 9:06 AM John Roesler <vvcep...@apache.org> wrote: > Hello all, > > I have updated the KIP now. The diff is visible here: > > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=165225765&selectedPageVersions=13&selectedPageVersions=12 > > The PR https://github.com/apache/kafka/pull/10137 is now > available for reviews. > > Thanks so much to you all for your help on this! > John > > On Mon, 2021-02-22 at 10:25 -0600, John Roesler wrote: > > Thanks, Bill, > > > > I was waiting for feedback on the "currentLag" proposal > > before updating the KIP. Since there haven't been any > > objections to my new proposal, I'm in the process of > > updating the KIP document right now. > > > > Personally, I still like the original proposal to expose the > > metadata from the fetch responses as well, but there were > > too many side-effects of going that route. > > > > Once I finish updating the wiki page, I'll ping here again > > to check for objections. > > > > Thank you for taking another look! > > -John > > > > On Mon, 2021-02-22 at 11:19 -0500, Bill Bejeck wrote: > > > Thanks for the KIP update John, > > > > > > The KIP as it stands LGTM. > > > > > > One question, in your previous comment you stated that the KIP would > > > introduce the `currentLag()` method, but the KIP seems to show the > > > `metadata()` implementation. > > > FWIW I like the metadata approach as it seems to provide information > the > > > consumer has access to when it receives the fetch response. > > > > > > Thanks again. > > > > > > Bill > > > > > > On Mon, Feb 22, 2021 at 10:51 AM John Roesler <vvcep...@apache.org> > wrote: > > > > > > > Hello all, > > > > > > > > Since there haven't been any big objections to my proposed > > > > design fix, I'm updating the KIP now and opening the PR for > > > > reviews. Hopefully, we can get this merged quickly and > > > > unblock the system tests. 
> > > > > > > > Thanks, > > > > John > > > > > > > > On Mon, 2021-02-22 at 09:49 -0600, John Roesler wrote: > > > > > Thanks for that idea, Chia-Ping, > > > > > > > > > > I agree that it would be nice for users to have a single API > > > > > to get all kinds of metadata, but I'd rather introduce it in > > > > > a KIP that would expose more than one piece of metadata. At > > > > > the moment, I am motivated to keep this as simple as > > > > > possible, which means only exposing the lag, so it seems > > > > > better to introduce just "currentLag". > > > > > > > > > > A later KIP could introduce some kind of "local metadata" > > > > > API and deprecate this method without harm. That later KIP > > > > > would be in a better position to take its time to find a > > > > > good name, consider which pieces of metadata would be nice > > > > > to have, etc. > > > > > > > > > > Thanks sincerely for all your reviews, > > > > > John > > > > > > > > > > On Wed, 2021-02-17 at 11:54 +0000, Chia-Ping Tsai wrote: > > > > > > That new solution is good to me as it brings fewer changes than > either > > > > options or config. > > > > > > > > > > > > one minor question - Could we return a composite object rather > than > > > > OptionalLong? For example: > > > > > > > > > > > > class Metadata { > > > > > > long lag; > > > > > > } > > > > > > > > > > > > There are a bunch of 'metadata' for a partition and it is > possible we > > > > want to expose more information of a partition in the future. The > composite > > > > object open a room to carry more information without adding more > Public > > > > APIs to Consumer. > > > > > > > > > > > > On 2021/02/17 05:21:06, John Roesler <vvcep...@apache.org> > wrote: > > > > > > > Hello again, all, > > > > > > > > > > > > > > Thank you for your feedback and patience. I am hopeful that > > > > > > > I have been able to come up with a solution that will > > > > > > > satisfy everyone. 
> > > > > > >
> > > > > > > Under a previous design iteration of the desired task idling
> > > > > > > semantics in KIP-695, we did indeed require the behavior
> > > > > > > change, but while I was considering all of your feedback, I
> > > > > > > realized that that requirement is no longer present.
> > > > > > >
> > > > > > > Just a little more detail in case you are curious: I had
> > > > > > > initially wanted to make the task idling semantics
> > > > > > > absolutely free of weird timing effects based on how
> > > > > > > frequently Streams happens to call poll, so I built in a
> > > > > > > mechanism that would force Streams to get back a _fresh_ lag
> > > > > > > response from poll before proceeding to enforce processing.
> > > > > > > However, this resulted in a severe performance degradation,
> > > > > > > so I backed off to use a cache of the lag metadata.
> > > > > > >
> > > > > > > What I realized just now is that under this change, there's
> > > > > > > no longer a need to return metadata (or change behavior) in
> > > > > > > Consumer#poll at all. The lag cache in Streams would always
> > > > > > > be identical to the one inside the Consumer's
> > > > > > > SubscriptionState. Therefore, I can instead just expose the
> > > > > > > Consumer's lag in a new API. Here is what I propose:
> > > > > > >
> > > > > > > /**
> > > > > > >  * Get the consumer's current lag on the partition. Returns an "empty" {@link OptionalLong} if the lag is not known,
> > > > > > >  * for example if there is no position yet, or if the end offset is not known yet.
> > > > > > >  *
> > > > > > >  * <p>
> > > > > > >  * This method uses locally cached metadata and never makes a remote call.
> > > > > > >  *
> > > > > > >  * @param topicPartition The partition to get the lag for.
> > > > > > >  *
> > > > > > >  * @return This {@code Consumer} instance's current lag for the given partition.
> > > > > > >  *
> > > > > > >  * @throws IllegalStateException if the {@code topicPartition} is not assigned
> > > > > > >  **/
> > > > > > > @Override
> > > > > > > public OptionalLong currentLag(
> > > > > > >     TopicPartition topicPartition
> > > > > > > );
> > > > > > >
> > > > > > > With this new API, we have a handy way to find out the lag
> > > > > > > of the consumer without ever incurring a remote call. There
> > > > > > > is no unnecessarily low-level config option to confuse
> > > > > > > users. And there is no change in the behavior of any
> > > > > > > existing API to break users' programs.
> > > > > > >
> > > > > > > I have implemented this end-to-end in a preview PR:
> > > > > > > https://github.com/apache/kafka/pull/10137
> > > > > > >
> > > > > > > If this proposal sounds good to all of you, then I will go
> > > > > > > ahead and update the KIP.
> > > > > > >
> > > > > > > Sincerely yours,
> > > > > > > -John
> > > > > > >
> > > > > > > On Thu, 2021-02-11 at 12:16 +0000, Chia-Ping Tsai wrote:
> > > > > > > > Here are my two cents. If the behavior eventually gets changed
> > > > > > > > (return on response), the config is more suitable as it is easier
> > > > > > > > to deprecate (fewer changes). For example, we can introduce the
> > > > > > > > config in 2.8 and then deprecate it in 2.9. 3.0 removes the config
> > > > > > > > and supports only return-on-response.
> > > > > > > >
> > > > > > > > On 2021/02/10 19:59:39 Sophie Blee-Goldman wrote:
> > > > > > > > > Hey John, I know I'm a bit late to this party but just for the record,
> > > > > > > > > I don't think it's *totally* unreasonable for a user to take up the "poll
> > > > > > > > > on max timeout and assume some records will be returned" approach.
> > > > > > > > > And I also can imagine plenty of manually-assigned consumers
> > > > > > > > > implemented doing exactly that.
> > > > > > > > >
> > > > > > > > > It's too bad that we're hitting this during the 2.8 release. If we were
> > > > > > > > > having this discussion in the context of 3.0, then I'd say go for it, since
> > > > > > > > > it's a breaking change that would just require some modification to the
> > > > > > > > > applications' poll loop.
> > > > > > > > >
> > > > > > > > > A good analogy here seems to be spurious wakeups -- you generally
> > > > > > > > > assume that a waiting thread has woken up due to a notify event in
> > > > > > > > > another thread, but the docs always make it very clear up front that
> > > > > > > > > this can happen "spuriously" and therefore you need to recheck whatever
> > > > > > > > > condition you were waiting on before assuming the thread should proceed.
> > > > > > > > >
> > > > > > > > > Since we *didn't* document this possibility up front in the case of poll(),
> > > > > > > > > it seems unfair to suddenly change the behavior in a supposedly non-breaking
> > > > > > > > > release. Imagine how many programs would break if spurious wakeups
> > > > > > > > > were suddenly introduced in a release, rather than warned about from
> > > > > > > > > the get-go (not a perfect analogy, since far more programs rely on wait/notify
> > > > > > > > > than on poll() returning records, but I think the point still stands).
> > > > > > > > >
> > > > > > > > > For the record, I also agree with Ismael that a config doesn't feel ideal.
> > > > > > > > > There are already enough configs to present a steep learning curve, so
> > > > > > > > > I would avoid adding one more wherever possible.
And it > does > > > > indeed > > > > > > > > > seem possible to avoid here, since it's really just a > boolean > > > > flag (rather > > > > > > > > > than a semi-unbounded space, eg max.poll.interval.ms, or a > > > > constant > > > > > > > > > value, eg group.id, where a config does feel appropriate). > > > > > > > > > > > > > > > > > > Given all that, I would personally advocate for the > pollOptions > > > > overload. > > > > > > > > > The obvious advantages here are: > > > > > > > > > 1) it's more future-proof, in that we can avoid having a > similar > > > > discussion > > > > > > > > > if/when we want to consider other semantics changes to poll > > > > which some > > > > > > > > > users may want while others would not > > > > > > > > > 2) it leaves the door open to using poll with either > semantics > > > > in a single > > > > > > > > > consumer. I doubt that's going to be very common in terms > of the > > > > specific > > > > > > > > > option we're discussing here, but it may be more useful for > > > > other options > > > > > > > > > we may add in the future > > > > > > > > > > > > > > > > > > Just my 2 cents. But if the pollOptions proposal would > really > > > > add so much > > > > > > > > > additional work that it would cause the 2.8 release to be > > > > significantly > > > > > > > > > delayed, > > > > > > > > > then that's worth taking into account as well. > > > > > > > > > > > > > > > > > > On Wed, Feb 10, 2021 at 9:35 AM John Roesler < > > > > vvcep...@apache.org> wrote: > > > > > > > > > > > > > > > > > > > Hello again, all. > > > > > > > > > > > > > > > > > > > > I have submitted the PR: > > > > > > > > > > https://github.com/apache/kafka/pull/10096 > > > > > > > > > > > > > > > > > > > > Ismael chimed in on the PR review to indicate that the > > > > > > > > > > config approach may not be desirable. > > > > > > > > > > > > > > > > > > > > How strongly do we feel that the behavior change is > > > > > > > > > > unacceptable? 
It seems like most of the people involved > felt > > > > > > > > > > the behavior change is ok (although the docs were wrong). > > > > > > > > > > > > > > > > > > > > The arguments against the behavior change were plausible, > > > > > > > > > > but hypothetical. > > > > > > > > > > > > > > > > > > > > Can everyone take a look at the PR and weigh in on > whether > > > > > > > > > > the complexity of an extra config option is really worth > it > > > > > > > > > > in this case? > > > > > > > > > > > > > > > > > > > > I have to confess I'm currently leaning more toward > dropping > > > > > > > > > > the config and going back to the behavior change, while > > > > > > > > > > correcting the docs and the system test. > > > > > > > > > > > > > > > > > > > > While we are wavering on this point, the system tests > > > > > > > > > > continue to fail, and the 2.8.0 release is blocked. We > > > > > > > > > > should aim to make a call today. > > > > > > > > > > > > > > > > > > > > Thanks all, > > > > > > > > > > -John > > > > > > > > > > > > > > > > > > > > On Fri, 2021-02-05 at 15:31 -0800, Guozhang Wang wrote: > > > > > > > > > > > Thanks everyone for chiming in here! I'd also prefer > the > > > > config approach > > > > > > > > > > if > > > > > > > > > > > compared with API changes. > > > > > > > > > > > > > > > > > > > > > > On Fri, Feb 5, 2021 at 3:18 PM Bill Bejeck < > > > > bbej...@gmail.com> wrote: > > > > > > > > > > > > > > > > > > > > > > > I meant to chime in earlier. > > > > > > > > > > > > > > > > > > > > > > > > I also like the `PollOptions` idea, but I have to > agree > > > > that the config > > > > > > > > > > > > option would be the least disruptive approach. > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > Bill > > > > > > > > > > > > > > > > > > > > > > > > On Fri, Feb 5, 2021 at 6:12 PM John Roesler < > > > > vvcep...@apache.org> > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > Thanks, all! 
> > > > > > > > > > > > >
> > > > > > > > > > > > > It seems that the config I proposed is a solution that
> > > > > > > > > > > > > everyone can be happy with, so I will go ahead with a PR to
> > > > > > > > > > > > > fix that.
> > > > > > > > > > > > >
> > > > > > > > > > > > > I'll update the KIP after a round of PR reviews, in case
> > > > > > > > > > > > > there are new concerns that arise.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > -John
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Fri, 2021-02-05 at 15:07 -0800, Matthias J. Sax wrote:
> > > > > > > > > > > > > > Thanks for providing more details.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Adding a config might be the way of least resistance... I am fine
> > > > > > > > > > > > > > with that.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > -Matthias
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On 2/4/21 9:42 AM, Chia-Ping Tsai wrote:
> > > > > > > > > > > > > > > > vvvvvvvvvvvvvvvvvvv
> > > > > > > > > > > > > > > > long_poll.mode: return_on_records|return_on_response
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > This idea LGTM. It not only makes minimum changes to current
> > > > > > > > > > > > > > > behavior but also works for KIP-695.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On 2021/02/04 16:07:11, John Roesler <vvcep...@apache.org> wrote:
> > > > > > > > > > > > > > > > Hi Matthias, Chia-Ping, and Tom,
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Thanks for the thoughtful replies!
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Re: poll(~forever~) to block indefinitely on records:
> > > > > > > > > > > > > > > > Thanks for your diligence, Chia-Ping.
While > I > > > > wouldn't > > > > > > > > > > > > > > > > personally recommend for anyone to write > code that > > > > blocks > > > > > > > > > > > > > > > > forever on I/O, I do agree this is something > that > > > > "real > > > > > > > > > > > > > > > > people" may want to do. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Just a note for the record, this approach > should > > > > only be > > > > > > > > > > > > > > > > used in conjunction with a manual > assignment. If > > > > people are > > > > > > > > > > > > > > > > using a group subscription, they're setting > > > > themselves up to > > > > > > > > > > > > > > > > get kicked out of the group when there is low > > > > volume of > > > > > > > > > > > > > > > > updates on the topic. And then, when they get > > > > kicked out, > > > > > > > > > > > > > > > > they will never know it because they're just > going > > > > to be > > > > > > > > > > > > > > > > blocked in `poll()` the whole time. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > However, if you don't participate in a group > and > > > > just: > > > > > > > > > > > > > > > > 1 assign(partitions) > > > > > > > > > > > > > > > > 2 poll(forever), > > > > > > > > > > > > > > > > you should indeed expect to return from poll > only > > > > when you > > > > > > > > > > > > > > > > have records. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > This possibility is the turning point for > me. I'd > > > > like to > > > > > > > > > > > > > > > > alter my proposal to an opt-in config, > detailed > > > > below. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Re: Javadoc: > > > > > > > > > > > > > > > > Thanks for pointing that out. It does seem > like, > > > > if we do > > > > > > > > > > > > > > > > decide to change behavior, we should adjust > the > > > > Javadoc to > > > > > > > > > > > > > > > > say so. 
> > > > > > > > > > > > > > > > That was an oversight on my part, and I daresay that
> > > > > > > > > > > > > > > > if I had done that initially, it would have saved Rajini
> > > > > > > > > > > > > > > > from having to dig into the code to pinpoint the cause of
> > > > > > > > > > > > > > > > those test failures.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Re: PollOptions:
> > > > > > > > > > > > > > > > I actually like this option quite a bit. It seems like this
> > > > > > > > > > > > > > > > would be warranted if we expect someone to want to use the
> > > > > > > > > > > > > > > > same Consumer instance in both "return on metadata or
> > > > > > > > > > > > > > > > records" and "return on only records" mode. Otherwise, we
> > > > > > > > > > > > > > > > might as well introduce a new config.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > It also seems like the behavior I proposed in this KIP is
> > > > > > > > > > > > > > > > somewhat "advanced", so I could certainly see leaving it off
> > > > > > > > > > > > > > > > by default and offering an opt-in config.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > How does everyone feel about this opt-in config:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > vvvvvvvvvvvvvvvvvvv
> > > > > > > > > > > > > > > > long_poll.mode: return_on_records|return_on_response
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > doc:
> > > > > > > > > > > > > > > > * return_on_records: (default) a call to
> > > > > > > > > > > > > > > > Consumer#poll(timeout) will block up to the timeout and
> > > > > > > > > > > > > > > > return early if records are received.
> > > > > > > > > > > > > > > > * return_on_response: a call to Consumer#poll(timeout) will
> > > > > > > > > > > > > > > > block up to the timeout and return early if any fetch
> > > > > > > > > > > > > > > > response is received. Use this option to get updates from
> > > > > > > > > > > > > > > > Consumer#metadata() even if Consumer#records() is empty.
> > > > > > > > > > > > > > > > ^^^^^^^^^^^^^^^^^^^
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > John
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Thu, 2021-02-04 at 08:44 +0000, Tom Bentley wrote:
> > > > > > > > > > > > > > > > > Hi,
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > The Javadoc for KafkaConsumer#poll() includes the following:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > * This method returns immediately if there are records available. *Otherwise,
> > > > > > > > > > > > > > > > > > it will await the passed timeout.*
> > > > > > > > > > > > > > > > > > * If the timeout expires, an empty record set will be returned. Note that
> > > > > > > > > > > > > > > > > > this method may block beyond the
> > > > > > > > > > > > > > > > > > * timeout in order to execute custom {@link ConsumerRebalanceListener}
> > > > > > > > > > > > > > > > > > callbacks.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > In other words: If the method returns before the timeout there must be
> > > > > > > > > > > > > > > > > records in the method result. After the timeout has passed there may be no
> > > > > > > > > > > > > > > > > records.
It might block for longer than the > > > > timeout. So I > > > > > > > > > > think > > > > > > > > > > > > > returning > > > > > > > > > > > > > > > > > with empty records before at least the > given > > > > timeout has > > > > > > > > > > passed > > > > > > > > > > > > > breaks that > > > > > > > > > > > > > > > > > contract. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > A not-much-prettier alternative to adding > a new > > > > > > > > > > > > > > > > > pollForRecordsOrMetadata(Duration) method > could > > > > be > > > > > > > > > > overloading > > > > > > > > > > > > > poll() to > > > > > > > > > > > > > > > > > take an additional parameter which > controlled > > > > whether an > > > > > > > > > > early > > > > > > > > > > > > > return with > > > > > > > > > > > > > > > > > empty records was allowed. Or a > > > > `poll(PollOptions)`. In the > > > > > > > > > > long > > > > > > > > > > > > > run it > > > > > > > > > > > > > > > > > could be a mistake to include in the > method name > > > > exactly what > > > > > > > > > > > > > might cause > > > > > > > > > > > > > > > > > an early empty return. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Kind regards, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Tom > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Thu, Feb 4, 2021 at 5:08 AM Chia-Ping > Tsai < > > > > > > > > > > > > chia7...@apache.org> > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks for your sharing Matthias. I > agree that > > > > is indeed an > > > > > > > > > > > > > anti-pattern > > > > > > > > > > > > > > > > > > to assume poll() returns data or not. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > However, I check all usages of poll() in > code > > > > base. 
There > > > > > > > > > > is an > > > > > > > > > > > > > > > > > > interesting use case - poll(a bigger > timeout) > > > > - it implies > > > > > > > > > > that > > > > > > > > > > > > > callers > > > > > > > > > > > > > > > > > > want to block poll()(forever) unless > there are > > > > available > > > > > > > > > > data. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > [1] > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsoleConsumer.scala#L443 > > > > > > > > > > > > > > > > > > [2] > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://github.com/apache/kafka/blob/trunk/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java#L232 > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hence, I start to worry client code like > > > > aforementioned > > > > > > > > > > cases > > > > > > > > > > > > > get broken > > > > > > > > > > > > > > > > > > due to behavior change :( > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On 2021/02/03 22:59:09, "Matthias J. > Sax" < > > > > > > > > > > mj...@apache.org> > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > Thanks for your email John. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I agree that it seems to be an > anti-pattern > > > > to write code > > > > > > > > > > > > that > > > > > > > > > > > > > makes > > > > > > > > > > > > > > > > > > > assumptions if poll() returns data or > not. > > > > Thus, we > > > > > > > > > > should > > > > > > > > > > > > > fix-forward > > > > > > > > > > > > > > > > > > > the system test from my point of view. 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > From my understanding, the impact of > KIP-695 > > > > is that we > > > > > > > > > > might > > > > > > > > > > > > > return > > > > > > > > > > > > > > > > > > > early from poll() (ie, before the > timeout > > > > passed) with no > > > > > > > > > > > > > data, only if > > > > > > > > > > > > > > > > > > > an empty fetch request comes back and > there > > > > is no other > > > > > > > > > > fetch > > > > > > > > > > > > > request > > > > > > > > > > > > > > > > > > > that did return data. Thus, for most > cases, > > > > poll() should > > > > > > > > > > > > > still return > > > > > > > > > > > > > > > > > > > early and provide data. -- Thus, I > have no > > > > concerns with > > > > > > > > > > the > > > > > > > > > > > > > slight > > > > > > > > > > > > > > > > > > > behavior change. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Would be good to get input from others > about > > > > this > > > > > > > > > > question > > > > > > > > > > > > > though. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -Matthias > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On 2/3/21 10:06 AM, John Roesler wrote: > > > > > > > > > > > > > > > > > > > > Hello again all, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I'm resurrecting this thread to > discuss an > > > > issue that > > > > > > > > > > has > > > > > > > > > > > > > > > > > > > > come up after merging the code for > this > > > > KIP. 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > The issue is that some of the system > tests > > > > need to be > > > > > > > > > > > > > > > > > > > > updated in the same way that this > > > > integration test > > > > > > > > > > needed > > > > > > > > > > > > to > > > > > > > > > > > > > > > > > > > > be updated: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://github.com/apache/kafka/pull/9836/files#diff-735dcc2179315ebd78a7c75fd21b70b0ae81b90f3d5ec761740bc80abeae891fR1875-R1888 > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > This issue was reported here: > > > > > > > > > > > > > > > > > > > > > > > > https://issues.apache.org/jira/browse/KAFKA-12268 > > > > > > > > > > > > > > > > > > > > and there is some preliminary > discussion > > > > here: > > > > > > > > > > > > > > > > > > > > > https://github.com/apache/kafka/pull/10022 > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > First, let me offer my apologies for > > > > failing to catch > > > > > > > > > > this > > > > > > > > > > > > > > > > > > > > before the merge. I'm sorry that it > became > > > > Rajini's > > > > > > > > > > work to > > > > > > > > > > > > > > > > > > > > track down the cause of the failure, > when > > > > it was my > > > > > > > > > > > > > > > > > > > > responsibility to ensure the feature > was > > > > merged safely. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > To recap the situation: > > > > > > > > > > > > > > > > > > > > Consumer#poll(Duration) will now > return > > > > before the > > > > > > > > > > duration > > > > > > > > > > > > > > > > > > > > expires even if there are no records > > > > returned if there > > > > > > > > > > is > > > > > > > > > > > > > > > > > > > > some returned metadata. 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > This behavior was important for > KIP-695. > > > > In the > > > > > > > > > > situation > > > > > > > > > > > > > > > > > > > > where we get no records back for some > > > > partition, > > > > > > > > > > Streams > > > > > > > > > > > > > > > > > > > > needs to have the freshest possible > > > > information about > > > > > > > > > > > > > > > > > > > > whether there are no new records on > the > > > > broker, or > > > > > > > > > > whether > > > > > > > > > > > > > > > > > > > > there are records on the broker that > we > > > > still need to > > > > > > > > > > > > fetch. > > > > > > > > > > > > > > > > > > > > If that's not clear, the KIP > contains the > > > > full story. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > It's definitely a behavior change, > but our > > > > rationale > > > > > > > > > > was > > > > > > > > > > > > > > > > > > > > that it's an acceptable behavior > change. > > > > Our big > > > > > > > > > > > > alternative > > > > > > > > > > > > > > > > > > > > is to add a _new_ method to Consumer > to > > > > > > > > > > > > > > > > > > > > pollForRecordsOrMetadata(Duration) or > > > > something. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > It seems unreliable to expect the > broker > > > > to return a > > > > > > > > > > > > > > > > > > > > particular record within a particular > > > > timeout in > > > > > > > > > > general, > > > > > > > > > > > > > > > > > > > > which is what these tests are doing. > The > > > > broker can > > > > > > > > > > decide > > > > > > > > > > > > > > > > > > > > for several reasons not to return > data for > > > > a > > > > > > > > > > partition, but > > > > > > > > > > > > > > > > > > > > return data for another partition > instead. 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > It seems like the only case where you > > > > might reasonably > > > > > > > > > > try > > > > > > > > > > > > > > > > > > > > to rely on that is in a test, where > you > > > > first write a > > > > > > > > > > > > record > > > > > > > > > > > > > > > > > > > > to a partition, then you assign only > that > > > > one > > > > > > > > > > partition to > > > > > > > > > > > > a > > > > > > > > > > > > > > > > > > > > consumer, then you poll on the > consumer, > > > > expecting it > > > > > > > > > > to > > > > > > > > > > > > > > > > > > > > return the data you just wrote. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > So the $10 question here is whether > we > > > > should support > > > > > > > > > > this > > > > > > > > > > > > > > > > > > > > apparently artificial (testing-only) > use > > > > case to the > > > > > > > > > > point > > > > > > > > > > > > > > > > > > > > where it's worth adding a whole new > method > > > > to the > > > > > > > > > > Consumer > > > > > > > > > > > > > > > > > > > > interface. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks all, > > > > > > > > > > > > > > > > > > > > John > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Thu, 2020-12-17 at 13:18 -0600, > John > > > > Roesler wrote: > > > > > > > > > > > > > > > > > > > > > Thanks Jason, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > We would only return the metadata > for > > > > the latest > > > > > > > > > > fetches. 
So, if someone wanted to use this to lazily maintain a client-side metadata map for all partitions, they'd have to store it separately and merge in new updates as they arrive.

This way:
1. We don't need to increase the complexity of the client by storing that metadata.
2. Users will be able to treat all returned metadata as "fresh" without having to reason about the timestamps.
3. All parts of the returned ConsumerRecords object have the same lifecycle: all the data and metadata are the results of the most recent round of fetch responses that had not been previously polled.

Does that seem sensible to you? I'll update the KIP to clarify this.
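
The "store it separately and merge in new updates" idea can be sketched roughly as follows. This is only an illustration under stated assumptions: `PartitionMetadataCache` and `Snapshot` are hypothetical names, partitions are keyed by a plain string rather than `TopicPartition`, and nothing here is taken from the actual Consumer API or the KIP's final design.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical client-side cache; none of these names come from the Kafka API. */
final class PartitionMetadataCache {

    /** Hypothetical per-partition metadata from one round of fetch responses. */
    static final class Snapshot {
        final long receivedTimestampMs;
        final long position;
        final long endOffset;

        Snapshot(long receivedTimestampMs, long position, long endOffset) {
            this.receivedTimestampMs = receivedTimestampMs;
            this.position = position;
            this.endOffset = endOffset;
        }

        /** Lag derived from the two offsets in this snapshot. */
        long lag() {
            return endOffset - position;
        }
    }

    // Long-lived view across polls, keyed by a "topic-partition" string for simplicity.
    private final Map<String, Snapshot> latest = new HashMap<>();

    /**
     * Merge the metadata returned by one poll. Partitions that were not part of
     * this poll keep their older (possibly stale) entry.
     */
    void merge(Map<String, Snapshot> fromLatestPoll) {
        latest.putAll(fromLatestPoll);
    }

    /** Returns the most recently merged snapshot, or null if none was ever seen. */
    Snapshot get(String partition) {
        return latest.get(partition);
    }
}
```

A caller would invoke `merge(...)` once per poll with whatever metadata that poll returned. Entries for partitions missing from the latest poll are then only as fresh as their `receivedTimestampMs`, which is the timestamp reasoning the proposal keeps out of the client itself.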

Thanks,
-John

On Wed, 2020-12-16 at 10:29 -0800, Jason Gustafson wrote:
Hi John,

Just one question. It wasn't very clear to me exactly when the metadata would be returned in `ConsumerRecords`. Would we /always/ include the metadata for all partitions that are assigned, or would it be based on the latest fetches?

Thanks,
Jason

On Fri, Dec 11, 2020 at 4:07 PM John Roesler <vvcep...@apache.org> wrote:
Thanks, Guozhang!

All of your feedback sounds good to me. I’ll update the KIP when I am able.

3) I believe it is the position after the fetch, but I will confirm. I think omitting position may render beginning and end offsets useless as well, which leaves only lag. That would be fine with me, but it also seems nice to supply this extra metadata since it is well defined and probably handy for others. Therefore, I’d go the route of specifying the exact semantics and keeping it.

Thanks for the review,
John

On Fri, Dec 11, 2020, at 17:36, Guozhang Wang wrote:
Hello John,

Thanks for the updates!
I've made a pass on the KIP and also the POC PR; here are some minor comments:

1) nit: "receivedTimestamp" -> it seems the metadata keeps getting updated, and we do not create a new object but just update the values in place, so maybe calling it `lastUpdateTimestamp` is better?

2) It would be great to clarify in the javadocs that the new API "ConsumerRecords#metadata(): Map<TopicPartition, Metadata>" may return a superset of the TopicPartitions returned by the existing API that returns the data by partition, in case users assume the two maps' keys would always be the same.

3) The "position()" API of the call needs better clarification: is it the current position AFTER the records are returned, or is it BEFORE the records are returned? Personally I'd suggest we do not include it if it is not used anywhere yet, just to avoid possible misuse, but I'm fine if you'd still like to keep it; in that case just clarify its semantics.

Other than that, I'm +1 on the KIP as well!

Guozhang

On Fri, Dec 11, 2020 at 8:15 AM Walker Carlson <wcarl...@confluent.io> wrote:
Thanks for the KIP!

+1 (non-binding)

walker

On Wed, Dec 9, 2020 at 11:40 AM Bruno Cadonna <br...@confluent.io> wrote:
Thanks for the KIP, John!

+1 (non-binding)

Best,
Bruno

On 08.12.20 18:03, John Roesler wrote:
Hello all,

There hasn't been much discussion on KIP-695 so far, so I'd like to go ahead and call for a vote.

As a reminder, the purpose of KIP-695 is to improve on the "task idling" feature we introduced in KIP-353.
This KIP will allow Streams to offer deterministic time semantics in join-type topologies. For example, it makes sure that when you join two topics, we collate the topics by timestamp. That was always the intent with task idling (KIP-353), but it turns out the previous mechanism couldn't provide the desired semantics.

The details are here:
https://cwiki.apache.org/confluence/x/JSXZCQ

Thanks,
-John

-- Guozhang

-- Guozhang