Thanks Matthias,

Good idea. I've changed the ticket name and added a note
clarifying that this ticket is not the same as
https://issues.apache.org/jira/browse/KAFKA-7224

Incidentally, I realized that I had never documented my reasons
for abandoning my work on KAFKA-7224! I've now updated
that ticket, too, so your question had an unexpected side benefit.

Thanks,
-John

On Sat, Mar 7, 2020, at 18:01, Matthias J. Sax wrote:
> -----BEGIN PGP SIGNED MESSAGE-----
> Hash: SHA512
> 
> Thanks for clarification.
> 
> Can you maybe update the Jira ticket? Do we have a ticket for
> spill-to-disk? Maybe link to it and explain that they are two different
> things? Maybe even rename the ticket to something clearer, e.g.,
> "make suppress result queryable" or similar?
> 
> 
> -Matthias
> 
> On 3/7/20 1:58 PM, John Roesler wrote:
> > Hey Matthias,
> >
> > I’m sorry if the ticket was poorly stated. The ticket is to add a
> > DSL overload to pass a Materialized argument to suppress. That way,
> > the result of the suppression would be queryable.
> >
> > This is unrelated to “persistent buffer” aka “spill-to-disk”.
> >
> > There was some confusion before about whether this ticket could be
> > implemented as “query the buffer”. Maybe it can, but not trivially.
> > The obvious way is just to add a new state store into which we write
> > the results just before we forward them. I.e., it’s exactly like the
> > materialized variant of any stateless KTable operation.
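To illustrate the approach John describes (a new store that receives each result just before it is forwarded), here is a minimal plain-Java sketch. It is a toy model under stated assumptions, not Kafka Streams code: `MaterializingForwarder` and its methods are hypothetical names, and an in-memory `LinkedHashMap` stands in for a real state store. It only shows why such a store always mirrors what downstream has received.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

/**
 * Toy model (hypothetical, not Kafka Streams code) of a "materialized"
 * operator: every result is written to a local, queryable store just
 * before it is forwarded downstream.
 */
final class MaterializingForwarder<K, V> {
    // Stand-in for a real state store (assumption: plain in-memory map).
    private final Map<K, V> store = new LinkedHashMap<>();
    private final BiConsumer<K, V> downstream;

    MaterializingForwarder(BiConsumer<K, V> downstream) {
        this.downstream = downstream;
    }

    /** Called whenever the upstream operator (e.g. suppress) emits a result. */
    void forward(K key, V value) {
        store.put(key, value);         // 1. materialize the result first
        downstream.accept(key, value); // 2. then forward it downstream
    }

    /** Point lookup against the materialized results. */
    V get(K key) {
        return store.get(key);
    }
}
```

Because the write happens before the forward, anything a downstream consumer has seen can also be fetched by key from the store, which is the property John compares to the materialized variants of stateless KTable operations.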
> >
> > Thanks, John
> >
> > On Sat, Mar 7, 2020, at 15:32, Matthias J. Sax wrote:
> > Thanks for the KIP, Dongjin,
> >
> > I am still not sure I can follow, which might also be caused by
> > the backing Jira ticket (maybe John can clarify the intent of the
> > ticket, as he created it):
> >
> > Currently, suppress() only uses an in-memory buffer, and my
> > understanding of the Jira is that it asks for the ability to use a
> > persistent buffer (i.e., spill to disk, backed by RocksDB).
> >
> > Adding a persistent buffer is completely unrelated to allowing
> > queries against the buffer. In fact, one could query an in-memory
> > buffer, too. However, querying the buffer does not really seem to be
> > useful, as pointed out by John, since you can always query the
> > upstream KTable store.
> >
> > Also note that for the emit-on-window-close case, the result is
> > deleted from the buffer when it is emitted, and thus cannot be
> > queried any longer.
> >
> >
> > Can you please clarify whether you intend to allow spilling to disk
> > or to enable IQ (even though I don't see why querying makes sense, as
> > the data is either upstream or deleted)? Also, if you want to enable
> > IQ, why do we need all those new interfaces? Isn't the result of a
> > suppress() a KTable that is the same as any other
> > key-value/windowed/session store?
> >
> > We should also have corresponding Jira tickets for different cases
> > to avoid the confusion I am in atm :)
> >
> >
> > -Matthias
> >
> >
> > On 2/27/20 8:21 AM, John Roesler wrote:
> >>>> Hi Dongjin,
> >>>>
> >>>> No problem; glad we got it sorted out.
> >>>>
> >>>> Thanks again for picking this up! -John
> >>>>
> >>>> On Wed, Feb 26, 2020, at 09:24, Dongjin Lee wrote:
> >>>>>> I was under the impression that you wanted to expand the
> >>>>>> scope of the KIP to additionally allow querying the internal
> >>>>>> buffer, not just the result. Can you clarify whether you are
> >>>>>> proposing to allow querying the state of the internal buffer,
> >>>>>> the result, or both?
> >>>>>
> >>>>> Sorry for the confusion. As we already discussed, we only
> >>>>> need to query the suppressed output, not the internal
> >>>>> buffer. The current implementation is wrong. I will notify
> >>>>> you after refining the KIP and the implementation
> >>>>> accordingly; I must have been confused, too.
> >>>>>
> >>>>> Thanks, Dongjin
> >>>>>
> >>>>> On Tue, Feb 25, 2020 at 12:17 AM John Roesler
> >>>>> <vvcep...@apache.org> wrote:
> >>>>>
> >>>>>> Hi Dongjin,
> >>>>>>
> >>>>>> Ah, I think I may have been confused. I 100% agree that
> >>>>>> we need a materialized variant for suppress(). Then, you
> >>>>>> could do: ...suppress(...,
> >>>>>> Materialized.as("final-count"))
> >>>>>>
> >>>>>> If that’s your proposal, then we are on the same page.
> >>>>>>
> >>>>>> I was under the impression that you wanted to expand the
> >>>>>> scope of the KIP to additionally allow querying the
> >>>>>> internal buffer, not just the result. Can you clarify
> >>>>>> whether you are proposing to allow querying the state of
> >>>>>> the internal buffer, the result, or both?
> >>>>>>
> >>>>>> Thanks, John
> >>>>>>
> >>>>>> On Thu, Feb 20, 2020, at 08:41, Dongjin Lee wrote:
> >>>>>>> Hi John, Thanks for your kind explanation with an
> >>>>>>> example.
> >>>>>>>
> >>>>>>>> But it feels like you're saying you're trying to do
> >>>>>>>> something different than just query the windowed key
> >>>>>>>> and get back the current count?
> >>>>>>>
> >>>>>>> Yes. For example, what if we need to retrieve all keys
> >>>>>>> (or a range of keys) whose window is closed? In this
> >>>>>>> example, let's imagine we need to retrieve only (key=A,
> >>>>>>> window=10), not (key=A, window=20).
> >>>>>>>
> >>>>>>> Of course, the value accompanying a flushed key is
> >>>>>>> exactly the same as the one in the upstream KTable.
> >>>>>>> However, if our intention is not to look up a specific
> >>>>>>> key but to retrieve a group of unspecified keys, we are
> >>>>>>> in trouble, since we can't know beforehand which keys
> >>>>>>> have been flushed out.
> >>>>>>>
> >>>>>>> One workaround would be materializing it with
> >>>>>>> `suppressed.filter((key, value) -> true,
> >>>>>>> Materialized.as("final-count"))`. But I think providing
> >>>>>>> a materialized variant of the suppress method is better
> >>>>>>> than this workaround.
> >>>>>>>
> >>>>>>> Thanks, Dongjin
> >>>>>>>
> >>>>>>> On Thu, Feb 20, 2020 at 1:26 AM John Roesler
> >>>>>>> <vvcep...@apache.org> wrote:
> >>>>>>>
> >>>>>>>> Thanks for the response, Dongjin,
> >>>>>>>>
> >>>>>>>> I'm sorry, but I'm still not following. It seems like
> >>>>>>>> the view you would get on the "current state of the
> >>>>>>>> buffer" would always be equivalent to the view of the
> >>>>>>>> upstream table.
> >>>>>>>>
> >>>>>>>> Let me try an example, and maybe you can point out
> >>>>>>>> the flaw in my reasoning.
> >>>>>>>>
> >>>>>>>> Let's say we're doing 10 ms windows with a grace
> >>>>>>>> period of zero. Let's also say we're computing a
> >>>>>>>> windowed count, and that we have a "final results"
> >>>>>>>> suppression after the count. Let's materialize the
> >>>>>>>> count as "Count" and the suppressed result as "Final
> >>>>>>>> Count".
> >>>>>>>>
> >>>>>>>> Suppose we get an input event: (time=10, key=A,
> >>>>>>>> value=...)
> >>>>>>>>
> >>>>>>>> Then, Count will look like:
> >>>>>>>>
> >>>>>>>> | window | key | value |
> >>>>>>>> | 10     | A   |     1 |
> >>>>>>>>
> >>>>>>>> The (internal) suppression buffer will contain:
> >>>>>>>>
> >>>>>>>> | window | key | value |
> >>>>>>>> | 10     | A   |     1 |
> >>>>>>>>
> >>>>>>>> The record is still buffered because the window
> >>>>>>>> isn't closed yet. Final Count is an empty table:
> >>>>>>>>
> >>>>>>>> | window | key | value |
> >>>>>>>>
> >>>>>>>> ---------------
> >>>>>>>>
> >>>>>>>> Now, we get a second event: (time=15, key=A,
> >>>>>>>> value=...)
> >>>>>>>>
> >>>>>>>> Then, Count will look like:
> >>>>>>>>
> >>>>>>>> | window | key | value |
> >>>>>>>> | 10     | A   |     2 |
> >>>>>>>>
> >>>>>>>> The (internal) suppression buffer will contain:
> >>>>>>>>
> >>>>>>>> | window | key | value |
> >>>>>>>> | 10     | A   |     2 |
> >>>>>>>>
> >>>>>>>> The record is still buffered because the window
> >>>>>>>> isn't closed yet. Final Count is an empty table:
> >>>>>>>>
> >>>>>>>> | window | key | value |
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> ---------------
> >>>>>>>>
> >>>>>>>> Finally, we get a third event: (time=20, key=A,
> >>>>>>>> value=...)
> >>>>>>>>
> >>>>>>>> Then, Count will look like:
> >>>>>>>>
> >>>>>>>> | window | key | value |
> >>>>>>>> | 10     | A   |     2 |
> >>>>>>>> | 20     | A   |     1 |
> >>>>>>>>
> >>>>>>>> The (internal) suppression buffer will contain:
> >>>>>>>>
> >>>>>>>> | window | key | value |
> >>>>>>>> | 20     | A   |     1 |
> >>>>>>>>
> >>>>>>>> Note that window 10 has been flushed out, because
> >>>>>>>> it's now closed. And window 20 is buffered because it
> >>>>>>>> isn't closed yet. Final Count is now:
> >>>>>>>>
> >>>>>>>> | window | key | value |
> >>>>>>>> | 10     | A   |     2 |
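The walk-through above can be reproduced with a small plain-Java simulation. This is a toy model, not Kafka's implementation: the class `SuppressWalkthrough`, the `"window:key"` string keys, and the eviction rule (a window is emitted once its end plus grace is at or before stream time) are assumptions chosen to match the example's 10 ms windows and zero grace period. Dropping of late records is not modeled.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Toy simulation (hypothetical, not Kafka's code) of a tumbling-window count
 * followed by a "final results" suppression whose output is materialized.
 * Keys are rendered as "windowStart:key" strings for readability.
 */
final class SuppressWalkthrough {
    final long windowSize;
    final long grace;
    long streamTime = Long.MIN_VALUE;

    final Map<String, Long> count = new TreeMap<>();      // upstream "Count" table
    final Map<String, Long> buffer = new TreeMap<>();     // internal suppression buffer
    final Map<String, Long> finalCount = new TreeMap<>(); // materialized "Final Count"

    SuppressWalkthrough(long windowSize, long grace) {
        this.windowSize = windowSize;
        this.grace = grace;
    }

    void process(long time, String key) {
        streamTime = Math.max(streamTime, time);
        long windowStart = time - Math.floorMod(time, windowSize);
        String windowedKey = windowStart + ":" + key;

        // Count: increment, then hand the updated value to the suppression buffer.
        long updated = count.merge(windowedKey, 1L, Long::sum);
        buffer.put(windowedKey, updated);

        // Suppress: emit every buffered window that is now closed and delete it
        // from the buffer; the emitted value is written to Final Count.
        buffer.entrySet().removeIf(e -> {
            long start = Long.parseLong(e.getKey().substring(0, e.getKey().indexOf(':')));
            boolean closed = start + windowSize + grace <= streamTime;
            if (closed) {
                finalCount.put(e.getKey(), e.getValue());
            }
            return closed;
        });
    }
}
```

Feeding it events at times 10, 15, and 20 for key A yields the three sets of tables shown above. After the third event, window 10 is present in Final Count but already gone from the buffer, so it could not have been found by querying the buffer.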
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> ---------------
> >>>>>>>>
> >>>>>>>> Reading your email, I can't figure out what value
> >>>>>>>> there is in querying the internal suppression buffer,
> >>>>>>>> since it only contains exactly the same value as the
> >>>>>>>> upstream table, for each key that is still buffered.
> >>>>>>>> But it feels like you're saying you're trying to do
> >>>>>>>> something different than just query the windowed key
> >>>>>>>> and get back the current count?
> >>>>>>>>
> >>>>>>>> Thanks, -John
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Wed, Feb 19, 2020, at 09:49, Dongjin Lee wrote:
> >>>>>>>>> Hi John,
> >>>>>>>>>
> >>>>>>>>> 'The intermediate state of the suppression' in the KIP
> >>>>>>>>> does not mean the state of the upstream KTable. Sure,
> >>>>>>>>> the state of the upstream KTable can be queried by
> >>>>>>>>> materializing the operator immediately before the
> >>>>>>>>> suppress, as you showed. What I meant in the KIP was
> >>>>>>>>> the final state of the buffer, which has not been
> >>>>>>>>> emitted yet. (I agree the current description may be
> >>>>>>>>> confusing; it would be better to change it to 'the
> >>>>>>>>> current state of the suppression' or 'the results of
> >>>>>>>>> the suppression', as the Jira issue
> >>>>>>>>> <https://issues.apache.org/jira/browse/KAFKA-8403>
> >>>>>>>>> states.)
> >>>>>>>>>
> >>>>>>>>> For a bit more motivation, here is one of my
> >>>>>>>>> experiences: I had to build a monitoring application
> >>>>>>>>> that collects signals from IoT devices (say, on a
> >>>>>>>>> semiconductor production line). If the number of
> >>>>>>>>> signals collected within a time window is much lower
> >>>>>>>>> than expected, there may be a problem in the system,
> >>>>>>>>> such as a network hiccup. We wanted to build the
> >>>>>>>>> system as a dashboard, but could not, due to the lack
> >>>>>>>>> of a materialization feature. It was precisely the
> >>>>>>>>> case of querying only the final results of a windowed
> >>>>>>>>> aggregation, as the Jira issue
> >>>>>>>>> <https://issues.apache.org/jira/browse/KAFKA-8403>
> >>>>>>>>> states. We ended up implementing it as an email
> >>>>>>>>> alerting system like this
> >>>>>>>>> <https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers/>
> >>>>>>>>> and had to collect the keys and windows of trouble by
> >>>>>>>>> hand.
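Dongjin's dashboard use case amounts to a range query over final results only. The sketch below models that in plain Java, assuming a hypothetical store layout in which each window start maps to per-device final counts; `FinalCountDashboard`, `onFinalResult`, `underReporting`, and the threshold parameter are illustrative names, not part of Kafka Streams or of KIP-508. Since only closed windows are ever written to this store, a scan never sees a partial count, unlike the upstream Count table, which also holds still-open windows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Toy stand-in (hypothetical) for a queryable "final results" store:
 * window start -> (device -> final count for that closed window).
 */
final class FinalCountDashboard {
    private final NavigableMap<Long, Map<String, Long>> finalCounts = new TreeMap<>();

    /** Called once per final (post-suppression) result. */
    void onFinalResult(long windowStart, String device, long count) {
        finalCounts.computeIfAbsent(windowStart, w -> new TreeMap<>()).put(device, count);
    }

    /**
     * Returns "windowStart:device" for every closed window in [from, to]
     * whose final count is below the expected minimum.
     */
    List<String> underReporting(long from, long to, long minExpected) {
        List<String> alerts = new ArrayList<>();
        for (Map.Entry<Long, Map<String, Long>> w : finalCounts.subMap(from, true, to, true).entrySet()) {
            for (Map.Entry<String, Long> d : w.getValue().entrySet()) {
                if (d.getValue() < minExpected) {
                    alerts.add(w.getKey() + ":" + d.getKey());
                }
            }
        }
        return alerts;
    }
}
```

Results come back ordered by window start and then by device name, which is convenient for a dashboard that pages through time ranges.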
> >>>>>>>>>
> >>>>>>>>> I think these kinds of use cases are fairly common.
> >>>>>>>>> Should I describe them in more detail in the KIP?
> >>>>>>>>>
> >>>>>>>>> Thanks, Dongjin
> >>>>>>>>>
> >>>>>>>>> On Sat, Feb 15, 2020 at 4:43 AM John Roesler
> >>>>>>>>> <vvcep...@apache.org> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Dongjin,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for the KIP!
> >>>>>>>>>>
> >>>>>>>>>> Can you explain more about why the internal data
> >>>>>>>>>> structures of suppression should be queryable? The
> >>>>>>>>>> motivation just says that users might want to do it,
> >>>>>>>>>> which seems like it could justify literally anything :)
> >>>>>>>>>>
> >>>>>>>>>> One design point of Suppression is that if you wanted
> >>>>>>>>>> to query the “final state”, you can materialize the
> >>>>>>>>>> suppress itself (which is why it needs the variant);
> >>>>>>>>>> if you wanted to query the “intermediate state”, you
> >>>>>>>>>> can materialize the operator immediately before the
> >>>>>>>>>> suppress.
> >>>>>>>>>>
> >>>>>>>>>> Example:
> >>>>>>>>>>
> >>>>>>>>>> ...count(Materialized.as("intermediate"))
> >>>>>>>>>> .suppress(untilWindowCloses(unbounded()),
> >>>>>>>>>> Materialized.as("final"))
> >>>>>>>>>>
> >>>>>>>>>> I’m not sure what use case would require
> >>>>>>>>>> actually fetching from the internal buffers.
> >>>>>>>>>>
> >>>>>>>>>> Thanks, John
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Fri, Feb 14, 2020, at 07:55, Dongjin Lee wrote:
> >>>>>>>>>>> Hi devs,
> >>>>>>>>>>>
> >>>>>>>>>>> I'd like to reboot the discussion on KIP-508, which
> >>>>>>>>>>> aims to support a Materialized variant of
> >>>>>>>>>>> KTable#suppress. It was initially submitted several
> >>>>>>>>>>> months ago but was closed due to inactivity.
> >>>>>>>>>>>
> >>>>>>>>>>> - KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-508%3A+Make+Suppression+State+Queriable
> >>>>>>>>>>> - Jira: https://issues.apache.org/jira/browse/KAFKA-8403
> >>>>>>>>>>>
> >>>>>>>>>>> All kinds of feedback will be greatly
> >>>>>>>>>>> appreciated.
> >>>>>>>>>>>
> >>>>>>>>>>> Best, Dongjin
> >>>>>>>>>>>
> >>>>>>>>>>> -- *Dongjin Lee*
> >>>>>>>>>>>
> >>>>>>>>>>> *A hitchhiker in the mathematical world.*
> >>>>>>>>>>> github: github.com/dongjinleekr <https://github.com/dongjinleekr>
> >>>>>>>>>>> linkedin: kr.linkedin.com/in/dongjinleekr <https://kr.linkedin.com/in/dongjinleekr>
> >>>>>>>>>>> speakerdeck: speakerdeck.com/dongjin <https://speakerdeck.com/dongjin>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>
> -----BEGIN PGP SIGNATURE-----
> 
> iQIzBAEBCgAdFiEEI8mthP+5zxXZZdDSO4miYXKq/OgFAl5kNdkACgkQO4miYXKq
> /OjPWw/9EOLnzNMz1UvqmX4P6sLyA++QURhDKRbcuX2CUKHTWzi8A1Zks+CXL8yC
> JkrKb1sPSS5yi4QEAKOosEjvK7xr+VydDdWb3MM2IrnBh6kUOw/2CCuL7W+nbf0s
> Y7Uq9SJ161izgt4ZJ4OEHqAYK1VfSVszhVIvGCkksBwjrra8wpf7hcprwHguJR9B
> 397yMXa2vx/RWZY1Yu8zhhdedVaIcLBEiRUkjt3BlafyyXfGHY1h2XDWuzfuY9pB
> 0Uf5Oft3+ifi62T8ZXRLaB3+6qtojFc8hucZ83VYEhM0K010ZJVIItLcKl09gAow
> fyLYVwbpihM4qMfFaIoMDtA/mA+K65QgfXS4oMyesEX8aL473PdEYXLipSl2MTfB
> +WeEgN4wWq1M1PwzDjuJ1R1MVZGttASXPAkZGEwqJpnW5QMwn1Ofy0dFT/smI5zP
> w2aPl6otI4xwbkTOwkXAPbKCaQSB4+ibsPeFOKPTxpkUPAbbyWHusbD4Q26ick+c
> NGhWYPEkfQnUvoqmVl34ZB71PY5y5yj3vP+pGoFARTfuZ+bzqYHQ9NNWa+DyRCkn
> cnQNLhI8/TWOp8yj+ZH6i1THSONYfu0bDMnmyC8GOuBds932hgGzhfRtmZTFg4j2
> 02yVjYQIm65QUbSm6r7lrQLzlJ/OQyVuIoJf6IyxnoX6wxB4IiU=
> =St1e
> -----END PGP SIGNATURE-----
>
