Hello Ivan,

These are the two proposal you have shared with us

KIP-655: Windowed Distinct Operation for Kafka Streams API
https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API

KIP-759: Unneeded repartition canceling
https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling

The first proposal makes sense to introduce the distinct() method and it
appears the second proposal is needed to efficiently implement the first
one.

I will need to spend more time to analyze and understand better the second
KIP but I agree with and support the first one right away.

Thanks Ivan for taking the time document and communicate this with the
project.

Sincerely,
Israel


On Sat, Jul 10, 2021 at 1:11 PM John Roesler <vvcep...@apache.org> wrote:

> Hi Ivan,
>
> Sorry for the silence!
>
> I have just re-read the proposal.
>
> To summarize, you are now only proposing the zero-arg distict() method to
> be added to TimeWindowedKStream and SessionWindowedKStream, right?
>
> I’m in favor of this proposal.
>
> Thanks,
> John
>
> On Sat, Jul 10, 2021, at 10:18, Ivan Ponomarev wrote:
> > Hello everyone,
> >
> > I would like to remind you about KIP-655 and KIP-759 just in case they
> > got lost in your inbox.
> >
> > Now the initial proposal is split into two independent and smaller ones,
> > so it must be easier to review them. Of course, if you have time.
> >
> > Regards,
> >
> > Ivan
> >
> >
> > 24.06.2021 18:11, Ivan Ponomarev пишет:
> > > Hello all,
> > >
> > > I have rewritten the KIP-655 summarizing what was agreed upon during
> > > this discussion (now the proposal is much simpler and less invasive).
> > >
> > > I have also created KIP-759 (cancelRepartition operation) and started
> a
> > > discussion for it.
> > >
> > > Regards,
> > >
> > > Ivan.
> > >
> > >
> > >
> > > 04.06.2021 8:15, Matthias J. Sax пишет:
> > >> Just skimmed over the thread -- first of all, I am glad that we could
> > >> merge KIP-418 and ship it :)
> > >>
> > >> About the re-partitioning concerns, there are already two tickets for
> it:
> > >>
> > >>   - https://issues.apache.org/jira/browse/KAFKA-4835
> > >>   - https://issues.apache.org/jira/browse/KAFKA-10844
> > >>
> > >> Thus, it seems best to exclude this topic from this KIP, and do a
> > >> separate KIP for it (if necessary, we can "pause" this KIP until the
> > >> repartition KIP is done). It's a long standing "issue" and we should
> > >> resolve it in a general way I guess.
> > >>
> > >> (Did not yet ready all responses in detail yet, so keeping this
> comment
> > >> short.)
> > >>
> > >>
> > >> -Matthias
> > >>
> > >> On 6/2/21 6:35 AM, John Roesler wrote:
> > >>> Thanks, Ivan!
> > >>>
> > >>> That sounds like a great plan to me. Two smaller KIPs are easier to
> > >>> agree on than one big one.
> > >>>
> > >>> I agree hopping and sliding windows will actually have a duplicating
> > >>> effect. We can avoid adding distinct() to the sliding window
> > >>> interface, but hopping windows are just a different parameterization
> > >>> of epoch-aligned windows. It seems we can’t do much about that
> except
> > >>> document the issue.
> > >>>
> > >>> Thanks,
> > >>> John
> > >>>
> > >>> On Wed, May 26, 2021, at 10:14, Ivan Ponomarev wrote:
> > >>>> Hi John!
> > >>>>
> > >>>> I think that your proposal is just fantastic, it simplifies things
> a
> > >>>> lot!
> > >>>>
> > >>>> I also felt uncomfortable due to the fact that the proposed
> > >>>> `distinct()`
> > >>>> is not somewhere near `count()` and `reduce(..)`. But
> > >>>> `selectKey(..).groupByKey().windowedBy(..).distinct()` didn't look
> like
> > >>>> a correct option for  me because of the issue with the unneeded
> > >>>> repartitioning.
> > >>>>
> > >>>> The bold idea that we can just CANCEL the repartitioning didn't
> came to
> > >>>> my mind.
> > >>>>
> > >>>> What seemed to me a single problem is in fact two unrelated
> problems:
> > >>>> `distinct` operation and cancelling the unneeded repartitioning.
> > >>>>
> > >>>>   > what if we introduce a parameter to `selectKey()` that
> specifies
> > >>>> that
> > >>>> the caller asserts that the new key does _not_ change the data
> > >>>> partitioning?
> > >>>>
> > >>>> I think a more elegant solution would be not to add a new parameter
> to
> > >>>> `selectKey` and all the other key-changing operations (`map`,
> > >>>> `transform`, `flatMap`, ...), but add a new operator
> > >>>> `KStream#cancelRepartitioning()` that resets `keyChangingOperation`
> > >>>> flag
> > >>>> for the upstream node. Of course, "use it only if you know what
> you're
> > >>>> doing" warning is to be added. Well, it's a topic for a separate
> KIP!
> > >>>>
> > >>>> Concerning `distinct()`. If we use `XXXWindowedKStream` facilities,
> > >>>> then
> > >>>> changes to the API are minimally invasive: we're just adding
> > >>>> `distinct()` to TimeWindowedKStream and SessionWindowedKStream, and
> > >>>> that's all.
> > >>>>
> > >>>> We can now define `distinct` as an operation that returns only a
> first
> > >>>> record that falls into a new window, and filters out all the other
> > >>>> records that fall into an already existing window. BTW, we can mock
> the
> > >>>> behaviour of such an operation with `TopologyTestDriver` using
> > >>>> `reduce((l, r) -> STOP)`.filterNot((k, v)->STOP.equals(v)).  ;-)
> > >>>>
> > >>>> Consider the following example (record times are in seconds):
> > >>>>
> > >>>> //three bursts of variously ordered records
> > >>>> 4, 5, 6
> > >>>> 23, 22, 24
> > >>>> 34, 33, 32
> > >>>> //'late arrivals'
> > >>>> 7, 22, 35
> > >>>>
> > >>>>
> > >>>> 1. 'Epoch-aligned deduplication' using tumbling windows:
> > >>>>
> > >>>>
> .groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(10))).distinct()
> > >>>>
> > >>>>
> > >>>> produces
> > >>>>
> > >>>> (key@[00000/10000], 4)
> > >>>> (key@[20000/30000], 23)
> > >>>> (key@[30000/40000], 34)
> > >>>>
> > >>>> -- that is, one record per epoch-aligned window.
> > >>>>
> > >>>> 2. Hopping and sliding windows do not make much sense here, because
> > >>>> they
> > >>>> produce multiple intersected windows, so that one record can be
> > >>>> multiplied, but we want deduplication.
> > >>>>
> > >>>> 3. SessionWindows work for 'data-aligned deduplication'.
> > >>>>
> > >>>>
> .groupByKey().windowedBy(SessionWindows.with(Duration.ofSeconds(10))).distinct()
>
> > >>>>
> > >>>>
> > >>>>
> > >>>> produces only
> > >>>>
> > >>>> ([key@4000/4000], 4)
> > >>>> ([key@23000/23000], 23)
> > >>>>
> > >>>> because all the records bigger than 7 are stuck together in one
> > >>>> session.
> > >>>> Setting inactivity gap to 9 seconds will return three records:
> > >>>>
> > >>>> ([key@4000/4000], 4)
> > >>>> ([key@23000/23000], 23)
> > >>>> ([key@34000/34000], 34)
> > >>>>
> > >>>> WDYT? If you like this variant, I will re-write KIP-655 and propose
> a
> > >>>> separate KIP for `cancelRepartitioning` (or whatever name we will
> > >>>> choose
> > >>>> for it).
> > >>>>
> > >>>> Regards,
> > >>>>
> > >>>> Ivan
> > >>>>
> > >>>>
> > >>>> 24.05.2021 22:32, John Roesler пишет:
> > >>>>> Hey there, Ivan!
> > >>>>>
> > >>>>> In typical fashion, I'm going to make a somewhat outlandish
> > >>>>> proposal. I'm hoping that we can side-step some of the
> > >>>>> complications that have arisen. Please bear with me.
> > >>>>>
> > >>>>> It seems like `distinct()` is not fundamentally unlike other
> windowed
> > >>>>> "aggregation" operations. Your concern about unnecessary
> > >>>>> repartitioning seems to apply just as well to `count()` as to
> > >>>>> `distinct()`.
> > >>>>> This has come up before, but I don't remember when: what if we
> > >>>>> introduce a parameter to `selectKey()` that specifies that the
> caller
> > >>>>> asserts that the new key does _not_ change the data partitioning?
> > >>>>> The docs on that parameter would of course spell out all the
> "rights
> > >>>>> and responsibilities" of setting it.
> > >>>>>
> > >>>>> In that case, we could indeed get back to
> > >>>>> `selectKey(A).windowBy(B).distinct(...)`, where we get to compose
> the
> > >>>>> key mapper and the windowing function without having to carve out
> > >>>>> a separate domain just for `distinct()`. All the rest of the
> KStream
> > >>>>> operations would also benefit.
> > >>>>>
> > >>>>> What do you think?
> > >>>>>
> > >>>>> Thanks,
> > >>>>> John
> > >>>>>
> > >>>>> On Sun, May 23, 2021, at 08:09, Ivan Ponomarev wrote:
> > >>>>>> Hello everyone,
> > >>>>>>
> > >>>>>> let me revive the discussion for KIP-655. Now I have some time
> > >>>>>> again and
> > >>>>>> I'm eager to finalize this.
> > >>>>>>
> > >>>>>> Based on what was already discussed, I think that we can split the
> > >>>>>> discussion into three topics for our convenience.
> > >>>>>>
> > >>>>>> The three topics are:
> > >>>>>>
> > >>>>>> - idExtractor  (how should we extract the deduplication key for
> > >>>>>> the record)
> > >>>>>>
> > >>>>>> - timeWindows (what time windows should we use)
> > >>>>>>
> > >>>>>> - miscellaneous (naming etc.)
> > >>>>>>
> > >>>>>> ---- idExtractor ----
> > >>>>>>
> > >>>>>> Original proposal: use (k, v) -> f(k, v) mapper, defaulting to
> (k,
> > >>>>>> v) ->
> > >>>>>> k.  The drawback here is that we must warn the user to choose
> such a
> > >>>>>> function that sets different IDs for records from different
> > >>>>>> partitions,
> > >>>>>> otherwise same IDs might be not co-partitioned (and not
> > >>>>>> deduplicated as
> > >>>>>> a result). Additional concern: what should we do when this
> function
> > >>>>>> returns null?
> > >>>>>>
> > >>>>>> Matthias proposed key-only deduplication: that is, no idExtractor
> at
> > >>>>>> all, and if we want to use `distinct` for a particular
> identifier, we
> > >>>>>> must `selectKey()` before. The drawback of this approach is that
> > >>>>>> we will
> > >>>>>> always have repartitioning after the key selection, while in
> practice
> > >>>>>> repartitioning will not always be necessary (for example, when
> the
> > >>>>>> data
> > >>>>>> stream is such that different values infer different keys).
> > >>>>>>
> > >>>>>> So here we have a 'safety vs. performance' trade-off. But 'safe'
> > >>>>>> variant
> > >>>>>> is also not very convenient for developers, since we're forcing
> > >>>>>> them to
> > >>>>>> change the structure of their records.
> > >>>>>>
> > >>>>>> A 'golden mean' here might be using composite ID with its first
> > >>>>>> component equals to k and its second component equals to some
> f(v) (f
> > >>>>>> defaults to v -> null, and null value returned by f(v) means
> > >>>>>> 'deduplicate by the key only'). The nuance here is that we will
> have
> > >>>>>> serializers only for types of k and f(v), and we must correctly
> > >>>>>> serialize a tuple (k, f(v)), but of course this is doable.
> > >>>>>>
> > >>>>>> What do you think?
> > >>>>>>
> > >>>>>> ---- timeWindows ----
> > >>>>>>
> > >>>>>> Originally I proposed TimeWindows only just because they solved my
> > >>>>>> particular case :-) but agree with Matthias' and Sophie's
> objections.
> > >>>>>>
> > >>>>>> I like the Sophie's point: we need both epoch-aligned and
> > >>>>>> data-aligned
> > >>>>>> windows. IMO this is absolutely correct: "data-aligned is useful
> for
> > >>>>>> example when you know that a large number of updates to a single
> key
> > >>>>>> will occur in short bursts, and epoch-aligned when you
> > >>>>>> specifically want
> > >>>>>> to get just a single update per discrete time interval."
> > >>>>>>
> > >>>>>> I just cannot agree right away with Sophie's
> > >>>>>> .groupByKey().windowedBy(...).distinct() proposal, as it implies
> the
> > >>>>>> key-only deduplication -- see the previous topic.
> > >>>>>>
> > >>>>>> Epoch-aligned windows are very simple: they should forward only
> one
> > >>>>>> record per enumerated time window. TimeWindows are exactly what
> we
> > >>>>>> want
> > >>>>>> here. I mentioned in the KIP both tumbling and hopping windows
> just
> > >>>>>> because both are possible for TimeWindows, but indeed I don't see
> any
> > >>>>>> real use case for hopping windows, only tumbling windows make
> > >>>>>> sence IMO.
> > >>>>>>
> > >>>>>> For data-aligned windows SlidingWindow interface seems to be a
> nearly
> > >>>>>> valid choice. Nearly. It should forward a record once when it's
> first
> > >>>>>> seen, and then not again for any identical records that fall into
> the
> > >>>>>> next N timeUnits.  However, we cannot reuse SlidingWindow as is,
> > >>>>>> because
> > >>>>>> just as Matthias noted, SlidingWindows go backward in time, while
> we
> > >>>>>> need a windows that go forward in time, and are not opened while
> > >>>>>> records
> > >>>>>> fall into an already existing window. We definitely should make
> > >>>>>> our own
> > >>>>>> implementation, maybe we should call it ExpirationWindow? WDYT?
> > >>>>>>
> > >>>>>>
> > >>>>>> ---- miscellaneous ----
> > >>>>>>
> > >>>>>> Persistent/in-memory stores. Matthias proposed to pass
> Materialized
> > >>>>>> parameter next to DistinctParameters (and this is necessary,
> > >>>>>> because we
> > >>>>>> will need to provide a serializer for extracted id). This is
> > >>>>>> absolutely
> > >>>>>> valid point, I agree and I will fix it in the KIP.
> > >>>>>>
> > >>>>>> Naming. Sophie noted that the Streams DSL operators are typically
> > >>>>>> named
> > >>>>>> as verbs, so she proposes `deduplicate` in favour of `distinct`.
> I
> > >>>>>> think
> > >>>>>> that while it's important to stick to the naming conventions, it
> > >>>>>> is also
> > >>>>>> important to think of the experience of those who come from
> different
> > >>>>>> stacks/technologies. People who are familiar with SQL and Java
> > >>>>>> Streams
> > >>>>>> API must know for sure what does 'distinct' mean, while data
> > >>>>>> deduplication in general is a more complex task and thus
> > >>>>>> `deduplicate`
> > >>>>>> might be misleading. But I'm ready to be convinced if the majority
> > >>>>>> thinks otherwise.
> > >>>>>>
> > >>>>>>
> > >>>>>> Regards,
> > >>>>>>
> > >>>>>> Ivan
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>> 14.09.2020 21:31, Sophie Blee-Goldman пишет:
> > >>>>>>> Hey all,
> > >>>>>>>
> > >>>>>>> I'm not convinced either epoch-aligned or data-aligned will fit
> all
> > >>>>>>> possible use cases.
> > >>>>>>> Both seem totally reasonable to me: data-aligned is useful for
> > >>>>>>> example when
> > >>>>>>> you know
> > >>>>>>> that a large number of updates to a single key will occur in
> > >>>>>>> short bursts,
> > >>>>>>> and epoch-
> > >>>>>>> aligned when you specifically want to get just a single update
> > >>>>>>> per discrete
> > >>>>>>> time
> > >>>>>>> interval.
> > >>>>>>>
> > >>>>>>> Going a step further, though, what if you want just a single
> > >>>>>>> update per
> > >>>>>>> calendar
> > >>>>>>> month, or per year with accounting for leap years? Neither of
> > >>>>>>> those are
> > >>>>>>> serviced that
> > >>>>>>> well by the existing Windows specification to windowed
> > >>>>>>> aggregations, a
> > >>>>>>> well-known
> > >>>>>>> limitation of the current API. There is actually a KIP
> > >>>>>>> <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-645%3A+Replace+Windows+with+a+proper+interface>
>
> > >>>>>>>
> > >>>>>>> going
> > >>>>>>> on in parallel to fix this
> > >>>>>>> exact issue and make the windowing interface much more flexible.
> > >>>>>>> Maybe
> > >>>>>>> instead
> > >>>>>>> of re-implementing this windowing interface in a similarly
> > >>>>>>> limited fashion
> > >>>>>>> for the
> > >>>>>>> Distinct operator, we could leverage it here and get all the
> > >>>>>>> benefits
> > >>>>>>> coming with
> > >>>>>>> KIP-645.
> > >>>>>>>
> > >>>>>>> Specifically, I'm proposing to remove the TimeWindows/etc config
> > >>>>>>> from the
> > >>>>>>> DistinctParameters class, and move the distinct() method from
> the
> > >>>>>>> KStream
> > >>>>>>> interface
> > >>>>>>> to the TimeWindowedKStream interface. Since it's semantically
> > >>>>>>> similar to a
> > >>>>>>> kind of
> > >>>>>>> windowed aggregation, it makes sense to align it with the
> > >>>>>>> existing windowing
> > >>>>>>> framework, ie:
> > >>>>>>>
> > >>>>>>> inputStream
> > >>>>>>>        .groupKyKey()
> > >>>>>>>        .windowedBy()
> > >>>>>>>        .distinct()
> > >>>>>>>
> > >>>>>>> Then we could use data-aligned windows if SlidingWindows is
> > >>>>>>> specified in
> > >>>>>>> the
> > >>>>>>> windowedBy(), and epoch-aligned (or some other kind of
> enumerable
> > >>>>>>> window)
> > >>>>>>> if a Windows is specified in windowedBy() (or an
> > >>>>>>> EnumerableWindowDefinition
> > >>>>>>> once KIP-645 is implemented to replace Windows).
> > >>>>>>>
> > >>>>>>> *SlidingWindows*: should forward a record once when it's first
> > >>>>>>> seen, and
> > >>>>>>> then not again
> > >>>>>>> for any identical records that fall into the next N timeUnits.
> This
> > >>>>>>> includes out-of-order
> > >>>>>>> records, ie if you have a SlidingWindows of size 10s and process
> > >>>>>>> records at
> > >>>>>>> time
> > >>>>>>> 15s, 20s, 14s then you would just forward the one at 15s.
> > >>>>>>> Presumably, if
> > >>>>>>> you're
> > >>>>>>> using SlidingWindows, you don't care about what falls into exact
> > >>>>>>> time
> > >>>>>>> boxes, you just
> > >>>>>>> want to deduplicate. If you do care about exact time boxing then
> > >>>>>>> you should
> > >>>>>>> use...
> > >>>>>>>
> > >>>>>>> *EnumerableWindowDefinition* (eg *TimeWindows*): should forward
> > >>>>>>> only one
> > >>>>>>> record
> > >>>>>>> per enumerated time window. If you get a records at 15s, 20s,14s
> > >>>>>>> where the
> > >>>>>>> windows
> > >>>>>>> are enumerated at [5,14], [15, 24], etc then you forward the
> > >>>>>>> record at 15s
> > >>>>>>> and also
> > >>>>>>> the record at 14s
> > >>>>>>>
> > >>>>>>> Just an idea: not sure if the impedance mismatch would throw
> > >>>>>>> users off
> > >>>>>>> since the
> > >>>>>>> semantics of the distinct windows are slightly different than in
> the
> > >>>>>>> aggregations.
> > >>>>>>> But if we don't fit this into the existing windowed framework,
> > >>>>>>> then we
> > >>>>>>> shouldn't use
> > >>>>>>> any existing Windows-type classes at all, imo. ie we should
> > >>>>>>> create a new
> > >>>>>>> DistinctWindows config class, similar to how stream-stream joins
> > >>>>>>> get their
> > >>>>>>> own
> > >>>>>>> JoinWindows class
> > >>>>>>>
> > >>>>>>> I also think that non-windowed deduplication could be useful, in
> > >>>>>>> which case
> > >>>>>>> we
> > >>>>>>> would want to also have the distinct() operator on the KStream
> > >>>>>>> interface.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> One quick note regarding the naming: it seems like the Streams
> > >>>>>>> DSL operators
> > >>>>>>> are typically named as verbs rather than adjectives, for
> example.
> > >>>>>>> #suppress
> > >>>>>>> or
> > >>>>>>> #aggregate. I get that there's some precedent for  'distinct'
> > >>>>>>> specifically,
> > >>>>>>> but
> > >>>>>>> maybe something like 'deduplicate' would be more appropriate for
> > >>>>>>> the Streams
> > >>>>>>> API.
> > >>>>>>>
> > >>>>>>> WDYT?
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Mon, Sep 14, 2020 at 10:04 AM Ivan Ponomarev
> > >>>>>>> <iponoma...@mail.ru.invalid>
> > >>>>>>> wrote:
> > >>>>>>>
> > >>>>>>>> Hi Matthias,
> > >>>>>>>>
> > >>>>>>>> Thanks for your review! It made me think deeper, and indeed I
> > >>>>>>>> understood
> > >>>>>>>> that I was missing some important details.
> > >>>>>>>>
> > >>>>>>>> To simplify, let me explain my particular use case first so I
> > >>>>>>>> can refer
> > >>>>>>>> to it later.
> > >>>>>>>>
> > >>>>>>>> We have a system that collects information about ongoing live
> > >>>>>>>> sporting
> > >>>>>>>> events from different sources. The information sources have
> > >>>>>>>> their IDs
> > >>>>>>>> and these IDs are keys of the stream. Each source emits messages
> > >>>>>>>> concerning sporting events, and we can have many messages about
> > >>>>>>>> each
> > >>>>>>>> sporing event from each source. Event ID is extracted from the
> > >>>>>>>> message.
> > >>>>>>>>
> > >>>>>>>> We need a database of event IDs that were reported at least
> once
> > >>>>>>>> by each
> > >>>>>>>> source (important: events from different sources are considered
> > >>>>>>>> to be
> > >>>>>>>> different entities). The requirements are:
> > >>>>>>>>
> > >>>>>>>> 1) each new event ID should be written to the database as soon
> > >>>>>>>> as possible
> > >>>>>>>>
> > >>>>>>>> 2) although it's ok and sometimes even desired to repeat the
> > >>>>>>>> notification about already known event ID, but we wouldn’t like
> our
> > >>>>>>>> database to be bothered by the same event ID more often than
> > >>>>>>>> once in a
> > >>>>>>>> given period of time (say, 15 minutes).
> > >>>>>>>>
> > >>>>>>>> With this example in mind let me answer your questions
> > >>>>>>>>
> > >>>>>>>>     > (1) Using the `idExtractor` has the issue that data might
> > >>>>>>>> not be
> > >>>>>>>>     > co-partitioned as you mentioned in the KIP. Thus, I am
> > >>>>>>>> wondering if it
> > >>>>>>>>     > might be better to do deduplication only on the key? If
> > >>>>>>>> one sets a new
> > >>>>>>>>     > key upstream (ie, extracts the deduplication id into the
> > >>>>>>>> key), the
> > >>>>>>>>     > `distinct` operator could automatically repartition the
> > >>>>>>>> data and thus we
> > >>>>>>>>     > would avoid user errors.
> > >>>>>>>>
> > >>>>>>>> Of course with 'key-only' deduplication + autorepartitioning we
> > >>>>>>>> will
> > >>>>>>>> never cause problems with co-partitioning. But in practice, we
> > >>>>>>>> often
> > >>>>>>>> don't need repartitioning even if 'dedup ID' is different from
> > >>>>>>>> the key,
> > >>>>>>>> like in my example above. So here we have a sort of
> 'performance vs
> > >>>>>>>> security' tradeoff.
> > >>>>>>>>
> > >>>>>>>> The 'golden middle way' here can be the following: we can form a
> > >>>>>>>> deduplication ID as KEY + separator + idExtractor(VALUE). In
> case
> > >>>>>>>> idExtractor is not provided, we deduplicate by key only (as in
> > >>>>>>>> original
> > >>>>>>>> proposal). Then idExtractor transforms only the value (and not
> > >>>>>>>> the key)
> > >>>>>>>> and its result is appended to the key. Records from different
> > >>>>>>>> partitions
> > >>>>>>>> will inherently have different deduplication IDs and all the
> > >>>>>>>> data will
> > >>>>>>>> be co-partitioned. As with any stateful operation, we will
> > >>>>>>>> repartition
> > >>>>>>>> the topic in case the key was changed upstream, but only in
> this
> > >>>>>>>> case,
> > >>>>>>>> thus avoiding unnecessary repartitioning. My example above fits
> > >>>>>>>> this
> > >>>>>>>> perfectly.
> > >>>>>>>>
> > >>>>>>>>     > (2) What is the motivation for allowing the `idExtractor`
> > >>>>>>>> to return
> > >>>>>>>>     > `null`? Might be good to have some use-case examples for
> > >>>>>>>> this feature.
> > >>>>>>>>
> > >>>>>>>> Can't think of any use-cases. As it often happens, it's just
> > >>>>>>>> came with a
> > >>>>>>>> copy-paste from StackOverflow -- see Michael Noll's answer here:
> > >>>>>>>>
> > >>>>>>>>
> https://stackoverflow.com/questions/55803210/how-to-handle-duplicate-messages-using-kafka-streaming-dsl-functions
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> But, jokes aside, we'll have to decide what to do with nulls.
> If we
> > >>>>>>>> accept the above proposal of having deduplication ID as KEY +
> > >>>>>>>> postfix,
> > >>>>>>>> then null can be treated as no postfix at all. If we don't
> > >>>>>>>> accept this
> > >>>>>>>> approach, then treating nulls as 'no-deduplication' seems to be
> a
> > >>>>>>>> reasonable assumption (we can't get or put null as a key to a
> KV
> > >>>>>>>> store,
> > >>>>>>>> so a record with null ID is always going to look 'new' for us).
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>     > (2) Is using a `TimeWindow` really what we want? I was
> > >>>>>>>> wondering if a
> > >>>>>>>>     > `SlidingWindow` might be better? Or maybe we need a new
> > >>>>>>>> type of window?
> > >>>>>>>>
> > >>>>>>>> Agree. It's probably not what we want. Once I thought that
> reusing
> > >>>>>>>> TimeWindow is a clever idea, now I don't.
> > >>>>>>>>
> > >>>>>>>> Do we need epoch alignment in our use case? No, we don't, and I
> > >>>>>>>> don't
> > >>>>>>>> know if anyone going to need this. Epoch alignment is good for
> > >>>>>>>> aggregation, but deduplication is a different story.
> > >>>>>>>>
> > >>>>>>>> Let me describe the semantic the way I see it now and tell me
> > >>>>>>>> what you
> > >>>>>>>> think:
> > >>>>>>>>
> > >>>>>>>> - the only parameter that defines the deduplication logic is
> > >>>>>>>> 'expiration
> > >>>>>>>> period'
> > >>>>>>>>
> > >>>>>>>> - when a deduplication ID arrives and we cannot find it in the
> > >>>>>>>> store, we
> > >>>>>>>> forward the message downstream and store the ID + its timestamp.
> > >>>>>>>>
> > >>>>>>>> - when an out-of-order ID arrives with an older timestamp and
> we
> > >>>>>>>> find a
> > >>>>>>>> 'fresher' record, we do nothing and don't forward the message
> > >>>>>>>> (??? OR
> > >>>>>>>> NOT? In what case would we want to forward an out-of-order
> > >>>>>>>> message?)
> > >>>>>>>>
> > >>>>>>>> - when an ID with fresher timestamp arrives we check if it
> falls
> > >>>>>>>> into
> > >>>>>>>> the expiration period and either forward it or not, but in both
> > >>>>>>>> cases we
> > >>>>>>>> update the timestamp of the message in the store
> > >>>>>>>>
> > >>>>>>>> - the WindowStore retention mechanism should clean up very old
> > >>>>>>>> records
> > >>>>>>>> in order not to run out of space.
> > >>>>>>>>
> > >>>>>>>>     > (3) `isPersistent` -- instead of using this flag, it
> seems
> > >>>>>>>> better to
> > >>>>>>>>     > allow users to pass in a `Materialized` parameter next to
> > >>>>>>>>     > `DistinctParameters` to configure the state store?
> > >>>>>>>>
> > >>>>>>>> Fully agree! Users might also want to change the retention time.
> > >>>>>>>>
> > >>>>>>>>     > (4) I am wondering if we should really have 4 overloads
> for
> > >>>>>>>>     > `DistinctParameters.with()`? It might be better to have
> > >>>>>>>> one overload
> > >>>>>>>>     > with all require parameters, and add optional parameters
> > >>>>>>>> using the
> > >>>>>>>>     > builder pattern? This seems to follow the DSL Grammer
> > >>>>>>>> proposal.
> > >>>>>>>>
> > >>>>>>>> Oh, I can explain. We can't fully rely on the builder pattern
> > >>>>>>>> because of
> > >>>>>>>> Java type inference limitations. We have to provide type
> > >>>>>>>> parameters to
> > >>>>>>>> the builder methods or the code won't compile: see e. g. this
> > >>>>>>>> https://twitter.com/inponomarev/status/1265053286933159938 and
> > >>>>>>>> following
> > >>>>>>>> discussion with Tagir Valeev.
> > >>>>>>>>
> > >>>>>>>> When we came across the similar difficulties in KIP-418, we
> finally
> > >>>>>>>> decided to add all the necessary overloads to parameter class.
> > >>>>>>>> So I just
> > >>>>>>>> reproduced that approach here.
> > >>>>>>>>
> > >>>>>>>>     > (5) Even if it might be an implementation detail (and
> > >>>>>>>> maybe the KIP
> > >>>>>>>>     > itself does not need to mention it), can you give a high
> > >>>>>>>> level overview
> > >>>>>>>>     > how you intent to implement it (that would be easier to
> > >>>>>>>> grog, compared
> > >>>>>>>>     > to reading the PR).
> > >>>>>>>>
> > >>>>>>>> Well as with any operation on KStreamImpl level I'm building a
> > >>>>>>>> store and
> > >>>>>>>> a processor node.
> > >>>>>>>>
> > >>>>>>>> KStreamDistinct class is going to be the ProcessorSupplier,
> with
> > >>>>>>>> the
> > >>>>>>>> logic regarding the forwarding/muting of the records located in
> > >>>>>>>> KStreamDistinct.KStreamDistinctProcessor#process
> > >>>>>>>>
> > >>>>>>>> ----
> > >>>>>>>>
> > >>>>>>>> Matthias, if you are still reading this :-) a gentle reminder:
> > >>>>>>>> my PR for
> > >>>>>>>> already accepted KIP-418 is still waiting for your review. I
> > >>>>>>>> think it's
> > >>>>>>>> better for me to finalize at least one  KIP before proceeding
> to
> > >>>>>>>> a new
> > >>>>>>>> one :-)
> > >>>>>>>>
> > >>>>>>>> Regards,
> > >>>>>>>>
> > >>>>>>>> Ivan
> > >>>>>>>>
> > >>>>>>>> 03.09.2020 4:20, Matthias J. Sax пишет:
> > >>>>>>>>> Thanks for the KIP Ivan. Having a built-in deduplication
> > >>>>>>>>> operator is for
> > >>>>>>>>> sure a good addition.
> > >>>>>>>>>
> > >>>>>>>>> Couple of questions:
> > >>>>>>>>>
> > >>>>>>>>> (1) Using the `idExtractor` has the issue that data might not
> be
> > >>>>>>>>> co-partitioned as you mentioned in the KIP. Thus, I am
> > >>>>>>>>> wondering if it
> > >>>>>>>>> might be better to do deduplication only on the key? If one
> > >>>>>>>>> sets a new
> > >>>>>>>>> key upstream (ie, extracts the deduplication id into the key),
> the
> > >>>>>>>>> `distinct` operator could automatically repartition the data
> > >>>>>>>>> and thus we
> > >>>>>>>>> would avoid user errors.
> > >>>>>>>>>
> > >>>>>>>>> (2) What is the motivation for allowing the `idExtractor` to
> > >>>>>>>>> return
> > >>>>>>>>> `null`? Might be good to have some use-case examples for this
> > >>>>>>>>> feature.
> > >>>>>>>>>
> > >>>>>>>>> (2) Is using a `TimeWindow` really what we want? I was
> > >>>>>>>>> wondering if a
> > >>>>>>>>> `SlidingWindow` might be better? Or maybe we need a new type
> of
> > >>>>>>>>> window?
> > >>>>>>>>>
> > >>>>>>>>> It would be helpful if you could describe potential use cases
> > >>>>>>>>> in more
> > >>>>>>>>> detail. -- I am mainly wondering about hopping window? Each
> > >>>>>>>>> record would
> > >>>>>>>>> always falls into multiple window and thus would be emitted
> > >>>>>>>>> multiple
> > >>>>>>>>> times, ie, each time the window closes. Is this really a valid
> > >>>>>>>>> use case?
> > >>>>>>>>>
> > >>>>>>>>> It seems that for de-duplication, one wants to have some
> > >>>>>>>>> "expiration
> > >>>>>>>>> time", ie, for each ID, deduplicate all consecutive records
> > >>>>>>>>> with the
> > >>>>>>>>> same ID and emit the first record after the "expiration time"
> > >>>>>>>>> passed. In
> > >>>>>>>>> terms of a window, this would mean that the window starts at
> > >>>>>>>>> `r.ts` and
> > >>>>>>>>> ends at `r.ts + windowSize`, ie, the window is aligned to the
> > >>>>>>>>> data.
> > >>>>>>>>> TimeWindows are aligned to the epoch though. While
> > >>>>>>>>> `SlidingWindows` also
> > >>>>>>>>> align to the data, for the aggregation use-case they go
> > >>>>>>>>> backward in
> > >>>>>>>>> time, while we need a window that goes forward in time. It's
> an
> > >>>>>>>>> open
> > >>>>>>>>> question if we can re-purpose `SlidingWindows` -- it might be
> > >>>>>>>>> ok the
> > >>>>>>>>> make the alignment (into the past vs into the future) an
> operator
> > >>>>>>>>> dependent behavior?
> > >>>>>>>>>
> > >>>>>>>>> (3) `isPersistent` -- instead of using this flag, it seems
> > >>>>>>>>> better to
> > >>>>>>>>> allow users to pass in a `Materialized` parameter next to
> > >>>>>>>>> `DistinctParameters` to configure the state store?
> > >>>>>>>>>
> > >>>>>>>>> (4) I am wondering if we should really have 4 overloads for
> > >>>>>>>>> `DistinctParameters.with()`? It might be better to have one
> > >>>>>>>>> overload
> > >>>>>>>>> with all require parameters, and add optional parameters using
> the
> > >>>>>>>>> builder pattern? This seems to follow the DSL Grammer proposal.
> > >>>>>>>>>
> > >>>>>>>>> (5) Even if it might be an implementation detail (and maybe
> the
> > >>>>>>>>> KIP
> > >>>>>>>>> itself does not need to mention it), can you give a high level
> > >>>>>>>>> overview
> > >>>>>>>>> how you intent to implement it (that would be easier to grog,
> > >>>>>>>>> compared
> > >>>>>>>>> to reading the PR).
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> -Matthias
> > >>>>>>>>>
> > >>>>>>>>> On 8/23/20 4:29 PM, Ivan Ponomarev wrote:
> > >>>>>>>>>> Sorry, I forgot to add [DISCUSS] tag to the topic
> > >>>>>>>>>>
> > >>>>>>>>>> 24.08.2020 2:27, Ivan Ponomarev пишет:
> > >>>>>>>>>>> Hello,
> > >>>>>>>>>>>
> > >>>>>>>>>>> I'd like to start a discussion for KIP-655.
> > >>>>>>>>>>>
> > >>>>>>>>>>> KIP-655:
> > >>>>>>>>>>>
> > >>>>>>>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API
> > >>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> I also opened a proof-of-concept PR for you to experiment
> > >>>>>>>>>>> with the API:
> > >>>>>>>>>>>
> > >>>>>>>>>>> PR#9210: https://github.com/apache/kafka/pull/9210
> > >>>>>>>>>>>
> > >>>>>>>>>>> Regards,
> > >>>>>>>>>>>
> > >>>>>>>>>>> Ivan Ponomarev
> > >>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>
> > >>>>
> > >
> >
> >
>

Reply via email to