Ivan,

I read through the discussion and your new proposal. I have a couple of
questions.

1. As we have cancelRepartition, wouldn't selectKey be sufficient. You
still have idExtractor. Maybe I misunderstood the discussion.
2. isPersistent should be replaced by Materialized. It looked like you
agreed to it as it might be useful to specify the retention also.

Thanks
Mohan


On Thu, Jun 24, 2021 at 8:12 AM Ivan Ponomarev <iponoma...@mail.ru.invalid>
wrote:

> Hello all,
>
> I have rewritten the KIP-655 summarizing what was agreed upon during
> this discussion (now the proposal is much simpler and less invasive).
>
> I have also created KIP-759 (cancelRepartition operation) and started a
> discussion for it.
>
> Regards,
>
> Ivan.
>
>
>
> 04.06.2021 8:15, Matthias J. Sax пишет:
> > Just skimmed over the thread -- first of all, I am glad that we could
> > merge KIP-418 and ship it :)
> >
> > About the re-partitioning concerns, there are already two tickets for it:
> >
> >   - https://issues.apache.org/jira/browse/KAFKA-4835
> >   - https://issues.apache.org/jira/browse/KAFKA-10844
> >
> > Thus, it seems best to exclude this topic from this KIP, and do a
> > separate KIP for it (if necessary, we can "pause" this KIP until the
> > repartition KIP is done). It's a long standing "issue" and we should
> > resolve it in a general way I guess.
> >
> > (Did not yet ready all responses in detail yet, so keeping this comment
> > short.)
> >
> >
> > -Matthias
> >
> > On 6/2/21 6:35 AM, John Roesler wrote:
> >> Thanks, Ivan!
> >>
> >> That sounds like a great plan to me. Two smaller KIPs are easier to
> agree on than one big one.
> >>
> >> I agree hopping and sliding windows will actually have a duplicating
> effect. We can avoid adding distinct() to the sliding window interface, but
> hopping windows are just a different parameterization of epoch-aligned
> windows. It seems we can’t do much about that except document the issue.
> >>
> >> Thanks,
> >> John
> >>
> >> On Wed, May 26, 2021, at 10:14, Ivan Ponomarev wrote:
> >>> Hi John!
> >>>
> >>> I think that your proposal is just fantastic, it simplifies things a
> lot!
> >>>
> >>> I also felt uncomfortable due to the fact that the proposed
> `distinct()`
> >>> is not somewhere near `count()` and `reduce(..)`. But
> >>> `selectKey(..).groupByKey().windowedBy(..).distinct()` didn't look like
> >>> a correct option for  me because of the issue with the unneeded
> >>> repartitioning.
> >>>
> >>> The bold idea that we can just CANCEL the repartitioning didn't came to
> >>> my mind.
> >>>
> >>> What seemed to me a single problem is in fact two unrelated problems:
> >>> `distinct` operation and cancelling the unneeded repartitioning.
> >>>
> >>>   > what if we introduce a parameter to `selectKey()` that specifies
> that
> >>> the caller asserts that the new key does _not_ change the data
> partitioning?
> >>>
> >>> I think a more elegant solution would be not to add a new parameter to
> >>> `selectKey` and all the other key-changing operations (`map`,
> >>> `transform`, `flatMap`, ...), but add a new operator
> >>> `KStream#cancelRepartitioning()` that resets `keyChangingOperation`
> flag
> >>> for the upstream node. Of course, "use it only if you know what you're
> >>> doing" warning is to be added. Well, it's a topic for a separate KIP!
> >>>
> >>> Concerning `distinct()`. If we use `XXXWindowedKStream` facilities,
> then
> >>> changes to the API are minimally invasive: we're just adding
> >>> `distinct()` to TimeWindowedKStream and SessionWindowedKStream, and
> >>> that's all.
> >>>
> >>> We can now define `distinct` as an operation that returns only a first
> >>> record that falls into a new window, and filters out all the other
> >>> records that fall into an already existing window. BTW, we can mock the
> >>> behaviour of such an operation with `TopologyTestDriver` using
> >>> `reduce((l, r) -> STOP)`.filterNot((k, v)->STOP.equals(v)).  ;-)
> >>>
> >>> Consider the following example (record times are in seconds):
> >>>
> >>> //three bursts of variously ordered records
> >>> 4, 5, 6
> >>> 23, 22, 24
> >>> 34, 33, 32
> >>> //'late arrivals'
> >>> 7, 22, 35
> >>>
> >>>
> >>> 1. 'Epoch-aligned deduplication' using tumbling windows:
> >>>
> >>>
> .groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(10))).distinct()
> >>>
> >>> produces
> >>>
> >>> (key@[00000/10000], 4)
> >>> (key@[20000/30000], 23)
> >>> (key@[30000/40000], 34)
> >>>
> >>> -- that is, one record per epoch-aligned window.
> >>>
> >>> 2. Hopping and sliding windows do not make much sense here, because
> they
> >>> produce multiple intersected windows, so that one record can be
> >>> multiplied, but we want deduplication.
> >>>
> >>> 3. SessionWindows work for 'data-aligned deduplication'.
> >>>
> >>>
> .groupByKey().windowedBy(SessionWindows.with(Duration.ofSeconds(10))).distinct()
> >>>
> >>>
> >>> produces only
> >>>
> >>> ([key@4000/4000], 4)
> >>> ([key@23000/23000], 23)
> >>>
> >>> because all the records bigger than 7 are stuck together in one
> session.
> >>> Setting inactivity gap to 9 seconds will return three records:
> >>>
> >>> ([key@4000/4000], 4)
> >>> ([key@23000/23000], 23)
> >>> ([key@34000/34000], 34)
> >>>
> >>> WDYT? If you like this variant, I will re-write KIP-655 and propose a
> >>> separate KIP for `cancelRepartitioning` (or whatever name we will
> choose
> >>> for it).
> >>>
> >>> Regards,
> >>>
> >>> Ivan
> >>>
> >>>
> >>> 24.05.2021 22:32, John Roesler пишет:
> >>>> Hey there, Ivan!
> >>>>
> >>>> In typical fashion, I'm going to make a somewhat outlandish
> >>>> proposal. I'm hoping that we can side-step some of the
> >>>> complications that have arisen. Please bear with me.
> >>>>
> >>>> It seems like `distinct()` is not fundamentally unlike other windowed
> >>>> "aggregation" operations. Your concern about unnecessary
> >>>> repartitioning seems to apply just as well to `count()` as to
> `distinct()`.
> >>>> This has come up before, but I don't remember when: what if we
> >>>> introduce a parameter to `selectKey()` that specifies that the caller
> >>>> asserts that the new key does _not_ change the data partitioning?
> >>>> The docs on that parameter would of course spell out all the "rights
> >>>> and responsibilities" of setting it.
> >>>>
> >>>> In that case, we could indeed get back to
> >>>> `selectKey(A).windowBy(B).distinct(...)`, where we get to compose the
> >>>> key mapper and the windowing function without having to carve out
> >>>> a separate domain just for `distinct()`. All the rest of the KStream
> >>>> operations would also benefit.
> >>>>
> >>>> What do you think?
> >>>>
> >>>> Thanks,
> >>>> John
> >>>>
> >>>> On Sun, May 23, 2021, at 08:09, Ivan Ponomarev wrote:
> >>>>> Hello everyone,
> >>>>>
> >>>>> let me revive the discussion for KIP-655. Now I have some time again
> and
> >>>>> I'm eager to finalize this.
> >>>>>
> >>>>> Based on what was already discussed, I think that we can split the
> >>>>> discussion into three topics for our convenience.
> >>>>>
> >>>>> The three topics are:
> >>>>>
> >>>>> - idExtractor  (how should we extract the deduplication key for the
> record)
> >>>>>
> >>>>> - timeWindows (what time windows should we use)
> >>>>>
> >>>>> - miscellaneous (naming etc.)
> >>>>>
> >>>>> ---- idExtractor ----
> >>>>>
> >>>>> Original proposal: use (k, v) -> f(k, v) mapper, defaulting to (k,
> v) ->
> >>>>> k.  The drawback here is that we must warn the user to choose such a
> >>>>> function that sets different IDs for records from different
> partitions,
> >>>>> otherwise same IDs might be not co-partitioned (and not deduplicated
> as
> >>>>> a result). Additional concern: what should we do when this function
> >>>>> returns null?
> >>>>>
> >>>>> Matthias proposed key-only deduplication: that is, no idExtractor at
> >>>>> all, and if we want to use `distinct` for a particular identifier, we
> >>>>> must `selectKey()` before. The drawback of this approach is that we
> will
> >>>>> always have repartitioning after the key selection, while in practice
> >>>>> repartitioning will not always be necessary (for example, when the
> data
> >>>>> stream is such that different values infer different keys).
> >>>>>
> >>>>> So here we have a 'safety vs. performance' trade-off. But 'safe'
> variant
> >>>>> is also not very convenient for developers, since we're forcing them
> to
> >>>>> change the structure of their records.
> >>>>>
> >>>>> A 'golden mean' here might be using composite ID with its first
> >>>>> component equals to k and its second component equals to some f(v) (f
> >>>>> defaults to v -> null, and null value returned by f(v) means
> >>>>> 'deduplicate by the key only'). The nuance here is that we will have
> >>>>> serializers only for types of k and f(v), and we must correctly
> >>>>> serialize a tuple (k, f(v)), but of course this is doable.
> >>>>>
> >>>>> What do you think?
> >>>>>
> >>>>> ---- timeWindows ----
> >>>>>
> >>>>> Originally I proposed TimeWindows only just because they solved my
> >>>>> particular case :-) but agree with Matthias' and Sophie's objections.
> >>>>>
> >>>>> I like the Sophie's point: we need both epoch-aligned and
> data-aligned
> >>>>> windows. IMO this is absolutely correct: "data-aligned is useful for
> >>>>> example when you know that a large number of updates to a single key
> >>>>> will occur in short bursts, and epoch-aligned when you specifically
> want
> >>>>> to get just a single update per discrete time interval."
> >>>>>
> >>>>> I just cannot agree right away with Sophie's
> >>>>> .groupByKey().windowedBy(...).distinct() proposal, as it implies  the
> >>>>> key-only deduplication -- see the previous topic.
> >>>>>
> >>>>> Epoch-aligned windows are very simple: they should forward only one
> >>>>> record per enumerated time window. TimeWindows are exactly what we
> want
> >>>>> here. I mentioned in the KIP both tumbling and hopping windows just
> >>>>> because both are possible for TimeWindows, but indeed I don't see any
> >>>>> real use case for hopping windows, only tumbling windows make sence
> IMO.
> >>>>>
> >>>>> For data-aligned windows SlidingWindow interface seems to be a nearly
> >>>>> valid choice. Nearly. It should forward a record once when it's first
> >>>>> seen, and then not again for any identical records that fall into the
> >>>>> next N timeUnits.  However, we cannot reuse SlidingWindow as is,
> because
> >>>>> just as Matthias noted, SlidingWindows go backward in time, while we
> >>>>> need a windows that go forward in time, and are not opened while
> records
> >>>>> fall into an already existing window. We definitely should make our
> own
> >>>>> implementation, maybe we should call it ExpirationWindow? WDYT?
> >>>>>
> >>>>>
> >>>>> ---- miscellaneous ----
> >>>>>
> >>>>> Persistent/in-memory stores. Matthias proposed to pass Materialized
> >>>>> parameter next to DistinctParameters (and this is necessary, because
> we
> >>>>> will need to provide a serializer for extracted id). This is
> absolutely
> >>>>> valid point, I agree and I will fix it in the KIP.
> >>>>>
> >>>>> Naming. Sophie noted that the Streams DSL operators are typically
> named
> >>>>> as verbs, so she proposes `deduplicate` in favour of `distinct`. I
> think
> >>>>> that while it's important to stick to the naming conventions, it is
> also
> >>>>> important to think of the experience of those who come from different
> >>>>> stacks/technologies. People who are familiar with SQL and Java
> Streams
> >>>>> API must know for sure what does 'distinct' mean, while data
> >>>>> deduplication in general is a more complex task and thus
> `deduplicate`
> >>>>> might be misleading. But I'm ready to be convinced if the majority
> >>>>> thinks otherwise.
> >>>>>
> >>>>>
> >>>>> Regards,
> >>>>>
> >>>>> Ivan
> >>>>>
> >>>>>
> >>>>>
> >>>>> 14.09.2020 21:31, Sophie Blee-Goldman пишет:
> >>>>>> Hey all,
> >>>>>>
> >>>>>> I'm not convinced either epoch-aligned or data-aligned will fit all
> >>>>>> possible use cases.
> >>>>>> Both seem totally reasonable to me: data-aligned is useful for
> example when
> >>>>>> you know
> >>>>>> that a large number of updates to a single key will occur in short
> bursts,
> >>>>>> and epoch-
> >>>>>> aligned when you specifically want to get just a single update per
> discrete
> >>>>>> time
> >>>>>> interval.
> >>>>>>
> >>>>>> Going a step further, though, what if you want just a single update
> per
> >>>>>> calendar
> >>>>>> month, or per year with accounting for leap years? Neither of those
> are
> >>>>>> serviced that
> >>>>>> well by the existing Windows specification to windowed
> aggregations, a
> >>>>>> well-known
> >>>>>> limitation of the current API. There is actually a KIP
> >>>>>> <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-645%3A+Replace+Windows+with+a+proper+interface
> >
> >>>>>> going
> >>>>>> on in parallel to fix this
> >>>>>> exact issue and make the windowing interface much more flexible.
> Maybe
> >>>>>> instead
> >>>>>> of re-implementing this windowing interface in a similarly limited
> fashion
> >>>>>> for the
> >>>>>> Distinct operator, we could leverage it here and get all the
> benefits
> >>>>>> coming with
> >>>>>> KIP-645.
> >>>>>>
> >>>>>> Specifically, I'm proposing to remove the TimeWindows/etc config
> from the
> >>>>>> DistinctParameters class, and move the distinct() method from the
> KStream
> >>>>>> interface
> >>>>>> to the TimeWindowedKStream interface. Since it's semantically
> similar to a
> >>>>>> kind of
> >>>>>> windowed aggregation, it makes sense to align it with the existing
> windowing
> >>>>>> framework, ie:
> >>>>>>
> >>>>>> inputStream
> >>>>>>        .groupKyKey()
> >>>>>>        .windowedBy()
> >>>>>>        .distinct()
> >>>>>>
> >>>>>> Then we could use data-aligned windows if SlidingWindows is
> specified in
> >>>>>> the
> >>>>>> windowedBy(), and epoch-aligned (or some other kind of enumerable
> window)
> >>>>>> if a Windows is specified in windowedBy() (or an
> EnumerableWindowDefinition
> >>>>>> once KIP-645 is implemented to replace Windows).
> >>>>>>
> >>>>>> *SlidingWindows*: should forward a record once when it's first
> seen, and
> >>>>>> then not again
> >>>>>> for any identical records that fall into the next N timeUnits. This
> >>>>>> includes out-of-order
> >>>>>> records, ie if you have a SlidingWindows of size 10s and process
> records at
> >>>>>> time
> >>>>>> 15s, 20s, 14s then you would just forward the one at 15s.
> Presumably, if
> >>>>>> you're
> >>>>>> using SlidingWindows, you don't care about what falls into exact
> time
> >>>>>> boxes, you just
> >>>>>> want to deduplicate. If you do care about exact time boxing then
> you should
> >>>>>> use...
> >>>>>>
> >>>>>> *EnumerableWindowDefinition* (eg *TimeWindows*): should forward
> only one
> >>>>>> record
> >>>>>> per enumerated time window. If you get a records at 15s, 20s,14s
> where the
> >>>>>> windows
> >>>>>> are enumerated at [5,14], [15, 24], etc then you forward the record
> at 15s
> >>>>>> and also
> >>>>>> the record at 14s
> >>>>>>
> >>>>>> Just an idea: not sure if the impedance mismatch would throw users
> off
> >>>>>> since the
> >>>>>> semantics of the distinct windows are slightly different than in the
> >>>>>> aggregations.
> >>>>>> But if we don't fit this into the existing windowed framework, then
> we
> >>>>>> shouldn't use
> >>>>>> any existing Windows-type classes at all, imo. ie we should create
> a new
> >>>>>> DistinctWindows config class, similar to how stream-stream joins
> get their
> >>>>>> own
> >>>>>> JoinWindows class
> >>>>>>
> >>>>>> I also think that non-windowed deduplication could be useful, in
> which case
> >>>>>> we
> >>>>>> would want to also have the distinct() operator on the KStream
> interface.
> >>>>>>
> >>>>>>
> >>>>>> One quick note regarding the naming: it seems like the Streams DSL
> operators
> >>>>>> are typically named as verbs rather than adjectives, for example.
> #suppress
> >>>>>> or
> >>>>>> #aggregate. I get that there's some precedent for  'distinct'
> specifically,
> >>>>>> but
> >>>>>> maybe something like 'deduplicate' would be more appropriate for
> the Streams
> >>>>>> API.
> >>>>>>
> >>>>>> WDYT?
> >>>>>>
> >>>>>>
> >>>>>> On Mon, Sep 14, 2020 at 10:04 AM Ivan Ponomarev
> <iponoma...@mail.ru.invalid>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hi Matthias,
> >>>>>>>
> >>>>>>> Thanks for your review! It made me think deeper, and indeed I
> understood
> >>>>>>> that I was missing some important details.
> >>>>>>>
> >>>>>>> To simplify, let me explain my particular use case first so I can
> refer
> >>>>>>> to it later.
> >>>>>>>
> >>>>>>> We have a system that collects information about ongoing live
> sporting
> >>>>>>> events from different sources. The information sources have their
> IDs
> >>>>>>> and these IDs are keys of the stream. Each source emits messages
> >>>>>>> concerning sporting events, and we can have many messages about
> each
> >>>>>>> sporing event from each source. Event ID is extracted from the
> message.
> >>>>>>>
> >>>>>>> We need a database of event IDs that were reported at least once
> by each
> >>>>>>> source (important: events from different sources are considered to
> be
> >>>>>>> different entities). The requirements are:
> >>>>>>>
> >>>>>>> 1) each new event ID should be written to the database as soon as
> possible
> >>>>>>>
> >>>>>>> 2) although it's ok and sometimes even desired to repeat the
> >>>>>>> notification about already known event ID, but we wouldn’t like our
> >>>>>>> database to be bothered by the same event ID more often than once
> in a
> >>>>>>> given period of time (say, 15 minutes).
> >>>>>>>
> >>>>>>> With this example in mind let me answer your questions
> >>>>>>>
> >>>>>>>     > (1) Using the `idExtractor` has the issue that data might
> not be
> >>>>>>>     > co-partitioned as you mentioned in the KIP. Thus, I am
> wondering if it
> >>>>>>>     > might be better to do deduplication only on the key? If one
> sets a new
> >>>>>>>     > key upstream (ie, extracts the deduplication id into the
> key), the
> >>>>>>>     > `distinct` operator could automatically repartition the data
> and thus we
> >>>>>>>     > would avoid user errors.
> >>>>>>>
> >>>>>>> Of course with 'key-only' deduplication + autorepartitioning we
> will
> >>>>>>> never cause problems with co-partitioning. But in practice, we
> often
> >>>>>>> don't need repartitioning even if 'dedup ID' is different from the
> key,
> >>>>>>> like in my example above. So here we have a sort of 'performance vs
> >>>>>>> security' tradeoff.
> >>>>>>>
> >>>>>>> The 'golden middle way' here can be the following: we can form a
> >>>>>>> deduplication ID as KEY + separator + idExtractor(VALUE). In case
> >>>>>>> idExtractor is not provided, we deduplicate by key only (as in
> original
> >>>>>>> proposal). Then idExtractor transforms only the value (and not the
> key)
> >>>>>>> and its result is appended to the key. Records from different
> partitions
> >>>>>>> will inherently have different deduplication IDs and all the data
> will
> >>>>>>> be co-partitioned. As with any stateful operation, we will
> repartition
> >>>>>>> the topic in case the key was changed upstream, but only in this
> case,
> >>>>>>> thus avoiding unnecessary repartitioning. My example above fits
> this
> >>>>>>> perfectly.
> >>>>>>>
> >>>>>>>     > (2) What is the motivation for allowing the `idExtractor` to
> return
> >>>>>>>     > `null`? Might be good to have some use-case examples for
> this feature.
> >>>>>>>
> >>>>>>> Can't think of any use-cases. As it often happens, it's just came
> with a
> >>>>>>> copy-paste from StackOverflow -- see Michael Noll's answer here:
> >>>>>>>
> >>>>>>>
> https://stackoverflow.com/questions/55803210/how-to-handle-duplicate-messages-using-kafka-streaming-dsl-functions
> >>>>>>>
> >>>>>>> But, jokes aside, we'll have to decide what to do with nulls. If we
> >>>>>>> accept the above proposal of having deduplication ID as KEY +
> postfix,
> >>>>>>> then null can be treated as no postfix at all. If we don't accept
> this
> >>>>>>> approach, then treating nulls as 'no-deduplication' seems to be a
> >>>>>>> reasonable assumption (we can't get or put null as a key to a KV
> store,
> >>>>>>> so a record with null ID is always going to look 'new' for us).
> >>>>>>>
> >>>>>>>
> >>>>>>>     > (2) Is using a `TimeWindow` really what we want? I was
> wondering if a
> >>>>>>>     > `SlidingWindow` might be better? Or maybe we need a new type
> of window?
> >>>>>>>
> >>>>>>> Agree. It's probably not what we want. Once I thought that reusing
> >>>>>>> TimeWindow is a clever idea, now I don't.
> >>>>>>>
> >>>>>>> Do we need epoch alignment in our use case? No, we don't, and I
> don't
> >>>>>>> know if anyone going to need this. Epoch alignment is good for
> >>>>>>> aggregation, but deduplication is a different story.
> >>>>>>>
> >>>>>>> Let me describe the semantic the way I see it now and tell me what
> you
> >>>>>>> think:
> >>>>>>>
> >>>>>>> - the only parameter that defines the deduplication logic is
> 'expiration
> >>>>>>> period'
> >>>>>>>
> >>>>>>> - when a deduplication ID arrives and we cannot find it in the
> store, we
> >>>>>>> forward the message downstream and store the ID + its timestamp.
> >>>>>>>
> >>>>>>> - when an out-of-order ID arrives with an older timestamp and we
> find a
> >>>>>>> 'fresher' record, we do nothing and don't forward the message (???
> OR
> >>>>>>> NOT? In what case would we want to forward an out-of-order
> message?)
> >>>>>>>
> >>>>>>> - when an ID with fresher timestamp arrives we check if it falls
> into
> >>>>>>> the expiration period and either forward it or not, but in both
> cases we
> >>>>>>> update the timestamp of the message in the store
> >>>>>>>
> >>>>>>> - the WindowStore retention mechanism should clean up very old
> records
> >>>>>>> in order not to run out of space.
> >>>>>>>
> >>>>>>>     > (3) `isPersistent` -- instead of using this flag, it seems
> better to
> >>>>>>>     > allow users to pass in a `Materialized` parameter next to
> >>>>>>>     > `DistinctParameters` to configure the state store?
> >>>>>>>
> >>>>>>> Fully agree! Users might also want to change the retention time.
> >>>>>>>
> >>>>>>>     > (4) I am wondering if we should really have 4 overloads for
> >>>>>>>     > `DistinctParameters.with()`? It might be better to have one
> overload
> >>>>>>>     > with all require parameters, and add optional parameters
> using the
> >>>>>>>     > builder pattern? This seems to follow the DSL Grammer
> proposal.
> >>>>>>>
> >>>>>>> Oh, I can explain. We can't fully rely on the builder pattern
> because of
> >>>>>>> Java type inference limitations. We have to provide type
> parameters to
> >>>>>>> the builder methods or the code won't compile: see e. g. this
> >>>>>>> https://twitter.com/inponomarev/status/1265053286933159938 and
> following
> >>>>>>> discussion with Tagir Valeev.
> >>>>>>>
> >>>>>>> When we came across the similar difficulties in KIP-418, we finally
> >>>>>>> decided to add all the necessary overloads to parameter class. So
> I just
> >>>>>>> reproduced that approach here.
> >>>>>>>
> >>>>>>>     > (5) Even if it might be an implementation detail (and maybe
> the KIP
> >>>>>>>     > itself does not need to mention it), can you give a high
> level overview
> >>>>>>>     > how you intent to implement it (that would be easier to
> grog, compared
> >>>>>>>     > to reading the PR).
> >>>>>>>
> >>>>>>> Well as with any operation on KStreamImpl level I'm building a
> store and
> >>>>>>> a processor node.
> >>>>>>>
> >>>>>>> KStreamDistinct class is going to be the ProcessorSupplier, with
> the
> >>>>>>> logic regarding the forwarding/muting of the records located in
> >>>>>>> KStreamDistinct.KStreamDistinctProcessor#process
> >>>>>>>
> >>>>>>> ----
> >>>>>>>
> >>>>>>> Matthias, if you are still reading this :-) a gentle reminder: my
> PR for
> >>>>>>> already accepted KIP-418 is still waiting for your review. I think
> it's
> >>>>>>> better for me to finalize at least one  KIP before proceeding to a
> new
> >>>>>>> one :-)
> >>>>>>>
> >>>>>>> Regards,
> >>>>>>>
> >>>>>>> Ivan
> >>>>>>>
> >>>>>>> 03.09.2020 4:20, Matthias J. Sax пишет:
> >>>>>>>> Thanks for the KIP Ivan. Having a built-in deduplication operator
> is for
> >>>>>>>> sure a good addition.
> >>>>>>>>
> >>>>>>>> Couple of questions:
> >>>>>>>>
> >>>>>>>> (1) Using the `idExtractor` has the issue that data might not be
> >>>>>>>> co-partitioned as you mentioned in the KIP. Thus, I am wondering
> if it
> >>>>>>>> might be better to do deduplication only on the key? If one sets
> a new
> >>>>>>>> key upstream (ie, extracts the deduplication id into the key), the
> >>>>>>>> `distinct` operator could automatically repartition the data and
> thus we
> >>>>>>>> would avoid user errors.
> >>>>>>>>
> >>>>>>>> (2) What is the motivation for allowing the `idExtractor` to
> return
> >>>>>>>> `null`? Might be good to have some use-case examples for this
> feature.
> >>>>>>>>
> >>>>>>>> (2) Is using a `TimeWindow` really what we want? I was wondering
> if a
> >>>>>>>> `SlidingWindow` might be better? Or maybe we need a new type of
> window?
> >>>>>>>>
> >>>>>>>> It would be helpful if you could describe potential use cases in
> more
> >>>>>>>> detail. -- I am mainly wondering about hopping window? Each
> record would
> >>>>>>>> always falls into multiple window and thus would be emitted
> multiple
> >>>>>>>> times, ie, each time the window closes. Is this really a valid
> use case?
> >>>>>>>>
> >>>>>>>> It seems that for de-duplication, one wants to have some
> "expiration
> >>>>>>>> time", ie, for each ID, deduplicate all consecutive records with
> the
> >>>>>>>> same ID and emit the first record after the "expiration time"
> passed. In
> >>>>>>>> terms of a window, this would mean that the window starts at
> `r.ts` and
> >>>>>>>> ends at `r.ts + windowSize`, ie, the window is aligned to the
> data.
> >>>>>>>> TimeWindows are aligned to the epoch though. While
> `SlidingWindows` also
> >>>>>>>> align to the data, for the aggregation use-case they go backward
> in
> >>>>>>>> time, while we need a window that goes forward in time. It's an
> open
> >>>>>>>> question if we can re-purpose `SlidingWindows` -- it might be ok
> the
> >>>>>>>> make the alignment (into the past vs into the future) an operator
> >>>>>>>> dependent behavior?
> >>>>>>>>
> >>>>>>>> (3) `isPersistent` -- instead of using this flag, it seems better
> to
> >>>>>>>> allow users to pass in a `Materialized` parameter next to
> >>>>>>>> `DistinctParameters` to configure the state store?
> >>>>>>>>
> >>>>>>>> (4) I am wondering if we should really have 4 overloads for
> >>>>>>>> `DistinctParameters.with()`? It might be better to have one
> overload
> >>>>>>>> with all require parameters, and add optional parameters using the
> >>>>>>>> builder pattern? This seems to follow the DSL Grammer proposal.
> >>>>>>>>
> >>>>>>>> (5) Even if it might be an implementation detail (and maybe the
> KIP
> >>>>>>>> itself does not need to mention it), can you give a high level
> overview
> >>>>>>>> how you intent to implement it (that would be easier to grog,
> compared
> >>>>>>>> to reading the PR).
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>> On 8/23/20 4:29 PM, Ivan Ponomarev wrote:
> >>>>>>>>> Sorry, I forgot to add [DISCUSS] tag to the topic
> >>>>>>>>>
> >>>>>>>>> 24.08.2020 2:27, Ivan Ponomarev пишет:
> >>>>>>>>>> Hello,
> >>>>>>>>>>
> >>>>>>>>>> I'd like to start a discussion for KIP-655.
> >>>>>>>>>>
> >>>>>>>>>> KIP-655:
> >>>>>>>>>>
> >>>>>>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> I also opened a proof-of-concept PR for you to experiment with
> the API:
> >>>>>>>>>>
> >>>>>>>>>> PR#9210: https://github.com/apache/kafka/pull/9210
> >>>>>>>>>>
> >>>>>>>>>> Regards,
> >>>>>>>>>>
> >>>>>>>>>> Ivan Ponomarev
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>
> >>>
>
>

Reply via email to