Couple of questions:

 - Why restrict de-duplication for the key only? Should we not also
consider the value (or make it somehow flexible and let the user choose?)

 - I am wondering if the return type should be `KStream` instead of a
`KTable`? If I deduplicate a stream, I might expect a stream back? I
don't really consider a stream de-duplication an aggregation with
"mutable state"...

Also, why would the result key need to be windowed?

Btw: How should out-of-order data be handled? Given that you only want
to consider the key, the value could be different, and thus, if there is
out-of-order data, keeping the one or other value could make a
difference? IMHO, an unordered stream and it's ordered "cousin" should
yield the same result? -- Given your example it seems you want to keep
the first record base on offset order. Wondering why?


While I agree that deduplication for overlapping window is questionable,
I am still wondering if you plan to disallow it (by adding a runtime
check and throwing an exception), or not?


On 8/1/21 6:42 AM, Ivan Ponomarev wrote:
> Hi Bruno,
> 
> I'm sorry for the delay with the answer. Unfortunately your messages
> were put to spam folder, that's why I didn't answer them right away.
> 
> Concerning your question about comparing serialized values vs. using
> equals: I think it must be clear now due to John's explanations.
> Distinct is a stateful operation, thus we will need to use
> serialization. (Although AFAICS the in-memory storage might be a good
> practical solution in many cases).
> 
>> I do currently not see why it should not make sense in hopping
> windows... I do not understand  the following sentence: "...one record
> can be multiplied instead of deduplication."
> 
> Ok, let me explain.
> 
> As it's written in the KIP, "The distinct operation returns only a first
> record that falls into a new window, and filters out all the other
> records that fall into an already existing window."
> 
> Also it's worth to remember that the result of `distinct` is
> KTable<Windowed<K>, V>, not Stream<K, V>.
> 
> If we have, say, hopping time windows [0, 40], [10, 50], [20, 60] and a
> record (key, val) with timestamp 25 arrives, it will be forwarded three
> times ('multiplied') since is falls into the intersection of all three
> windows. The output will be
> 
> (key@[0/40],  val)
> (key@[10/50], val)
> (key@[20/60], val)
> 
> You can reason about `distinct` operation just like you reason about
> `sum` or `count`. When a record arrives that falls into a window, we
> update the aggregation on this window. For `distinct`, when extra
> records arrive into the same window, we also perform some sort of
> aggregation (we may even count them internally!), but, unlike sum or
> count, we will not forward anything since counter is strictly greater
> than zero.
> 
> You may refer to 'usage examples' of the KIP
> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-655:+Windowed+Distinct+Operation+for+Kafka+Streams+API#KIP655:WindowedDistinctOperationforKafkaStreamsAPI-UsageExamples)
> to get clearer idea of how it works.
> 
>> As I said earlier, I do not think that SQL and the Java Stream API are
> good arguments to not use a verb
> 
> This is an important matter. As we all know, naming is hard.
> 
> However, `distinct` name is not used just in SQL and Java Streams. It is
> a kind of a standard operation that is used in nearly all the data
> processing frameworks, see all the hyperlinked examples in 'Motivation'
> section of KIP
> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-655:+Windowed+Distinct+Operation+for+Kafka+Streams+API#KIP655:WindowedDistinctOperationforKafkaStreamsAPI-Motivation)
> 
> 
> Please look at it and let me know what do you think.
> 
> Regards,
> 
> Ivan
> 
> 29.07.2021 4:49, John Roesler пишет:
>> Hi Bruno,
>>
>> I had previously been thinking to use equals(), since I
>> thought that this might be a stateless operation. Comparing
>> the serialized form requires a serde and a fairly expensive
>> serialization operation, so while byte equality is superior
>> to equals(), we shouldn't use it in operations unless they
>> already require serialization.
>>
>> I chnaged my mind when I later realized I had been mistaken,
>> and this operation is of course stateful.
>>
>> I hope this helps clarify it.
>>
>> Thanks,
>> -John
>>
>> On Fri, 2021-07-23 at 09:53 +0200, Bruno Cadonna wrote:
>>> Hi Ivan and John,
>>>
>>> 1. John, could you clarify why comparing serialized values seems the way
>>> to go, now?
>>>
>>> 2. Ivan, Could you please answer my questions that I posted earlier? I
>>> will repost it here:
>>> Ivan, could you please make this matter a bit clearer in the KIP?
>>> Actually, thinking about it again, I do currently not see why it should
>>> not make sense in hopping windows. Regarding this, I do not understand
>>> the following sentence:
>>>
>>> "hopping and sliding windows do not make much sense for distinct()
>>> because they produce multiple intersected windows, so that one record
>>> can be multiplied instead of deduplication."
>>>
>>> Ivan, what do you mean with "multiplied"?
>>>
>>> 3. As I said earlier, I do not think that SQL and the Java Stream API
>>> are good arguments to not use a verb. However, if everybody else is fine
>>> with it, I can get behind it.
>>>
>>> John, good catch about the missing overloads!
>>> BTW, the overload with Named should be there regardless of stateful or
>>> stateless.
>>>
>>> Best,
>>> Bruno
>>>
>>> On 22.07.21 20:58, John Roesler wrote:
>>>> Hi Ivan,
>>>>
>>>> Thanks for the reply.
>>>>
>>>> 1. I think I might have gotten myself confused. I was
>>>> thinking of this operation as stateless, but now I'm not
>>>> sure what I was thinking... This operator has to be
>>>> stateful, right? In that case, I agree that comparing
>>>> serialized values seems to be way to do it.
>>>>
>>>> 2. Thanks for the confirmation
>>>>
>>>> 3. I continue to be satisfied to let you all hash it out.
>>>>
>>>> Thanks,
>>>> -John
>>>>
>>>> On Tue, 2021-07-20 at 11:42 +0300, Ivan Ponomarev wrote:
>>>>> Hi all,
>>>>>
>>>>> 1. Actually I always thought about the serialized byte array only
>>>>> -- at
>>>>> least this is what local stores depend upon, and what Kafka itself
>>>>> depends upon when doing log compaction.
>>>>>
>>>>> I can imagine a case where two different byte arrays deserialize to
>>>>> objects which are `equals` to each other. But I think we can ignore
>>>>> this
>>>>> for now because IMO the risks related to buggy `equals`
>>>>> implementations
>>>>> outweigh the potential benefits.
>>>>>
>>>>> I will mention the duplicate definition in the KIP.
>>>>>
>>>>> 2. I agree with John, he got my point.
>>>>>
>>>>> 3. Let me gently insist on `distinct`. I believe that an exception to
>>>>> the rule is appropriate here, because the name `distinct()` is
>>>>> ubiquitous.
>>>>>
>>>>> It's not only about Java Streams API (or .NET LINQ, which appeared
>>>>> earlier and also has `Distinct`): Spark's DataFrame has `distinct()`
>>>>> method, Hazelcast Jet has `distinct()` method, and I bet I can find
>>>>> more
>>>>> examples if I search. When we teach KStreams, we always say that
>>>>> KStreams are just like other streaming APIs, and they have roots in
>>>>> SQL
>>>>> queries. Users know what `distinct` is and they expect it to be in
>>>>> the API.
>>>>>
>>>>>
>>>>> Regards,
>>>>>
>>>>> Ivan
>>>>>
>>>>>
>>>>> 13.07.2021 0:10, John Roesler пишет:
>>>>>> Hi all,
>>>>>>
>>>>>> Bruno raised some very good points. I’d like to chime in with
>>>>>> additional context.
>>>>>>
>>>>>> 1. Great point. We faced a similar problem defining KIP-557. For
>>>>>> 557, we chose to use the serialized byte array instead of the
>>>>>> equals() method, but I think the situation in KIP-655 is a bit
>>>>>> different. I think it might make sense to use the equals() method
>>>>>> here, but am curious what Ivan thinks.
>>>>>>
>>>>>> 2. I figured we'd do nothing. I thought Ivan was just saying that
>>>>>> it doesn't make a ton of sense to use it, which I agree with, but
>>>>>> it doesn't seem like that means we should prohibit it.
>>>>>>
>>>>>> 3. FWIW, I don't have a strong feeling either way.
>>>>>>
>>>>>> Thanks,
>>>>>> -John
>>>>>>
>>>>>> On Mon, Jul 12, 2021, at 09:14, Bruno Cadonna wrote:
>>>>>>> Hi Ivan,
>>>>>>>
>>>>>>> Thank you for the KIP!
>>>>>>>
>>>>>>> Some aspects are not clear to me from the KIP and I have a proposal.
>>>>>>>
>>>>>>> 1. The KIP does not describe the criteria that define a
>>>>>>> duplicate. Could
>>>>>>> you add a definition of duplicate to the KIP?
>>>>>>>
>>>>>>> 2. The KIP does not describe what happens if distinct() is
>>>>>>> applied on a
>>>>>>> hopping window. On the DSL level, I do not see how you can avoid
>>>>>>> that
>>>>>>> users apply distinct() on a hopping window, i.e., you cannot
>>>>>>> avoid it at
>>>>>>> compile time, you need to check it at runtime and throw an
>>>>>>> exception. Is
>>>>>>> this correct or am I missing something?
>>>>>>>
>>>>>>> 3. I would also like to back a proposal by Sophie. She proposed
>>>>>>> to use
>>>>>>> deduplicate() instead of distinct(), since the other DSL
>>>>>>> operations are
>>>>>>> also verbs. I do not think that SQL and the Java Stream API are good
>>>>>>> arguments to not use a verb.
>>>>>>>
>>>>>>> Best,
>>>>>>> Bruno
>>>>>>>
>>>>>>>
>>>>>>> On 10.07.21 19:11, John Roesler wrote:
>>>>>>>> Hi Ivan,
>>>>>>>>
>>>>>>>> Sorry for the silence!
>>>>>>>>
>>>>>>>> I have just re-read the proposal.
>>>>>>>>
>>>>>>>> To summarize, you are now only proposing the zero-arg distict()
>>>>>>>> method to be added to TimeWindowedKStream and
>>>>>>>> SessionWindowedKStream, right?
>>>>>>>>
>>>>>>>> I’m in favor of this proposal.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> John
>>>>>>>>
>>>>>>>> On Sat, Jul 10, 2021, at 10:18, Ivan Ponomarev wrote:
>>>>>>>>> Hello everyone,
>>>>>>>>>
>>>>>>>>> I would like to remind you about KIP-655 and KIP-759 just in
>>>>>>>>> case they
>>>>>>>>> got lost in your inbox.
>>>>>>>>>
>>>>>>>>> Now the initial proposal is split into two independent and
>>>>>>>>> smaller ones,
>>>>>>>>> so it must be easier to review them. Of course, if you have time.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>>
>>>>>>>>> Ivan
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> 24.06.2021 18:11, Ivan Ponomarev пишет:
>>>>>>>>>> Hello all,
>>>>>>>>>>
>>>>>>>>>> I have rewritten the KIP-655 summarizing what was agreed upon
>>>>>>>>>> during
>>>>>>>>>> this discussion (now the proposal is much simpler and less
>>>>>>>>>> invasive).
>>>>>>>>>>
>>>>>>>>>> I have also created KIP-759 (cancelRepartition operation) and
>>>>>>>>>> started a
>>>>>>>>>> discussion for it.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>>
>>>>>>>>>> Ivan.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 04.06.2021 8:15, Matthias J. Sax пишет:
>>>>>>>>>>> Just skimmed over the thread -- first of all, I am glad that
>>>>>>>>>>> we could
>>>>>>>>>>> merge KIP-418 and ship it :)
>>>>>>>>>>>
>>>>>>>>>>> About the re-partitioning concerns, there are already two
>>>>>>>>>>> tickets for it:
>>>>>>>>>>>
>>>>>>>>>>>       - https://issues.apache.org/jira/browse/KAFKA-4835
>>>>>>>>>>>       - https://issues.apache.org/jira/browse/KAFKA-10844
>>>>>>>>>>>
>>>>>>>>>>> Thus, it seems best to exclude this topic from this KIP, and
>>>>>>>>>>> do a
>>>>>>>>>>> separate KIP for it (if necessary, we can "pause" this KIP
>>>>>>>>>>> until the
>>>>>>>>>>> repartition KIP is done). It's a long standing "issue" and we
>>>>>>>>>>> should
>>>>>>>>>>> resolve it in a general way I guess.
>>>>>>>>>>>
>>>>>>>>>>> (Did not yet ready all responses in detail yet, so keeping
>>>>>>>>>>> this comment
>>>>>>>>>>> short.)
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> -Matthias
>>>>>>>>>>>
>>>>>>>>>>> On 6/2/21 6:35 AM, John Roesler wrote:
>>>>>>>>>>>> Thanks, Ivan!
>>>>>>>>>>>>
>>>>>>>>>>>> That sounds like a great plan to me. Two smaller KIPs are
>>>>>>>>>>>> easier to
>>>>>>>>>>>> agree on than one big one.
>>>>>>>>>>>>
>>>>>>>>>>>> I agree hopping and sliding windows will actually have a
>>>>>>>>>>>> duplicating
>>>>>>>>>>>> effect. We can avoid adding distinct() to the sliding window
>>>>>>>>>>>> interface, but hopping windows are just a different
>>>>>>>>>>>> parameterization
>>>>>>>>>>>> of epoch-aligned windows. It seems we can’t do much about
>>>>>>>>>>>> that except
>>>>>>>>>>>> document the issue.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> John
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, May 26, 2021, at 10:14, Ivan Ponomarev wrote:
>>>>>>>>>>>>> Hi John!
>>>>>>>>>>>>>
>>>>>>>>>>>>> I think that your proposal is just fantastic, it simplifies
>>>>>>>>>>>>> things a
>>>>>>>>>>>>> lot!
>>>>>>>>>>>>>
>>>>>>>>>>>>> I also felt uncomfortable due to the fact that the proposed
>>>>>>>>>>>>> `distinct()`
>>>>>>>>>>>>> is not somewhere near `count()` and `reduce(..)`. But
>>>>>>>>>>>>> `selectKey(..).groupByKey().windowedBy(..).distinct()`
>>>>>>>>>>>>> didn't look like
>>>>>>>>>>>>> a correct option for  me because of the issue with the
>>>>>>>>>>>>> unneeded
>>>>>>>>>>>>> repartitioning.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The bold idea that we can just CANCEL the repartitioning
>>>>>>>>>>>>> didn't came to
>>>>>>>>>>>>> my mind.
>>>>>>>>>>>>>
>>>>>>>>>>>>> What seemed to me a single problem is in fact two unrelated
>>>>>>>>>>>>> problems:
>>>>>>>>>>>>> `distinct` operation and cancelling the unneeded
>>>>>>>>>>>>> repartitioning.
>>>>>>>>>>>>>
>>>>>>>>>>>>>       > what if we introduce a parameter to `selectKey()`
>>>>>>>>>>>>> that specifies
>>>>>>>>>>>>> that
>>>>>>>>>>>>> the caller asserts that the new key does _not_ change the data
>>>>>>>>>>>>> partitioning?
>>>>>>>>>>>>>
>>>>>>>>>>>>> I think a more elegant solution would be not to add a new
>>>>>>>>>>>>> parameter to
>>>>>>>>>>>>> `selectKey` and all the other key-changing operations (`map`,
>>>>>>>>>>>>> `transform`, `flatMap`, ...), but add a new operator
>>>>>>>>>>>>> `KStream#cancelRepartitioning()` that resets
>>>>>>>>>>>>> `keyChangingOperation`
>>>>>>>>>>>>> flag
>>>>>>>>>>>>> for the upstream node. Of course, "use it only if you know
>>>>>>>>>>>>> what you're
>>>>>>>>>>>>> doing" warning is to be added. Well, it's a topic for a
>>>>>>>>>>>>> separate KIP!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Concerning `distinct()`. If we use `XXXWindowedKStream`
>>>>>>>>>>>>> facilities,
>>>>>>>>>>>>> then
>>>>>>>>>>>>> changes to the API are minimally invasive: we're just adding
>>>>>>>>>>>>> `distinct()` to TimeWindowedKStream and
>>>>>>>>>>>>> SessionWindowedKStream, and
>>>>>>>>>>>>> that's all.
>>>>>>>>>>>>>
>>>>>>>>>>>>> We can now define `distinct` as an operation that returns
>>>>>>>>>>>>> only a first
>>>>>>>>>>>>> record that falls into a new window, and filters out all
>>>>>>>>>>>>> the other
>>>>>>>>>>>>> records that fall into an already existing window. BTW, we
>>>>>>>>>>>>> can mock the
>>>>>>>>>>>>> behaviour of such an operation with `TopologyTestDriver` using
>>>>>>>>>>>>> `reduce((l, r) -> STOP)`.filterNot((k,
>>>>>>>>>>>>> v)->STOP.equals(v)).  ;-)
>>>>>>>>>>>>>
>>>>>>>>>>>>> Consider the following example (record times are in seconds):
>>>>>>>>>>>>>
>>>>>>>>>>>>> //three bursts of variously ordered records
>>>>>>>>>>>>> 4, 5, 6
>>>>>>>>>>>>> 23, 22, 24
>>>>>>>>>>>>> 34, 33, 32
>>>>>>>>>>>>> //'late arrivals'
>>>>>>>>>>>>> 7, 22, 35
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. 'Epoch-aligned deduplication' using tumbling windows:
>>>>>>>>>>>>>
>>>>>>>>>>>>> .groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(10))).distinct()
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> produces
>>>>>>>>>>>>>
>>>>>>>>>>>>> (key@[00000/10000], 4)
>>>>>>>>>>>>> (key@[20000/30000], 23)
>>>>>>>>>>>>> (key@[30000/40000], 34)
>>>>>>>>>>>>>
>>>>>>>>>>>>> -- that is, one record per epoch-aligned window.
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2. Hopping and sliding windows do not make much sense here,
>>>>>>>>>>>>> because
>>>>>>>>>>>>> they
>>>>>>>>>>>>> produce multiple intersected windows, so that one record
>>>>>>>>>>>>> can be
>>>>>>>>>>>>> multiplied, but we want deduplication.
>>>>>>>>>>>>>
>>>>>>>>>>>>> 3. SessionWindows work for 'data-aligned deduplication'.
>>>>>>>>>>>>>
>>>>>>>>>>>>> .groupByKey().windowedBy(SessionWindows.with(Duration.ofSeconds(10))).distinct()
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> produces only
>>>>>>>>>>>>>
>>>>>>>>>>>>> ([key@4000/4000], 4)
>>>>>>>>>>>>> ([key@23000/23000], 23)
>>>>>>>>>>>>>
>>>>>>>>>>>>> because all the records bigger than 7 are stuck together in
>>>>>>>>>>>>> one
>>>>>>>>>>>>> session.
>>>>>>>>>>>>> Setting inactivity gap to 9 seconds will return three records:
>>>>>>>>>>>>>
>>>>>>>>>>>>> ([key@4000/4000], 4)
>>>>>>>>>>>>> ([key@23000/23000], 23)
>>>>>>>>>>>>> ([key@34000/34000], 34)
>>>>>>>>>>>>>
>>>>>>>>>>>>> WDYT? If you like this variant, I will re-write KIP-655 and
>>>>>>>>>>>>> propose a
>>>>>>>>>>>>> separate KIP for `cancelRepartitioning` (or whatever name
>>>>>>>>>>>>> we will
>>>>>>>>>>>>> choose
>>>>>>>>>>>>> for it).
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Ivan
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> 24.05.2021 22:32, John Roesler пишет:
>>>>>>>>>>>>>> Hey there, Ivan!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In typical fashion, I'm going to make a somewhat outlandish
>>>>>>>>>>>>>> proposal. I'm hoping that we can side-step some of the
>>>>>>>>>>>>>> complications that have arisen. Please bear with me.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> It seems like `distinct()` is not fundamentally unlike
>>>>>>>>>>>>>> other windowed
>>>>>>>>>>>>>> "aggregation" operations. Your concern about unnecessary
>>>>>>>>>>>>>> repartitioning seems to apply just as well to `count()` as to
>>>>>>>>>>>>>> `distinct()`.
>>>>>>>>>>>>>> This has come up before, but I don't remember when: what
>>>>>>>>>>>>>> if we
>>>>>>>>>>>>>> introduce a parameter to `selectKey()` that specifies that
>>>>>>>>>>>>>> the caller
>>>>>>>>>>>>>> asserts that the new key does _not_ change the data
>>>>>>>>>>>>>> partitioning?
>>>>>>>>>>>>>> The docs on that parameter would of course spell out all
>>>>>>>>>>>>>> the "rights
>>>>>>>>>>>>>> and responsibilities" of setting it.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In that case, we could indeed get back to
>>>>>>>>>>>>>> `selectKey(A).windowBy(B).distinct(...)`, where we get to
>>>>>>>>>>>>>> compose the
>>>>>>>>>>>>>> key mapper and the windowing function without having to
>>>>>>>>>>>>>> carve out
>>>>>>>>>>>>>> a separate domain just for `distinct()`. All the rest of
>>>>>>>>>>>>>> the KStream
>>>>>>>>>>>>>> operations would also benefit.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> What do you think?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> John
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, May 23, 2021, at 08:09, Ivan Ponomarev wrote:
>>>>>>>>>>>>>>> Hello everyone,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> let me revive the discussion for KIP-655. Now I have some
>>>>>>>>>>>>>>> time
>>>>>>>>>>>>>>> again and
>>>>>>>>>>>>>>> I'm eager to finalize this.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Based on what was already discussed, I think that we can
>>>>>>>>>>>>>>> split the
>>>>>>>>>>>>>>> discussion into three topics for our convenience.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The three topics are:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> - idExtractor  (how should we extract the deduplication
>>>>>>>>>>>>>>> key for
>>>>>>>>>>>>>>> the record)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> - timeWindows (what time windows should we use)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> - miscellaneous (naming etc.)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> ---- idExtractor ----
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Original proposal: use (k, v) -> f(k, v) mapper,
>>>>>>>>>>>>>>> defaulting to (k,
>>>>>>>>>>>>>>> v) ->
>>>>>>>>>>>>>>> k.  The drawback here is that we must warn the user to
>>>>>>>>>>>>>>> choose such a
>>>>>>>>>>>>>>> function that sets different IDs for records from different
>>>>>>>>>>>>>>> partitions,
>>>>>>>>>>>>>>> otherwise same IDs might be not co-partitioned (and not
>>>>>>>>>>>>>>> deduplicated as
>>>>>>>>>>>>>>> a result). Additional concern: what should we do when
>>>>>>>>>>>>>>> this function
>>>>>>>>>>>>>>> returns null?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Matthias proposed key-only deduplication: that is, no
>>>>>>>>>>>>>>> idExtractor at
>>>>>>>>>>>>>>> all, and if we want to use `distinct` for a particular
>>>>>>>>>>>>>>> identifier, we
>>>>>>>>>>>>>>> must `selectKey()` before. The drawback of this approach
>>>>>>>>>>>>>>> is that
>>>>>>>>>>>>>>> we will
>>>>>>>>>>>>>>> always have repartitioning after the key selection, while
>>>>>>>>>>>>>>> in practice
>>>>>>>>>>>>>>> repartitioning will not always be necessary (for example,
>>>>>>>>>>>>>>> when the
>>>>>>>>>>>>>>> data
>>>>>>>>>>>>>>> stream is such that different values infer different keys).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> So here we have a 'safety vs. performance' trade-off. But
>>>>>>>>>>>>>>> 'safe'
>>>>>>>>>>>>>>> variant
>>>>>>>>>>>>>>> is also not very convenient for developers, since we're
>>>>>>>>>>>>>>> forcing
>>>>>>>>>>>>>>> them to
>>>>>>>>>>>>>>> change the structure of their records.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> A 'golden mean' here might be using composite ID with its
>>>>>>>>>>>>>>> first
>>>>>>>>>>>>>>> component equals to k and its second component equals to
>>>>>>>>>>>>>>> some f(v) (f
>>>>>>>>>>>>>>> defaults to v -> null, and null value returned by f(v) means
>>>>>>>>>>>>>>> 'deduplicate by the key only'). The nuance here is that
>>>>>>>>>>>>>>> we will have
>>>>>>>>>>>>>>> serializers only for types of k and f(v), and we must
>>>>>>>>>>>>>>> correctly
>>>>>>>>>>>>>>> serialize a tuple (k, f(v)), but of course this is doable.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> What do you think?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> ---- timeWindows ----
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Originally I proposed TimeWindows only just because they
>>>>>>>>>>>>>>> solved my
>>>>>>>>>>>>>>> particular case :-) but agree with Matthias' and Sophie's
>>>>>>>>>>>>>>> objections.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I like the Sophie's point: we need both epoch-aligned and
>>>>>>>>>>>>>>> data-aligned
>>>>>>>>>>>>>>> windows. IMO this is absolutely correct: "data-aligned is
>>>>>>>>>>>>>>> useful for
>>>>>>>>>>>>>>> example when you know that a large number of updates to a
>>>>>>>>>>>>>>> single key
>>>>>>>>>>>>>>> will occur in short bursts, and epoch-aligned when you
>>>>>>>>>>>>>>> specifically want
>>>>>>>>>>>>>>> to get just a single update per discrete time interval."
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I just cannot agree right away with Sophie's
>>>>>>>>>>>>>>> .groupByKey().windowedBy(...).distinct() proposal, as it
>>>>>>>>>>>>>>> implies  the
>>>>>>>>>>>>>>> key-only deduplication -- see the previous topic.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Epoch-aligned windows are very simple: they should
>>>>>>>>>>>>>>> forward only one
>>>>>>>>>>>>>>> record per enumerated time window. TimeWindows are
>>>>>>>>>>>>>>> exactly what we
>>>>>>>>>>>>>>> want
>>>>>>>>>>>>>>> here. I mentioned in the KIP both tumbling and hopping
>>>>>>>>>>>>>>> windows just
>>>>>>>>>>>>>>> because both are possible for TimeWindows, but indeed I
>>>>>>>>>>>>>>> don't see any
>>>>>>>>>>>>>>> real use case for hopping windows, only tumbling windows
>>>>>>>>>>>>>>> make
>>>>>>>>>>>>>>> sence IMO.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> For data-aligned windows SlidingWindow interface seems to
>>>>>>>>>>>>>>> be a nearly
>>>>>>>>>>>>>>> valid choice. Nearly. It should forward a record once
>>>>>>>>>>>>>>> when it's first
>>>>>>>>>>>>>>> seen, and then not again for any identical records that
>>>>>>>>>>>>>>> fall into the
>>>>>>>>>>>>>>> next N timeUnits.  However, we cannot reuse SlidingWindow
>>>>>>>>>>>>>>> as is,
>>>>>>>>>>>>>>> because
>>>>>>>>>>>>>>> just as Matthias noted, SlidingWindows go backward in
>>>>>>>>>>>>>>> time, while we
>>>>>>>>>>>>>>> need a windows that go forward in time, and are not
>>>>>>>>>>>>>>> opened while
>>>>>>>>>>>>>>> records
>>>>>>>>>>>>>>> fall into an already existing window. We definitely
>>>>>>>>>>>>>>> should make
>>>>>>>>>>>>>>> our own
>>>>>>>>>>>>>>> implementation, maybe we should call it ExpirationWindow?
>>>>>>>>>>>>>>> WDYT?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> ---- miscellaneous ----
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Persistent/in-memory stores. Matthias proposed to pass
>>>>>>>>>>>>>>> Materialized
>>>>>>>>>>>>>>> parameter next to DistinctParameters (and this is necessary,
>>>>>>>>>>>>>>> because we
>>>>>>>>>>>>>>> will need to provide a serializer for extracted id). This is
>>>>>>>>>>>>>>> absolutely
>>>>>>>>>>>>>>> valid point, I agree and I will fix it in the KIP.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Naming. Sophie noted that the Streams DSL operators are
>>>>>>>>>>>>>>> typically
>>>>>>>>>>>>>>> named
>>>>>>>>>>>>>>> as verbs, so she proposes `deduplicate` in favour of
>>>>>>>>>>>>>>> `distinct`. I
>>>>>>>>>>>>>>> think
>>>>>>>>>>>>>>> that while it's important to stick to the naming
>>>>>>>>>>>>>>> conventions, it
>>>>>>>>>>>>>>> is also
>>>>>>>>>>>>>>> important to think of the experience of those who come
>>>>>>>>>>>>>>> from different
>>>>>>>>>>>>>>> stacks/technologies. People who are familiar with SQL and
>>>>>>>>>>>>>>> Java
>>>>>>>>>>>>>>> Streams
>>>>>>>>>>>>>>> API must know for sure what does 'distinct' mean, while data
>>>>>>>>>>>>>>> deduplication in general is a more complex task and thus
>>>>>>>>>>>>>>> `deduplicate`
>>>>>>>>>>>>>>> might be misleading. But I'm ready to be convinced if the
>>>>>>>>>>>>>>> majority
>>>>>>>>>>>>>>> thinks otherwise.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Ivan
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 14.09.2020 21:31, Sophie Blee-Goldman пишет:
>>>>>>>>>>>>>>>> Hey all,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I'm not convinced either epoch-aligned or data-aligned
>>>>>>>>>>>>>>>> will fit all
>>>>>>>>>>>>>>>> possible use cases.
>>>>>>>>>>>>>>>> Both seem totally reasonable to me: data-aligned is
>>>>>>>>>>>>>>>> useful for
>>>>>>>>>>>>>>>> example when
>>>>>>>>>>>>>>>> you know
>>>>>>>>>>>>>>>> that a large number of updates to a single key will
>>>>>>>>>>>>>>>> occur in
>>>>>>>>>>>>>>>> short bursts,
>>>>>>>>>>>>>>>> and epoch-
>>>>>>>>>>>>>>>> aligned when you specifically want to get just a single
>>>>>>>>>>>>>>>> update
>>>>>>>>>>>>>>>> per discrete
>>>>>>>>>>>>>>>> time
>>>>>>>>>>>>>>>> interval.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Going a step further, though, what if you want just a
>>>>>>>>>>>>>>>> single
>>>>>>>>>>>>>>>> update per
>>>>>>>>>>>>>>>> calendar
>>>>>>>>>>>>>>>> month, or per year with accounting for leap years?
>>>>>>>>>>>>>>>> Neither of
>>>>>>>>>>>>>>>> those are
>>>>>>>>>>>>>>>> serviced that
>>>>>>>>>>>>>>>> well by the existing Windows specification to windowed
>>>>>>>>>>>>>>>> aggregations, a
>>>>>>>>>>>>>>>> well-known
>>>>>>>>>>>>>>>> limitation of the current API. There is actually a KIP
>>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-645%3A+Replace+Windows+with+a+proper+interface>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> going
>>>>>>>>>>>>>>>> on in parallel to fix this
>>>>>>>>>>>>>>>> exact issue and make the windowing interface much more
>>>>>>>>>>>>>>>> flexible.
>>>>>>>>>>>>>>>> Maybe
>>>>>>>>>>>>>>>> instead
>>>>>>>>>>>>>>>> of re-implementing this windowing interface in a similarly
>>>>>>>>>>>>>>>> limited fashion
>>>>>>>>>>>>>>>> for the
>>>>>>>>>>>>>>>> Distinct operator, we could leverage it here and get all
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>> benefits
>>>>>>>>>>>>>>>> coming with
>>>>>>>>>>>>>>>> KIP-645.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Specifically, I'm proposing to remove the
>>>>>>>>>>>>>>>> TimeWindows/etc config
>>>>>>>>>>>>>>>> from the
>>>>>>>>>>>>>>>> DistinctParameters class, and move the distinct() method
>>>>>>>>>>>>>>>> from the
>>>>>>>>>>>>>>>> KStream
>>>>>>>>>>>>>>>> interface
>>>>>>>>>>>>>>>> to the TimeWindowedKStream interface. Since it's
>>>>>>>>>>>>>>>> semantically
>>>>>>>>>>>>>>>> similar to a
>>>>>>>>>>>>>>>> kind of
>>>>>>>>>>>>>>>> windowed aggregation, it makes sense to align it with the
>>>>>>>>>>>>>>>> existing windowing
>>>>>>>>>>>>>>>> framework, ie:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> inputStream
>>>>>>>>>>>>>>>>            .groupKyKey()
>>>>>>>>>>>>>>>>            .windowedBy()
>>>>>>>>>>>>>>>>            .distinct()
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Then we could use data-aligned windows if SlidingWindows is
>>>>>>>>>>>>>>>> specified in
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>> windowedBy(), and epoch-aligned (or some other kind of
>>>>>>>>>>>>>>>> enumerable
>>>>>>>>>>>>>>>> window)
>>>>>>>>>>>>>>>> if a Windows is specified in windowedBy() (or an
>>>>>>>>>>>>>>>> EnumerableWindowDefinition
>>>>>>>>>>>>>>>> once KIP-645 is implemented to replace Windows).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> *SlidingWindows*: should forward a record once when it's
>>>>>>>>>>>>>>>> first
>>>>>>>>>>>>>>>> seen, and
>>>>>>>>>>>>>>>> then not again
>>>>>>>>>>>>>>>> for any identical records that fall into the next N
>>>>>>>>>>>>>>>> timeUnits. This
>>>>>>>>>>>>>>>> includes out-of-order
>>>>>>>>>>>>>>>> records, ie if you have a SlidingWindows of size 10s and
>>>>>>>>>>>>>>>> process
>>>>>>>>>>>>>>>> records at
>>>>>>>>>>>>>>>> time
>>>>>>>>>>>>>>>> 15s, 20s, 14s then you would just forward the one at 15s.
>>>>>>>>>>>>>>>> Presumably, if
>>>>>>>>>>>>>>>> you're
>>>>>>>>>>>>>>>> using SlidingWindows, you don't care about what falls
>>>>>>>>>>>>>>>> into exact
>>>>>>>>>>>>>>>> time
>>>>>>>>>>>>>>>> boxes, you just
>>>>>>>>>>>>>>>> want to deduplicate. If you do care about exact time
>>>>>>>>>>>>>>>> boxing then
>>>>>>>>>>>>>>>> you should
>>>>>>>>>>>>>>>> use...
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> *EnumerableWindowDefinition* (eg *TimeWindows*): should
>>>>>>>>>>>>>>>> forward
>>>>>>>>>>>>>>>> only one
>>>>>>>>>>>>>>>> record
>>>>>>>>>>>>>>>> per enumerated time window. If you get a records at 15s,
>>>>>>>>>>>>>>>> 20s,14s
>>>>>>>>>>>>>>>> where the
>>>>>>>>>>>>>>>> windows
>>>>>>>>>>>>>>>> are enumerated at [5,14], [15, 24], etc then you forward
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>> record at 15s
>>>>>>>>>>>>>>>> and also
>>>>>>>>>>>>>>>> the record at 14s
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Just an idea: not sure if the impedance mismatch would
>>>>>>>>>>>>>>>> throw
>>>>>>>>>>>>>>>> users off
>>>>>>>>>>>>>>>> since the
>>>>>>>>>>>>>>>> semantics of the distinct windows are slightly different
>>>>>>>>>>>>>>>> than in the
>>>>>>>>>>>>>>>> aggregations.
>>>>>>>>>>>>>>>> But if we don't fit this into the existing windowed
>>>>>>>>>>>>>>>> framework,
>>>>>>>>>>>>>>>> then we
>>>>>>>>>>>>>>>> shouldn't use
>>>>>>>>>>>>>>>> any existing Windows-type classes at all, imo. ie we should
>>>>>>>>>>>>>>>> create a new
>>>>>>>>>>>>>>>> DistinctWindows config class, similar to how
>>>>>>>>>>>>>>>> stream-stream joins
>>>>>>>>>>>>>>>> get their
>>>>>>>>>>>>>>>> own
>>>>>>>>>>>>>>>> JoinWindows class
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I also think that non-windowed deduplication could be
>>>>>>>>>>>>>>>> useful, in
>>>>>>>>>>>>>>>> which case
>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>> would want to also have the distinct() operator on the
>>>>>>>>>>>>>>>> KStream
>>>>>>>>>>>>>>>> interface.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> One quick note regarding the naming: it seems like the
>>>>>>>>>>>>>>>> Streams
>>>>>>>>>>>>>>>> DSL operators
>>>>>>>>>>>>>>>> are typically named as verbs rather than adjectives, for
>>>>>>>>>>>>>>>> example.
>>>>>>>>>>>>>>>> #suppress
>>>>>>>>>>>>>>>> or
>>>>>>>>>>>>>>>> #aggregate. I get that there's some precedent for 
>>>>>>>>>>>>>>>> 'distinct'
>>>>>>>>>>>>>>>> specifically,
>>>>>>>>>>>>>>>> but
>>>>>>>>>>>>>>>> maybe something like 'deduplicate' would be more
>>>>>>>>>>>>>>>> appropriate for
>>>>>>>>>>>>>>>> the Streams
>>>>>>>>>>>>>>>> API.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> WDYT?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Mon, Sep 14, 2020 at 10:04 AM Ivan Ponomarev
>>>>>>>>>>>>>>>> <iponoma...@mail.ru.invalid>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks for your review! It made me think deeper, and
>>>>>>>>>>>>>>>>> indeed I
>>>>>>>>>>>>>>>>> understood
>>>>>>>>>>>>>>>>> that I was missing some important details.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> To simplify, let me explain my particular use case
>>>>>>>>>>>>>>>>> first so I
>>>>>>>>>>>>>>>>> can refer
>>>>>>>>>>>>>>>>> to it later.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> We have a system that collects information about
>>>>>>>>>>>>>>>>> ongoing live
>>>>>>>>>>>>>>>>> sporting
>>>>>>>>>>>>>>>>> events from different sources. The information sources
>>>>>>>>>>>>>>>>> have
>>>>>>>>>>>>>>>>> their IDs
>>>>>>>>>>>>>>>>> and these IDs are keys of the stream. Each source emits
>>>>>>>>>>>>>>>>> messages
>>>>>>>>>>>>>>>>> concerning sporting events, and we can have many
>>>>>>>>>>>>>>>>> messages about
>>>>>>>>>>>>>>>>> each
>>>>>>>>>>>>>>>>> sporing event from each source. Event ID is extracted
>>>>>>>>>>>>>>>>> from the
>>>>>>>>>>>>>>>>> message.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> We need a database of event IDs that were reported at
>>>>>>>>>>>>>>>>> least once
>>>>>>>>>>>>>>>>> by each
>>>>>>>>>>>>>>>>> source (important: events from different sources are
>>>>>>>>>>>>>>>>> considered
>>>>>>>>>>>>>>>>> to be
>>>>>>>>>>>>>>>>> different entities). The requirements are:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 1) each new event ID should be written to the database
>>>>>>>>>>>>>>>>> as soon
>>>>>>>>>>>>>>>>> as possible
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 2) although it's ok and sometimes even desired to
>>>>>>>>>>>>>>>>> repeat the
>>>>>>>>>>>>>>>>> notification about already known event ID, but we
>>>>>>>>>>>>>>>>> wouldn’t like our
>>>>>>>>>>>>>>>>> database to be bothered by the same event ID more often
>>>>>>>>>>>>>>>>> than
>>>>>>>>>>>>>>>>> once in a
>>>>>>>>>>>>>>>>> given period of time (say, 15 minutes).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> With this example in mind let me answer your questions
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>         > (1) Using the `idExtractor` has the issue
>>>>>>>>>>>>>>>>> that data might
>>>>>>>>>>>>>>>>> not be
>>>>>>>>>>>>>>>>>         > co-partitioned as you mentioned in the KIP.
>>>>>>>>>>>>>>>>> Thus, I am
>>>>>>>>>>>>>>>>> wondering if it
>>>>>>>>>>>>>>>>>         > might be better to do deduplication only on
>>>>>>>>>>>>>>>>> the key? If
>>>>>>>>>>>>>>>>> one sets a new
>>>>>>>>>>>>>>>>>         > key upstream (ie, extracts the deduplication
>>>>>>>>>>>>>>>>> id into the
>>>>>>>>>>>>>>>>> key), the
>>>>>>>>>>>>>>>>>         > `distinct` operator could automatically
>>>>>>>>>>>>>>>>> repartition the
>>>>>>>>>>>>>>>>> data and thus we
>>>>>>>>>>>>>>>>>         > would avoid user errors.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Of course with 'key-only' deduplication +
>>>>>>>>>>>>>>>>> autorepartitioning we
>>>>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>> never cause problems with co-partitioning. But in
>>>>>>>>>>>>>>>>> practice, we
>>>>>>>>>>>>>>>>> often
>>>>>>>>>>>>>>>>> don't need repartitioning even if 'dedup ID' is
>>>>>>>>>>>>>>>>> different from
>>>>>>>>>>>>>>>>> the key,
>>>>>>>>>>>>>>>>> like in my example above. So here we have a sort of
>>>>>>>>>>>>>>>>> 'performance vs
>>>>>>>>>>>>>>>>> security' tradeoff.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The 'golden middle way' here can be the following: we
>>>>>>>>>>>>>>>>> can form a
>>>>>>>>>>>>>>>>> deduplication ID as KEY + separator +
>>>>>>>>>>>>>>>>> idExtractor(VALUE). In case
>>>>>>>>>>>>>>>>> idExtractor is not provided, we deduplicate by key only
>>>>>>>>>>>>>>>>> (as in
>>>>>>>>>>>>>>>>> original
>>>>>>>>>>>>>>>>> proposal). Then idExtractor transforms only the value
>>>>>>>>>>>>>>>>> (and not
>>>>>>>>>>>>>>>>> the key)
>>>>>>>>>>>>>>>>> and its result is appended to the key. Records from
>>>>>>>>>>>>>>>>> different
>>>>>>>>>>>>>>>>> partitions
>>>>>>>>>>>>>>>>> will inherently have different deduplication IDs and
>>>>>>>>>>>>>>>>> all the
>>>>>>>>>>>>>>>>> data will
>>>>>>>>>>>>>>>>> be co-partitioned. As with any stateful operation, we will
>>>>>>>>>>>>>>>>> repartition
>>>>>>>>>>>>>>>>> the topic in case the key was changed upstream, but
>>>>>>>>>>>>>>>>> only in this
>>>>>>>>>>>>>>>>> case,
>>>>>>>>>>>>>>>>> thus avoiding unnecessary repartitioning. My example
>>>>>>>>>>>>>>>>> above fits
>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>> perfectly.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>         > (2) What is the motivation for allowing the
>>>>>>>>>>>>>>>>> `idExtractor`
>>>>>>>>>>>>>>>>> to return
>>>>>>>>>>>>>>>>>         > `null`? Might be good to have some use-case
>>>>>>>>>>>>>>>>> examples for
>>>>>>>>>>>>>>>>> this feature.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Can't think of any use-cases. As it often happens, it's
>>>>>>>>>>>>>>>>> just
>>>>>>>>>>>>>>>>> came with a
>>>>>>>>>>>>>>>>> copy-paste from StackOverflow -- see Michael Noll's
>>>>>>>>>>>>>>>>> answer here:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> https://stackoverflow.com/questions/55803210/how-to-handle-duplicate-messages-using-kafka-streaming-dsl-functions
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> But, jokes aside, we'll have to decide what to do with
>>>>>>>>>>>>>>>>> nulls. If we
>>>>>>>>>>>>>>>>> accept the above proposal of having deduplication ID as
>>>>>>>>>>>>>>>>> KEY +
>>>>>>>>>>>>>>>>> postfix,
>>>>>>>>>>>>>>>>> then null can be treated as no postfix at all. If we don't
>>>>>>>>>>>>>>>>> accept this
>>>>>>>>>>>>>>>>> approach, then treating nulls as 'no-deduplication'
>>>>>>>>>>>>>>>>> seems to be a
>>>>>>>>>>>>>>>>> reasonable assumption (we can't get or put null as a
>>>>>>>>>>>>>>>>> key to a KV
>>>>>>>>>>>>>>>>> store,
>>>>>>>>>>>>>>>>> so a record with null ID is always going to look 'new'
>>>>>>>>>>>>>>>>> for us).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>         > (2) Is using a `TimeWindow` really what we
>>>>>>>>>>>>>>>>> want? I was
>>>>>>>>>>>>>>>>> wondering if a
>>>>>>>>>>>>>>>>>         > `SlidingWindow` might be better? Or maybe we
>>>>>>>>>>>>>>>>> need a new
>>>>>>>>>>>>>>>>> type of window?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Agree. It's probably not what we want. Once I thought
>>>>>>>>>>>>>>>>> that reusing
>>>>>>>>>>>>>>>>> TimeWindow is a clever idea, now I don't.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Do we need epoch alignment in our use case? No, we
>>>>>>>>>>>>>>>>> don't, and I
>>>>>>>>>>>>>>>>> don't
>>>>>>>>>>>>>>>>> know if anyone going to need this. Epoch alignment is
>>>>>>>>>>>>>>>>> good for
>>>>>>>>>>>>>>>>> aggregation, but deduplication is a different story.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Let me describe the semantic the way I see it now and
>>>>>>>>>>>>>>>>> tell me
>>>>>>>>>>>>>>>>> what you
>>>>>>>>>>>>>>>>> think:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> - the only parameter that defines the deduplication
>>>>>>>>>>>>>>>>> logic is
>>>>>>>>>>>>>>>>> 'expiration
>>>>>>>>>>>>>>>>> period'
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> - when a deduplication ID arrives and we cannot find it
>>>>>>>>>>>>>>>>> in the
>>>>>>>>>>>>>>>>> store, we
>>>>>>>>>>>>>>>>> forward the message downstream and store the ID + its
>>>>>>>>>>>>>>>>> timestamp.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> - when an out-of-order ID arrives with an older
>>>>>>>>>>>>>>>>> timestamp and we
>>>>>>>>>>>>>>>>> find a
>>>>>>>>>>>>>>>>> 'fresher' record, we do nothing and don't forward the
>>>>>>>>>>>>>>>>> message
>>>>>>>>>>>>>>>>> (??? OR
>>>>>>>>>>>>>>>>> NOT? In what case would we want to forward an out-of-order
>>>>>>>>>>>>>>>>> message?)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> - when an ID with fresher timestamp arrives we check if
>>>>>>>>>>>>>>>>> it falls
>>>>>>>>>>>>>>>>> into
>>>>>>>>>>>>>>>>> the expiration period and either forward it or not, but
>>>>>>>>>>>>>>>>> in both
>>>>>>>>>>>>>>>>> cases we
>>>>>>>>>>>>>>>>> update the timestamp of the message in the store
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> - the WindowStore retention mechanism should clean up
>>>>>>>>>>>>>>>>> very old
>>>>>>>>>>>>>>>>> records
>>>>>>>>>>>>>>>>> in order not to run out of space.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>         > (3) `isPersistent` -- instead of using this
>>>>>>>>>>>>>>>>> flag, it seems
>>>>>>>>>>>>>>>>> better to
>>>>>>>>>>>>>>>>>         > allow users to pass in a `Materialized`
>>>>>>>>>>>>>>>>> parameter next to
>>>>>>>>>>>>>>>>>         > `DistinctParameters` to configure the state
>>>>>>>>>>>>>>>>> store?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Fully agree! Users might also want to change the
>>>>>>>>>>>>>>>>> retention time.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>         > (4) I am wondering if we should really have 4
>>>>>>>>>>>>>>>>> overloads for
>>>>>>>>>>>>>>>>>         > `DistinctParameters.with()`? It might be
>>>>>>>>>>>>>>>>> better to have
>>>>>>>>>>>>>>>>> one overload
>>>>>>>>>>>>>>>>>         > with all require parameters, and add optional
>>>>>>>>>>>>>>>>> parameters
>>>>>>>>>>>>>>>>> using the
>>>>>>>>>>>>>>>>>         > builder pattern? This seems to follow the DSL
>>>>>>>>>>>>>>>>> Grammer
>>>>>>>>>>>>>>>>> proposal.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Oh, I can explain. We can't fully rely on the builder
>>>>>>>>>>>>>>>>> pattern
>>>>>>>>>>>>>>>>> because of
>>>>>>>>>>>>>>>>> Java type inference limitations. We have to provide type
>>>>>>>>>>>>>>>>> parameters to
>>>>>>>>>>>>>>>>> the builder methods or the code won't compile: see e.
>>>>>>>>>>>>>>>>> g. this
>>>>>>>>>>>>>>>>> https://twitter.com/inponomarev/status/1265053286933159938
>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>> following
>>>>>>>>>>>>>>>>> discussion with Tagir Valeev.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> When we came across the similar difficulties in
>>>>>>>>>>>>>>>>> KIP-418, we finally
>>>>>>>>>>>>>>>>> decided to add all the necessary overloads to parameter
>>>>>>>>>>>>>>>>> class.
>>>>>>>>>>>>>>>>> So I just
>>>>>>>>>>>>>>>>> reproduced that approach here.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>         > (5) Even if it might be an implementation
>>>>>>>>>>>>>>>>> detail (and
>>>>>>>>>>>>>>>>> maybe the KIP
>>>>>>>>>>>>>>>>>         > itself does not need to mention it), can you
>>>>>>>>>>>>>>>>> give a high
>>>>>>>>>>>>>>>>> level overview
>>>>>>>>>>>>>>>>>         > how you intent to implement it (that would be
>>>>>>>>>>>>>>>>> easier to
>>>>>>>>>>>>>>>>> grog, compared
>>>>>>>>>>>>>>>>>         > to reading the PR).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Well as with any operation on KStreamImpl level I'm
>>>>>>>>>>>>>>>>> building a
>>>>>>>>>>>>>>>>> store and
>>>>>>>>>>>>>>>>> a processor node.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> KStreamDistinct class is going to be the
>>>>>>>>>>>>>>>>> ProcessorSupplier, with
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> logic regarding the forwarding/muting of the records
>>>>>>>>>>>>>>>>> located in
>>>>>>>>>>>>>>>>> KStreamDistinct.KStreamDistinctProcessor#process
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> ----
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Matthias, if you are still reading this :-) a gentle
>>>>>>>>>>>>>>>>> reminder:
>>>>>>>>>>>>>>>>> my PR for
>>>>>>>>>>>>>>>>> already accepted KIP-418 is still waiting for your
>>>>>>>>>>>>>>>>> review. I
>>>>>>>>>>>>>>>>> think it's
>>>>>>>>>>>>>>>>> better for me to finalize at least one  KIP before
>>>>>>>>>>>>>>>>> proceeding to
>>>>>>>>>>>>>>>>> a new
>>>>>>>>>>>>>>>>> one :-)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Ivan
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 03.09.2020 4:20, Matthias J. Sax пишет:
>>>>>>>>>>>>>>>>>> Thanks for the KIP Ivan. Having a built-in deduplication
>>>>>>>>>>>>>>>>>> operator is for
>>>>>>>>>>>>>>>>>> sure a good addition.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Couple of questions:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> (1) Using the `idExtractor` has the issue that data
>>>>>>>>>>>>>>>>>> might not be
>>>>>>>>>>>>>>>>>> co-partitioned as you mentioned in the KIP. Thus, I am
>>>>>>>>>>>>>>>>>> wondering if it
>>>>>>>>>>>>>>>>>> might be better to do deduplication only on the key?
>>>>>>>>>>>>>>>>>> If one
>>>>>>>>>>>>>>>>>> sets a new
>>>>>>>>>>>>>>>>>> key upstream (ie, extracts the deduplication id into
>>>>>>>>>>>>>>>>>> the key), the
>>>>>>>>>>>>>>>>>> `distinct` operator could automatically repartition
>>>>>>>>>>>>>>>>>> the data
>>>>>>>>>>>>>>>>>> and thus we
>>>>>>>>>>>>>>>>>> would avoid user errors.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> (2) What is the motivation for allowing the
>>>>>>>>>>>>>>>>>> `idExtractor` to
>>>>>>>>>>>>>>>>>> return
>>>>>>>>>>>>>>>>>> `null`? Might be good to have some use-case examples
>>>>>>>>>>>>>>>>>> for this
>>>>>>>>>>>>>>>>>> feature.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> (2) Is using a `TimeWindow` really what we want? I was
>>>>>>>>>>>>>>>>>> wondering if a
>>>>>>>>>>>>>>>>>> `SlidingWindow` might be better? Or maybe we need a
>>>>>>>>>>>>>>>>>> new type of
>>>>>>>>>>>>>>>>>> window?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> It would be helpful if you could describe potential
>>>>>>>>>>>>>>>>>> use cases
>>>>>>>>>>>>>>>>>> in more
>>>>>>>>>>>>>>>>>> detail. -- I am mainly wondering about hopping window?
>>>>>>>>>>>>>>>>>> Each
>>>>>>>>>>>>>>>>>> record would
>>>>>>>>>>>>>>>>>> always falls into multiple window and thus would be
>>>>>>>>>>>>>>>>>> emitted
>>>>>>>>>>>>>>>>>> multiple
>>>>>>>>>>>>>>>>>> times, ie, each time the window closes. Is this really
>>>>>>>>>>>>>>>>>> a valid
>>>>>>>>>>>>>>>>>> use case?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> It seems that for de-duplication, one wants to have some
>>>>>>>>>>>>>>>>>> "expiration
>>>>>>>>>>>>>>>>>> time", ie, for each ID, deduplicate all consecutive
>>>>>>>>>>>>>>>>>> records
>>>>>>>>>>>>>>>>>> with the
>>>>>>>>>>>>>>>>>> same ID and emit the first record after the
>>>>>>>>>>>>>>>>>> "expiration time"
>>>>>>>>>>>>>>>>>> passed. In
>>>>>>>>>>>>>>>>>> terms of a window, this would mean that the window
>>>>>>>>>>>>>>>>>> starts at
>>>>>>>>>>>>>>>>>> `r.ts` and
>>>>>>>>>>>>>>>>>> ends at `r.ts + windowSize`, ie, the window is aligned
>>>>>>>>>>>>>>>>>> to the
>>>>>>>>>>>>>>>>>> data.
>>>>>>>>>>>>>>>>>> TimeWindows are aligned to the epoch though. While
>>>>>>>>>>>>>>>>>> `SlidingWindows` also
>>>>>>>>>>>>>>>>>> align to the data, for the aggregation use-case they go
>>>>>>>>>>>>>>>>>> backward in
>>>>>>>>>>>>>>>>>> time, while we need a window that goes forward in
>>>>>>>>>>>>>>>>>> time. It's an
>>>>>>>>>>>>>>>>>> open
>>>>>>>>>>>>>>>>>> question if we can re-purpose `SlidingWindows` -- it
>>>>>>>>>>>>>>>>>> might be
>>>>>>>>>>>>>>>>>> ok the
>>>>>>>>>>>>>>>>>> make the alignment (into the past vs into the future)
>>>>>>>>>>>>>>>>>> an operator
>>>>>>>>>>>>>>>>>> dependent behavior?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> (3) `isPersistent` -- instead of using this flag, it
>>>>>>>>>>>>>>>>>> seems
>>>>>>>>>>>>>>>>>> better to
>>>>>>>>>>>>>>>>>> allow users to pass in a `Materialized` parameter next to
>>>>>>>>>>>>>>>>>> `DistinctParameters` to configure the state store?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> (4) I am wondering if we should really have 4
>>>>>>>>>>>>>>>>>> overloads for
>>>>>>>>>>>>>>>>>> `DistinctParameters.with()`? It might be better to
>>>>>>>>>>>>>>>>>> have one
>>>>>>>>>>>>>>>>>> overload
>>>>>>>>>>>>>>>>>> with all require parameters, and add optional
>>>>>>>>>>>>>>>>>> parameters using the
>>>>>>>>>>>>>>>>>> builder pattern? This seems to follow the DSL Grammer
>>>>>>>>>>>>>>>>>> proposal.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> (5) Even if it might be an implementation detail (and
>>>>>>>>>>>>>>>>>> maybe the
>>>>>>>>>>>>>>>>>> KIP
>>>>>>>>>>>>>>>>>> itself does not need to mention it), can you give a
>>>>>>>>>>>>>>>>>> high level
>>>>>>>>>>>>>>>>>> overview
>>>>>>>>>>>>>>>>>> how you intent to implement it (that would be easier
>>>>>>>>>>>>>>>>>> to grog,
>>>>>>>>>>>>>>>>>> compared
>>>>>>>>>>>>>>>>>> to reading the PR).
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On 8/23/20 4:29 PM, Ivan Ponomarev wrote:
>>>>>>>>>>>>>>>>>>> Sorry, I forgot to add [DISCUSS] tag to the topic
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 24.08.2020 2:27, Ivan Ponomarev пишет:
>>>>>>>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I'd like to start a discussion for KIP-655.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> KIP-655:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I also opened a proof-of-concept PR for you to
>>>>>>>>>>>>>>>>>>>> experiment
>>>>>>>>>>>>>>>>>>>> with the API:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> PR#9210: https://github.com/apache/kafka/pull/9210
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Ivan Ponomarev
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>
>>
> 
> 

Reply via email to