Interesting thoughts. So maybe we could go with `BatchWindows` as a
name? Again, only spit-balling...
If we really put "(micro-)batching" in the center of this idea, I think
both count-based and time-based (and time could actually be either
stream-time or wall-clock-time), or any combination of these dimensions
could all make sense.
Having said this: I don't think we need to support all dimensions
initially, but if we add something like `BatchWindows` we can extent the
supported specification incrementally.
What I don't understand is, why the relationship to
suppress()/emitStrategy() is relevant? Can you elaborate a little bit?
how to make it deterministic with regards to time
I think, in any case, we are leaving determinism-land (at least to some
extent). I guess, as long as the `BatchWindows` are applied directly to
an input topic, we get determinism for both stream-time size, as well as
count-size windows (not for wall-clock0time, of course).
However, if data is auto-repartitioned before a `BatchWindows` step,
repartitioning introduces non-deterministic order in the repartition
topic due to interleaved writes, and thus, also stream-time based
windows would become non-deterministic (as there is no grace-period by
design to "fix" the non-deterministic order, in contrast to full
event-time based windows we support so far).
So I don't think that is an actual difference between stream-time or
count-based `BatchWindows` with regard to determinism.
which is important for EOS and even ALOS
I don't see any relationship to EOS or ALOS. Can you explain what you mean?
-Matthias
On 1/28/25 10:43 AM, Almog Gavra wrote:
Thanks for the feedback Lucas and Bruno!
L0. "Given the motivation section, it sounds we actually want something
that I'd call "batching" rather than "windowing"."
You are right here, and I think ultimately introducing more flexible and
controlled micro-batching will be useful for Kafka Streams, but that
expands the scope a little more than I'd want. We'd have to rethink the
implications of suppression as well as emitting and how to make it
deterministic with regards to time (which is important for EOS and even
ALOS). The proposal here is a halfway point that I think is easy enough to
reason about within the existing semantics of Kafka Streams and gets us 80%
of the benefit. (See the second part below since these questions are
related)
L2/B1. "why are we specifically emitting based on stream-time?" / "Wouldn't
it make more sense to have similarly sized batches?"
There are two motivations for this: (a) most of the use cases for this that
we've encountered are still time sensitive in that they don't want to batch
N records, but rather batch over N seconds (there may be an uneven
distribution of keys so that some keys hit N records very quickly and
others are one and done, we'll never see that key again). (b) this is much
easier to implement, and is a step forward, given this fits really well
into the existing windowing semantics.
Ultimately, as you suggest, I'd want to be able to control these "batches"
with multiple emit strategies. It would be nice to specify a condition like
"emit after N records, or N elapsed seconds, or N bytes accumulated" - but
again (as mentioned above) modeling this with existing Kafka Streams
semantics/operators is a big stretch and I don't want to expand the scope
of this KIP too much for that.
Let me know if that makes sense! Basically I view this as a good middle
ground that doesn't compromise semantics but probably addresses most real
(or near-real) time streaming use cases.
- Almog
On Tue, Jan 28, 2025 at 7:15 AM Bruno Cadonna <cado...@apache.org> wrote:
Hi Almog,
I had similar thoughts as Lucas. When I read the KIP, I asked myself why
are the windows not specified on number of records instead of time if we
do not care about whether the event time of the records is in the time
range of the window?
In your motivation, you write that users might collect small batches of
records to be passed to a consumer that can handle batched messages more
effectively than individual messages. Wouldn't it make more sense to
have similarly sized batches?
You could also consider to do something like the Kafka producer that has
a batch size and a linger time.
Best,
Bruno
On 28.01.25 15:44, Lucas Brutschy wrote:
Hi Almog,
this seems useful to me. I don't see anything wrong with the details
of the proposal.
More generally, I'd like to hear your thoughts on this vs. batching.
Given the motivation section, it sounds we actually want something
that I'd call "batching" rather than "windowing". If you do not really
care about the including events that fall into different time windows,
and the order of events does not matter much, why are we specifically
emitting based on stream-time? Would you expect this mechanism to
extend also to emitting batches based on number of records, byte-size
of batch, wall-clock time?
Cheers,
Lucas
On Thu, Jan 23, 2025 at 7:59 PM Matthias J. Sax <mj...@apache.org>
wrote:
Thanks, Almog.
Good call out about `TimeWindows` vs `TimeWindow` (yes, I am aware and
was actually re-reading my previous email before sending it a few times
to make sure I use the right one; it's very subtle.)
For `TimeWindows` semantics are certainly well defined, and there is
nothing to be discussed.
For `TimeWindow`, I am not sure as I said, and your interpretation as
"just a container" might be fine, too. I agree to the problem, that if
we add something new, we might just leak it again, and thus not gain
much, so the lesser evil might be to just re-use `TimeWindow` as you
propose. I just wanted to point out this question to sanity check, and
collect feedback about it.
-Matthias
On 1/23/25 8:58 AM, Almog Gavra wrote:
Thanks Matthias for the quick and detailed feedback!
Nit: it seems you are mixing the terms "out-of-order" and "late" and
using them as synonymous, what we usually not do.
M1. Ah, in my mind "late arriving" was after the window closed but
potentially before grace (and "out of order" was just anything that is
out
of order). Do we have a specific word for "after the window closes but
before grace"? Maybe we should have "fashionably late" and "too late"
(haha, just kidding).
I'll clear up the terminology in the KIP.
Today the only way to implement this semantic is using a punctuation
and manually storing events in an aggregation (which is inefficient for
many reasons and can cause severe bottlenecks)
Not sure if I follow this argument? I don't see any relationship to
punctuations. Can you elaborate?
M2. The way we've seen people implement the desired behavior in this
KIP is
to use a non-windowed aggregation on a stream, and then use a
punctuation
every N seconds to scan the table and emit and delete the records to
simulate a window closing. It's certainly suboptimal, but it gets you
very
similar semantics to what I describe in the KIP.
M3. Re: extends Windows<TimeWindow>
(note for other readers, I think you get this Matthias) TimeWindow is
distinct from TimeWindows (with an s at the end). TimeWindows (with an
s)
implies exactly what you suggest. I do think, however, that TimeWindow
(without an s) is a perfect abstraction to leverage here. That class
doesn't do anything except to indicate the bounds of a window (it has a
start time, an end time and a definition of whether two windows
overlap).
The javadoc for TimeWindow (without an s) doesn't even need to change
to be
used in the way this KIP suggests.
As suggested, the StreamTimeWindow still uses time as its bounds - it
just
doesn't use the event time to determine which time window the event is
placed in.
M4. Re: I just realized that `Window` is in an `internal` package but
`TimeWindows#windowsFor` does return `Map<Long, TimeWindow>`
Yeah, I was wondering about that as well. I also agree its a bit
orthogonal
to the KIP and I think if we introduce a new "extends Window" type
we'll
just leak that type as well, so I don't really see a benefit from
that. If
we do decide to tackle the leaky abstraction we might as well just have
TimeWindow to handle :)
M5. Specifying abstract methods
Can do!
M6. Naming
I like "OffsetOrderedWindows", I will wait for some other feedback and
if
no one has something better I'll update the KIP to that.
Re: `extends Window<TimeWindow>` I do think the fact that it defines
the
window size with time is important, we're not saying the window closes
after N offsets have passed but rather the stream time has passed N
seconds
(or ms or whatever). I agree that it doesn't need to be part of the
main
name, however.
On Thu, Jan 23, 2025 at 12:14 AM Matthias J. Sax <mj...@apache.org>
wrote:
Interesting KIP. It's a known problem, and the proposed solution make
sense to me.
Nit: it seems you are mixing the terms "out-of-order" and "late" and
using them as synonymous, what we usually not do.
"Out-of-order" is the more generic term, while "late" means after the
grace period (hence, late arriving data is still out-of-order, but
it's
a subset of out-of-order records... all late records are out-of-order,
but not the other way around). I think the KIP could gain clarity and
avoid potential confusion to use both terms not a synonymous.
In the end, "late" data cannot be handled at all right now, based on
the
definition of "late". "late" data is always dropped.
Today the only way to implement this semantic is using a punctuation
and
manually storing events in an aggregation (which is inefficient for
many
reasons and can cause severe bottlenecks)
Not sure if I follow this argument? I don't see any relationship to
punctuations. Can you elaborate?
It's also not clear to me, why "storing events in an aggregation" is
problematic, and/or how this new window type would avoid it?
extends Windows<TimeWindow>
Not sure if this is the right thing to do it? I understand your
intent,
but to me, `TimeWindow` has certain semantics, implying we only put
data
with record-ts between the window bounds into the window. Not sure if
my
interpretation is off, and re-using could be ok? It's just something
we
should figure out I guess.
To be fair: it's not 100% clearly specific if these semantics apply or
not. If we believe they don't apply, happy to pivot on this, but we
might want to update the JavaDocs, if we only want to use it as
"container class" (for the lack of better word).
What actually raises a different issue: I just realized that `Window`
is
in an `internal` package, but `TimeWindows#windowsFor` does return
`Map<Long, TimeWindow>`, thus making it effectively kinda public... It
seems we should we do something about this? End users do not really
need
to use `TimeWindow` -- it's really internal, and it seems `windowsFor`
is effective a leaky abstraction...
If we keep `TimeWindow` internal, I think we can easily be more
relaxed
with applied semantics. But it's kinda leaking right now, what does
not
make me happy... (Of course, I also don't want to derail and hijack
this
KIP for some related, but orthogonal issue in the API...)
On other nit I am wondering about is, if it might be good to specify
the
abstract methods inherited from `Windows` on the KIP for clarity? Even
they are implicitly well specified it might be beneficial to spell it
out. After all, `StreamTimeWindow` needs to implement there methods.
And of course
Note: I really don't like the name I cam up with
StreamTimeWindows... if
anyone has a better name in mind please suggest one!
I am not a fan of the name either. But not really sure what a better
name could be. In the end, it's more of an "offset ordered window"
rather than a "time window", because it does not really take the
record
timestamps into account to figure out into what window a record goes
into, but it really uses the offset order.
I believe that the size of window itself does not matter too much for
naming, so even if the size is defined in time, I don't think that the
term "time" must be part of the window name. (That's also why I am
wondering if we should go with `extends<TimeWindow>` or add something
new, which we hopefully can keep internal...)
Not sure if we ever intent to make `StreamTimeWindows` overlapping?
Given the described use-cases it seems very unlikely? So effectively
it's a "tumbling window" . Of course, we should not just overload this
term, but there is no `TumblingWindows` in the API -- only the term
Tumbling Window in the docs to describe the special case of
non-overlapping `TimeWindows`.
So maybe `OffsetTumblingWindows` could work? Or
`OffsetOrderedWindows`?
Just spitballing here, to get a discussion going...
-Matthias
On 1/22/25 8:26 PM, Almog Gavra wrote:
Hello!
I'd like to initiate a discussion thread on KIP-1127:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1127+Flexible+Windows+for+Late+Arriving+Data
This KIP aims to make it easier to specify windowing semantics that
are
more tolerable to late arriving data, particularly with suppression.
Would appreciate any and all feedback.
Cheers,
Almog