Hi John,

Thanks for the feedback. I will open the Vote thread now.

Best,
Dongjin

On Fri, Sep 11, 2020 at 2:00 AM John Roesler <vvcep...@apache.org> wrote:
> Hi Dongjin,
>
> Sorry for the delay. I'm glad you're still pushing this forward. It would be nice to get this into the 2.7 release.
>
> I just took another look at the KIP, and it looks good to me!
>
> I think this is ready for a vote.
>
> Thanks,
> -John
>
> On Wed, 2020-08-05 at 22:04 +0900, Dongjin Lee wrote:
> > Hi All,
> >
> > I updated the KIP <https://cwiki.apache.org/confluence/display/KAFKA/KIP-508%3A+Make+Suppression+State+Queriable> and the implementation, following the discussion here.
> >
> > You must be working hard preparing the release of 2.6.0, so please have a look after your work is done.
> >
> > Thanks,
> > Dongjin
> >
> > On Sun, Mar 8, 2020 at 12:20 PM John Roesler <vvcep...@apache.org> wrote:
> > > Thanks Matthias,
> > >
> > > Good idea. I've changed the ticket name and added a note clarifying that this ticket is not the same as https://issues.apache.org/jira/browse/KAFKA-7224
> > >
> > > Incidentally, I learned that I never documented my reasons for abandoning my work on KAFKA-7224! I've now updated that ticket, too, so your question had an unexpected side benefit.
> > >
> > > Thanks,
> > > -John
> > >
> > > On Sat, Mar 7, 2020, at 18:01, Matthias J. Sax wrote:
> > > > Thanks for the clarification.
> > > >
> > > > Can you maybe update the Jira ticket? Do we have a ticket for spill-to-disk? Maybe link to it and explain that these are two different things? Maybe even rename the ticket to something clearer, e.g., "make suppress result queryable" or similar?
> > > >
> > > > -Matthias
> > > >
> > > > On 3/7/20 1:58 PM, John Roesler wrote:
> > > > > Hey Matthias,
> > > > >
> > > > > I’m sorry if the ticket was poorly stated.
> > > > > The ticket is to add a DSL overload to pass a Materialized argument to suppress. As a result, the output of the suppression would be queryable. This is unrelated to “persistent buffer” aka “spill-to-disk”.
> > > > >
> > > > > There was some confusion before about whether this ticket could be implemented as “query the buffer”. Maybe it can, but not trivially. The obvious way is just to add a new state store which we write the results into just before we forward. I.e., it’s exactly like the materialized variant of any stateless KTable operation.
> > > > >
> > > > > Thanks,
> > > > > John
> > > > >
> > > > > On Sat, Mar 7, 2020, at 15:32, Matthias J. Sax wrote:
> > > > > > Thanks for the KIP, Dongjin,
> > > > > >
> > > > > > I am still not sure I can follow, which might also be caused by the backing JIRA ticket (maybe John can clarify the intent of the ticket, as he created it):
> > > > > >
> > > > > > Currently, suppress() only uses an in-memory buffer, and my understanding of the Jira is to add the ability to use a persistent buffer (i.e., spill to disk backed by RocksDB).
> > > > > >
> > > > > > Adding a persistent buffer is completely unrelated to allowing queries against the buffer. In fact, one could query an in-memory buffer, too. However, querying the buffer does not really seem to be useful, as pointed out by John, since you can always query the upstream KTable store.
> > > > > >
> > > > > > Also note that for the emit-on-window-close case, the result is deleted from the buffer when it is emitted, and thus cannot be queried any longer.
> > > > > >
> > > > > > Can you please clarify whether you intend to allow spilling to disk or to enable IQ (even if I don't see why querying makes sense, as the data is either upstream or deleted)? Also, if you want to enable IQ, why do we need all those new interfaces?
> > > > > > Isn't the result of a suppress() a KTable that is the same as any other key-value/windowed/session store?
> > > > > >
> > > > > > We should also have corresponding Jira tickets for the different cases, to avoid the confusion I am in atm :)
> > > > > >
> > > > > > -Matthias
> > > > > >
> > > > > > On 2/27/20 8:21 AM, John Roesler wrote:
> > > > > > > Hi Dongjin,
> > > > > > >
> > > > > > > No problem; glad we got it sorted out.
> > > > > > >
> > > > > > > Thanks again for picking this up!
> > > > > > > -John
> > > > > > >
> > > > > > > On Wed, Feb 26, 2020, at 09:24, Dongjin Lee wrote:
> > > > > > > > > I was under the impression that you wanted to expand the scope of the KIP to additionally allow querying the internal buffer, not just the result. Can you clarify whether you are proposing to allow querying the state of the internal buffer, the result, or both?
> > > > > > > >
> > > > > > > > Sorry for the confusion. As we already discussed, we only need to query the suppressed output, not the internal buffer. The current implementation is wrong. I will notify you after refining the KIP and the implementation accordingly. I must have been confused, too.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Dongjin
> > > > > > > >
> > > > > > > > On Tue, Feb 25, 2020 at 12:17 AM John Roesler <vvcep...@apache.org> wrote:
> > > > > > > > > Hi Dongjin,
> > > > > > > > >
> > > > > > > > > Ah, I think I may have been confused. I 100% agree that we need a materialized variant for suppress(). Then, you could do: ...suppress(..., Materialized.as("final-count"))
> > > > > > > > >
> > > > > > > > > If that’s your proposal, then we are on the same page.
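The distinction this thread keeps returning to (upstream Count vs. internal suppression buffer vs. a materialized Final Count) can be made concrete with a small toy model. The stdlib-only Java sketch below replays the 10 ms window walkthrough from John's February 20 message. It is not Kafka Streams code and does not use the proposed `suppress(..., Materialized)` overload; `FinalCountModel` and its methods are hypothetical. Following the approach John describes (write results into a new store just before forwarding), each closed window is copied into a queryable "final" map as it leaves the buffer. The close rule (a window closes once stream time reaches its end plus the grace period) is an assumption chosen to reproduce the walkthrough's tables.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Toy model only: NOT Kafka Streams code. All names here are hypothetical.
// Tumbling windows, a windowed count ("Count"), a final-results suppression
// buffer, and a materialized copy of the suppressed output ("Final Count").
class FinalCountModel {
    private final long windowSizeMs;
    private final long graceMs;
    private long streamTime = Long.MIN_VALUE;

    // window start -> (key -> count); TreeMap keeps windows ordered by start time
    private final TreeMap<Long, Map<String, Long>> count = new TreeMap<>();
    private final TreeMap<Long, Map<String, Long>> buffer = new TreeMap<>();
    private final TreeMap<Long, Map<String, Long>> finalCount = new TreeMap<>();

    FinalCountModel(long windowSizeMs, long graceMs) {
        this.windowSizeMs = windowSizeMs;
        this.graceMs = graceMs;
    }

    // Assumes non-negative timestamps.
    void process(long timestamp, String key) {
        streamTime = Math.max(streamTime, timestamp);
        long windowStart = timestamp - (timestamp % windowSizeMs);
        if (isClosed(windowStart)) {
            return; // late record for an already-closed window: dropped
        }
        long updated = count.computeIfAbsent(windowStart, w -> new TreeMap<>())
                .merge(key, 1L, Long::sum);
        // The buffer holds the same value as the upstream table, just held back.
        buffer.computeIfAbsent(windowStart, w -> new TreeMap<>()).put(key, updated);
        evictClosedWindows();
    }

    private boolean isClosed(long windowStart) {
        return windowStart + windowSizeMs + graceMs <= streamTime;
    }

    private void evictClosedWindows() {
        Iterator<Map.Entry<Long, Map<String, Long>>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Map<String, Long>> entry = it.next();
            if (!isClosed(entry.getKey())) {
                break; // windows are ordered by start, so all later ones are still open
            }
            // Write into the queryable result store just before "forwarding";
            // the entry then leaves the buffer and can no longer be queried there.
            finalCount.put(entry.getKey(), entry.getValue());
            it.remove();
        }
    }

    Long queryCount(long windowStart, String key) {
        return count.getOrDefault(windowStart, Collections.emptyMap()).get(key);
    }

    Long queryBuffer(long windowStart, String key) {
        return buffer.getOrDefault(windowStart, Collections.emptyMap()).get(key);
    }

    Long queryFinal(long windowStart, String key) {
        return finalCount.getOrDefault(windowStart, Collections.emptyMap()).get(key);
    }

    public static void main(String[] args) {
        FinalCountModel model = new FinalCountModel(10, 0); // 10 ms windows, zero grace
        model.process(10, "A");
        model.process(15, "A");
        model.process(20, "A");
        System.out.println("Count:       " + model.count);      // {10={A=2}, 20={A=1}}
        System.out.println("Buffer:      " + model.buffer);     // {20={A=1}}
        System.out.println("Final Count: " + model.finalCount); // {10={A=2}}
    }
}
```

Every value the buffer holds is also present in `count`, which is John's point; what only the final map offers is a view restricted to closed windows, which is the part Dongjin wants to query.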
> > > > > > > > >
> > > > > > > > > I was under the impression that you wanted to expand the scope of the KIP to additionally allow querying the internal buffer, not just the result. Can you clarify whether you are proposing to allow querying the state of the internal buffer, the result, or both?
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > John
> > > > > > > > >
> > > > > > > > > On Thu, Feb 20, 2020, at 08:41, Dongjin Lee wrote:
> > > > > > > > > > Hi John,
> > > > > > > > > >
> > > > > > > > > > Thanks for your kind explanation with an example.
> > > > > > > > > >
> > > > > > > > > > > But it feels like you're saying you're trying to do something different than just query the windowed key and get back the current count?
> > > > > > > > > >
> > > > > > > > > > Yes. For example, what if we need to retrieve all keys (or a range of keys) whose window has closed? In this example, let's imagine we need to retrieve only (key=A, window=10), not (key=A, window=20).
> > > > > > > > > >
> > > > > > > > > > Of course, the value accompanying a flushed key is exactly the same as the one in the upstream KTable. However, if our intention is not to look up a specific key but to retrieve a group of unspecified keys, we are stuck, since we can't know beforehand which keys have been flushed out.
> > > > > > > > > >
> > > > > > > > > > One workaround would be materializing it with `suppressed.filter((key, value) -> true, Materialized.as("final-count"))`.
> > > > > > > > > > But I think providing a materialized variant of the suppress method is better than this workaround.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Dongjin
> > > > > > > > > >
> > > > > > > > > > On Thu, Feb 20, 2020 at 1:26 AM John Roesler <vvcep...@apache.org> wrote:
> > > > > > > > > > > Thanks for the response, Dongjin,
> > > > > > > > > > >
> > > > > > > > > > > I'm sorry, but I'm still not following. It seems like the view you would get on the "current state of the buffer" would always be equivalent to the view of the upstream table.
> > > > > > > > > > >
> > > > > > > > > > > Let me try an example, and maybe you can point out the flaw in my reasoning.
> > > > > > > > > > >
> > > > > > > > > > > Let's say we're doing 10 ms windows with a grace period of zero. Let's also say we're computing a windowed count, and that we have a "final results" suppression after the count. Let's materialize the count as "Count" and the suppressed result as "Final Count".
> > > > > > > > > > >
> > > > > > > > > > > Suppose we get an input event: (time=10, key=A, value=...)
> > > > > > > > > > >
> > > > > > > > > > > Then, Count will look like:
> > > > > > > > > > >
> > > > > > > > > > > | window | key | value |
> > > > > > > > > > > | 10     | A   | 1     |
> > > > > > > > > > >
> > > > > > > > > > > The (internal) suppression buffer will contain:
> > > > > > > > > > >
> > > > > > > > > > > | window | key | value |
> > > > > > > > > > > | 10     | A   | 1     |
> > > > > > > > > > >
> > > > > > > > > > > The record is still buffered because the window isn't closed yet.
> > > > > > > > > > > Final Count is an empty table:
> > > > > > > > > > >
> > > > > > > > > > > | window | key | value |
> > > > > > > > > > >
> > > > > > > > > > > ---------------
> > > > > > > > > > >
> > > > > > > > > > > Now, we get a second event: (time=15, key=A, value=...)
> > > > > > > > > > >
> > > > > > > > > > > Then, Count will look like:
> > > > > > > > > > >
> > > > > > > > > > > | window | key | value |
> > > > > > > > > > > | 10     | A   | 2     |
> > > > > > > > > > >
> > > > > > > > > > > The (internal) suppression buffer will contain:
> > > > > > > > > > >
> > > > > > > > > > > | window | key | value |
> > > > > > > > > > > | 10     | A   | 2     |
> > > > > > > > > > >
> > > > > > > > > > > The record is still buffered because the window isn't closed yet.
> > > > > > > > > > > Final Count is an empty table:
> > > > > > > > > > >
> > > > > > > > > > > | window | key | value |
> > > > > > > > > > >
> > > > > > > > > > > ---------------
> > > > > > > > > > >
> > > > > > > > > > > Finally, we get a third event: (time=20, key=A, value=...)
> > > > > > > > > > >
> > > > > > > > > > > Then, Count will look like:
> > > > > > > > > > >
> > > > > > > > > > > | window | key | value |
> > > > > > > > > > > | 10     | A   | 2     |
> > > > > > > > > > > | 20     | A   | 1     |
> > > > > > > > > > >
> > > > > > > > > > > The (internal) suppression buffer will contain:
> > > > > > > > > > >
> > > > > > > > > > > | window | key | value |
> > > > > > > > > > > | 20     | A   | 1     |
> > > > > > > > > > >
> > > > > > > > > > > Note that window 10 has been flushed out, because it's now closed. And window 20 is buffered because it isn't closed yet.
> > > > > > > > > > > Final Count is now:
> > > > > > > > > > >
> > > > > > > > > > > | window | key | value |
> > > > > > > > > > > | 10     | A   | 2     |
> > > > > > > > > > >
> > > > > > > > > > > ---------------
> > > > > > > > > > >
> > > > > > > > > > > Reading your email, I can't figure out what value there is in querying the internal suppression buffer, since it only contains exactly the same value as the upstream table, for each key that is still buffered. But it feels like you're saying you're trying to do something different than just query the windowed key and get back the current count?
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > -John
> > > > > > > > > > >
> > > > > > > > > > > On Wed, Feb 19, 2020, at 09:49, Dongjin Lee wrote:
> > > > > > > > > > > > Hi John,
> > > > > > > > > > > >
> > > > > > > > > > > > 'The intermediate state of the suppression' in the KIP does not mean the state of the upstream KTable. Sure, the state of the upstream KTable can be queried by materializing the operator immediately before the suppress, as you showed. What I meant in the KIP was the final state of the buffer, which has not been emitted yet.
> > > > > > > > > > > > (I agree, the current description may be confusing; it would be better to change it to 'the current state of the suppression' or 'the results of the suppression', as the Jira issue <https://issues.apache.org/jira/browse/KAFKA-8403> states.)
> > > > > > > > > > > >
> > > > > > > > > > > > For a little more about the motivation, here is one of my experiences: I had to build a monitoring application that collects signals from IoT devices (say, on a semiconductor production line). If the number of signals collected within a time window is much lower than expected, there may be a problem in the system, such as a network hiccup. We wanted to build the system as a dashboard, but could not, for lack of a materialization feature. It was precisely the case of querying only the final results of a windowed aggregation, as the Jira issue <https://issues.apache.org/jira/browse/KAFKA-8403> states.
> > > > > > > > > > > > We ended up implementing it as an email alerting system like this <https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers/> and had to collect the problematic keys and windows by hand. I think these kinds of use cases are quite common. Should they be described in the KIP in more detail?
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > Dongjin
> > > > > > > > > > > >
> > > > > > > > > > > > On Sat, Feb 15, 2020 at 4:43 AM John Roesler <vvcep...@apache.org> wrote:
> > > > > > > > > > > > > Hi Dongjin,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks for the KIP!
> > > > > > > > > > > > >
> > > > > > > > > > > > > Can you explain more about why the internal data structures of suppression should be queryable? The motivation just says that users might want to do it, which seems like it could justify literally anything :)
> > > > > > > > > > > > >
> > > > > > > > > > > > > One design point of Suppression is that if you wanted to query the “final state”, you can Materialize the suppress itself (which is why it needs the variant); if you wanted to query the “intermediate state”, you can materialize the operator immediately before the suppress.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Example:
> > > > > > > > > > > > >
> > > > > > > > > > > > > ...count(Materialized.as("intermediate"))
> > > > > > > > > > > > >     .suppress(untilWindowCloses(unbounded()), Materialized.as("final"))
> > > > > > > > > > > > >
> > > > > > > > > > > > > I’m not sure what use case would require actually fetching from the internal buffers.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > John
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Fri, Feb 14, 2020, at 07:55, Dongjin Lee wrote:
> > > > > > > > > > > > > > Hi devs,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I'd like to reboot the discussion on KIP-508, which aims to support a Materialized variant of KTable#suppress. It was initially submitted several months ago but was closed due to inactivity.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > - KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-508%3A+Make+Suppression+State+Queriable
> > > > > > > > > > > > > > - Jira: https://issues.apache.org/jira/browse/KAFKA-8403
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > All kinds of feedback will be greatly appreciated.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > Dongjin
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > --
> > > > > > > > > > > > > > *Dongjin Lee*
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > *A hitchhiker in the mathematical world.*
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > *github: github.com/dongjinleekr <https://github.com/dongjinleekr> linkedin: kr.linkedin.com/in/dongjinleekr <https://kr.linkedin.com/in/dongjinleekr> speakerdeck: speakerdeck.com/dongjin <https://speakerdeck.com/dongjin>*
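Dongjin's monitoring scenario in the February 19 message (flag time windows whose signal count is much lower than expected) amounts to a range scan over closed windows. The stdlib-only Java sketch below runs that scan over two in-memory tables: one standing in for the upstream count, which still contains a partial count for the window that is open, and one standing in for the materialized suppression output, which contains closed windows only. It is not Kafka Streams code; `LowSignalScan`, `windowsBelow`, the sensor names, and the threshold are hypothetical, and the numbers are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch, NOT a Kafka Streams API: each table maps
// window start -> (device id -> signal count).
class LowSignalScan {

    // Returns "device@windowStart" for every entry with a window start in
    // [fromWindowStart, toWindowStart] whose count is below expectedMinimum.
    static List<String> windowsBelow(NavigableMap<Long, Map<String, Long>> table,
                                     long fromWindowStart, long toWindowStart,
                                     long expectedMinimum) {
        List<String> flagged = new ArrayList<>();
        // Inclusive range scan over window start times.
        for (Map.Entry<Long, Map<String, Long>> window :
                table.subMap(fromWindowStart, true, toWindowStart, true).entrySet()) {
            for (Map.Entry<String, Long> device : window.getValue().entrySet()) {
                if (device.getValue() < expectedMinimum) {
                    flagged.add(device.getKey() + "@" + window.getKey());
                }
            }
        }
        return flagged;
    }

    private static void put(NavigableMap<Long, Map<String, Long>> table,
                            long windowStart, String device, long count) {
        table.computeIfAbsent(windowStart, w -> new TreeMap<>()).put(device, count);
    }

    public static void main(String[] args) {
        // Illustrative numbers: roughly 100 signals expected per device per window.
        NavigableMap<Long, Map<String, Long>> upstreamCount = new TreeMap<>();
        put(upstreamCount, 0L, "sensor-1", 97L);
        put(upstreamCount, 0L, "sensor-2", 31L);
        put(upstreamCount, 10L, "sensor-1", 12L); // window 10 is still open and filling up

        // Stand-in for the materialized suppression output: only window 0 has closed.
        NavigableMap<Long, Map<String, Long>> finalCount = new TreeMap<>();
        put(finalCount, 0L, "sensor-1", 97L);
        put(finalCount, 0L, "sensor-2", 31L);

        System.out.println(windowsBelow(upstreamCount, 0L, 10L, 90L)); // [sensor-2@0, sensor-1@10]
        System.out.println(windowsBelow(finalCount, 0L, 10L, 90L));    // [sensor-2@0]
    }
}
```

Scanning the upstream stand-in also flags `sensor-1@10`, whose window is still filling up, so it would raise a false alarm; the closed-window table reports only `sensor-2@0`, which is the kind of result Dongjin reports having had to collect by hand.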