Hi John,

It seems the available alternatives at this point are clear:

1. Pass the queryable name as a separate parameter (i.e.,
`KTable#suppress(Suppressed, String)`).
2. Reuse the suppression processor name as the queryable name by adding an
optional `enableQuery` flag to `Suppressed`.

However, I have some doubts about the second approach. As far as I know, the
processor name was introduced in KIP-307[^1] to make topologies easier to
debug and understand. Since the processor name is a concept independent of
materialization, the first approach feels more natural and consistent to me.
Is there any specific reason that you prefer the second approach?
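
To make the comparison concrete, here is a minimal sketch of how the
queryable name would be resolved under each alternative. Everything in it is
a simplified stand-in made up for illustration (`SuppressedSketch`,
`queryableNameOption1`, `queryableNameOption2`), not the actual Kafka Streams
classes, and `enableQuery()` is only the flag proposed in this thread.
Rejecting `enableQuery()` without a name is also just my assumption.

```java
import java.util.Optional;

public class QueryableNameSketch {

    /** Hypothetical stand-in for Suppressed: a processor name plus the proposed flag. */
    static final class SuppressedSketch {
        private final String name;          // processor name, as set by withName(String)
        private final boolean queryEnabled; // the proposed enableQuery() flag

        private SuppressedSketch(String name, boolean queryEnabled) {
            this.name = name;
            this.queryEnabled = queryEnabled;
        }

        static SuppressedSketch create() {
            return new SuppressedSketch(null, false);
        }

        SuppressedSketch withName(String name) {
            return new SuppressedSketch(name, queryEnabled);
        }

        SuppressedSketch enableQuery() {
            return new SuppressedSketch(name, true);
        }
    }

    /** Option 1: the queryable name is its own argument, independent of the processor name. */
    static Optional<String> queryableNameOption1(SuppressedSketch suppressed, String queryableName) {
        return Optional.ofNullable(queryableName);
    }

    /** Option 2: the processor name doubles as the queryable name once the flag is set. */
    static Optional<String> queryableNameOption2(SuppressedSketch suppressed) {
        if (!suppressed.queryEnabled) {
            return Optional.empty();
        }
        if (suppressed.name == null) {
            // Assumption: a flag without a name cannot be resolved to a store name.
            throw new IllegalArgumentException("enableQuery() requires withName(String)");
        }
        return Optional.of(suppressed.name);
    }

    public static void main(String[] args) {
        SuppressedSketch named = SuppressedSketch.create().withName("suppress-final-count");
        System.out.println(queryableNameOption1(named, "final-count")); // Optional[final-count]
        System.out.println(queryableNameOption2(named));                // Optional.empty
        System.out.println(queryableNameOption2(named.enableQuery()));  // Optional[suppress-final-count]
    }
}
```

In the second variant, renaming the processor for debugging purposes would
also rename the queryable store, which is the coupling I am concerned about.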

Thanks,
Dongjin

[^1]:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL



On Wed, Sep 16, 2020 at 11:48 PM John Roesler <vvcep...@apache.org> wrote:

> Hi Dongjin,
>
> Yes, that's where I was leaning. Although, I'd prefer adding
> the option to Suppressed instead of adding a new argument to
> the method call.
>
> What do you think about:
>
> class Suppressed<K> {
> +  public Suppressed<K> enableQuery();
> }
>
> Since Suppressed already has `withName(String)`, it seems
> like all we need to add is a boolean flag.
>
> Does that seem sensible to you?
>
> Thanks,
> -John
>
> On Wed, 2020-09-16 at 21:50 +0900, Dongjin Lee wrote:
> > Hi John,
> >
> > > Although it's not great to have "special snowflakes" in the API,
> Choice B
> > does seem safer in the short term. We would basically be proposing a
> > temporary API to make the suppressed view queriable without a
> Materialized
> > argument.
> >
> > Then, it seems like you prefer `KTable#suppress(Suppressed, String)`
> (i.e.,
> > queriable name only as a parameter) for this time, and refine API with
> the
> > other related KIPs later.
> >
> > Do I understand correctly?
> >
> > Thanks,
> > Dongjin
> >
> > On Wed, Sep 16, 2020 at 2:17 AM John Roesler <vvcep...@apache.org>
> wrote:
> >
> > > Hi Dongjin,
> > >
> > > Thanks for presenting these options. The concern that
> > > Matthias brought up is a very deep problem that afflicts all
> > > operations downstream of windowing operations. It's the same
> > > thing that derailed KIP-300. For the larger context, I have
> > > developed a couple of approaches to resolve this situation,
> > > but I think it makes sense to finish up KIP-478 before
> > > presenting them.
> > >
> > > However, I don't think that we need in particular to block
> > > the current proposal on solving that long-running and deep
> > > issue with the DSL. Instead, we should make a top-level
> > > decision whether to:
> > >
> > > A: Make Suppress just like all the other KTable operations.
> > > It will have the same pathological behavior that the keyset
> > > is unbounded while the store implementation is only a
> > > KeyValueStore. Again, this exact pathology currently affects
> > > all KTable operations that follow from windowing operations.
> > > For example, it applies to the current workaround that
> > > Dongjin documented in the KIP:
> > > suppress().filter(Materialized<KeyValueStore>). This is
> > > Option 2 that Dongjin presented.
> > >
> > > B: Do something different with Suppress in order to side-
> > > step the problem. For example, Suppress does not _need_ to
> > > have a separate state store at all. If we just give people a
> > > switch to make the operation queriable, we can implement a
> > > ReadOnlyKeyValueStore interface by querying the "priorValue"
> > > of the buffer first and then querying the upstream
> > > ValueGetter. This broad category of "do something different
> > > > with Suppress" encompasses Option 1 and Option 3 that Dongjin
> > > presented.
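
The read path described in Choice B can be sketched roughly as follows. This
is a simplified model under stated assumptions, not the Kafka Streams
implementation: `BufferEntry`, `SuppressedView`, and the plain `Map` and
`Function` standing in for the buffer and the upstream ValueGetter are all
hypothetical names chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class SuppressedViewSketch {

    /** Hypothetical buffer entry holding the value that was last emitted downstream. */
    static final class BufferEntry<V> {
        final V priorValue; // null if nothing has been emitted for this key yet

        BufferEntry(V priorValue) {
            this.priorValue = priorValue;
        }
    }

    /** Read-only view over the suppressed result, without a state store of its own. */
    static final class SuppressedView<K, V> {
        private final Map<K, BufferEntry<V>> buffer;
        private final Function<K, V> upstreamGetter;

        SuppressedView(Map<K, BufferEntry<V>> buffer, Function<K, V> upstreamGetter) {
            this.buffer = buffer;
            this.upstreamGetter = upstreamGetter;
        }

        V get(K key) {
            BufferEntry<V> entry = buffer.get(key);
            if (entry != null) {
                // Still buffered: downstream has only seen the prior value, if any.
                return entry.priorValue;
            }
            // Not buffered: the latest upstream value has already been emitted.
            return upstreamGetter.apply(key);
        }
    }

    public static void main(String[] args) {
        Map<String, Long> upstream = new HashMap<>();
        upstream.put("A@10", 2L); // closed window, already flushed from the buffer
        upstream.put("A@20", 1L); // open window, still buffered

        Map<String, BufferEntry<Long>> buffer = new HashMap<>();
        buffer.put("A@20", new BufferEntry<Long>(null));

        SuppressedView<String, Long> view = new SuppressedView<>(buffer, upstream::get);
        System.out.println(view.get("A@10")); // 2
        System.out.println(view.get("A@20")); // null
    }
}
```

The point of the sketch is that the view needs no additional store: a
buffered key answers with what was emitted before, and any other key falls
through to the upstream table.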
> > >
> > >
> > > Speaking personally, I think Choice A would be the most
> > > obvious and least weird choice, but it presents a serious
> > > risk of escalating the severity of the problem of unbounded
> > > state. This is currently a risk that we're aware of, but has
> > > not yet become a big problem in practice. As Matthias
> > > pointed out, Suppress is far more likely to be used
> > > downstream of windowed tables than other operations, so
> > > having a Materialized<KVStore> overload has the significant
> > > risk of getting people into a bad state. Note, broadly
> > > advertising the workaround from the KIP would have the exact
> > > same impact, so we should be careful about recommending it.
> > >
> > > Although it's not great to have "special snowflakes" in the
> > > API, Choice B does seem safer in the short term. We would
> > > basically be proposing a temporary API to make the
> > > suppressed view queriable without a Materialized argument.
> > > Then, once we fix the main KIP-300 problem, we would look at
> > > converging Suppress with the rest of the KTable
> > > materialization APIs.
> > >
> > > WDYT?
> > > Thanks,
> > > -John
> > >
> > >
> > > On Wed, 2020-09-16 at 00:01 +0900, Dongjin Lee wrote:
> > > > Hi Matthias,
> > > >
> > > > Thank you very much for the detailed feedback. Here are my opinions:
> > > >
> > > > > Because there is no final result for non-windowed KTables, it seems
> > > that
> > > > this new feature only make sense for the windowed-aggregation case?
> > > >
> > > > > I see it a little differently. Of course, for windowed KTable, this
> > > > feature provides the final state; for non-windowed KTables, it
> provides a
> > > > view to the records received more than the predefined waiting time
> ago -
> > > > excluding the records that are waiting for more events.
> > > >
> > > > > Thus, the signature of `Materialized` should take a `WindowStore`
> > > instead
> > > > of a `KeyValueStore`?
> > > >
> > > > I reviewed the implementation following your comments and found the
> > > > following:
> > > >
> > > > 1. `Materialized` instance includes the following: KeySerde,
> ValueSerde,
> > > > StoreSupplier, and Queriable Name.
> > > > 2. The other `Materialized` method variants in `KTable` are making
> use of
> > > > KeySerde, ValueSerde, and Queriable Name only. (That is,
> StoreSupplier is
> > > > ignored.)
> > > > 3. `KTable#suppress(Suppressed, Materialized)` uses the Queriable
> Name
> > > > only. StoreSupplier is also ignored.
> > > >
> > > > So, we have three choices for the method signature:
> > > >
> > > > 1. `KTable#suppress(Suppressed, String)` (i.e., passing the Queriable
> > > Name
> > > > only):
> > > >
> > > >   This is the simplest; however, it is inconsistent with the other
> > > > Materialized variant methods.
> > > >
> > > > 2. `KTable#suppress(Suppressed, Materialized<K, V, KeyValueStore>)`
> > > (i.e.,
> > > > current proposal)
> > > >
> > > >   This approach is harmless at this point, for StoreSupplier is
> ignored;
> > > > However, since suppression can be used to both of `KeyValueStore` and
> > > > `WindowStore`, this approach is not only weird but also leaving some
> > > > potential risk to the future. (On second thoughts, I agree, this API
> > > design
> > > > is bad and dangerous.)
> > > >
> > > > 3. `KTable#suppress(Suppressed, Materialized<K, V, StateStore>)`
> > > >
> > > >   This approach embraces both of `KeyValueStore` and `WindowStore`
> cases.
> > > > Since the concrete class type of `Suppressed` instance differs for
> the
> > > > `StateStore`[^1], it seems like we can validate the arguments on the
> > > method
> > > > > call. (e.g., throw `IllegalArgumentException` when a `Suppressed`
> > > instance
> > > > on `KeyValueStore` is given with `Materialized` instance of
> > > `WindowStore`.)
> > > > This approach not only breaks the API consistency but also guards
> from a
> > > > > misuse of the API.
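
The argument check mentioned for the third option could look roughly like
the sketch below. It is only an illustration under assumptions: `StoreKind`
and `validate` are hypothetical names, and the real check would have to
derive the store kind from the concrete `Suppressed` and `Materialized`
instances rather than from an enum.

```java
public class SuppressArgumentCheckSketch {

    /** Hypothetical tag for the kind of store an argument is bound to. */
    enum StoreKind { KEY_VALUE, WINDOW }

    /** Rejects a Suppressed/Materialized pair that targets different store kinds. */
    static void validate(StoreKind suppressedKind, StoreKind materializedKind) {
        if (suppressedKind != materializedKind) {
            throw new IllegalArgumentException(
                "Suppressed targets a " + suppressedKind
                    + " store but Materialized targets a " + materializedKind + " store");
        }
    }

    public static void main(String[] args) {
        validate(StoreKind.WINDOW, StoreKind.WINDOW); // matching kinds pass silently
        try {
            validate(StoreKind.KEY_VALUE, StoreKind.WINDOW);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```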
> > > >
> > > > > What do you think? I am now trying out the third approach.
> > > >
> > > > Thanks,
> > > > Dongjin
> > > >
> > > > [^1]: `SuppressedInternal` for `KeyValueStore` and
> > > > `FinalResultsSuppressionBuilder` for `WindowStore`.
> > > >
> > > > On Sat, Sep 12, 2020 at 3:29 AM Matthias J. Sax <mj...@apache.org>
> > > wrote:
> > > > > Thanks for updating the KIP.
> > > > >
> > > > > I think there is still one open question. `suppress()` can be used
> on
> > > > > non-windowed KTable for rate control, as well as on a
> windowed-KTable
> > > > > (also for rate control, but actually mainly) for only emitting the
> > > final
> > > > > result of a windowed aggregation. For the non-windowed case, we
> use a
> > > > > KeyValueStore while for the windowed cases, we use a WindowStore.
> > > > >
> > > > > Because there is no final result for non-windowed KTables, it seems
> > > that
> > > > > this new feature only make sense for the windowed-aggregation case?
> > > > > Thus, the signature of `Materialized` should take a `WindowStore`
> > > > > instead of a `KeyValueStore`?
> > > > >
> > > > > If that's correct, I am wondering:
> > > > >
> > > > >  - Can we guard from a miss-usage of the API if the upstream
> KTable is
> > > > > not windowed (or maybe it's not necessary to guard)?
> > > > >  - Can we actually implement it? We had issues with regard to
> KIP-300
> > > to
> > > > > materialize windowed-KTables?
> > > > >
> > > > > Would be worth to clarify upfront. Maybe, we even need a POC
> > > > > implementation to verify that it works?
> > > > >
> > > > >
> > > > > -Matthias
> > > > >
> > > > >
> > > > > On 9/11/20 12:26 AM, Dongjin Lee wrote:
> > > > > > Hi All,
> > > > > >
> > > > > > Here is the voting thread:
> > > > > >
> > >
> https://lists.apache.org/thread.html/r5653bf2dafbb27b247bf20dbe6f070c151b3823d96c9c9ca94183e20%40%3Cdev.kafka.apache.org%3E
> > > > > > Thanks,
> > > > > > Dongjin
> > > > > >
> > > > > > On Fri, Sep 11, 2020 at 4:23 PM Dongjin Lee <dong...@apache.org>
> > > wrote:
> > > > > > > Hi John,
> > > > > > >
> > > > > > > Thanks for the feedback. I will open the Vote thread now.
> > > > > > >
> > > > > > > Best,
> > > > > > > Dongjin
> > > > > > >
> > > > > > > On Fri, Sep 11, 2020 at 2:00 AM John Roesler <
> vvcep...@apache.org>
> > > > > wrote:
> > > > > > > > Hi Dongjin,
> > > > > > > >
> > > > > > > > Sorry for the delay. I'm glad you're still pushing this
> > > > > > > > forward. It would be nice to get this in to the 2.7 release.
> > > > > > > >
> > > > > > > > I just took another look at the KIP, and it looks good to
> > > > > > > > me!
> > > > > > > >
> > > > > > > > I think this is ready for a vote.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > -John
> > > > > > > >
> > > > > > > > On Wed, 2020-08-05 at 22:04 +0900, Dongjin Lee wrote:
> > > > > > > > > Hi All,
> > > > > > > > >
> > > > > > > > > I updated the KIP
> > > > > > > > > <
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-508%3A+Make+Suppression+State+Queriable
> > > > > > > > > and the implementation, following the discussion here.
> > > > > > > > >
> > > > > > > > > You must be working hard preparing the release of 2.6.0, so
> > > please
> > > > > have
> > > > > > > > a
> > > > > > > > > look after your work is done.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Dongjin
> > > > > > > > >
> > > > > > > > > On Sun, Mar 8, 2020 at 12:20 PM John Roesler <
> > > vvcep...@apache.org>
> > > > > > > > wrote:
> > > > > > > > > > Thanks Matthias,
> > > > > > > > > >
> > > > > > > > > > Good idea. I've changed the ticket name and added a note
> > > > > > > > > > clarifying that this ticket is not the same as
> > > > > > > > > > https://issues.apache.org/jira/browse/KAFKA-7224
> > > > > > > > > >
> > > > > > > > > > Incidentally, I learned that I never documented my
> reasons
> > > > > > > > > > for abandoning my work on KAFKA-7224 ! I've now updated
> > > > > > > > > > that ticket, too, so your question had an unexpected
> > > side-benefit.
> > > > > > > > > > Thanks,
> > > > > > > > > > -John
> > > > > > > > > >
> > > > > > > > > > On Sat, Mar 7, 2020, at 18:01, Matthias J. Sax wrote:
> > > > > > Thanks for clarification.
> > > > > >
> > > > > > Can you maybe update the Jira ticket? Do we have a ticket for
> > > > > > spill-to-disk? Maybe link to it and explain that it's two
> different
> > > > > > things? Maybe even rename the ticket to something more clear, ie,
> > > > > > > "make suppress result queryable" or similar?
> > > > > >
> > > > > >
> > > > > > -Matthias
> > > > > >
> > > > > > On 3/7/20 1:58 PM, John Roesler wrote:
> > > > > > > > > > > > > Hey Matthias,
> > > > > > > > > > > > >
> > > > > > > > > > > > > I’m sorry if the ticket was poorly stated. The
> ticket
> > > is to add a
> > > > > > DSL overload to pass a Materialized argument to suppress. As a
> > > > > > > > > result,
> > > > > > the result of the suppression would be queriable.
> > > > > > > > > > > > > This is unrelated to “persistent buffer” aka
> > > “spill-to-disk”.
> > > > > > > > > > > > > There was some confusion before about whether this
> > > ticket could be
> > > > > > implemented as “query the buffer”. Maybe it can, but not
> trivially.
> > > > > > The obvious way is just to add a new state store which we write
> the
> > > > > > results into just before we forward. I.e., it’s exactly like the
> > > > > > materialized variant of any stateless KTable operation.
> > > > > > > > > > > > > Thanks, John
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Sat, Mar 7, 2020, at 15:32, Matthias J. Sax
> wrote:
> > > Thanks for
> > > > > > > > > > > > > the KIP Dongjin,
> > > > > > > > > > > > >
> > > > > > > > > > > > > I am still not sure if I can follow, what might
> also
> > > be caused by
> > > > > > > > > > > > > the backing JIRA ticket (maybe John can clarify the
> > > intent of the
> > > > > > > > > > > > > ticket as he created it):
> > > > > > > > > > > > >
> > > > > > > > > > > > > Currently, suppress() only uses an in-memory buffer
> > > and my
> > > > > > > > > > > > > understanding of the Jira is, to add the ability to
> > > use a
> > > > > > > > > > > > > persistent buffer (ie, spill to disk backed by
> > > RocksDB).
> > > > > > > > > > > > > Adding a persistent buffer is completely unrelated
> to
> > > allow
> > > > > > > > > > > > > querying the buffer. In fact, one could query an
> > > in-memory buffer,
> > > > > > > > > > > > > too. However, querying the buffer does not really
> seem
> > > to be
> > > > > > > > > useful
> > > > > > > > > > > > > as pointed out by John, as you can always query the
> > > upstream
> > > > > > > > > KTable
> > > > > > > > > > > > > store.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Also note that for the emit-on-window-close case
> the
> > > result is
> > > > > > > > > > > > > deleted from the buffer when it is emitted, and
> thus
> > > cannot be
> > > > > > > > > > > > > > queried any longer.
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > Can you please clarify if you intend to allow
> spilling
> > > to disk or
> > > > > > > > > > > > > > if you intend to enable IQ (even if I don't see why querying makes
> > > > > > > > > > > > > > sense, as the data is either upstream or deleted).
> > > Also, if you
> > > > > > > > > > > > > want to enable IQ, why do we need all those new
> > > interfaces? The
> > > > > > > > > > > > > result of a suppress() is a KTable that is the
> same as
> > > any other
> > > > > > > > > > > > > key-value/windowed/sessions store?
> > > > > > > > > > > > >
> > > > > > > > > > > > > We should also have corresponding Jira tickets for
> > > different cases
> > > > > > > > > > > > > to avoid the confusion I am in atm :)
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > -Matthias
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > On 2/27/20 8:21 AM, John Roesler wrote:
> > > > > > > > > > > > > > > > Hi Dongjin,
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > No problem; glad we got it sorted out.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Thanks again for picking this up! -John
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Wed, Feb 26, 2020, at 09:24, Dongjin Lee
> > > wrote:
> > > > > > > > > > > > > > > > > > I was under the impression that you
> wanted
> > > to expand the
> > > > > > > > > > > > > > > > > > scope of the KIP
> > > > > > > > > > > > > > > > > to additionally allow querying the internal
> > > buffer, not
> > > > > > > > > > > > > > > > > just the result. Can you clarify whether
> you
> > > are proposing
> > > > > > > > > > > > > > > > > to allow querying the state of the internal
> > > buffer, the
> > > > > > > > > > > > > > > > > result, or both?
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Sorry for the confusion. As we already
> talked
> > > with, we
> > > > > > > > > only
> > > > > > > > > > > > > > > > > need to query the suppressed output, not
> the
> > > internal
> > > > > > > > > > > > > > > > > buffer. The current implementation is
> wrong.
> > > After
> > > > > > > > > refining
> > > > > > > > > > > > > > > > > the KIP and implementation accordingly I
> will
> > > notify you -
> > > > > > > > > > > > > > > > > I must be confused, also.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Thanks, Dongjin
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > On Tue, Feb 25, 2020 at 12:17 AM John
> Roesler
> > > > > > > > > > > > > > > > > <vvcep...@apache.org> wrote:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Hi Dongjin,
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Ah, I think I may have been confused. I
> 100%
> > > agree that
> > > > > > > > > > > > > > > > > > we need a materialized variant for
> > > suppress(). Then, you
> > > > > > > > > > > > > > > > > > could do: ...suppress(...,
> > > > > > > > > > > > > > > > > > Materialized.as(“final-count”))
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > If that’s your proposal, then we are on
> the
> > > same page.
> > > > > > > > > > > > > > > > > > I was under the impression that you
> wanted
> > > to expand the
> > > > > > > > > > > > > > > > > > scope of the KIP to additionally allow
> > > querying the
> > > > > > > > > > > > > > > > > > internal buffer, not just the result. Can
> > > you clarify
> > > > > > > > > > > > > > > > > > whether you are proposing to allow
> querying
> > > the state of
> > > > > > > > > > > > > > > > > > the internal buffer, the result, or both?
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Thanks, John
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > On Thu, Feb 20, 2020, at 08:41, Dongjin
> Lee
> > > wrote:
> > > > > > > > > > > > > > > > > > > Hi John, Thanks for your kind
> explanation
> > > with an
> > > > > > > > > > > > > > > > > > > example.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > But it feels like you're saying
> you're
> > > trying to do
> > > > > > > > > > > > > > > > > > > > something different
> > > > > > > > > > > > > > > > > > > than just query the windowed key and
> get
> > > back the
> > > > > > > > > > > > > > > > > > > current count?
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Yes, for example, what if we need to
> > > retrieve the (all
> > > > > > > > > > > > > > > > > > > or range) keys
> > > > > > > > > > > > > > > > > > with
> > > > > > > > > > > > > > > > > > > a closed window? In this example, let's
> > > imagine we
> > > > > > > > > need
> > > > > > > > > > > > > > > > > > > to retrieve only (key=A, window=10),
> not
> > > (key=A,
> > > > > > > > > > > > > > > > > > > window=20).
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Of course, the value accompanied by a
> > > flushed key is
> > > > > > > > > > > > > > > > > > > exactly the same to the one in the
> > > upstream KTable;
> > > > > > > > > > > > > > > > > > > However, if our intention is not
> pointing
> > > out a
> > > > > > > > > > > > > > > > > > > specific key but retrieving a group of
> > > unspecified
> > > > > > > > > > > > > > > > > > > keys, we stuck
> > > > > > > > > > > > > > > > > > in
> > > > > > > > > > > > > > > > > > > trouble - since we can't be sure which
> key
> > > is flushed
> > > > > > > > > > > > > > > > > > > out beforehand.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > One workaround would be materializing
> it
> > > with
> > > > > > > > > > > > > > > > > > > > > `suppressed.filter((k, v) ->
> > > > > > > > > > > > > > > > > > true,
> > > > > > > > > > > > > > > > > > > Materialized.as("final-count"))`. But I
> > > think
> > > > > > > > > providing
> > > > > > > > > > > > > > > > > > > a materialized variant for suppress
> method
> > > is better
> > > > > > > > > > > > > > > > > > > than this workaround.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Thanks, Dongjin
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > On Thu, Feb 20, 2020 at 1:26 AM John
> > > Roesler
> > > > > > > > > > > > > > > > > > > <vvcep...@apache.org>
> > > > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > > > Thanks for the response, Dongjin,
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > I'm sorry, but I'm still not
> following.
> > > It seems
> > > > > > > > > like
> > > > > > > > > > > > > > > > > > > > the view you
> > > > > > > > > > > > > > > > > > would
> > > > > > > > > > > > > > > > > > > > get on the "current state of the
> buffer"
> > > would
> > > > > > > > > always
> > > > > > > > > > > > > > > > > > > > be equivalent to the view of the
> > > upstream table.
> > > > > > > > > > > > > > > > > > > > Let me try an example, and maybe you
> can
> > > point out
> > > > > > > > > > > > > > > > > > > > the flaw in my reasoning.
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Let's say we're doing 10 ms windows
> with
> > > a grace
> > > > > > > > > > > > > > > > > > > > period of zero. Let's also say we're
> > > computing a
> > > > > > > > > > > > > > > > > > > > windowed count, and that we have a
> > > "final results"
> > > > > > > > > > > > > > > > > > > > suppression after the count. Let's
> > > materialize the
> > > > > > > > > > > > > > > > > > > > count as "Count" and the suppressed
> > > result as "Final
> > > > > > > > > > > > > > > > > > > > Count".
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Suppose we get an input event:
> (time=10,
> > > key=A,
> > > > > > > > > > > > > > > > > > > > value=...)
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Then, Count will look like:
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > | window | key | value |
> > > > > > > > > > > > > > > > > > > > > > | 10     | A   |     1 |
> > > > > > > > > > > > > > > > > > > > The (internal) suppression buffer
> will
> > > contain:
> > > > > > > > > > > > > > > > > > > > > > | window | key | value |
> > > > > > > > > > > > > > > > > > > > > > | 10     | A   |     1 |
> > > > > > > > > > > > > > > > > > > > The record is still buffered because
> the
> > > window
> > > > > > > > > > > > > > > > > > > > isn't closed yet. Final Count is an
> > > empty table:
> > > > > > > > > > > > > > > > > > > > > > | window | key | value |
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > ---------------
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Now, we get a second event: (time=15,
> > > key=A,
> > > > > > > > > > > > > > > > > > > > value=...)
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Then, Count will look like:
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > | window | key | value |
> > > > > > > > > > > > > > > > > > > > > > | 10     | A   |     2 |
> > > > > > > > > > > > > > > > > > > > The (internal) suppression buffer
> will
> > > contain:
> > > > > > > > > > > > > > > > > > > > > > | window | key | value |
> > > > > > > > > > > > > > > > > > > > > > | 10     | A   |     2 |
> > > > > > > > > > > > > > > > > > > > The record is still buffered because
> the
> > > window
> > > > > > > > > > > > > > > > > > > > isn't closed yet. Final Count is an
> > > empty table:
> > > > > > > > > > > > > > > > > > > > > > | window | key | value |
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > ---------------
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Finally, we get a third event:
> (time=20,
> > > key=A,
> > > > > > > > > > > > > > > > > > > > value=...)
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Then, Count will look like:
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > | window | key | value |
> > > > > > > > > > > > > > > > > > > > > > | 10     | A   |     2 |
> > > > > > > > > > > > > > > > > > > > > > | 20     | A   |     1 |
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > The (internal) suppression buffer
> will
> > > contain:
> > > > > > > > > > > > > > > > > > > > > > | window | key | value |
> > > > > > > > > > > > > > > > > > > > > > | 20     | A   |     1 |
> > > > > > > > > > > > > > > > > > > > Note that window 10 has been flushed
> > > out, because
> > > > > > > > > > > > > > > > > > > > it's now closed. And window 20 is
> > > buffered because
> > > > > > > > > it
> > > > > > > > > > > > > > > > > > > > isn't closed yet. Final Count is now:
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > | window | key | value |
> > > > > > > > > > > > > > > > > > > > > > | 10     | A   |     2 |
> > > > > > > > > > > > > > > > > > > > ---------------
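
The three-event walkthrough above can be replayed with a small standalone
simulation. This is a sketch under simplifying assumptions, not Kafka
Streams code: the class `FinalCountSimulation`, its three maps, and the
`process` method are hypothetical, windows are tumbling 10 ms windows with a
grace period of zero, and late records for already closed windows are not
handled.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

public class FinalCountSimulation {
    static final long WINDOW_SIZE_MS = 10;
    static final long GRACE_MS = 0;

    // Each table maps window start -> (key -> count).
    final TreeMap<Long, Map<String, Long>> count = new TreeMap<>();
    final TreeMap<Long, Map<String, Long>> buffer = new TreeMap<>();
    final TreeMap<Long, Map<String, Long>> finalCount = new TreeMap<>();
    long streamTime = Long.MIN_VALUE;

    void process(long time, String key) {
        long windowStart = time - (time % WINDOW_SIZE_MS);
        streamTime = Math.max(streamTime, time);

        // Upstream windowed count ("Count").
        long updated = count.computeIfAbsent(windowStart, w -> new TreeMap<>())
                            .merge(key, 1L, Long::sum);

        // The suppression buffer keeps the latest count of each open window.
        buffer.computeIfAbsent(windowStart, w -> new TreeMap<>()).put(key, updated);

        // Flush every buffered window that has closed into "Final Count".
        Iterator<Map.Entry<Long, Map<String, Long>>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Map<String, Long>> e = it.next();
            if (e.getKey() + WINDOW_SIZE_MS + GRACE_MS <= streamTime) {
                finalCount.computeIfAbsent(e.getKey(), w -> new TreeMap<>()).putAll(e.getValue());
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        FinalCountSimulation sim = new FinalCountSimulation();
        for (long time : new long[] {10, 15, 20}) {
            sim.process(time, "A");
            System.out.println("time=" + time + " count=" + sim.count
                + " buffer=" + sim.buffer + " final=" + sim.finalCount);
        }
        // Last line printed:
        // time=20 count={10={A=2}, 20={A=1}} buffer={20={A=1}} final={10={A=2}}
    }
}
```

After each event, every key that is still in the buffer carries exactly the
value that Count holds for it, which is the observation the question below
builds on.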
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Reading your email, I can't figure
> out
> > > what value
> > > > > > > > > > > > > > > > > > > > there is in querying
> > > > > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > > internal suppression buffer, since it
> > > only contains
> > > > > > > > > > > > > > > > > > > > exactly the same
> > > > > > > > > > > > > > > > > > value
> > > > > > > > > > > > > > > > > > > > as the upstream table, for each key
> that
> > > is still
> > > > > > > > > > > > > > > > > > > > buffered. But it feels
> > > > > > > > > > > > > > > > > > like
> > > > > > > > > > > > > > > > > > > > you're saying you're trying to do
> > > something
> > > > > > > > > different
> > > > > > > > > > > > > > > > > > > > than just query
> > > > > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > > windowed key and get back the current
> > > count?
> > > > > > > > > > > > > > > > > > > > Thanks, -John
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > On Wed, Feb 19, 2020, at 09:49,
> Dongjin
> > > Lee wrote:
> > > > > > > > > > > > > > > > > > > > > Hi John,
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > 'The intermediate state of the
> > > suppression' in KIP
> > > > > > > > > > > > > > > > > > > > > does not mean the
> > > > > > > > > > > > > > > > > > > > state
> > > > > > > > > > > > > > > > > > > > > of upstream KTable - sure, the
> state
> > > of the
> > > > > > > > > > > > > > > > > > > > > upstream KTable can be
> > > > > > > > > > > > > > > > > > > > queried
> > > > > > > > > > > > > > > > > > > > > by materializing the operator
> > > immediately before
> > > > > > > > > > > > > > > > > > > > > the suppress as you
> > > > > > > > > > > > > > > > > > > > shown.
> > > > > > > > > > > > > > > > > > > > > What I meant in KIP was the final
> > > state of the
> > > > > > > > > > > > > > > > > > > > > buffer, which is not
> > > > > > > > > > > > > > > > > > > > emitted
> > > > > > > > > > > > > > > > > > > > > yet. (I agree, the current
> description
> > > may be
> > > > > > > > > > > > > > > > > > > > > confusing; it would be
> > > > > > > > > > > > > > > > > > > > better
> > > > > > > > > > > > > > > > > > > > > to change it with 'the current
> state
> > > of the
> > > > > > > > > > > > > > > > > > > > > suppression' or 'the
> > > > > > > > > > > > > > > > > > results
> > > > > > > > > > > > > > > > > > > > of
> > > > > > > > > > > > > > > > > > > > > the suppression', like the Jira
> issue
> > > > > > > > > > > > > > > > > > > > > <
> > > https://issues.apache.org/jira/browse/KAFKA-8403
> > > > > > > > > > > > > > > > > > > > > states.)
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > For a little bit more about the
> > > motivation, here
> > > > > > > > > is
> > > > > > > > > > > > > > > > > > > > > one of my
> > > > > > > > > > > > > > > > > > > > experience: I
> > > > > > > > > > > > > > > > > > > > > had to build a monitoring
> application
> > > which
> > > > > > > > > > > > > > > > > > > > > collects signals from IoT devices
> > > (say, a
> > > > > > > > > > > > > > > > > > > > > semiconductor production line.) If
> the
> > > number of
> > > > > > > > > > > > > > > > > > > > collected
> > > > > > > > > > > > > > > > > > > > > signals within the time window is
> much
> > > less than
> > > > > > > > > > > > > > > > > > > > > the expected, there
> > > > > > > > > > > > > > > > > > may
> > > > > > > > > > > > > > > > > > > > be
> > > > > > > > > > > > > > > > > > > > > some problems like network hiccup
> in
> > > the systems.
> > > > > > > > > > > > > > > > > > > > > We wanted to build
> > > > > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > > > system in the form of a dashboard,
> but
> > > could not
> > > > > > > > > by
> > > > > > > > > > > > > > > > > > > > > lack of
> > > > > > > > > > > > > > > > > > materializing
> > > > > > > > > > > > > > > > > > > > > feature. It was precisely the case
> of
> > > querying
> > > > > > > > > only
> > > > > > > > > > > > > > > > > > > > > the final
> > > > > > > > > > > > > > > > > > results of
> > > > > > > > > > > > > > > > > > > > a
> > > > > > > > > > > > > > > > > > > > > windowed aggregation, as the Jira
> issue
> > > > > > > > > > > > > > > > > > > > > <
> > > https://issues.apache.org/jira/browse/KAFKA-8403
> > > > > > > > > > > > > > > > > > > > > states. We
> > > > > > > > > > > > > > > > > > finally
> > > > > > > > > > > > > > > > > > > > ended
> > > > > > > > > > > > > > > > > > > > > in implementing the system in an
> email
> > > alerting
> > > > > > > > > > > > > > > > > > > > > system like this <
> > > https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers/
> > > > > > > > > > > > > and had to collect the keys and windows of trouble
> by
> > > hand.
> > > > > > > > > > > > > > > > > > > > > I think these kinds of use cases
> would
> > > be much
> > > > > > > > > > > > > > > > > > > > > common. Should it be described in
> the
> > > KIP much
> > > > > > > > > more
> > > > > > > > > > > > > > > > > > > > > in detail?
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > Thanks, Dongjin
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > On Sat, Feb 15, 2020 at 4:43 AM
> John
> > > Roesler
> > > > > > > > > > > > > > > > > > > > > <vvcep...@apache.org>
> > > > > > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > > > > > Hi Dongjin,
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > Thanks for the KIP!
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > Can you explain more about why
> the
> > > internal data
> > > > > > > > > > > > > > > > > > > > > > structures of
> > > > > > > > > > > > > > > > > > > > suppression
> > > > > > > > > > > > > > > > > > > > > > should be queriable? The
> motivation
> > > just says
> > > > > > > > > > > > > > > > > > > > > > that users might
> > > > > > > > > > > > > > > > > > want to
> > > > > > > > > > > > > > > > > > > > do
> > > > > > > > > > > > > > > > > > > > > > it, which seems like it could
> > > justify literally
> > > > > > > > > > > > > > > > > > > > > > anything :)
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > One design point of Suppression is that if you wanted to
> > > > > > > > > > > > > > > > > > > > > > > > query the "final state", you can Materialize the suppress
> > > > > > > > > > > > > > > > > > > > > > > > itself (which is why it needs the variant); if you wanted to
> > > > > > > > > > > > > > > > > > > > > > > > query the "intermediate state", you can materialize the
> > > > > > > > > > > > > > > > > > > > > > > > operator immediately before the suppress.
> > > > > > > > > > > > > > > > > > > > > > > > Example:
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > ...count(Materialized.as("intermediate"))
> > > > > > > > > > > > > > > > > > > > > > > >    .suppress(untilWindowClosed(), Materialized.as("final"))
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > I’m not sure what use case would require actually fetching
> > > > > > > > > > > > > > > > > > > > > > > > from the internal buffers.
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > Thanks, John
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > On Fri, Feb 14, 2020, at 07:55, Dongjin Lee wrote:
> > > > > > > > > > > > > > > > > > > > > > > Hi devs,
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > I'd like to reboot the discussion on KIP-508, which aims to
> > > > > > > > > > > > > > > > > > > > > > > > > support a Materialized variant of KTable#suppress. It was
> > > > > > > > > > > > > > > > > > > > > > > > > initially submitted several months ago but was closed due to
> > > > > > > > > > > > > > > > > > > > > > > > > inactivity.
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > - KIP:
> > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-508%3A+Make+Suppression+State+Queriable
> > > > > > > > > > > > > > > > > > > > > > > > > - Jira: https://issues.apache.org/jira/browse/KAFKA-8403
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > All kinds of feedback will be greatly appreciated.
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > Best, Dongjin
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > -- *Dongjin Lee*
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > *A hitchhiker in the mathematical world.*
> > > > > > > > > > > > > > > > > > > > > > > > > *github: github.com/dongjinleekr <https://github.com/dongjinleekr>
> > > > > > > > > > > > > > > > > > > > > > > > > linkedin: kr.linkedin.com/in/dongjinleekr <https://kr.linkedin.com/in/dongjinleekr>
> > > > > > > > > > > > > > > > > > > > > > > > > speakerdeck: speakerdeck.com/dongjin <https://speakerdeck.com/dongjin>*
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > -- *Dongjin Lee*
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > *A hitchhiker in the mathematical
> > > world.* *github:
> > > > > > > > > > > > > > > > > > > > > <http://goog_969573159/>
> > > github.com/dongjinleekr
> > > > > > > > > > > > > > > > > > > > > <https://github.com/dongjinleekr
> > > > linkedin:
> > > > > > > > > > > > > > > > > > > > kr.linkedin.com/in/dongjinleekr
> > > > > > > > > > > > > > > > > > > > > <
> > > https://kr.linkedin.com/in/dongjinleekr
> > > > > > > > > > speakerdeck:
> > > > > > speakerdeck.com/dongjin
> > > > > > > > > > > > > > > > > > > > > <https://speakerdeck.com/dongjin>*
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > -- *Dongjin Lee*
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > *A hitchhiker in the mathematical
> world.*
> > > *github:
> > > > > > > > > > > > > > > > > > > <http://goog_969573159/>
> > > github.com/dongjinleekr
> > > > > > > > > > > > > > > > > > > <https://github.com/dongjinleekr
> >linkedin:
> > > > > > > > > > > > > > > > > > kr.linkedin.com/in/dongjinleekr
> > > > > > > > > > > > > > > > > > > <
> https://kr.linkedin.com/in/dongjinleekr
> > > > speakerdeck:
> > > > > > > > > > > > > > > > > > speakerdeck.com/dongjin
> > > > > > > > > > > > > > > > > > > <https://speakerdeck.com/dongjin>*
> > > > > > > > > > > > > > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > *Dongjin Lee*
> > > > > > >
> > > > > > > *A hitchhiker in the mathematical world.*
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > *github:  <http://goog_969573159/>github.com/dongjinleekr
> > > > > > > <https://github.com/dongjinleekr>keybase:
> > > > > https://keybase.io/dongjinleekr
> > > > > > > <https://keybase.io/dongjinleekr>linkedin:
> > > > > kr.linkedin.com/in/dongjinleekr
> > > > > > > <https://kr.linkedin.com/in/dongjinleekr>speakerdeck:
> > > > > speakerdeck.com/dongjin
> > > > > > > <https://speakerdeck.com/dongjin>*
> > > > > > >
>
>

-- 
*Dongjin Lee*

*A hitchhiker in the mathematical world.*




*github: github.com/dongjinleekr <https://github.com/dongjinleekr>
keybase: https://keybase.io/dongjinleekr <https://keybase.io/dongjinleekr>
linkedin: kr.linkedin.com/in/dongjinleekr <https://kr.linkedin.com/in/dongjinleekr>
speakerdeck: speakerdeck.com/dongjin <https://speakerdeck.com/dongjin>*
