Hi John,

It seems like the available alternatives on this point are clear:
1. Pass the queryable name as a separate parameter (i.e., `KTable#suppress(Suppressed, String)`).
2. Use the suppression processor name as the queryable name by adding an optional `enableQuery` flag to `Suppressed`.

However, I have some doubts about the second approach. As far as I know, the processor name was introduced in KIP-307[^1] to make topologies easier to debug and understand. Since the processor name is a concept independent of materialization, I feel the first approach is more natural and consistent. Is there any specific reason you prefer the second approach?

Thanks,
Dongjin

[^1]: https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL

On Wed, Sep 16, 2020 at 11:48 PM John Roesler <vvcep...@apache.org> wrote:
> Hi Dongjin,
>
> Yes, that's where I was leaning. However, I'd prefer adding the option to Suppressed instead of adding a new argument to the method call.
>
> What do you think about:
>
>     class Suppressed<K> {
>     +   public Suppressed<K> enableQuery();
>     }
>
> Since Suppressed already has `withName(String)`, it seems like all we need to add is a boolean flag.
>
> Does that seem sensible to you?
>
> Thanks,
> -John
>
> On Wed, 2020-09-16 at 21:50 +0900, Dongjin Lee wrote:
> > Hi John,
> >
> > > Although it's not great to have "special snowflakes" in the API, Choice B does seem safer in the short term. We would basically be proposing a temporary API to make the suppressed view queryable without a Materialized argument.
> >
> > Then, it seems like you prefer `KTable#suppress(Suppressed, String)` (i.e., the queryable name only as a parameter) for now, and would refine the API together with the other related KIPs later.
> >
> > Do I understand correctly?
> >
> > Thanks,
> > Dongjin
> >
> > On Wed, Sep 16, 2020 at 2:17 AM John Roesler <vvcep...@apache.org> wrote:
> > > Hi Dongjin,
> > >
> > > Thanks for presenting these options.
> > > The concern that Matthias brought up is a very deep problem that afflicts all operations downstream of windowing operations. It's the same thing that derailed KIP-300. For the larger context, I have developed a couple of approaches to resolve this situation, but I think it makes sense to finish up KIP-478 before presenting them.
> > >
> > > However, I don't think we need to block the current proposal in particular on solving that long-running and deep issue with the DSL. Instead, we should make a top-level decision whether to:
> > >
> > > A: Make Suppress just like all the other KTable operations. It will have the same pathological behavior: the keyset is unbounded while the store implementation is only a KeyValueStore. Again, this exact pathology currently affects all KTable operations that follow from windowing operations. For example, it applies to the current workaround that Dongjin documented in the KIP: suppress().filter(Materialized<KeyValueStore>). This is Option 2 that Dongjin presented.
> > >
> > > B: Do something different with Suppress in order to side-step the problem. For example, Suppress does not _need_ to have a separate state store at all. If we just give people a switch to make the operation queryable, we can implement a ReadOnlyKeyValueStore interface by querying the "priorValue" of the buffer first and then querying the upstream ValueGetter. This broad category of "do something different with Suppress" encompasses Option 1 and Option 3 that Dongjin presented.
> > >
> > > Speaking personally, I think Choice A would be the most obvious and least weird choice, but it presents a serious risk of escalating the severity of the problem of unbounded state. This is currently a risk that we're aware of, but it has not yet become a big problem in practice.
> > > As Matthias pointed out, Suppress is far more likely to be used downstream of windowed tables than other operations, so having a Materialized<KVStore> overload carries a significant risk of getting people into a bad state. Note that broadly advertising the workaround from the KIP would have exactly the same impact, so we should be careful about recommending it.
> > >
> > > Although it's not great to have "special snowflakes" in the API, Choice B does seem safer in the short term. We would basically be proposing a temporary API to make the suppressed view queryable without a Materialized argument. Then, once we fix the main KIP-300 problem, we would look at converging Suppress with the rest of the KTable materialization APIs.
> > >
> > > WDYT?
> > > Thanks,
> > > -John
> > >
> > > On Wed, 2020-09-16 at 00:01 +0900, Dongjin Lee wrote:
> > > > Hi Matthias,
> > > >
> > > > Thank you very much for the detailed feedback. Here are my opinions:
> > > >
> > > > > Because there is no final result for non-windowed KTables, it seems that this new feature only makes sense for the windowed-aggregation case?
> > > >
> > > > I see it a little differently. Of course, for a windowed KTable, this feature provides the final state; for non-windowed KTables, it provides a view of the records received more than the predefined waiting time ago, excluding the records that are still waiting for more events.
> > > >
> > > > > Thus, the signature of `Materialized` should take a `WindowStore` instead of a `KeyValueStore`?
> > > >
> > > > I reviewed the implementation following your comments and found the following:
> > > >
> > > > 1. A `Materialized` instance includes the following: KeySerde, ValueSerde, StoreSupplier, and queryable name.
> > > > 2. The other `Materialized` method variants in `KTable` make use of KeySerde, ValueSerde, and the queryable name only. (That is, StoreSupplier is ignored.)
> > > > 3. `KTable#suppress(Suppressed, Materialized)` uses the queryable name only. StoreSupplier is also ignored.
> > > >
> > > > So, we have three choices for the method signature:
> > > >
> > > > 1. `KTable#suppress(Suppressed, String)` (i.e., passing the queryable name only):
> > > >
> > > > This is the simplest; however, it is inconsistent with the other Materialized variant methods.
> > > >
> > > > 2. `KTable#suppress(Suppressed, Materialized<K, V, KeyValueStore>)` (i.e., the current proposal)
> > > >
> > > > This approach is harmless at this point, since StoreSupplier is ignored. However, since suppression can be used with both `KeyValueStore` and `WindowStore`, this approach is not only weird but also leaves some potential risk for the future. (On second thought, I agree: this API design is bad and dangerous.)
> > > >
> > > > 3. `KTable#suppress(Suppressed, Materialized<K, V, StateStore>)`
> > > >
> > > > This approach embraces both the `KeyValueStore` and `WindowStore` cases. Since the concrete class of the `Suppressed` instance differs depending on the `StateStore`[^1], it seems like we can validate the arguments on the method call (e.g., throw `IllegalArgumentException` when a `Suppressed` instance for a `KeyValueStore` is given together with a `Materialized` instance for a `WindowStore`). This approach not only keeps the API consistent but also guards against misuse of the API.
> > > >
> > > > What do you think? I am now trying out the third approach.
> > > >
> > > > Thanks,
> > > > Dongjin
> > > >
> > > > [^1]: `SuppressedInternal` for `KeyValueStore` and `FinalResultsSuppressionBuilder` for `WindowStore`.
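Option 3 above depends on rejecting mismatched arguments at the moment `suppress` is called. What follows is a minimal, self-contained Java sketch of that check only. `SuppressArgumentCheckSketch`, `StoreKind` and `validate` are hypothetical names, not Kafka Streams API. The sketch takes the store kind implied by the `Suppressed` argument as an input instead of deriving it from `SuppressedInternal` or `FinalResultsSuppressionBuilder`, because that mapping is part of the proposal and the sketch does not verify it.

```java
// Hypothetical sketch of the option-3 argument check; none of these types are Kafka Streams API.
final class SuppressArgumentCheckSketch {

    // The kind of state store that a Suppressed or Materialized argument refers to.
    enum StoreKind { KEY_VALUE, WINDOW }

    // Rejects the call when the store kind implied by the Suppressed argument differs from the
    // store kind requested through Materialized, so misuse fails while the topology is built.
    static void validate(StoreKind impliedBySuppressed, StoreKind requestedByMaterialized) {
        if (impliedBySuppressed != requestedByMaterialized) {
            throw new IllegalArgumentException(
                "Suppressed implies a " + impliedBySuppressed
                    + " store, but Materialized requests a " + requestedByMaterialized + " store");
        }
    }

    public static void main(String[] args) {
        validate(StoreKind.WINDOW, StoreKind.WINDOW); // matching kinds: accepted silently
        try {
            validate(StoreKind.KEY_VALUE, StoreKind.WINDOW); // the mismatch Dongjin describes
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing fast with an exception keeps the misuse Matthias asks about from surfacing only at query time.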
> > > >
> > > > On Sat, Sep 12, 2020 at 3:29 AM Matthias J. Sax <mj...@apache.org> wrote:
> > > > > Thanks for updating the KIP.
> > > > >
> > > > > I think there is still one open question. `suppress()` can be used on a non-windowed KTable for rate control, as well as on a windowed KTable (also for rate control, but mainly) to emit only the final result of a windowed aggregation. For the non-windowed case, we use a KeyValueStore, while for the windowed cases, we use a WindowStore.
> > > > >
> > > > > Because there is no final result for non-windowed KTables, it seems that this new feature only makes sense for the windowed-aggregation case? Thus, the signature of `Materialized` should take a `WindowStore` instead of a `KeyValueStore`?
> > > > >
> > > > > If that's correct, I am wondering:
> > > > >
> > > > > - Can we guard against misuse of the API if the upstream KTable is not windowed (or maybe it's not necessary to guard)?
> > > > > - Can we actually implement it? Didn't we have issues materializing windowed KTables in KIP-300?
> > > > >
> > > > > It would be worth clarifying this up front. Maybe we even need a POC implementation to verify that it works?
> > > > >
> > > > > -Matthias
> > > > >
> > > > > On 9/11/20 12:26 AM, Dongjin Lee wrote:
> > > > > > Hi All,
> > > > > >
> > > > > > Here is the voting thread:
> > > > > > https://lists.apache.org/thread.html/r5653bf2dafbb27b247bf20dbe6f070c151b3823d96c9c9ca94183e20%40%3Cdev.kafka.apache.org%3E
> > > > > >
> > > > > > Thanks,
> > > > > > Dongjin
> > > > > >
> > > > > > On Fri, Sep 11, 2020 at 4:23 PM Dongjin Lee <dong...@apache.org> wrote:
> > > > > > > Hi John,
> > > > > > >
> > > > > > > Thanks for the feedback. I will open the vote thread now.
> > > > > > >
> > > > > > > Best,
> > > > > > > Dongjin
> > > > > > >
> > > > > > > On Fri, Sep 11, 2020 at 2:00 AM John Roesler <vvcep...@apache.org> wrote:
> > > > > > > > Hi Dongjin,
> > > > > > > >
> > > > > > > > Sorry for the delay. I'm glad you're still pushing this forward. It would be nice to get this into the 2.7 release.
> > > > > > > >
> > > > > > > > I just took another look at the KIP, and it looks good to me!
> > > > > > > >
> > > > > > > > I think this is ready for a vote.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > -John
> > > > > > > >
> > > > > > > > On Wed, 2020-08-05 at 22:04 +0900, Dongjin Lee wrote:
> > > > > > > > > Hi All,
> > > > > > > > >
> > > > > > > > > I updated the KIP <https://cwiki.apache.org/confluence/display/KAFKA/KIP-508%3A+Make+Suppression+State+Queriable> and the implementation, following the discussion here.
> > > > > > > > >
> > > > > > > > > You must be busy preparing the 2.6.0 release, so please have a look after your work is done.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Dongjin
> > > > > > > > >
> > > > > > > > > On Sun, Mar 8, 2020 at 12:20 PM John Roesler <vvcep...@apache.org> wrote:
> > > > > > > > > > Thanks Matthias,
> > > > > > > > > >
> > > > > > > > > > Good idea. I've changed the ticket name and added a note clarifying that this ticket is not the same as https://issues.apache.org/jira/browse/KAFKA-7224
> > > > > > > > > >
> > > > > > > > > > Incidentally, I learned that I never documented my reasons for abandoning my work on KAFKA-7224! I've now updated that ticket, too, so your question had an unexpected side benefit.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > -John
> > > > > > > > > >
> > > > > > > > > > On Sat, Mar 7, 2020, at 18:01, Matthias J. Sax wrote:
> > > > > > > > > > > Thanks for the clarification.
> > > > > > > > > > >
> > > > > > > > > > > Can you maybe update the Jira ticket? Do we have a ticket for spill-to-disk? Maybe link to it and explain that these are two different things? Maybe even rename the ticket to something clearer, i.e., "make suppress result queryable" or similar?
> > > > > > > > > > >
> > > > > > > > > > > -Matthias
> > > > > > > > > > >
> > > > > > > > > > > On 3/7/20 1:58 PM, John Roesler wrote:
> > > > > > > > > > > > Hey Matthias,
> > > > > > > > > > > >
> > > > > > > > > > > > I'm sorry if the ticket was poorly stated. The ticket is to add a DSL overload to pass a Materialized argument to suppress. As a result, the output of the suppression would be queryable.
> > > > > > > > > > > >
> > > > > > > > > > > > This is unrelated to "persistent buffer" aka "spill-to-disk".
> > > > > > > > > > > >
> > > > > > > > > > > > There was some confusion before about whether this ticket could be implemented as "query the buffer". Maybe it can, but not trivially. The obvious way is just to add a new state store which we write the results into just before we forward. I.e., it's exactly like the materialized variant of any stateless KTable operation.
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks, John
> > > > > > > > > > > >
> > > > > > > > > > > > On Sat, Mar 7, 2020, at 15:32, Matthias J. Sax wrote:
> > > > > > > > > > > > > Thanks for the KIP, Dongjin,
> > > > > > > > > > > > >
> > > > > > > > > > > > > I am still not sure I can follow, which might also be due to the backing JIRA ticket (maybe John can clarify the intent of the ticket, as he created it):
> > > > > > > > > > > > >
> > > > > > > > > > > > > Currently, suppress() only uses an in-memory buffer, and my understanding of the Jira is to add the ability to use a persistent buffer (i.e., spill to disk, backed by RocksDB). Adding a persistent buffer is completely unrelated to allowing queries against the buffer. In fact, one could query an in-memory buffer, too. However, querying the buffer does not really seem to be useful, as John pointed out, since you can always query the upstream KTable store.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Also note that for the emit-on-window-close case, the result is deleted from the buffer when it is emitted, and thus cannot be queried any longer.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Can you please clarify whether you intend to allow spilling to disk or whether you intend to enable IQ (even if I don't see why querying makes sense, as the data is either upstream or deleted)? Also, if you want to enable IQ, why do we need all those new interfaces? The result of a suppress() is a KTable that is the same as any other key-value/windowed/session store?
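Matthias's observation that buffered data is either still upstream or already emitted is the basis of John's later Choice B: a read-only view that consults the buffer's "priorValue" first and falls back to the upstream value getter. The Java sketch below models that idea with plain maps, assuming that a buffered entry's prior value is the value most recently emitted downstream for that key (null if nothing has been emitted yet). `UpstreamGetter`, `BufferedValue` and `SuppressedView` are hypothetical stand-ins, not the Kafka Streams `ReadOnlyKeyValueStore` or value-getter classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a store-less, queryable view over a suppression buffer.
// None of these types are Kafka Streams classes.
final class SuppressedViewSketch {

    // Stand-in for the upstream table's value getter.
    interface UpstreamGetter<K, V> {
        V get(K key);
    }

    // A buffered update: the value last emitted downstream for this key (null if none yet)
    // and the newer value that is still being held back.
    record BufferedValue<V>(V priorValue, V newValue) {}

    // Read-only view of what downstream has seen so far, computed without a separate store.
    static final class SuppressedView<K, V> {
        private final Map<K, BufferedValue<V>> buffer;
        private final UpstreamGetter<K, V> upstream;

        SuppressedView(Map<K, BufferedValue<V>> buffer, UpstreamGetter<K, V> upstream) {
            this.buffer = buffer;
            this.upstream = upstream;
        }

        V get(K key) {
            BufferedValue<V> buffered = buffer.get(key);
            if (buffered != null) {
                // The newest value is still suppressed, so downstream only knows the prior value.
                return buffered.priorValue();
            }
            // Nothing buffered for this key: the latest upstream value has already been emitted.
            return upstream.get(key);
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> upstreamTable = new HashMap<>(Map.of("A", 2, "B", 1, "C", 7));
        Map<String, BufferedValue<Integer>> buffer = new HashMap<>();
        buffer.put("B", new BufferedValue<>(null, 1)); // first update for B, not emitted yet
        buffer.put("C", new BufferedValue<>(5, 7));    // C was emitted as 5; 7 is held back

        SuppressedView<String, Integer> view = new SuppressedView<>(buffer, upstreamTable::get);
        System.out.println(view.get("A") + " " + view.get("B") + " " + view.get("C")); // prints "2 null 5"
    }
}
```

The design choice this illustrates is that no extra store is needed: every lookup is answered from state the operator already keeps.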
> > > > > > > > > > > > >
> > > > > > > > > > > > > We should also have corresponding Jira tickets for the different cases to avoid the confusion I am in atm :)
> > > > > > > > > > > > >
> > > > > > > > > > > > > -Matthias
> > > > > > > > > > > > >
> > > > > > > > > > > > > On 2/27/20 8:21 AM, John Roesler wrote:
> > > > > > > > > > > > > > Hi Dongjin,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > No problem; glad we got it sorted out.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks again for picking this up!
> > > > > > > > > > > > > > -John
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Wed, Feb 26, 2020, at 09:24, Dongjin Lee wrote:
> > > > > > > > > > > > > > > > I was under the impression that you wanted to expand the scope of the KIP to additionally allow querying the internal buffer, not just the result. Can you clarify whether you are proposing to allow querying the state of the internal buffer, the result, or both?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Sorry for the confusion. As we already discussed, we only need to query the suppressed output, not the internal buffer. The current implementation is wrong. I will notify you after refining the KIP and the implementation accordingly. I must have been confused, too.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > Dongjin
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Tue, Feb 25, 2020 at 12:17 AM John Roesler <vvcep...@apache.org> wrote:
> > > > > > > > > > > > > > > > Hi Dongjin,
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Ah, I think I may have been confused. I 100% agree that we need a materialized variant for suppress(). Then, you could do: ...suppress(..., Materialized.as("final-count"))
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > If that's your proposal, then we are on the same page. I was under the impression that you wanted to expand the scope of the KIP to additionally allow querying the internal buffer, not just the result. Can you clarify whether you are proposing to allow querying the state of the internal buffer, the result, or both?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Thanks, John
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Thu, Feb 20, 2020, at 08:41, Dongjin Lee wrote:
> > > > > > > > > > > > > > > > > Hi John,
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Thanks for your kind explanation with an example.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > But it feels like you're saying you're trying to do something different than just query the windowed key and get back the current count?
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Yes. For example, what if we need to retrieve all (or a range of) the keys with a closed window? In this example, let's imagine we need to retrieve only (key=A, window=10), not (key=A, window=20).
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Of course, the value accompanying a flushed key is exactly the same as the one in the upstream KTable. However, if our intention is not to point at a specific key but to retrieve a group of unspecified keys, we are stuck, since we can't know beforehand which keys have been flushed.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > One workaround would be materializing it with `suppressed.filter((key, value) -> true, Materialized.as("final-count"))`.
> > > > > > > > > > > > > > > > > But I think providing a materialized variant of the suppress method is better than this workaround.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > Dongjin
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > On Thu, Feb 20, 2020 at 1:26 AM John Roesler <vvcep...@apache.org> wrote:
> > > > > > > > > > > > > > > > > > Thanks for the response, Dongjin,
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > I'm sorry, but I'm still not following. It seems like the view you would get on the "current state of the buffer" would always be equivalent to the view of the upstream table. Let me try an example, and maybe you can point out the flaw in my reasoning.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Let's say we're doing 10 ms windows with a grace period of zero. Let's also say we're computing a windowed count, and that we have a "final results" suppression after the count. Let's materialize the count as "Count" and the suppressed result as "Final Count".
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Suppose we get an input event: (time=10, key=A, value=...)
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Then, Count will look like:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > | window | key | value |
> > > > > > > > > > > > > > > > > > | 10     | A   | 1     |
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > The (internal) suppression buffer will contain:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > | window | key | value |
> > > > > > > > > > > > > > > > > > | 10     | A   | 1     |
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > The record is still buffered because the window isn't closed yet. Final Count is an empty table:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > | window | key | value |
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > ---------------
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Now, we get a second event: (time=15, key=A, value=...)
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Then, Count will look like:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > | window | key | value |
> > > > > > > > > > > > > > > > > > | 10     | A   | 2     |
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > The (internal) suppression buffer will contain:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > | window | key | value |
> > > > > > > > > > > > > > > > > > | 10     | A   | 2     |
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > The record is still buffered because the window isn't closed yet. Final Count is an empty table:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > | window | key | value |
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > ---------------
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Finally, we get a third event: (time=20, key=A, value=...)
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Then, Count will look like:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > | window | key | value |
> > > > > > > > > > > > > > > > > > | 10     | A   | 2     |
> > > > > > > > > > > > > > > > > > | 20     | A   | 1     |
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > The (internal) suppression buffer will contain:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > | window | key | value |
> > > > > > > > > > > > > > > > > > | 20     | A   | 1     |
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Note that window 10 has been flushed out, because it's now closed. And window 20 is buffered because it isn't closed yet. Final Count is now:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > | window | key | value |
> > > > > > > > > > > > > > > > > > | 10     | A   | 2     |
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > ---------------
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Reading your email, I can't figure out what value there is in querying the internal suppression buffer, since it only contains exactly the same value as the upstream table, for each key that is still buffered. But it feels like you're saying you're trying to do something different than just query the windowed key and get back the current count?
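John's three-event walkthrough can be replayed mechanically. The Java sketch below is a simplified model, not Kafka's implementation. It assumes 10 ms tumbling windows aligned to multiples of 10, a grace period of zero, and the rule that a window closes once stream time reaches its end plus the grace period. Late and out-of-order records are not modelled, and `FinalResultsSimulation` and `WindowedKey` are illustrative names.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model of a windowed count followed by a "final results" suppression.
// Illustrative only; this is not how Kafka Streams implements suppression.
final class FinalResultsSimulation {

    static final long WINDOW_SIZE_MS = 10;
    static final long GRACE_MS = 0;

    record WindowedKey(long windowStart, String key) {}

    final Map<WindowedKey, Long> count = new LinkedHashMap<>();      // "Count"
    final Map<WindowedKey, Long> buffer = new LinkedHashMap<>();     // internal suppression buffer
    final Map<WindowedKey, Long> finalCount = new LinkedHashMap<>(); // "Final Count"
    private long streamTime = Long.MIN_VALUE;

    void process(long timestamp, String key) {
        streamTime = Math.max(streamTime, timestamp);
        WindowedKey windowed = new WindowedKey(timestamp - timestamp % WINDOW_SIZE_MS, key);

        // Update the windowed count and buffer the newest result for that window.
        long updated = count.merge(windowed, 1L, Long::sum);
        buffer.put(windowed, updated);

        // Flush every buffered window that is now closed into the final-results table.
        Iterator<Map.Entry<WindowedKey, Long>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<WindowedKey, Long> entry = it.next();
            if (entry.getKey().windowStart() + WINDOW_SIZE_MS + GRACE_MS <= streamTime) {
                finalCount.put(entry.getKey(), entry.getValue());
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        FinalResultsSimulation sim = new FinalResultsSimulation();
        for (long time : new long[] {10, 15, 20}) {
            sim.process(time, "A");
            System.out.println("t=" + time + " Count=" + sim.count
                + " buffer=" + sim.buffer + " Final Count=" + sim.finalCount);
        }
    }
}
```

After the third event the three maps agree with the tables above: Count holds both windows, the buffer holds only window 20, and Final Count holds only (10, A, 2).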
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > -John
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > On Wed, Feb 19, 2020, at 09:49, Dongjin Lee wrote:
> > > > > > > > > > > > > > > > > > > Hi John,
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > 'The intermediate state of the suppression' in the KIP does not mean the state of the upstream KTable. Sure, the state of the upstream KTable can be queried by materializing the operator immediately before the suppress, as you showed. What I meant in the KIP was the final state of the buffer, which has not been emitted yet. (I agree the current description may be confusing; it would be better to change it to 'the current state of the suppression' or 'the results of the suppression', as the Jira issue <https://issues.apache.org/jira/browse/KAFKA-8403> states.)
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > For a little more about the motivation, here is one of my experiences: I had to build a monitoring application that collects signals from IoT devices (say, a semiconductor production line). If the number of signals collected within a time window is much lower than expected, there may be a problem, such as a network hiccup, in the systems. We wanted to build the system as a dashboard, but could not, because there was no materialization feature. It was precisely the case of querying only the final results of a windowed aggregation, as the Jira issue <https://issues.apache.org/jira/browse/KAFKA-8403> states.
> > > > > > > > > > > > > > > > > > > We finally ended up implementing it as an email alerting system like this <https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers/> and had to collect the troubled keys and windows by hand.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > I think these kinds of use cases would be quite common. Should the KIP describe them in more detail?
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > Dongjin
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > On Sat, Feb 15, 2020 at 4:43 AM John Roesler <vvcep...@apache.org> wrote:
> > > > > > > > > > > > > > > > > > > > Hi Dongjin,
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Thanks for the KIP!
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Can you explain more about why the internal data structures of suppression should be queryable?
> > > > > > > > > > > > > > > > > > > > The motivation just says that users might want to do it, which seems like it could justify literally anything :)
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > One design point of Suppression is that if you wanted to query the "final state", you can Materialize the suppress itself (which is why it needs the variant); if you wanted to query the "intermediate state", you can materialize the operator immediately before the suppress. Example:
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > >     ...count(Materialized.as("intermediate"))
> > > > > > > > > > > > > > > > > > > >        .suppress(untilWindowCloses(unbounded()), Materialized.as("final"))
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > I'm not sure what use case would require actually fetching from the internal buffers.
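Dongjin's monitoring use case (flagging windows whose signal count is far below the expected number) shows why only final results are useful there: a window that is still open can have a low count simply because it has not finished filling. The Java sketch below scans a map of final window counts, standing in for the queryable final-results view under discussion. `LowSignalWindowsSketch`, the expected count, and the `minRatio` threshold are hypothetical, since the thread does not define what "much lower than expected" means.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical alerting check over final (closed-window) counts; not Kafka Streams API.
final class LowSignalWindowsSketch {

    record WindowedKey(String device, long windowStart) {}

    // Returns the closed windows whose final count falls below expected * minRatio.
    // Only final results are passed in, so a window that is still filling cannot trigger an alert.
    static List<WindowedKey> lowSignalWindows(Map<WindowedKey, Long> finalCounts,
                                              long expected, double minRatio) {
        List<WindowedKey> flagged = new ArrayList<>();
        for (Map.Entry<WindowedKey, Long> entry : finalCounts.entrySet()) {
            if (entry.getValue() < expected * minRatio) {
                flagged.add(entry.getKey());
            }
        }
        return flagged;
    }

    public static void main(String[] args) {
        Map<WindowedKey, Long> finalCounts = new LinkedHashMap<>();
        finalCounts.put(new WindowedKey("sensor-1", 0), 98L);
        finalCounts.put(new WindowedKey("sensor-2", 0), 12L); // far fewer signals than expected
        System.out.println(lowSignalWindows(finalCounts, 100, 0.5));
    }
}
```

Running the same check against the "intermediate" count would also flag windows that are merely incomplete, which is the distinction John's two materialization points draw.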
Thanks, John

On Fri, Feb 14, 2020, at 07:55, Dongjin Lee wrote:

Hi devs,

I'd like to reboot the discussion on KIP-508, which aims to support a Materialized variant of KTable#suppress. It was initially submitted several months ago but was closed due to inactivity.

- KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-508%3A+Make+Suppression+State+Queriable
- Jira: https://issues.apache.org/jira/browse/KAFKA-8403

All kinds of feedback will be greatly appreciated.
Best, Dongjin

-- 
*Dongjin Lee*

*A hitchhiker in the mathematical world.*

*github: github.com/dongjinleekr <https://github.com/dongjinleekr> linkedin: kr.linkedin.com/in/dongjinleekr <https://kr.linkedin.com/in/dongjinleekr> speakerdeck: speakerdeck.com/dongjin <https://speakerdeck.com/dongjin>*