Hi Ivan,

Sorry for the late amendment, but I was just reviewing the
KIP again for your vote thread, and it struck me that, if
this is a stateful operation, we need a few more overloads.

Following the example from the other stateful windowed
operators, we should have:

distinct()
distinct(Named)
distinct(Materialized)

It's a small update, but an important one, since people will
inevitably need to customize the state store for the
operation.

Thanks,
John

On Thu, 2021-07-22 at 13:58 -0500, John Roesler wrote:
> Hi Ivan,
> 
> Thanks for the reply.
> 
> 1. I think I might have gotten myself confused. I was
> thinking of this operation as stateless, but now I'm not
> sure what I was thinking... This operator has to be
> stateful, right? In that case, I agree that comparing
> serialized values seems to be the way to do it.
> 
> 2. Thanks for the confirmation
> 
> 3. I continue to be satisfied to let you all hash it out.
> 
> Thanks,
> -John
> 
> On Tue, 2021-07-20 at 11:42 +0300, Ivan Ponomarev wrote:
> > Hi all,
> > 
> > 1. Actually I always thought about the serialized byte array only -- at 
> > least this is what local stores depend upon, and what Kafka itself 
> > depends upon when doing log compaction.
> > 
> > I can imagine a case where two different byte arrays deserialize to 
> > objects which are `equals` to each other. But I think we can ignore this 
> > for now because IMO the risks related to buggy `equals` implementations 
> > outweigh the potential benefits.
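
To make the distinction in point 1 concrete, here is a minimal Java sketch. It is not taken from the KIP: the class name and the toy text-to-integer deserializer are made up purely for illustration. It shows two different byte arrays that deserialize to objects which are `equals` to each other, so a byte-wise duplicate check and an `equals`-based one disagree.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class DuplicateDefinitionSketch {
    // Toy deserializer (hypothetical): the payload is a decimal integer in UTF-8 text.
    static Integer deserialize(byte[] bytes) {
        return Integer.valueOf(new String(bytes, StandardCharsets.UTF_8).trim());
    }

    // Duplicate check on the serialized form (the definition discussed for distinct()).
    static boolean sameBytes(byte[] a, byte[] b) {
        return Arrays.equals(a, b);
    }

    // Duplicate check on the deserialized objects via equals().
    static boolean sameObjects(byte[] a, byte[] b) {
        return deserialize(a).equals(deserialize(b));
    }

    public static void main(String[] args) {
        byte[] first = "42".getBytes(StandardCharsets.UTF_8);
        byte[] second = "042".getBytes(StandardCharsets.UTF_8);
        System.out.println("same bytes:   " + sameBytes(first, second));
        System.out.println("same objects: " + sameObjects(first, second));
    }
}
```

With byte-wise comparison the two payloads above count as distinct records, even though the application would see the same integer.
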
> > 
> > I will mention the duplicate definition in the KIP.
> > 
> > 2. I agree with John, he got my point.
> > 
> > 3. Let me gently insist on `distinct`. I believe that an exception to 
> > the rule is appropriate here, because the name `distinct()` is ubiquitous.
> > 
> > It's not only about Java Streams API (or .NET LINQ, which appeared 
> > earlier and also has `Distinct`): Spark's DataFrame has `distinct()` 
> > method, Hazelcast Jet has `distinct()` method, and I bet I can find more 
> > examples if I search. When we teach KStreams, we always say that 
> > KStreams are just like other streaming APIs, and they have roots in SQL 
> > queries. Users know what `distinct` is and they expect it to be in the API.
> > 
> > 
> > Regards,
> > 
> > Ivan
> > 
> > 
> > 13.07.2021 0:10, John Roesler wrote:
> > > Hi all,
> > > 
> > > Bruno raised some very good points. I’d like to chime in with additional 
> > > context.
> > > 
> > > 1. Great point. We faced a similar problem defining KIP-557. For 557, we 
> > > chose to use the serialized byte array instead of the equals() method, 
> > > but I think the situation in KIP-655 is a bit different. I think it might 
> > > make sense to use the equals() method here, but am curious what Ivan 
> > > thinks.
> > > 
> > > 2. I figured we'd do nothing. I thought Ivan was just saying that it 
> > > doesn't make a ton of sense to use it, which I agree with, but it doesn't 
> > > seem like that means we should prohibit it.
> > > 
> > > 3. FWIW, I don't have a strong feeling either way.
> > > 
> > > Thanks,
> > > -John
> > > 
> > > On Mon, Jul 12, 2021, at 09:14, Bruno Cadonna wrote:
> > > > Hi Ivan,
> > > > 
> > > > Thank you for the KIP!
> > > > 
> > > > Some aspects are not clear to me from the KIP and I have a proposal.
> > > > 
> > > > 1. The KIP does not describe the criteria that define a duplicate. Could
> > > > you add a definition of duplicate to the KIP?
> > > > 
> > > > 2. The KIP does not describe what happens if distinct() is applied on a
> > > > hopping window. On the DSL level, I do not see how you can avoid that
> > > > users apply distinct() on a hopping window, i.e., you cannot avoid it at
> > > > compile time, you need to check it at runtime and throw an exception. Is
> > > > this correct or am I missing something?
> > > > 
> > > > 3. I would also like to back a proposal by Sophie. She proposed to use
> > > > deduplicate() instead of distinct(), since the other DSL operations are
> > > > also verbs. I do not think that SQL and the Java Stream API are good
> > > > arguments to not use a verb.
> > > > 
> > > > Best,
> > > > Bruno
> > > > 
> > > > 
> > > > On 10.07.21 19:11, John Roesler wrote:
> > > > > Hi Ivan,
> > > > > 
> > > > > Sorry for the silence!
> > > > > 
> > > > > I have just re-read the proposal.
> > > > > 
> > > > > To summarize, you are now only proposing the zero-arg distinct() 
> > > > > method to be added to TimeWindowedKStream and SessionWindowedKStream, 
> > > > > right?
> > > > > 
> > > > > I’m in favor of this proposal.
> > > > > 
> > > > > Thanks,
> > > > > John
> > > > > 
> > > > > On Sat, Jul 10, 2021, at 10:18, Ivan Ponomarev wrote:
> > > > > > Hello everyone,
> > > > > > 
> > > > > > I would like to remind you about KIP-655 and KIP-759 just in case 
> > > > > > they
> > > > > > got lost in your inbox.
> > > > > > 
> > > > > > Now the initial proposal is split into two independent and smaller 
> > > > > > ones,
> > > > > > so it must be easier to review them. Of course, if you have time.
> > > > > > 
> > > > > > Regards,
> > > > > > 
> > > > > > Ivan
> > > > > > 
> > > > > > 
> > > > > > 24.06.2021 18:11, Ivan Ponomarev пишет:
> > > > > > > Hello all,
> > > > > > > 
> > > > > > > I have rewritten the KIP-655 summarizing what was agreed upon 
> > > > > > > during
> > > > > > > this discussion (now the proposal is much simpler and less 
> > > > > > > invasive).
> > > > > > > 
> > > > > > > I have also created KIP-759 (cancelRepartition operation) and 
> > > > > > > started a
> > > > > > > discussion for it.
> > > > > > > 
> > > > > > > Regards,
> > > > > > > 
> > > > > > > Ivan.
> > > > > > > 
> > > > > > > 
> > > > > > > 
> > > > > > > 04.06.2021 8:15, Matthias J. Sax wrote:
> > > > > > > > Just skimmed over the thread -- first of all, I am glad that we 
> > > > > > > > could
> > > > > > > > merge KIP-418 and ship it :)
> > > > > > > > 
> > > > > > > > About the re-partitioning concerns, there are already two 
> > > > > > > > tickets for it:
> > > > > > > > 
> > > > > > > >     - https://issues.apache.org/jira/browse/KAFKA-4835
> > > > > > > >     - https://issues.apache.org/jira/browse/KAFKA-10844
> > > > > > > > 
> > > > > > > > Thus, it seems best to exclude this topic from this KIP, and do 
> > > > > > > > a
> > > > > > > > separate KIP for it (if necessary, we can "pause" this KIP 
> > > > > > > > until the
> > > > > > > > repartition KIP is done). It's a long standing "issue" and we 
> > > > > > > > should
> > > > > > > > resolve it in a general way I guess.
> > > > > > > > 
> > > > > > > > (Did not yet read all responses in detail, so keeping this 
> > > > > > > > comment
> > > > > > > > short.)
> > > > > > > > 
> > > > > > > > 
> > > > > > > > -Matthias
> > > > > > > > 
> > > > > > > > On 6/2/21 6:35 AM, John Roesler wrote:
> > > > > > > > > Thanks, Ivan!
> > > > > > > > > 
> > > > > > > > > That sounds like a great plan to me. Two smaller KIPs are 
> > > > > > > > > easier to
> > > > > > > > > agree on than one big one.
> > > > > > > > > 
> > > > > > > > > I agree hopping and sliding windows will actually have a 
> > > > > > > > > duplicating
> > > > > > > > > effect. We can avoid adding distinct() to the sliding window
> > > > > > > > > interface, but hopping windows are just a different 
> > > > > > > > > parameterization
> > > > > > > > > of epoch-aligned windows. It seems we can’t do much about 
> > > > > > > > > that except
> > > > > > > > > document the issue.
> > > > > > > > > 
> > > > > > > > > Thanks,
> > > > > > > > > John
> > > > > > > > > 
> > > > > > > > > On Wed, May 26, 2021, at 10:14, Ivan Ponomarev wrote:
> > > > > > > > > > Hi John!
> > > > > > > > > > 
> > > > > > > > > > I think that your proposal is just fantastic, it simplifies 
> > > > > > > > > > things a
> > > > > > > > > > lot!
> > > > > > > > > > 
> > > > > > > > > > I also felt uncomfortable due to the fact that the proposed
> > > > > > > > > > `distinct()`
> > > > > > > > > > is not somewhere near `count()` and `reduce(..)`. But
> > > > > > > > > > `selectKey(..).groupByKey().windowedBy(..).distinct()` 
> > > > > > > > > > didn't look like
> > > > > > > > > > a correct option for me because of the issue with the 
> > > > > > > > > > unneeded
> > > > > > > > > > repartitioning.
> > > > > > > > > > 
> > > > > > > > > > The bold idea that we can just CANCEL the repartitioning 
> > > > > > > > > > didn't come to
> > > > > > > > > > my mind.
> > > > > > > > > > 
> > > > > > > > > > What seemed to me a single problem is in fact two unrelated 
> > > > > > > > > > problems:
> > > > > > > > > > `distinct` operation and cancelling the unneeded 
> > > > > > > > > > repartitioning.
> > > > > > > > > > 
> > > > > > > > > >     > what if we introduce a parameter to `selectKey()` 
> > > > > > > > > > that specifies
> > > > > > > > > > that
> > > > > > > > > > the caller asserts that the new key does _not_ change the 
> > > > > > > > > > data
> > > > > > > > > > partitioning?
> > > > > > > > > > 
> > > > > > > > > > I think a more elegant solution would be not to add a new 
> > > > > > > > > > parameter to
> > > > > > > > > > `selectKey` and all the other key-changing operations 
> > > > > > > > > > (`map`,
> > > > > > > > > > `transform`, `flatMap`, ...), but add a new operator
> > > > > > > > > > `KStream#cancelRepartitioning()` that resets 
> > > > > > > > > > `keyChangingOperation`
> > > > > > > > > > flag
> > > > > > > > > > for the upstream node. Of course, "use it only if you know 
> > > > > > > > > > what you're
> > > > > > > > > > doing" warning is to be added. Well, it's a topic for a 
> > > > > > > > > > separate KIP!
> > > > > > > > > > 
> > > > > > > > > > Concerning `distinct()`. If we use `XXXWindowedKStream` 
> > > > > > > > > > facilities,
> > > > > > > > > > then
> > > > > > > > > > changes to the API are minimally invasive: we're just adding
> > > > > > > > > > `distinct()` to TimeWindowedKStream and 
> > > > > > > > > > SessionWindowedKStream, and
> > > > > > > > > > that's all.
> > > > > > > > > > 
> > > > > > > > > > We can now define `distinct` as an operation that returns 
> > > > > > > > > > only a first
> > > > > > > > > > record that falls into a new window, and filters out all 
> > > > > > > > > > the other
> > > > > > > > > > records that fall into an already existing window. BTW, we 
> > > > > > > > > > can mock the
> > > > > > > > > > behaviour of such an operation with `TopologyTestDriver` 
> > > > > > > > > > using
> > > > > > > > > > `reduce((l, r) -> STOP).filterNot((k, v) -> STOP.equals(v))`. 
> > > > > > > > > > ;-)
> > > > > > > > > > 
> > > > > > > > > > Consider the following example (record times are in 
> > > > > > > > > > seconds):
> > > > > > > > > > 
> > > > > > > > > > //three bursts of variously ordered records
> > > > > > > > > > 4, 5, 6
> > > > > > > > > > 23, 22, 24
> > > > > > > > > > 34, 33, 32
> > > > > > > > > > //'late arrivals'
> > > > > > > > > > 7, 22, 35
> > > > > > > > > > 
> > > > > > > > > > 
> > > > > > > > > > 1. 'Epoch-aligned deduplication' using tumbling windows:
> > > > > > > > > > 
> > > > > > > > > > .groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(10))).distinct()
> > > > > > > > > > 
> > > > > > > > > > 
> > > > > > > > > > produces
> > > > > > > > > > 
> > > > > > > > > > (key@[00000/10000], 4)
> > > > > > > > > > (key@[20000/30000], 23)
> > > > > > > > > > (key@[30000/40000], 34)
> > > > > > > > > > 
> > > > > > > > > > -- that is, one record per epoch-aligned window.
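
The epoch-aligned case above can be sketched in plain Java, without any Kafka classes. This is only a simulation under simplifying assumptions: the class and method names are hypothetical, timestamps are plain seconds, and grace periods and store retention are ignored. It forwards the first record seen in each window [n * size, (n + 1) * size) and drops the rest.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class TumblingDistinctSketch {
    // Returns the timestamps (in seconds) that would be forwarded: the first
    // record to arrive in each epoch-aligned window, in arrival order.
    static List<Long> distinct(List<Long> timestamps, long windowSizeSeconds) {
        Set<Long> seenWindowStarts = new HashSet<>();
        List<Long> forwarded = new ArrayList<>();
        for (long ts : timestamps) {
            long windowStart = ts - (ts % windowSizeSeconds);
            // add() returns true only the first time this window is seen.
            if (seenWindowStarts.add(windowStart)) {
                forwarded.add(ts);
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        // Three bursts followed by the 'late arrivals' from the example.
        List<Long> input = List.of(4L, 5L, 6L, 23L, 22L, 24L, 34L, 33L, 32L, 7L, 22L, 35L);
        System.out.println(distinct(input, 10L));
    }
}
```

For the input in the example and a 10-second window, this forwards the records at 4, 23 and 34, one per window.
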
> > > > > > > > > > 
> > > > > > > > > > 2. Hopping and sliding windows do not make much sense here, 
> > > > > > > > > > because
> > > > > > > > > > they
> > > > > > > > > > produce multiple intersected windows, so that one record 
> > > > > > > > > > can be
> > > > > > > > > > multiplied, but we want deduplication.
> > > > > > > > > > 
> > > > > > > > > > 3. SessionWindows work for 'data-aligned deduplication'.
> > > > > > > > > > 
> > > > > > > > > > .groupByKey().windowedBy(SessionWindows.with(Duration.ofSeconds(10))).distinct()
> > > > > > > > > > 
> > > > > > > > > > 
> > > > > > > > > > 
> > > > > > > > > > produces only
> > > > > > > > > > 
> > > > > > > > > > ([key@4000/4000], 4)
> > > > > > > > > > ([key@23000/23000], 23)
> > > > > > > > > > 
> > > > > > > > > > because all the records bigger than 7 are stuck together in 
> > > > > > > > > > one
> > > > > > > > > > session.
> > > > > > > > > > Setting inactivity gap to 9 seconds will return three 
> > > > > > > > > > records:
> > > > > > > > > > 
> > > > > > > > > > ([key@4000/4000], 4)
> > > > > > > > > > ([key@23000/23000], 23)
> > > > > > > > > > ([key@34000/34000], 34)
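
The data-aligned case can be simulated the same way. Again, this is a plain-Java sketch with hypothetical names, not the Kafka Streams implementation. It assumes that a record joins a session when it lies within the inactivity gap of that session, boundary included; that assumption is chosen because it reproduces the outputs listed above. A record is forwarded only when it opens a new session.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class SessionDistinctSketch {
    // Returns the timestamps (in seconds) of the records that open a new session.
    static List<Long> distinct(List<Long> timestamps, long gapSeconds) {
        List<long[]> sessions = new ArrayList<>(); // each entry is {start, end}
        List<Long> forwarded = new ArrayList<>();
        for (long ts : timestamps) {
            long start = ts;
            long end = ts;
            boolean merged = false;
            Iterator<long[]> it = sessions.iterator();
            while (it.hasNext()) {
                long[] s = it.next();
                // Within the gap of an existing session (inclusive): merge into it.
                if (s[0] - gapSeconds <= ts && ts <= s[1] + gapSeconds) {
                    start = Math.min(start, s[0]);
                    end = Math.max(end, s[1]);
                    it.remove();
                    merged = true;
                }
            }
            sessions.add(new long[] {start, end});
            if (!merged) {
                forwarded.add(ts);
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        List<Long> input = List.of(4L, 5L, 6L, 23L, 22L, 24L, 34L, 33L, 32L, 7L, 22L, 35L);
        System.out.println(distinct(input, 10L));
        System.out.println(distinct(input, 9L));
    }
}
```

With a 10-second gap the record at 34 merges into the session ending at 24, so only 4 and 23 are forwarded; with a 9-second gap it opens its own session and 34 is forwarded as well.
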
> > > > > > > > > > 
> > > > > > > > > > WDYT? If you like this variant, I will re-write KIP-655 and 
> > > > > > > > > > propose a
> > > > > > > > > > separate KIP for `cancelRepartitioning` (or whatever name 
> > > > > > > > > > we will
> > > > > > > > > > choose
> > > > > > > > > > for it).
> > > > > > > > > > 
> > > > > > > > > > Regards,
> > > > > > > > > > 
> > > > > > > > > > Ivan
> > > > > > > > > > 
> > > > > > > > > > 
> > > > > > > > > > 24.05.2021 22:32, John Roesler wrote:
> > > > > > > > > > > Hey there, Ivan!
> > > > > > > > > > > 
> > > > > > > > > > > In typical fashion, I'm going to make a somewhat 
> > > > > > > > > > > outlandish
> > > > > > > > > > > proposal. I'm hoping that we can side-step some of the
> > > > > > > > > > > complications that have arisen. Please bear with me.
> > > > > > > > > > > 
> > > > > > > > > > > It seems like `distinct()` is not fundamentally unlike 
> > > > > > > > > > > other windowed
> > > > > > > > > > > "aggregation" operations. Your concern about unnecessary
> > > > > > > > > > > repartitioning seems to apply just as well to `count()` 
> > > > > > > > > > > as to
> > > > > > > > > > > `distinct()`.
> > > > > > > > > > > This has come up before, but I don't remember when: what 
> > > > > > > > > > > if we
> > > > > > > > > > > introduce a parameter to `selectKey()` that specifies 
> > > > > > > > > > > that the caller
> > > > > > > > > > > asserts that the new key does _not_ change the data 
> > > > > > > > > > > partitioning?
> > > > > > > > > > > The docs on that parameter would of course spell out all 
> > > > > > > > > > > the "rights
> > > > > > > > > > > and responsibilities" of setting it.
> > > > > > > > > > > 
> > > > > > > > > > > In that case, we could indeed get back to
> > > > > > > > > > > `selectKey(A).windowBy(B).distinct(...)`, where we get to 
> > > > > > > > > > > compose the
> > > > > > > > > > > key mapper and the windowing function without having to 
> > > > > > > > > > > carve out
> > > > > > > > > > > a separate domain just for `distinct()`. All the rest of 
> > > > > > > > > > > the KStream
> > > > > > > > > > > operations would also benefit.
> > > > > > > > > > > 
> > > > > > > > > > > What do you think?
> > > > > > > > > > > 
> > > > > > > > > > > Thanks,
> > > > > > > > > > > John
> > > > > > > > > > > 
> > > > > > > > > > > On Sun, May 23, 2021, at 08:09, Ivan Ponomarev wrote:
> > > > > > > > > > > > Hello everyone,
> > > > > > > > > > > > 
> > > > > > > > > > > > let me revive the discussion for KIP-655. Now I have 
> > > > > > > > > > > > some time
> > > > > > > > > > > > again and
> > > > > > > > > > > > I'm eager to finalize this.
> > > > > > > > > > > > 
> > > > > > > > > > > > Based on what was already discussed, I think that we 
> > > > > > > > > > > > can split the
> > > > > > > > > > > > discussion into three topics for our convenience.
> > > > > > > > > > > > 
> > > > > > > > > > > > The three topics are:
> > > > > > > > > > > > 
> > > > > > > > > > > > - idExtractor  (how should we extract the deduplication 
> > > > > > > > > > > > key for
> > > > > > > > > > > > the record)
> > > > > > > > > > > > 
> > > > > > > > > > > > - timeWindows (what time windows should we use)
> > > > > > > > > > > > 
> > > > > > > > > > > > - miscellaneous (naming etc.)
> > > > > > > > > > > > 
> > > > > > > > > > > > ---- idExtractor ----
> > > > > > > > > > > > 
> > > > > > > > > > > > Original proposal: use (k, v) -> f(k, v) mapper, 
> > > > > > > > > > > > defaulting to (k,
> > > > > > > > > > > > v) ->
> > > > > > > > > > > > k.  The drawback here is that we must warn the user to 
> > > > > > > > > > > > choose such a
> > > > > > > > > > > > function that sets different IDs for records from 
> > > > > > > > > > > > different
> > > > > > > > > > > > partitions,
> > > > > > > > > > > > otherwise the same IDs might not be co-partitioned (and not
> > > > > > > > > > > > deduplicated as
> > > > > > > > > > > > a result). Additional concern: what should we do when 
> > > > > > > > > > > > this function
> > > > > > > > > > > > returns null?
> > > > > > > > > > > > 
> > > > > > > > > > > > Matthias proposed key-only deduplication: that is, no 
> > > > > > > > > > > > idExtractor at
> > > > > > > > > > > > all, and if we want to use `distinct` for a particular 
> > > > > > > > > > > > identifier, we
> > > > > > > > > > > > must `selectKey()` before. The drawback of this 
> > > > > > > > > > > > approach is that
> > > > > > > > > > > > we will
> > > > > > > > > > > > always have repartitioning after the key selection, 
> > > > > > > > > > > > while in practice
> > > > > > > > > > > > repartitioning will not always be necessary (for 
> > > > > > > > > > > > example, when the
> > > > > > > > > > > > data
> > > > > > > > > > > > stream is such that different values infer different 
> > > > > > > > > > > > keys).
> > > > > > > > > > > > 
> > > > > > > > > > > > So here we have a 'safety vs. performance' trade-off. 
> > > > > > > > > > > > But 'safe'
> > > > > > > > > > > > variant
> > > > > > > > > > > > is also not very convenient for developers, since we're 
> > > > > > > > > > > > forcing
> > > > > > > > > > > > them to
> > > > > > > > > > > > change the structure of their records.
> > > > > > > > > > > > 
> > > > > > > > > > > > A 'golden mean' here might be using a composite ID with 
> > > > > > > > > > > > its first
> > > > > > > > > > > > component equal to k and its second component equal 
> > > > > > > > > > > > to some f(v) (f
> > > > > > > > > > > > defaults to v -> null, and null value returned by f(v) 
> > > > > > > > > > > > means
> > > > > > > > > > > > 'deduplicate by the key only'). The nuance here is that 
> > > > > > > > > > > > we will have
> > > > > > > > > > > > serializers only for types of k and f(v), and we must 
> > > > > > > > > > > > correctly
> > > > > > > > > > > > serialize a tuple (k, f(v)), but of course this is 
> > > > > > > > > > > > doable.
> > > > > > > > > > > > 
> > > > > > > > > > > > What do you think?
> > > > > > > > > > > > 
> > > > > > > > > > > > ---- timeWindows ----
> > > > > > > > > > > > 
> > > > > > > > > > > > Originally I proposed TimeWindows only just because 
> > > > > > > > > > > > they solved my
> > > > > > > > > > > > particular case :-) but agree with Matthias' and 
> > > > > > > > > > > > Sophie's objections.
> > > > > > > > > > > > 
> > > > > > > > > > > > I like Sophie's point: we need both epoch-aligned 
> > > > > > > > > > > > and
> > > > > > > > > > > > data-aligned
> > > > > > > > > > > > windows. IMO this is absolutely correct: "data-aligned 
> > > > > > > > > > > > is useful for
> > > > > > > > > > > > example when you know that a large number of updates to 
> > > > > > > > > > > > a single key
> > > > > > > > > > > > will occur in short bursts, and epoch-aligned when you
> > > > > > > > > > > > specifically want
> > > > > > > > > > > > to get just a single update per discrete time interval."
> > > > > > > > > > > > 
> > > > > > > > > > > > I just cannot agree right away with Sophie's
> > > > > > > > > > > > .groupByKey().windowedBy(...).distinct() proposal, as 
> > > > > > > > > > > > it implies the
> > > > > > > > > > > > key-only deduplication -- see the previous topic.
> > > > > > > > > > > > 
> > > > > > > > > > > > Epoch-aligned windows are very simple: they should 
> > > > > > > > > > > > forward only one
> > > > > > > > > > > > record per enumerated time window. TimeWindows are 
> > > > > > > > > > > > exactly what we
> > > > > > > > > > > > want
> > > > > > > > > > > > here. I mentioned in the KIP both tumbling and hopping 
> > > > > > > > > > > > windows just
> > > > > > > > > > > > because both are possible for TimeWindows, but indeed I 
> > > > > > > > > > > > don't see any
> > > > > > > > > > > > real use case for hopping windows, only tumbling 
> > > > > > > > > > > > windows make
> > > > > > > > > > > > sense IMO.
> > > > > > > > > > > > 
> > > > > > > > > > > > For data-aligned windows SlidingWindow interface seems 
> > > > > > > > > > > > to be a nearly
> > > > > > > > > > > > valid choice. Nearly. It should forward a record once 
> > > > > > > > > > > > when it's first
> > > > > > > > > > > > seen, and then not again for any identical records that 
> > > > > > > > > > > > fall into the
> > > > > > > > > > > > next N timeUnits.  However, we cannot reuse 
> > > > > > > > > > > > SlidingWindow as is,
> > > > > > > > > > > > because
> > > > > > > > > > > > just as Matthias noted, SlidingWindows go backward in 
> > > > > > > > > > > > time, while we
> > > > > > > > > > > > need windows that go forward in time, and are not 
> > > > > > > > > > > > opened while
> > > > > > > > > > > > records
> > > > > > > > > > > > fall into an already existing window. We definitely 
> > > > > > > > > > > > should make
> > > > > > > > > > > > our own
> > > > > > > > > > > > implementation, maybe we should call it 
> > > > > > > > > > > > ExpirationWindow? WDYT?
> > > > > > > > > > > > 
> > > > > > > > > > > > 
> > > > > > > > > > > > ---- miscellaneous ----
> > > > > > > > > > > > 
> > > > > > > > > > > > Persistent/in-memory stores. Matthias proposed to pass 
> > > > > > > > > > > > Materialized
> > > > > > > > > > > > parameter next to DistinctParameters (and this is 
> > > > > > > > > > > > necessary,
> > > > > > > > > > > > because we
> > > > > > > > > > > > will need to provide a serializer for extracted id). 
> > > > > > > > > > > > This is
> > > > > > > > > > > > absolutely
> > > > > > > > > > > > valid point, I agree and I will fix it in the KIP.
> > > > > > > > > > > > 
> > > > > > > > > > > > Naming. Sophie noted that the Streams DSL operators are 
> > > > > > > > > > > > typically
> > > > > > > > > > > > named
> > > > > > > > > > > > as verbs, so she proposes `deduplicate` in favour of 
> > > > > > > > > > > > `distinct`. I
> > > > > > > > > > > > think
> > > > > > > > > > > > that while it's important to stick to the naming 
> > > > > > > > > > > > conventions, it
> > > > > > > > > > > > is also
> > > > > > > > > > > > important to think of the experience of those who come 
> > > > > > > > > > > > from different
> > > > > > > > > > > > stacks/technologies. People who are familiar with SQL 
> > > > > > > > > > > > and Java
> > > > > > > > > > > > Streams
> > > > > > > > > > > > API must know for sure what 'distinct' means, while 
> > > > > > > > > > > > data
> > > > > > > > > > > > deduplication in general is a more complex task and thus
> > > > > > > > > > > > `deduplicate`
> > > > > > > > > > > > might be misleading. But I'm ready to be convinced if 
> > > > > > > > > > > > the majority
> > > > > > > > > > > > thinks otherwise.
> > > > > > > > > > > > 
> > > > > > > > > > > > 
> > > > > > > > > > > > Regards,
> > > > > > > > > > > > 
> > > > > > > > > > > > Ivan
> > > > > > > > > > > > 
> > > > > > > > > > > > 
> > > > > > > > > > > > 
> > > > > > > > > > > > 14.09.2020 21:31, Sophie Blee-Goldman wrote:
> > > > > > > > > > > > > Hey all,
> > > > > > > > > > > > > 
> > > > > > > > > > > > > I'm not convinced either epoch-aligned or 
> > > > > > > > > > > > > data-aligned will fit all
> > > > > > > > > > > > > possible use cases.
> > > > > > > > > > > > > Both seem totally reasonable to me: data-aligned is 
> > > > > > > > > > > > > useful for
> > > > > > > > > > > > > example when
> > > > > > > > > > > > > you know
> > > > > > > > > > > > > that a large number of updates to a single key will 
> > > > > > > > > > > > > occur in
> > > > > > > > > > > > > short bursts,
> > > > > > > > > > > > > and epoch-
> > > > > > > > > > > > > aligned when you specifically want to get just a 
> > > > > > > > > > > > > single update
> > > > > > > > > > > > > per discrete
> > > > > > > > > > > > > time
> > > > > > > > > > > > > interval.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Going a step further, though, what if you want just a 
> > > > > > > > > > > > > single
> > > > > > > > > > > > > update per
> > > > > > > > > > > > > calendar
> > > > > > > > > > > > > month, or per year with accounting for leap years? 
> > > > > > > > > > > > > Neither of
> > > > > > > > > > > > > those are
> > > > > > > > > > > > > serviced that
> > > > > > > > > > > > > well by the existing Windows specification to windowed
> > > > > > > > > > > > > aggregations, a
> > > > > > > > > > > > > well-known
> > > > > > > > > > > > > limitation of the current API. There is actually a KIP
> > > > > > > > > > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-645%3A+Replace+Windows+with+a+proper+interface>
> > > > > > > > > > > > > 
> > > > > > > > > > > > > going
> > > > > > > > > > > > > on in parallel to fix this
> > > > > > > > > > > > > exact issue and make the windowing interface much 
> > > > > > > > > > > > > more flexible.
> > > > > > > > > > > > > Maybe
> > > > > > > > > > > > > instead
> > > > > > > > > > > > > of re-implementing this windowing interface in a 
> > > > > > > > > > > > > similarly
> > > > > > > > > > > > > limited fashion
> > > > > > > > > > > > > for the
> > > > > > > > > > > > > Distinct operator, we could leverage it here and get 
> > > > > > > > > > > > > all the
> > > > > > > > > > > > > benefits
> > > > > > > > > > > > > coming with
> > > > > > > > > > > > > KIP-645.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Specifically, I'm proposing to remove the 
> > > > > > > > > > > > > TimeWindows/etc config
> > > > > > > > > > > > > from the
> > > > > > > > > > > > > DistinctParameters class, and move the distinct() 
> > > > > > > > > > > > > method from the
> > > > > > > > > > > > > KStream
> > > > > > > > > > > > > interface
> > > > > > > > > > > > > to the TimeWindowedKStream interface. Since it's 
> > > > > > > > > > > > > semantically
> > > > > > > > > > > > > similar to a
> > > > > > > > > > > > > kind of
> > > > > > > > > > > > > windowed aggregation, it makes sense to align it with 
> > > > > > > > > > > > > the
> > > > > > > > > > > > > existing windowing
> > > > > > > > > > > > > framework, ie:
> > > > > > > > > > > > > 
> > > > > > > > > > > > > inputStream
> > > > > > > > > > > > > >          .groupByKey()
> > > > > > > > > > > > >          .windowedBy()
> > > > > > > > > > > > >          .distinct()
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Then we could use data-aligned windows if 
> > > > > > > > > > > > > SlidingWindows is
> > > > > > > > > > > > > specified in
> > > > > > > > > > > > > the
> > > > > > > > > > > > > windowedBy(), and epoch-aligned (or some other kind 
> > > > > > > > > > > > > of enumerable
> > > > > > > > > > > > > window)
> > > > > > > > > > > > > if a Windows is specified in windowedBy() (or an
> > > > > > > > > > > > > EnumerableWindowDefinition
> > > > > > > > > > > > > once KIP-645 is implemented to replace Windows).
> > > > > > > > > > > > > 
> > > > > > > > > > > > > *SlidingWindows*: should forward a record once when 
> > > > > > > > > > > > > it's first
> > > > > > > > > > > > > seen, and
> > > > > > > > > > > > > then not again
> > > > > > > > > > > > > for any identical records that fall into the next N 
> > > > > > > > > > > > > timeUnits. This
> > > > > > > > > > > > > includes out-of-order
> > > > > > > > > > > > > records, ie if you have a SlidingWindows of size 10s 
> > > > > > > > > > > > > and process
> > > > > > > > > > > > > records at
> > > > > > > > > > > > > time
> > > > > > > > > > > > > 15s, 20s, 14s then you would just forward the one at 
> > > > > > > > > > > > > 15s.
> > > > > > > > > > > > > Presumably, if
> > > > > > > > > > > > > you're
> > > > > > > > > > > > > using SlidingWindows, you don't care about what falls 
> > > > > > > > > > > > > into exact
> > > > > > > > > > > > > time
> > > > > > > > > > > > > boxes, you just
> > > > > > > > > > > > > want to deduplicate. If you do care about exact time 
> > > > > > > > > > > > > boxing then
> > > > > > > > > > > > > you should
> > > > > > > > > > > > > use...
> > > > > > > > > > > > > 
> > > > > > > > > > > > > *EnumerableWindowDefinition* (eg *TimeWindows*): 
> > > > > > > > > > > > > should forward
> > > > > > > > > > > > > only one
> > > > > > > > > > > > > record
> > > > > > > > > > > > > > per enumerated time window. If you get records at 
> > > > > > > > > > > > > > 15s, 20s, 14s
> > > > > > > > > > > > > where the
> > > > > > > > > > > > > windows
> > > > > > > > > > > > > are enumerated at [5,14], [15, 24], etc then you 
> > > > > > > > > > > > > forward the
> > > > > > > > > > > > > record at 15s
> > > > > > > > > > > > > and also
> > > > > > > > > > > > > the record at 14s
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Just an idea: not sure if the impedance mismatch 
> > > > > > > > > > > > > would throw
> > > > > > > > > > > > > users off
> > > > > > > > > > > > > since the
> > > > > > > > > > > > > semantics of the distinct windows are slightly 
> > > > > > > > > > > > > different than in the
> > > > > > > > > > > > > aggregations.
> > > > > > > > > > > > > But if we don't fit this into the existing windowed 
> > > > > > > > > > > > > framework,
> > > > > > > > > > > > > then we
> > > > > > > > > > > > > shouldn't use
> > > > > > > > > > > > > any existing Windows-type classes at all, imo. ie we 
> > > > > > > > > > > > > should
> > > > > > > > > > > > > create a new
> > > > > > > > > > > > > DistinctWindows config class, similar to how 
> > > > > > > > > > > > > stream-stream joins
> > > > > > > > > > > > > get their
> > > > > > > > > > > > > own
> > > > > > > > > > > > > JoinWindows class
> > > > > > > > > > > > > 
> > > > > > > > > > > > > I also think that non-windowed deduplication could be 
> > > > > > > > > > > > > useful, in
> > > > > > > > > > > > > which case
> > > > > > > > > > > > > we
> > > > > > > > > > > > > would want to also have the distinct() operator on 
> > > > > > > > > > > > > the KStream
> > > > > > > > > > > > > interface.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > 
> > > > > > > > > > > > > One quick note regarding the naming: it seems like 
> > > > > > > > > > > > > the Streams
> > > > > > > > > > > > > DSL operators
> > > > > > > > > > > > > are typically named as verbs rather than adjectives, 
> > > > > > > > > > > > > > for example
> > > > > > > > > > > > > #suppress
> > > > > > > > > > > > > or
> > > > > > > > > > > > > #aggregate. I get that there's some precedent for  
> > > > > > > > > > > > > 'distinct'
> > > > > > > > > > > > > specifically,
> > > > > > > > > > > > > but
> > > > > > > > > > > > > maybe something like 'deduplicate' would be more 
> > > > > > > > > > > > > appropriate for
> > > > > > > > > > > > > the Streams
> > > > > > > > > > > > > API.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > WDYT?
> > > > > > > > > > > > > 
> > > > > > > > > > > > > 
> > > > > > > > > > > > > On Mon, Sep 14, 2020 at 10:04 AM Ivan Ponomarev
> > > > > > > > > > > > > <iponoma...@mail.ru.invalid>
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > 
> > > > > > > > > > > > > > Hi Matthias,
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > Thanks for your review! It made me think deeper, 
> > > > > > > > > > > > > > and indeed I
> > > > > > > > > > > > > > understood
> > > > > > > > > > > > > > that I was missing some important details.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > To simplify, let me explain my particular use case 
> > > > > > > > > > > > > > first so I
> > > > > > > > > > > > > > can refer
> > > > > > > > > > > > > > to it later.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > We have a system that collects information about 
> > > > > > > > > > > > > > ongoing live
> > > > > > > > > > > > > > sporting
> > > > > > > > > > > > > > events from different sources. The information 
> > > > > > > > > > > > > > sources have
> > > > > > > > > > > > > > their IDs
> > > > > > > > > > > > > > and these IDs are keys of the stream. Each source 
> > > > > > > > > > > > > > emits messages
> > > > > > > > > > > > > > > concerning sporting events, and we can have many messages
> > > > > > > > > > > > > > > about each sporting event from each source. Event ID is
> > > > > > > > > > > > > > > extracted from the message.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > We need a database of event IDs that were reported 
> > > > > > > > > > > > > > at least once
> > > > > > > > > > > > > > by each
> > > > > > > > > > > > > > source (important: events from different sources 
> > > > > > > > > > > > > > are considered
> > > > > > > > > > > > > > to be
> > > > > > > > > > > > > > different entities). The requirements are:
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > 1) each new event ID should be written to the 
> > > > > > > > > > > > > > database as soon
> > > > > > > > > > > > > > as possible
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > 2) although it's ok and sometimes even desired to repeat the
> > > > > > > > > > > > > > > notification about an already known event ID, we wouldn't like
> > > > > > > > > > > > > > > our database to be bothered by the same event ID more often
> > > > > > > > > > > > > > > than once in a given period of time (say, 15 minutes).
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > With this example in mind let me answer your 
> > > > > > > > > > > > > > questions
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > >       > (1) Using the `idExtractor` has the issue 
> > > > > > > > > > > > > > that data might
> > > > > > > > > > > > > > not be
> > > > > > > > > > > > > >       > co-partitioned as you mentioned in the KIP. 
> > > > > > > > > > > > > > Thus, I am
> > > > > > > > > > > > > > wondering if it
> > > > > > > > > > > > > >       > might be better to do deduplication only on 
> > > > > > > > > > > > > > the key? If
> > > > > > > > > > > > > > one sets a new
> > > > > > > > > > > > > >       > key upstream (ie, extracts the 
> > > > > > > > > > > > > > deduplication id into the
> > > > > > > > > > > > > > key), the
> > > > > > > > > > > > > >       > `distinct` operator could automatically 
> > > > > > > > > > > > > > repartition the
> > > > > > > > > > > > > > data and thus we
> > > > > > > > > > > > > >       > would avoid user errors.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > Of course with 'key-only' deduplication + 
> > > > > > > > > > > > > > autorepartitioning we
> > > > > > > > > > > > > > will
> > > > > > > > > > > > > > never cause problems with co-partitioning. But in 
> > > > > > > > > > > > > > practice, we
> > > > > > > > > > > > > > often
> > > > > > > > > > > > > > don't need repartitioning even if 'dedup ID' is 
> > > > > > > > > > > > > > different from
> > > > > > > > > > > > > > the key,
> > > > > > > > > > > > > > like in my example above. So here we have a sort of 
> > > > > > > > > > > > > > 'performance vs
> > > > > > > > > > > > > > security' tradeoff.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > The 'golden middle way' here can be the following: 
> > > > > > > > > > > > > > we can form a
> > > > > > > > > > > > > > deduplication ID as KEY + separator + 
> > > > > > > > > > > > > > idExtractor(VALUE). In case
> > > > > > > > > > > > > > idExtractor is not provided, we deduplicate by key 
> > > > > > > > > > > > > > only (as in
> > > > > > > > > > > > > > original
> > > > > > > > > > > > > > proposal). Then idExtractor transforms only the 
> > > > > > > > > > > > > > value (and not
> > > > > > > > > > > > > > the key)
> > > > > > > > > > > > > > and its result is appended to the key. Records from 
> > > > > > > > > > > > > > different
> > > > > > > > > > > > > > partitions
> > > > > > > > > > > > > > will inherently have different deduplication IDs 
> > > > > > > > > > > > > > and all the
> > > > > > > > > > > > > > data will
> > > > > > > > > > > > > > be co-partitioned. As with any stateful operation, 
> > > > > > > > > > > > > > we will
> > > > > > > > > > > > > > repartition
> > > > > > > > > > > > > > the topic in case the key was changed upstream, but 
> > > > > > > > > > > > > > only in this
> > > > > > > > > > > > > > case,
> > > > > > > > > > > > > > thus avoiding unnecessary repartitioning. My 
> > > > > > > > > > > > > > example above fits
> > > > > > > > > > > > > > this
> > > > > > > > > > > > > > perfectly.
> > > > > > > > > > > > > > 
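A minimal sketch of the composite deduplication ID described above, in plain Java with no Kafka dependency. The class name, the method name, and the "|" separator are hypothetical and chosen only for illustration; the thread does not specify them. Treating a null extractor result as "no postfix" is one of the options raised in this message, not a settled decision.

```java
import java.util.function.Function;

class DedupIdSketch {
    // Hypothetical separator; the thread does not name one.
    static final String SEPARATOR = "|";

    /**
     * Builds the deduplication ID as KEY + separator + idExtractor(VALUE).
     * Falls back to the key alone when no extractor is given or when the
     * extractor returns null (assumption: null means "no postfix").
     */
    static <V> String dedupId(String key, V value, Function<V, String> idExtractor) {
        if (idExtractor == null) {
            return key;
        }
        String postfix = idExtractor.apply(value);
        return postfix == null ? key : key + SEPARATOR + postfix;
    }

    public static void main(String[] args) {
        // Key = source ID, value = a message that carries a sporting event ID.
        System.out.println(dedupId("source-1", "event-42:goal", v -> v.split(":")[0]));
        System.out.println(dedupId("source-1", "event-42:goal", null));
    }
}
```

Because the key is always the leading part of the ID, two records with different keys can never share a deduplication ID, which is why no extra repartitioning is needed in this scheme.
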
> > > > > > > > > > > > > >       > (2) What is the motivation for allowing the 
> > > > > > > > > > > > > > `idExtractor`
> > > > > > > > > > > > > > to return
> > > > > > > > > > > > > >       > `null`? Might be good to have some use-case 
> > > > > > > > > > > > > > examples for
> > > > > > > > > > > > > > this feature.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > Can't think of any use-cases. As often happens, it just came
> > > > > > > > > > > > > > > with a copy-paste from StackOverflow -- see Michael Noll's
> > > > > > > > > > > > > > > answer here:
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > https://stackoverflow.com/questions/55803210/how-to-handle-duplicate-messages-using-kafka-streaming-dsl-functions
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > But, jokes aside, we'll have to decide what to do 
> > > > > > > > > > > > > > with nulls. If we
> > > > > > > > > > > > > > accept the above proposal of having deduplication 
> > > > > > > > > > > > > > ID as KEY +
> > > > > > > > > > > > > > postfix,
> > > > > > > > > > > > > > then null can be treated as no postfix at all. If 
> > > > > > > > > > > > > > we don't
> > > > > > > > > > > > > > accept this
> > > > > > > > > > > > > > approach, then treating nulls as 'no-deduplication' 
> > > > > > > > > > > > > > seems to be a
> > > > > > > > > > > > > > reasonable assumption (we can't get or put null as 
> > > > > > > > > > > > > > a key to a KV
> > > > > > > > > > > > > > store,
> > > > > > > > > > > > > > so a record with null ID is always going to look 
> > > > > > > > > > > > > > 'new' for us).
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > >       > (2) Is using a `TimeWindow` really what we 
> > > > > > > > > > > > > > want? I was
> > > > > > > > > > > > > > wondering if a
> > > > > > > > > > > > > >       > `SlidingWindow` might be better? Or maybe 
> > > > > > > > > > > > > > we need a new
> > > > > > > > > > > > > > type of window?
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > Agree. It's probably not what we want. Once I 
> > > > > > > > > > > > > > thought that reusing
> > > > > > > > > > > > > > TimeWindow is a clever idea, now I don't.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > Do we need epoch alignment in our use case? No, we don't, and
> > > > > > > > > > > > > > > I don't know if anyone is going to need this. Epoch alignment
> > > > > > > > > > > > > > > is good for aggregation, but deduplication is a different story.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > Let me describe the semantics the way I see them now, and
> > > > > > > > > > > > > > > tell me what you think:
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > - the only parameter that defines the deduplication 
> > > > > > > > > > > > > > logic is
> > > > > > > > > > > > > > 'expiration
> > > > > > > > > > > > > > period'
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > - when a deduplication ID arrives and we cannot 
> > > > > > > > > > > > > > find it in the
> > > > > > > > > > > > > > store, we
> > > > > > > > > > > > > > forward the message downstream and store the ID + 
> > > > > > > > > > > > > > its timestamp.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > - when an out-of-order ID arrives with an older 
> > > > > > > > > > > > > > timestamp and we
> > > > > > > > > > > > > > find a
> > > > > > > > > > > > > > 'fresher' record, we do nothing and don't forward 
> > > > > > > > > > > > > > the message
> > > > > > > > > > > > > > (??? OR
> > > > > > > > > > > > > > NOT? In what case would we want to forward an 
> > > > > > > > > > > > > > out-of-order
> > > > > > > > > > > > > > message?)
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > - when an ID with a fresher timestamp arrives, we check if it
> > > > > > > > > > > > > > > falls into the expiration period and either forward it or not,
> > > > > > > > > > > > > > > but in both cases we update the timestamp of the message in
> > > > > > > > > > > > > > > the store
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > - the WindowStore retention mechanism should clean 
> > > > > > > > > > > > > > up very old
> > > > > > > > > > > > > > records
> > > > > > > > > > > > > > in order not to run out of space.
> > > > > > > > > > > > > > 
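The bullets above can be read as the following decision procedure. This is a sketch only: an in-memory HashMap stands in for the state store, retention cleanup is omitted, and the class and method names are hypothetical. It drops out-of-order records, which the message itself marks as an open question, and it assumes a record is forwarded once at least the full expiration period has elapsed since the stored timestamp.

```java
import java.util.HashMap;
import java.util.Map;

class ExpiringDedupSketch {
    private final long expirationMs;
    // Stand-in for the state store: deduplication ID -> last stored timestamp.
    private final Map<String, Long> lastSeen = new HashMap<>();

    ExpiringDedupSketch(long expirationMs) {
        this.expirationMs = expirationMs;
    }

    /** Returns true if the record should be forwarded downstream. */
    boolean shouldForward(String dedupId, long timestampMs) {
        Long stored = lastSeen.get(dedupId);
        if (stored == null) {
            // Unknown ID: forward and remember the ID with its timestamp.
            lastSeen.put(dedupId, timestampMs);
            return true;
        }
        if (timestampMs < stored) {
            // Out-of-order record older than the stored one: do nothing.
            return false;
        }
        // Fresher (or equal) timestamp: always update the store, and forward
        // only if the stored timestamp has expired.
        lastSeen.put(dedupId, timestampMs);
        return timestampMs - stored >= expirationMs;
    }

    public static void main(String[] args) {
        ExpiringDedupSketch dedup = new ExpiringDedupSketch(15 * 60 * 1000L);
        System.out.println(dedup.shouldForward("source-1|event-42", 0L));
        System.out.println(dedup.shouldForward("source-1|event-42", 60_000L));
    }
}
```

Note that updating the timestamp on every duplicate means a steady stream of repeats keeps pushing the expiration forward, so a record is re-emitted only after a gap of at least the expiration period between consecutive arrivals.
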
> > > > > > > > > > > > > >       > (3) `isPersistent` -- instead of using this 
> > > > > > > > > > > > > > flag, it seems
> > > > > > > > > > > > > > better to
> > > > > > > > > > > > > >       > allow users to pass in a `Materialized` 
> > > > > > > > > > > > > > parameter next to
> > > > > > > > > > > > > >       > `DistinctParameters` to configure the state 
> > > > > > > > > > > > > > store?
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > Fully agree! Users might also want to change the 
> > > > > > > > > > > > > > retention time.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > >       > (4) I am wondering if we should really have 4 overloads for
> > > > > > > > > > > > > > > >       > `DistinctParameters.with()`? It might be better to have one
> > > > > > > > > > > > > > > >       > overload with all required parameters, and add optional
> > > > > > > > > > > > > > > >       > parameters using the builder pattern? This seems to follow
> > > > > > > > > > > > > > > >       > the DSL Grammar proposal.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > Oh, I can explain. We can't fully rely on the 
> > > > > > > > > > > > > > builder pattern
> > > > > > > > > > > > > > because of
> > > > > > > > > > > > > > Java type inference limitations. We have to provide 
> > > > > > > > > > > > > > type
> > > > > > > > > > > > > > parameters to
> > > > > > > > > > > > > > the builder methods or the code won't compile: see 
> > > > > > > > > > > > > > e. g. this
> > > > > > > > > > > > > > https://twitter.com/inponomarev/status/1265053286933159938
> > > > > > > > > > > > > >  and
> > > > > > > > > > > > > > following
> > > > > > > > > > > > > > discussion with Tagir Valeev.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > When we came across similar difficulties in KIP-418, we
> > > > > > > > > > > > > > > finally decided to add all the necessary overloads to the
> > > > > > > > > > > > > > > parameter class. So I just reproduced that approach here.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > >       > (5) Even if it might be an implementation detail (and maybe
> > > > > > > > > > > > > > > >       > the KIP itself does not need to mention it), can you give a
> > > > > > > > > > > > > > > >       > high level overview how you intend to implement it (that
> > > > > > > > > > > > > > > >       > would be easier to grok, compared to reading the PR).
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > Well, as with any operation at the KStreamImpl level, I'm
> > > > > > > > > > > > > > > building a store and a processor node.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > KStreamDistinct class is going to be the 
> > > > > > > > > > > > > > ProcessorSupplier, with
> > > > > > > > > > > > > > the
> > > > > > > > > > > > > > logic regarding the forwarding/muting of the 
> > > > > > > > > > > > > > records located in
> > > > > > > > > > > > > > KStreamDistinct.KStreamDistinctProcessor#process
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > ----
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > Matthias, if you are still reading this :-) a gentle reminder:
> > > > > > > > > > > > > > > my PR for the already accepted KIP-418 is still waiting for
> > > > > > > > > > > > > > > your review. I think it's better for me to finalize at least
> > > > > > > > > > > > > > > one KIP before proceeding to a new one :-)
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > Regards,
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > Ivan
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > On 03.09.2020 4:20, Matthias J. Sax wrote:
> > > > > > > > > > > > > > > Thanks for the KIP Ivan. Having a built-in 
> > > > > > > > > > > > > > > deduplication
> > > > > > > > > > > > > > > operator is for
> > > > > > > > > > > > > > > sure a good addition.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > Couple of questions:
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > (1) Using the `idExtractor` has the issue that 
> > > > > > > > > > > > > > > data might not be
> > > > > > > > > > > > > > > co-partitioned as you mentioned in the KIP. Thus, 
> > > > > > > > > > > > > > > I am
> > > > > > > > > > > > > > > wondering if it
> > > > > > > > > > > > > > > might be better to do deduplication only on the 
> > > > > > > > > > > > > > > key? If one
> > > > > > > > > > > > > > > sets a new
> > > > > > > > > > > > > > > key upstream (ie, extracts the deduplication id 
> > > > > > > > > > > > > > > into the key), the
> > > > > > > > > > > > > > > `distinct` operator could automatically 
> > > > > > > > > > > > > > > repartition the data
> > > > > > > > > > > > > > > and thus we
> > > > > > > > > > > > > > > would avoid user errors.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > (2) What is the motivation for allowing the 
> > > > > > > > > > > > > > > `idExtractor` to
> > > > > > > > > > > > > > > return
> > > > > > > > > > > > > > > `null`? Might be good to have some use-case 
> > > > > > > > > > > > > > > examples for this
> > > > > > > > > > > > > > > feature.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > (2) Is using a `TimeWindow` really what we want? 
> > > > > > > > > > > > > > > I was
> > > > > > > > > > > > > > > wondering if a
> > > > > > > > > > > > > > > `SlidingWindow` might be better? Or maybe we need 
> > > > > > > > > > > > > > > a new type of
> > > > > > > > > > > > > > > window?
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > It would be helpful if you could describe 
> > > > > > > > > > > > > > > potential use cases
> > > > > > > > > > > > > > > in more
> > > > > > > > > > > > > > > > detail. -- I am mainly wondering about hopping windows. Each
> > > > > > > > > > > > > > > > record would always fall into multiple windows and thus would
> > > > > > > > > > > > > > > > be emitted multiple times, ie, each time a window closes. Is
> > > > > > > > > > > > > > > > this really a valid use case?
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > It seems that for de-duplication, one wants to 
> > > > > > > > > > > > > > > have some
> > > > > > > > > > > > > > > "expiration
> > > > > > > > > > > > > > > time", ie, for each ID, deduplicate all 
> > > > > > > > > > > > > > > consecutive records
> > > > > > > > > > > > > > > with the
> > > > > > > > > > > > > > > same ID and emit the first record after the 
> > > > > > > > > > > > > > > "expiration time"
> > > > > > > > > > > > > > > passed. In
> > > > > > > > > > > > > > > terms of a window, this would mean that the 
> > > > > > > > > > > > > > > window starts at
> > > > > > > > > > > > > > > `r.ts` and
> > > > > > > > > > > > > > > ends at `r.ts + windowSize`, ie, the window is 
> > > > > > > > > > > > > > > aligned to the
> > > > > > > > > > > > > > > data.
> > > > > > > > > > > > > > > TimeWindows are aligned to the epoch though. While
> > > > > > > > > > > > > > > `SlidingWindows` also
> > > > > > > > > > > > > > > align to the data, for the aggregation use-case 
> > > > > > > > > > > > > > > they go
> > > > > > > > > > > > > > > backward in
> > > > > > > > > > > > > > > time, while we need a window that goes forward in 
> > > > > > > > > > > > > > > time. It's an
> > > > > > > > > > > > > > > open
> > > > > > > > > > > > > > > > question if we can re-purpose `SlidingWindows` -- it might be
> > > > > > > > > > > > > > > > ok to make the alignment (into the past vs into the future) an
> > > > > > > > > > > > > > > > operator dependent behavior?
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > (3) `isPersistent` -- instead of using this flag, 
> > > > > > > > > > > > > > > it seems
> > > > > > > > > > > > > > > better to
> > > > > > > > > > > > > > > allow users to pass in a `Materialized` parameter 
> > > > > > > > > > > > > > > next to
> > > > > > > > > > > > > > > `DistinctParameters` to configure the state store?
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > (4) I am wondering if we should really have 4 overloads for
> > > > > > > > > > > > > > > > `DistinctParameters.with()`? It might be better to have one
> > > > > > > > > > > > > > > > overload with all required parameters, and add optional
> > > > > > > > > > > > > > > > parameters using the builder pattern? This seems to follow the
> > > > > > > > > > > > > > > > DSL Grammar proposal.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > (5) Even if it might be an implementation detail (and maybe the
> > > > > > > > > > > > > > > > KIP itself does not need to mention it), can you give a high
> > > > > > > > > > > > > > > > level overview how you intend to implement it (that would be
> > > > > > > > > > > > > > > > easier to grok, compared to reading the PR).
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > -Matthias
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > On 8/23/20 4:29 PM, Ivan Ponomarev wrote:
> > > > > > > > > > > > > > > > Sorry, I forgot to add [DISCUSS] tag to the 
> > > > > > > > > > > > > > > > topic
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > On 24.08.2020 2:27, Ivan Ponomarev wrote:
> > > > > > > > > > > > > > > > > Hello,
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > I'd like to start a discussion for KIP-655.
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > KIP-655:
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > I also opened a proof-of-concept PR for you 
> > > > > > > > > > > > > > > > > to experiment
> > > > > > > > > > > > > > > > > with the API:
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > PR#9210: 
> > > > > > > > > > > > > > > > > https://github.com/apache/kafka/pull/9210
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > Regards,
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > Ivan Ponomarev
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > 
> > > > > > > > > > > > 
> > > > > > > > > > > > 
> > > > > > > > > > 
> > > > > > > > > > 
> > > > > > > 
> > > > > > 
> > > > > > 
> > > > 
> > 
> > 
> 
> 

