Hey all,
I'm not convinced either epoch-aligned or data-aligned will fit all
possible use cases. Both seem totally reasonable to me: data-aligned is
useful, for example, when you know that a large number of updates to a
single key will occur in short bursts, and epoch-aligned when you
specifically want to get just a single update per discrete time interval.
Going a step further, though, what if you want just a single update per
calendar month, or per year accounting for leap years? Neither of those is
serviced that well by the existing Windows specification for windowed
aggregations, a well-known limitation of the current API. There is actually
a KIP
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-645%3A+Replace+Windows+with+a+proper+interface>
going on in parallel to fix this exact issue and make the windowing
interface much more flexible. Maybe instead of re-implementing this
windowing interface in a similarly limited fashion for the Distinct
operator, we could leverage it here and get all the benefits coming with
KIP-645.
Specifically, I'm proposing to remove the TimeWindows/etc config from the
DistinctParameters class, and move the distinct() method from the KStream
interface to the TimeWindowedKStream interface. Since it's semantically
similar to a kind of windowed aggregation, it makes sense to align it with
the existing windowing framework, i.e.:

inputStream
    .groupByKey()
    .windowedBy()
    .distinct()
Then we could use data-aligned windows if SlidingWindows is specified in
the windowedBy(), and epoch-aligned (or some other kind of enumerable
window) if a Windows is specified in windowedBy() (or an
EnumerableWindowDefinition once KIP-645 is implemented to replace Windows).
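To make that concrete, here's roughly what the two flavors might look like
from the user's side -- just a sketch, assuming inputStream is an existing
KStream and that the distinct() overload on TimeWindowedKStream exists as
proposed (window sizes and grace are arbitrary, imports omitted):

// hypothetical: distinct() on the windowed stream does not exist yet

// epoch-aligned: at most one record per key per enumerated window
inputStream
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
    .distinct();

// data-aligned: after a record is forwarded, identical records are
// suppressed for the next 10 seconds, regardless of window boundaries
inputStream
    .groupByKey()
    .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(
        Duration.ofSeconds(10), Duration.ofSeconds(30)))
    .distinct();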
*SlidingWindows*: should forward a record once when it's first seen, and
then not again for any identical records that fall into the next N
timeUnits. This includes out-of-order records, i.e. if you have a
SlidingWindows of size 10s and process records at times 15s, 20s, 14s, then
you would just forward the one at 15s. Presumably, if you're using
SlidingWindows, you don't care about what falls into exact time boxes, you
just want to deduplicate. If you do care about exact time boxing then you
should use...
*EnumerableWindowDefinition* (eg *TimeWindows*): should forward only one
record per enumerated time window. If you get records at 15s, 20s, 14s
where the windows are enumerated as [5, 14], [15, 24], etc., then you
forward the record at 15s and also the record at 14s (see the toy example
below).
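To see the two behaviors side by side, here's a tiny self-contained toy
(plain Java, not Streams code) that runs the 15s/20s/14s example through
both rules. I've offset the enumerated windows to match the [5, 14],
[15, 24] boxes above, and all the names are made up:

import java.util.*;

public class DedupRulesToy {
    static final long SIZE_MS = 10_000L;

    // data-aligned: remember the timestamp of the last forwarded record per ID
    // and suppress anything (including out-of-order records) within SIZE_MS of it
    static boolean dataAligned(Map<String, Long> lastForwarded, String id, long ts) {
        Long prev = lastForwarded.get(id);
        if (prev != null && Math.abs(ts - prev) <= SIZE_MS) {
            return false;
        }
        lastForwarded.put(id, ts);
        return true;
    }

    // epoch-aligned: remember which enumerated windows already emitted this ID;
    // windows here start at 5s, 15s, 25s, ... to match the example above
    static boolean epochAligned(Map<String, Set<Long>> emitted, String id, long ts) {
        long windowStart = ts - Math.floorMod(ts - 5_000L, SIZE_MS);
        return emitted.computeIfAbsent(id, k -> new HashSet<>()).add(windowStart);
    }

    public static void main(String[] args) {
        Map<String, Long> last = new HashMap<>();
        Map<String, Set<Long>> emitted = new HashMap<>();
        for (long ts : new long[]{15_000, 20_000, 14_000}) {
            System.out.printf("ts=%2ds  data-aligned=%b  epoch-aligned=%b%n",
                    ts / 1_000, dataAligned(last, "id", ts), epochAligned(emitted, "id", ts));
        }
        // prints: 15s -> true/true, 20s -> false/false, 14s -> false/true
    }
}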
Just an idea: not sure if the impedance mismatch would throw users off,
since the semantics of the distinct windows are slightly different than in
the aggregations. But if we don't fit this into the existing windowed
framework, then we shouldn't use any existing Windows-type classes at all,
imo, i.e. we should create a new DistinctWindows config class, similar to
how stream-stream joins get their own JoinWindows class.
I also think that non-windowed deduplication could be useful, in which case
we would want to also have the distinct() operator on the KStream interface.
One quick note regarding the naming: it seems like the Streams DSL
operators are typically named as verbs rather than adjectives, for example
#suppress or #aggregate. I get that there's some precedent for 'distinct'
specifically, but maybe something like 'deduplicate' would be more
appropriate for the Streams API.
WDYT?
On Mon, Sep 14, 2020 at 10:04 AM Ivan Ponomarev
<iponoma...@mail.ru.invalid>
wrote:
Hi Matthias,
Thanks for your review! It made me think deeper, and indeed I understood
that I was missing some important details.

To simplify, let me explain my particular use case first so I can refer to
it later.
We have a system that collects information about ongoing live sporting
events from different sources. The information sources have their IDs, and
these IDs are the keys of the stream. Each source emits messages concerning
sporting events, and we can have many messages about each sporting event
from each source. The event ID is extracted from the message. We need a
database of event IDs that were reported at least once by each source
(important: events from different sources are considered to be different
entities). The requirements are:

1) each new event ID should be written to the database as soon as possible

2) although it's ok and sometimes even desired to repeat the notification
about an already known event ID, we wouldn't like our database to be
bothered by the same event ID more often than once in a given period of
time (say, 15 minutes).

With this example in mind, let me answer your questions.
> (1) Using the `idExtractor` has the issue that data might not be
> co-partitioned as you mentioned in the KIP. Thus, I am wondering if it
> might be better to do deduplication only on the key? If one sets a new
> key upstream (ie, extracts the deduplication id into the key), the
> `distinct` operator could automatically repartition the data and thus we
> would avoid user errors.
Of course, with 'key-only' deduplication + auto-repartitioning we will
never cause problems with co-partitioning. But in practice we often don't
need repartitioning even if the 'dedup ID' is different from the key, like
in my example above. So here we have a sort of 'performance vs. safety'
tradeoff.

A good middle ground here could be the following: we can form the
deduplication ID as KEY + separator + idExtractor(VALUE). In case
idExtractor is not provided, we deduplicate by key only (as in the original
proposal). The idExtractor then transforms only the value (and not the
key), and its result is appended to the key. Records from different
partitions will inherently have different deduplication IDs and all the
data will be co-partitioned. As with any stateful operation, we will
repartition the topic in case the key was changed upstream, but only in
this case, thus avoiding unnecessary repartitioning. My example above fits
this perfectly.
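Just to illustrate what I mean, a minimal sketch (the separator, helper
name and Function type are all made up for the example):

import java.util.function.Function;

// Sketch only: build the deduplication ID as KEY + separator + idExtractor(VALUE);
// a missing extractor (or a null result) means "deduplicate on the key alone".
static <K, V> String deduplicationId(K key, V value, Function<V, String> idExtractor) {
    String suffix = (idExtractor == null) ? null : idExtractor.apply(value);
    return (suffix == null) ? key.toString() : key + "\u0000" + suffix;
}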
> (2) What is the motivation for allowing the `idExtractor` to return
> `null`? Might be good to have some use-case examples for this feature.
Can't think of any use cases. As it often happens, it just came with a
copy-paste from StackOverflow -- see Michael Noll's answer here:
https://stackoverflow.com/questions/55803210/how-to-handle-duplicate-messages-using-kafka-streaming-dsl-functions

But, jokes aside, we'll have to decide what to do with nulls. If we accept
the above proposal of having the deduplication ID as KEY + postfix, then
null can be treated as no postfix at all. If we don't accept this approach,
then treating nulls as 'no deduplication' seems to be a reasonable
assumption (we can't get or put null as a key in a KV store, so a record
with a null ID is always going to look 'new' to us).
> (2) Is using a `TimeWindow` really what we want? I was wondering if a
> `SlidingWindow` might be better? Or maybe we need a new type of window?
Agree, it's probably not what we want. Once I thought that reusing
TimeWindow was a clever idea; now I don't.

Do we need epoch alignment in our use case? No, we don't, and I don't know
if anyone is going to need it. Epoch alignment is good for aggregation, but
deduplication is a different story.
Let me describe the semantics the way I see them now, and tell me what you
think (a sketch follows the list):

- the only parameter that defines the deduplication logic is the
'expiration period'
- when a deduplication ID arrives and we cannot find it in the store, we
forward the message downstream and store the ID + its timestamp
- when an out-of-order ID arrives with an older timestamp and we find a
'fresher' record, we do nothing and don't forward the message (??? OR NOT?
In what case would we want to forward an out-of-order message?)
- when an ID with a fresher timestamp arrives, we check if it falls into
the expiration period and either forward it or not, but in both cases we
update the timestamp of the message in the store
- the WindowStore retention mechanism should clean up very old records in
order not to run out of space
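To make the above concrete, here is a rough sketch of the processing logic
using the plain Processor API and a WindowStore that maps a deduplication
ID to the timestamp of the last forwarded record. This is only an
illustration, not the actual PR code, and the class/parameter names are
made up:

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

public class KStreamDistinctProcessor<K, V> implements Processor<K, V> {
    private final String storeName;
    private final long expirationPeriodMs;
    private ProcessorContext context;
    private WindowStore<K, Long> store;

    KStreamDistinctProcessor(String storeName, long expirationPeriodMs) {
        this.storeName = storeName;
        this.expirationPeriodMs = expirationPeriodMs;
    }

    @SuppressWarnings("unchecked")
    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        this.store = (WindowStore<K, Long>) context.getStateStore(storeName);
    }

    @Override
    public void process(K key, V value) {
        final long ts = context.timestamp();
        long latestSeen = -1L;
        // any occurrence of this ID within the expiration period around ts?
        try (WindowStoreIterator<Long> it =
                 store.fetch(key, ts - expirationPeriodMs, ts + expirationPeriodMs)) {
            while (it.hasNext()) {
                latestSeen = Math.max(latestSeen, it.next().value);
            }
        }
        if (latestSeen < 0) {
            // nothing within the expiration period: forward and remember the timestamp
            context.forward(key, value);
            store.put(key, ts, ts);
        } else if (ts > latestSeen) {
            // fresher than what we stored, but still within the expiration period:
            // don't forward, just refresh the stored timestamp
            store.put(key, ts, ts);
        }
        // otherwise: an older out-of-order duplicate -- do nothing
        // (very old entries are cleaned up by the WindowStore retention)
    }

    @Override
    public void close() { }
}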
> (3) `isPersistent` -- instead of using this flag, it seems better to
> allow users to pass in a `Materialized` parameter next to
> `DistinctParameters` to configure the state store?
Fully agree! Users might also want to change the retention time.
> (4) I am wondering if we should really have 4 overloads for
> `DistinctParameters.with()`? It might be better to have one overload
> with all required parameters, and add optional parameters using the
> builder pattern? This seems to follow the DSL Grammar proposal.
Oh, I can explain. We can't fully rely on the builder pattern because of
Java type inference limitations. We have to provide type parameters to the
builder methods or the code won't compile: see e.g.
https://twitter.com/inponomarev/status/1265053286933159938 and the
following discussion with Tagir Valeev.

When we came across similar difficulties in KIP-418, we finally decided to
add all the necessary overloads to the parameter class, so I just
reproduced that approach here.
> (5) Even if it might be an implementation detail (and maybe the KIP
> itself does not need to mention it), can you give a high level overview
> of how you intend to implement it (that would be easier to grok, compared
> to reading the PR).
Well, as with any operation at the KStreamImpl level, I'm building a store
and a processor node. The KStreamDistinct class is going to be the
ProcessorSupplier, with the logic regarding the forwarding/muting of the
records located in KStreamDistinct.KStreamDistinctProcessor#process.
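For reference, the backing store itself can be expressed with the public
Stores factory -- just a sketch with made-up names and durations, but it
shows why the persistent/in-memory choice and the retention time fit
naturally into a Materialized-style config:

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;

// Sketch: a window store keyed by deduplication ID, holding the timestamp
// of the last forwarded record; retention bounds how long IDs are remembered.
Duration retention = Duration.ofMinutes(15);
StoreBuilder<WindowStore<String, Long>> distinctStore =
        Stores.windowStoreBuilder(
                Stores.persistentWindowStore("distinct-ids", retention, retention, false),
                Serdes.String(),
                Serdes.Long());
// an in-memory variant (the old `isPersistent` flag) would just swap in
// Stores.inMemoryWindowStore("distinct-ids", retention, retention, false)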
----
Matthias, if you are still reading this :-) a gentle reminder: my PR for
the already accepted KIP-418 is still waiting for your review. I think it's
better for me to finalize at least one KIP before proceeding to a new one
:-)
Regards,
Ivan
On 03.09.2020 4:20, Matthias J. Sax wrote:
Thanks for the KIP, Ivan. Having a built-in deduplication operator is for
sure a good addition.

Couple of questions:
(1) Using the `idExtractor` has the issue that data might not be
co-partitioned as you mentioned in the KIP. Thus, I am wondering if it
might be better to do deduplication only on the key? If one sets a new key
upstream (ie, extracts the deduplication id into the key), the `distinct`
operator could automatically repartition the data and thus we would avoid
user errors.
(2) What is the motivation for allowing the `idExtractor` to return
`null`? Might be good to have some use-case examples for this feature.
(2) Is using a `TimeWindow` really what we want? I was wondering if a
`SlidingWindow` might be better? Or maybe we need a new type of window? It
would be helpful if you could describe potential use cases in more detail.
-- I am mainly wondering about hopping windows? Each record would always
fall into multiple windows and thus would be emitted multiple times, ie,
each time the window closes. Is this really a valid use case?

It seems that for de-duplication, one wants to have some "expiration time",
ie, for each ID, deduplicate all consecutive records with the same ID and
emit the first record after the "expiration time" passed. In terms of a
window, this would mean that the window starts at `r.ts` and ends at
`r.ts + windowSize`, ie, the window is aligned to the data. TimeWindows are
aligned to the epoch though. While `SlidingWindows` also align to the data,
for the aggregation use-case they go backward in time, while we need a
window that goes forward in time. It's an open question if we can
re-purpose `SlidingWindows` -- it might be ok to make the alignment (into
the past vs into the future) an operator-dependent behavior?
(3) `isPersistent` -- instead of using this flag, it seems better to allow
users to pass in a `Materialized` parameter next to `DistinctParameters` to
configure the state store?
(4) I am wondering if we should really have 4 overloads for
`DistinctParameters.with()`? It might be better to have one overload with
all required parameters, and add optional parameters using the builder
pattern? This seems to follow the DSL Grammar proposal.
(5) Even if it might be an implementation detail (and maybe the KIP itself
does not need to mention it), can you give a high level overview of how you
intend to implement it (that would be easier to grok, compared to reading
the PR).
-Matthias
On 8/23/20 4:29 PM, Ivan Ponomarev wrote:
Sorry, I forgot to add [DISCUSS] tag to the topic
On 24.08.2020 2:27, Ivan Ponomarev wrote:
Hello,
I'd like to start a discussion for KIP-655.
KIP-655:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API
I also opened a proof-of-concept PR for you to experiment with the API:
PR#9210: https://github.com/apache/kafka/pull/9210
Regards,
Ivan Ponomarev