Hi all,

Bruno raised some very good points. I’d like to chime in with additional 
context. 

1. Great point. We faced a similar problem defining KIP-557. For 557, we chose 
to use the serialized byte array instead of the equals() method, but I think 
the situation in KIP-655 is a bit different. I think it might make sense to use 
the equals() method here, but am curious what Ivan thinks.

2. I figured we'd do nothing. I thought Ivan was just saying that it doesn't 
make a ton of sense to use it, which I agree with, but it doesn't seem like 
that means we should prohibit it.

3. FWIW, I don't have a strong feeling either way.

Thanks,
-John

On Mon, Jul 12, 2021, at 09:14, Bruno Cadonna wrote:
> Hi Ivan,
> 
> Thank you for the KIP!
> 
> Some aspects are not clear to me from the KIP and I have a proposal.
> 
> 1. The KIP does not describe the criteria that define a duplicate. Could 
> you add a definition of duplicate to the KIP?
> 
> 2. The KIP does not describe what happens if distinct() is applied on a 
> hopping window. On the DSL level, I do not see how you can avoid that 
> users apply distinct() on a hopping window, i.e., you cannot avoid it at 
> compile time, you need to check it at runtime and throw an exception. Is 
> this correct or am I missing something?
> 
> 3. I would also like to back a proposal by Sophie. She proposed to use 
> deduplicate() instead of distinct(), since the other DSL operations are 
> also verbs. I do not think that SQL and the Java Stream API are good 
> arguments to not use a verb.
> 
> Best,
> Bruno
> 
> 
> On 10.07.21 19:11, John Roesler wrote:
> > Hi Ivan,
> > 
> > Sorry for the silence!
> > 
> > I have just re-read the proposal.
> > 
> > To summarize, you are now only proposing the zero-arg distict() method to 
> > be added to TimeWindowedKStream and SessionWindowedKStream, right?
> > 
> > I’m in favor of this proposal.
> > 
> > Thanks,
> > John
> > 
> > On Sat, Jul 10, 2021, at 10:18, Ivan Ponomarev wrote:
> >> Hello everyone,
> >>
> >> I would like to remind you about KIP-655 and KIP-759 just in case they
> >> got lost in your inbox.
> >>
> >> Now the initial proposal is split into two independent and smaller ones,
> >> so it must be easier to review them. Of course, if you have time.
> >>
> >> Regards,
> >>
> >> Ivan
> >>
> >>
> >> 24.06.2021 18:11, Ivan Ponomarev пишет:
> >>> Hello all,
> >>>
> >>> I have rewritten the KIP-655 summarizing what was agreed upon during
> >>> this discussion (now the proposal is much simpler and less invasive).
> >>>
> >>> I have also created KIP-759 (cancelRepartition operation) and started a
> >>> discussion for it.
> >>>
> >>> Regards,
> >>>
> >>> Ivan.
> >>>
> >>>
> >>>
> >>> 04.06.2021 8:15, Matthias J. Sax пишет:
> >>>> Just skimmed over the thread -- first of all, I am glad that we could
> >>>> merge KIP-418 and ship it :)
> >>>>
> >>>> About the re-partitioning concerns, there are already two tickets for it:
> >>>>
> >>>>    - https://issues.apache.org/jira/browse/KAFKA-4835
> >>>>    - https://issues.apache.org/jira/browse/KAFKA-10844
> >>>>
> >>>> Thus, it seems best to exclude this topic from this KIP, and do a
> >>>> separate KIP for it (if necessary, we can "pause" this KIP until the
> >>>> repartition KIP is done). It's a long standing "issue" and we should
> >>>> resolve it in a general way I guess.
> >>>>
> >>>> (Did not yet ready all responses in detail yet, so keeping this comment
> >>>> short.)
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 6/2/21 6:35 AM, John Roesler wrote:
> >>>>> Thanks, Ivan!
> >>>>>
> >>>>> That sounds like a great plan to me. Two smaller KIPs are easier to
> >>>>> agree on than one big one.
> >>>>>
> >>>>> I agree hopping and sliding windows will actually have a duplicating
> >>>>> effect. We can avoid adding distinct() to the sliding window
> >>>>> interface, but hopping windows are just a different parameterization
> >>>>> of epoch-aligned windows. It seems we can’t do much about that except
> >>>>> document the issue.
> >>>>>
> >>>>> Thanks,
> >>>>> John
> >>>>>
> >>>>> On Wed, May 26, 2021, at 10:14, Ivan Ponomarev wrote:
> >>>>>> Hi John!
> >>>>>>
> >>>>>> I think that your proposal is just fantastic, it simplifies things a
> >>>>>> lot!
> >>>>>>
> >>>>>> I also felt uncomfortable due to the fact that the proposed
> >>>>>> `distinct()`
> >>>>>> is not somewhere near `count()` and `reduce(..)`. But
> >>>>>> `selectKey(..).groupByKey().windowedBy(..).distinct()` didn't look like
> >>>>>> a correct option for  me because of the issue with the unneeded
> >>>>>> repartitioning.
> >>>>>>
> >>>>>> The bold idea that we can just CANCEL the repartitioning didn't came to
> >>>>>> my mind.
> >>>>>>
> >>>>>> What seemed to me a single problem is in fact two unrelated problems:
> >>>>>> `distinct` operation and cancelling the unneeded repartitioning.
> >>>>>>
> >>>>>>    > what if we introduce a parameter to `selectKey()` that specifies
> >>>>>> that
> >>>>>> the caller asserts that the new key does _not_ change the data
> >>>>>> partitioning?
> >>>>>>
> >>>>>> I think a more elegant solution would be not to add a new parameter to
> >>>>>> `selectKey` and all the other key-changing operations (`map`,
> >>>>>> `transform`, `flatMap`, ...), but add a new operator
> >>>>>> `KStream#cancelRepartitioning()` that resets `keyChangingOperation`
> >>>>>> flag
> >>>>>> for the upstream node. Of course, "use it only if you know what you're
> >>>>>> doing" warning is to be added. Well, it's a topic for a separate KIP!
> >>>>>>
> >>>>>> Concerning `distinct()`. If we use `XXXWindowedKStream` facilities,
> >>>>>> then
> >>>>>> changes to the API are minimally invasive: we're just adding
> >>>>>> `distinct()` to TimeWindowedKStream and SessionWindowedKStream, and
> >>>>>> that's all.
> >>>>>>
> >>>>>> We can now define `distinct` as an operation that returns only a first
> >>>>>> record that falls into a new window, and filters out all the other
> >>>>>> records that fall into an already existing window. BTW, we can mock the
> >>>>>> behaviour of such an operation with `TopologyTestDriver` using
> >>>>>> `reduce((l, r) -> STOP)`.filterNot((k, v)->STOP.equals(v)).  ;-)
> >>>>>>
> >>>>>> Consider the following example (record times are in seconds):
> >>>>>>
> >>>>>> //three bursts of variously ordered records
> >>>>>> 4, 5, 6
> >>>>>> 23, 22, 24
> >>>>>> 34, 33, 32
> >>>>>> //'late arrivals'
> >>>>>> 7, 22, 35
> >>>>>>
> >>>>>>
> >>>>>> 1. 'Epoch-aligned deduplication' using tumbling windows:
> >>>>>>
> >>>>>> .groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(10))).distinct()
> >>>>>>
> >>>>>>
> >>>>>> produces
> >>>>>>
> >>>>>> (key@[00000/10000], 4)
> >>>>>> (key@[20000/30000], 23)
> >>>>>> (key@[30000/40000], 34)
> >>>>>>
> >>>>>> -- that is, one record per epoch-aligned window.
> >>>>>>
> >>>>>> 2. Hopping and sliding windows do not make much sense here, because
> >>>>>> they
> >>>>>> produce multiple intersected windows, so that one record can be
> >>>>>> multiplied, but we want deduplication.
> >>>>>>
> >>>>>> 3. SessionWindows work for 'data-aligned deduplication'.
> >>>>>>
> >>>>>> .groupByKey().windowedBy(SessionWindows.with(Duration.ofSeconds(10))).distinct()
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> produces only
> >>>>>>
> >>>>>> ([key@4000/4000], 4)
> >>>>>> ([key@23000/23000], 23)
> >>>>>>
> >>>>>> because all the records bigger than 7 are stuck together in one
> >>>>>> session.
> >>>>>> Setting inactivity gap to 9 seconds will return three records:
> >>>>>>
> >>>>>> ([key@4000/4000], 4)
> >>>>>> ([key@23000/23000], 23)
> >>>>>> ([key@34000/34000], 34)
> >>>>>>
> >>>>>> WDYT? If you like this variant, I will re-write KIP-655 and propose a
> >>>>>> separate KIP for `cancelRepartitioning` (or whatever name we will
> >>>>>> choose
> >>>>>> for it).
> >>>>>>
> >>>>>> Regards,
> >>>>>>
> >>>>>> Ivan
> >>>>>>
> >>>>>>
> >>>>>> 24.05.2021 22:32, John Roesler пишет:
> >>>>>>> Hey there, Ivan!
> >>>>>>>
> >>>>>>> In typical fashion, I'm going to make a somewhat outlandish
> >>>>>>> proposal. I'm hoping that we can side-step some of the
> >>>>>>> complications that have arisen. Please bear with me.
> >>>>>>>
> >>>>>>> It seems like `distinct()` is not fundamentally unlike other windowed
> >>>>>>> "aggregation" operations. Your concern about unnecessary
> >>>>>>> repartitioning seems to apply just as well to `count()` as to
> >>>>>>> `distinct()`.
> >>>>>>> This has come up before, but I don't remember when: what if we
> >>>>>>> introduce a parameter to `selectKey()` that specifies that the caller
> >>>>>>> asserts that the new key does _not_ change the data partitioning?
> >>>>>>> The docs on that parameter would of course spell out all the "rights
> >>>>>>> and responsibilities" of setting it.
> >>>>>>>
> >>>>>>> In that case, we could indeed get back to
> >>>>>>> `selectKey(A).windowBy(B).distinct(...)`, where we get to compose the
> >>>>>>> key mapper and the windowing function without having to carve out
> >>>>>>> a separate domain just for `distinct()`. All the rest of the KStream
> >>>>>>> operations would also benefit.
> >>>>>>>
> >>>>>>> What do you think?
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> John
> >>>>>>>
> >>>>>>> On Sun, May 23, 2021, at 08:09, Ivan Ponomarev wrote:
> >>>>>>>> Hello everyone,
> >>>>>>>>
> >>>>>>>> let me revive the discussion for KIP-655. Now I have some time
> >>>>>>>> again and
> >>>>>>>> I'm eager to finalize this.
> >>>>>>>>
> >>>>>>>> Based on what was already discussed, I think that we can split the
> >>>>>>>> discussion into three topics for our convenience.
> >>>>>>>>
> >>>>>>>> The three topics are:
> >>>>>>>>
> >>>>>>>> - idExtractor  (how should we extract the deduplication key for
> >>>>>>>> the record)
> >>>>>>>>
> >>>>>>>> - timeWindows (what time windows should we use)
> >>>>>>>>
> >>>>>>>> - miscellaneous (naming etc.)
> >>>>>>>>
> >>>>>>>> ---- idExtractor ----
> >>>>>>>>
> >>>>>>>> Original proposal: use (k, v) -> f(k, v) mapper, defaulting to (k,
> >>>>>>>> v) ->
> >>>>>>>> k.  The drawback here is that we must warn the user to choose such a
> >>>>>>>> function that sets different IDs for records from different
> >>>>>>>> partitions,
> >>>>>>>> otherwise same IDs might be not co-partitioned (and not
> >>>>>>>> deduplicated as
> >>>>>>>> a result). Additional concern: what should we do when this function
> >>>>>>>> returns null?
> >>>>>>>>
> >>>>>>>> Matthias proposed key-only deduplication: that is, no idExtractor at
> >>>>>>>> all, and if we want to use `distinct` for a particular identifier, we
> >>>>>>>> must `selectKey()` before. The drawback of this approach is that
> >>>>>>>> we will
> >>>>>>>> always have repartitioning after the key selection, while in practice
> >>>>>>>> repartitioning will not always be necessary (for example, when the
> >>>>>>>> data
> >>>>>>>> stream is such that different values infer different keys).
> >>>>>>>>
> >>>>>>>> So here we have a 'safety vs. performance' trade-off. But 'safe'
> >>>>>>>> variant
> >>>>>>>> is also not very convenient for developers, since we're forcing
> >>>>>>>> them to
> >>>>>>>> change the structure of their records.
> >>>>>>>>
> >>>>>>>> A 'golden mean' here might be using composite ID with its first
> >>>>>>>> component equals to k and its second component equals to some f(v) (f
> >>>>>>>> defaults to v -> null, and null value returned by f(v) means
> >>>>>>>> 'deduplicate by the key only'). The nuance here is that we will have
> >>>>>>>> serializers only for types of k and f(v), and we must correctly
> >>>>>>>> serialize a tuple (k, f(v)), but of course this is doable.
> >>>>>>>>
> >>>>>>>> What do you think?
> >>>>>>>>
> >>>>>>>> ---- timeWindows ----
> >>>>>>>>
> >>>>>>>> Originally I proposed TimeWindows only just because they solved my
> >>>>>>>> particular case :-) but agree with Matthias' and Sophie's objections.
> >>>>>>>>
> >>>>>>>> I like the Sophie's point: we need both epoch-aligned and
> >>>>>>>> data-aligned
> >>>>>>>> windows. IMO this is absolutely correct: "data-aligned is useful for
> >>>>>>>> example when you know that a large number of updates to a single key
> >>>>>>>> will occur in short bursts, and epoch-aligned when you
> >>>>>>>> specifically want
> >>>>>>>> to get just a single update per discrete time interval."
> >>>>>>>>
> >>>>>>>> I just cannot agree right away with Sophie's
> >>>>>>>> .groupByKey().windowedBy(...).distinct() proposal, as it implies  the
> >>>>>>>> key-only deduplication -- see the previous topic.
> >>>>>>>>
> >>>>>>>> Epoch-aligned windows are very simple: they should forward only one
> >>>>>>>> record per enumerated time window. TimeWindows are exactly what we
> >>>>>>>> want
> >>>>>>>> here. I mentioned in the KIP both tumbling and hopping windows just
> >>>>>>>> because both are possible for TimeWindows, but indeed I don't see any
> >>>>>>>> real use case for hopping windows, only tumbling windows make
> >>>>>>>> sence IMO.
> >>>>>>>>
> >>>>>>>> For data-aligned windows SlidingWindow interface seems to be a nearly
> >>>>>>>> valid choice. Nearly. It should forward a record once when it's first
> >>>>>>>> seen, and then not again for any identical records that fall into the
> >>>>>>>> next N timeUnits.  However, we cannot reuse SlidingWindow as is,
> >>>>>>>> because
> >>>>>>>> just as Matthias noted, SlidingWindows go backward in time, while we
> >>>>>>>> need a windows that go forward in time, and are not opened while
> >>>>>>>> records
> >>>>>>>> fall into an already existing window. We definitely should make
> >>>>>>>> our own
> >>>>>>>> implementation, maybe we should call it ExpirationWindow? WDYT?
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> ---- miscellaneous ----
> >>>>>>>>
> >>>>>>>> Persistent/in-memory stores. Matthias proposed to pass Materialized
> >>>>>>>> parameter next to DistinctParameters (and this is necessary,
> >>>>>>>> because we
> >>>>>>>> will need to provide a serializer for extracted id). This is
> >>>>>>>> absolutely
> >>>>>>>> valid point, I agree and I will fix it in the KIP.
> >>>>>>>>
> >>>>>>>> Naming. Sophie noted that the Streams DSL operators are typically
> >>>>>>>> named
> >>>>>>>> as verbs, so she proposes `deduplicate` in favour of `distinct`. I
> >>>>>>>> think
> >>>>>>>> that while it's important to stick to the naming conventions, it
> >>>>>>>> is also
> >>>>>>>> important to think of the experience of those who come from different
> >>>>>>>> stacks/technologies. People who are familiar with SQL and Java
> >>>>>>>> Streams
> >>>>>>>> API must know for sure what does 'distinct' mean, while data
> >>>>>>>> deduplication in general is a more complex task and thus
> >>>>>>>> `deduplicate`
> >>>>>>>> might be misleading. But I'm ready to be convinced if the majority
> >>>>>>>> thinks otherwise.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>>
> >>>>>>>> Ivan
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> 14.09.2020 21:31, Sophie Blee-Goldman пишет:
> >>>>>>>>> Hey all,
> >>>>>>>>>
> >>>>>>>>> I'm not convinced either epoch-aligned or data-aligned will fit all
> >>>>>>>>> possible use cases.
> >>>>>>>>> Both seem totally reasonable to me: data-aligned is useful for
> >>>>>>>>> example when
> >>>>>>>>> you know
> >>>>>>>>> that a large number of updates to a single key will occur in
> >>>>>>>>> short bursts,
> >>>>>>>>> and epoch-
> >>>>>>>>> aligned when you specifically want to get just a single update
> >>>>>>>>> per discrete
> >>>>>>>>> time
> >>>>>>>>> interval.
> >>>>>>>>>
> >>>>>>>>> Going a step further, though, what if you want just a single
> >>>>>>>>> update per
> >>>>>>>>> calendar
> >>>>>>>>> month, or per year with accounting for leap years? Neither of
> >>>>>>>>> those are
> >>>>>>>>> serviced that
> >>>>>>>>> well by the existing Windows specification to windowed
> >>>>>>>>> aggregations, a
> >>>>>>>>> well-known
> >>>>>>>>> limitation of the current API. There is actually a KIP
> >>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-645%3A+Replace+Windows+with+a+proper+interface>
> >>>>>>>>>
> >>>>>>>>> going
> >>>>>>>>> on in parallel to fix this
> >>>>>>>>> exact issue and make the windowing interface much more flexible.
> >>>>>>>>> Maybe
> >>>>>>>>> instead
> >>>>>>>>> of re-implementing this windowing interface in a similarly
> >>>>>>>>> limited fashion
> >>>>>>>>> for the
> >>>>>>>>> Distinct operator, we could leverage it here and get all the
> >>>>>>>>> benefits
> >>>>>>>>> coming with
> >>>>>>>>> KIP-645.
> >>>>>>>>>
> >>>>>>>>> Specifically, I'm proposing to remove the TimeWindows/etc config
> >>>>>>>>> from the
> >>>>>>>>> DistinctParameters class, and move the distinct() method from the
> >>>>>>>>> KStream
> >>>>>>>>> interface
> >>>>>>>>> to the TimeWindowedKStream interface. Since it's semantically
> >>>>>>>>> similar to a
> >>>>>>>>> kind of
> >>>>>>>>> windowed aggregation, it makes sense to align it with the
> >>>>>>>>> existing windowing
> >>>>>>>>> framework, ie:
> >>>>>>>>>
> >>>>>>>>> inputStream
> >>>>>>>>>         .groupKyKey()
> >>>>>>>>>         .windowedBy()
> >>>>>>>>>         .distinct()
> >>>>>>>>>
> >>>>>>>>> Then we could use data-aligned windows if SlidingWindows is
> >>>>>>>>> specified in
> >>>>>>>>> the
> >>>>>>>>> windowedBy(), and epoch-aligned (or some other kind of enumerable
> >>>>>>>>> window)
> >>>>>>>>> if a Windows is specified in windowedBy() (or an
> >>>>>>>>> EnumerableWindowDefinition
> >>>>>>>>> once KIP-645 is implemented to replace Windows).
> >>>>>>>>>
> >>>>>>>>> *SlidingWindows*: should forward a record once when it's first
> >>>>>>>>> seen, and
> >>>>>>>>> then not again
> >>>>>>>>> for any identical records that fall into the next N timeUnits. This
> >>>>>>>>> includes out-of-order
> >>>>>>>>> records, ie if you have a SlidingWindows of size 10s and process
> >>>>>>>>> records at
> >>>>>>>>> time
> >>>>>>>>> 15s, 20s, 14s then you would just forward the one at 15s.
> >>>>>>>>> Presumably, if
> >>>>>>>>> you're
> >>>>>>>>> using SlidingWindows, you don't care about what falls into exact
> >>>>>>>>> time
> >>>>>>>>> boxes, you just
> >>>>>>>>> want to deduplicate. If you do care about exact time boxing then
> >>>>>>>>> you should
> >>>>>>>>> use...
> >>>>>>>>>
> >>>>>>>>> *EnumerableWindowDefinition* (eg *TimeWindows*): should forward
> >>>>>>>>> only one
> >>>>>>>>> record
> >>>>>>>>> per enumerated time window. If you get a records at 15s, 20s,14s
> >>>>>>>>> where the
> >>>>>>>>> windows
> >>>>>>>>> are enumerated at [5,14], [15, 24], etc then you forward the
> >>>>>>>>> record at 15s
> >>>>>>>>> and also
> >>>>>>>>> the record at 14s
> >>>>>>>>>
> >>>>>>>>> Just an idea: not sure if the impedance mismatch would throw
> >>>>>>>>> users off
> >>>>>>>>> since the
> >>>>>>>>> semantics of the distinct windows are slightly different than in the
> >>>>>>>>> aggregations.
> >>>>>>>>> But if we don't fit this into the existing windowed framework,
> >>>>>>>>> then we
> >>>>>>>>> shouldn't use
> >>>>>>>>> any existing Windows-type classes at all, imo. ie we should
> >>>>>>>>> create a new
> >>>>>>>>> DistinctWindows config class, similar to how stream-stream joins
> >>>>>>>>> get their
> >>>>>>>>> own
> >>>>>>>>> JoinWindows class
> >>>>>>>>>
> >>>>>>>>> I also think that non-windowed deduplication could be useful, in
> >>>>>>>>> which case
> >>>>>>>>> we
> >>>>>>>>> would want to also have the distinct() operator on the KStream
> >>>>>>>>> interface.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> One quick note regarding the naming: it seems like the Streams
> >>>>>>>>> DSL operators
> >>>>>>>>> are typically named as verbs rather than adjectives, for example.
> >>>>>>>>> #suppress
> >>>>>>>>> or
> >>>>>>>>> #aggregate. I get that there's some precedent for  'distinct'
> >>>>>>>>> specifically,
> >>>>>>>>> but
> >>>>>>>>> maybe something like 'deduplicate' would be more appropriate for
> >>>>>>>>> the Streams
> >>>>>>>>> API.
> >>>>>>>>>
> >>>>>>>>> WDYT?
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Mon, Sep 14, 2020 at 10:04 AM Ivan Ponomarev
> >>>>>>>>> <iponoma...@mail.ru.invalid>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Matthias,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for your review! It made me think deeper, and indeed I
> >>>>>>>>>> understood
> >>>>>>>>>> that I was missing some important details.
> >>>>>>>>>>
> >>>>>>>>>> To simplify, let me explain my particular use case first so I
> >>>>>>>>>> can refer
> >>>>>>>>>> to it later.
> >>>>>>>>>>
> >>>>>>>>>> We have a system that collects information about ongoing live
> >>>>>>>>>> sporting
> >>>>>>>>>> events from different sources. The information sources have
> >>>>>>>>>> their IDs
> >>>>>>>>>> and these IDs are keys of the stream. Each source emits messages
> >>>>>>>>>> concerning sporting events, and we can have many messages about
> >>>>>>>>>> each
> >>>>>>>>>> sporing event from each source. Event ID is extracted from the
> >>>>>>>>>> message.
> >>>>>>>>>>
> >>>>>>>>>> We need a database of event IDs that were reported at least once
> >>>>>>>>>> by each
> >>>>>>>>>> source (important: events from different sources are considered
> >>>>>>>>>> to be
> >>>>>>>>>> different entities). The requirements are:
> >>>>>>>>>>
> >>>>>>>>>> 1) each new event ID should be written to the database as soon
> >>>>>>>>>> as possible
> >>>>>>>>>>
> >>>>>>>>>> 2) although it's ok and sometimes even desired to repeat the
> >>>>>>>>>> notification about already known event ID, but we wouldn’t like our
> >>>>>>>>>> database to be bothered by the same event ID more often than
> >>>>>>>>>> once in a
> >>>>>>>>>> given period of time (say, 15 minutes).
> >>>>>>>>>>
> >>>>>>>>>> With this example in mind let me answer your questions
> >>>>>>>>>>
> >>>>>>>>>>      > (1) Using the `idExtractor` has the issue that data might
> >>>>>>>>>> not be
> >>>>>>>>>>      > co-partitioned as you mentioned in the KIP. Thus, I am
> >>>>>>>>>> wondering if it
> >>>>>>>>>>      > might be better to do deduplication only on the key? If
> >>>>>>>>>> one sets a new
> >>>>>>>>>>      > key upstream (ie, extracts the deduplication id into the
> >>>>>>>>>> key), the
> >>>>>>>>>>      > `distinct` operator could automatically repartition the
> >>>>>>>>>> data and thus we
> >>>>>>>>>>      > would avoid user errors.
> >>>>>>>>>>
> >>>>>>>>>> Of course with 'key-only' deduplication + autorepartitioning we
> >>>>>>>>>> will
> >>>>>>>>>> never cause problems with co-partitioning. But in practice, we
> >>>>>>>>>> often
> >>>>>>>>>> don't need repartitioning even if 'dedup ID' is different from
> >>>>>>>>>> the key,
> >>>>>>>>>> like in my example above. So here we have a sort of 'performance vs
> >>>>>>>>>> security' tradeoff.
> >>>>>>>>>>
> >>>>>>>>>> The 'golden middle way' here can be the following: we can form a
> >>>>>>>>>> deduplication ID as KEY + separator + idExtractor(VALUE). In case
> >>>>>>>>>> idExtractor is not provided, we deduplicate by key only (as in
> >>>>>>>>>> original
> >>>>>>>>>> proposal). Then idExtractor transforms only the value (and not
> >>>>>>>>>> the key)
> >>>>>>>>>> and its result is appended to the key. Records from different
> >>>>>>>>>> partitions
> >>>>>>>>>> will inherently have different deduplication IDs and all the
> >>>>>>>>>> data will
> >>>>>>>>>> be co-partitioned. As with any stateful operation, we will
> >>>>>>>>>> repartition
> >>>>>>>>>> the topic in case the key was changed upstream, but only in this
> >>>>>>>>>> case,
> >>>>>>>>>> thus avoiding unnecessary repartitioning. My example above fits
> >>>>>>>>>> this
> >>>>>>>>>> perfectly.
> >>>>>>>>>>
> >>>>>>>>>>      > (2) What is the motivation for allowing the `idExtractor`
> >>>>>>>>>> to return
> >>>>>>>>>>      > `null`? Might be good to have some use-case examples for
> >>>>>>>>>> this feature.
> >>>>>>>>>>
> >>>>>>>>>> Can't think of any use-cases. As it often happens, it's just
> >>>>>>>>>> came with a
> >>>>>>>>>> copy-paste from StackOverflow -- see Michael Noll's answer here:
> >>>>>>>>>>
> >>>>>>>>>> https://stackoverflow.com/questions/55803210/how-to-handle-duplicate-messages-using-kafka-streaming-dsl-functions
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> But, jokes aside, we'll have to decide what to do with nulls. If we
> >>>>>>>>>> accept the above proposal of having deduplication ID as KEY +
> >>>>>>>>>> postfix,
> >>>>>>>>>> then null can be treated as no postfix at all. If we don't
> >>>>>>>>>> accept this
> >>>>>>>>>> approach, then treating nulls as 'no-deduplication' seems to be a
> >>>>>>>>>> reasonable assumption (we can't get or put null as a key to a KV
> >>>>>>>>>> store,
> >>>>>>>>>> so a record with null ID is always going to look 'new' for us).
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>      > (2) Is using a `TimeWindow` really what we want? I was
> >>>>>>>>>> wondering if a
> >>>>>>>>>>      > `SlidingWindow` might be better? Or maybe we need a new
> >>>>>>>>>> type of window?
> >>>>>>>>>>
> >>>>>>>>>> Agree. It's probably not what we want. Once I thought that reusing
> >>>>>>>>>> TimeWindow is a clever idea, now I don't.
> >>>>>>>>>>
> >>>>>>>>>> Do we need epoch alignment in our use case? No, we don't, and I
> >>>>>>>>>> don't
> >>>>>>>>>> know if anyone going to need this. Epoch alignment is good for
> >>>>>>>>>> aggregation, but deduplication is a different story.
> >>>>>>>>>>
> >>>>>>>>>> Let me describe the semantic the way I see it now and tell me
> >>>>>>>>>> what you
> >>>>>>>>>> think:
> >>>>>>>>>>
> >>>>>>>>>> - the only parameter that defines the deduplication logic is
> >>>>>>>>>> 'expiration
> >>>>>>>>>> period'
> >>>>>>>>>>
> >>>>>>>>>> - when a deduplication ID arrives and we cannot find it in the
> >>>>>>>>>> store, we
> >>>>>>>>>> forward the message downstream and store the ID + its timestamp.
> >>>>>>>>>>
> >>>>>>>>>> - when an out-of-order ID arrives with an older timestamp and we
> >>>>>>>>>> find a
> >>>>>>>>>> 'fresher' record, we do nothing and don't forward the message
> >>>>>>>>>> (??? OR
> >>>>>>>>>> NOT? In what case would we want to forward an out-of-order
> >>>>>>>>>> message?)
> >>>>>>>>>>
> >>>>>>>>>> - when an ID with fresher timestamp arrives we check if it falls
> >>>>>>>>>> into
> >>>>>>>>>> the expiration period and either forward it or not, but in both
> >>>>>>>>>> cases we
> >>>>>>>>>> update the timestamp of the message in the store
> >>>>>>>>>>
> >>>>>>>>>> - the WindowStore retention mechanism should clean up very old
> >>>>>>>>>> records
> >>>>>>>>>> in order not to run out of space.
> >>>>>>>>>>
> >>>>>>>>>>      > (3) `isPersistent` -- instead of using this flag, it seems
> >>>>>>>>>> better to
> >>>>>>>>>>      > allow users to pass in a `Materialized` parameter next to
> >>>>>>>>>>      > `DistinctParameters` to configure the state store?
> >>>>>>>>>>
> >>>>>>>>>> Fully agree! Users might also want to change the retention time.
> >>>>>>>>>>
> >>>>>>>>>>      > (4) I am wondering if we should really have 4 overloads for
> >>>>>>>>>>      > `DistinctParameters.with()`? It might be better to have
> >>>>>>>>>> one overload
> >>>>>>>>>>      > with all require parameters, and add optional parameters
> >>>>>>>>>> using the
> >>>>>>>>>>      > builder pattern? This seems to follow the DSL Grammer
> >>>>>>>>>> proposal.
> >>>>>>>>>>
> >>>>>>>>>> Oh, I can explain. We can't fully rely on the builder pattern
> >>>>>>>>>> because of
> >>>>>>>>>> Java type inference limitations. We have to provide type
> >>>>>>>>>> parameters to
> >>>>>>>>>> the builder methods or the code won't compile: see e. g. this
> >>>>>>>>>> https://twitter.com/inponomarev/status/1265053286933159938 and
> >>>>>>>>>> following
> >>>>>>>>>> discussion with Tagir Valeev.
> >>>>>>>>>>
> >>>>>>>>>> When we came across the similar difficulties in KIP-418, we finally
> >>>>>>>>>> decided to add all the necessary overloads to parameter class.
> >>>>>>>>>> So I just
> >>>>>>>>>> reproduced that approach here.
> >>>>>>>>>>
> >>>>>>>>>>      > (5) Even if it might be an implementation detail (and
> >>>>>>>>>> maybe the KIP
> >>>>>>>>>>      > itself does not need to mention it), can you give a high
> >>>>>>>>>> level overview
> >>>>>>>>>>      > how you intent to implement it (that would be easier to
> >>>>>>>>>> grog, compared
> >>>>>>>>>>      > to reading the PR).
> >>>>>>>>>>
> >>>>>>>>>> Well as with any operation on KStreamImpl level I'm building a
> >>>>>>>>>> store and
> >>>>>>>>>> a processor node.
> >>>>>>>>>>
> >>>>>>>>>> KStreamDistinct class is going to be the ProcessorSupplier, with
> >>>>>>>>>> the
> >>>>>>>>>> logic regarding the forwarding/muting of the records located in
> >>>>>>>>>> KStreamDistinct.KStreamDistinctProcessor#process
> >>>>>>>>>>
> >>>>>>>>>> ----
> >>>>>>>>>>
> >>>>>>>>>> Matthias, if you are still reading this :-) a gentle reminder:
> >>>>>>>>>> my PR for
> >>>>>>>>>> already accepted KIP-418 is still waiting for your review. I
> >>>>>>>>>> think it's
> >>>>>>>>>> better for me to finalize at least one  KIP before proceeding to
> >>>>>>>>>> a new
> >>>>>>>>>> one :-)
> >>>>>>>>>>
> >>>>>>>>>> Regards,
> >>>>>>>>>>
> >>>>>>>>>> Ivan
> >>>>>>>>>>
> >>>>>>>>>> 03.09.2020 4:20, Matthias J. Sax пишет:
> >>>>>>>>>>> Thanks for the KIP Ivan. Having a built-in deduplication
> >>>>>>>>>>> operator is for
> >>>>>>>>>>> sure a good addition.
> >>>>>>>>>>>
> >>>>>>>>>>> Couple of questions:
> >>>>>>>>>>>
> >>>>>>>>>>> (1) Using the `idExtractor` has the issue that data might not be
> >>>>>>>>>>> co-partitioned as you mentioned in the KIP. Thus, I am
> >>>>>>>>>>> wondering if it
> >>>>>>>>>>> might be better to do deduplication only on the key? If one
> >>>>>>>>>>> sets a new
> >>>>>>>>>>> key upstream (ie, extracts the deduplication id into the key), the
> >>>>>>>>>>> `distinct` operator could automatically repartition the data
> >>>>>>>>>>> and thus we
> >>>>>>>>>>> would avoid user errors.
> >>>>>>>>>>>
> >>>>>>>>>>> (2) What is the motivation for allowing the `idExtractor` to
> >>>>>>>>>>> return
> >>>>>>>>>>> `null`? Might be good to have some use-case examples for this
> >>>>>>>>>>> feature.
> >>>>>>>>>>>
> >>>>>>>>>>> (2) Is using a `TimeWindow` really what we want? I was
> >>>>>>>>>>> wondering if a
> >>>>>>>>>>> `SlidingWindow` might be better? Or maybe we need a new type of
> >>>>>>>>>>> window?
> >>>>>>>>>>>
> >>>>>>>>>>> It would be helpful if you could describe potential use cases
> >>>>>>>>>>> in more
> >>>>>>>>>>> detail. -- I am mainly wondering about hopping window? Each
> >>>>>>>>>>> record would
> >>>>>>>>>>> always falls into multiple window and thus would be emitted
> >>>>>>>>>>> multiple
> >>>>>>>>>>> times, ie, each time the window closes. Is this really a valid
> >>>>>>>>>>> use case?
> >>>>>>>>>>>
> >>>>>>>>>>> It seems that for de-duplication, one wants to have some
> >>>>>>>>>>> "expiration
> >>>>>>>>>>> time", ie, for each ID, deduplicate all consecutive records
> >>>>>>>>>>> with the
> >>>>>>>>>>> same ID and emit the first record after the "expiration time"
> >>>>>>>>>>> passed. In
> >>>>>>>>>>> terms of a window, this would mean that the window starts at
> >>>>>>>>>>> `r.ts` and
> >>>>>>>>>>> ends at `r.ts + windowSize`, ie, the window is aligned to the
> >>>>>>>>>>> data.
> >>>>>>>>>>> TimeWindows are aligned to the epoch though. While
> >>>>>>>>>>> `SlidingWindows` also
> >>>>>>>>>>> align to the data, for the aggregation use-case they go
> >>>>>>>>>>> backward in
> >>>>>>>>>>> time, while we need a window that goes forward in time. It's an
> >>>>>>>>>>> open
> >>>>>>>>>>> question if we can re-purpose `SlidingWindows` -- it might be
> >>>>>>>>>>> ok the
> >>>>>>>>>>> make the alignment (into the past vs into the future) an operator
> >>>>>>>>>>> dependent behavior?
> >>>>>>>>>>>
> >>>>>>>>>>> (3) `isPersistent` -- instead of using this flag, it seems
> >>>>>>>>>>> better to
> >>>>>>>>>>> allow users to pass in a `Materialized` parameter next to
> >>>>>>>>>>> `DistinctParameters` to configure the state store?
> >>>>>>>>>>>
> >>>>>>>>>>> (4) I am wondering if we should really have 4 overloads for
> >>>>>>>>>>> `DistinctParameters.with()`? It might be better to have one
> >>>>>>>>>>> overload
> >>>>>>>>>>> with all require parameters, and add optional parameters using the
> >>>>>>>>>>> builder pattern? This seems to follow the DSL Grammer proposal.
> >>>>>>>>>>>
> >>>>>>>>>>> (5) Even if it might be an implementation detail (and maybe the
> >>>>>>>>>>> KIP
> >>>>>>>>>>> itself does not need to mention it), can you give a high level
> >>>>>>>>>>> overview
> >>>>>>>>>>> how you intent to implement it (that would be easier to grog,
> >>>>>>>>>>> compared
> >>>>>>>>>>> to reading the PR).
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> -Matthias
> >>>>>>>>>>>
> >>>>>>>>>>> On 8/23/20 4:29 PM, Ivan Ponomarev wrote:
> >>>>>>>>>>>> Sorry, I forgot to add [DISCUSS] tag to the topic
> >>>>>>>>>>>>
> >>>>>>>>>>>> 24.08.2020 2:27, Ivan Ponomarev пишет:
> >>>>>>>>>>>>> Hello,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I'd like to start a discussion for KIP-655.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> KIP-655:
> >>>>>>>>>>>>>
> >>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API
> >>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I also opened a proof-of-concept PR for you to experiment
> >>>>>>>>>>>>> with the API:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> PR#9210: https://github.com/apache/kafka/pull/9210
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Ivan Ponomarev
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>>
> >>>
> >>
> >>
> 

Reply via email to