-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA512

Thanks for the clarification.

Can you maybe update the Jira ticket? Do we have a ticket for
spill-to-disk? Maybe link to it and explain that it's two different
things? Maybe even rename the ticket to something clearer, e.g.,
"make suppress result queryable" or similar?


- -Matthias

On 3/7/20 1:58 PM, John Roesler wrote:
> Hey Matthias,
>
> I’m sorry if the ticket was poorly stated. The ticket is to add a
> DSL overload to pass a Materialized argument to suppress. As a result,
> the result of the suppression would be queriable.
>
> This is unrelated to “persistent buffer” aka “spill-to-disk”.
>
> There was some confusion before about whether this ticket could be
> implemented as “query the buffer”. Maybe it can, but not trivially.
> The obvious way is just to add a new state store which we write the
> results into just before we forward. I.e., it’s exactly like the
> materialized variant of any stateless KTable operation.
>
> Thanks, John
>
> On Sat, Mar 7, 2020, at 15:32, Matthias J. Sax wrote:
> Thanks for the KIP Dongjin,
>
> I am still not sure if I can follow, which might also be caused by
> the backing JIRA ticket (maybe John can clarify the intent of the
> ticket as he created it):
>
> Currently, suppress() only uses an in-memory buffer, and my
> understanding of the Jira is that it asks to add the ability to use a
> persistent buffer (i.e., spill to disk, backed by RocksDB).
>
> Adding a persistent buffer is completely unrelated to allowing
> queries against the buffer. In fact, one could query an in-memory
> buffer, too. However, querying the buffer does not really seem to be
> useful, as pointed out by John, as you can always query the upstream
> KTable store.
>
> Also note that for the emit-on-window-close case the result is
> deleted from the buffer when it is emitted, and thus cannot be
> queried any longer.
>
>
> Can you please clarify if you intend to allow spilling to disk or
> if you intend to enable IQ (even if I don't see why querying makes
> sense, as the data is either upstream or deleted)? Also, if you
> want to enable IQ, why do we need all those new interfaces? The
> result of a suppress() is a KTable that is the same as any other
> key-value/windowed/sessions store?
>
> We should also have corresponding Jira tickets for different cases
> to avoid the confusion I am in atm :)
>
>
> -Matthias
>
>
> On 2/27/20 8:21 AM, John Roesler wrote:
>>>> Hi Dongjin,
>>>>
>>>> No problem; glad we got it sorted out.
>>>>
>>>> Thanks again for picking this up! -John
>>>>
>>>> On Wed, Feb 26, 2020, at 09:24, Dongjin Lee wrote:
>>>>>> I was under the impression that you wanted to expand the
>>>>>> scope of the KIP to additionally allow querying the
>>>>>> internal buffer, not just the result. Can you clarify
>>>>>> whether you are proposing to allow querying the state of
>>>>>> the internal buffer, the result, or both?
>>>>>
>>>>> Sorry for the confusion. As we already discussed, we only
>>>>> need to query the suppressed output, not the internal
>>>>> buffer. The current implementation is wrong. I will notify
>>>>> you after refining the KIP and implementation accordingly.
>>>>> I must have been confused, too.
>>>>>
>>>>> Thanks, Dongjin
>>>>>
>>>>> On Tue, Feb 25, 2020 at 12:17 AM John Roesler
>>>>> <vvcep...@apache.org> wrote:
>>>>>
>>>>>> Hi Dongjin,
>>>>>>
>>>>>> Ah, I think I may have been confused. I 100% agree that
>>>>>> we need a materialized variant for suppress(). Then, you
>>>>>> could do:
>>>>>>
>>>>>> ...suppress(..., Materialized.as("final-count"))
>>>>>>
>>>>>> If that’s your proposal, then we are on the same page.
>>>>>>
>>>>>> I was under the impression that you wanted to expand the
>>>>>> scope of the KIP to additionally allow querying the
>>>>>> internal buffer, not just the result. Can you clarify
>>>>>> whether you are proposing to allow querying the state of
>>>>>> the internal buffer, the result, or both?
>>>>>>
>>>>>> Thanks, John
>>>>>>
>>>>>> On Thu, Feb 20, 2020, at 08:41, Dongjin Lee wrote:
>>>>>>> Hi John, Thanks for your kind explanation with an
>>>>>>> example.
>>>>>>>
>>>>>>>> But it feels like you're saying you're trying to do
>>>>>>>> something different than just query the windowed key
>>>>>>>> and get back the current count?
>>>>>>>
>>>>>>> Yes, for example, what if we need to retrieve the (all
>>>>>>> or range) keys with a closed window? In this example,
>>>>>>> let's imagine we need to retrieve only (key=A,
>>>>>>> window=10), not (key=A, window=20).
>>>>>>>
>>>>>>> Of course, the value accompanying a flushed key is
>>>>>>> exactly the same as the one in the upstream KTable.
>>>>>>> However, if our intention is not to look up a specific
>>>>>>> key but to retrieve a group of unspecified keys, we are
>>>>>>> stuck, since we can't know beforehand which keys have
>>>>>>> been flushed out.
>>>>>>>
>>>>>>> One workaround would be materializing it with
>>>>>>> `suppressed.filter((k, v) -> true,
>>>>>>> Materialized.as("final-count"))`. But I think providing
>>>>>>> a materialized variant of the suppress method is better
>>>>>>> than this workaround.
>>>>>>>
>>>>>>> Thanks, Dongjin
>>>>>>>
>>>>>>> On Thu, Feb 20, 2020 at 1:26 AM John Roesler
>>>>>>> <vvcep...@apache.org> wrote:
>>>>>>>
>>>>>>>> Thanks for the response, Dongjin,
>>>>>>>>
>>>>>>>> I'm sorry, but I'm still not following. It seems like
>>>>>>>> the view you would get on the "current state of the
>>>>>>>> buffer" would always be equivalent to the view of the
>>>>>>>> upstream table.
>>>>>>>>
>>>>>>>> Let me try an example, and maybe you can point out
>>>>>>>> the flaw in my reasoning.
>>>>>>>>
>>>>>>>> Let's say we're doing 10 ms windows with a grace
>>>>>>>> period of zero. Let's also say we're computing a
>>>>>>>> windowed count, and that we have a "final results"
>>>>>>>> suppression after the count. Let's materialize the
>>>>>>>> count as "Count" and the suppressed result as "Final
>>>>>>>> Count".
>>>>>>>>
>>>>>>>> Suppose we get an input event: (time=10, key=A,
>>>>>>>> value=...)
>>>>>>>>
>>>>>>>> Then, Count will look like:
>>>>>>>>
>>>>>>>> | window | key | value |
>>>>>>>> | 10     | A   |     1 |
>>>>>>>>
>>>>>>>> The (internal) suppression buffer will contain:
>>>>>>>>
>>>>>>>> | window | key | value |
>>>>>>>> | 10     | A   |     1 |
>>>>>>>>
>>>>>>>> The record is still buffered because the window
>>>>>>>> isn't closed yet. Final Count is an empty table:
>>>>>>>>
>>>>>>>> | window | key | value |
>>>>>>>>
>>>>>>>> ---------------
>>>>>>>>
>>>>>>>> Now, we get a second event: (time=15, key=A,
>>>>>>>> value=...)
>>>>>>>>
>>>>>>>> Then, Count will look like:
>>>>>>>>
>>>>>>>> | window | key | value |
>>>>>>>> | 10     | A   |     2 |
>>>>>>>>
>>>>>>>> The (internal) suppression buffer will contain:
>>>>>>>>
>>>>>>>> | window | key | value |
>>>>>>>> | 10     | A   |     2 |
>>>>>>>>
>>>>>>>> The record is still buffered because the window
>>>>>>>> isn't closed yet. Final Count is an empty table:
>>>>>>>>
>>>>>>>> | window | key | value |
>>>>>>>>
>>>>>>>>
>>>>>>>> ---------------
>>>>>>>>
>>>>>>>> Finally, we get a third event: (time=20, key=A,
>>>>>>>> value=...)
>>>>>>>>
>>>>>>>> Then, Count will look like:
>>>>>>>>
>>>>>>>> | window | key | value |
>>>>>>>> | 10     | A   |     2 |
>>>>>>>> | 20     | A   |     1 |
>>>>>>>>
>>>>>>>> The (internal) suppression buffer will contain:
>>>>>>>>
>>>>>>>> | window | key | value |
>>>>>>>> | 20     | A   |     1 |
>>>>>>>>
>>>>>>>> Note that window 10 has been flushed out, because
>>>>>>>> it's now closed. And window 20 is buffered because it
>>>>>>>> isn't closed yet. Final Count is now:
>>>>>>>>
>>>>>>>> | window | key | value |
>>>>>>>> | 10     | A   |     2 |
>>>>>>>>
>>>>>>>>
>>>>>>>> ---------------
>>>>>>>>
>>>>>>>> Reading your email, I can't figure out what value
>>>>>>>> there is in querying the internal suppression buffer,
>>>>>>>> since it only contains exactly the same value as the
>>>>>>>> upstream table, for each key that is still buffered.
>>>>>>>> But it feels like you're saying you're trying to do
>>>>>>>> something different than just query the windowed key
>>>>>>>> and get back the current count?
>>>>>>>>
>>>>>>>> Thanks, -John
>>>>>>>>
>>>>>>>>
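
The walk-through above can be reproduced with a small plain-Java simulation. This is only a sketch under stated assumptions: it does not use the Kafka Streams API, the class `SuppressWalkthrough` and its fields are hypothetical names chosen for illustration, timestamps are assumed to be non-negative, and late-record handling is simplified to dropping any record whose window has already closed. It tracks the three views from the example (Count, the internal buffer, and Final Count) with 10 ms windows and a grace period of zero.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

public class SuppressWalkthrough {
    static final long WINDOW_SIZE_MS = 10; // window size from the example
    static final long GRACE_MS = 0;        // grace period of zero

    // Each view maps windowStart -> (key -> count); TreeMap keeps output ordered.
    final TreeMap<Long, Map<String, Long>> count = new TreeMap<>();      // upstream "Count"
    final TreeMap<Long, Map<String, Long>> buffer = new TreeMap<>();     // internal suppression buffer
    final TreeMap<Long, Map<String, Long>> finalCount = new TreeMap<>(); // materialized "Final Count"
    long streamTime = Long.MIN_VALUE;

    void process(long timestamp, String key) {
        streamTime = Math.max(streamTime, timestamp);
        long windowStart = timestamp - (timestamp % WINDOW_SIZE_MS);

        // Simplification: drop records that arrive after their window has closed.
        if (windowStart + WINDOW_SIZE_MS + GRACE_MS <= streamTime) {
            return;
        }

        // Update the upstream windowed count.
        long updated = count.computeIfAbsent(windowStart, w -> new TreeMap<>())
                            .merge(key, 1L, Long::sum);

        // The buffer holds only the latest value per (window, key).
        buffer.computeIfAbsent(windowStart, w -> new TreeMap<>()).put(key, updated);

        // Flush every closed window: write it to Final Count, then drop it from the buffer.
        Iterator<Map.Entry<Long, Map<String, Long>>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Map<String, Long>> entry = it.next();
            if (entry.getKey() + WINDOW_SIZE_MS + GRACE_MS > streamTime) {
                break; // windows are sorted, so all later windows are still open
            }
            finalCount.computeIfAbsent(entry.getKey(), w -> new TreeMap<>())
                      .putAll(entry.getValue());
            it.remove();
        }
    }

    public static void main(String[] args) {
        SuppressWalkthrough s = new SuppressWalkthrough();
        for (long t : new long[] {10, 15, 20}) {
            s.process(t, "A");
            System.out.println("time=" + t + " Count=" + s.count
                    + " buffer=" + s.buffer + " FinalCount=" + s.finalCount);
        }
    }
}
```

After the third event, Count holds windows 10 and 20, the buffer holds only window 20, and Final Count holds only window 10. At every step each buffered entry equals the corresponding upstream entry, while Final Count is the only view that lists exactly the closed windows.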
>>>>>>>> On Wed, Feb 19, 2020, at 09:49, Dongjin Lee wrote:
>>>>>>>>> Hi John,
>>>>>>>>>
>>>>>>>>> 'The intermediate state of the suppression' in the KIP
>>>>>>>>> does not mean the state of the upstream KTable. Sure, the
>>>>>>>>> state of the upstream KTable can be queried by
>>>>>>>>> materializing the operator immediately before the
>>>>>>>>> suppress, as you showed. What I meant in the KIP was the
>>>>>>>>> final state of the buffer, which is not emitted yet. (I
>>>>>>>>> agree, the current description may be confusing; it would
>>>>>>>>> be better to change it to 'the current state of the
>>>>>>>>> suppression' or 'the results of the suppression', as the
>>>>>>>>> Jira issue
>>>>>>>>> <https://issues.apache.org/jira/browse/KAFKA-8403>
>>>>>>>>> states.)
>>>>>>>>>
>>>>>>>>> For a little more about the motivation, here is one of my
>>>>>>>>> experiences: I had to build a monitoring application which
>>>>>>>>> collects signals from IoT devices (say, a semiconductor
>>>>>>>>> production line). If the number of signals collected
>>>>>>>>> within the time window is much lower than expected, there
>>>>>>>>> may be some problem in the systems, like a network hiccup.
>>>>>>>>> We wanted to build the system in the form of a dashboard,
>>>>>>>>> but could not because the materializing feature was
>>>>>>>>> lacking. It was precisely the case of querying only the
>>>>>>>>> final results of a windowed aggregation, as the Jira issue
>>>>>>>>> <https://issues.apache.org/jira/browse/KAFKA-8403>
>>>>>>>>> states. We finally ended up implementing the system as an
>>>>>>>>> email alerting system like this
>>>>>>>>> <https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers/>
>>>>>>>>> and had to collect the troubled keys and windows by hand.
>>>>>>>>>
>>>>>>>>> I think these kinds of use cases would be quite
>>>>>>>>> common. Should they be described in the KIP in much
>>>>>>>>> more detail?
>>>>>>>>>
>>>>>>>>> Thanks, Dongjin
>>>>>>>>>
>>>>>>>>> On Sat, Feb 15, 2020 at 4:43 AM John Roesler
>>>>>>>>> <vvcep...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Dongjin,
>>>>>>>>>>
>>>>>>>>>> Thanks for the KIP!
>>>>>>>>>>
>>>>>>>>>> Can you explain more about why the internal data
>>>>>>>>>> structures of suppression should be queriable? The
>>>>>>>>>> motivation just says that users might want to do it,
>>>>>>>>>> which seems like it could justify literally anything :)
>>>>>>>>>>
>>>>>>>>>> One design point of Suppression is that if you wanted
>>>>>>>>>> to query the “final state”, you can Materialize the
>>>>>>>>>> suppress itself (which is why it needs the variant); if
>>>>>>>>>> you wanted to query the “intermediate state”, you can
>>>>>>>>>> materialize the operator immediately before the suppress.
>>>>>>>>>>
>>>>>>>>>> Example:
>>>>>>>>>>
>>>>>>>>>> ...count(Materialized.as("intermediate"))
>>>>>>>>>> .suppress(untilWindowClosed(),
>>>>>>>>>> Materialized.as("final"))
>>>>>>>>>>
>>>>>>>>>> I’m not sure what use case would require
>>>>>>>>>> actually fetching from the internal buffers.
>>>>>>>>>>
>>>>>>>>>> Thanks, John
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Feb 14, 2020, at 07:55, Dongjin Lee wrote:
>>>>>>>>>>> Hi devs,
>>>>>>>>>>>
>>>>>>>>>>> I'd like to reboot the discussion on KIP-508, which
>>>>>>>>>>> aims to support a Materialized variant of
>>>>>>>>>>> KTable#suppress. It was initially submitted several
>>>>>>>>>>> months ago but closed due to inactivity.
>>>>>>>>>>>
>>>>>>>>>>> - KIP:
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-508%3A+Make+Suppression+State+Queriable
>>>>>>>>>>> - Jira: https://issues.apache.org/jira/browse/KAFKA-8403
>>>>>>>>>>>
>>>>>>>>>>> All kinds of feedback will be greatly
>>>>>>>>>>> appreciated.
>>>>>>>>>>>
>>>>>>>>>>> Best, Dongjin
>>>>>>>>>>>
>>>>>>>>>>> -- *Dongjin Lee*
>>>>>>>>>>>
>>>>>>>>>>> *A hitchhiker in the mathematical world.*
>>>>>>>>>>> *github: github.com/dongjinleekr <https://github.com/dongjinleekr>
>>>>>>>>>>> linkedin: kr.linkedin.com/in/dongjinleekr <https://kr.linkedin.com/in/dongjinleekr>
>>>>>>>>>>> speakerdeck: speakerdeck.com/dongjin <https://speakerdeck.com/dongjin>*
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>
-----BEGIN PGP SIGNATURE-----

iQIzBAEBCgAdFiEEI8mthP+5zxXZZdDSO4miYXKq/OgFAl5kNdkACgkQO4miYXKq
/OjPWw/9EOLnzNMz1UvqmX4P6sLyA++QURhDKRbcuX2CUKHTWzi8A1Zks+CXL8yC
JkrKb1sPSS5yi4QEAKOosEjvK7xr+VydDdWb3MM2IrnBh6kUOw/2CCuL7W+nbf0s
Y7Uq9SJ161izgt4ZJ4OEHqAYK1VfSVszhVIvGCkksBwjrra8wpf7hcprwHguJR9B
397yMXa2vx/RWZY1Yu8zhhdedVaIcLBEiRUkjt3BlafyyXfGHY1h2XDWuzfuY9pB
0Uf5Oft3+ifi62T8ZXRLaB3+6qtojFc8hucZ83VYEhM0K010ZJVIItLcKl09gAow
fyLYVwbpihM4qMfFaIoMDtA/mA+K65QgfXS4oMyesEX8aL473PdEYXLipSl2MTfB
+WeEgN4wWq1M1PwzDjuJ1R1MVZGttASXPAkZGEwqJpnW5QMwn1Ofy0dFT/smI5zP
w2aPl6otI4xwbkTOwkXAPbKCaQSB4+ibsPeFOKPTxpkUPAbbyWHusbD4Q26ick+c
NGhWYPEkfQnUvoqmVl34ZB71PY5y5yj3vP+pGoFARTfuZ+bzqYHQ9NNWa+DyRCkn
cnQNLhI8/TWOp8yj+ZH6i1THSONYfu0bDMnmyC8GOuBds932hgGzhfRtmZTFg4j2
02yVjYQIm65QUbSm6r7lrQLzlJ/OQyVuIoJf6IyxnoX6wxB4IiU=
=St1e
-----END PGP SIGNATURE-----
