Hi All, I updated the KIP <https://cwiki.apache.org/confluence/display/KAFKA/KIP-508%3A+Make+Suppression+State+Queriable> and the implementation, following the discussion here.
You must be working hard preparing the release of 2.6.0, so please have a look after your work is done.

Thanks,
Dongjin

On Sun, Mar 8, 2020 at 12:20 PM John Roesler <vvcep...@apache.org> wrote:

> Thanks Matthias,
>
> Good idea. I've changed the ticket name and added a note clarifying that this ticket is not the same as https://issues.apache.org/jira/browse/KAFKA-7224
>
> Incidentally, I learned that I never documented my reasons for abandoning my work on KAFKA-7224! I've now updated that ticket, too, so your question had an unexpected side-benefit.
>
> Thanks,
> -John
>
> On Sat, Mar 7, 2020, at 18:01, Matthias J. Sax wrote:
> > Thanks for clarification.
> >
> > Can you maybe update the Jira ticket? Do we have a ticket for spill-to-disk? Maybe link to it and explain that it's two different things? Maybe even rename the ticket to something more clear, i.e., "make suppress result queryable" or similar?
> >
> > -Matthias
> >
> > On 3/7/20 1:58 PM, John Roesler wrote:
> > > Hey Matthias,
> > >
> > > I’m sorry if the ticket was poorly stated. The ticket is to add a DSL overload to pass a Materialized argument to suppress. As a result, the result of the suppression would be queriable.
> > >
> > > This is unrelated to “persistent buffer” aka “spill-to-disk”.
> > >
> > > There was some confusion before about whether this ticket could be implemented as “query the buffer”. Maybe it can, but not trivially. The obvious way is just to add a new state store which we write the results into just before we forward. I.e., it’s exactly like the materialized variant of any stateless KTable operation.
> > >
> > > Thanks, John
> > >
> > > On Sat, Mar 7, 2020, at 15:32, Matthias J. Sax wrote:
> > > Thanks for the KIP Dongjin,
> > >
> > > I am still not sure if I can follow, which might also be caused by the backing JIRA ticket (maybe John can clarify the intent of the ticket as he created it):
> > >
> > > Currently, suppress() only uses an in-memory buffer and my understanding of the Jira is, to add the ability to use a persistent buffer (i.e., spill to disk backed by RocksDB).
> > >
> > > Adding a persistent buffer is completely unrelated to allowing the buffer to be queried. In fact, one could query an in-memory buffer, too. However, querying the buffer does not really seem to be useful as pointed out by John, as you can always query the upstream KTable store.
> > >
> > > Also note that for the emit-on-window-close case the result is deleted from the buffer when it is emitted, and thus cannot be queried any longer.
> > >
> > > Can you please clarify if you intend to allow spilling to disk or if you intend to enable IQ (even if I don't see why querying makes sense, as the data is either upstream or deleted). Also, if you want to enable IQ, why do we need all those new interfaces? The result of a suppress() is a KTable that is the same as any other key-value/windowed/sessions store?
> > >
> > > We should also have corresponding Jira tickets for different cases to avoid the confusion I am in atm :)
> > >
> > > -Matthias
> > >
> > > On 2/27/20 8:21 AM, John Roesler wrote:
> > >>>> Hi Dongjin,
> > >>>>
> > >>>> No problem; glad we got it sorted out.
> > >>>>
> > >>>> Thanks again for picking this up! -John
> > >>>>
> > >>>> On Wed, Feb 26, 2020, at 09:24, Dongjin Lee wrote:
> > >>>>>> I was under the impression that you wanted to expand the scope of the KIP to additionally allow querying the internal buffer, not just the result.
> > >>>>>> Can you clarify whether you are proposing to allow querying the state of the internal buffer, the result, or both?
> > >>>>>
> > >>>>> Sorry for the confusion. As we already discussed, we only need to query the suppressed output, not the internal buffer. The current implementation is wrong. After refining the KIP and implementation accordingly I will notify you - I must have been confused, too.
> > >>>>>
> > >>>>> Thanks, Dongjin
> > >>>>>
> > >>>>> On Tue, Feb 25, 2020 at 12:17 AM John Roesler <vvcep...@apache.org> wrote:
> > >>>>>
> > >>>>>> Hi Dongjin,
> > >>>>>>
> > >>>>>> Ah, I think I may have been confused. I 100% agree that we need a materialized variant for suppress(). Then, you could do: ...suppress(..., Materialized.as("final-count"))
> > >>>>>>
> > >>>>>> If that’s your proposal, then we are on the same page.
> > >>>>>>
> > >>>>>> I was under the impression that you wanted to expand the scope of the KIP to additionally allow querying the internal buffer, not just the result. Can you clarify whether you are proposing to allow querying the state of the internal buffer, the result, or both?
> > >>>>>>
> > >>>>>> Thanks, John
> > >>>>>>
> > >>>>>> On Thu, Feb 20, 2020, at 08:41, Dongjin Lee wrote:
> > >>>>>>> Hi John, Thanks for your kind explanation with an example.
> > >>>>>>>
> > >>>>>>>> But it feels like you're saying you're trying to do something different than just query the windowed key and get back the current count?
> > >>>>>>>
> > >>>>>>> Yes, for example, what if we need to retrieve the (all or range) keys with a closed window? In this example, let's imagine we need to retrieve only (key=A, window=10), not (key=A, window=20).
> > >>>>>>>
> > >>>>>>> Of course, the value accompanied by a flushed key is exactly the same as the one in the upstream KTable; however, if our intention is not pointing out a specific key but retrieving a group of unspecified keys, we are stuck - since we can't be sure beforehand which keys have been flushed out.
> > >>>>>>>
> > >>>>>>> One workaround would be materializing it with `suppressed.filter((k, v) -> true, Materialized.as("final-count"))`. But I think providing a materialized variant for the suppress method is better than this workaround.
> > >>>>>>>
> > >>>>>>> Thanks, Dongjin
> > >>>>>>>
> > >>>>>>> On Thu, Feb 20, 2020 at 1:26 AM John Roesler <vvcep...@apache.org> wrote:
> > >>>>>>>
> > >>>>>>>> Thanks for the response, Dongjin,
> > >>>>>>>>
> > >>>>>>>> I'm sorry, but I'm still not following. It seems like the view you would get on the "current state of the buffer" would always be equivalent to the view of the upstream table.
> > >>>>>>>>
> > >>>>>>>> Let me try an example, and maybe you can point out the flaw in my reasoning.
> > >>>>>>>>
> > >>>>>>>> Let's say we're doing 10 ms windows with a grace period of zero. Let's also say we're computing a windowed count, and that we have a "final results" suppression after the count. Let's materialize the count as "Count" and the suppressed result as "Final Count".
> > >>>>>>>>
> > >>>>>>>> Suppose we get an input event: (time=10, key=A, value=...)
> > >>>>>>>>
> > >>>>>>>> Then, Count will look like:
> > >>>>>>>>
> > >>>>>>>> | window | key | value |
> > >>>>>>>> | 10     | A   | 1     |
> > >>>>>>>>
> > >>>>>>>> The (internal) suppression buffer will contain:
> > >>>>>>>>
> > >>>>>>>> | window | key | value |
> > >>>>>>>> | 10     | A   | 1     |
> > >>>>>>>>
> > >>>>>>>> The record is still buffered because the window isn't closed yet. Final Count is an empty table:
> > >>>>>>>>
> > >>>>>>>> | window | key | value |
> > >>>>>>>>
> > >>>>>>>> ---------------
> > >>>>>>>>
> > >>>>>>>> Now, we get a second event: (time=15, key=A, value=...)
> > >>>>>>>>
> > >>>>>>>> Then, Count will look like:
> > >>>>>>>>
> > >>>>>>>> | window | key | value |
> > >>>>>>>> | 10     | A   | 2     |
> > >>>>>>>>
> > >>>>>>>> The (internal) suppression buffer will contain:
> > >>>>>>>>
> > >>>>>>>> | window | key | value |
> > >>>>>>>> | 10     | A   | 2     |
> > >>>>>>>>
> > >>>>>>>> The record is still buffered because the window isn't closed yet. Final Count is an empty table:
> > >>>>>>>>
> > >>>>>>>> | window | key | value |
> > >>>>>>>>
> > >>>>>>>> ---------------
> > >>>>>>>>
> > >>>>>>>> Finally, we get a third event: (time=20, key=A, value=...)
> > >>>>>>>>
> > >>>>>>>> Then, Count will look like:
> > >>>>>>>>
> > >>>>>>>> | window | key | value |
> > >>>>>>>> | 10     | A   | 2     |
> > >>>>>>>> | 20     | A   | 1     |
> > >>>>>>>>
> > >>>>>>>> The (internal) suppression buffer will contain:
> > >>>>>>>>
> > >>>>>>>> | window | key | value |
> > >>>>>>>> | 20     | A   | 1     |
> > >>>>>>>>
> > >>>>>>>> Note that window 10 has been flushed out, because it's now closed. And window 20 is buffered because it isn't closed yet.
> > >>>>>>>> Final Count is now:
> > >>>>>>>>
> > >>>>>>>> | window | key | value |
> > >>>>>>>> | 10     | A   | 2     |
> > >>>>>>>>
> > >>>>>>>> ---------------
> > >>>>>>>>
> > >>>>>>>> Reading your email, I can't figure out what value there is in querying the internal suppression buffer, since it only contains exactly the same value as the upstream table, for each key that is still buffered. But it feels like you're saying you're trying to do something different than just query the windowed key and get back the current count?
> > >>>>>>>>
> > >>>>>>>> Thanks, -John
> > >>>>>>>>
> > >>>>>>>> On Wed, Feb 19, 2020, at 09:49, Dongjin Lee wrote:
> > >>>>>>>>> Hi John,
> > >>>>>>>>>
> > >>>>>>>>> 'The intermediate state of the suppression' in the KIP does not mean the state of the upstream KTable - sure, the state of the upstream KTable can be queried by materializing the operator immediately before the suppress, as you showed. What I meant in the KIP was the final state of the buffer, which is not emitted yet. (I agree, the current description may be confusing; it would be better to change it to 'the current state of the suppression' or 'the results of the suppression', like the Jira issue <https://issues.apache.org/jira/browse/KAFKA-8403> states.)
> > >>>>>>>>>
> > >>>>>>>>> For a little bit more about the motivation, here is one of my experiences: I had to build a monitoring application which collects signals from IoT devices (say, a semiconductor production line.)
> > >>>>>>>>> If the number of collected signals within the time window is much less than expected, there may be some problem in the systems, like a network hiccup. We wanted to build the system in the form of a dashboard, but could not, for lack of a materializing feature. It was precisely the case of querying only the final results of a windowed aggregation, as the Jira issue <https://issues.apache.org/jira/browse/KAFKA-8403> states. We finally ended up implementing it as an email alerting system like this <https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers/> and had to collect the troubled keys and windows by hand.
> > >>>>>>>>>
> > >>>>>>>>> I think these kinds of use cases would be quite common. Should it be described in the KIP in much more detail?
> > >>>>>>>>>
> > >>>>>>>>> Thanks, Dongjin
> > >>>>>>>>>
> > >>>>>>>>> On Sat, Feb 15, 2020 at 4:43 AM John Roesler <vvcep...@apache.org> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Hi Dongjin,
> > >>>>>>>>>>
> > >>>>>>>>>> Thanks for the KIP!
> > >>>>>>>>>>
> > >>>>>>>>>> Can you explain more about why the internal data structures of suppression should be queriable?
> > >>>>>>>>>> The motivation just says that users might want to do it, which seems like it could justify literally anything :)
> > >>>>>>>>>>
> > >>>>>>>>>> One design point of Suppression is that if you wanted to query the “final state”, you can Materialize the suppress itself (which is why it needs the variant); if you wanted to query the “intermediate state”, you can materialize the operator immediately before the suppress.
> > >>>>>>>>>>
> > >>>>>>>>>> Example:
> > >>>>>>>>>>
> > >>>>>>>>>> ...count(Materialized.as("intermediate"))
> > >>>>>>>>>>    .suppress(untilWindowCloses(unbounded()), Materialized.as("final"))
> > >>>>>>>>>>
> > >>>>>>>>>> I’m not sure what use case would require actually fetching from the internal buffers.
> > >>>>>>>>>>
> > >>>>>>>>>> Thanks, John
> > >>>>>>>>>>
> > >>>>>>>>>> On Fri, Feb 14, 2020, at 07:55, Dongjin Lee wrote:
> > >>>>>>>>>>> Hi devs,
> > >>>>>>>>>>>
> > >>>>>>>>>>> I'd like to reboot the discussion on KIP-508, which aims to support a Materialized variant of KTable#suppress. It was initially submitted several months ago but closed due to inactivity.
> > >>>>>>>>>>>
> > >>>>>>>>>>> - KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-508%3A+Make+Suppression+State+Queriable
> > >>>>>>>>>>> - Jira: https://issues.apache.org/jira/browse/KAFKA-8403
> > >>>>>>>>>>>
> > >>>>>>>>>>> All kinds of feedback will be greatly appreciated.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Best, Dongjin
> > >>>>>>>>>>>
> > >>>>>>>>>>> -- *Dongjin Lee*
> > >>>>>>>>>>>
> > >>>>>>>>>>> *A hitchhiker in the mathematical world.*
> > >>>>>>>>>>> github: github.com/dongjinleekr <https://github.com/dongjinleekr>
> > >>>>>>>>>>> linkedin: kr.linkedin.com/in/dongjinleekr <https://kr.linkedin.com/in/dongjinleekr>
> > >>>>>>>>>>> speakerdeck: speakerdeck.com/dongjin <https://speakerdeck.com/dongjin>

--
*Dongjin Lee*
*A hitchhiker in the mathematical world.*
github: github.com/dongjinleekr <https://github.com/dongjinleekr>
keybase: https://keybase.io/dongjinleekr
linkedin: kr.linkedin.com/in/dongjinleekr <https://kr.linkedin.com/in/dongjinleekr>
speakerdeck: speakerdeck.com/dongjin <https://speakerdeck.com/dongjin>
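
For reference, the three-event walkthrough quoted in this thread (10 ms windows, zero grace period, a windowed count followed by a "final results" suppression) can be replayed with a small plain-Java simulation. This is only a sketch of the semantics described above, not Kafka Streams code: the class `SuppressionSketch`, its fields, and the `process` method are hypothetical names, and it assumes a window closes as soon as stream time reaches the window end.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the topology in the thread; it does not use the Kafka Streams API.
class SuppressionSketch {
    static final long WINDOW_SIZE_MS = 10; // 10 ms windows, grace period of zero

    // Each table maps windowStart -> (key -> count).
    final Map<Long, Map<String, Long>> count = new TreeMap<>();      // upstream "Count"
    final Map<Long, Map<String, Long>> buffer = new TreeMap<>();     // internal suppression buffer
    final Map<Long, Map<String, Long>> finalCount = new TreeMap<>(); // materialized "Final Count"
    long streamTime = Long.MIN_VALUE;

    void process(long time, String key) {
        streamTime = Math.max(streamTime, time);
        long windowStart = time - Math.floorMod(time, WINDOW_SIZE_MS);
        if (windowStart + WINDOW_SIZE_MS <= streamTime) {
            return; // the window is already closed, so the late record is dropped
        }
        // Update the upstream count and overwrite the buffered value for this window and key.
        long updated = count.computeIfAbsent(windowStart, w -> new TreeMap<>())
                            .merge(key, 1L, Long::sum);
        buffer.computeIfAbsent(windowStart, w -> new TreeMap<>()).put(key, updated);

        // Flush every closed window from the buffer into the final table.
        Iterator<Map.Entry<Long, Map<String, Long>>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Map<String, Long>> e = it.next();
            if (e.getKey() + WINDOW_SIZE_MS > streamTime) {
                break; // sorted by window start, so all remaining windows are still open
            }
            finalCount.put(e.getKey(), e.getValue());
            it.remove();
        }
    }

    public static void main(String[] args) {
        SuppressionSketch s = new SuppressionSketch();
        for (long time : new long[] {10, 15, 20}) {
            s.process(time, "A");
            System.out.println("time=" + time + " Count=" + s.count
                    + " buffer=" + s.buffer + " Final Count=" + s.finalCount);
        }
    }
}
```

In this model the buffer never holds anything that is not also in Count, which is John's point, while Final Count is the only table that can be scanned for "all closed windows" without knowing the keys in advance, which is the use case Dongjin describes.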