Thanks for the clarification.
Can you maybe update the Jira ticket? Do we have a ticket for
spill-to-disk? Maybe link to it and explain that these are two different
things? Maybe even rename the ticket to something clearer, i.e., "make
suppress result queryable" or similar?

-Matthias

On 3/7/20 1:58 PM, John Roesler wrote:
> Hey Matthias,
>
> I’m sorry if the ticket was poorly stated. The ticket is to add a DSL
> overload to pass a Materialized argument to suppress. As a result, the
> result of the suppression would be queryable.
>
> This is unrelated to “persistent buffer” aka “spill-to-disk”.
>
> There was some confusion before about whether this ticket could be
> implemented as “query the buffer”. Maybe it can, but not trivially. The
> obvious way is just to add a new state store which we write the results
> into just before we forward. I.e., it’s exactly like the materialized
> variant of any stateless KTable operation.
>
> Thanks,
> John
>
> On Sat, Mar 7, 2020, at 15:32, Matthias J. Sax wrote:
> Thanks for the KIP Dongjin,
>
> I am still not sure if I can follow, which might also be caused by the
> backing Jira ticket (maybe John can clarify the intent of the ticket as
> he created it):
>
> Currently, suppress() only uses an in-memory buffer, and my
> understanding of the Jira is that it is about adding the ability to use
> a persistent buffer (i.e., spill to disk backed by RocksDB).
>
> Adding a persistent buffer is completely unrelated to allowing the
> buffer to be queried. In fact, one could query an in-memory buffer,
> too. However, querying the buffer does not really seem to be useful, as
> pointed out by John, as you can always query the upstream KTable store.
>
> Also note that for the emit-on-window-close case the result is deleted
> from the buffer when it is emitted, and thus cannot be queried any
> longer.
>
> Can you please clarify if you intend to allow spilling to disk or if
> you intend to enable IQ (even if I don't see why querying makes sense,
> as the data is either upstream or deleted).
> Also, if you want to enable IQ, why do we need all those new
> interfaces? The result of a suppress() is a KTable that is the same as
> any other key-value/windowed/session store?
>
> We should also have corresponding Jira tickets for the different cases
> to avoid the confusion I am in atm :)
>
> -Matthias
>
> On 2/27/20 8:21 AM, John Roesler wrote:
>>>> Hi Dongjin,
>>>>
>>>> No problem; glad we got it sorted out.
>>>>
>>>> Thanks again for picking this up!
>>>> -John
>>>>
>>>> On Wed, Feb 26, 2020, at 09:24, Dongjin Lee wrote:
>>>>>> I was under the impression that you wanted to expand the scope of
>>>>>> the KIP to additionally allow querying the internal buffer, not
>>>>>> just the result. Can you clarify whether you are proposing to
>>>>>> allow querying the state of the internal buffer, the result, or
>>>>>> both?
>>>>>
>>>>> Sorry for the confusion. As we already discussed, we only need to
>>>>> query the suppressed output, not the internal buffer. The current
>>>>> implementation is wrong. After refining the KIP and implementation
>>>>> accordingly I will notify you - I must have been confused, too.
>>>>>
>>>>> Thanks,
>>>>> Dongjin
>>>>>
>>>>> On Tue, Feb 25, 2020 at 12:17 AM John Roesler
>>>>> <vvcep...@apache.org> wrote:
>>>>>
>>>>>> Hi Dongjin,
>>>>>>
>>>>>> Ah, I think I may have been confused. I 100% agree that we need a
>>>>>> materialized variant for suppress(). Then, you could do:
>>>>>>
>>>>>> ...suppress(..., Materialized.as("final-count"))
>>>>>>
>>>>>> If that’s your proposal, then we are on the same page.
>>>>>>
>>>>>> I was under the impression that you wanted to expand the scope of
>>>>>> the KIP to additionally allow querying the internal buffer, not
>>>>>> just the result. Can you clarify whether you are proposing to
>>>>>> allow querying the state of the internal buffer, the result, or
>>>>>> both?
>>>>>>
>>>>>> Thanks,
>>>>>> John
>>>>>>
>>>>>> On Thu, Feb 20, 2020, at 08:41, Dongjin Lee wrote:
>>>>>>> Hi John,
>>>>>>>
>>>>>>> Thanks for your kind explanation with an example.
>>>>>>>
>>>>>>>> But it feels like you're saying you're trying to do something
>>>>>>>> different than just query the windowed key and get back the
>>>>>>>> current count?
>>>>>>>
>>>>>>> Yes, for example, what if we need to retrieve the (all or range)
>>>>>>> keys with a closed window? In this example, let's imagine we need
>>>>>>> to retrieve only (key=A, window=10), not (key=A, window=20).
>>>>>>>
>>>>>>> Of course, the value accompanying a flushed key is exactly the
>>>>>>> same as the one in the upstream KTable. However, if our intention
>>>>>>> is not to look up a specific key but to retrieve a group of
>>>>>>> unspecified keys, we are stuck, since we can't know beforehand
>>>>>>> which keys have been flushed out.
>>>>>>>
>>>>>>> One workaround would be materializing it with
>>>>>>> `suppressed.filter((k, v) -> true, Materialized.as("final-count"))`.
>>>>>>> But I think providing a materialized variant of the suppress
>>>>>>> method is better than this workaround.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Dongjin
>>>>>>>
>>>>>>> On Thu, Feb 20, 2020 at 1:26 AM John Roesler
>>>>>>> <vvcep...@apache.org> wrote:
>>>>>>>
>>>>>>>> Thanks for the response, Dongjin,
>>>>>>>>
>>>>>>>> I'm sorry, but I'm still not following. It seems like the view
>>>>>>>> you would get on the "current state of the buffer" would always
>>>>>>>> be equivalent to the view of the upstream table.
>>>>>>>>
>>>>>>>> Let me try an example, and maybe you can point out the flaw in
>>>>>>>> my reasoning.
>>>>>>>>
>>>>>>>> Let's say we're doing 10 ms windows with a grace period of zero.
>>>>>>>> Let's also say we're computing a windowed count, and that we
>>>>>>>> have a "final results" suppression after the count.
>>>>>>>> Let's materialize the count as "Count" and the suppressed result
>>>>>>>> as "Final Count".
>>>>>>>>
>>>>>>>> Suppose we get an input event:
>>>>>>>> (time=10, key=A, value=...)
>>>>>>>>
>>>>>>>> Then, Count will look like:
>>>>>>>>
>>>>>>>> | window | key | value |
>>>>>>>> | 10     | A   | 1     |
>>>>>>>>
>>>>>>>> The (internal) suppression buffer will contain:
>>>>>>>>
>>>>>>>> | window | key | value |
>>>>>>>> | 10     | A   | 1     |
>>>>>>>>
>>>>>>>> The record is still buffered because the window isn't closed
>>>>>>>> yet. Final Count is an empty table:
>>>>>>>>
>>>>>>>> | window | key | value |
>>>>>>>>
>>>>>>>> ---------------
>>>>>>>>
>>>>>>>> Now, we get a second event:
>>>>>>>> (time=15, key=A, value=...)
>>>>>>>>
>>>>>>>> Then, Count will look like:
>>>>>>>>
>>>>>>>> | window | key | value |
>>>>>>>> | 10     | A   | 2     |
>>>>>>>>
>>>>>>>> The (internal) suppression buffer will contain:
>>>>>>>>
>>>>>>>> | window | key | value |
>>>>>>>> | 10     | A   | 2     |
>>>>>>>>
>>>>>>>> The record is still buffered because the window isn't closed
>>>>>>>> yet. Final Count is an empty table:
>>>>>>>>
>>>>>>>> | window | key | value |
>>>>>>>>
>>>>>>>> ---------------
>>>>>>>>
>>>>>>>> Finally, we get a third event:
>>>>>>>> (time=20, key=A, value=...)
>>>>>>>>
>>>>>>>> Then, Count will look like:
>>>>>>>>
>>>>>>>> | window | key | value |
>>>>>>>> | 10     | A   | 2     |
>>>>>>>> | 20     | A   | 1     |
>>>>>>>>
>>>>>>>> The (internal) suppression buffer will contain:
>>>>>>>>
>>>>>>>> | window | key | value |
>>>>>>>> | 20     | A   | 1     |
>>>>>>>>
>>>>>>>> Note that window 10 has been flushed out, because it's now
>>>>>>>> closed. And window 20 is buffered because it isn't closed yet.
>>>>>>>> Final Count is now:
>>>>>>>>
>>>>>>>> | window | key | value |
>>>>>>>> | 10     | A   | 2     |
>>>>>>>>
>>>>>>>> ---------------
>>>>>>>>
>>>>>>>> Reading your email, I can't figure out what value there is in
>>>>>>>> querying the internal suppression buffer, since it only contains
>>>>>>>> exactly the same value as the upstream table, for each key that
>>>>>>>> is still buffered. But it feels like you're saying you're trying
>>>>>>>> to do something different than just query the windowed key and
>>>>>>>> get back the current count?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> -John
>>>>>>>>
>>>>>>>> On Wed, Feb 19, 2020, at 09:49, Dongjin Lee wrote:
>>>>>>>>> Hi John,
>>>>>>>>>
>>>>>>>>> 'The intermediate state of the suppression' in the KIP does not
>>>>>>>>> mean the state of the upstream KTable - sure, the state of the
>>>>>>>>> upstream KTable can be queried by materializing the operator
>>>>>>>>> immediately before the suppress, as you showed. What I meant in
>>>>>>>>> the KIP was the final state of the buffer, which is not emitted
>>>>>>>>> yet. (I agree, the current description may be confusing; it
>>>>>>>>> would be better to change it to 'the current state of the
>>>>>>>>> suppression' or 'the results of the suppression', like the Jira
>>>>>>>>> issue <https://issues.apache.org/jira/browse/KAFKA-8403>
>>>>>>>>> states.)
>>>>>>>>>
>>>>>>>>> For a little bit more about the motivation, here is one of my
>>>>>>>>> experiences: I had to build a monitoring application which
>>>>>>>>> collects signals from IoT devices (say, a semiconductor
>>>>>>>>> production line). If the number of collected signals within the
>>>>>>>>> time window is much less than expected, there may be some
>>>>>>>>> problems, like network hiccups, in the systems.
>>>>>>>>> We wanted to build the system in the form of a dashboard, but
>>>>>>>>> could not, for lack of a materializing feature. It was
>>>>>>>>> precisely the case of querying only the final results of a
>>>>>>>>> windowed aggregation, as the Jira issue
>>>>>>>>> <https://issues.apache.org/jira/browse/KAFKA-8403> states. We
>>>>>>>>> finally ended up implementing it as an email alerting system
>>>>>>>>> like this
>>>>>>>>> <https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers/>
>>>>>>>>> and had to collect the problematic keys and windows by hand.
>>>>>>>>>
>>>>>>>>> I think these kinds of use cases would be quite common. Should
>>>>>>>>> it be described in the KIP in more detail?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Dongjin
>>>>>>>>>
>>>>>>>>> On Sat, Feb 15, 2020 at 4:43 AM John Roesler
>>>>>>>>> <vvcep...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Dongjin,
>>>>>>>>>>
>>>>>>>>>> Thanks for the KIP!
>>>>>>>>>>
>>>>>>>>>> Can you explain more about why the internal data structures of
>>>>>>>>>> suppression should be queryable? The motivation just says that
>>>>>>>>>> users might want to do it, which seems like it could justify
>>>>>>>>>> literally anything :)
>>>>>>>>>>
>>>>>>>>>> One design point of Suppression is that if you wanted to query
>>>>>>>>>> the “final state”, you can Materialize the suppress itself
>>>>>>>>>> (which is why it needs the variant); if you wanted to query
>>>>>>>>>> the “intermediate state”, you can materialize the operator
>>>>>>>>>> immediately before the suppress.
>>>>>>>>>>
>>>>>>>>>> Example:
>>>>>>>>>>
>>>>>>>>>> ...count(Materialized.as("intermediate"))
>>>>>>>>>>    .suppress(untilWindowCloses(unbounded()), Materialized.as("final"))
>>>>>>>>>>
>>>>>>>>>> I’m not sure what use case would require actually fetching
>>>>>>>>>> from the internal buffers.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> John
>>>>>>>>>>
>>>>>>>>>> On Fri, Feb 14, 2020, at 07:55, Dongjin Lee wrote:
>>>>>>>>>>> Hi devs,
>>>>>>>>>>>
>>>>>>>>>>> I'd like to reboot the discussion on KIP-508, which aims to
>>>>>>>>>>> support a Materialized variant of KTable#suppress. It was
>>>>>>>>>>> initially submitted several months ago but closed due to
>>>>>>>>>>> inactivity.
>>>>>>>>>>>
>>>>>>>>>>> - KIP:
>>>>>>>>>>>   https://cwiki.apache.org/confluence/display/KAFKA/KIP-508%3A+Make+Suppression+State+Queriable
>>>>>>>>>>> - Jira: https://issues.apache.org/jira/browse/KAFKA-8403
>>>>>>>>>>>
>>>>>>>>>>> All kinds of feedback will be greatly appreciated.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Dongjin
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> *Dongjin Lee*
>>>>>>>>>>>
>>>>>>>>>>> *A hitchhiker in the mathematical world.*
>>>>>>>>>>> github: github.com/dongjinleekr <https://github.com/dongjinleekr>
>>>>>>>>>>> linkedin: kr.linkedin.com/in/dongjinleekr <https://kr.linkedin.com/in/dongjinleekr>
>>>>>>>>>>> speakerdeck: speakerdeck.com/dongjin <https://speakerdeck.com/dongjin>
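
For reference, the three-event example quoted above (10 ms windows, zero
grace period, a windowed count followed by a "final results" suppression)
can be replayed with a small toy model. This is only a sketch in plain
Java and does not use Kafka Streams at all: the class name
SuppressSimulation, its fields, and the process() method are made up for
illustration, and it handles a single key only. It follows the "obvious
way" described in the thread, i.e. writing each result into a separate
store just before it would be forwarded.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

/**
 * Toy model of a windowed count followed by a "final results" suppression.
 * NOT Kafka Streams code: single key, tumbling windows, zero grace period.
 * All names here are hypothetical and exist only for this illustration.
 */
class SuppressSimulation {
    static final long WINDOW_SIZE_MS = 10;

    // Each map is keyed by window start and holds the count for that window.
    final Map<Long, Long> count = new TreeMap<>();      // upstream "Count" table
    final Map<Long, Long> buffer = new TreeMap<>();     // internal suppression buffer
    final Map<Long, Long> finalCount = new TreeMap<>(); // materialized "Final Count"

    long streamTime = Long.MIN_VALUE;

    /** Processes one event for key A with the given (non-negative) timestamp. */
    void process(long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        long windowStart = timestamp - (timestamp % WINDOW_SIZE_MS);

        // With zero grace, an event for an already closed window is dropped.
        if (windowStart + WINDOW_SIZE_MS <= streamTime) {
            return;
        }

        // Update the upstream count and hand the new value to the buffer.
        long updated = count.merge(windowStart, 1L, Long::sum);
        buffer.put(windowStart, updated);

        // A window [start, start + size) closes once stream time reaches its end.
        // Closed windows are written to the result store just before they would
        // be forwarded downstream, and are then removed from the buffer.
        Iterator<Map.Entry<Long, Long>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> entry = it.next();
            if (entry.getKey() + WINDOW_SIZE_MS <= streamTime) {
                finalCount.put(entry.getKey(), entry.getValue());
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        SuppressSimulation sim = new SuppressSimulation();
        for (long t : new long[] {10, 15, 20}) {
            sim.process(t);
            System.out.println("time=" + t
                    + " Count=" + sim.count
                    + " buffer=" + sim.buffer
                    + " FinalCount=" + sim.finalCount);
        }
    }
}
```

After the third event, the model ends up with the same three tables as in
the example: Count holds both windows, the buffer holds only window 20,
and Final Count holds only window 10. It also shows why a range query for
"all closed windows" needs the separate result store: neither Count nor
the buffer alone tells you which windows have already been emitted.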