Hi Bruno,

I had previously been thinking of using equals(), since I thought that this might be a stateless operation. Comparing the serialized form requires a serde and a fairly expensive serialization step, so while byte equality is superior to equals(), we shouldn't use it in operations unless they already require serialization.
I changed my mind when I later realized I had been mistaken, and this operation is of course stateful.

I hope this helps clarify it.

Thanks,
-John

On Fri, 2021-07-23 at 09:53 +0200, Bruno Cadonna wrote:
> Hi Ivan and John,
>
> 1. John, could you clarify why comparing serialized values seems the way
> to go, now?
>
> 2. Ivan, Could you please answer my questions that I posted earlier? I
> will repost it here:
> Ivan, could you please make this matter a bit clearer in the KIP?
> Actually, thinking about it again, I do currently not see why it should
> not make sense in hopping windows. Regarding this, I do not understand
> the following sentence:
>
> "hopping and sliding windows do not make much sense for distinct()
> because they produce multiple intersected windows, so that one record
> can be multiplied instead of deduplication."
>
> Ivan, what do you mean with "multiplied"?
>
> 3. As I said earlier, I do not think that SQL and the Java Stream API
> are good arguments to not use a verb. However, if everybody else is fine
> with it, I can get behind it.
>
> John, good catch about the missing overloads!
> BTW, the overload with Named should be there regardless of stateful or
> stateless.
>
> Best,
> Bruno
>
> On 22.07.21 20:58, John Roesler wrote:
> > Hi Ivan,
> >
> > Thanks for the reply.
> >
> > 1. I think I might have gotten myself confused. I was
> > thinking of this operation as stateless, but now I'm not
> > sure what I was thinking... This operator has to be
> > stateful, right? In that case, I agree that comparing
> > serialized values seems to be the way to do it.
> >
> > 2. Thanks for the confirmation
> >
> > 3. I continue to be satisfied to let you all hash it out.
> >
> > Thanks,
> > -John
> >
> > On Tue, 2021-07-20 at 11:42 +0300, Ivan Ponomarev wrote:
> > > Hi all,
> > >
> > > 1. Actually I always thought about the serialized byte array only -- at
> > > least this is what local stores depend upon, and what Kafka itself
> > > depends upon when doing log compaction.
> > >
> > > I can imagine a case where two different byte arrays deserialize to
> > > objects which are `equals` to each other. But I think we can ignore this
> > > for now because IMO the risks related to buggy `equals` implementations
> > > outweigh the potential benefits.
> > >
> > > I will mention the duplicate definition in the KIP.
> > >
> > > 2. I agree with John, he got my point.
> > >
> > > 3. Let me gently insist on `distinct`. I believe that an exception to
> > > the rule is appropriate here, because the name `distinct()` is ubiquitous.
> > >
> > > It's not only about Java Streams API (or .NET LINQ, which appeared
> > > earlier and also has `Distinct`): Spark's DataFrame has `distinct()`
> > > method, Hazelcast Jet has `distinct()` method, and I bet I can find more
> > > examples if I search. When we teach KStreams, we always say that
> > > KStreams are just like other streaming APIs, and they have roots in SQL
> > > queries. Users know what `distinct` is and they expect it to be in the
> > > API.
> > >
> > > Regards,
> > >
> > > Ivan
> > >
> > >
> > > 13.07.2021 0:10, John Roesler wrote:
> > > > Hi all,
> > > >
> > > > Bruno raised some very good points. I’d like to chime in with
> > > > additional context.
> > > >
> > > > 1. Great point. We faced a similar problem defining KIP-557. For 557,
> > > > we chose to use the serialized byte array instead of the equals()
> > > > method, but I think the situation in KIP-655 is a bit different. I
> > > > think it might make sense to use the equals() method here, but am
> > > > curious what Ivan thinks.
> > > >
> > > > 2. I figured we'd do nothing. I thought Ivan was just saying that it
> > > > doesn't make a ton of sense to use it, which I agree with, but it
> > > > doesn't seem like that means we should prohibit it.
> > > >
> > > > 3. FWIW, I don't have a strong feeling either way.
> > > >
> > > > Thanks,
> > > > -John
> > > >
> > > > On Mon, Jul 12, 2021, at 09:14, Bruno Cadonna wrote:
> > > > > Hi Ivan,
> > > > >
> > > > > Thank you for the KIP!
> > > > >
> > > > > Some aspects are not clear to me from the KIP and I have a proposal.
> > > > >
> > > > > 1. The KIP does not describe the criteria that define a duplicate. Could
> > > > > you add a definition of duplicate to the KIP?
> > > > >
> > > > > 2. The KIP does not describe what happens if distinct() is applied on a
> > > > > hopping window. On the DSL level, I do not see how you can avoid that
> > > > > users apply distinct() on a hopping window, i.e., you cannot avoid it at
> > > > > compile time, you need to check it at runtime and throw an exception. Is
> > > > > this correct or am I missing something?
> > > > >
> > > > > 3. I would also like to back a proposal by Sophie. She proposed to use
> > > > > deduplicate() instead of distinct(), since the other DSL operations are
> > > > > also verbs. I do not think that SQL and the Java Stream API are good
> > > > > arguments to not use a verb.
> > > > >
> > > > > Best,
> > > > > Bruno
> > > > >
> > > > >
> > > > > On 10.07.21 19:11, John Roesler wrote:
> > > > > > Hi Ivan,
> > > > > >
> > > > > > Sorry for the silence!
> > > > > >
> > > > > > I have just re-read the proposal.
> > > > > >
> > > > > > To summarize, you are now only proposing the zero-arg distinct()
> > > > > > method to be added to TimeWindowedKStream and
> > > > > > SessionWindowedKStream, right?
> > > > > >
> > > > > > I’m in favor of this proposal.
> > > > > >
> > > > > > Thanks,
> > > > > > John
> > > > > >
> > > > > > On Sat, Jul 10, 2021, at 10:18, Ivan Ponomarev wrote:
> > > > > > > Hello everyone,
> > > > > > >
> > > > > > > I would like to remind you about KIP-655 and KIP-759 just in case they
> > > > > > > got lost in your inbox.
> > > > > > >
> > > > > > > Now the initial proposal is split into two independent and smaller ones,
> > > > > > > so it must be easier to review them. Of course, if you have time.
> > > > > > >
> > > > > > > Regards,
> > > > > > >
> > > > > > > Ivan
> > > > > > >
> > > > > > >
> > > > > > > 24.06.2021 18:11, Ivan Ponomarev wrote:
> > > > > > > > Hello all,
> > > > > > > >
> > > > > > > > I have rewritten the KIP-655 summarizing what was agreed upon during
> > > > > > > > this discussion (now the proposal is much simpler and less invasive).
> > > > > > > >
> > > > > > > > I have also created KIP-759 (cancelRepartition operation) and started a
> > > > > > > > discussion for it.
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > >
> > > > > > > > Ivan.
> > > > > > > >
> > > > > > > >
> > > > > > > > 04.06.2021 8:15, Matthias J. Sax wrote:
> > > > > > > > > Just skimmed over the thread -- first of all, I am glad that we could
> > > > > > > > > merge KIP-418 and ship it :)
> > > > > > > > >
> > > > > > > > > About the re-partitioning concerns, there are already two tickets for it:
> > > > > > > > >
> > > > > > > > > - https://issues.apache.org/jira/browse/KAFKA-4835
> > > > > > > > > - https://issues.apache.org/jira/browse/KAFKA-10844
> > > > > > > > >
> > > > > > > > > Thus, it seems best to exclude this topic from this KIP, and do a
> > > > > > > > > separate KIP for it (if necessary, we can "pause" this KIP until the
> > > > > > > > > repartition KIP is done). It's a long standing "issue" and we should
> > > > > > > > > resolve it in a general way I guess.
> > > > > > > > >
> > > > > > > > > (Did not read all responses in detail yet, so keeping this comment
> > > > > > > > > short.)
> > > > > > > > > > > > > > > > > > > > > > > > > > > -Matthias > > > > > > > > > > > > > > > > > > On 6/2/21 6:35 AM, John Roesler wrote: > > > > > > > > > > Thanks, Ivan! > > > > > > > > > > > > > > > > > > > > That sounds like a great plan to me. Two smaller KIPs are > > > > > > > > > > easier to > > > > > > > > > > agree on than one big one. > > > > > > > > > > > > > > > > > > > > I agree hopping and sliding windows will actually have a > > > > > > > > > > duplicating > > > > > > > > > > effect. We can avoid adding distinct() to the sliding window > > > > > > > > > > interface, but hopping windows are just a different > > > > > > > > > > parameterization > > > > > > > > > > of epoch-aligned windows. It seems we can’t do much about > > > > > > > > > > that except > > > > > > > > > > document the issue. > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > John > > > > > > > > > > > > > > > > > > > > On Wed, May 26, 2021, at 10:14, Ivan Ponomarev wrote: > > > > > > > > > > > Hi John! > > > > > > > > > > > > > > > > > > > > > > I think that your proposal is just fantastic, it > > > > > > > > > > > simplifies things a > > > > > > > > > > > lot! > > > > > > > > > > > > > > > > > > > > > > I also felt uncomfortable due to the fact that the > > > > > > > > > > > proposed > > > > > > > > > > > `distinct()` > > > > > > > > > > > is not somewhere near `count()` and `reduce(..)`. But > > > > > > > > > > > `selectKey(..).groupByKey().windowedBy(..).distinct()` > > > > > > > > > > > didn't look like > > > > > > > > > > > a correct option for me because of the issue with the > > > > > > > > > > > unneeded > > > > > > > > > > > repartitioning. > > > > > > > > > > > > > > > > > > > > > > The bold idea that we can just CANCEL the repartitioning > > > > > > > > > > > didn't came to > > > > > > > > > > > my mind. 
> > > > > > > > > > > > > > > > > > > > > > What seemed to me a single problem is in fact two > > > > > > > > > > > unrelated problems: > > > > > > > > > > > `distinct` operation and cancelling the unneeded > > > > > > > > > > > repartitioning. > > > > > > > > > > > > > > > > > > > > > > > what if we introduce a parameter to `selectKey()` > > > > > > > > > > > that specifies > > > > > > > > > > > that > > > > > > > > > > > the caller asserts that the new key does _not_ change the > > > > > > > > > > > data > > > > > > > > > > > partitioning? > > > > > > > > > > > > > > > > > > > > > > I think a more elegant solution would be not to add a new > > > > > > > > > > > parameter to > > > > > > > > > > > `selectKey` and all the other key-changing operations > > > > > > > > > > > (`map`, > > > > > > > > > > > `transform`, `flatMap`, ...), but add a new operator > > > > > > > > > > > `KStream#cancelRepartitioning()` that resets > > > > > > > > > > > `keyChangingOperation` > > > > > > > > > > > flag > > > > > > > > > > > for the upstream node. Of course, "use it only if you > > > > > > > > > > > know what you're > > > > > > > > > > > doing" warning is to be added. Well, it's a topic for a > > > > > > > > > > > separate KIP! > > > > > > > > > > > > > > > > > > > > > > Concerning `distinct()`. If we use `XXXWindowedKStream` > > > > > > > > > > > facilities, > > > > > > > > > > > then > > > > > > > > > > > changes to the API are minimally invasive: we're just > > > > > > > > > > > adding > > > > > > > > > > > `distinct()` to TimeWindowedKStream and > > > > > > > > > > > SessionWindowedKStream, and > > > > > > > > > > > that's all. > > > > > > > > > > > > > > > > > > > > > > We can now define `distinct` as an operation that returns > > > > > > > > > > > only a first > > > > > > > > > > > record that falls into a new window, and filters out all > > > > > > > > > > > the other > > > > > > > > > > > records that fall into an already existing window. 
BTW, > > > > > > > > > > > we can mock the > > > > > > > > > > > behaviour of such an operation with `TopologyTestDriver` > > > > > > > > > > > using > > > > > > > > > > > `reduce((l, r) -> STOP)`.filterNot((k, > > > > > > > > > > > v)->STOP.equals(v)). ;-) > > > > > > > > > > > > > > > > > > > > > > Consider the following example (record times are in > > > > > > > > > > > seconds): > > > > > > > > > > > > > > > > > > > > > > //three bursts of variously ordered records > > > > > > > > > > > 4, 5, 6 > > > > > > > > > > > 23, 22, 24 > > > > > > > > > > > 34, 33, 32 > > > > > > > > > > > //'late arrivals' > > > > > > > > > > > 7, 22, 35 > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 1. 'Epoch-aligned deduplication' using tumbling windows: > > > > > > > > > > > > > > > > > > > > > > .groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(10))).distinct() > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > produces > > > > > > > > > > > > > > > > > > > > > > (key@[00000/10000], 4) > > > > > > > > > > > (key@[20000/30000], 23) > > > > > > > > > > > (key@[30000/40000], 34) > > > > > > > > > > > > > > > > > > > > > > -- that is, one record per epoch-aligned window. > > > > > > > > > > > > > > > > > > > > > > 2. Hopping and sliding windows do not make much sense > > > > > > > > > > > here, because > > > > > > > > > > > they > > > > > > > > > > > produce multiple intersected windows, so that one record > > > > > > > > > > > can be > > > > > > > > > > > multiplied, but we want deduplication. > > > > > > > > > > > > > > > > > > > > > > 3. SessionWindows work for 'data-aligned deduplication'. 
> > > > > > > > > > > > > > > > > > > > > > .groupByKey().windowedBy(SessionWindows.with(Duration.ofSeconds(10))).distinct() > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > produces only > > > > > > > > > > > > > > > > > > > > > > ([key@4000/4000], 4) > > > > > > > > > > > ([key@23000/23000], 23) > > > > > > > > > > > > > > > > > > > > > > because all the records bigger than 7 are stuck together > > > > > > > > > > > in one > > > > > > > > > > > session. > > > > > > > > > > > Setting inactivity gap to 9 seconds will return three > > > > > > > > > > > records: > > > > > > > > > > > > > > > > > > > > > > ([key@4000/4000], 4) > > > > > > > > > > > ([key@23000/23000], 23) > > > > > > > > > > > ([key@34000/34000], 34) > > > > > > > > > > > > > > > > > > > > > > WDYT? If you like this variant, I will re-write KIP-655 > > > > > > > > > > > and propose a > > > > > > > > > > > separate KIP for `cancelRepartitioning` (or whatever name > > > > > > > > > > > we will > > > > > > > > > > > choose > > > > > > > > > > > for it). > > > > > > > > > > > > > > > > > > > > > > Regards, > > > > > > > > > > > > > > > > > > > > > > Ivan > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 24.05.2021 22:32, John Roesler пишет: > > > > > > > > > > > > Hey there, Ivan! > > > > > > > > > > > > > > > > > > > > > > > > In typical fashion, I'm going to make a somewhat > > > > > > > > > > > > outlandish > > > > > > > > > > > > proposal. I'm hoping that we can side-step some of the > > > > > > > > > > > > complications that have arisen. Please bear with me. > > > > > > > > > > > > > > > > > > > > > > > > It seems like `distinct()` is not fundamentally unlike > > > > > > > > > > > > other windowed > > > > > > > > > > > > "aggregation" operations. Your concern about unnecessary > > > > > > > > > > > > repartitioning seems to apply just as well to `count()` > > > > > > > > > > > > as to > > > > > > > > > > > > `distinct()`. 
> > > > > > > > > > > > This has come up before, but I don't remember when: > > > > > > > > > > > > what if we > > > > > > > > > > > > introduce a parameter to `selectKey()` that specifies > > > > > > > > > > > > that the caller > > > > > > > > > > > > asserts that the new key does _not_ change the data > > > > > > > > > > > > partitioning? > > > > > > > > > > > > The docs on that parameter would of course spell out > > > > > > > > > > > > all the "rights > > > > > > > > > > > > and responsibilities" of setting it. > > > > > > > > > > > > > > > > > > > > > > > > In that case, we could indeed get back to > > > > > > > > > > > > `selectKey(A).windowBy(B).distinct(...)`, where we get > > > > > > > > > > > > to compose the > > > > > > > > > > > > key mapper and the windowing function without having to > > > > > > > > > > > > carve out > > > > > > > > > > > > a separate domain just for `distinct()`. All the rest > > > > > > > > > > > > of the KStream > > > > > > > > > > > > operations would also benefit. > > > > > > > > > > > > > > > > > > > > > > > > What do you think? > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > John > > > > > > > > > > > > > > > > > > > > > > > > On Sun, May 23, 2021, at 08:09, Ivan Ponomarev wrote: > > > > > > > > > > > > > Hello everyone, > > > > > > > > > > > > > > > > > > > > > > > > > > let me revive the discussion for KIP-655. Now I have > > > > > > > > > > > > > some time > > > > > > > > > > > > > again and > > > > > > > > > > > > > I'm eager to finalize this. > > > > > > > > > > > > > > > > > > > > > > > > > > Based on what was already discussed, I think that we > > > > > > > > > > > > > can split the > > > > > > > > > > > > > discussion into three topics for our convenience. 
> > > > > > > > > > > > > > > > > > > > > > > > > > The three topics are: > > > > > > > > > > > > > > > > > > > > > > > > > > - idExtractor (how should we extract the > > > > > > > > > > > > > deduplication key for > > > > > > > > > > > > > the record) > > > > > > > > > > > > > > > > > > > > > > > > > > - timeWindows (what time windows should we use) > > > > > > > > > > > > > > > > > > > > > > > > > > - miscellaneous (naming etc.) > > > > > > > > > > > > > > > > > > > > > > > > > > ---- idExtractor ---- > > > > > > > > > > > > > > > > > > > > > > > > > > Original proposal: use (k, v) -> f(k, v) mapper, > > > > > > > > > > > > > defaulting to (k, > > > > > > > > > > > > > v) -> > > > > > > > > > > > > > k. The drawback here is that we must warn the user > > > > > > > > > > > > > to choose such a > > > > > > > > > > > > > function that sets different IDs for records from > > > > > > > > > > > > > different > > > > > > > > > > > > > partitions, > > > > > > > > > > > > > otherwise same IDs might be not co-partitioned (and > > > > > > > > > > > > > not > > > > > > > > > > > > > deduplicated as > > > > > > > > > > > > > a result). Additional concern: what should we do when > > > > > > > > > > > > > this function > > > > > > > > > > > > > returns null? > > > > > > > > > > > > > > > > > > > > > > > > > > Matthias proposed key-only deduplication: that is, no > > > > > > > > > > > > > idExtractor at > > > > > > > > > > > > > all, and if we want to use `distinct` for a > > > > > > > > > > > > > particular identifier, we > > > > > > > > > > > > > must `selectKey()` before. 
The drawback of this > > > > > > > > > > > > > approach is that > > > > > > > > > > > > > we will > > > > > > > > > > > > > always have repartitioning after the key selection, > > > > > > > > > > > > > while in practice > > > > > > > > > > > > > repartitioning will not always be necessary (for > > > > > > > > > > > > > example, when the > > > > > > > > > > > > > data > > > > > > > > > > > > > stream is such that different values infer different > > > > > > > > > > > > > keys). > > > > > > > > > > > > > > > > > > > > > > > > > > So here we have a 'safety vs. performance' trade-off. > > > > > > > > > > > > > But 'safe' > > > > > > > > > > > > > variant > > > > > > > > > > > > > is also not very convenient for developers, since > > > > > > > > > > > > > we're forcing > > > > > > > > > > > > > them to > > > > > > > > > > > > > change the structure of their records. > > > > > > > > > > > > > > > > > > > > > > > > > > A 'golden mean' here might be using composite ID with > > > > > > > > > > > > > its first > > > > > > > > > > > > > component equals to k and its second component equals > > > > > > > > > > > > > to some f(v) (f > > > > > > > > > > > > > defaults to v -> null, and null value returned by > > > > > > > > > > > > > f(v) means > > > > > > > > > > > > > 'deduplicate by the key only'). The nuance here is > > > > > > > > > > > > > that we will have > > > > > > > > > > > > > serializers only for types of k and f(v), and we must > > > > > > > > > > > > > correctly > > > > > > > > > > > > > serialize a tuple (k, f(v)), but of course this is > > > > > > > > > > > > > doable. > > > > > > > > > > > > > > > > > > > > > > > > > > What do you think? 
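One way to picture the composite ID discussed above is to length-prefix the serialized key and append the serialized f(v), with a one-byte marker for the null ("deduplicate by the key only") case. This is only a sketch under assumptions: `CompositeIdSketch`, `compositeId` and `utf8` are hypothetical names, not part of the KIP or of Kafka Streams, and the byte arrays stand in for whatever the key serde and the f(v) serde would produce.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class CompositeIdSketch {

    /**
     * Builds the bytes of a composite ID (k, f(v)).
     * extractedBytes == null models f(v) == null, i.e. "deduplicate by the key only".
     */
    public static byte[] compositeId(byte[] keyBytes, byte[] extractedBytes) {
        int extractedLength = (extractedBytes == null) ? 0 : extractedBytes.length;
        ByteBuffer buffer =
            ByteBuffer.allocate(Integer.BYTES + keyBytes.length + 1 + extractedLength);
        // The length prefix keeps ("ab", "c") distinct from ("a", "bc").
        buffer.putInt(keyBytes.length);
        buffer.put(keyBytes);
        // Marker byte: 0 = key-only ID, 1 = f(v) bytes follow.
        buffer.put((byte) (extractedBytes == null ? 0 : 1));
        if (extractedBytes != null) {
            buffer.put(extractedBytes);
        }
        return buffer.array();
    }

    /** Stand-in for a serde: UTF-8 bytes of a string (assumption for the demo). */
    public static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] first = compositeId(utf8("ab"), utf8("c"));
        byte[] second = compositeId(utf8("a"), utf8("bc"));
        System.out.println("same ID for (ab, c) and (a, bc)? " + Arrays.equals(first, second));

        byte[] keyOnly = compositeId(utf8("ab"), null);
        byte[] emptyValue = compositeId(utf8("ab"), new byte[0]);
        System.out.println("key-only equals empty f(v)? " + Arrays.equals(keyOnly, emptyValue));
    }
}
```

Because the key bytes always come first, the partitioning question raised above is unchanged by this encoding: it only shows that a tuple of two separately serialized parts can be turned into one unambiguous byte array for comparison.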
> > > > > > > > > > > > >
> > > > > > > > > > > > > ---- timeWindows ----
> > > > > > > > > > > > >
> > > > > > > > > > > > > Originally I proposed TimeWindows only just because they solved my
> > > > > > > > > > > > > particular case :-) but I agree with Matthias' and Sophie's objections.
> > > > > > > > > > > > >
> > > > > > > > > > > > > I like Sophie's point: we need both epoch-aligned and data-aligned
> > > > > > > > > > > > > windows. IMO this is absolutely correct: "data-aligned is useful for
> > > > > > > > > > > > > example when you know that a large number of updates to a single key
> > > > > > > > > > > > > will occur in short bursts, and epoch-aligned when you specifically want
> > > > > > > > > > > > > to get just a single update per discrete time interval."
> > > > > > > > > > > > >
> > > > > > > > > > > > > I just cannot agree right away with Sophie's
> > > > > > > > > > > > > .groupByKey().windowedBy(...).distinct() proposal, as it implies the
> > > > > > > > > > > > > key-only deduplication -- see the previous topic.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Epoch-aligned windows are very simple: they should forward only one
> > > > > > > > > > > > > record per enumerated time window. TimeWindows are exactly what we want
> > > > > > > > > > > > > here. I mentioned in the KIP both tumbling and hopping windows just
> > > > > > > > > > > > > because both are possible for TimeWindows, but indeed I don't see any
> > > > > > > > > > > > > real use case for hopping windows, only tumbling windows make
> > > > > > > > > > > > > sense IMO.
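The epoch-aligned rule described above ("forward only one record per enumerated time window") can be simulated without Kafka at all. The sketch below is an illustration under assumptions, not the KIP's implementation: `TumblingDistinctSketch`, `windowStartMs` and `distinct` are hypothetical names, a plain in-memory set stands in for the window store, all records share one key, timestamps are non-negative, and retention and grace periods are ignored. The sample timestamps are the ones used in the example quoted earlier in this thread.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class TumblingDistinctSketch {

    /** Start (in ms) of the epoch-aligned tumbling window containing the timestamp. */
    public static long windowStartMs(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    /**
     * Returns the timestamps (ms) that would be forwarded for a single key:
     * the first record to arrive in each window, in arrival order.
     */
    public static List<Long> distinct(long[] timestampsMs, long windowSizeMs) {
        Set<Long> seenWindows = new HashSet<>();   // stand-in for a window store
        List<Long> forwarded = new ArrayList<>();
        for (long ts : timestampsMs) {
            // Set.add returns true only the first time a window start is seen.
            if (seenWindows.add(windowStartMs(ts, windowSizeMs))) {
                forwarded.add(ts);
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        // Three bursts of variously ordered records, then three late arrivals.
        long[] arrivals = {4000, 5000, 6000, 23000, 22000, 24000,
                           34000, 33000, 32000, 7000, 22000, 35000};
        long size = 10_000L;
        for (long ts : distinct(arrivals, size)) {
            long start = windowStartMs(ts, size);
            System.out.printf("(key@[%d/%d], %d)%n", start, start + size, ts / 1000);
        }
    }
}
```

With a 10-second window this forwards the records at 4, 23 and 34 seconds, one per window, and drops the late arrivals because their windows have already been seen.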
> > > > > > > > > > > > > > > > > > > > > > > > > > For data-aligned windows SlidingWindow interface > > > > > > > > > > > > > seems to be a nearly > > > > > > > > > > > > > valid choice. Nearly. It should forward a record once > > > > > > > > > > > > > when it's first > > > > > > > > > > > > > seen, and then not again for any identical records > > > > > > > > > > > > > that fall into the > > > > > > > > > > > > > next N timeUnits. However, we cannot reuse > > > > > > > > > > > > > SlidingWindow as is, > > > > > > > > > > > > > because > > > > > > > > > > > > > just as Matthias noted, SlidingWindows go backward in > > > > > > > > > > > > > time, while we > > > > > > > > > > > > > need a windows that go forward in time, and are not > > > > > > > > > > > > > opened while > > > > > > > > > > > > > records > > > > > > > > > > > > > fall into an already existing window. We definitely > > > > > > > > > > > > > should make > > > > > > > > > > > > > our own > > > > > > > > > > > > > implementation, maybe we should call it > > > > > > > > > > > > > ExpirationWindow? WDYT? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > ---- miscellaneous ---- > > > > > > > > > > > > > > > > > > > > > > > > > > Persistent/in-memory stores. Matthias proposed to > > > > > > > > > > > > > pass Materialized > > > > > > > > > > > > > parameter next to DistinctParameters (and this is > > > > > > > > > > > > > necessary, > > > > > > > > > > > > > because we > > > > > > > > > > > > > will need to provide a serializer for extracted id). > > > > > > > > > > > > > This is > > > > > > > > > > > > > absolutely > > > > > > > > > > > > > valid point, I agree and I will fix it in the KIP. > > > > > > > > > > > > > > > > > > > > > > > > > > Naming. Sophie noted that the Streams DSL operators > > > > > > > > > > > > > are typically > > > > > > > > > > > > > named > > > > > > > > > > > > > as verbs, so she proposes `deduplicate` in favour of > > > > > > > > > > > > > `distinct`. 
I > > > > > > > > > > > > > think > > > > > > > > > > > > > that while it's important to stick to the naming > > > > > > > > > > > > > conventions, it > > > > > > > > > > > > > is also > > > > > > > > > > > > > important to think of the experience of those who > > > > > > > > > > > > > come from different > > > > > > > > > > > > > stacks/technologies. People who are familiar with SQL > > > > > > > > > > > > > and Java > > > > > > > > > > > > > Streams > > > > > > > > > > > > > API must know for sure what does 'distinct' mean, > > > > > > > > > > > > > while data > > > > > > > > > > > > > deduplication in general is a more complex task and > > > > > > > > > > > > > thus > > > > > > > > > > > > > `deduplicate` > > > > > > > > > > > > > might be misleading. But I'm ready to be convinced if > > > > > > > > > > > > > the majority > > > > > > > > > > > > > thinks otherwise. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Regards, > > > > > > > > > > > > > > > > > > > > > > > > > > Ivan > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 14.09.2020 21:31, Sophie Blee-Goldman пишет: > > > > > > > > > > > > > > Hey all, > > > > > > > > > > > > > > > > > > > > > > > > > > > > I'm not convinced either epoch-aligned or > > > > > > > > > > > > > > data-aligned will fit all > > > > > > > > > > > > > > possible use cases. 
> > > > > > > > > > > > > > Both seem totally reasonable to me: data-aligned is > > > > > > > > > > > > > > useful for > > > > > > > > > > > > > > example when > > > > > > > > > > > > > > you know > > > > > > > > > > > > > > that a large number of updates to a single key will > > > > > > > > > > > > > > occur in > > > > > > > > > > > > > > short bursts, > > > > > > > > > > > > > > and epoch- > > > > > > > > > > > > > > aligned when you specifically want to get just a > > > > > > > > > > > > > > single update > > > > > > > > > > > > > > per discrete > > > > > > > > > > > > > > time > > > > > > > > > > > > > > interval. > > > > > > > > > > > > > > > > > > > > > > > > > > > > Going a step further, though, what if you want just > > > > > > > > > > > > > > a single > > > > > > > > > > > > > > update per > > > > > > > > > > > > > > calendar > > > > > > > > > > > > > > month, or per year with accounting for leap years? > > > > > > > > > > > > > > Neither of > > > > > > > > > > > > > > those are > > > > > > > > > > > > > > serviced that > > > > > > > > > > > > > > well by the existing Windows specification to > > > > > > > > > > > > > > windowed > > > > > > > > > > > > > > aggregations, a > > > > > > > > > > > > > > well-known > > > > > > > > > > > > > > limitation of the current API. There is actually a > > > > > > > > > > > > > > KIP > > > > > > > > > > > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-645%3A+Replace+Windows+with+a+proper+interface> > > > > > > > > > > > > > > > > > > > > > > > > > > > > going > > > > > > > > > > > > > > on in parallel to fix this > > > > > > > > > > > > > > exact issue and make the windowing interface much > > > > > > > > > > > > > > more flexible. 
> > > > > > > > > > > > > > Maybe instead of re-implementing this windowing interface in a similarly
> > > > > > > > > > > > > > limited fashion for the Distinct operator, we could leverage it here and
> > > > > > > > > > > > > > get all the benefits coming with KIP-645.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Specifically, I'm proposing to remove the TimeWindows/etc config from the
> > > > > > > > > > > > > > DistinctParameters class, and move the distinct() method from the KStream
> > > > > > > > > > > > > > interface to the TimeWindowedKStream interface. Since it's semantically
> > > > > > > > > > > > > > similar to a kind of windowed aggregation, it makes sense to align it
> > > > > > > > > > > > > > with the existing windowing framework, ie:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > inputStream
> > > > > > > > > > > > > >     .groupByKey()
> > > > > > > > > > > > > >     .windowedBy()
> > > > > > > > > > > > > >     .distinct()
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Then we could use data-aligned windows if SlidingWindows is specified in
> > > > > > > > > > > > > > the windowedBy(), and epoch-aligned (or some other kind of enumerable
> > > > > > > > > > > > > > window) if a Windows is specified in windowedBy() (or an
> > > > > > > > > > > > > > EnumerableWindowDefinition once KIP-645 is implemented to replace Windows).
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > *SlidingWindows*: should forward a record once when it's first seen, and
> > > > > > > > > > > > > > then not again for any identical records that fall into the next N
> > > > > > > > > > > > > > timeUnits. This includes out-of-order records, ie if you have a
> > > > > > > > > > > > > > SlidingWindows of size 10s and process records at time 15s, 20s, 14s then
> > > > > > > > > > > > > > you would just forward the one at 15s. Presumably, if you're using
> > > > > > > > > > > > > > SlidingWindows, you don't care about what falls into exact time boxes,
> > > > > > > > > > > > > > you just want to deduplicate. If you do care about exact time boxing then
> > > > > > > > > > > > > > you should use...
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > *EnumerableWindowDefinition* (eg *TimeWindows*): should forward only one
> > > > > > > > > > > > > > record per enumerated time window.
> > > > > > > > > > > > > > If you get records at 15s, 20s, 14s where the windows are enumerated at
> > > > > > > > > > > > > > [5,14], [15, 24], etc then you forward the record at 15s and also the
> > > > > > > > > > > > > > record at 14s
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Just an idea: not sure if the impedance mismatch would throw users off
> > > > > > > > > > > > > > since the semantics of the distinct windows are slightly different than
> > > > > > > > > > > > > > in the aggregations. But if we don't fit this into the existing windowed
> > > > > > > > > > > > > > framework, then we shouldn't use any existing Windows-type classes at
> > > > > > > > > > > > > > all, imo. ie we should create a new DistinctWindows config class, similar
> > > > > > > > > > > > > > to how stream-stream joins get their own JoinWindows class
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I also think that non-windowed deduplication could be useful, in which
> > > > > > > > > > > > > > case we would want to also have the distinct() operator on the KStream
> > > > > > > > > > > > > > interface.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > One quick note regarding the naming: it seems like > > > > > > > > > > > > > > the Streams > > > > > > > > > > > > > > DSL operators > > > > > > > > > > > > > > are typically named as verbs rather than > > > > > > > > > > > > > > adjectives, for example. > > > > > > > > > > > > > > #suppress > > > > > > > > > > > > > > or > > > > > > > > > > > > > > #aggregate. I get that there's some precedent for > > > > > > > > > > > > > > 'distinct' > > > > > > > > > > > > > > specifically, > > > > > > > > > > > > > > but > > > > > > > > > > > > > > maybe something like 'deduplicate' would be more > > > > > > > > > > > > > > appropriate for > > > > > > > > > > > > > > the Streams > > > > > > > > > > > > > > API. > > > > > > > > > > > > > > > > > > > > > > > > > > > > WDYT? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Mon, Sep 14, 2020 at 10:04 AM Ivan Ponomarev > > > > > > > > > > > > > > <iponoma...@mail.ru.invalid> > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi Matthias, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks for your review! It made me think deeper, > > > > > > > > > > > > > > > and indeed I > > > > > > > > > > > > > > > understood > > > > > > > > > > > > > > > that I was missing some important details. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > To simplify, let me explain my particular use > > > > > > > > > > > > > > > case first so I > > > > > > > > > > > > > > > can refer > > > > > > > > > > > > > > > to it later. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > We have a system that collects information about > > > > > > > > > > > > > > > ongoing live > > > > > > > > > > > > > > > sporting > > > > > > > > > > > > > > > events from different sources. 
> > > > > > > > > > > > > > > The information sources have their IDs, and these IDs are the keys of the
> > > > > > > > > > > > > > > stream. Each source emits messages concerning sporting events, and we can
> > > > > > > > > > > > > > > have many messages about each sporting event from each source. The event ID
> > > > > > > > > > > > > > > is extracted from the message.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > We need a database of event IDs that were reported at least once by each
> > > > > > > > > > > > > > > source (important: events from different sources are considered to be
> > > > > > > > > > > > > > > different entities). The requirements are:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > 1) each new event ID should be written to the database as soon as possible
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > 2) although it's ok and sometimes even desired to repeat the notification
> > > > > > > > > > > > > > > about an already known event ID, we wouldn't like our database to be
> > > > > > > > > > > > > > > bothered by the same event ID more often than once in a given period of
> > > > > > > > > > > > > > > time (say, 15 minutes).
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > With this example in mind let me answer your > > > > > > > > > > > > > > > questions > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > (1) Using the `idExtractor` has the > > > > > > > > > > > > > > > issue that data might > > > > > > > > > > > > > > > not be > > > > > > > > > > > > > > > > co-partitioned as you mentioned in the > > > > > > > > > > > > > > > KIP. Thus, I am > > > > > > > > > > > > > > > wondering if it > > > > > > > > > > > > > > > > might be better to do deduplication only > > > > > > > > > > > > > > > on the key? If > > > > > > > > > > > > > > > one sets a new > > > > > > > > > > > > > > > > key upstream (ie, extracts the > > > > > > > > > > > > > > > deduplication id into the > > > > > > > > > > > > > > > key), the > > > > > > > > > > > > > > > > `distinct` operator could automatically > > > > > > > > > > > > > > > repartition the > > > > > > > > > > > > > > > data and thus we > > > > > > > > > > > > > > > > would avoid user errors. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Of course with 'key-only' deduplication + > > > > > > > > > > > > > > > autorepartitioning we > > > > > > > > > > > > > > > will > > > > > > > > > > > > > > > never cause problems with co-partitioning. But in > > > > > > > > > > > > > > > practice, we > > > > > > > > > > > > > > > often > > > > > > > > > > > > > > > don't need repartitioning even if 'dedup ID' is > > > > > > > > > > > > > > > different from > > > > > > > > > > > > > > > the key, > > > > > > > > > > > > > > > like in my example above. So here we have a sort > > > > > > > > > > > > > > > of 'performance vs > > > > > > > > > > > > > > > security' tradeoff. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > The 'golden middle way' here can be the > > > > > > > > > > > > > > > following: we can form a > > > > > > > > > > > > > > > deduplication ID as KEY + separator + > > > > > > > > > > > > > > > idExtractor(VALUE). 
In case > > > > > > > > > > > > > > > idExtractor is not provided, we deduplicate by > > > > > > > > > > > > > > > key only (as in > > > > > > > > > > > > > > > original > > > > > > > > > > > > > > > proposal). Then idExtractor transforms only the > > > > > > > > > > > > > > > value (and not > > > > > > > > > > > > > > > the key) > > > > > > > > > > > > > > > and its result is appended to the key. Records > > > > > > > > > > > > > > > from different > > > > > > > > > > > > > > > partitions > > > > > > > > > > > > > > > will inherently have different deduplication IDs > > > > > > > > > > > > > > > and all the > > > > > > > > > > > > > > > data will > > > > > > > > > > > > > > > be co-partitioned. As with any stateful > > > > > > > > > > > > > > > operation, we will > > > > > > > > > > > > > > > repartition > > > > > > > > > > > > > > > the topic in case the key was changed upstream, > > > > > > > > > > > > > > > but only in this > > > > > > > > > > > > > > > case, > > > > > > > > > > > > > > > thus avoiding unnecessary repartitioning. My > > > > > > > > > > > > > > > example above fits > > > > > > > > > > > > > > > this > > > > > > > > > > > > > > > perfectly. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > (2) What is the motivation for allowing > > > > > > > > > > > > > > > the `idExtractor` > > > > > > > > > > > > > > > to return > > > > > > > > > > > > > > > > `null`? Might be good to have some > > > > > > > > > > > > > > > use-case examples for > > > > > > > > > > > > > > > this feature. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Can't think of any use-cases. 
> > > > > > > > > > > > > > > As often happens, it just came along with a copy-paste from StackOverflow
> > > > > > > > > > > > > > > -- see Michael Noll's answer here:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > https://stackoverflow.com/questions/55803210/how-to-handle-duplicate-messages-using-kafka-streaming-dsl-functions
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > But, jokes aside, we'll have to decide what to do with nulls. If we accept
> > > > > > > > > > > > > > > the above proposal of having the deduplication ID as KEY + postfix, then
> > > > > > > > > > > > > > > null can be treated as no postfix at all. If we don't accept this approach,
> > > > > > > > > > > > > > > then treating nulls as 'no-deduplication' seems to be a reasonable
> > > > > > > > > > > > > > > assumption (we can't get or put null as a key to a KV store, so a record
> > > > > > > > > > > > > > > with null ID is always going to look 'new' for us).
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > (2) Is using a `TimeWindow` really what we want? I was wondering if a
> > > > > > > > > > > > > > > > `SlidingWindow` might be better? Or maybe we need a new type of window?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Agree. It's probably not what we want. Once I thought that reusing
> > > > > > > > > > > > > > > TimeWindow was a clever idea; now I don't.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Do we need epoch alignment in our use case? No, we don't, and I don't know
> > > > > > > > > > > > > > > if anyone is going to need this. Epoch alignment is good for aggregation,
> > > > > > > > > > > > > > > but deduplication is a different story.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Let me describe the semantics the way I see them now, and tell me what you
> > > > > > > > > > > > > > > think:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > - the only parameter that defines the deduplication logic is 'expiration
> > > > > > > > > > > > > > > period'
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > - when a deduplication ID arrives and we cannot find it in the store, we
> > > > > > > > > > > > > > > forward the message downstream and store the ID + its timestamp.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > - when an out-of-order ID arrives with an older timestamp and we find a
> > > > > > > > > > > > > > > 'fresher' record, we do nothing and don't forward the message (??? OR
> > > > > > > > > > > > > > > NOT? In what case would we want to forward an out-of-order message?)
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > - when an ID with a fresher timestamp arrives, we check if it falls into
> > > > > > > > > > > > > > > the expiration period and either forward it or not, but in both cases we
> > > > > > > > > > > > > > > update the timestamp of the message in the store
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > - the WindowStore retention mechanism should clean up very old records
> > > > > > > > > > > > > > > in order not to run out of space.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > (3) `isPersistent` -- instead of using this flag, it seems better to
> > > > > > > > > > > > > > > > allow users to pass in a `Materialized` parameter next to
> > > > > > > > > > > > > > > > `DistinctParameters` to configure the state store?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Fully agree! Users might also want to change the retention time.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > (4) I am wondering if we should really have 4 overloads for
> > > > > > > > > > > > > > > > `DistinctParameters.with()`? It might be better to have one overload
> > > > > > > > > > > > > > > > with all required parameters, and add optional parameters using the
> > > > > > > > > > > > > > > > builder pattern? This seems to follow the DSL Grammar proposal.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Oh, I can explain.
> > > > > > > > > > > > > > > We can't fully rely on the builder pattern because of Java type inference
> > > > > > > > > > > > > > > limitations. We have to provide type parameters to the builder methods or
> > > > > > > > > > > > > > > the code won't compile: see e.g. this
> > > > > > > > > > > > > > > https://twitter.com/inponomarev/status/1265053286933159938
> > > > > > > > > > > > > > > and the following discussion with Tagir Valeev.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > When we came across similar difficulties in KIP-418, we finally decided to
> > > > > > > > > > > > > > > add all the necessary overloads to the parameter class. So I just
> > > > > > > > > > > > > > > reproduced that approach here.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > (5) Even if it might be an implementation detail (and maybe the KIP
> > > > > > > > > > > > > > > > itself does not need to mention it), can you give a high level overview
> > > > > > > > > > > > > > > > how you intend to implement it (that would be easier to grok, compared
> > > > > > > > > > > > > > > > to reading the PR).
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Well, as with any operation on KStreamImpl level, I'm building a store and
> > > > > > > > > > > > > > > a processor node.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > KStreamDistinct class is going to be the ProcessorSupplier, with the logic
> > > > > > > > > > > > > > > regarding the forwarding/muting of the records located in
> > > > > > > > > > > > > > > KStreamDistinct.KStreamDistinctProcessor#process
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > ----
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Matthias, if you are still reading this :-) a gentle reminder: my PR for
> > > > > > > > > > > > > > > the already accepted KIP-418 is still waiting for your review. I think it's
> > > > > > > > > > > > > > > better for me to finalize at least one KIP before proceeding to a new
> > > > > > > > > > > > > > > one :-)
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Regards,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Ivan
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > 03.09.2020 4:20, Matthias J. Sax wrote:
> > > > > > > > > > > > > > > > Thanks for the KIP Ivan. Having a built-in deduplication operator is for
> > > > > > > > > > > > > > > > sure a good addition.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Couple of questions:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > (1) Using the `idExtractor` has the issue that data might not be
> > > > > > > > > > > > > > > > co-partitioned as you mentioned in the KIP. Thus, I am wondering if it
> > > > > > > > > > > > > > > > might be better to do deduplication only on the key?
> > > > > > > > > > > > > > > > If one sets a new key upstream (ie, extracts the deduplication id into
> > > > > > > > > > > > > > > > the key), the `distinct` operator could automatically repartition the
> > > > > > > > > > > > > > > > data and thus we would avoid user errors.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > (2) What is the motivation for allowing the `idExtractor` to return
> > > > > > > > > > > > > > > > `null`? Might be good to have some use-case examples for this feature.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > (2) Is using a `TimeWindow` really what we want? I was wondering if a
> > > > > > > > > > > > > > > > `SlidingWindow` might be better? Or maybe we need a new type of window?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > It would be helpful if you could describe potential use cases in more
> > > > > > > > > > > > > > > > detail. -- I am mainly wondering about hopping windows? Each record would
> > > > > > > > > > > > > > > > always fall into multiple windows and thus would be emitted multiple
> > > > > > > > > > > > > > > > times, ie, each time a window closes. Is this really a valid use case?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > It seems that for de-duplication, one wants to have some "expiration
> > > > > > > > > > > > > > > > time", ie, for each ID, deduplicate all consecutive records with the
> > > > > > > > > > > > > > > > same ID and emit the first record after the "expiration time" passed. In
> > > > > > > > > > > > > > > > terms of a window, this would mean that the window starts at `r.ts` and
> > > > > > > > > > > > > > > > ends at `r.ts + windowSize`, ie, the window is aligned to the data.
> > > > > > > > > > > > > > > > TimeWindows are aligned to the epoch though. While `SlidingWindows` also
> > > > > > > > > > > > > > > > align to the data, for the aggregation use-case they go backward in
> > > > > > > > > > > > > > > > time, while we need a window that goes forward in time. It's an open
> > > > > > > > > > > > > > > > question if we can re-purpose `SlidingWindows` -- it might be ok to
> > > > > > > > > > > > > > > > make the alignment (into the past vs into the future) an operator
> > > > > > > > > > > > > > > > dependent behavior?
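A minimal plain-Java sketch of the data-aligned "expiration time" window described above, where the window for an ID starts at `r.ts` of the first emitted record and runs forward to `r.ts + windowSize`. The class and method names are hypothetical and are not taken from the KIP or the PR. Two details are assumptions because the message leaves them open: the window end is treated as exclusive, and out-of-order records older than the current window start are dropped.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; not part of Kafka Streams or KIP-655.
final class ExpirationDeduplicator {
    private final long windowSizeMs;

    // For each ID: timestamp of the last emitted record, i.e. the start of the
    // current data-aligned window [start, start + windowSizeMs).
    private final Map<String, Long> windowStart = new HashMap<>();

    ExpirationDeduplicator(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Returns true if the record should be emitted downstream.
    boolean shouldEmit(String id, long ts) {
        Long start = windowStart.get(id);
        if (start != null) {
            if (ts < start) {
                // Assumption: an out-of-order record older than the window
                // start is treated as a duplicate and dropped.
                return false;
            }
            if (ts < start + windowSizeMs) {
                // Falls into the still-open window for this ID: duplicate.
                return false;
            }
        }
        // First record for this ID, or first record after expiration:
        // emit it and start a new window at its timestamp.
        windowStart.put(id, ts);
        return true;
    }
}
```

For example, with a window size of 10, records for the same ID at timestamps 100, 105 and 110 would yield emit, suppress, emit. A real implementation would keep this state in a state store with retention rather than in an unbounded map.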
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > (3) `isPersistent` -- instead of using this flag, it seems better to
> > > > > > > > > > > > > > > > allow users to pass in a `Materialized` parameter next to
> > > > > > > > > > > > > > > > `DistinctParameters` to configure the state store?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > (4) I am wondering if we should really have 4 overloads for
> > > > > > > > > > > > > > > > `DistinctParameters.with()`? It might be better to have one overload
> > > > > > > > > > > > > > > > with all required parameters, and add optional parameters using the
> > > > > > > > > > > > > > > > builder pattern? This seems to follow the DSL Grammar proposal.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > (5) Even if it might be an implementation detail (and maybe the KIP
> > > > > > > > > > > > > > > > itself does not need to mention it), can you give a high level overview
> > > > > > > > > > > > > > > > how you intend to implement it (that would be easier to grok, compared
> > > > > > > > > > > > > > > > to reading the PR).
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > -Matthias
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On 8/23/20 4:29 PM, Ivan Ponomarev wrote:
> > > > > > > > > > > > > > > > > Sorry, I forgot to add [DISCUSS] tag to the topic
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > 24.08.2020 2:27, Ivan Ponomarev wrote:
> > > > > > > > > > > > > > > > > > Hello,
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > I'd like to start a discussion for KIP-655.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > KIP-655:
> > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > I also opened a proof-of-concept PR for you to experiment with the API:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > PR#9210:
> > > > > > > > > > > > > > > > > > https://github.com/apache/kafka/pull/9210
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Regards,
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Ivan Ponomarev