Hi Ivan,

Sorry for the silence!

I have just re-read the proposal.

To summarize, you are now only proposing the zero-arg distict() method to be 
added to TimeWindowedKStream and SessionWindowedKStream, right?

I’m in favor of this proposal. 

Thanks,
John

On Sat, Jul 10, 2021, at 10:18, Ivan Ponomarev wrote:
> Hello everyone,
> 
> I would like to remind you about KIP-655 and KIP-759 just in case they 
> got lost in your inbox.
> 
> Now the initial proposal is split into two independent and smaller ones, 
> so it must be easier to review them. Of course, if you have time.
> 
> Regards,
> 
> Ivan
> 
> 
> 24.06.2021 18:11, Ivan Ponomarev пишет:
> > Hello all,
> > 
> > I have rewritten the KIP-655 summarizing what was agreed upon during 
> > this discussion (now the proposal is much simpler and less invasive).
> > 
> > I have also created KIP-759 (cancelRepartition operation) and started a 
> > discussion for it.
> > 
> > Regards,
> > 
> > Ivan.
> > 
> > 
> > 
> > 04.06.2021 8:15, Matthias J. Sax пишет:
> >> Just skimmed over the thread -- first of all, I am glad that we could
> >> merge KIP-418 and ship it :)
> >>
> >> About the re-partitioning concerns, there are already two tickets for it:
> >>
> >>   - https://issues.apache.org/jira/browse/KAFKA-4835
> >>   - https://issues.apache.org/jira/browse/KAFKA-10844
> >>
> >> Thus, it seems best to exclude this topic from this KIP, and do a
> >> separate KIP for it (if necessary, we can "pause" this KIP until the
> >> repartition KIP is done). It's a long standing "issue" and we should
> >> resolve it in a general way I guess.
> >>
> >> (Did not yet ready all responses in detail yet, so keeping this comment
> >> short.)
> >>
> >>
> >> -Matthias
> >>
> >> On 6/2/21 6:35 AM, John Roesler wrote:
> >>> Thanks, Ivan!
> >>>
> >>> That sounds like a great plan to me. Two smaller KIPs are easier to 
> >>> agree on than one big one.
> >>>
> >>> I agree hopping and sliding windows will actually have a duplicating 
> >>> effect. We can avoid adding distinct() to the sliding window 
> >>> interface, but hopping windows are just a different parameterization 
> >>> of epoch-aligned windows. It seems we can’t do much about that except 
> >>> document the issue.
> >>>
> >>> Thanks,
> >>> John
> >>>
> >>> On Wed, May 26, 2021, at 10:14, Ivan Ponomarev wrote:
> >>>> Hi John!
> >>>>
> >>>> I think that your proposal is just fantastic, it simplifies things a 
> >>>> lot!
> >>>>
> >>>> I also felt uncomfortable due to the fact that the proposed 
> >>>> `distinct()`
> >>>> is not somewhere near `count()` and `reduce(..)`. But
> >>>> `selectKey(..).groupByKey().windowedBy(..).distinct()` didn't look like
> >>>> a correct option for  me because of the issue with the unneeded
> >>>> repartitioning.
> >>>>
> >>>> The bold idea that we can just CANCEL the repartitioning didn't came to
> >>>> my mind.
> >>>>
> >>>> What seemed to me a single problem is in fact two unrelated problems:
> >>>> `distinct` operation and cancelling the unneeded repartitioning.
> >>>>
> >>>>   > what if we introduce a parameter to `selectKey()` that specifies 
> >>>> that
> >>>> the caller asserts that the new key does _not_ change the data 
> >>>> partitioning?
> >>>>
> >>>> I think a more elegant solution would be not to add a new parameter to
> >>>> `selectKey` and all the other key-changing operations (`map`,
> >>>> `transform`, `flatMap`, ...), but add a new operator
> >>>> `KStream#cancelRepartitioning()` that resets `keyChangingOperation` 
> >>>> flag
> >>>> for the upstream node. Of course, "use it only if you know what you're
> >>>> doing" warning is to be added. Well, it's a topic for a separate KIP!
> >>>>
> >>>> Concerning `distinct()`. If we use `XXXWindowedKStream` facilities, 
> >>>> then
> >>>> changes to the API are minimally invasive: we're just adding
> >>>> `distinct()` to TimeWindowedKStream and SessionWindowedKStream, and
> >>>> that's all.
> >>>>
> >>>> We can now define `distinct` as an operation that returns only a first
> >>>> record that falls into a new window, and filters out all the other
> >>>> records that fall into an already existing window. BTW, we can mock the
> >>>> behaviour of such an operation with `TopologyTestDriver` using
> >>>> `reduce((l, r) -> STOP)`.filterNot((k, v)->STOP.equals(v)).  ;-)
> >>>>
> >>>> Consider the following example (record times are in seconds):
> >>>>
> >>>> //three bursts of variously ordered records
> >>>> 4, 5, 6
> >>>> 23, 22, 24
> >>>> 34, 33, 32
> >>>> //'late arrivals'
> >>>> 7, 22, 35
> >>>>
> >>>>
> >>>> 1. 'Epoch-aligned deduplication' using tumbling windows:
> >>>>
> >>>> .groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(10))).distinct()
> >>>>  
> >>>>
> >>>>
> >>>> produces
> >>>>
> >>>> (key@[00000/10000], 4)
> >>>> (key@[20000/30000], 23)
> >>>> (key@[30000/40000], 34)
> >>>>
> >>>> -- that is, one record per epoch-aligned window.
> >>>>
> >>>> 2. Hopping and sliding windows do not make much sense here, because 
> >>>> they
> >>>> produce multiple intersected windows, so that one record can be
> >>>> multiplied, but we want deduplication.
> >>>>
> >>>> 3. SessionWindows work for 'data-aligned deduplication'.
> >>>>
> >>>> .groupByKey().windowedBy(SessionWindows.with(Duration.ofSeconds(10))).distinct()
> >>>>  
> >>>>
> >>>>
> >>>>
> >>>> produces only
> >>>>
> >>>> ([key@4000/4000], 4)
> >>>> ([key@23000/23000], 23)
> >>>>
> >>>> because all the records bigger than 7 are stuck together in one 
> >>>> session.
> >>>> Setting inactivity gap to 9 seconds will return three records:
> >>>>
> >>>> ([key@4000/4000], 4)
> >>>> ([key@23000/23000], 23)
> >>>> ([key@34000/34000], 34)
> >>>>
> >>>> WDYT? If you like this variant, I will re-write KIP-655 and propose a
> >>>> separate KIP for `cancelRepartitioning` (or whatever name we will 
> >>>> choose
> >>>> for it).
> >>>>
> >>>> Regards,
> >>>>
> >>>> Ivan
> >>>>
> >>>>
> >>>> 24.05.2021 22:32, John Roesler пишет:
> >>>>> Hey there, Ivan!
> >>>>>
> >>>>> In typical fashion, I'm going to make a somewhat outlandish
> >>>>> proposal. I'm hoping that we can side-step some of the
> >>>>> complications that have arisen. Please bear with me.
> >>>>>
> >>>>> It seems like `distinct()` is not fundamentally unlike other windowed
> >>>>> "aggregation" operations. Your concern about unnecessary
> >>>>> repartitioning seems to apply just as well to `count()` as to 
> >>>>> `distinct()`.
> >>>>> This has come up before, but I don't remember when: what if we
> >>>>> introduce a parameter to `selectKey()` that specifies that the caller
> >>>>> asserts that the new key does _not_ change the data partitioning?
> >>>>> The docs on that parameter would of course spell out all the "rights
> >>>>> and responsibilities" of setting it.
> >>>>>
> >>>>> In that case, we could indeed get back to
> >>>>> `selectKey(A).windowBy(B).distinct(...)`, where we get to compose the
> >>>>> key mapper and the windowing function without having to carve out
> >>>>> a separate domain just for `distinct()`. All the rest of the KStream
> >>>>> operations would also benefit.
> >>>>>
> >>>>> What do you think?
> >>>>>
> >>>>> Thanks,
> >>>>> John
> >>>>>
> >>>>> On Sun, May 23, 2021, at 08:09, Ivan Ponomarev wrote:
> >>>>>> Hello everyone,
> >>>>>>
> >>>>>> let me revive the discussion for KIP-655. Now I have some time 
> >>>>>> again and
> >>>>>> I'm eager to finalize this.
> >>>>>>
> >>>>>> Based on what was already discussed, I think that we can split the
> >>>>>> discussion into three topics for our convenience.
> >>>>>>
> >>>>>> The three topics are:
> >>>>>>
> >>>>>> - idExtractor  (how should we extract the deduplication key for 
> >>>>>> the record)
> >>>>>>
> >>>>>> - timeWindows (what time windows should we use)
> >>>>>>
> >>>>>> - miscellaneous (naming etc.)
> >>>>>>
> >>>>>> ---- idExtractor ----
> >>>>>>
> >>>>>> Original proposal: use (k, v) -> f(k, v) mapper, defaulting to (k, 
> >>>>>> v) ->
> >>>>>> k.  The drawback here is that we must warn the user to choose such a
> >>>>>> function that sets different IDs for records from different 
> >>>>>> partitions,
> >>>>>> otherwise same IDs might be not co-partitioned (and not 
> >>>>>> deduplicated as
> >>>>>> a result). Additional concern: what should we do when this function
> >>>>>> returns null?
> >>>>>>
> >>>>>> Matthias proposed key-only deduplication: that is, no idExtractor at
> >>>>>> all, and if we want to use `distinct` for a particular identifier, we
> >>>>>> must `selectKey()` before. The drawback of this approach is that 
> >>>>>> we will
> >>>>>> always have repartitioning after the key selection, while in practice
> >>>>>> repartitioning will not always be necessary (for example, when the 
> >>>>>> data
> >>>>>> stream is such that different values infer different keys).
> >>>>>>
> >>>>>> So here we have a 'safety vs. performance' trade-off. But 'safe' 
> >>>>>> variant
> >>>>>> is also not very convenient for developers, since we're forcing 
> >>>>>> them to
> >>>>>> change the structure of their records.
> >>>>>>
> >>>>>> A 'golden mean' here might be using composite ID with its first
> >>>>>> component equals to k and its second component equals to some f(v) (f
> >>>>>> defaults to v -> null, and null value returned by f(v) means
> >>>>>> 'deduplicate by the key only'). The nuance here is that we will have
> >>>>>> serializers only for types of k and f(v), and we must correctly
> >>>>>> serialize a tuple (k, f(v)), but of course this is doable.
> >>>>>>
> >>>>>> What do you think?
> >>>>>>
> >>>>>> ---- timeWindows ----
> >>>>>>
> >>>>>> Originally I proposed TimeWindows only just because they solved my
> >>>>>> particular case :-) but agree with Matthias' and Sophie's objections.
> >>>>>>
> >>>>>> I like the Sophie's point: we need both epoch-aligned and 
> >>>>>> data-aligned
> >>>>>> windows. IMO this is absolutely correct: "data-aligned is useful for
> >>>>>> example when you know that a large number of updates to a single key
> >>>>>> will occur in short bursts, and epoch-aligned when you 
> >>>>>> specifically want
> >>>>>> to get just a single update per discrete time interval."
> >>>>>>
> >>>>>> I just cannot agree right away with Sophie's
> >>>>>> .groupByKey().windowedBy(...).distinct() proposal, as it implies  the
> >>>>>> key-only deduplication -- see the previous topic.
> >>>>>>
> >>>>>> Epoch-aligned windows are very simple: they should forward only one
> >>>>>> record per enumerated time window. TimeWindows are exactly what we 
> >>>>>> want
> >>>>>> here. I mentioned in the KIP both tumbling and hopping windows just
> >>>>>> because both are possible for TimeWindows, but indeed I don't see any
> >>>>>> real use case for hopping windows, only tumbling windows make 
> >>>>>> sence IMO.
> >>>>>>
> >>>>>> For data-aligned windows SlidingWindow interface seems to be a nearly
> >>>>>> valid choice. Nearly. It should forward a record once when it's first
> >>>>>> seen, and then not again for any identical records that fall into the
> >>>>>> next N timeUnits.  However, we cannot reuse SlidingWindow as is, 
> >>>>>> because
> >>>>>> just as Matthias noted, SlidingWindows go backward in time, while we
> >>>>>> need a windows that go forward in time, and are not opened while 
> >>>>>> records
> >>>>>> fall into an already existing window. We definitely should make 
> >>>>>> our own
> >>>>>> implementation, maybe we should call it ExpirationWindow? WDYT?
> >>>>>>
> >>>>>>
> >>>>>> ---- miscellaneous ----
> >>>>>>
> >>>>>> Persistent/in-memory stores. Matthias proposed to pass Materialized
> >>>>>> parameter next to DistinctParameters (and this is necessary, 
> >>>>>> because we
> >>>>>> will need to provide a serializer for extracted id). This is 
> >>>>>> absolutely
> >>>>>> valid point, I agree and I will fix it in the KIP.
> >>>>>>
> >>>>>> Naming. Sophie noted that the Streams DSL operators are typically 
> >>>>>> named
> >>>>>> as verbs, so she proposes `deduplicate` in favour of `distinct`. I 
> >>>>>> think
> >>>>>> that while it's important to stick to the naming conventions, it 
> >>>>>> is also
> >>>>>> important to think of the experience of those who come from different
> >>>>>> stacks/technologies. People who are familiar with SQL and Java 
> >>>>>> Streams
> >>>>>> API must know for sure what does 'distinct' mean, while data
> >>>>>> deduplication in general is a more complex task and thus 
> >>>>>> `deduplicate`
> >>>>>> might be misleading. But I'm ready to be convinced if the majority
> >>>>>> thinks otherwise.
> >>>>>>
> >>>>>>
> >>>>>> Regards,
> >>>>>>
> >>>>>> Ivan
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> 14.09.2020 21:31, Sophie Blee-Goldman пишет:
> >>>>>>> Hey all,
> >>>>>>>
> >>>>>>> I'm not convinced either epoch-aligned or data-aligned will fit all
> >>>>>>> possible use cases.
> >>>>>>> Both seem totally reasonable to me: data-aligned is useful for 
> >>>>>>> example when
> >>>>>>> you know
> >>>>>>> that a large number of updates to a single key will occur in 
> >>>>>>> short bursts,
> >>>>>>> and epoch-
> >>>>>>> aligned when you specifically want to get just a single update 
> >>>>>>> per discrete
> >>>>>>> time
> >>>>>>> interval.
> >>>>>>>
> >>>>>>> Going a step further, though, what if you want just a single 
> >>>>>>> update per
> >>>>>>> calendar
> >>>>>>> month, or per year with accounting for leap years? Neither of 
> >>>>>>> those are
> >>>>>>> serviced that
> >>>>>>> well by the existing Windows specification to windowed 
> >>>>>>> aggregations, a
> >>>>>>> well-known
> >>>>>>> limitation of the current API. There is actually a KIP
> >>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-645%3A+Replace+Windows+with+a+proper+interface>
> >>>>>>>  
> >>>>>>>
> >>>>>>> going
> >>>>>>> on in parallel to fix this
> >>>>>>> exact issue and make the windowing interface much more flexible. 
> >>>>>>> Maybe
> >>>>>>> instead
> >>>>>>> of re-implementing this windowing interface in a similarly 
> >>>>>>> limited fashion
> >>>>>>> for the
> >>>>>>> Distinct operator, we could leverage it here and get all the 
> >>>>>>> benefits
> >>>>>>> coming with
> >>>>>>> KIP-645.
> >>>>>>>
> >>>>>>> Specifically, I'm proposing to remove the TimeWindows/etc config 
> >>>>>>> from the
> >>>>>>> DistinctParameters class, and move the distinct() method from the 
> >>>>>>> KStream
> >>>>>>> interface
> >>>>>>> to the TimeWindowedKStream interface. Since it's semantically 
> >>>>>>> similar to a
> >>>>>>> kind of
> >>>>>>> windowed aggregation, it makes sense to align it with the 
> >>>>>>> existing windowing
> >>>>>>> framework, ie:
> >>>>>>>
> >>>>>>> inputStream
> >>>>>>>        .groupKyKey()
> >>>>>>>        .windowedBy()
> >>>>>>>        .distinct()
> >>>>>>>
> >>>>>>> Then we could use data-aligned windows if SlidingWindows is 
> >>>>>>> specified in
> >>>>>>> the
> >>>>>>> windowedBy(), and epoch-aligned (or some other kind of enumerable 
> >>>>>>> window)
> >>>>>>> if a Windows is specified in windowedBy() (or an 
> >>>>>>> EnumerableWindowDefinition
> >>>>>>> once KIP-645 is implemented to replace Windows).
> >>>>>>>
> >>>>>>> *SlidingWindows*: should forward a record once when it's first 
> >>>>>>> seen, and
> >>>>>>> then not again
> >>>>>>> for any identical records that fall into the next N timeUnits. This
> >>>>>>> includes out-of-order
> >>>>>>> records, ie if you have a SlidingWindows of size 10s and process 
> >>>>>>> records at
> >>>>>>> time
> >>>>>>> 15s, 20s, 14s then you would just forward the one at 15s. 
> >>>>>>> Presumably, if
> >>>>>>> you're
> >>>>>>> using SlidingWindows, you don't care about what falls into exact 
> >>>>>>> time
> >>>>>>> boxes, you just
> >>>>>>> want to deduplicate. If you do care about exact time boxing then 
> >>>>>>> you should
> >>>>>>> use...
> >>>>>>>
> >>>>>>> *EnumerableWindowDefinition* (eg *TimeWindows*): should forward 
> >>>>>>> only one
> >>>>>>> record
> >>>>>>> per enumerated time window. If you get a records at 15s, 20s,14s 
> >>>>>>> where the
> >>>>>>> windows
> >>>>>>> are enumerated at [5,14], [15, 24], etc then you forward the 
> >>>>>>> record at 15s
> >>>>>>> and also
> >>>>>>> the record at 14s
> >>>>>>>
> >>>>>>> Just an idea: not sure if the impedance mismatch would throw 
> >>>>>>> users off
> >>>>>>> since the
> >>>>>>> semantics of the distinct windows are slightly different than in the
> >>>>>>> aggregations.
> >>>>>>> But if we don't fit this into the existing windowed framework, 
> >>>>>>> then we
> >>>>>>> shouldn't use
> >>>>>>> any existing Windows-type classes at all, imo. ie we should 
> >>>>>>> create a new
> >>>>>>> DistinctWindows config class, similar to how stream-stream joins 
> >>>>>>> get their
> >>>>>>> own
> >>>>>>> JoinWindows class
> >>>>>>>
> >>>>>>> I also think that non-windowed deduplication could be useful, in 
> >>>>>>> which case
> >>>>>>> we
> >>>>>>> would want to also have the distinct() operator on the KStream 
> >>>>>>> interface.
> >>>>>>>
> >>>>>>>
> >>>>>>> One quick note regarding the naming: it seems like the Streams 
> >>>>>>> DSL operators
> >>>>>>> are typically named as verbs rather than adjectives, for example. 
> >>>>>>> #suppress
> >>>>>>> or
> >>>>>>> #aggregate. I get that there's some precedent for  'distinct' 
> >>>>>>> specifically,
> >>>>>>> but
> >>>>>>> maybe something like 'deduplicate' would be more appropriate for 
> >>>>>>> the Streams
> >>>>>>> API.
> >>>>>>>
> >>>>>>> WDYT?
> >>>>>>>
> >>>>>>>
> >>>>>>> On Mon, Sep 14, 2020 at 10:04 AM Ivan Ponomarev 
> >>>>>>> <iponoma...@mail.ru.invalid>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi Matthias,
> >>>>>>>>
> >>>>>>>> Thanks for your review! It made me think deeper, and indeed I 
> >>>>>>>> understood
> >>>>>>>> that I was missing some important details.
> >>>>>>>>
> >>>>>>>> To simplify, let me explain my particular use case first so I 
> >>>>>>>> can refer
> >>>>>>>> to it later.
> >>>>>>>>
> >>>>>>>> We have a system that collects information about ongoing live 
> >>>>>>>> sporting
> >>>>>>>> events from different sources. The information sources have 
> >>>>>>>> their IDs
> >>>>>>>> and these IDs are keys of the stream. Each source emits messages
> >>>>>>>> concerning sporting events, and we can have many messages about 
> >>>>>>>> each
> >>>>>>>> sporing event from each source. Event ID is extracted from the 
> >>>>>>>> message.
> >>>>>>>>
> >>>>>>>> We need a database of event IDs that were reported at least once 
> >>>>>>>> by each
> >>>>>>>> source (important: events from different sources are considered 
> >>>>>>>> to be
> >>>>>>>> different entities). The requirements are:
> >>>>>>>>
> >>>>>>>> 1) each new event ID should be written to the database as soon 
> >>>>>>>> as possible
> >>>>>>>>
> >>>>>>>> 2) although it's ok and sometimes even desired to repeat the
> >>>>>>>> notification about already known event ID, but we wouldn’t like our
> >>>>>>>> database to be bothered by the same event ID more often than 
> >>>>>>>> once in a
> >>>>>>>> given period of time (say, 15 minutes).
> >>>>>>>>
> >>>>>>>> With this example in mind let me answer your questions
> >>>>>>>>
> >>>>>>>>     > (1) Using the `idExtractor` has the issue that data might 
> >>>>>>>> not be
> >>>>>>>>     > co-partitioned as you mentioned in the KIP. Thus, I am 
> >>>>>>>> wondering if it
> >>>>>>>>     > might be better to do deduplication only on the key? If 
> >>>>>>>> one sets a new
> >>>>>>>>     > key upstream (ie, extracts the deduplication id into the 
> >>>>>>>> key), the
> >>>>>>>>     > `distinct` operator could automatically repartition the 
> >>>>>>>> data and thus we
> >>>>>>>>     > would avoid user errors.
> >>>>>>>>
> >>>>>>>> Of course with 'key-only' deduplication + autorepartitioning we 
> >>>>>>>> will
> >>>>>>>> never cause problems with co-partitioning. But in practice, we 
> >>>>>>>> often
> >>>>>>>> don't need repartitioning even if 'dedup ID' is different from 
> >>>>>>>> the key,
> >>>>>>>> like in my example above. So here we have a sort of 'performance vs
> >>>>>>>> security' tradeoff.
> >>>>>>>>
> >>>>>>>> The 'golden middle way' here can be the following: we can form a
> >>>>>>>> deduplication ID as KEY + separator + idExtractor(VALUE). In case
> >>>>>>>> idExtractor is not provided, we deduplicate by key only (as in 
> >>>>>>>> original
> >>>>>>>> proposal). Then idExtractor transforms only the value (and not 
> >>>>>>>> the key)
> >>>>>>>> and its result is appended to the key. Records from different 
> >>>>>>>> partitions
> >>>>>>>> will inherently have different deduplication IDs and all the 
> >>>>>>>> data will
> >>>>>>>> be co-partitioned. As with any stateful operation, we will 
> >>>>>>>> repartition
> >>>>>>>> the topic in case the key was changed upstream, but only in this 
> >>>>>>>> case,
> >>>>>>>> thus avoiding unnecessary repartitioning. My example above fits 
> >>>>>>>> this
> >>>>>>>> perfectly.
> >>>>>>>>
> >>>>>>>>     > (2) What is the motivation for allowing the `idExtractor` 
> >>>>>>>> to return
> >>>>>>>>     > `null`? Might be good to have some use-case examples for 
> >>>>>>>> this feature.
> >>>>>>>>
> >>>>>>>> Can't think of any use-cases. As it often happens, it's just 
> >>>>>>>> came with a
> >>>>>>>> copy-paste from StackOverflow -- see Michael Noll's answer here:
> >>>>>>>>
> >>>>>>>> https://stackoverflow.com/questions/55803210/how-to-handle-duplicate-messages-using-kafka-streaming-dsl-functions
> >>>>>>>>  
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> But, jokes aside, we'll have to decide what to do with nulls. If we
> >>>>>>>> accept the above proposal of having deduplication ID as KEY + 
> >>>>>>>> postfix,
> >>>>>>>> then null can be treated as no postfix at all. If we don't 
> >>>>>>>> accept this
> >>>>>>>> approach, then treating nulls as 'no-deduplication' seems to be a
> >>>>>>>> reasonable assumption (we can't get or put null as a key to a KV 
> >>>>>>>> store,
> >>>>>>>> so a record with null ID is always going to look 'new' for us).
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>     > (2) Is using a `TimeWindow` really what we want? I was 
> >>>>>>>> wondering if a
> >>>>>>>>     > `SlidingWindow` might be better? Or maybe we need a new 
> >>>>>>>> type of window?
> >>>>>>>>
> >>>>>>>> Agree. It's probably not what we want. Once I thought that reusing
> >>>>>>>> TimeWindow is a clever idea, now I don't.
> >>>>>>>>
> >>>>>>>> Do we need epoch alignment in our use case? No, we don't, and I 
> >>>>>>>> don't
> >>>>>>>> know if anyone going to need this. Epoch alignment is good for
> >>>>>>>> aggregation, but deduplication is a different story.
> >>>>>>>>
> >>>>>>>> Let me describe the semantic the way I see it now and tell me 
> >>>>>>>> what you
> >>>>>>>> think:
> >>>>>>>>
> >>>>>>>> - the only parameter that defines the deduplication logic is 
> >>>>>>>> 'expiration
> >>>>>>>> period'
> >>>>>>>>
> >>>>>>>> - when a deduplication ID arrives and we cannot find it in the 
> >>>>>>>> store, we
> >>>>>>>> forward the message downstream and store the ID + its timestamp.
> >>>>>>>>
> >>>>>>>> - when an out-of-order ID arrives with an older timestamp and we 
> >>>>>>>> find a
> >>>>>>>> 'fresher' record, we do nothing and don't forward the message 
> >>>>>>>> (??? OR
> >>>>>>>> NOT? In what case would we want to forward an out-of-order 
> >>>>>>>> message?)
> >>>>>>>>
> >>>>>>>> - when an ID with fresher timestamp arrives we check if it falls 
> >>>>>>>> into
> >>>>>>>> the expiration period and either forward it or not, but in both 
> >>>>>>>> cases we
> >>>>>>>> update the timestamp of the message in the store
> >>>>>>>>
> >>>>>>>> - the WindowStore retention mechanism should clean up very old 
> >>>>>>>> records
> >>>>>>>> in order not to run out of space.
> >>>>>>>>
> >>>>>>>>     > (3) `isPersistent` -- instead of using this flag, it seems 
> >>>>>>>> better to
> >>>>>>>>     > allow users to pass in a `Materialized` parameter next to
> >>>>>>>>     > `DistinctParameters` to configure the state store?
> >>>>>>>>
> >>>>>>>> Fully agree! Users might also want to change the retention time.
> >>>>>>>>
> >>>>>>>>     > (4) I am wondering if we should really have 4 overloads for
> >>>>>>>>     > `DistinctParameters.with()`? It might be better to have 
> >>>>>>>> one overload
> >>>>>>>>     > with all require parameters, and add optional parameters 
> >>>>>>>> using the
> >>>>>>>>     > builder pattern? This seems to follow the DSL Grammer 
> >>>>>>>> proposal.
> >>>>>>>>
> >>>>>>>> Oh, I can explain. We can't fully rely on the builder pattern 
> >>>>>>>> because of
> >>>>>>>> Java type inference limitations. We have to provide type 
> >>>>>>>> parameters to
> >>>>>>>> the builder methods or the code won't compile: see e. g. this
> >>>>>>>> https://twitter.com/inponomarev/status/1265053286933159938 and 
> >>>>>>>> following
> >>>>>>>> discussion with Tagir Valeev.
> >>>>>>>>
> >>>>>>>> When we came across the similar difficulties in KIP-418, we finally
> >>>>>>>> decided to add all the necessary overloads to parameter class. 
> >>>>>>>> So I just
> >>>>>>>> reproduced that approach here.
> >>>>>>>>
> >>>>>>>>     > (5) Even if it might be an implementation detail (and 
> >>>>>>>> maybe the KIP
> >>>>>>>>     > itself does not need to mention it), can you give a high 
> >>>>>>>> level overview
> >>>>>>>>     > how you intent to implement it (that would be easier to 
> >>>>>>>> grog, compared
> >>>>>>>>     > to reading the PR).
> >>>>>>>>
> >>>>>>>> Well as with any operation on KStreamImpl level I'm building a 
> >>>>>>>> store and
> >>>>>>>> a processor node.
> >>>>>>>>
> >>>>>>>> KStreamDistinct class is going to be the ProcessorSupplier, with 
> >>>>>>>> the
> >>>>>>>> logic regarding the forwarding/muting of the records located in
> >>>>>>>> KStreamDistinct.KStreamDistinctProcessor#process
> >>>>>>>>
> >>>>>>>> ----
> >>>>>>>>
> >>>>>>>> Matthias, if you are still reading this :-) a gentle reminder: 
> >>>>>>>> my PR for
> >>>>>>>> already accepted KIP-418 is still waiting for your review. I 
> >>>>>>>> think it's
> >>>>>>>> better for me to finalize at least one  KIP before proceeding to 
> >>>>>>>> a new
> >>>>>>>> one :-)
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>>
> >>>>>>>> Ivan
> >>>>>>>>
> >>>>>>>> 03.09.2020 4:20, Matthias J. Sax пишет:
> >>>>>>>>> Thanks for the KIP Ivan. Having a built-in deduplication 
> >>>>>>>>> operator is for
> >>>>>>>>> sure a good addition.
> >>>>>>>>>
> >>>>>>>>> Couple of questions:
> >>>>>>>>>
> >>>>>>>>> (1) Using the `idExtractor` has the issue that data might not be
> >>>>>>>>> co-partitioned as you mentioned in the KIP. Thus, I am 
> >>>>>>>>> wondering if it
> >>>>>>>>> might be better to do deduplication only on the key? If one 
> >>>>>>>>> sets a new
> >>>>>>>>> key upstream (ie, extracts the deduplication id into the key), the
> >>>>>>>>> `distinct` operator could automatically repartition the data 
> >>>>>>>>> and thus we
> >>>>>>>>> would avoid user errors.
> >>>>>>>>>
> >>>>>>>>> (2) What is the motivation for allowing the `idExtractor` to 
> >>>>>>>>> return
> >>>>>>>>> `null`? Might be good to have some use-case examples for this 
> >>>>>>>>> feature.
> >>>>>>>>>
> >>>>>>>>> (2) Is using a `TimeWindow` really what we want? I was 
> >>>>>>>>> wondering if a
> >>>>>>>>> `SlidingWindow` might be better? Or maybe we need a new type of 
> >>>>>>>>> window?
> >>>>>>>>>
> >>>>>>>>> It would be helpful if you could describe potential use cases 
> >>>>>>>>> in more
> >>>>>>>>> detail. -- I am mainly wondering about hopping window? Each 
> >>>>>>>>> record would
> >>>>>>>>> always falls into multiple window and thus would be emitted 
> >>>>>>>>> multiple
> >>>>>>>>> times, ie, each time the window closes. Is this really a valid 
> >>>>>>>>> use case?
> >>>>>>>>>
> >>>>>>>>> It seems that for de-duplication, one wants to have some 
> >>>>>>>>> "expiration
> >>>>>>>>> time", ie, for each ID, deduplicate all consecutive records 
> >>>>>>>>> with the
> >>>>>>>>> same ID and emit the first record after the "expiration time" 
> >>>>>>>>> passed. In
> >>>>>>>>> terms of a window, this would mean that the window starts at 
> >>>>>>>>> `r.ts` and
> >>>>>>>>> ends at `r.ts + windowSize`, ie, the window is aligned to the 
> >>>>>>>>> data.
> >>>>>>>>> TimeWindows are aligned to the epoch though. While 
> >>>>>>>>> `SlidingWindows` also
> >>>>>>>>> align to the data, for the aggregation use-case they go 
> >>>>>>>>> backward in
> >>>>>>>>> time, while we need a window that goes forward in time. It's an 
> >>>>>>>>> open
> >>>>>>>>> question if we can re-purpose `SlidingWindows` -- it might be 
> >>>>>>>>> ok the
> >>>>>>>>> make the alignment (into the past vs into the future) an operator
> >>>>>>>>> dependent behavior?
> >>>>>>>>>
> >>>>>>>>> (3) `isPersistent` -- instead of using this flag, it seems 
> >>>>>>>>> better to
> >>>>>>>>> allow users to pass in a `Materialized` parameter next to
> >>>>>>>>> `DistinctParameters` to configure the state store?
> >>>>>>>>>
> >>>>>>>>> (4) I am wondering if we should really have 4 overloads for
> >>>>>>>>> `DistinctParameters.with()`? It might be better to have one 
> >>>>>>>>> overload
> >>>>>>>>> with all require parameters, and add optional parameters using the
> >>>>>>>>> builder pattern? This seems to follow the DSL Grammer proposal.
> >>>>>>>>>
> >>>>>>>>> (5) Even if it might be an implementation detail (and maybe the 
> >>>>>>>>> KIP
> >>>>>>>>> itself does not need to mention it), can you give a high level 
> >>>>>>>>> overview
> >>>>>>>>> how you intent to implement it (that would be easier to grog, 
> >>>>>>>>> compared
> >>>>>>>>> to reading the PR).
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> -Matthias
> >>>>>>>>>
> >>>>>>>>> On 8/23/20 4:29 PM, Ivan Ponomarev wrote:
> >>>>>>>>>> Sorry, I forgot to add [DISCUSS] tag to the topic
> >>>>>>>>>>
> >>>>>>>>>> 24.08.2020 2:27, Ivan Ponomarev пишет:
> >>>>>>>>>>> Hello,
> >>>>>>>>>>>
> >>>>>>>>>>> I'd like to start a discussion for KIP-655.
> >>>>>>>>>>>
> >>>>>>>>>>> KIP-655:
> >>>>>>>>>>>
> >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API
> >>>>>>>>  
> >>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> I also opened a proof-of-concept PR for you to experiment 
> >>>>>>>>>>> with the API:
> >>>>>>>>>>>
> >>>>>>>>>>> PR#9210: https://github.com/apache/kafka/pull/9210
> >>>>>>>>>>>
> >>>>>>>>>>> Regards,
> >>>>>>>>>>>
> >>>>>>>>>>> Ivan Ponomarev
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>
> >>>>
> > 
> 
> 

Reply via email to