Hi all, Bruno raised some very good points. I’d like to chime in with additional context.
1. Great point. We faced a similar problem defining KIP-557. For 557, we chose to use the serialized byte array instead of the equals() method, but I think the situation in KIP-655 is a bit different. I think it might make sense to use the equals() method here, but I'm curious what Ivan thinks. 2. I figured we'd do nothing. I thought Ivan was just saying that it doesn't make a ton of sense to use it, which I agree with, but it doesn't seem like that means we should prohibit it. 3. FWIW, I don't have a strong feeling either way. Thanks, -John On Mon, Jul 12, 2021, at 09:14, Bruno Cadonna wrote: > Hi Ivan, > > Thank you for the KIP! > > Some aspects are not clear to me from the KIP and I have a proposal. > > 1. The KIP does not describe the criteria that define a duplicate. Could > you add a definition of a duplicate to the KIP? > > 2. The KIP does not describe what happens if distinct() is applied on a > hopping window. On the DSL level, I do not see how you can prevent > users from applying distinct() on a hopping window, i.e., you cannot prevent it at > compile time, you need to check it at runtime and throw an exception. Is > this correct or am I missing something? > > 3. I would also like to back a proposal by Sophie. She proposed to use > deduplicate() instead of distinct(), since the other DSL operations are > also verbs. I do not think that SQL and the Java Stream API are good > arguments to not use a verb. > > Best, > Bruno > > > On 10.07.21 19:11, John Roesler wrote: > > Hi Ivan, > > > > Sorry for the silence! > > > > I have just re-read the proposal. > > > > To summarize, you are now only proposing the zero-arg distinct() method to > > be added to TimeWindowedKStream and SessionWindowedKStream, right? > > > > I’m in favor of this proposal. > > > > Thanks, > > John > > > > On Sat, Jul 10, 2021, at 10:18, Ivan Ponomarev wrote: > >> Hello everyone, > >> > >> I would like to remind you about KIP-655 and KIP-759 just in case they > >> got lost in your inbox.
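Bruno's first question asks how a duplicate is defined, and John's reply contrasts `equals()` with comparing serialized bytes. The two definitions can disagree. The plain-Java sketch below is not Kafka code. `serialize` is a hypothetical stand-in for a value serializer that writes elements in iteration order. The sketch shows two sets that are equal according to `equals()` but serialize to different bytes, so only the `equals()`-based definition treats them as duplicates.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class DuplicateCriteria {

    // Hypothetical serializer: writes the elements in iteration order, comma-separated.
    static byte[] serialize(Set<String> value) {
        return String.join(",", value).getBytes(StandardCharsets.UTF_8);
    }

    // Definition 1: two values are duplicates if equals() says so.
    static boolean duplicateByEquals(Set<String> a, Set<String> b) {
        return a.equals(b);
    }

    // Definition 2: two values are duplicates if their serialized bytes match.
    static boolean duplicateByBytes(Set<String> a, Set<String> b) {
        return Arrays.equals(serialize(a), serialize(b));
    }

    public static void main(String[] args) {
        // Same elements, different insertion (and therefore iteration) order.
        Set<String> first = new LinkedHashSet<>(List.of("a", "b"));
        Set<String> second = new LinkedHashSet<>(List.of("b", "a"));
        System.out.println("duplicate by equals(): " + duplicateByEquals(first, second)); // true
        System.out.println("duplicate by bytes:    " + duplicateByBytes(first, second));  // false
    }
}
```

The right definition depends on the value types involved. The sketch only shows that the choice makes an observable difference.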
> >> > >> Now the initial proposal is split into two independent and smaller ones, > >> so they should be easier to review. If you have time, of course. > >> > >> Regards, > >> > >> Ivan > >> > >> > >> On 24.06.2021 18:11, Ivan Ponomarev wrote: > >>> Hello all, > >>> > >>> I have rewritten KIP-655 summarizing what was agreed upon during > >>> this discussion (now the proposal is much simpler and less invasive). > >>> > >>> I have also created KIP-759 (cancelRepartition operation) and started a > >>> discussion for it. > >>> > >>> Regards, > >>> > >>> Ivan. > >>> > >>> > >>> > >>> On 04.06.2021 8:15, Matthias J. Sax wrote: > >>>> Just skimmed over the thread -- first of all, I am glad that we could > >>>> merge KIP-418 and ship it :) > >>>> > >>>> About the re-partitioning concerns, there are already two tickets for it: > >>>> > >>>> - https://issues.apache.org/jira/browse/KAFKA-4835 > >>>> - https://issues.apache.org/jira/browse/KAFKA-10844 > >>>> > >>>> Thus, it seems best to exclude this topic from this KIP, and do a > >>>> separate KIP for it (if necessary, we can "pause" this KIP until the > >>>> repartition KIP is done). It's a long-standing "issue" and we should > >>>> resolve it in a general way I guess. > >>>> > >>>> (Did not read all responses in detail yet, so keeping this comment > >>>> short.) > >>>> > >>>> > >>>> -Matthias > >>>> > >>>> On 6/2/21 6:35 AM, John Roesler wrote: > >>>>> Thanks, Ivan! > >>>>> > >>>>> That sounds like a great plan to me. Two smaller KIPs are easier to > >>>>> agree on than one big one. > >>>>> > >>>>> I agree hopping and sliding windows will actually have a duplicating > >>>>> effect. We can avoid adding distinct() to the sliding window > >>>>> interface, but hopping windows are just a different parameterization > >>>>> of epoch-aligned windows. It seems we can’t do much about that except > >>>>> document the issue.
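Hopping windows duplicate records because of how windows are assigned. With size S and advance A, each record falls into about S/A epoch-aligned windows, so a per-window `distinct()` would emit the record once per window. The plain-Java sketch below illustrates that assignment. The class and method names are hypothetical, and the code is not taken from Kafka.

```java
import java.util.ArrayList;
import java.util.List;

final class HoppingWindows {

    // Start times of all epoch-aligned windows [start, start + size) that contain
    // `timestamp`, for windows that advance by `advance` (advance == size means tumbling).
    static List<Long> windowStartsFor(long timestamp, long size, long advance) {
        List<Long> starts = new ArrayList<>();
        // Smallest multiple of `advance` that is greater than timestamp - size (and not negative).
        long first = Math.max(0, timestamp - size + advance) / advance * advance;
        for (long start = first; start <= timestamp; start += advance) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        System.out.println("tumbling, size 10, advance 10: " + windowStartsFor(23, 10, 10)); // [20]
        System.out.println("hopping,  size 10, advance 5:  " + windowStartsFor(23, 10, 5));  // [15, 20]
    }
}
```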
> >>>>> > >>>>> Thanks, > >>>>> John > >>>>> > >>>>> On Wed, May 26, 2021, at 10:14, Ivan Ponomarev wrote: > >>>>>> Hi John! > >>>>>> > >>>>>> I think that your proposal is just fantastic, it simplifies things a > >>>>>> lot! > >>>>>> > >>>>>> I also felt uncomfortable due to the fact that the proposed > >>>>>> `distinct()` > >>>>>> is not somewhere near `count()` and `reduce(..)`. But > >>>>>> `selectKey(..).groupByKey().windowedBy(..).distinct()` didn't look like > >>>>>> the right option to me because of the issue with the unneeded > >>>>>> repartitioning. > >>>>>> > >>>>>> The bold idea that we can just CANCEL the repartitioning didn't come to > >>>>>> my mind. > >>>>>> > >>>>>> What seemed to me a single problem is in fact two unrelated problems: > >>>>>> the `distinct` operation and cancelling the unneeded repartitioning. > >>>>>> > >>>>>> > what if we introduce a parameter to `selectKey()` that specifies > >>>>>> that > >>>>>> the caller asserts that the new key does _not_ change the data > >>>>>> partitioning? > >>>>>> > >>>>>> I think a more elegant solution would be not to add a new parameter to > >>>>>> `selectKey` and all the other key-changing operations (`map`, > >>>>>> `transform`, `flatMap`, ...), but to add a new operator > >>>>>> `KStream#cancelRepartitioning()` that resets the `keyChangingOperation` > >>>>>> flag > >>>>>> for the upstream node. Of course, a "use it only if you know what you're > >>>>>> doing" warning would have to be added. Well, it's a topic for a separate KIP! > >>>>>> > >>>>>> Concerning `distinct()`: if we use the `XXXWindowedKStream` facilities, > >>>>>> then > >>>>>> changes to the API are minimally invasive: we're just adding > >>>>>> `distinct()` to TimeWindowedKStream and SessionWindowedKStream, and > >>>>>> that's all. > >>>>>> > >>>>>> We can now define `distinct` as an operation that returns only the first > >>>>>> record that falls into a new window, and filters out all the other > >>>>>> records that fall into an already existing window.
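The definition of `distinct` above can be simulated in plain Java for tumbling (epoch-aligned) windows. This sketch assumes a single key and ignores grace periods, retention and the state store. The names are hypothetical. Run on the timestamps from Ivan's example, it forwards one record per 10-second window.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class TumblingDistinct {

    // Forwards the first record seen in each epoch-aligned window of `size` time units.
    // Later records (including late arrivals) whose window was already seen are dropped.
    static List<Long> distinct(long[] timestamps, long size) {
        Set<Long> seenWindowStarts = new HashSet<>();
        List<Long> forwarded = new ArrayList<>();
        for (long ts : timestamps) {
            long windowStart = ts - Math.floorMod(ts, size); // align to the epoch
            if (seenWindowStarts.add(windowStart)) {          // true only for a new window
                forwarded.add(ts);
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        // Three bursts followed by late arrivals (seconds).
        long[] input = {4, 5, 6, 23, 22, 24, 34, 33, 32, 7, 22, 35};
        System.out.println(distinct(input, 10)); // [4, 23, 34]
    }
}
```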
BTW, we can mock the > >>>>>> behaviour of such an operation with `TopologyTestDriver` using > >>>>>> `reduce((l, r) -> STOP).filterNot((k, v) -> STOP.equals(v))`. ;-) > >>>>>> > >>>>>> Consider the following example (record times are in seconds): > >>>>>> > >>>>>> //three bursts of variously ordered records > >>>>>> 4, 5, 6 > >>>>>> 23, 22, 24 > >>>>>> 34, 33, 32 > >>>>>> //'late arrivals' > >>>>>> 7, 22, 35 > >>>>>> > >>>>>> > >>>>>> 1. 'Epoch-aligned deduplication' using tumbling windows: > >>>>>> > >>>>>> .groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(10))).distinct() > >>>>>> > >>>>>> > >>>>>> produces > >>>>>> > >>>>>> (key@[00000/10000], 4) > >>>>>> (key@[20000/30000], 23) > >>>>>> (key@[30000/40000], 34) > >>>>>> > >>>>>> -- that is, one record per epoch-aligned window. > >>>>>> > >>>>>> 2. Hopping and sliding windows do not make much sense here, because > >>>>>> they > >>>>>> produce multiple overlapping windows, so one record can be > >>>>>> emitted several times, but we want deduplication. > >>>>>> > >>>>>> 3. SessionWindows work for 'data-aligned deduplication'. > >>>>>> > >>>>>> .groupByKey().windowedBy(SessionWindows.with(Duration.ofSeconds(10))).distinct() > >>>>>> > >>>>>> > >>>>>> > >>>>>> produces only > >>>>>> > >>>>>> ([key@4000/4000], 4) > >>>>>> ([key@23000/23000], 23) > >>>>>> > >>>>>> because all the records from 22 onward are stuck together in one > >>>>>> session (the 10-second gap bridges 24 and 34). > >>>>>> Setting the inactivity gap to 9 seconds will return three records: > >>>>>> > >>>>>> ([key@4000/4000], 4) > >>>>>> ([key@23000/23000], 23) > >>>>>> ([key@34000/34000], 34) > >>>>>> > >>>>>> WDYT? If you like this variant, I will rewrite KIP-655 and propose a > >>>>>> separate KIP for `cancelRepartitioning` (or whatever name we will > >>>>>> choose > >>>>>> for it). > >>>>>> > >>>>>> Regards, > >>>>>> > >>>>>> Ivan > >>>>>> > >>>>>> > >>>>>> On 24.05.2021 22:32, John Roesler wrote: > >>>>>>> Hey there, Ivan!
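The session-window results can be checked the same way. This plain-Java sketch uses hypothetical names and assumes a single key with no grace period or retention. It also assumes that a record joins a session when it lies within the inactivity gap of the session's bounds, and that overlapping sessions merge. Only a record that opens a brand-new session is forwarded.

```java
import java.util.ArrayList;
import java.util.List;

final class SessionDistinct {

    // A session covering [start, end], both inclusive.
    static final class Session {
        long start;
        long end;

        Session(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    // Forwards a record only if it is farther than `gap` from every existing session.
    // Records that touch existing sessions extend (and possibly merge) them and are dropped.
    static List<Long> distinct(long[] timestamps, long gap) {
        List<Session> sessions = new ArrayList<>();
        List<Long> forwarded = new ArrayList<>();
        for (long ts : timestamps) {
            Session merged = new Session(ts, ts);
            boolean joinedExisting = false;
            // Iterate backwards so removing by index is safe.
            for (int i = sessions.size() - 1; i >= 0; i--) {
                Session s = sessions.get(i);
                if (ts >= s.start - gap && ts <= s.end + gap) {
                    joinedExisting = true;
                    merged.start = Math.min(merged.start, s.start);
                    merged.end = Math.max(merged.end, s.end);
                    sessions.remove(i);
                }
            }
            sessions.add(merged);
            if (!joinedExisting) {
                forwarded.add(ts);
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        long[] input = {4, 5, 6, 23, 22, 24, 34, 33, 32, 7, 22, 35};
        System.out.println("gap 10: " + distinct(input, 10)); // [4, 23]
        System.out.println("gap 9:  " + distinct(input, 9));  // [4, 23, 34]
    }
}
```

With the 9-second gap, the record at 34 opens its own session and is forwarded. The later record at 33 then merges the [22, 24] and [34, 34] sessions, but the earlier forward is not retracted.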
> >>>>>>> > >>>>>>> In typical fashion, I'm going to make a somewhat outlandish > >>>>>>> proposal. I'm hoping that we can sidestep some of the > >>>>>>> complications that have arisen. Please bear with me. > >>>>>>> > >>>>>>> It seems like `distinct()` is not fundamentally unlike other windowed > >>>>>>> "aggregation" operations. Your concern about unnecessary > >>>>>>> repartitioning seems to apply just as well to `count()` as to > >>>>>>> `distinct()`. > >>>>>>> This has come up before, but I don't remember when: what if we > >>>>>>> introduce a parameter to `selectKey()` that specifies that the caller > >>>>>>> asserts that the new key does _not_ change the data partitioning? > >>>>>>> The docs on that parameter would of course spell out all the "rights > >>>>>>> and responsibilities" of setting it. > >>>>>>> > >>>>>>> In that case, we could indeed get back to > >>>>>>> `selectKey(A).groupByKey().windowedBy(B).distinct(...)`, where we get to compose the > >>>>>>> key mapper and the windowing function without having to carve out > >>>>>>> a separate domain just for `distinct()`. All the rest of the KStream > >>>>>>> operations would also benefit. > >>>>>>> > >>>>>>> What do you think? > >>>>>>> > >>>>>>> Thanks, > >>>>>>> John > >>>>>>> > >>>>>>> On Sun, May 23, 2021, at 08:09, Ivan Ponomarev wrote: > >>>>>>>> Hello everyone, > >>>>>>>> > >>>>>>>> Let me revive the discussion for KIP-655. Now I have some time > >>>>>>>> again and > >>>>>>>> I'm eager to finalize this. > >>>>>>>> > >>>>>>>> Based on what was already discussed, I think that we can split the > >>>>>>>> discussion into three topics for our convenience. > >>>>>>>> > >>>>>>>> The three topics are: > >>>>>>>> > >>>>>>>> - idExtractor (how we should extract the deduplication key for > >>>>>>>> the record) > >>>>>>>> > >>>>>>>> - timeWindows (what time windows we should use) > >>>>>>>> > >>>>>>>> - miscellaneous (naming etc.)
> >>>>>>>> > >>>>>>>> ---- idExtractor ---- > >>>>>>>> > >>>>>>>> Original proposal: use a (k, v) -> f(k, v) mapper, defaulting to (k, > >>>>>>>> v) -> > >>>>>>>> k. The drawback here is that we must warn the user to choose a > >>>>>>>> function that assigns different IDs to records from different > >>>>>>>> partitions, > >>>>>>>> otherwise identical IDs might not be co-partitioned (and not > >>>>>>>> deduplicated as > >>>>>>>> a result). Additional concern: what should we do when this function > >>>>>>>> returns null? > >>>>>>>> > >>>>>>>> Matthias proposed key-only deduplication: that is, no idExtractor at > >>>>>>>> all, and if we want to use `distinct` for a particular identifier, we > >>>>>>>> must call `selectKey()` first. The drawback of this approach is that > >>>>>>>> we will > >>>>>>>> always have repartitioning after the key selection, while in practice > >>>>>>>> repartitioning will not always be necessary (for example, when the > >>>>>>>> data > >>>>>>>> stream is such that different keys imply different values). > >>>>>>>> > >>>>>>>> So here we have a 'safety vs. performance' trade-off. But the 'safe' > >>>>>>>> variant > >>>>>>>> is also not very convenient for developers, since we're forcing > >>>>>>>> them to > >>>>>>>> change the structure of their records. > >>>>>>>> > >>>>>>>> A 'golden mean' here might be a composite ID with its first > >>>>>>>> component equal to k and its second component equal to some f(v) (f > >>>>>>>> defaults to v -> null, and a null value returned by f(v) means > >>>>>>>> 'deduplicate by the key only'). The nuance here is that we will have > >>>>>>>> serializers only for the types of k and f(v), and we must correctly > >>>>>>>> serialize the tuple (k, f(v)), but of course this is doable. > >>>>>>>> > >>>>>>>> What do you think?
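One way to serialize the composite ID (k, f(v)) unambiguously is to length-prefix the key, so that a separator character inside the key cannot cause collisions. The KIP does not specify this encoding. The sketch below is an assumption with the following properties:

- It works on `String` keys and extracted IDs.
- A leading flag byte distinguishes "key only" IDs (f(v) returned `null`) from composite ones.
- The method name is hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.function.Function;

final class CompositeDedupId {

    // Builds a deduplication ID from the record key and idExtractor(value):
    //   [1 flag byte][4-byte key length][key bytes][extracted ID bytes]
    // flag = 0 means idExtractor returned null, i.e. deduplicate by key only.
    // The length prefix keeps ("a|b", "c") and ("a", "b|c") apart.
    static <V> byte[] dedupId(String key, V value, Function<V, String> idExtractor) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        String extracted = idExtractor.apply(value);
        byte[] extractedBytes = extracted == null
                ? new byte[0]
                : extracted.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(1 + 4 + keyBytes.length + extractedBytes.length)
                .put(extracted == null ? (byte) 0 : (byte) 1)
                .putInt(keyBytes.length)
                .put(keyBytes)
                .put(extractedBytes)
                .array();
    }

    public static void main(String[] args) {
        // Ivan's use case: the key is the source ID and the event ID comes from the value
        // (here, hypothetically, the value is the event ID itself).
        Function<String, String> eventId = value -> value;
        byte[] fromSourceA = dedupId("source-A", "event-42", eventId);
        byte[] fromSourceB = dedupId("source-B", "event-42", eventId);
        // Same event from different sources: different IDs, so both are kept.
        System.out.println(Arrays.equals(fromSourceA, fromSourceB)); // false
    }
}
```

With `|` as the separator, a plain `key + separator + f(v)` string would map ("a|b", "c") and ("a", "b|c") to the same ID. The length prefix avoids that collision.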
> >>>>>>>> > >>>>>>>> ---- timeWindows ---- > >>>>>>>> > >>>>>>>> Originally I proposed TimeWindows only because they solved my > >>>>>>>> particular case :-) but I agree with Matthias' and Sophie's objections. > >>>>>>>> > >>>>>>>> I like Sophie's point: we need both epoch-aligned and > >>>>>>>> data-aligned > >>>>>>>> windows. IMO this is absolutely correct: "data-aligned is useful for > >>>>>>>> example when you know that a large number of updates to a single key > >>>>>>>> will occur in short bursts, and epoch-aligned when you > >>>>>>>> specifically want > >>>>>>>> to get just a single update per discrete time interval." > >>>>>>>> > >>>>>>>> I just cannot agree right away with Sophie's > >>>>>>>> .groupByKey().windowedBy(...).distinct() proposal, as it implies > >>>>>>>> key-only deduplication -- see the previous topic. > >>>>>>>> > >>>>>>>> Epoch-aligned windows are very simple: they should forward only one > >>>>>>>> record per enumerated time window. TimeWindows are exactly what we > >>>>>>>> want > >>>>>>>> here. I mentioned both tumbling and hopping windows in the KIP just > >>>>>>>> because both are possible for TimeWindows, but indeed I don't see any > >>>>>>>> real use case for hopping windows; only tumbling windows make > >>>>>>>> sense IMO. > >>>>>>>> > >>>>>>>> For data-aligned windows, the SlidingWindows interface seems to be a nearly > >>>>>>>> valid choice. Nearly. It should forward a record once when it's first > >>>>>>>> seen, and then not again for any identical records that fall into the > >>>>>>>> next N timeUnits. However, we cannot reuse SlidingWindows as is, > >>>>>>>> because > >>>>>>>> just as Matthias noted, SlidingWindows go backward in time, while we > >>>>>>>> need windows that go forward in time, and are not opened while > >>>>>>>> records > >>>>>>>> fall into an already existing window. We definitely should make > >>>>>>>> our own > >>>>>>>> implementation; maybe we should call it ExpirationWindow? WDYT?
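The forward-in-time "ExpirationWindow" semantics described above can be sketched in plain Java:

- The first record for an ID opens a window [t, t + N).
- Later records inside that window are dropped and do not extend it.
- The first record at or after t + N is forwarded and opens a new window.

The names are hypothetical. The thread leaves out-of-order handling open, so dropping records older than the open window is an assumption.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ExpirationWindowDistinct {

    // For each ID, the first record opens a window [openedAt, openedAt + expiration).
    // Records inside the window are dropped and do NOT extend it; the first record at
    // or after the window end is forwarded and opens a new window.
    // Assumption: records older than the open window are also dropped.
    static List<String> distinct(String[] ids, long[] timestamps, long expiration) {
        Map<String, Long> windowOpenedAt = new HashMap<>();
        List<String> forwarded = new ArrayList<>();
        for (int i = 0; i < ids.length; i++) {
            String id = ids[i];
            long ts = timestamps[i];
            Long openedAt = windowOpenedAt.get(id);
            if (openedAt == null || ts >= openedAt + expiration) {
                windowOpenedAt.put(id, ts);
                forwarded.add(id + "@" + ts);
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        String[] ids = {"e1", "e2", "e1", "e1", "e1", "e1", "e1"};
        long[] timestamps = {0, 3, 5, 14, 15, 29, 31};
        System.out.println(distinct(ids, timestamps, 15)); // [e1@0, e2@3, e1@15, e1@31]
    }
}
```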
> >>>>>>>> > >>>>>>>> > >>>>>>>> ---- miscellaneous ---- > >>>>>>>> > >>>>>>>> Persistent/in-memory stores. Matthias proposed to pass a Materialized > >>>>>>>> parameter next to DistinctParameters (and this is necessary, > >>>>>>>> because we > >>>>>>>> will need to provide a serializer for the extracted ID). This is an > >>>>>>>> absolutely > >>>>>>>> valid point; I agree and I will fix it in the KIP. > >>>>>>>> > >>>>>>>> Naming. Sophie noted that the Streams DSL operators are typically > >>>>>>>> named > >>>>>>>> as verbs, so she proposes `deduplicate` in favour of `distinct`. I > >>>>>>>> think > >>>>>>>> that while it's important to stick to the naming conventions, it > >>>>>>>> is also > >>>>>>>> important to think of the experience of those who come from different > >>>>>>>> stacks/technologies. People who are familiar with SQL and the Java > >>>>>>>> Streams > >>>>>>>> API surely know what 'distinct' means, while data > >>>>>>>> deduplication in general is a more complex task and thus > >>>>>>>> `deduplicate` > >>>>>>>> might be misleading. But I'm ready to be convinced if the majority > >>>>>>>> thinks otherwise. > >>>>>>>> > >>>>>>>> > >>>>>>>> Regards, > >>>>>>>> > >>>>>>>> Ivan > >>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>>> On 14.09.2020 21:31, Sophie Blee-Goldman wrote: > >>>>>>>>> Hey all, > >>>>>>>>> > >>>>>>>>> I'm not convinced either epoch-aligned or data-aligned will fit all > >>>>>>>>> possible use cases. > >>>>>>>>> Both seem totally reasonable to me: data-aligned is useful for > >>>>>>>>> example when > >>>>>>>>> you know > >>>>>>>>> that a large number of updates to a single key will occur in > >>>>>>>>> short bursts, > >>>>>>>>> and epoch-aligned > >>>>>>>>> when you specifically want to get just a single update > >>>>>>>>> per discrete > >>>>>>>>> time > >>>>>>>>> interval.
> >>>>>>>>> > >>>>>>>>> Going a step further, though, what if you want just a single > >>>>>>>>> update per > >>>>>>>>> calendar > >>>>>>>>> month, or per year accounting for leap years? Neither of > >>>>>>>>> those is > >>>>>>>>> served that > >>>>>>>>> well by the existing Windows specification for windowed > >>>>>>>>> aggregations, a > >>>>>>>>> well-known > >>>>>>>>> limitation of the current API. There is actually a KIP > >>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-645%3A+Replace+Windows+with+a+proper+interface> > >>>>>>>>> > >>>>>>>>> going > >>>>>>>>> on in parallel to fix this > >>>>>>>>> exact issue and make the windowing interface much more flexible. > >>>>>>>>> Maybe > >>>>>>>>> instead > >>>>>>>>> of re-implementing this windowing interface in a similarly > >>>>>>>>> limited fashion > >>>>>>>>> for the > >>>>>>>>> Distinct operator, we could leverage it here and get all the > >>>>>>>>> benefits > >>>>>>>>> coming with > >>>>>>>>> KIP-645. > >>>>>>>>> > >>>>>>>>> Specifically, I'm proposing to remove the TimeWindows/etc config > >>>>>>>>> from the > >>>>>>>>> DistinctParameters class, and move the distinct() method from the > >>>>>>>>> KStream > >>>>>>>>> interface > >>>>>>>>> to the TimeWindowedKStream interface. Since it's semantically > >>>>>>>>> similar to a > >>>>>>>>> kind of > >>>>>>>>> windowed aggregation, it makes sense to align it with the > >>>>>>>>> existing windowing > >>>>>>>>> framework, ie: > >>>>>>>>> > >>>>>>>>> inputStream > >>>>>>>>> .groupByKey() > >>>>>>>>> .windowedBy(...) > >>>>>>>>> .distinct() > >>>>>>>>> > >>>>>>>>> Then we could use data-aligned windows if SlidingWindows is > >>>>>>>>> specified in > >>>>>>>>> the > >>>>>>>>> windowedBy(), and epoch-aligned (or some other kind of enumerable > >>>>>>>>> window) > >>>>>>>>> if a Windows is specified in windowedBy() (or an > >>>>>>>>> EnumerableWindowDefinition > >>>>>>>>> once KIP-645 is implemented to replace Windows).
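Sophie's calendar-month case shows why fixed-size windows fall short: month lengths vary, and February has 29 days in leap years. Below is a plain-Java sketch of "one record per calendar month". It uses `java.time` and assumes UTC. It illustrates the semantics only and is not the KIP-645 API. The class name is hypothetical.

```java
import java.time.Instant;
import java.time.YearMonth;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class CalendarMonthDistinct {

    // Forwards the first record of each calendar month (in UTC). The windows are
    // enumerable but not fixed-size, since month lengths differ.
    static List<Instant> distinct(List<Instant> timestamps) {
        Set<YearMonth> seenMonths = new HashSet<>();
        List<Instant> forwarded = new ArrayList<>();
        for (Instant ts : timestamps) {
            YearMonth month = YearMonth.from(ts.atZone(ZoneOffset.UTC));
            if (seenMonths.add(month)) { // true only for the first record of the month
                forwarded.add(ts);
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        List<Instant> input = List.of(
                Instant.parse("2020-02-01T00:00:00Z"),
                Instant.parse("2020-02-29T23:59:59Z"), // leap day, same month: dropped
                Instant.parse("2020-03-01T00:00:00Z"));
        System.out.println(distinct(input)); // [2020-02-01T00:00:00Z, 2020-03-01T00:00:00Z]
    }
}
```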
> >>>>>>>>> > >>>>>>>>> *SlidingWindows*: should forward a record once when it's first > >>>>>>>>> seen, and > >>>>>>>>> then not again > >>>>>>>>> for any identical records that fall into the next N timeUnits. This > >>>>>>>>> includes out-of-order > >>>>>>>>> records, ie if you have a SlidingWindows of size 10s and process > >>>>>>>>> records at > >>>>>>>>> time > >>>>>>>>> 15s, 20s, 14s then you would just forward the one at 15s. > >>>>>>>>> Presumably, if > >>>>>>>>> you're > >>>>>>>>> using SlidingWindows, you don't care about what falls into exact > >>>>>>>>> time > >>>>>>>>> boxes, you just > >>>>>>>>> want to deduplicate. If you do care about exact time boxing then > >>>>>>>>> you should > >>>>>>>>> use... > >>>>>>>>> > >>>>>>>>> *EnumerableWindowDefinition* (eg *TimeWindows*): should forward > >>>>>>>>> only one > >>>>>>>>> record > >>>>>>>>> per enumerated time window. If you get records at 15s, 20s, 14s > >>>>>>>>> where the > >>>>>>>>> windows > >>>>>>>>> are enumerated at [5, 14], [15, 24], etc then you forward the > >>>>>>>>> record at 15s > >>>>>>>>> and also > >>>>>>>>> the record at 14s. > >>>>>>>>> > >>>>>>>>> Just an idea: not sure if the impedance mismatch would throw > >>>>>>>>> users off > >>>>>>>>> since the > >>>>>>>>> semantics of the distinct windows are slightly different than in the > >>>>>>>>> aggregations. > >>>>>>>>> But if we don't fit this into the existing windowed framework, > >>>>>>>>> then we > >>>>>>>>> shouldn't use > >>>>>>>>> any existing Windows-type classes at all, imo, ie we should > >>>>>>>>> create a new > >>>>>>>>> DistinctWindows config class, similar to how stream-stream joins > >>>>>>>>> get their > >>>>>>>>> own > >>>>>>>>> JoinWindows class. > >>>>>>>>> > >>>>>>>>> I also think that non-windowed deduplication could be useful, in > >>>>>>>>> which case > >>>>>>>>> we > >>>>>>>>> would want to also have the distinct() operator on the KStream > >>>>>>>>> interface.
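Both of Sophie's semantics can be compared on her 15s, 20s, 14s example. This plain-Java sketch uses hypothetical names and models the two variants as follows:

- Data-aligned: a record is dropped if it lies within the window size of an already forwarded record, in either direction. This is one reading of her description that also covers out-of-order records.
- Enumerated: windows are [5, 14], [15, 24], and so on, and the first record in each window is forwarded.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class SophieExamples {

    // Data-aligned reading: forward a record unless it lies within `size` time units
    // (before or after) of a record that was already forwarded.
    static List<Long> dataAligned(long[] timestamps, long size) {
        List<Long> forwarded = new ArrayList<>();
        for (long ts : timestamps) {
            boolean covered = false;
            for (long f : forwarded) {
                if (Math.abs(ts - f) < size) {
                    covered = true;
                    break;
                }
            }
            if (!covered) {
                forwarded.add(ts);
            }
        }
        return forwarded;
    }

    // Enumerated windows [offset + k*size, offset + (k+1)*size - 1]:
    // forward the first record seen in each window.
    static List<Long> enumerated(long[] timestamps, long size, long offset) {
        Set<Long> seenWindows = new HashSet<>();
        List<Long> forwarded = new ArrayList<>();
        for (long ts : timestamps) {
            long windowIndex = Math.floorDiv(ts - offset, size);
            if (seenWindows.add(windowIndex)) {
                forwarded.add(ts);
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        long[] input = {15, 20, 14};
        System.out.println(dataAligned(input, 10));   // [15]
        System.out.println(enumerated(input, 10, 5)); // [15, 14]
    }
}
```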
> >>>>>>>>> > >>>>>>>>> > >>>>>>>>> One quick note regarding the naming: it seems like the Streams > >>>>>>>>> DSL operators > >>>>>>>>> are typically named as verbs rather than adjectives, for example > >>>>>>>>> #suppress > >>>>>>>>> or > >>>>>>>>> #aggregate. I get that there's some precedent for 'distinct' > >>>>>>>>> specifically, > >>>>>>>>> but > >>>>>>>>> maybe something like 'deduplicate' would be more appropriate for > >>>>>>>>> the Streams > >>>>>>>>> API. > >>>>>>>>> > >>>>>>>>> WDYT? > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> On Mon, Sep 14, 2020 at 10:04 AM Ivan Ponomarev > >>>>>>>>> <iponoma...@mail.ru.invalid> > >>>>>>>>> wrote: > >>>>>>>>> > >>>>>>>>>> Hi Matthias, > >>>>>>>>>> > >>>>>>>>>> Thanks for your review! It made me think more deeply, and indeed I > >>>>>>>>>> understood > >>>>>>>>>> that I was missing some important details. > >>>>>>>>>> > >>>>>>>>>> To simplify, let me explain my particular use case first so I > >>>>>>>>>> can refer > >>>>>>>>>> to it later. > >>>>>>>>>> > >>>>>>>>>> We have a system that collects information about ongoing live > >>>>>>>>>> sporting > >>>>>>>>>> events from different sources. The information sources have > >>>>>>>>>> their IDs > >>>>>>>>>> and these IDs are the keys of the stream. Each source emits messages > >>>>>>>>>> concerning sporting events, and we can have many messages about > >>>>>>>>>> each > >>>>>>>>>> sporting event from each source. The event ID is extracted from the > >>>>>>>>>> message. > >>>>>>>>>> > >>>>>>>>>> We need a database of event IDs that were reported at least once > >>>>>>>>>> by each > >>>>>>>>>> source (important: events from different sources are considered > >>>>>>>>>> to be > >>>>>>>>>> different entities).
The requirements are: > >>>>>>>>>> > >>>>>>>>>> 1) each new event ID should be written to the database as soon > >>>>>>>>>> as possible > >>>>>>>>>> > >>>>>>>>>> 2) although it's ok, and sometimes even desired, to repeat the > >>>>>>>>>> notification about an already known event ID, we wouldn’t like our > >>>>>>>>>> database to be bothered by the same event ID more often than > >>>>>>>>>> once in a > >>>>>>>>>> given period of time (say, 15 minutes). > >>>>>>>>>> > >>>>>>>>>> With this example in mind, let me answer your questions. > >>>>>>>>>> > >>>>>>>>>> > (1) Using the `idExtractor` has the issue that data might > >>>>>>>>>> not be > >>>>>>>>>> > co-partitioned as you mentioned in the KIP. Thus, I am > >>>>>>>>>> wondering if it > >>>>>>>>>> > might be better to do deduplication only on the key? If > >>>>>>>>>> one sets a new > >>>>>>>>>> > key upstream (ie, extracts the deduplication id into the > >>>>>>>>>> key), the > >>>>>>>>>> > `distinct` operator could automatically repartition the > >>>>>>>>>> data and thus we > >>>>>>>>>> > would avoid user errors. > >>>>>>>>>> > >>>>>>>>>> Of course with 'key-only' deduplication + auto-repartitioning we > >>>>>>>>>> will > >>>>>>>>>> never cause problems with co-partitioning. But in practice, we > >>>>>>>>>> often > >>>>>>>>>> don't need repartitioning even if the 'dedup ID' is different from > >>>>>>>>>> the key, > >>>>>>>>>> like in my example above. So here we have a sort of 'performance vs > >>>>>>>>>> safety' tradeoff. > >>>>>>>>>> > >>>>>>>>>> The 'golden middle way' here can be the following: we can form a > >>>>>>>>>> deduplication ID as KEY + separator + idExtractor(VALUE). If > >>>>>>>>>> idExtractor is not provided, we deduplicate by key only (as in the > >>>>>>>>>> original > >>>>>>>>>> proposal). Then idExtractor transforms only the value (and not > >>>>>>>>>> the key) > >>>>>>>>>> and its result is appended to the key.
Records from different > >>>>>>>>>> partitions > >>>>>>>>>> will inherently have different deduplication IDs and all the > >>>>>>>>>> data will > >>>>>>>>>> be co-partitioned. As with any stateful operation, we will > >>>>>>>>>> repartition > >>>>>>>>>> the topic if the key was changed upstream, but only in this > >>>>>>>>>> case, > >>>>>>>>>> thus avoiding unnecessary repartitioning. My example above fits > >>>>>>>>>> this > >>>>>>>>>> perfectly. > >>>>>>>>>> > >>>>>>>>>> > (2) What is the motivation for allowing the `idExtractor` > >>>>>>>>>> to return > >>>>>>>>>> > `null`? Might be good to have some use-case examples for > >>>>>>>>>> this feature. > >>>>>>>>>> > >>>>>>>>>> Can't think of any use cases. As often happens, it just > >>>>>>>>>> came with a > >>>>>>>>>> copy-paste from StackOverflow -- see Michael Noll's answer here: > >>>>>>>>>> > >>>>>>>>>> https://stackoverflow.com/questions/55803210/how-to-handle-duplicate-messages-using-kafka-streaming-dsl-functions > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> But, jokes aside, we'll have to decide what to do with nulls. If we > >>>>>>>>>> accept the above proposal of having the deduplication ID as KEY + > >>>>>>>>>> postfix, > >>>>>>>>>> then null can be treated as no postfix at all. If we don't > >>>>>>>>>> accept this > >>>>>>>>>> approach, then treating nulls as 'no-deduplication' seems to be a > >>>>>>>>>> reasonable assumption (we can't get or put null as a key to a KV > >>>>>>>>>> store, > >>>>>>>>>> so a record with a null ID is always going to look 'new' to us). > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > (2) Is using a `TimeWindow` really what we want? I was > >>>>>>>>>> wondering if a > >>>>>>>>>> > `SlidingWindow` might be better? Or maybe we need a new > >>>>>>>>>> type of window? > >>>>>>>>>> > >>>>>>>>>> Agreed. It's probably not what we want. I once thought that reusing > >>>>>>>>>> TimeWindow was a clever idea; now I don't. > >>>>>>>>>> > >>>>>>>>>> Do we need epoch alignment in our use case?
No, we don't, and I > >>>>>>>>>> don't > >>>>>>>>>> know if anyone is going to need this. Epoch alignment is good for > >>>>>>>>>> aggregation, but deduplication is a different story. > >>>>>>>>>> > >>>>>>>>>> Let me describe the semantics the way I see them now, and tell me > >>>>>>>>>> what you > >>>>>>>>>> think: > >>>>>>>>>> > >>>>>>>>>> - the only parameter that defines the deduplication logic is the > >>>>>>>>>> 'expiration > >>>>>>>>>> period' > >>>>>>>>>> > >>>>>>>>>> - when a deduplication ID arrives and we cannot find it in the > >>>>>>>>>> store, we > >>>>>>>>>> forward the message downstream and store the ID + its timestamp. > >>>>>>>>>> > >>>>>>>>>> - when an out-of-order ID arrives with an older timestamp and we > >>>>>>>>>> find a > >>>>>>>>>> 'fresher' record, we do nothing and don't forward the message > >>>>>>>>>> (??? OR > >>>>>>>>>> NOT? In what case would we want to forward an out-of-order > >>>>>>>>>> message?) > >>>>>>>>>> > >>>>>>>>>> - when an ID with a fresher timestamp arrives, we check if it falls > >>>>>>>>>> into > >>>>>>>>>> the expiration period and either forward it or not, but in both > >>>>>>>>>> cases we > >>>>>>>>>> update the timestamp of the message in the store. > >>>>>>>>>> > >>>>>>>>>> - the WindowStore retention mechanism should clean up very old > >>>>>>>>>> records > >>>>>>>>>> in order not to run out of space. > >>>>>>>>>> > >>>>>>>>>> > (3) `isPersistent` -- instead of using this flag, it seems > >>>>>>>>>> better to > >>>>>>>>>> > allow users to pass in a `Materialized` parameter next to > >>>>>>>>>> > `DistinctParameters` to configure the state store? > >>>>>>>>>> > >>>>>>>>>> Fully agree! Users might also want to change the retention time. > >>>>>>>>>> > >>>>>>>>>> > (4) I am wondering if we should really have 4 overloads for > >>>>>>>>>> > `DistinctParameters.with()`?
It might be better to have > >>>>>>>>>> one overload > >>>>>>>>>> > with all required parameters, and add optional parameters > >>>>>>>>>> using the > >>>>>>>>>> > builder pattern? This seems to follow the DSL Grammar > >>>>>>>>>> proposal. > >>>>>>>>>> > >>>>>>>>>> Oh, I can explain. We can't fully rely on the builder pattern > >>>>>>>>>> because of > >>>>>>>>>> Java type inference limitations. We have to provide type > >>>>>>>>>> parameters to > >>>>>>>>>> the builder methods or the code won't compile: see e.g. this tweet: > >>>>>>>>>> https://twitter.com/inponomarev/status/1265053286933159938 and the > >>>>>>>>>> following > >>>>>>>>>> discussion with Tagir Valeev. > >>>>>>>>>> > >>>>>>>>>> When we came across similar difficulties in KIP-418, we finally > >>>>>>>>>> decided to add all the necessary overloads to the parameter class. > >>>>>>>>>> So I just > >>>>>>>>>> reproduced that approach here. > >>>>>>>>>> > >>>>>>>>>> > (5) Even if it might be an implementation detail (and > >>>>>>>>>> maybe the KIP > >>>>>>>>>> > itself does not need to mention it), can you give a high-level > >>>>>>>>>> overview of > >>>>>>>>>> > how you intend to implement it (that would be easier to > >>>>>>>>>> grok, compared > >>>>>>>>>> > to reading the PR). > >>>>>>>>>> > >>>>>>>>>> Well, as with any operation at the KStreamImpl level, I'm building a > >>>>>>>>>> store and > >>>>>>>>>> a processor node. > >>>>>>>>>> > >>>>>>>>>> The KStreamDistinct class is going to be the ProcessorSupplier, with > >>>>>>>>>> the > >>>>>>>>>> logic regarding the forwarding/muting of the records located in > >>>>>>>>>> KStreamDistinct.KStreamDistinctProcessor#process. > >>>>>>>>>> > >>>>>>>>>> ---- > >>>>>>>>>> > >>>>>>>>>> Matthias, if you are still reading this :-) a gentle reminder: > >>>>>>>>>> my PR for the > >>>>>>>>>> already accepted KIP-418 is still waiting for your review.
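Below is a plain-Java sketch of the forwarding/muting logic implied by the expiration-period rules listed earlier in this reply. It is not taken from PR#9210:

- A `HashMap` stands in for the state store.
- "Forwarding" appends to a list.
- Retention cleanup is omitted.
- Treating a record that arrives exactly one expiration period later as expired is an assumption.

The class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ExpirationDedupProcessor {

    private final long expirationPeriod;
    // Stand-in for the state store: deduplication ID -> latest timestamp seen.
    private final Map<String, Long> lastSeen = new HashMap<>();
    // Stand-in for forwarding downstream.
    private final List<String> forwarded = new ArrayList<>();

    ExpirationDedupProcessor(long expirationPeriod) {
        this.expirationPeriod = expirationPeriod;
    }

    void process(String id, long timestamp) {
        Long previous = lastSeen.get(id);
        if (previous == null) {
            // Unknown ID: forward it and remember its timestamp.
            forwarded.add(id + "@" + timestamp);
            lastSeen.put(id, timestamp);
        } else if (timestamp >= previous) {
            // Fresher (or equal) timestamp: forward only if the expiration period
            // has passed, but refresh the stored timestamp either way.
            if (timestamp - previous >= expirationPeriod) {
                forwarded.add(id + "@" + timestamp);
            }
            lastSeen.put(id, timestamp);
        }
        // Otherwise the record is out of order (older than the stored one): drop it.
    }

    List<String> forwarded() {
        return forwarded;
    }

    public static void main(String[] args) {
        ExpirationDedupProcessor p = new ExpirationDedupProcessor(15);
        p.process("a", 0);  // new ID: forwarded
        p.process("a", 10); // 10 after 0: dropped, stored timestamp -> 10
        p.process("a", 20); // 10 after 10: dropped, stored timestamp -> 20
        p.process("a", 40); // 20 after 20: forwarded
        p.process("a", 5);  // older than 40: dropped
        p.process("b", 7);  // new ID: forwarded
        System.out.println(p.forwarded()); // [a@0, a@40, b@7]
    }
}
```

The stored timestamp is refreshed on every newer record. As a result, an ID that keeps arriving more often than the expiration period is never forwarded again. Here, `a@20` is dropped even though it arrives 20 units after the first forward. Whether that behavior is desirable is a design choice the thread does not settle.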
I > >>>>>>>>>> think it's > >>>>>>>>>> better for me to finalize at least one KIP before proceeding to > >>>>>>>>>> a new > >>>>>>>>>> one :-) > >>>>>>>>>> > >>>>>>>>>> Regards, > >>>>>>>>>> > >>>>>>>>>> Ivan > >>>>>>>>>> > >>>>>>>>>> On 03.09.2020 4:20, Matthias J. Sax wrote: > >>>>>>>>>>> Thanks for the KIP, Ivan. Having a built-in deduplication > >>>>>>>>>>> operator is for > >>>>>>>>>>> sure a good addition. > >>>>>>>>>>> > >>>>>>>>>>> A couple of questions: > >>>>>>>>>>> > >>>>>>>>>>> (1) Using the `idExtractor` has the issue that data might not be > >>>>>>>>>>> co-partitioned as you mentioned in the KIP. Thus, I am > >>>>>>>>>>> wondering if it > >>>>>>>>>>> might be better to do deduplication only on the key? If one > >>>>>>>>>>> sets a new > >>>>>>>>>>> key upstream (ie, extracts the deduplication id into the key), the > >>>>>>>>>>> `distinct` operator could automatically repartition the data > >>>>>>>>>>> and thus we > >>>>>>>>>>> would avoid user errors. > >>>>>>>>>>> > >>>>>>>>>>> (2) What is the motivation for allowing the `idExtractor` to > >>>>>>>>>>> return > >>>>>>>>>>> `null`? Might be good to have some use-case examples for this > >>>>>>>>>>> feature. > >>>>>>>>>>> > >>>>>>>>>>> (2) Is using a `TimeWindow` really what we want? I was > >>>>>>>>>>> wondering if a > >>>>>>>>>>> `SlidingWindow` might be better? Or maybe we need a new type of > >>>>>>>>>>> window? > >>>>>>>>>>> > >>>>>>>>>>> It would be helpful if you could describe potential use cases > >>>>>>>>>>> in more > >>>>>>>>>>> detail. -- I am mainly wondering about hopping windows. Each > >>>>>>>>>>> record would > >>>>>>>>>>> always fall into multiple windows and thus would be emitted > >>>>>>>>>>> multiple > >>>>>>>>>>> times, ie, each time a window closes. Is this really a valid > >>>>>>>>>>> use case?
> >>>>>>>>>>> > >>>>>>>>>>> It seems that for de-duplication, one wants to have some > >>>>>>>>>>> "expiration > >>>>>>>>>>> time", ie, for each ID, deduplicate all consecutive records > >>>>>>>>>>> with the > >>>>>>>>>>> same ID and emit the first record after the "expiration time" > >>>>>>>>>>> has passed. In > >>>>>>>>>>> terms of a window, this would mean that the window starts at > >>>>>>>>>>> `r.ts` and > >>>>>>>>>>> ends at `r.ts + windowSize`, ie, the window is aligned to the > >>>>>>>>>>> data. > >>>>>>>>>>> TimeWindows are aligned to the epoch though. While > >>>>>>>>>>> `SlidingWindows` also > >>>>>>>>>>> align to the data, for the aggregation use case they go > >>>>>>>>>>> backward in > >>>>>>>>>>> time, while we need a window that goes forward in time. It's an > >>>>>>>>>>> open > >>>>>>>>>>> question if we can re-purpose `SlidingWindows` -- it might be > >>>>>>>>>>> ok to > >>>>>>>>>>> make the alignment (into the past vs into the future) an > >>>>>>>>>>> operator-dependent behavior? > >>>>>>>>>>> > >>>>>>>>>>> (3) `isPersistent` -- instead of using this flag, it seems > >>>>>>>>>>> better to > >>>>>>>>>>> allow users to pass in a `Materialized` parameter next to > >>>>>>>>>>> `DistinctParameters` to configure the state store? > >>>>>>>>>>> > >>>>>>>>>>> (4) I am wondering if we should really have 4 overloads for > >>>>>>>>>>> `DistinctParameters.with()`? It might be better to have one > >>>>>>>>>>> overload > >>>>>>>>>>> with all required parameters, and add optional parameters using the > >>>>>>>>>>> builder pattern? This seems to follow the DSL Grammar proposal. > >>>>>>>>>>> > >>>>>>>>>>> (5) Even if it might be an implementation detail (and maybe the > >>>>>>>>>>> KIP > >>>>>>>>>>> itself does not need to mention it), can you give a high-level > >>>>>>>>>>> overview > >>>>>>>>>>> of how you intend to implement it (that would be easier to grok, > >>>>>>>>>>> compared > >>>>>>>>>>> to reading the PR).
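In his answer to question (4) earlier in the thread, Ivan refers to Java type-inference limits in method chains. A well-known illustration, unrelated to the KIP's own classes, is `Comparator.comparing(s -> s.length()).reversed()`. Here the `comparing` call is inferred without the assignment target, so `s` becomes `Object` and the code does not compile. Supplying the type explicitly fixes it, either on the lambda parameter or as a type witness. This is analogous to the explicit type parameters Ivan mentions for builder methods. The class and field names below are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class InferenceLimits {

    // Does NOT compile: as the receiver of .reversed(), comparing(...) is inferred
    // without the assignment target, so `s` is Object and s.length() is not found.
    //   static final Comparator<String> BROKEN = Comparator.comparing(s -> s.length()).reversed();

    // Compiles: explicitly typed lambda parameter.
    static final Comparator<String> BY_LENGTH_DESC =
            Comparator.comparing((String s) -> s.length()).reversed();

    // Compiles: explicit type witness on the generic method.
    static final Comparator<String> BY_LENGTH_DESC_WITNESS =
            Comparator.<String, Integer>comparing(s -> s.length()).reversed();

    static List<String> sortLongestFirst(List<String> words, Comparator<String> order) {
        List<String> copy = new ArrayList<>(words);
        copy.sort(order);
        return copy;
    }

    public static void main(String[] args) {
        List<String> words = List.of("kip", "distinct", "stream");
        System.out.println(sortLongestFirst(words, BY_LENGTH_DESC));         // [distinct, stream, kip]
        System.out.println(sortLongestFirst(words, BY_LENGTH_DESC_WITNESS)); // [distinct, stream, kip]
    }
}
```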
> >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> -Matthias > >>>>>>>>>>> > >>>>>>>>>>> On 8/23/20 4:29 PM, Ivan Ponomarev wrote: > >>>>>>>>>>>> Sorry, I forgot to add the [DISCUSS] tag to the topic. > >>>>>>>>>>>> > >>>>>>>>>>>> On 24.08.2020 2:27, Ivan Ponomarev wrote: > >>>>>>>>>>>>> Hello, > >>>>>>>>>>>>> > >>>>>>>>>>>>> I'd like to start a discussion for KIP-655. > >>>>>>>>>>>>> > >>>>>>>>>>>>> KIP-655: > >>>>>>>>>>>>> > >>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API > >>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> I also opened a proof-of-concept PR for you to experiment > >>>>>>>>>>>>> with the API: > >>>>>>>>>>>>> > >>>>>>>>>>>>> PR#9210: https://github.com/apache/kafka/pull/9210 > >>>>>>>>>>>>> > >>>>>>>>>>>>> Regards, > >>>>>>>>>>>>> > >>>>>>>>>>>>> Ivan Ponomarev