Hello Ivan,

These are the two proposals you have shared with us:
KIP-655: Windowed Distinct Operation for Kafka Streams API
https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API

KIP-759: Unneeded repartition canceling
https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling

The first proposal, introducing the distinct() method, makes sense, and it appears that the second proposal is needed to implement the first one efficiently. I will need to spend more time analyzing the second KIP to understand it better, but I agree with and support the first one right away.

Thanks, Ivan, for taking the time to document this and communicate it to the project.

Sincerely,
Israel

On Sat, Jul 10, 2021 at 1:11 PM John Roesler <vvcep...@apache.org> wrote:

Hi Ivan,

Sorry for the silence!

I have just re-read the proposal.

To summarize, you are now only proposing that the zero-arg distinct() method be added to TimeWindowedKStream and SessionWindowedKStream, right?

I'm in favor of this proposal.

Thanks,
John

On Sat, Jul 10, 2021, at 10:18, Ivan Ponomarev wrote:

Hello everyone,

I would like to remind you about KIP-655 and KIP-759, just in case they got lost in your inbox.

The initial proposal is now split into two independent, smaller ones, so they should be easier to review, if you have the time.

Regards,

Ivan

24.06.2021 18:11, Ivan Ponomarev wrote:

Hello all,

I have rewritten KIP-655 to summarize what was agreed upon during this discussion (the proposal is now much simpler and less invasive).

I have also created KIP-759 (the cancelRepartition operation) and started a discussion for it.

Regards,

Ivan

04.06.2021 8:15, Matthias J. Sax wrote:

Just skimmed over the thread -- first of all, I am glad that we could merge KIP-418 and ship it :)

About the repartitioning concerns, there are already two tickets for it:

- https://issues.apache.org/jira/browse/KAFKA-4835
- https://issues.apache.org/jira/browse/KAFKA-10844

Thus, it seems best to exclude this topic from this KIP and do a separate KIP for it (if necessary, we can "pause" this KIP until the repartition KIP is done). It's a long-standing "issue", and I guess we should resolve it in a general way.

(I have not yet read all the responses in detail, so I am keeping this comment short.)

-Matthias

On 6/2/21 6:35 AM, John Roesler wrote:

Thanks, Ivan!

That sounds like a great plan to me. Two smaller KIPs are easier to agree on than one big one.

I agree that hopping and sliding windows will actually have a duplicating effect. We can avoid adding distinct() to the sliding window interface, but hopping windows are just a different parameterization of epoch-aligned windows. It seems we can't do much about that except document the issue.

Thanks,
John

On Wed, May 26, 2021, at 10:14, Ivan Ponomarev wrote:

Hi John!

I think that your proposal is just fantastic; it simplifies things a lot!

I also felt uncomfortable about the fact that the proposed `distinct()` is not somewhere near `count()` and `reduce(..)`. But `selectKey(..).groupByKey().windowedBy(..).distinct()` didn't look like the right option to me because of the issue with unneeded repartitioning.

The bold idea that we can just CANCEL the repartitioning didn't come to my mind.

What seemed to me to be a single problem is in fact two unrelated problems: the `distinct` operation and cancelling the unneeded repartitioning.

> what if we introduce a parameter to `selectKey()` that specifies that the caller asserts that the new key does _not_ change the data partitioning?

I think a more elegant solution would be not to add a new parameter to `selectKey` and all the other key-changing operations (`map`, `transform`, `flatMap`, ...), but to add a new operator, `KStream#cancelRepartitioning()`, that resets the `keyChangingOperation` flag for the upstream node. Of course, a "use it only if you know what you're doing" warning would have to be added. Well, that's a topic for a separate KIP!

Concerning `distinct()`: if we use the `XXXWindowedKStream` facilities, the changes to the API are minimally invasive: we just add `distinct()` to TimeWindowedKStream and SessionWindowedKStream, and that's all.

We can now define `distinct` as an operation that returns only the first record that falls into a new window and filters out all the other records that fall into an already existing window. BTW, we can mock the behaviour of such an operation with `TopologyTestDriver` using `reduce((l, r) -> STOP).filterNot((k, v) -> STOP.equals(v))` ;-)

Consider the following example (record times are in seconds):

    // three bursts of variously ordered records
    4, 5, 6
    23, 22, 24
    34, 33, 32
    // 'late arrivals'
    7, 22, 35

1. 'Epoch-aligned deduplication' using tumbling windows:

    .groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(10))).distinct()

produces

    (key@[00000/10000], 4)
    (key@[20000/30000], 23)
    (key@[30000/40000], 34)

-- that is, one record per epoch-aligned window.

2. Hopping and sliding windows do not make much sense here, because they produce multiple overlapping windows, so one record can be emitted several times, whereas we want deduplication.

3. SessionWindows work for 'data-aligned deduplication':

    .groupByKey().windowedBy(SessionWindows.with(Duration.ofSeconds(10))).distinct()

produces only

    ([key@4000/4000], 4)
    ([key@23000/23000], 23)

because all the records later than 7 are merged into a single session (34 is exactly 10 seconds after 24, so the second and third bursts are joined). Setting the inactivity gap to 9 seconds returns three records:

    ([key@4000/4000], 4)
    ([key@23000/23000], 23)
    ([key@34000/34000], 34)

WDYT? If you like this variant, I will rewrite KIP-655 and propose a separate KIP for `cancelRepartitioning` (or whatever name we choose for it).

Regards,

Ivan

24.05.2021 22:32, John Roesler wrote:

Hey there, Ivan!

In typical fashion, I'm going to make a somewhat outlandish proposal. I'm hoping that we can side-step some of the complications that have arisen. Please bear with me.

It seems like `distinct()` is not fundamentally unlike other windowed "aggregation" operations. Your concern about unnecessary repartitioning seems to apply just as well to `count()` as to `distinct()`.
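John's point that `distinct()` behaves like a windowed aggregation, and Ivan's `reduce((l, r) -> STOP)` mock, can be illustrated without Kafka at all. The plain-Java sketch below is a hypothetical single-key simulation, not Kafka Streams code: it enumerates epoch-aligned windows the way `TimeWindows` does (a size plus an advance, window starts never negative) and forwards a record only when it is the first one seen in a window. The class name and the `windowStart:timestamp` output format are invented for illustration, and windows are never expired here (a real store would drop them after retention).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical single-key simulation of a windowed distinct(); not Kafka Streams code.
final class EpochAlignedDistinctSketch {

    // Starts of all epoch-aligned windows [start, start + size) that contain ts.
    // Starts are multiples of `advance` and never negative (tumbling: advance == size).
    static List<Long> windowStartsFor(long ts, long size, long advance) {
        List<Long> starts = new ArrayList<>();
        long start = (Math.max(0, ts - size + advance) / advance) * advance;
        for (; start <= ts; start += advance) {
            starts.add(start);
        }
        return starts;
    }

    // Forwards a record once per window it opens, mimicking
    // reduce((l, r) -> STOP) followed by filterNot(STOP::equals).
    // Returns "windowStart:timestamp" strings.
    static List<String> distinct(long[] timestamps, long size, long advance) {
        Set<Long> seenWindows = new HashSet<>(); // stands in for the window store
        List<String> forwarded = new ArrayList<>();
        for (long ts : timestamps) {
            for (long start : windowStartsFor(ts, size, advance)) {
                if (seenWindows.add(start)) { // first record in this window
                    forwarded.add(start + ":" + ts);
                }
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        // Ivan's three bursts followed by the late arrivals (seconds).
        long[] input = {4, 5, 6, 23, 22, 24, 34, 33, 32, 7, 22, 35};
        System.out.println("tumbling: " + distinct(input, 10, 10));
        System.out.println("hopping:  " + distinct(input, 10, 5));
    }
}
```

With the advance equal to the size (tumbling windows), the sketch forwards 4, 23 and 34, one per window, as in Ivan's example. With a 5-second advance (hopping windows), 23 and 34 are each forwarded twice because each falls into two overlapping windows, which is the duplicating effect discussed in the thread.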

This has come up before, but I don't remember when: what if we introduce a parameter to `selectKey()` that specifies that the caller asserts that the new key does _not_ change the data partitioning? The docs on that parameter would of course spell out all the "rights and responsibilities" of setting it.

In that case, we could indeed get back to `selectKey(A).windowBy(B).distinct(...)`, where we get to compose the key mapper and the windowing function without having to carve out a separate domain just for `distinct()`. All the rest of the KStream operations would also benefit.

What do you think?

Thanks,
John

On Sun, May 23, 2021, at 08:09, Ivan Ponomarev wrote:

Hello everyone,

Let me revive the discussion for KIP-655. I have some time again now, and I'm eager to finalize this.

Based on what has already been discussed, I think we can split the discussion into three topics for convenience:

- idExtractor (how we should extract the deduplication key for a record)

- timeWindows (which time windows we should use)

- miscellaneous (naming, etc.)

---- idExtractor ----

Original proposal: use a (k, v) -> f(k, v) mapper, defaulting to (k, v) -> k. The drawback here is that we must warn users to choose a function that produces different IDs for records from different partitions; otherwise, equal IDs might not be co-partitioned (and, as a result, would not be deduplicated). An additional concern: what should we do when this function returns null?
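The co-partitioning warning above can be made concrete with a toy model in plain Java: each partition deduplicates only against its own local store, so if an idExtractor maps records whose keys sit on different partitions to the same ID, that ID is forwarded once per partition. The partitioner here, `Math.floorMod(key.hashCode(), n)`, is a hypothetical stand-in (Kafka's default partitioner hashes the serialized key with murmur2), and the `key + "/" + value` composite ID only approximates the (k, f(v)) idea from the next paragraphs; a real implementation would need an unambiguous serialization of the pair rather than a string separator.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;

// Toy model of per-partition deduplication; all names are hypothetical.
final class CoPartitioningSketch {

    // Stand-in partitioner for illustration only.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // records[i] = {key, value}. Each partition checks only its own local "store".
    static List<String> dedup(String[][] records, int numPartitions,
                              BiFunction<String, String, String> idExtractor) {
        Map<Integer, Set<String>> storePerPartition = new HashMap<>();
        List<String> forwarded = new ArrayList<>();
        for (String[] record : records) {
            int partition = partitionFor(record[0], numPartitions);
            String id = idExtractor.apply(record[0], record[1]);
            Set<String> store = storePerPartition.computeIfAbsent(partition, p -> new HashSet<>());
            if (store.add(id)) { // ID not seen on this partition yet
                forwarded.add(id);
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        // Keys "a" and "b" land on different partitions (hash codes 97 and 98, mod 2).
        String[][] records = {{"a", "event-1"}, {"b", "event-1"}, {"a", "event-1"}};
        System.out.println(dedup(records, 2, (k, v) -> v));           // value-only ID
        System.out.println(dedup(records, 2, (k, v) -> k + "/" + v)); // (key, value) ID
    }
}
```

With the value-only ID, `event-1` is forwarded twice even though it is the same ID, because the two copies are checked against different partition-local stores. With the composite ID, the two forwarded IDs differ by construction. This matches the use case Ivan describes later in the thread, where events from different sources count as different entities.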

Matthias proposed key-only deduplication: that is, no idExtractor at all; if we want to use `distinct` on a particular identifier, we must call `selectKey()` first. The drawback of this approach is that we will always have repartitioning after the key selection, while in practice repartitioning will not always be necessary (for example, when the data stream is such that different values imply different keys).

So here we have a 'safety vs. performance' trade-off. But the 'safe' variant is also not very convenient for developers, since we force them to change the structure of their records.

A 'golden mean' here might be a composite ID whose first component is k and whose second component is some f(v) (f defaults to v -> null, and a null value returned by f(v) means 'deduplicate by the key only'). The nuance here is that we will only have serializers for the types of k and f(v), so we must correctly serialize the tuple (k, f(v)), but this is of course doable.

What do you think?

---- timeWindows ----

Originally I proposed TimeWindows only because they solved my particular case :-) but I agree with Matthias' and Sophie's objections.

I like Sophie's point: we need both epoch-aligned and data-aligned windows. IMO this is absolutely correct: "data-aligned is useful for example when you know that a large number of updates to a single key will occur in short bursts, and epoch-aligned when you specifically want to get just a single update per discrete time interval."
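Sophie's 'short bursts' case is what session windows capture. The plain-Java sketch below (hypothetical names, single key, no Kafka dependency) replays the burst data from Ivan's later message with session-window merging. A record that falls within `gap` of an existing session joins or merges sessions and is dropped; a record that starts a new session is forwarded. The inclusive bounds are an assumption, chosen because they reproduce Ivan's reported results.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical single-key simulation of distinct() over session windows.
final class SessionDistinctSketch {

    // Returns the timestamps of the records that would be forwarded.
    static List<Long> distinct(long[] timestamps, long gap) {
        List<long[]> sessions = new ArrayList<>(); // each entry is {start, end}
        List<Long> forwarded = new ArrayList<>();
        for (long ts : timestamps) {
            long start = ts;
            long end = ts;
            boolean joinedExisting = false;
            for (Iterator<long[]> it = sessions.iterator(); it.hasNext(); ) {
                long[] s = it.next();
                // A session absorbs ts if ts lies within `gap` of it (inclusive bounds).
                if (s[1] >= ts - gap && s[0] <= ts + gap) {
                    start = Math.min(start, s[0]);
                    end = Math.max(end, s[1]);
                    it.remove(); // replaced by the merged session below
                    joinedExisting = true;
                }
            }
            sessions.add(new long[] {start, end});
            if (!joinedExisting) {
                forwarded.add(ts); // first record of a brand-new session
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        long[] input = {4, 5, 6, 23, 22, 24, 34, 33, 32, 7, 22, 35}; // seconds
        System.out.println("gap 10: " + distinct(input, 10));
        System.out.println("gap 9:  " + distinct(input, 9));
    }
}
```

With a 10-second gap, only 4 and 23 are forwarded, because 34 is exactly 10 seconds after 24 and joins the second burst's session. With a 9-second gap, 34 opens its own session, giving 4, 23 and 34.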

I just cannot agree right away with Sophie's .groupByKey().windowedBy(...).distinct() proposal, as it implies key-only deduplication -- see the previous topic.

Epoch-aligned windows are very simple: they should forward only one record per enumerated time window. TimeWindows are exactly what we want here. I mentioned both tumbling and hopping windows in the KIP just because both are possible with TimeWindows, but indeed I don't see any real use case for hopping windows; only tumbling windows make sense IMO.

For data-aligned windows, the SlidingWindows interface seems to be a nearly valid choice. Nearly. It should forward a record once when it's first seen, and then not again for any identical records that fall into the next N time units. However, we cannot reuse SlidingWindows as is, because, just as Matthias noted, SlidingWindows go backward in time, while we need windows that go forward in time and are not opened while records fall into an already existing window. We definitely should make our own implementation; maybe we should call it ExpirationWindow? WDYT?

---- miscellaneous ----

Persistent/in-memory stores. Matthias proposed passing a Materialized parameter next to DistinctParameters (and this is necessary, because we will need to provide a serializer for the extracted ID). This is an absolutely valid point; I agree and will fix it in the KIP.

Naming. Sophie noted that Streams DSL operators are typically named as verbs, so she proposes `deduplicate` in favour of `distinct`.
I think that while it's important to stick to the naming conventions, it is also important to think about the experience of those who come from different stacks/technologies. People who are familiar with SQL and the Java Streams API know for sure what 'distinct' means, while data deduplication in general is a more complex task, and thus `deduplicate` might be misleading. But I'm ready to be convinced if the majority thinks otherwise.

Regards,

Ivan

14.09.2020 21:31, Sophie Blee-Goldman wrote:

Hey all,

I'm not convinced either epoch-aligned or data-aligned will fit all possible use cases. Both seem totally reasonable to me: data-aligned is useful, for example, when you know that a large number of updates to a single key will occur in short bursts, and epoch-aligned when you specifically want to get just a single update per discrete time interval.

Going a step further, though, what if you want just a single update per calendar month, or per year with accounting for leap years? Neither of those is serviced that well by the existing Windows specification for windowed aggregations, a well-known limitation of the current API. There is actually a KIP <https://cwiki.apache.org/confluence/display/KAFKA/KIP-645%3A+Replace+Windows+with+a+proper+interface> going on in parallel to fix this exact issue and make the windowing interface much more flexible.
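Sophie's calendar-month example shows why fixed-size windows are not enough. A month has 28 to 31 days, so 'one update per month' cannot be expressed as a single window size. Below is a minimal plain-Java sketch of the intended behaviour. It assumes UTC and uses `java.time.YearMonth` as the window identifier; the class name is hypothetical, and this is an illustration rather than the KIP-645 API.

```java
import java.time.Instant;
import java.time.YearMonth;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: forward at most one record per calendar month (UTC).
final class CalendarMonthDistinctSketch {

    static List<Instant> distinct(List<Instant> timestamps) {
        Set<YearMonth> seenMonths = new HashSet<>();
        List<Instant> forwarded = new ArrayList<>();
        for (Instant ts : timestamps) {
            // The calendar month plays the role of an enumerable, variable-length window.
            YearMonth month = YearMonth.from(ts.atZone(ZoneOffset.UTC));
            if (seenMonths.add(month)) {
                forwarded.add(ts);
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        List<Instant> input = List.of(
                Instant.parse("2024-01-31T23:00:00Z"),
                Instant.parse("2024-02-01T00:00:00Z"), // one hour later, but a new month
                Instant.parse("2024-02-29T12:00:00Z"), // leap day, same month: dropped
                Instant.parse("2024-03-01T00:00:00Z"));
        System.out.println(distinct(input));
    }
}
```

The first two records are only an hour apart yet land in different windows, while the leap-day record is dropped because February already produced one.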

Maybe instead of re-implementing this windowing interface in a similarly limited fashion for the Distinct operator, we could leverage it here and get all the benefits that come with KIP-645.

Specifically, I'm proposing to remove the TimeWindows/etc. config from the DistinctParameters class and move the distinct() method from the KStream interface to the TimeWindowedKStream interface. Since it's semantically similar to a kind of windowed aggregation, it makes sense to align it with the existing windowing framework, i.e.:

    inputStream
        .groupByKey()
        .windowedBy(...)
        .distinct()

Then we could use data-aligned windows if SlidingWindows is specified in windowedBy(), and epoch-aligned (or some other kind of enumerable) windows if a Windows is specified in windowedBy() (or an EnumerableWindowDefinition, once KIP-645 is implemented to replace Windows).

*SlidingWindows*: should forward a record once when it's first seen, and then not again for any identical records that fall into the next N time units. This includes out-of-order records, i.e. if you have a SlidingWindows of size 10s and process records at times 15s, 20s, 14s, then you would just forward the one at 15s. Presumably, if you're using SlidingWindows, you don't care about what falls into exact time boxes; you just want to deduplicate. If you do care about exact time boxing, then you should use...

*EnumerableWindowDefinition* (e.g. *TimeWindows*): should forward only one record per enumerated time window. If you get records at 15s, 20s, 14s where the windows are enumerated as [5, 14], [15, 24], etc., then you forward the record at 15s and also the record at 14s.

Just an idea: I'm not sure whether the impedance mismatch would throw users off, since the semantics of the distinct windows are slightly different from those in the aggregations. But if we don't fit this into the existing windowed framework, then we shouldn't use any existing Windows-type classes at all, IMO; i.e. we should create a new DistinctWindows config class, similar to how stream-stream joins get their own JoinWindows class.

I also think that non-windowed deduplication could be useful, in which case we would also want to have the distinct() operator on the KStream interface.

One quick note regarding the naming: it seems like the Streams DSL operators are typically named as verbs rather than adjectives, for example #suppress or #aggregate. I get that there's some precedent for 'distinct' specifically, but maybe something like 'deduplicate' would be more appropriate for the Streams API.

WDYT?

On Mon, Sep 14, 2020 at 10:04 AM Ivan Ponomarev <iponoma...@mail.ru.invalid> wrote:

Hi Matthias,

Thanks for your review!
It made me think more deeply, and I indeed realized that I was missing some important details.

To simplify, let me explain my particular use case first so that I can refer to it later.

We have a system that collects information about ongoing live sporting events from different sources. The information sources have their own IDs, and these IDs are the keys of the stream. Each source emits messages concerning sporting events, and we can have many messages about each sporting event from each source. The event ID is extracted from the message.

We need a database of event IDs that were reported at least once by each source (important: events from different sources are considered to be different entities). The requirements are:

1) each new event ID should be written to the database as soon as possible;

2) although it's OK, and sometimes even desirable, to repeat the notification about an already known event ID, we wouldn't like our database to be bothered by the same event ID more often than once in a given period of time (say, 15 minutes).

With this example in mind, let me answer your questions.

> (1) Using the `idExtractor` has the issue that data might not be co-partitioned as you mentioned in the KIP. Thus, I am wondering if it might be better to do deduplication only on the key?
> If one sets a new key upstream (ie, extracts the deduplication id into the key), the `distinct` operator could automatically repartition the data and thus we would avoid user errors.

Of course, with 'key-only' deduplication plus automatic repartitioning, we will never cause problems with co-partitioning. But in practice, we often don't need repartitioning even if the 'dedup ID' is different from the key, as in my example above. So here we have a sort of 'performance vs. safety' trade-off.

The 'golden middle way' here could be the following: we form the deduplication ID as KEY + separator + idExtractor(VALUE). If idExtractor is not provided, we deduplicate by key only (as in the original proposal). idExtractor then transforms only the value (and not the key), and its result is appended to the key. Records from different partitions will inherently have different deduplication IDs, and all the data will be co-partitioned. As with any stateful operation, we will repartition the topic if the key was changed upstream, but only in that case, thus avoiding unnecessary repartitioning. My example above fits this perfectly.

> (2) What is the motivation for allowing the `idExtractor` to return `null`? Might be good to have some use-case examples for this feature.

I can't think of any use cases.
As often happens, it just came along with a copy-paste from StackOverflow -- see Michael Noll's answer here:

https://stackoverflow.com/questions/55803210/how-to-handle-duplicate-messages-using-kafka-streaming-dsl-functions

But, jokes aside, we'll have to decide what to do with nulls. If we accept the above proposal of forming the deduplication ID as KEY + postfix, then null can be treated as no postfix at all. If we don't accept this approach, then treating nulls as 'no deduplication' seems to be a reasonable assumption (we can't get or put null as a key in a KV store, so a record with a null ID is always going to look 'new' to us).

> (2) Is using a `TimeWindow` really what we want? I was wondering if a `SlidingWindow` might be better? Or maybe we need a new type of window?

Agreed. It's probably not what we want. I once thought that reusing TimeWindow was a clever idea; now I don't.

Do we need epoch alignment in our use case? No, we don't, and I don't know if anyone is going to need it. Epoch alignment is good for aggregation, but deduplication is a different story.

Let me describe the semantics the way I see them now, and tell me what you think:

- the only parameter that defines the deduplication logic is the 'expiration period';

- when a deduplication ID arrives and we cannot find it in the store, we forward the message downstream and store the ID + its timestamp;

- when an out-of-order ID arrives with an older timestamp and we find a 'fresher' record, we do nothing and don't forward the message (??? OR NOT? In what case would we want to forward an out-of-order message?);

- when an ID with a fresher timestamp arrives, we check whether it falls into the expiration period and either forward it or not, but in both cases we update the timestamp of the message in the store;

- the WindowStore retention mechanism should clean up very old records in order not to run out of space.

> (3) `isPersistent` -- instead of using this flag, it seems better to allow users to pass in a `Materialized` parameter next to `DistinctParameters` to configure the state store?

Fully agree! Users might also want to change the retention time.

> (4) I am wondering if we should really have 4 overloads for `DistinctParameters.with()`? It might be better to have one overload with all required parameters, and add optional parameters using the builder pattern? This seems to follow the DSL Grammar proposal.

Oh, I can explain. We can't fully rely on the builder pattern because of Java type inference limitations. We have to provide type parameters to the builder methods, or the code won't compile: see e.g. https://twitter.com/inponomarev/status/1265053286933159938 and the following discussion with Tagir Valeev.
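The inference problem Ivan refers to can be reproduced with a small hypothetical parameter class. The names below are invented for illustration and are not the actual `DistinctParameters` API. A static factory that takes the lambda as an argument lets the compiler infer the type parameters from the call site. By contrast, the first call in a builder chain is typed in isolation, so its type parameters default to `Object` unless an explicit type witness is given.

```java
import java.util.function.Function;

// Hypothetical parameter class; V is the record value type, I the extracted ID type.
final class DedupParams<V, I> {
    final Function<? super V, ? extends I> idExtractor;
    final String name;

    private DedupParams(Function<? super V, ? extends I> idExtractor, String name) {
        this.idExtractor = idExtractor;
        this.name = name;
    }

    // Overload style: V and I are inferred from the target type at the call site.
    static <V, I> DedupParams<V, I> with(Function<? super V, ? extends I> idExtractor) {
        return new DedupParams<>(idExtractor, null);
    }

    static <V, I> DedupParams<V, I> with(Function<? super V, ? extends I> idExtractor, String name) {
        return new DedupParams<>(idExtractor, name);
    }

    // Builder style: a chain must start with a call that has no target type.
    static <V, I> DedupParams<V, I> empty() {
        return new DedupParams<>(null, null);
    }

    <I2> DedupParams<V, I2> withIdExtractor(Function<? super V, ? extends I2> extractor) {
        return new DedupParams<>(extractor, name);
    }

    DedupParams<V, I> withName(String newName) {
        return new DedupParams<>(idExtractor, newName);
    }

    public static void main(String[] args) {
        // Compiles: the lambda's parameter type (String) comes from the assignment target.
        DedupParams<String, Integer> viaOverload = DedupParams.with(v -> v.length(), "by-length");

        // Would NOT compile: empty() is typed on its own, so V = Object and v.length() fails.
        // DedupParams<String, Integer> broken = DedupParams.empty().withIdExtractor(v -> v.length());

        // Compiles only with an explicit type witness on the first call of the chain.
        DedupParams<String, Integer> viaBuilder =
                DedupParams.<String, Void>empty().withName("by-length").withIdExtractor(v -> v.length());

        System.out.println(viaOverload.idExtractor.apply("abcd") + " " + viaBuilder.idExtractor.apply("xyz"));
    }
}
```

This is the trade-off behind offering several `with(...)` overloads instead of a builder: users get full inference, but the parameter class grows one factory method per combination of optional parameters.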

When we came across similar difficulties in KIP-418, we finally decided to add all the necessary overloads to the parameter class, so I just reproduced that approach here.

> (5) Even if it might be an implementation detail (and maybe the KIP itself does not need to mention it), can you give a high-level overview of how you intend to implement it (that would be easier to grok, compared to reading the PR).

Well, as with any operation at the KStreamImpl level, I'm building a store and a processor node.

The KStreamDistinct class is going to be the ProcessorSupplier, with the logic for forwarding/muting records located in KStreamDistinct.KStreamDistinctProcessor#process.

----

Matthias, if you are still reading this :-) a gentle reminder: my PR for the already accepted KIP-418 is still waiting for your review. I think it's better for me to finalize at least one KIP before proceeding to a new one :-)

Regards,

Ivan

03.09.2020 4:20, Matthias J. Sax wrote:

Thanks for the KIP, Ivan. Having a built-in deduplication operator is for sure a good addition.

A couple of questions:

(1) Using the `idExtractor` has the issue that data might not be co-partitioned as you mentioned in the KIP. Thus, I am wondering if it might be better to do deduplication only on the key?
If one sets a new key upstream (ie, extracts the deduplication id into the key), the `distinct` operator could automatically repartition the data and thus we would avoid user errors.

(2) What is the motivation for allowing the `idExtractor` to return `null`? Might be good to have some use-case examples for this feature.

(2) Is using a `TimeWindow` really what we want? I was wondering if a `SlidingWindow` might be better? Or maybe we need a new type of window?

It would be helpful if you could describe potential use cases in more detail. -- I am mainly wondering about hopping windows. Each record would always fall into multiple windows and thus would be emitted multiple times, ie, each time a window closes. Is this really a valid use case?

It seems that for deduplication, one wants to have some "expiration time", ie, for each ID, deduplicate all consecutive records with the same ID and emit the first record after the "expiration time" has passed. In terms of a window, this would mean that the window starts at `r.ts` and ends at `r.ts + windowSize`, ie, the window is aligned to the data. TimeWindows are aligned to the epoch, though. While `SlidingWindows` also align to the data, for the aggregation use case they go backward in time, while we need a window that goes forward in time.
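The data-aligned "expiration time" window described above, together with the rules in Ivan's reply, can be sketched as a per-ID map from each ID to the timestamp that opened its current window. This is a hypothetical plain-Java model, not the PoC implementation. The `HashMap` stands in for the state store, retention-based cleanup is omitted, and the window is assumed to be half-open, [r.ts, r.ts + expiration).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-ID deduplicator with a forward-looking, data-aligned window.
final class ExpirationDedupSketch {
    private final long expiration;
    // ID -> timestamp of the last forwarded record (stands in for a state store).
    private final Map<String, Long> windowStart = new HashMap<>();

    ExpirationDedupSketch(long expiration) {
        this.expiration = expiration;
    }

    // Returns true if the record should be forwarded downstream.
    boolean process(String id, long ts) {
        Long start = windowStart.get(id);
        if (start == null || ts - start >= expiration) {
            windowStart.put(id, ts); // open a new window [ts, ts + expiration)
            return true;
        }
        // Either inside the current window or out of order (ts < start): drop it.
        return false;
    }

    public static void main(String[] args) {
        ExpirationDedupSketch dedup = new ExpirationDedupSketch(10);
        long[] times = {15, 20, 14, 25, 30, 36};
        for (long ts : times) {
            System.out.println("e1@" + ts + " -> " + (dedup.process("e1", ts) ? "forward" : "drop"));
        }
    }
}
```

Here 15 is forwarded, and 20 and the out-of-order 14 are dropped, matching Sophie's 15s, 20s, 14s example. Then 25 opens a new window, 30 is dropped, and 36 is forwarded again. Ivan's list also updates the stored timestamp when a newer record is suppressed. Under that variant, an ID that keeps arriving more often than once per expiration period would never be forwarded again.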
It's > an > > >>>>>>>>> open > > >>>>>>>>> question if we can re-purpose `SlidingWindows` -- it might be > > >>>>>>>>> ok the > > >>>>>>>>> make the alignment (into the past vs into the future) an > operator > > >>>>>>>>> dependent behavior? > > >>>>>>>>> > > >>>>>>>>> (3) `isPersistent` -- instead of using this flag, it seems > > >>>>>>>>> better to > > >>>>>>>>> allow users to pass in a `Materialized` parameter next to > > >>>>>>>>> `DistinctParameters` to configure the state store? > > >>>>>>>>> > > >>>>>>>>> (4) I am wondering if we should really have 4 overloads for > > >>>>>>>>> `DistinctParameters.with()`? It might be better to have one > > >>>>>>>>> overload > > >>>>>>>>> with all require parameters, and add optional parameters using > the > > >>>>>>>>> builder pattern? This seems to follow the DSL Grammer proposal. > > >>>>>>>>> > > >>>>>>>>> (5) Even if it might be an implementation detail (and maybe > the > > >>>>>>>>> KIP > > >>>>>>>>> itself does not need to mention it), can you give a high level > > >>>>>>>>> overview > > >>>>>>>>> how you intent to implement it (that would be easier to grog, > > >>>>>>>>> compared > > >>>>>>>>> to reading the PR). > > >>>>>>>>> > > >>>>>>>>> > > >>>>>>>>> > > >>>>>>>>> -Matthias > > >>>>>>>>> > > >>>>>>>>> On 8/23/20 4:29 PM, Ivan Ponomarev wrote: > > >>>>>>>>>> Sorry, I forgot to add [DISCUSS] tag to the topic > > >>>>>>>>>> > > >>>>>>>>>> 24.08.2020 2:27, Ivan Ponomarev пишет: > > >>>>>>>>>>> Hello, > > >>>>>>>>>>> > > >>>>>>>>>>> I'd like to start a discussion for KIP-655. 

KIP-655: https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API

I also opened a proof-of-concept PR for you to experiment with the API:

PR#9210: https://github.com/apache/kafka/pull/9210

Regards,

Ivan Ponomarev