Thanks, Almog.

Good callout about `TimeWindows` vs `TimeWindow` (yes, I am aware, and I actually re-read my previous email a few times before sending it to make sure I used the right one; it's very subtle).

For `TimeWindows`, the semantics are certainly well defined, and there is nothing to discuss.

For `TimeWindow`, I am not sure, as I said, and your interpretation of it as "just a container" might be fine, too. I agree with the concern that if we add something new, we might just leak it again and thus not gain much, so the lesser evil might be to just re-use `TimeWindow` as you propose. I just wanted to raise this question as a sanity check and collect feedback on it.


-Matthias

On 1/23/25 8:58 AM, Almog Gavra wrote:
Thanks Matthias for the quick and detailed feedback!

Nit: it seems you are mixing the terms "out-of-order" and "late" and
using them as synonyms, which we usually do not do.

M1. Ah, in my mind "late arriving" was after the window closed but
potentially before grace (and "out of order" was just anything that is out
of order). Do we have a specific word for "after the window closes but
before grace"? Maybe we should have "fashionably late" and "too late"
(haha, just kidding).

I'll clear up the terminology in the KIP.

Today the only way to implement this semantic is using a punctuation
and manually storing events in an aggregation (which is inefficient for
many reasons and can cause severe bottlenecks)
Not sure if I follow this argument? I don't see any relationship to
punctuations. Can you elaborate?

M2. The way we've seen people implement the desired behavior in this KIP is
to use a non-windowed aggregation on a stream, and then use a punctuation
every N seconds to scan the table and emit and delete the records to
simulate a window closing. It's certainly suboptimal, but it gets you very
similar semantics to what I describe in the KIP.
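
To make the workaround in M2 concrete, here is a minimal plain-Java simulation of the pattern (a non-windowed aggregate plus a periodic scan that emits and deletes old entries). It deliberately does not use the Kafka Streams API; every name in it (`PunctuationWorkaroundSketch`, `Agg`, `process`, `punctuate`) is hypothetical, and the "close after `windowSizeMs` of stream time" rule is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of the workaround; NOT the Kafka Streams API.
// All names are hypothetical and chosen only for this sketch.
class PunctuationWorkaroundSketch {
    // Running aggregate for one key, plus the stream time at which it was created.
    static final class Agg {
        final long createdAtMs;
        long count;
        Agg(long createdAtMs) { this.createdAtMs = createdAtMs; }
    }

    private final Map<String, Agg> table = new LinkedHashMap<>();
    private final long windowSizeMs;
    private long streamTimeMs = Long.MIN_VALUE;

    PunctuationWorkaroundSketch(long windowSizeMs) { this.windowSizeMs = windowSizeMs; }

    // Non-windowed aggregation: every record updates the per-key aggregate,
    // no matter how its timestamp relates to earlier records.
    void process(String key, long timestampMs) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        Agg agg = table.computeIfAbsent(key, k -> new Agg(streamTimeMs));
        agg.count++;
    }

    // Simulated punctuation: scan the whole table, then emit and delete every
    // aggregate that is at least windowSizeMs old in stream time.
    List<String> punctuate() {
        List<String> emitted = new ArrayList<>();
        Iterator<Map.Entry<String, Agg>> it = table.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Agg> e = it.next();
            if (streamTimeMs - e.getValue().createdAtMs >= windowSizeMs) {
                emitted.add(e.getKey() + "=" + e.getValue().count);
                it.remove();
            }
        }
        return emitted;
    }

    int size() { return table.size(); }

    public static void main(String[] args) {
        PunctuationWorkaroundSketch sketch = new PunctuationWorkaroundSketch(10_000L);
        sketch.process("a", 1_000L);
        sketch.process("a", 500L);    // out of order, still aggregated
        sketch.process("c", 12_000L); // advances stream time past a's "window"
        System.out.println(sketch.punctuate());
    }
}
```

The full scan inside `punctuate` is the part that gets expensive as the table grows, which is presumably one of the bottlenecks the KIP refers to.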

M3. Re: extends Windows<TimeWindow>

(note for other readers, I think you get this Matthias) TimeWindow is
distinct from TimeWindows (with an s at the end). TimeWindows (with an s)
implies exactly what you suggest. I do think, however, that TimeWindow
(without an s) is a perfect abstraction to leverage here. That class
doesn't do anything except to indicate the bounds of a window (it has a
start time, an end time and a definition of whether two windows overlap).
The javadoc for TimeWindow (without an s) doesn't even need to change to be
used in the way this KIP suggests.

As suggested, the StreamTimeWindow still uses time as its bounds - it just
doesn't use the event time to determine which time window the event is
placed in.
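
As an illustration of the distinction in M3, the sketch below contrasts assigning a record to a window by its own timestamp with assigning it by stream time. This is only one reading of the KIP: the class and method names are hypothetical, "stream time" is assumed to be the highest timestamp seen so far, windows are assumed to be fixed-size and epoch-aligned, and timestamps are assumed to be non-negative.

```java
// Hypothetical sketch; WindowAssignmentSketch is not part of Kafka Streams.
class WindowAssignmentSketch {
    private final long sizeMs;
    private long streamTimeMs = 0L;

    WindowAssignmentSketch(long sizeMs) { this.sizeMs = sizeMs; }

    // Start of the fixed-size, epoch-aligned window that contains timeMs.
    long windowStartFor(long timeMs) { return timeMs - (timeMs % sizeMs); }

    // Event-time assignment: the record's own timestamp picks the window.
    long assignByEventTime(long recordTsMs) { return windowStartFor(recordTsMs); }

    // Assumed stream-time assignment: advance stream time to the highest
    // timestamp seen so far, then use the window that stream time falls into.
    long assignByStreamTime(long recordTsMs) {
        streamTimeMs = Math.max(streamTimeMs, recordTsMs);
        return windowStartFor(streamTimeMs);
    }

    public static void main(String[] args) {
        WindowAssignmentSketch sketch = new WindowAssignmentSketch(10_000L);
        System.out.println(sketch.assignByStreamTime(25_000L)); // in-order record
        System.out.println(sketch.assignByStreamTime(7_000L));  // older record, arrives afterwards
        System.out.println(sketch.assignByEventTime(7_000L));   // same record under event-time assignment
    }
}
```

In both cases the window is still described by a start and an end time, which is why a plain start/end container is enough to represent it.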

M4. Re: I just realized that `Window` is in an `internal` package but
`TimeWindows#windowsFor` does return `Map<Long, TimeWindow>`

Yeah, I was wondering about that as well. I also agree it's a bit orthogonal
to the KIP and I think if we introduce a new "extends Window" type we'll
just leak that type as well, so I don't really see a benefit from that. If
we do decide to tackle the leaky abstraction we might as well just have
TimeWindow to handle :)

M5. Specifying abstract methods

Can do!

M6. Naming

I like "OffsetOrderedWindows", I will wait for some other feedback and if
no one has something better I'll update the KIP to that.

Re: `extends Windows<TimeWindow>`: I do think the fact that it defines the
window size with time is important. We're not saying the window closes
after N offsets have passed, but rather after stream time has advanced by N
seconds (or ms or whatever). I agree that it doesn't need to be part of the
main name, however.


On Thu, Jan 23, 2025 at 12:14 AM Matthias J. Sax <mj...@apache.org> wrote:

Interesting KIP. It's a known problem, and the proposed solution makes
sense to me.

Nit: it seems you are mixing the terms "out-of-order" and "late" and
using them as synonyms, which we usually do not do.

"Out-of-order" is the more generic term, while "late" means after the
grace period (hence, late arriving data is still out-of-order, but it's
a subset of out-of-order records... all late records are out-of-order,
but not the other way around). I think the KIP could gain clarity and
avoid potential confusion by not using the two terms as synonyms.

In the end, "late" data cannot be handled at all right now, based on the
definition of "late". "late" data is always dropped.
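
The terminology above can be sketched as a small classifier. This is an illustration under stated assumptions, not Kafka Streams code: the names are hypothetical, windows are assumed to be tumbling and epoch-aligned, timestamps are assumed to be non-negative, and the exact boundary (`<=` rather than `<`) is an assumption.

```java
// Hypothetical sketch of the "out-of-order" vs "late" terminology.
class LatenessSketch {
    enum Kind { IN_ORDER, OUT_OF_ORDER, LATE }

    // streamTimeMs is the highest timestamp observed before this record.
    static Kind classify(long recordTsMs, long streamTimeMs, long windowSizeMs, long graceMs) {
        if (recordTsMs >= streamTimeMs) {
            return Kind.IN_ORDER;
        }
        // End of the tumbling window the record's timestamp falls into.
        long windowEndMs = recordTsMs - (recordTsMs % windowSizeMs) + windowSizeMs;
        // "Late" is the subset of out-of-order records whose window end plus
        // grace period has already been passed by stream time.
        if (windowEndMs + graceMs <= streamTimeMs) {
            return Kind.LATE;
        }
        return Kind.OUT_OF_ORDER;
    }

    public static void main(String[] args) {
        long size = 10_000L;
        long grace = 5_000L;
        System.out.println(classify(8_000L, 12_000L, size, grace)); // window closed, still within grace
        System.out.println(classify(8_000L, 16_000L, size, grace)); // past window end + grace
    }
}
```

Every record classified as `LATE` here would also count as out-of-order in the generic sense; the enum only separates the two cases to show which records are dropped under the definition above.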



Today the only way to implement this semantic is using a punctuation and
manually storing events in an aggregation (which is inefficient for many
reasons and can cause severe bottlenecks)

Not sure if I follow this argument? I don't see any relationship to
punctuations. Can you elaborate?

It's also not clear to me, why "storing events in an aggregation" is
problematic, and/or how this new window type would avoid it?



extends Windows<TimeWindow>

Not sure if this is the right thing to do? I understand your intent,
but to me, `TimeWindow` has certain semantics, implying we only put data
with record-ts between the window bounds into the window. Not sure if my
interpretation is off, and re-using could be ok? It's just something we
should figure out I guess.

To be fair: it's not 100% clearly specified if these semantics apply or
not. If we believe they don't apply, happy to pivot on this, but we
might want to update the JavaDocs, if we only want to use it as
"container class" (for the lack of better word).

Which actually raises a different issue: I just realized that `Window` is
in an `internal` package, but `TimeWindows#windowsFor` does return
`Map<Long, TimeWindow>`, thus making it effectively kinda public... It
seems we should do something about this? End users do not really need
to use `TimeWindow` -- it's really internal, and it seems `windowsFor`
is effectively a leaky abstraction...

If we keep `TimeWindow` internal, I think we can easily be more relaxed
with applied semantics. But it's kinda leaking right now, which does not
make me happy... (Of course, I also don't want to derail and hijack this
KIP for some related, but orthogonal issue in the API...)

One other nit I am wondering about is whether it might be good to specify
the abstract methods inherited from `Windows` in the KIP for clarity? Even
if they are implicitly well specified, it might be beneficial to spell them
out. After all, `StreamTimeWindow` needs to implement these methods.




And of course

Note: I really don't like the name I came up with, StreamTimeWindows... if
anyone has a better name in mind please suggest one!

I am not a fan of the name either. But not really sure what a better
name could be. In the end, it's more of an "offset ordered window"
rather than a "time window", because it does not really take the record
timestamps into account to figure out which window a record goes
into, but really uses the offset order.

I believe that the size of the window itself does not matter too much for
naming, so even if the size is defined in time, I don't think that the
term "time" must be part of the window name. (That's also why I am
wondering if we should go with `extends Windows<TimeWindow>` or add
something new, which we hopefully can keep internal...)

Not sure if we ever intend to make `StreamTimeWindows` overlapping?
Given the described use-cases it seems very unlikely? So effectively
it's a "tumbling window". Of course, we should not just overload this
term, but there is no `TumblingWindows` in the API -- only the term
Tumbling Window in the docs to describe the special case of
non-overlapping `TimeWindows`.

So maybe `OffsetTumblingWindows` could work? Or `OffsetOrderedWindows`?
Just spitballing here, to get a discussion going...



-Matthias



On 1/22/25 8:26 PM, Almog Gavra wrote:
Hello!

I'd like to initiate a discussion thread on KIP-1127:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-1127+Flexible+Windows+for+Late+Arriving+Data

This KIP aims to make it easier to specify windowing semantics that are
more tolerant of late arriving data, particularly with suppression.

Would appreciate any and all feedback.

Cheers,
Almog
