Hi Matthias,

Thanks for your review! It made me think deeper, and indeed I understood that I was missing some important details.

To simplify, let me explain my particular use case first so I can refer to it later.

We have a system that collects information about ongoing live sporting events from different sources. Each information source has an ID, and these IDs are the keys of the stream. Each source emits messages concerning sporting events, and we can have many messages about each sporting event from each source. The event ID is extracted from the message.

We need a database of event IDs that were reported at least once by each source (important: events from different sources are considered to be different entities). The requirements are:

1) each new event ID should be written to the database as soon as possible

2) although it's ok, and sometimes even desirable, to repeat the notification about an already known event ID, we wouldn't like our database to be bothered by the same event ID more often than once in a given period of time (say, 15 minutes).

With this example in mind, let me answer your questions.

> (1) Using the `idExtractor` has the issue that data might not be
> co-partitioned as you mentioned in the KIP. Thus, I am wondering if it
> might be better to do deduplication only on the key? If one sets a new
> key upstream (ie, extracts the deduplication id into the key), the
> `distinct` operator could automatically repartition the data and thus we
> would avoid user errors.

Of course, with 'key-only' deduplication + auto-repartitioning we will never cause problems with co-partitioning. But in practice, we often don't need repartitioning even if the 'dedup ID' is different from the key, like in my example above. So here we have a sort of 'performance vs. safety' tradeoff.

A 'golden mean' here could be the following: we can form the deduplication ID as KEY + separator + idExtractor(VALUE). If idExtractor is not provided, we deduplicate by key only (as in the original proposal). The idExtractor then transforms only the value (and not the key), and its result is appended to the key. Records from different partitions will inherently have different deduplication IDs, and all the data will be co-partitioned. As with any stateful operation, we will repartition the topic if the key was changed upstream, but only in this case, thus avoiding unnecessary repartitioning. My example above fits this perfectly.
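
A minimal sketch of how such a composite ID could be formed, in plain Java without any Kafka Streams dependency. The class name `DedupIds`, the `|` separator, and the use of `String` keys are assumptions made for illustration only; they are not part of the KIP or the PR. A null result from the extractor is treated here as "no postfix", which is just one of the options considered for nulls further down in this message.

```java
import java.util.function.Function;

/** Hypothetical helper for illustration; not part of KIP-655 or Kafka Streams. */
final class DedupIds {
    // Assumed separator; a real implementation would need one that cannot occur in keys.
    static final String SEPARATOR = "|";

    /**
     * Builds the deduplication ID as KEY + separator + idExtractor(VALUE).
     * Falls back to the key alone when no extractor is given or when the
     * extractor returns null.
     */
    static <V> String of(String key, V value, Function<V, String> idExtractor) {
        if (idExtractor == null) {
            return key; // key-only deduplication
        }
        String postfix = idExtractor.apply(value);
        return postfix == null ? key : key + SEPARATOR + postfix;
    }

    public static void main(String[] args) {
        // The source ID is the record key; the event ID is extracted from the value.
        String id = of("source-1", "event-42:goal", v -> v.split(":")[0]);
        System.out.println(id); // prints source-1|event-42
    }
}
```

Because the key is always a prefix of the ID, two records with different keys can never share a deduplication ID, which is why no extra repartitioning is needed in this scheme.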

> (2) What is the motivation for allowing the `idExtractor` to return
> `null`? Might be good to have some use-case examples for this feature.

I can't think of any use cases. As often happens, it just came with a copy-paste from StackOverflow -- see Michael Noll's answer here: https://stackoverflow.com/questions/55803210/how-to-handle-duplicate-messages-using-kafka-streaming-dsl-functions

But, jokes aside, we'll have to decide what to do with nulls. If we accept the above proposal of having the deduplication ID as KEY + postfix, then null can be treated as no postfix at all. If we don't accept this approach, then treating nulls as 'no deduplication' seems to be a reasonable assumption (we can't get or put null as a key in a KV store, so a record with a null ID is always going to look 'new' to us).


> (2) Is using a `TimeWindow` really what we want? I was wondering if a
> `SlidingWindow` might be better? Or maybe we need a new type of window?

Agreed. It's probably not what we want. I once thought that reusing TimeWindow was a clever idea; now I don't.

Do we need epoch alignment in our use case? No, we don't, and I don't know if anyone is going to need it. Epoch alignment is good for aggregation, but deduplication is a different story.
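
A small sketch of why epoch alignment fits deduplication poorly, in plain Java. It assumes 15-minute tumbling windows and non-negative timestamps; the class and method names are made up for illustration. Two records with the same ID that arrive only two minutes apart can land in different epoch-aligned windows, so both would count as "first in their window", while a window that starts at the first record's timestamp covers both.

```java
/** Hypothetical illustration; not Kafka Streams code. */
final class WindowAlignment {

    /** Start of the epoch-aligned tumbling window that contains the timestamp. */
    static long epochAlignedStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** True if both timestamps fall into the same epoch-aligned tumbling window. */
    static boolean sameEpochWindow(long a, long b, long sizeMs) {
        return epochAlignedStart(a, sizeMs) == epochAlignedStart(b, sizeMs);
    }

    /** True if 'later' falls into the data-aligned window [first, first + sizeMs). */
    static boolean withinDataAlignedWindow(long first, long later, long sizeMs) {
        return later >= first && later - first < sizeMs;
    }

    public static void main(String[] args) {
        long size = 15 * 60_000L;    // 15 minutes
        long first = 14 * 60_000L;   // record at minute 14
        long second = 16 * 60_000L;  // same ID again at minute 16

        System.out.println(sameEpochWindow(first, second, size));         // prints false
        System.out.println(withinDataAlignedWindow(first, second, size)); // prints true
    }
}
```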

Let me describe the semantics the way I see them now, and tell me what you think:

- the only parameter that defines the deduplication logic is 'expiration period'

- when a deduplication ID arrives and we cannot find it in the store, we forward the message downstream and store the ID + its timestamp.

- when an out-of-order ID arrives with an older timestamp and we find a 'fresher' record, we do nothing and don't forward the message (??? OR NOT? In what case would we want to forward an out-of-order message?)

- when an ID with a fresher timestamp arrives, we check whether it falls into the expiration period and forward it only if it does not; in both cases we update the timestamp stored for that ID

- the WindowStore retention mechanism should clean up very old records in order not to run out of space.
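
The rules listed above can be sketched in plain Java, with an in-memory map standing in for the state store. This is only a sketch under stated assumptions: the class `ExpiringDeduplicator` and its methods are hypothetical, an out-of-order record is dropped (the open question above is not settled here), a record with exactly the stored timestamp is treated as a duplicate, and a record arriving exactly one expiration period after the stored timestamp is forwarded.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration of the proposed semantics; not the PR's implementation. */
final class ExpiringDeduplicator {
    private final long expirationMs;
    // Stand-in for the state store: deduplication ID -> last seen timestamp.
    private final Map<String, Long> lastSeen = new HashMap<>();

    ExpiringDeduplicator(long expirationMs) {
        this.expirationMs = expirationMs;
    }

    /** Returns true if the record should be forwarded downstream. */
    boolean shouldForward(String dedupId, long timestampMs) {
        Long stored = lastSeen.get(dedupId);
        if (stored == null) {
            // Unknown ID: forward and remember ID + timestamp.
            lastSeen.put(dedupId, timestampMs);
            return true;
        }
        if (timestampMs < stored) {
            // Out-of-order record older than what we have: do nothing (assumption).
            return false;
        }
        // Fresher (or equal) timestamp: forward only if the expiration period
        // has passed, but update the stored timestamp in both cases.
        boolean expired = timestampMs - stored >= expirationMs;
        lastSeen.put(dedupId, timestampMs);
        return expired;
    }

    /** Stand-in for store retention: drops entries last seen before the cutoff. */
    void purgeOlderThan(long cutoffMs) {
        lastSeen.values().removeIf(ts -> ts < cutoffMs);
    }

    int size() {
        return lastSeen.size();
    }
}
```

Note that because the timestamp is refreshed on every duplicate, an ID that keeps arriving more often than the expiration period is never forwarded again under these rules; whether that is desired is part of the question.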

> (3) `isPersistent` -- instead of using this flag, it seems better to
> allow users to pass in a `Materialized` parameter next to
> `DistinctParameters` to configure the state store?

Fully agree! Users might also want to change the retention time.

> (4) I am wondering if we should really have 4 overloads for
> `DistinctParameters.with()`? It might be better to have one overload
> with all required parameters, and add optional parameters using the
> builder pattern? This seems to follow the DSL Grammar proposal.

Oh, I can explain. We can't fully rely on the builder pattern because of Java type inference limitations. We have to provide type parameters to the builder methods or the code won't compile: see e.g. https://twitter.com/inponomarev/status/1265053286933159938 and the discussion with Tagir Valeev that follows it.

When we came across similar difficulties in KIP-418, we finally decided to add all the necessary overloads to the parameter class. So I just reproduced that approach here.
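
A self-contained sketch of the inference problem, using a made-up parameter class (`DemoParameters` is hypothetical and much simpler than `DistinctParameters`). When a generic static factory is the receiver of a chained call, it has no target type, so its type parameters are inferred as `Object`; the caller then has to write an explicit type witness, or the class has to offer an overload that takes the optional setting directly.

```java
/** Hypothetical parameter class for illustration; not the KIP's DistinctParameters. */
final class DemoParameters<K, V> {
    private final String name;

    private DemoParameters(String name) {
        this.name = name;
    }

    // Factory with no arguments that could pin down K and V.
    static <K, V> DemoParameters<K, V> with() {
        return new DemoParameters<>(null);
    }

    // Overload that takes the optional setting directly, so no chaining is needed.
    static <K, V> DemoParameters<K, V> with(String name) {
        return new DemoParameters<>(name);
    }

    // Builder-style setter.
    DemoParameters<K, V> named(String name) {
        return new DemoParameters<>(name);
    }

    String name() {
        return name;
    }
}

final class BuilderInferenceDemo {
    // Stand-in for a DSL method whose signature fixes K and V.
    static String describe(DemoParameters<String, Long> params) {
        return "name=" + params.name();
    }

    public static void main(String[] args) {
        // Compiles: K and V are inferred from the parameter type of describe().
        System.out.println(describe(DemoParameters.with()));

        // Does NOT compile if uncommented: with() is the receiver of named(), has no
        // target type, and is inferred as DemoParameters<Object, Object>.
        // System.out.println(describe(DemoParameters.with().named("dedup")));

        // Compiles, but the caller must spell out the type arguments.
        System.out.println(describe(DemoParameters.<String, Long>with().named("dedup")));

        // Compiles without a type witness thanks to the overload.
        System.out.println(describe(DemoParameters.with("dedup")));
    }
}
```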

> (5) Even if it might be an implementation detail (and maybe the KIP
> itself does not need to mention it), can you give a high level overview
> how you intend to implement it (that would be easier to grok, compared
> to reading the PR).

Well, as with any operation at the KStreamImpl level, I'm building a store and a processor node.

The KStreamDistinct class is going to be the ProcessorSupplier, with the logic for forwarding or muting records located in KStreamDistinct.KStreamDistinctProcessor#process

----

Matthias, if you are still reading this :-) a gentle reminder: my PR for the already accepted KIP-418 is still waiting for your review. I think it's better for me to finalize at least one KIP before proceeding to a new one :-)

Regards,

Ivan

03.09.2020 4:20, Matthias J. Sax wrote:
Thanks for the KIP Ivan. Having a built-in deduplication operator is for
sure a good addition.

Couple of questions:

(1) Using the `idExtractor` has the issue that data might not be
co-partitioned as you mentioned in the KIP. Thus, I am wondering if it
might be better to do deduplication only on the key? If one sets a new
key upstream (ie, extracts the deduplication id into the key), the
`distinct` operator could automatically repartition the data and thus we
would avoid user errors.

(2) What is the motivation for allowing the `idExtractor` to return
`null`? Might be good to have some use-case examples for this feature.

(2) Is using a `TimeWindow` really what we want? I was wondering if a
`SlidingWindow` might be better? Or maybe we need a new type of window?

It would be helpful if you could describe potential use cases in more
detail. -- I am mainly wondering about hopping windows? Each record would
always fall into multiple windows and thus would be emitted multiple
times, ie, each time the window closes. Is this really a valid use case?

It seems that for de-duplication, one wants to have some "expiration
time", ie, for each ID, deduplicate all consecutive records with the
same ID and emit the first record after the "expiration time" passed. In
terms of a window, this would mean that the window starts at `r.ts` and
ends at `r.ts + windowSize`, ie, the window is aligned to the data.
TimeWindows are aligned to the epoch though. While `SlidingWindows` also
align to the data, for the aggregation use-case they go backward in
time, while we need a window that goes forward in time. It's an open
question if we can re-purpose `SlidingWindows` -- it might be ok to
make the alignment (into the past vs into the future) an operator
dependent behavior?

(3) `isPersistent` -- instead of using this flag, it seems better to
allow users to pass in a `Materialized` parameter next to
`DistinctParameters` to configure the state store?

(4) I am wondering if we should really have 4 overloads for
`DistinctParameters.with()`? It might be better to have one overload
with all required parameters, and add optional parameters using the
builder pattern? This seems to follow the DSL Grammar proposal.

(5) Even if it might be an implementation detail (and maybe the KIP
itself does not need to mention it), can you give a high level overview
how you intend to implement it (that would be easier to grok, compared
to reading the PR).



-Matthias

On 8/23/20 4:29 PM, Ivan Ponomarev wrote:
Sorry, I forgot to add [DISCUSS] tag to the topic

24.08.2020 2:27, Ivan Ponomarev wrote:
Hello,

I'd like to start a discussion for KIP-655.

KIP-655:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API


I also opened a proof-of-concept PR for you to experiment with the API:

PR#9210: https://github.com/apache/kafka/pull/9210

Regards,

Ivan Ponomarev


