Hi Bruno,

I had previously been leaning toward equals(), since I
thought this might be a stateless operation. Comparing
the serialized form requires a serde and a fairly expensive
serialization operation, so while byte equality is superior
to equals(), we shouldn't use it in operations unless they
already require serialization.
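
To illustrate the difference, here is a minimal sketch in
plain Java. The Tag class and the serialize() helper are
hypothetical stand-ins for a user value type and a serde;
they are not Kafka APIs. It shows two values that equals()
treats as the same while their serialized bytes differ:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Locale;

public class EqualityDemo {
    // Hypothetical value type whose equals() ignores case.
    static final class Tag {
        final String name;

        Tag(String name) {
            this.name = name;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Tag && ((Tag) o).name.equalsIgnoreCase(name);
        }

        @Override
        public int hashCode() {
            return name.toLowerCase(Locale.ROOT).hashCode();
        }
    }

    // Stand-in for a serde's serializer (an assumption for this sketch).
    static byte[] serialize(Tag t) {
        return t.name.getBytes(StandardCharsets.UTF_8);
    }

    // Duplicate check based on the object's own equals().
    static boolean sameByEquals(Tag a, Tag b) {
        return a.equals(b);
    }

    // Duplicate check based on the serialized form.
    static boolean sameByBytes(Tag a, Tag b) {
        return Arrays.equals(serialize(a), serialize(b));
    }

    public static void main(String[] args) {
        Tag a = new Tag("kafka");
        Tag b = new Tag("KAFKA");
        System.out.println(sameByEquals(a, b)); // true
        System.out.println(sameByBytes(a, b));  // false
    }
}
```

The byte comparison does not depend on how well equals() is
implemented, but it costs one serialization per value.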

I changed my mind when I later realized I had been mistaken,
and this operation is of course stateful, so it already has
to serialize values for its state store.
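
As a rough sketch of why the operation needs state, the
plain-Java simulation below remembers which (key, window)
pairs have already emitted a record. It uses the record
times and the 10-second tumbling windows from Ivan's
example quoted further down. The class and method names are
made up for illustration, and the in-memory set only stands
in for a real state store:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class TumblingDistinctSketch {
    // Forwards only the first record seen per (key, window) and drops the rest.
    static List<Long> distinct(String key, long[] timestampsSec, long windowSizeSec) {
        // The state: windows that have already forwarded a record.
        Set<String> seen = new HashSet<>();
        List<Long> forwarded = new ArrayList<>();
        for (long ts : timestampsSec) {
            // Epoch-aligned window start (timestamps are non-negative here).
            long windowStart = (ts / windowSizeSec) * windowSizeSec;
            if (seen.add(key + "@" + windowStart)) {
                forwarded.add(ts);
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        // Three bursts followed by the late arrivals 7, 22, 35.
        long[] input = {4, 5, 6, 23, 22, 24, 34, 33, 32, 7, 22, 35};
        System.out.println(distinct("key", input, 10)); // [4, 23, 34]
    }
}
```

Without the remembered set, the late arrivals could not be
recognized as falling into windows that already produced
output.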

I hope this helps clarify it.

Thanks,
-John

On Fri, 2021-07-23 at 09:53 +0200, Bruno Cadonna wrote:
> Hi Ivan and John,
> 
> 1. John, could you clarify why comparing serialized values seems the way 
> to go, now?
> 
> 2. Ivan, could you please answer my questions that I posted earlier? I 
> will repost it here:
> Ivan, could you please make this matter a bit clearer in the KIP? 
> Actually, thinking about it again, I do currently not see why it should 
> not make sense in hopping windows. Regarding this, I do not understand 
> the following sentence:
> 
> "hopping and sliding windows do not make much sense for distinct() 
> because they produce multiple intersected windows, so that one record 
> can be multiplied instead of deduplication."
> 
> Ivan, what do you mean by "multiplied"?
> 
> 3. As I said earlier, I do not think that SQL and the Java Stream API 
> are good arguments to not use a verb. However, if everybody else is fine 
> with it, I can get behind it.
> 
> John, good catch about the missing overloads!
> BTW, the overload with Named should be there regardless of stateful or 
> stateless.
> 
> Best,
> Bruno
> 
> On 22.07.21 20:58, John Roesler wrote:
> > Hi Ivan,
> > 
> > Thanks for the reply.
> > 
> > 1. I think I might have gotten myself confused. I was
> > thinking of this operation as stateless, but now I'm not
> > sure what I was thinking... This operator has to be
> > stateful, right? In that case, I agree that comparing
> > serialized values seems to be the way to do it.
> > 
> > 2. Thanks for the confirmation.
> > 
> > 3. I continue to be satisfied to let you all hash it out.
> > 
> > Thanks,
> > -John
> > 
> > On Tue, 2021-07-20 at 11:42 +0300, Ivan Ponomarev wrote:
> > > Hi all,
> > > 
> > > 1. Actually I always thought about the serialized byte array only -- at
> > > least this is what local stores depend upon, and what Kafka itself
> > > depends upon when doing log compaction.
> > > 
> > > I can imagine a case where two different byte arrays deserialize to
> > > objects which are `equals` to each other. But I think we can ignore this
> > > for now because IMO the risks related to buggy `equals` implementations
> > > outweigh the potential benefits.
> > > 
> > > I will mention the duplicate definition in the KIP.
> > > 
> > > 2. I agree with John, he got my point.
> > > 
> > > 3. Let me gently insist on `distinct`. I believe that an exception to
> > > the rule is appropriate here, because the name `distinct()` is ubiquitous.
> > > 
> > > It's not only about Java Streams API (or .NET LINQ, which appeared
> > > earlier and also has `Distinct`): Spark's DataFrame has `distinct()`
> > > method, Hazelcast Jet has `distinct()` method, and I bet I can find more
> > > examples if I search. When we teach KStreams, we always say that
> > > KStreams are just like other streaming APIs, and they have roots in SQL
> > > queries. Users know what `distinct` is and they expect it to be in the 
> > > API.
> > > 
> > > 
> > > Regards,
> > > 
> > > Ivan
> > > 
> > > 
> > > 13.07.2021 0:10, John Roesler wrote:
> > > > Hi all,
> > > > 
> > > > Bruno raised some very good points. I’d like to chime in with 
> > > > additional context.
> > > > 
> > > > 1. Great point. We faced a similar problem defining KIP-557. For 557, 
> > > > we chose to use the serialized byte array instead of the equals() 
> > > > method, but I think the situation in KIP-655 is a bit different. I 
> > > > think it might make sense to use the equals() method here, but am 
> > > > curious what Ivan thinks.
> > > > 
> > > > 2. I figured we'd do nothing. I thought Ivan was just saying that it 
> > > > doesn't make a ton of sense to use it, which I agree with, but it 
> > > > doesn't seem like that means we should prohibit it.
> > > > 
> > > > 3. FWIW, I don't have a strong feeling either way.
> > > > 
> > > > Thanks,
> > > > -John
> > > > 
> > > > On Mon, Jul 12, 2021, at 09:14, Bruno Cadonna wrote:
> > > > > Hi Ivan,
> > > > > 
> > > > > Thank you for the KIP!
> > > > > 
> > > > > Some aspects are not clear to me from the KIP and I have a proposal.
> > > > > 
> > > > > 1. The KIP does not describe the criteria that define a duplicate. 
> > > > > Could
> > > > > you add a definition of duplicate to the KIP?
> > > > > 
> > > > > 2. The KIP does not describe what happens if distinct() is applied on 
> > > > > a
> > > > > hopping window. On the DSL level, I do not see how you can avoid that
> > > > > users apply distinct() on a hopping window, i.e., you cannot avoid it 
> > > > > at
> > > > > compile time, you need to check it at runtime and throw an exception. 
> > > > > Is
> > > > > this correct or am I missing something?
> > > > > 
> > > > > 3. I would also like to back a proposal by Sophie. She proposed to use
> > > > > deduplicate() instead of distinct(), since the other DSL operations 
> > > > > are
> > > > > also verbs. I do not think that SQL and the Java Stream API are good
> > > > > arguments to not use a verb.
> > > > > 
> > > > > Best,
> > > > > Bruno
> > > > > 
> > > > > 
> > > > > On 10.07.21 19:11, John Roesler wrote:
> > > > > > Hi Ivan,
> > > > > > 
> > > > > > Sorry for the silence!
> > > > > > 
> > > > > > I have just re-read the proposal.
> > > > > > 
> > > > > > To summarize, you are now only proposing the zero-arg distinct() 
> > > > > > method to be added to TimeWindowedKStream and 
> > > > > > SessionWindowedKStream, right?
> > > > > > 
> > > > > > I’m in favor of this proposal.
> > > > > > 
> > > > > > Thanks,
> > > > > > John
> > > > > > 
> > > > > > On Sat, Jul 10, 2021, at 10:18, Ivan Ponomarev wrote:
> > > > > > > Hello everyone,
> > > > > > > 
> > > > > > > I would like to remind you about KIP-655 and KIP-759 just in case 
> > > > > > > they
> > > > > > > got lost in your inbox.
> > > > > > > 
> > > > > > > Now the initial proposal is split into two independent and 
> > > > > > > smaller ones,
> > > > > > > so it must be easier to review them. Of course, if you have time.
> > > > > > > 
> > > > > > > Regards,
> > > > > > > 
> > > > > > > Ivan
> > > > > > > 
> > > > > > > 
> > > > > > > 24.06.2021 18:11, Ivan Ponomarev wrote:
> > > > > > > > Hello all,
> > > > > > > > 
> > > > > > > > I have rewritten the KIP-655 summarizing what was agreed upon 
> > > > > > > > during
> > > > > > > > this discussion (now the proposal is much simpler and less 
> > > > > > > > invasive).
> > > > > > > > 
> > > > > > > > I have also created KIP-759 (cancelRepartition operation) and 
> > > > > > > > started a
> > > > > > > > discussion for it.
> > > > > > > > 
> > > > > > > > Regards,
> > > > > > > > 
> > > > > > > > Ivan.
> > > > > > > > 
> > > > > > > > 
> > > > > > > > 
> > > > > > > > 04.06.2021 8:15, Matthias J. Sax wrote:
> > > > > > > > > Just skimmed over the thread -- first of all, I am glad that 
> > > > > > > > > we could
> > > > > > > > > merge KIP-418 and ship it :)
> > > > > > > > > 
> > > > > > > > > About the re-partitioning concerns, there are already two 
> > > > > > > > > tickets for it:
> > > > > > > > > 
> > > > > > > > >      - https://issues.apache.org/jira/browse/KAFKA-4835
> > > > > > > > >      - https://issues.apache.org/jira/browse/KAFKA-10844
> > > > > > > > > 
> > > > > > > > > Thus, it seems best to exclude this topic from this KIP, and 
> > > > > > > > > do a
> > > > > > > > > separate KIP for it (if necessary, we can "pause" this KIP 
> > > > > > > > > until the
> > > > > > > > > repartition KIP is done). It's a long standing "issue" and we 
> > > > > > > > > should
> > > > > > > > > resolve it in a general way I guess.
> > > > > > > > > 
> > > > > > > > > (Did not read all responses in detail yet, so keeping 
> > > > > > > > > this comment
> > > > > > > > > short.)
> > > > > > > > > 
> > > > > > > > > 
> > > > > > > > > -Matthias
> > > > > > > > > 
> > > > > > > > > On 6/2/21 6:35 AM, John Roesler wrote:
> > > > > > > > > > Thanks, Ivan!
> > > > > > > > > > 
> > > > > > > > > > That sounds like a great plan to me. Two smaller KIPs are 
> > > > > > > > > > easier to
> > > > > > > > > > agree on than one big one.
> > > > > > > > > > 
> > > > > > > > > > I agree hopping and sliding windows will actually have a 
> > > > > > > > > > duplicating
> > > > > > > > > > effect. We can avoid adding distinct() to the sliding window
> > > > > > > > > > interface, but hopping windows are just a different 
> > > > > > > > > > parameterization
> > > > > > > > > > of epoch-aligned windows. It seems we can’t do much about 
> > > > > > > > > > that except
> > > > > > > > > > document the issue.
> > > > > > > > > > 
> > > > > > > > > > Thanks,
> > > > > > > > > > John
> > > > > > > > > > 
> > > > > > > > > > On Wed, May 26, 2021, at 10:14, Ivan Ponomarev wrote:
> > > > > > > > > > > Hi John!
> > > > > > > > > > > 
> > > > > > > > > > > I think that your proposal is just fantastic, it 
> > > > > > > > > > > simplifies things a
> > > > > > > > > > > lot!
> > > > > > > > > > > 
> > > > > > > > > > > I also felt uncomfortable due to the fact that the 
> > > > > > > > > > > proposed
> > > > > > > > > > > `distinct()`
> > > > > > > > > > > is not somewhere near `count()` and `reduce(..)`. But
> > > > > > > > > > > `selectKey(..).groupByKey().windowedBy(..).distinct()` 
> > > > > > > > > > > didn't look like
> > > > > > > > > > > a correct option for me because of the issue with the 
> > > > > > > > > > > unneeded
> > > > > > > > > > > repartitioning.
> > > > > > > > > > > 
> > > > > > > > > > > The bold idea that we can just CANCEL the repartitioning 
> > > > > > > > > > > didn't come to
> > > > > > > > > > > my mind.
> > > > > > > > > > > 
> > > > > > > > > > > What seemed to me a single problem is in fact two 
> > > > > > > > > > > unrelated problems:
> > > > > > > > > > > `distinct` operation and cancelling the unneeded 
> > > > > > > > > > > repartitioning.
> > > > > > > > > > > 
> > > > > > > > > > >      > what if we introduce a parameter to `selectKey()` 
> > > > > > > > > > > that specifies
> > > > > > > > > > > that
> > > > > > > > > > > the caller asserts that the new key does _not_ change the 
> > > > > > > > > > > data
> > > > > > > > > > > partitioning?
> > > > > > > > > > > 
> > > > > > > > > > > I think a more elegant solution would be not to add a new 
> > > > > > > > > > > parameter to
> > > > > > > > > > > `selectKey` and all the other key-changing operations 
> > > > > > > > > > > (`map`,
> > > > > > > > > > > `transform`, `flatMap`, ...), but add a new operator
> > > > > > > > > > > `KStream#cancelRepartitioning()` that resets 
> > > > > > > > > > > `keyChangingOperation`
> > > > > > > > > > > flag
> > > > > > > > > > > for the upstream node. Of course, "use it only if you 
> > > > > > > > > > > know what you're
> > > > > > > > > > > doing" warning is to be added. Well, it's a topic for a 
> > > > > > > > > > > separate KIP!
> > > > > > > > > > > 
> > > > > > > > > > > Concerning `distinct()`. If we use `XXXWindowedKStream` 
> > > > > > > > > > > facilities,
> > > > > > > > > > > then
> > > > > > > > > > > changes to the API are minimally invasive: we're just 
> > > > > > > > > > > adding
> > > > > > > > > > > `distinct()` to TimeWindowedKStream and 
> > > > > > > > > > > SessionWindowedKStream, and
> > > > > > > > > > > that's all.
> > > > > > > > > > > 
> > > > > > > > > > > We can now define `distinct` as an operation that returns 
> > > > > > > > > > > only a first
> > > > > > > > > > > record that falls into a new window, and filters out all 
> > > > > > > > > > > the other
> > > > > > > > > > > records that fall into an already existing window. BTW, 
> > > > > > > > > > > we can mock the
> > > > > > > > > > > behaviour of such an operation with `TopologyTestDriver` 
> > > > > > > > > > > using
> > > > > > > > > > > `reduce((l, r) -> STOP).filterNot((k, v) -> STOP.equals(v))`.  ;-)
> > > > > > > > > > > 
> > > > > > > > > > > Consider the following example (record times are in 
> > > > > > > > > > > seconds):
> > > > > > > > > > > 
> > > > > > > > > > > //three bursts of variously ordered records
> > > > > > > > > > > 4, 5, 6
> > > > > > > > > > > 23, 22, 24
> > > > > > > > > > > 34, 33, 32
> > > > > > > > > > > //'late arrivals'
> > > > > > > > > > > 7, 22, 35
> > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > > 1. 'Epoch-aligned deduplication' using tumbling windows:
> > > > > > > > > > > 
> > > > > > > > > > > .groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(10))).distinct()
> > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > > produces
> > > > > > > > > > > 
> > > > > > > > > > > (key@[00000/10000], 4)
> > > > > > > > > > > (key@[20000/30000], 23)
> > > > > > > > > > > (key@[30000/40000], 34)
> > > > > > > > > > > 
> > > > > > > > > > > -- that is, one record per epoch-aligned window.
> > > > > > > > > > > 
> > > > > > > > > > > 2. Hopping and sliding windows do not make much sense 
> > > > > > > > > > > here, because
> > > > > > > > > > > they
> > > > > > > > > > > produce multiple intersected windows, so that one record 
> > > > > > > > > > > can be
> > > > > > > > > > > multiplied, but we want deduplication.
> > > > > > > > > > > 
> > > > > > > > > > > 3. SessionWindows work for 'data-aligned deduplication'.
> > > > > > > > > > > 
> > > > > > > > > > > .groupByKey().windowedBy(SessionWindows.with(Duration.ofSeconds(10))).distinct()
> > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > > produces only
> > > > > > > > > > > 
> > > > > > > > > > > ([key@4000/4000], 4)
> > > > > > > > > > > ([key@23000/23000], 23)
> > > > > > > > > > > 
> > > > > > > > > > > because all the records bigger than 7 are stuck together 
> > > > > > > > > > > in one
> > > > > > > > > > > session.
> > > > > > > > > > > Setting inactivity gap to 9 seconds will return three 
> > > > > > > > > > > records:
> > > > > > > > > > > 
> > > > > > > > > > > ([key@4000/4000], 4)
> > > > > > > > > > > ([key@23000/23000], 23)
> > > > > > > > > > > ([key@34000/34000], 34)
> > > > > > > > > > > 
> > > > > > > > > > > WDYT? If you like this variant, I will re-write KIP-655 
> > > > > > > > > > > and propose a
> > > > > > > > > > > separate KIP for `cancelRepartitioning` (or whatever name 
> > > > > > > > > > > we will
> > > > > > > > > > > choose
> > > > > > > > > > > for it).
> > > > > > > > > > > 
> > > > > > > > > > > Regards,
> > > > > > > > > > > 
> > > > > > > > > > > Ivan
> > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > > 24.05.2021 22:32, John Roesler wrote:
> > > > > > > > > > > > Hey there, Ivan!
> > > > > > > > > > > > 
> > > > > > > > > > > > In typical fashion, I'm going to make a somewhat 
> > > > > > > > > > > > outlandish
> > > > > > > > > > > > proposal. I'm hoping that we can side-step some of the
> > > > > > > > > > > > complications that have arisen. Please bear with me.
> > > > > > > > > > > > 
> > > > > > > > > > > > It seems like `distinct()` is not fundamentally unlike 
> > > > > > > > > > > > other windowed
> > > > > > > > > > > > "aggregation" operations. Your concern about unnecessary
> > > > > > > > > > > > repartitioning seems to apply just as well to `count()` 
> > > > > > > > > > > > as to
> > > > > > > > > > > > `distinct()`.
> > > > > > > > > > > > This has come up before, but I don't remember when: 
> > > > > > > > > > > > what if we
> > > > > > > > > > > > introduce a parameter to `selectKey()` that specifies 
> > > > > > > > > > > > that the caller
> > > > > > > > > > > > asserts that the new key does _not_ change the data 
> > > > > > > > > > > > partitioning?
> > > > > > > > > > > > The docs on that parameter would of course spell out 
> > > > > > > > > > > > all the "rights
> > > > > > > > > > > > and responsibilities" of setting it.
> > > > > > > > > > > > 
> > > > > > > > > > > > In that case, we could indeed get back to
> > > > > > > > > > > > `selectKey(A).windowBy(B).distinct(...)`, where we get 
> > > > > > > > > > > > to compose the
> > > > > > > > > > > > key mapper and the windowing function without having to 
> > > > > > > > > > > > carve out
> > > > > > > > > > > > a separate domain just for `distinct()`. All the rest 
> > > > > > > > > > > > of the KStream
> > > > > > > > > > > > operations would also benefit.
> > > > > > > > > > > > 
> > > > > > > > > > > > What do you think?
> > > > > > > > > > > > 
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > John
> > > > > > > > > > > > 
> > > > > > > > > > > > On Sun, May 23, 2021, at 08:09, Ivan Ponomarev wrote:
> > > > > > > > > > > > > Hello everyone,
> > > > > > > > > > > > > 
> > > > > > > > > > > > > let me revive the discussion for KIP-655. Now I have 
> > > > > > > > > > > > > some time
> > > > > > > > > > > > > again and
> > > > > > > > > > > > > I'm eager to finalize this.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Based on what was already discussed, I think that we 
> > > > > > > > > > > > > can split the
> > > > > > > > > > > > > discussion into three topics for our convenience.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > The three topics are:
> > > > > > > > > > > > > 
> > > > > > > > > > > > > - idExtractor  (how should we extract the 
> > > > > > > > > > > > > deduplication key for
> > > > > > > > > > > > > the record)
> > > > > > > > > > > > > 
> > > > > > > > > > > > > - timeWindows (what time windows should we use)
> > > > > > > > > > > > > 
> > > > > > > > > > > > > - miscellaneous (naming etc.)
> > > > > > > > > > > > > 
> > > > > > > > > > > > > ---- idExtractor ----
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Original proposal: use (k, v) -> f(k, v) mapper, 
> > > > > > > > > > > > > defaulting to (k,
> > > > > > > > > > > > > v) ->
> > > > > > > > > > > > > k.  The drawback here is that we must warn the user 
> > > > > > > > > > > > > to choose such a
> > > > > > > > > > > > > function that sets different IDs for records from 
> > > > > > > > > > > > > different
> > > > > > > > > > > > > partitions,
> > > > > > > > > > > > > otherwise the same IDs might not be co-partitioned (and 
> > > > > > > > > > > > > not
> > > > > > > > > > > > > deduplicated as
> > > > > > > > > > > > > a result). Additional concern: what should we do when 
> > > > > > > > > > > > > this function
> > > > > > > > > > > > > returns null?
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Matthias proposed key-only deduplication: that is, no 
> > > > > > > > > > > > > idExtractor at
> > > > > > > > > > > > > all, and if we want to use `distinct` for a 
> > > > > > > > > > > > > particular identifier, we
> > > > > > > > > > > > > must `selectKey()` before. The drawback of this 
> > > > > > > > > > > > > approach is that
> > > > > > > > > > > > > we will
> > > > > > > > > > > > > always have repartitioning after the key selection, 
> > > > > > > > > > > > > while in practice
> > > > > > > > > > > > > repartitioning will not always be necessary (for 
> > > > > > > > > > > > > example, when the
> > > > > > > > > > > > > data
> > > > > > > > > > > > > stream is such that different values infer different 
> > > > > > > > > > > > > keys).
> > > > > > > > > > > > > 
> > > > > > > > > > > > > So here we have a 'safety vs. performance' trade-off. 
> > > > > > > > > > > > > But 'safe'
> > > > > > > > > > > > > variant
> > > > > > > > > > > > > is also not very convenient for developers, since 
> > > > > > > > > > > > > we're forcing
> > > > > > > > > > > > > them to
> > > > > > > > > > > > > change the structure of their records.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > A 'golden mean' here might be using composite ID with 
> > > > > > > > > > > > > its first
> > > > > > > > > > > > > component equal to k and its second component equal 
> > > > > > > > > > > > > to some f(v) (f
> > > > > > > > > > > > > defaults to v -> null, and null value returned by 
> > > > > > > > > > > > > f(v) means
> > > > > > > > > > > > > 'deduplicate by the key only'). The nuance here is 
> > > > > > > > > > > > > that we will have
> > > > > > > > > > > > > serializers only for types of k and f(v), and we must 
> > > > > > > > > > > > > correctly
> > > > > > > > > > > > > serialize a tuple (k, f(v)), but of course this is 
> > > > > > > > > > > > > doable.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > What do you think?
> > > > > > > > > > > > > 
> > > > > > > > > > > > > ---- timeWindows ----
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Originally I proposed TimeWindows only just because 
> > > > > > > > > > > > > they solved my
> > > > > > > > > > > > > particular case :-) but agree with Matthias' and 
> > > > > > > > > > > > > Sophie's objections.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > I like Sophie's point: we need both epoch-aligned 
> > > > > > > > > > > > > and
> > > > > > > > > > > > > data-aligned
> > > > > > > > > > > > > windows. IMO this is absolutely correct: 
> > > > > > > > > > > > > "data-aligned is useful for
> > > > > > > > > > > > > example when you know that a large number of updates 
> > > > > > > > > > > > > to a single key
> > > > > > > > > > > > > will occur in short bursts, and epoch-aligned when you
> > > > > > > > > > > > > specifically want
> > > > > > > > > > > > > to get just a single update per discrete time 
> > > > > > > > > > > > > interval."
> > > > > > > > > > > > > 
> > > > > > > > > > > > > I just cannot agree right away with Sophie's
> > > > > > > > > > > > > .groupByKey().windowedBy(...).distinct() proposal, as 
> > > > > > > > > > > > > it implies  the
> > > > > > > > > > > > > key-only deduplication -- see the previous topic.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Epoch-aligned windows are very simple: they should 
> > > > > > > > > > > > > forward only one
> > > > > > > > > > > > > record per enumerated time window. TimeWindows are 
> > > > > > > > > > > > > exactly what we
> > > > > > > > > > > > > want
> > > > > > > > > > > > > here. I mentioned in the KIP both tumbling and 
> > > > > > > > > > > > > hopping windows just
> > > > > > > > > > > > > because both are possible for TimeWindows, but indeed 
> > > > > > > > > > > > > I don't see any
> > > > > > > > > > > > > real use case for hopping windows, only tumbling 
> > > > > > > > > > > > > windows make
> > > > > > > > > > > > > sense IMO.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > For data-aligned windows SlidingWindow interface 
> > > > > > > > > > > > > seems to be a nearly
> > > > > > > > > > > > > valid choice. Nearly. It should forward a record once 
> > > > > > > > > > > > > when it's first
> > > > > > > > > > > > > seen, and then not again for any identical records 
> > > > > > > > > > > > > that fall into the
> > > > > > > > > > > > > next N timeUnits.  However, we cannot reuse 
> > > > > > > > > > > > > SlidingWindow as is,
> > > > > > > > > > > > > because
> > > > > > > > > > > > > just as Matthias noted, SlidingWindows go backward in 
> > > > > > > > > > > > > time, while we
> > > > > > > > > > > > > need windows that go forward in time, and are not 
> > > > > > > > > > > > > opened while
> > > > > > > > > > > > > records
> > > > > > > > > > > > > fall into an already existing window. We definitely 
> > > > > > > > > > > > > should make
> > > > > > > > > > > > > our own
> > > > > > > > > > > > > implementation, maybe we should call it 
> > > > > > > > > > > > > ExpirationWindow? WDYT?
> > > > > > > > > > > > > 
> > > > > > > > > > > > > 
> > > > > > > > > > > > > ---- miscellaneous ----
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Persistent/in-memory stores. Matthias proposed to 
> > > > > > > > > > > > > pass Materialized
> > > > > > > > > > > > > parameter next to DistinctParameters (and this is 
> > > > > > > > > > > > > necessary,
> > > > > > > > > > > > > because we
> > > > > > > > > > > > > will need to provide a serializer for extracted id). 
> > > > > > > > > > > > > This is
> > > > > > > > > > > > > absolutely
> > > > > > > > > > > > > valid point, I agree and I will fix it in the KIP.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Naming. Sophie noted that the Streams DSL operators 
> > > > > > > > > > > > > are typically
> > > > > > > > > > > > > named
> > > > > > > > > > > > > as verbs, so she proposes `deduplicate` in favour of 
> > > > > > > > > > > > > `distinct`. I
> > > > > > > > > > > > > think
> > > > > > > > > > > > > that while it's important to stick to the naming 
> > > > > > > > > > > > > conventions, it
> > > > > > > > > > > > > is also
> > > > > > > > > > > > > important to think of the experience of those who 
> > > > > > > > > > > > > come from different
> > > > > > > > > > > > > stacks/technologies. People who are familiar with SQL 
> > > > > > > > > > > > > and Java
> > > > > > > > > > > > > Streams
> > > > > > > > > > > > > API must know for sure what 'distinct' means, 
> > > > > > > > > > > > > while data
> > > > > > > > > > > > > deduplication in general is a more complex task and 
> > > > > > > > > > > > > thus
> > > > > > > > > > > > > `deduplicate`
> > > > > > > > > > > > > might be misleading. But I'm ready to be convinced if 
> > > > > > > > > > > > > the majority
> > > > > > > > > > > > > thinks otherwise.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Regards,
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Ivan
> > > > > > > > > > > > > 
> > > > > > > > > > > > > 
> > > > > > > > > > > > > 
> > > > > > > > > > > > > 14.09.2020 21:31, Sophie Blee-Goldman wrote:
> > > > > > > > > > > > > > Hey all,
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > I'm not convinced either epoch-aligned or 
> > > > > > > > > > > > > > data-aligned will fit all
> > > > > > > > > > > > > > possible use cases.
> > > > > > > > > > > > > > Both seem totally reasonable to me: data-aligned is 
> > > > > > > > > > > > > > useful for
> > > > > > > > > > > > > > example when
> > > > > > > > > > > > > > you know
> > > > > > > > > > > > > > that a large number of updates to a single key will 
> > > > > > > > > > > > > > occur in
> > > > > > > > > > > > > > short bursts,
> > > > > > > > > > > > > > and epoch-
> > > > > > > > > > > > > > aligned when you specifically want to get just a 
> > > > > > > > > > > > > > single update
> > > > > > > > > > > > > > per discrete
> > > > > > > > > > > > > > time
> > > > > > > > > > > > > > interval.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > Going a step further, though, what if you want just 
> > > > > > > > > > > > > > a single
> > > > > > > > > > > > > > update per
> > > > > > > > > > > > > > calendar
> > > > > > > > > > > > > > month, or per year with accounting for leap years? 
> > > > > > > > > > > > > > Neither of
> > > > > > > > > > > > > > those are
> > > > > > > > > > > > > > serviced that
> > > > > > > > > > > > > > well by the existing Windows specification to 
> > > > > > > > > > > > > > windowed
> > > > > > > > > > > > > > aggregations, a
> > > > > > > > > > > > > > well-known
> > > > > > > > > > > > > > limitation of the current API. There is actually a 
> > > > > > > > > > > > > > KIP
> > > > > > > > > > > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-645%3A+Replace+Windows+with+a+proper+interface>
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > going
> > > > > > > > > > > > > > on in parallel to fix this
> > > > > > > > > > > > > > exact issue and make the windowing interface much 
> > > > > > > > > > > > > > more flexible.
> > > > > > > > > > > > > > Maybe
> > > > > > > > > > > > > > instead
> > > > > > > > > > > > > > of re-implementing this windowing interface in a 
> > > > > > > > > > > > > > similarly
> > > > > > > > > > > > > > limited fashion
> > > > > > > > > > > > > > for the
> > > > > > > > > > > > > > Distinct operator, we could leverage it here and 
> > > > > > > > > > > > > > get all the
> > > > > > > > > > > > > > benefits
> > > > > > > > > > > > > > coming with
> > > > > > > > > > > > > > KIP-645.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > Specifically, I'm proposing to remove the 
> > > > > > > > > > > > > > TimeWindows/etc config
> > > > > > > > > > > > > > from the
> > > > > > > > > > > > > > DistinctParameters class, and move the distinct() 
> > > > > > > > > > > > > > method from the
> > > > > > > > > > > > > > KStream
> > > > > > > > > > > > > > interface
> > > > > > > > > > > > > > to the TimeWindowedKStream interface. Since it's 
> > > > > > > > > > > > > > semantically
> > > > > > > > > > > > > > similar to a
> > > > > > > > > > > > > > kind of
> > > > > > > > > > > > > > windowed aggregation, it makes sense to align it 
> > > > > > > > > > > > > > with the
> > > > > > > > > > > > > > existing windowing
> > > > > > > > > > > > > > framework, ie:
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > inputStream
> > > > > > > > > > > > > >           .groupByKey()
> > > > > > > > > > > > > >           .windowedBy()
> > > > > > > > > > > > > >           .distinct()
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > Then we could use data-aligned windows if 
> > > > > > > > > > > > > > SlidingWindows is
> > > > > > > > > > > > > > specified in
> > > > > > > > > > > > > > the
> > > > > > > > > > > > > > windowedBy(), and epoch-aligned (or some other kind 
> > > > > > > > > > > > > > of enumerable
> > > > > > > > > > > > > > window)
> > > > > > > > > > > > > > if a Windows is specified in windowedBy() (or an
> > > > > > > > > > > > > > EnumerableWindowDefinition
> > > > > > > > > > > > > > once KIP-645 is implemented to replace Windows).
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > *SlidingWindows*: should forward a record once when 
> > > > > > > > > > > > > > it's first
> > > > > > > > > > > > > > seen, and
> > > > > > > > > > > > > > then not again
> > > > > > > > > > > > > > for any identical records that fall into the next N 
> > > > > > > > > > > > > > timeUnits. This
> > > > > > > > > > > > > > includes out-of-order
> > > > > > > > > > > > > > records, ie if you have a SlidingWindows of size 
> > > > > > > > > > > > > > 10s and process
> > > > > > > > > > > > > > records at
> > > > > > > > > > > > > > time
> > > > > > > > > > > > > > 15s, 20s, 14s then you would just forward the one 
> > > > > > > > > > > > > > at 15s.
> > > > > > > > > > > > > > Presumably, if
> > > > > > > > > > > > > > you're
> > > > > > > > > > > > > > using SlidingWindows, you don't care about what 
> > > > > > > > > > > > > > falls into exact
> > > > > > > > > > > > > > time
> > > > > > > > > > > > > > boxes, you just
> > > > > > > > > > > > > > want to deduplicate. If you do care about exact 
> > > > > > > > > > > > > > time boxing then
> > > > > > > > > > > > > > you should
> > > > > > > > > > > > > > use...
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > *EnumerableWindowDefinition* (eg *TimeWindows*): 
> > > > > > > > > > > > > > should forward
> > > > > > > > > > > > > > only one
> > > > > > > > > > > > > > record
> > > > > > > > > > > > > > per enumerated time window. If you get records at 
> > > > > > > > > > > > > > 15s, 20s, 14s
> > > > > > > > > > > > > > where the
> > > > > > > > > > > > > > windows
> > > > > > > > > > > > > > are enumerated at [5,14], [15, 24], etc then you 
> > > > > > > > > > > > > > forward the
> > > > > > > > > > > > > > record at 15s
> > > > > > > > > > > > > > and also
> > > > > > > > > > > > > > the record at 14s
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > Just an idea: not sure if the impedance mismatch 
> > > > > > > > > > > > > > would throw
> > > > > > > > > > > > > > users off
> > > > > > > > > > > > > > since the
> > > > > > > > > > > > > > semantics of the distinct windows are slightly 
> > > > > > > > > > > > > > different than in the
> > > > > > > > > > > > > > aggregations.
> > > > > > > > > > > > > > But if we don't fit this into the existing windowed 
> > > > > > > > > > > > > > framework,
> > > > > > > > > > > > > > then we
> > > > > > > > > > > > > > shouldn't use
> > > > > > > > > > > > > > any existing Windows-type classes at all, imo. ie 
> > > > > > > > > > > > > > we should
> > > > > > > > > > > > > > create a new
> > > > > > > > > > > > > > DistinctWindows config class, similar to how 
> > > > > > > > > > > > > > stream-stream joins
> > > > > > > > > > > > > > get their
> > > > > > > > > > > > > > own
> > > > > > > > > > > > > > JoinWindows class
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > I also think that non-windowed deduplication could 
> > > > > > > > > > > > > > be useful, in
> > > > > > > > > > > > > > which case
> > > > > > > > > > > > > > we
> > > > > > > > > > > > > > would want to also have the distinct() operator on 
> > > > > > > > > > > > > > the KStream
> > > > > > > > > > > > > > interface.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > One quick note regarding the naming: it seems like 
> > > > > > > > > > > > > > the Streams
> > > > > > > > > > > > > > DSL operators
> > > > > > > > > > > > > > are typically named as verbs rather than 
> > > > > > > > > > > > > > > adjectives, for example
> > > > > > > > > > > > > > #suppress
> > > > > > > > > > > > > > or
> > > > > > > > > > > > > > #aggregate. I get that there's some precedent for  
> > > > > > > > > > > > > > 'distinct'
> > > > > > > > > > > > > > specifically,
> > > > > > > > > > > > > > but
> > > > > > > > > > > > > > maybe something like 'deduplicate' would be more 
> > > > > > > > > > > > > > appropriate for
> > > > > > > > > > > > > > the Streams
> > > > > > > > > > > > > > API.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > WDYT?
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > On Mon, Sep 14, 2020 at 10:04 AM Ivan Ponomarev
> > > > > > > > > > > > > > <iponoma...@mail.ru.invalid>
> > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > Hi Matthias,
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > Thanks for your review! It made me think deeper, 
> > > > > > > > > > > > > > > and indeed I
> > > > > > > > > > > > > > > understood
> > > > > > > > > > > > > > > that I was missing some important details.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > To simplify, let me explain my particular use 
> > > > > > > > > > > > > > > case first so I
> > > > > > > > > > > > > > > can refer
> > > > > > > > > > > > > > > to it later.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > We have a system that collects information about 
> > > > > > > > > > > > > > > ongoing live
> > > > > > > > > > > > > > > sporting
> > > > > > > > > > > > > > > events from different sources. The information 
> > > > > > > > > > > > > > > sources have
> > > > > > > > > > > > > > > their IDs
> > > > > > > > > > > > > > > and these IDs are keys of the stream. Each source 
> > > > > > > > > > > > > > > emits messages
> > > > > > > > > > > > > > > concerning sporting events, and we can have many 
> > > > > > > > > > > > > > > messages about
> > > > > > > > > > > > > > > each
> > > > > > > > > > > > > > > sporing event from each source. Event ID is 
> > > > > > > > > > > > > > > extracted from the
> > > > > > > > > > > > > > > message.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > We need a database of event IDs that were 
> > > > > > > > > > > > > > > reported at least once
> > > > > > > > > > > > > > > by each
> > > > > > > > > > > > > > > source (important: events from different sources 
> > > > > > > > > > > > > > > are considered
> > > > > > > > > > > > > > > to be
> > > > > > > > > > > > > > > different entities). The requirements are:
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > 1) each new event ID should be written to the 
> > > > > > > > > > > > > > > database as soon
> > > > > > > > > > > > > > > as possible
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > 2) although it's ok and sometimes even desired to
> > > > > > > > > > > > > > > > repeat the
> > > > > > > > > > > > > > > > notification about an already known event ID, we
> > > > > > > > > > > > > > > > wouldn’t like our
> > > > > > > > > > > > > > > database to be bothered by the same event ID more 
> > > > > > > > > > > > > > > often than
> > > > > > > > > > > > > > > once in a
> > > > > > > > > > > > > > > given period of time (say, 15 minutes).
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > With this example in mind let me answer your 
> > > > > > > > > > > > > > > questions
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > >        > (1) Using the `idExtractor` has the 
> > > > > > > > > > > > > > > issue that data might
> > > > > > > > > > > > > > > not be
> > > > > > > > > > > > > > >        > co-partitioned as you mentioned in the 
> > > > > > > > > > > > > > > KIP. Thus, I am
> > > > > > > > > > > > > > > wondering if it
> > > > > > > > > > > > > > >        > might be better to do deduplication only 
> > > > > > > > > > > > > > > on the key? If
> > > > > > > > > > > > > > > one sets a new
> > > > > > > > > > > > > > >        > key upstream (ie, extracts the 
> > > > > > > > > > > > > > > deduplication id into the
> > > > > > > > > > > > > > > key), the
> > > > > > > > > > > > > > >        > `distinct` operator could automatically 
> > > > > > > > > > > > > > > repartition the
> > > > > > > > > > > > > > > data and thus we
> > > > > > > > > > > > > > >        > would avoid user errors.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > Of course with 'key-only' deduplication + 
> > > > > > > > > > > > > > > autorepartitioning we
> > > > > > > > > > > > > > > will
> > > > > > > > > > > > > > > never cause problems with co-partitioning. But in 
> > > > > > > > > > > > > > > practice, we
> > > > > > > > > > > > > > > often
> > > > > > > > > > > > > > > don't need repartitioning even if 'dedup ID' is 
> > > > > > > > > > > > > > > different from
> > > > > > > > > > > > > > > the key,
> > > > > > > > > > > > > > > like in my example above. So here we have a sort 
> > > > > > > > > > > > > > > of 'performance vs
> > > > > > > > > > > > > > > security' tradeoff.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > The 'golden middle way' here can be the 
> > > > > > > > > > > > > > > following: we can form a
> > > > > > > > > > > > > > > deduplication ID as KEY + separator + 
> > > > > > > > > > > > > > > idExtractor(VALUE). In case
> > > > > > > > > > > > > > > idExtractor is not provided, we deduplicate by 
> > > > > > > > > > > > > > > key only (as in
> > > > > > > > > > > > > > > original
> > > > > > > > > > > > > > > proposal). Then idExtractor transforms only the 
> > > > > > > > > > > > > > > value (and not
> > > > > > > > > > > > > > > the key)
> > > > > > > > > > > > > > > and its result is appended to the key. Records 
> > > > > > > > > > > > > > > from different
> > > > > > > > > > > > > > > partitions
> > > > > > > > > > > > > > > will inherently have different deduplication IDs 
> > > > > > > > > > > > > > > and all the
> > > > > > > > > > > > > > > data will
> > > > > > > > > > > > > > > be co-partitioned. As with any stateful 
> > > > > > > > > > > > > > > operation, we will
> > > > > > > > > > > > > > > repartition
> > > > > > > > > > > > > > > the topic in case the key was changed upstream, 
> > > > > > > > > > > > > > > but only in this
> > > > > > > > > > > > > > > case,
> > > > > > > > > > > > > > > thus avoiding unnecessary repartitioning. My 
> > > > > > > > > > > > > > > example above fits
> > > > > > > > > > > > > > > this
> > > > > > > > > > > > > > > perfectly.
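
The composite ID described above (KEY + separator + idExtractor(VALUE), falling back to the key alone) could look roughly like the following Java sketch. The class name, the `String` key type, and the `"|"` separator are assumptions made for illustration only; the actual KIP may choose a different representation. A null extractor result is treated here as "no postfix", which is one of the options discussed in this thread.

```java
import java.util.function.Function;

// Hypothetical helper; not part of the Kafka Streams API.
class DeduplicationIds {

    // Assumed separator, chosen only for readability of the example.
    static final String SEPARATOR = "|";

    // Builds the deduplication ID from the record key and, optionally,
    // an ID extracted from the value. The key is always part of the ID,
    // so records with different keys never share a deduplication ID.
    static <V> String deduplicationId(String key, V value, Function<V, String> idExtractor) {
        if (idExtractor == null) {
            return key; // no extractor: deduplicate by key only
        }
        String extracted = idExtractor.apply(value);
        if (extracted == null) {
            return key; // null result treated as "no postfix"
        }
        return key + SEPARATOR + extracted;
    }

    public static void main(String[] args) {
        // The source ID is the key; the event ID is extracted from the message.
        System.out.println(deduplicationId("source-1", "event-42:goal", v -> v.split(":")[0]));
    }
}
```

Because the key is a prefix of every deduplication ID, the lookup for a record can stay within the partition that the record's key already maps to.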
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > >        > (2) What is the motivation for allowing 
> > > > > > > > > > > > > > > the `idExtractor`
> > > > > > > > > > > > > > > to return
> > > > > > > > > > > > > > >        > `null`? Might be good to have some 
> > > > > > > > > > > > > > > use-case examples for
> > > > > > > > > > > > > > > this feature.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > Can't think of any use-cases. As often
> > > > > > > > > > > > > > > > happens, it just
> > > > > > > > > > > > > > > > came with a
> > > > > > > > > > > > > > > copy-paste from StackOverflow -- see Michael 
> > > > > > > > > > > > > > > Noll's answer here:
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > https://stackoverflow.com/questions/55803210/how-to-handle-duplicate-messages-using-kafka-streaming-dsl-functions
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > But, jokes aside, we'll have to decide what to do 
> > > > > > > > > > > > > > > with nulls. If we
> > > > > > > > > > > > > > > accept the above proposal of having deduplication 
> > > > > > > > > > > > > > > ID as KEY +
> > > > > > > > > > > > > > > postfix,
> > > > > > > > > > > > > > > then null can be treated as no postfix at all. If 
> > > > > > > > > > > > > > > we don't
> > > > > > > > > > > > > > > accept this
> > > > > > > > > > > > > > > approach, then treating nulls as 
> > > > > > > > > > > > > > > 'no-deduplication' seems to be a
> > > > > > > > > > > > > > > reasonable assumption (we can't get or put null 
> > > > > > > > > > > > > > > as a key to a KV
> > > > > > > > > > > > > > > store,
> > > > > > > > > > > > > > > so a record with null ID is always going to look 
> > > > > > > > > > > > > > > 'new' for us).
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > >        > (2) Is using a `TimeWindow` really what 
> > > > > > > > > > > > > > > we want? I was
> > > > > > > > > > > > > > > wondering if a
> > > > > > > > > > > > > > >        > `SlidingWindow` might be better? Or 
> > > > > > > > > > > > > > > maybe we need a new
> > > > > > > > > > > > > > > type of window?
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > Agree. It's probably not what we want. Once I 
> > > > > > > > > > > > > > > thought that reusing
> > > > > > > > > > > > > > > TimeWindow is a clever idea, now I don't.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > Do we need epoch alignment in our use case? No, 
> > > > > > > > > > > > > > > we don't, and I
> > > > > > > > > > > > > > > don't
> > > > > > > > > > > > > > > > know if anyone is going to need this. Epoch
> > > > > > > > > > > > > > > alignment is good for
> > > > > > > > > > > > > > > aggregation, but deduplication is a different 
> > > > > > > > > > > > > > > story.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > Let me describe the semantics the way I see them now
> > > > > > > > > > > > > > > and tell me
> > > > > > > > > > > > > > > what you
> > > > > > > > > > > > > > > think:
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > - the only parameter that defines the 
> > > > > > > > > > > > > > > deduplication logic is
> > > > > > > > > > > > > > > 'expiration
> > > > > > > > > > > > > > > period'
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > - when a deduplication ID arrives and we cannot 
> > > > > > > > > > > > > > > find it in the
> > > > > > > > > > > > > > > store, we
> > > > > > > > > > > > > > > forward the message downstream and store the ID + 
> > > > > > > > > > > > > > > its timestamp.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > - when an out-of-order ID arrives with an older 
> > > > > > > > > > > > > > > timestamp and we
> > > > > > > > > > > > > > > find a
> > > > > > > > > > > > > > > 'fresher' record, we do nothing and don't forward 
> > > > > > > > > > > > > > > the message
> > > > > > > > > > > > > > > (??? OR
> > > > > > > > > > > > > > > NOT? In what case would we want to forward an 
> > > > > > > > > > > > > > > out-of-order
> > > > > > > > > > > > > > > message?)
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > - when an ID with fresher timestamp arrives we 
> > > > > > > > > > > > > > > check if it falls
> > > > > > > > > > > > > > > into
> > > > > > > > > > > > > > > the expiration period and either forward it or 
> > > > > > > > > > > > > > > not, but in both
> > > > > > > > > > > > > > > cases we
> > > > > > > > > > > > > > > update the timestamp of the message in the store
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > - the WindowStore retention mechanism should 
> > > > > > > > > > > > > > > clean up very old
> > > > > > > > > > > > > > > records
> > > > > > > > > > > > > > > in order not to run out of space.
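
The rules listed above can be sketched in plain Java as follows. This is a simplified model, not the proposed implementation: a `HashMap` stands in for the state store, the class and method names are hypothetical, retention cleanup is left out, and the open question about out-of-order records is resolved here by dropping them (a record with an equal timestamp is dropped too, which is an additional assumption).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the described logic; a HashMap replaces the state store.
class ExpiringDeduplicator {

    private final long expirationMs;
    private final Map<String, Long> lastSeen = new HashMap<>();

    ExpiringDeduplicator(long expirationMs) {
        this.expirationMs = expirationMs;
    }

    // Returns true if the record should be forwarded downstream.
    boolean process(String id, long timestampMs) {
        Long stored = lastSeen.get(id);
        if (stored == null) {
            // Unknown ID: forward and remember its timestamp.
            lastSeen.put(id, timestampMs);
            return true;
        }
        if (timestampMs <= stored) {
            // Out-of-order (or same-time) record: do nothing, do not forward.
            return false;
        }
        // Fresher record: forward only if the expiration period has passed,
        // but update the stored timestamp in both cases.
        boolean expired = timestampMs - stored > expirationMs;
        lastSeen.put(id, timestampMs);
        return expired;
    }

    public static void main(String[] args) {
        ExpiringDeduplicator dedup = new ExpiringDeduplicator(15 * 60 * 1000L);
        System.out.println(dedup.process("event-1", 0L));
        System.out.println(dedup.process("event-1", 60_000L));
    }
}
```

One consequence of updating the timestamp in both cases is visible in this model: an ID that keeps arriving more often than the expiration period is never forwarded again until there is a gap longer than that period.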
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > >        > (3) `isPersistent` -- instead of using 
> > > > > > > > > > > > > > > this flag, it seems
> > > > > > > > > > > > > > > better to
> > > > > > > > > > > > > > >        > allow users to pass in a `Materialized` 
> > > > > > > > > > > > > > > parameter next to
> > > > > > > > > > > > > > >        > `DistinctParameters` to configure the 
> > > > > > > > > > > > > > > state store?
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > Fully agree! Users might also want to change the 
> > > > > > > > > > > > > > > retention time.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > >        > (4) I am wondering if we should really 
> > > > > > > > > > > > > > > have 4 overloads for
> > > > > > > > > > > > > > >        > `DistinctParameters.with()`? It might be 
> > > > > > > > > > > > > > > better to have
> > > > > > > > > > > > > > > one overload
> > > > > > > > > > > > > > > >        > with all required parameters, and add
> > > > > > > > > > > > > > > optional parameters
> > > > > > > > > > > > > > > using the
> > > > > > > > > > > > > > >        > builder pattern? This seems to follow 
> > > > > > > > > > > > > > > > the DSL Grammar
> > > > > > > > > > > > > > > proposal.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > Oh, I can explain. We can't fully rely on the 
> > > > > > > > > > > > > > > builder pattern
> > > > > > > > > > > > > > > because of
> > > > > > > > > > > > > > > Java type inference limitations. We have to 
> > > > > > > > > > > > > > > provide type
> > > > > > > > > > > > > > > parameters to
> > > > > > > > > > > > > > > the builder methods or the code won't compile: 
> > > > > > > > > > > > > > > see e. g. this
> > > > > > > > > > > > > > > https://twitter.com/inponomarev/status/1265053286933159938
> > > > > > > > > > > > > > >  and
> > > > > > > > > > > > > > > following
> > > > > > > > > > > > > > > discussion with Tagir Valeev.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > When we came across similar difficulties in
> > > > > > > > > > > > > > > > KIP-418, we finally
> > > > > > > > > > > > > > > > decided to add all the necessary overloads to the
> > > > > > > > > > > > > > > > parameter class.
> > > > > > > > > > > > > > > So I just
> > > > > > > > > > > > > > > reproduced that approach here.
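
The kind of type inference limitation mentioned above can be reproduced with a small self-contained Java sketch. `DistinctParams`, `create()`, `named()` and `describe()` are hypothetical names used only for this illustration; they are not the KIP's API, and the linked discussion may concern a somewhat different case.

```java
// Hypothetical generic parameter class with a static factory and a
// builder-style method.
class DistinctParams<K, V> {

    private final String name;

    private DistinctParams(String name) {
        this.name = name;
    }

    static <K, V> DistinctParams<K, V> create() {
        return new DistinctParams<>(null);
    }

    DistinctParams<K, V> named(String name) {
        return new DistinctParams<>(name);
    }

    String name() {
        return name;
    }
}

class TypeInferenceDemo {

    static String describe(DistinctParams<String, Integer> params) {
        return "params named " + params.name();
    }

    static String demo() {
        // Compiles: the argument position supplies the target type, so
        // K and V are inferred as String and Integer.
        String plain = describe(DistinctParams.create());

        // Does not compile: the receiver of a chained call gets no target
        // type, so create() is inferred as DistinctParams<Object, Object>.
        // String broken = describe(DistinctParams.create().named("dedup"));

        // Compiles again once the type arguments are spelled out.
        String chained = describe(DistinctParams.<String, Integer>create().named("dedup"));

        return plain + "; " + chained;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Static overloads that take all parameters at once avoid the chained call and therefore let the compiler infer the type arguments from the call site.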
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > >        > (5) Even if it might be an 
> > > > > > > > > > > > > > > implementation detail (and
> > > > > > > > > > > > > > > maybe the KIP
> > > > > > > > > > > > > > >        > itself does not need to mention it), can 
> > > > > > > > > > > > > > > you give a high
> > > > > > > > > > > > > > > level overview
> > > > > > > > > > > > > > > >        > how you intend to implement it (that
> > > > > > > > > > > > > > > > would be easier to
> > > > > > > > > > > > > > > > grok, compared
> > > > > > > > > > > > > > >        > to reading the PR).
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > Well, as with any operation on the KStreamImpl level,
> > > > > > > > > > > > > > > I'm building a
> > > > > > > > > > > > > > > store and
> > > > > > > > > > > > > > > a processor node.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > KStreamDistinct class is going to be the 
> > > > > > > > > > > > > > > ProcessorSupplier, with
> > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > logic regarding the forwarding/muting of the 
> > > > > > > > > > > > > > > records located in
> > > > > > > > > > > > > > > KStreamDistinct.KStreamDistinctProcessor#process
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > ----
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > Matthias, if you are still reading this :-) a 
> > > > > > > > > > > > > > > gentle reminder:
> > > > > > > > > > > > > > > my PR for
> > > > > > > > > > > > > > > > the already accepted KIP-418 is still waiting for
> > > > > > > > > > > > > > > your review. I
> > > > > > > > > > > > > > > think it's
> > > > > > > > > > > > > > > > better for me to finalize at least one KIP
> > > > > > > > > > > > > > > before proceeding to
> > > > > > > > > > > > > > > a new
> > > > > > > > > > > > > > > one :-)
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > Regards,
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > Ivan
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > 03.09.2020 4:20, Matthias J. Sax пишет:
> > > > > > > > > > > > > > > > Thanks for the KIP Ivan. Having a built-in 
> > > > > > > > > > > > > > > > deduplication
> > > > > > > > > > > > > > > > operator is for
> > > > > > > > > > > > > > > > sure a good addition.
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > Couple of questions:
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > (1) Using the `idExtractor` has the issue that 
> > > > > > > > > > > > > > > > data might not be
> > > > > > > > > > > > > > > > co-partitioned as you mentioned in the KIP. 
> > > > > > > > > > > > > > > > Thus, I am
> > > > > > > > > > > > > > > > wondering if it
> > > > > > > > > > > > > > > > might be better to do deduplication only on the 
> > > > > > > > > > > > > > > > key? If one
> > > > > > > > > > > > > > > > sets a new
> > > > > > > > > > > > > > > > key upstream (ie, extracts the deduplication id 
> > > > > > > > > > > > > > > > into the key), the
> > > > > > > > > > > > > > > > `distinct` operator could automatically 
> > > > > > > > > > > > > > > > repartition the data
> > > > > > > > > > > > > > > > and thus we
> > > > > > > > > > > > > > > > would avoid user errors.
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > (2) What is the motivation for allowing the 
> > > > > > > > > > > > > > > > `idExtractor` to
> > > > > > > > > > > > > > > > return
> > > > > > > > > > > > > > > > `null`? Might be good to have some use-case 
> > > > > > > > > > > > > > > > examples for this
> > > > > > > > > > > > > > > > feature.
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > (2) Is using a `TimeWindow` really what we 
> > > > > > > > > > > > > > > > want? I was
> > > > > > > > > > > > > > > > wondering if a
> > > > > > > > > > > > > > > > `SlidingWindow` might be better? Or maybe we 
> > > > > > > > > > > > > > > > need a new type of
> > > > > > > > > > > > > > > > window?
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > It would be helpful if you could describe 
> > > > > > > > > > > > > > > > potential use cases
> > > > > > > > > > > > > > > > in more
> > > > > > > > > > > > > > > > detail. -- I am mainly wondering about hopping 
> > > > > > > > > > > > > > > > > windows? Each
> > > > > > > > > > > > > > > > > record would
> > > > > > > > > > > > > > > > > always fall into multiple windows and thus
> > > > > > > > > > > > > > > > would be emitted
> > > > > > > > > > > > > > > > multiple
> > > > > > > > > > > > > > > > times, ie, each time the window closes. Is this 
> > > > > > > > > > > > > > > > really a valid
> > > > > > > > > > > > > > > > use case?
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > It seems that for de-duplication, one wants to 
> > > > > > > > > > > > > > > > have some
> > > > > > > > > > > > > > > > "expiration
> > > > > > > > > > > > > > > > time", ie, for each ID, deduplicate all 
> > > > > > > > > > > > > > > > consecutive records
> > > > > > > > > > > > > > > > with the
> > > > > > > > > > > > > > > > same ID and emit the first record after the 
> > > > > > > > > > > > > > > > "expiration time"
> > > > > > > > > > > > > > > > passed. In
> > > > > > > > > > > > > > > > terms of a window, this would mean that the 
> > > > > > > > > > > > > > > > window starts at
> > > > > > > > > > > > > > > > `r.ts` and
> > > > > > > > > > > > > > > > ends at `r.ts + windowSize`, ie, the window is 
> > > > > > > > > > > > > > > > aligned to the
> > > > > > > > > > > > > > > > data.
> > > > > > > > > > > > > > > > TimeWindows are aligned to the epoch though. 
> > > > > > > > > > > > > > > > While
> > > > > > > > > > > > > > > > `SlidingWindows` also
> > > > > > > > > > > > > > > > align to the data, for the aggregation use-case 
> > > > > > > > > > > > > > > > they go
> > > > > > > > > > > > > > > > backward in
> > > > > > > > > > > > > > > > time, while we need a window that goes forward 
> > > > > > > > > > > > > > > > in time. It's an
> > > > > > > > > > > > > > > > open
> > > > > > > > > > > > > > > > question if we can re-purpose `SlidingWindows` 
> > > > > > > > > > > > > > > > -- it might be
> > > > > > > > > > > > > > > > > ok to
> > > > > > > > > > > > > > > > make the alignment (into the past vs into the 
> > > > > > > > > > > > > > > > future) an operator
> > > > > > > > > > > > > > > > dependent behavior?
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > (3) `isPersistent` -- instead of using this 
> > > > > > > > > > > > > > > > flag, it seems
> > > > > > > > > > > > > > > > better to
> > > > > > > > > > > > > > > > allow users to pass in a `Materialized` 
> > > > > > > > > > > > > > > > parameter next to
> > > > > > > > > > > > > > > > `DistinctParameters` to configure the state 
> > > > > > > > > > > > > > > > store?
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > (4) I am wondering if we should really have 4 
> > > > > > > > > > > > > > > > overloads for
> > > > > > > > > > > > > > > > `DistinctParameters.with()`? It might be better 
> > > > > > > > > > > > > > > > to have one
> > > > > > > > > > > > > > > > overload
> > > > > > > > > > > > > > > > > with all required parameters, and add optional
> > > > > > > > > > > > > > > > parameters using the
> > > > > > > > > > > > > > > > builder pattern? This seems to follow the DSL 
> > > > > > > > > > > > > > > > > Grammar proposal.
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > (5) Even if it might be an implementation 
> > > > > > > > > > > > > > > > detail (and maybe the
> > > > > > > > > > > > > > > > KIP
> > > > > > > > > > > > > > > > itself does not need to mention it), can you 
> > > > > > > > > > > > > > > > give a high level
> > > > > > > > > > > > > > > > overview
> > > > > > > > > > > > > > > > > how you intend to implement it (that would be
> > > > > > > > > > > > > > > > > easier to grok,
> > > > > > > > > > > > > > > > compared
> > > > > > > > > > > > > > > > to reading the PR).
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > -Matthias
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > On 8/23/20 4:29 PM, Ivan Ponomarev wrote:
> > > > > > > > > > > > > > > > > Sorry, I forgot to add [DISCUSS] tag to the 
> > > > > > > > > > > > > > > > > topic
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > 24.08.2020 2:27, Ivan Ponomarev пишет:
> > > > > > > > > > > > > > > > > > Hello,
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > I'd like to start a discussion for KIP-655.
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > KIP-655:
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > I also opened a proof-of-concept PR for you 
> > > > > > > > > > > > > > > > > > to experiment
> > > > > > > > > > > > > > > > > > with the API:
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > PR#9210: 
> > > > > > > > > > > > > > > > > > https://github.com/apache/kafka/pull/9210
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > Regards,
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > Ivan Ponomarev
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > 
> > > > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > 
> > > > > > > 
> > > > > > > 
> > > > > 
> > > 
> > > 
> > 
> > 

