Sounds good. Thanks for clarifying.
-Matthias
On 2/6/25 3:19 PM, Almog Gavra wrote:
Good call on the backwards compatibility - updated the KIP.
Re: the grace period for BatchWindows, I think zero makes sense (and also
makes implementing things a lot easier). In my mental model, we still drop
late records that come in after the window closes; they just never occur
because we use the current stream time, instead of the event time, when
computing which window a record should be in.
If we do this the implementation is easier because: (a) we don't have to
change the way retention for stored windows works and special-case that for
BatchWindows (retention is just window size + 0 grace) and (b) we can
actually just reuse the KStreamWindowAggregateProcessor without changing
anything else.
LMK if that makes sense!
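To make the mental model above concrete, here is a minimal plain-Java sketch of assigning a record to a window by stream time rather than by its own event time. It does not use any Kafka Streams classes; the class and method names (`StreamTimeWindowSketch`, `windowStartFor`, `retentionMs`) are hypothetical, and stream time is assumed to be the highest record timestamp observed so far.

```java
/** Plain-Java sketch; hypothetical names, not the KIP's actual implementation. */
final class StreamTimeWindowSketch {
    private final long sizeMs;
    // Assumption: stream time is the highest record timestamp observed so far.
    private long streamTimeMs = Long.MIN_VALUE;

    StreamTimeWindowSketch(long sizeMs) {
        if (sizeMs <= 0) {
            throw new IllegalArgumentException("window size must be positive");
        }
        this.sizeMs = sizeMs;
    }

    /** Returns the start of the window the record is placed in. */
    long windowStartFor(long recordTimestampMs) {
        // Stream time only ever moves forward.
        streamTimeMs = Math.max(streamTimeMs, recordTimestampMs);
        // The window is derived from stream time, not from the record's event time,
        // so a record can never land in a window that has already closed.
        return streamTimeMs - Math.floorMod(streamTimeMs, sizeMs);
    }

    /** Retention is just window size + 0 grace. */
    long retentionMs() {
        return sizeMs + 0L;
    }
}
```

With a 10-second window, a record with timestamp 3000 that arrives after stream time has reached 12000 goes into the window starting at 10000, whereas event-time assignment would have placed it in the (already closed) window starting at 0.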
On Thu, Feb 6, 2025 at 12:12 PM Matthias J. Sax <mj...@apache.org> wrote:
Hit "reply" too early. Just re-read the KIP.
For `Windows#windowsFor(...)`, even if not intended to be implemented by
users, it's strictly public API. Thus, we cannot just change the method,
but would need to keep the existing method and deprecate it, and add a
new overload with a default impl that calls the existing one.
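As an illustration of that compatibility pattern, here is a small self-contained sketch. `SketchWindows` and `LegacyTumblingWindows` are hypothetical stand-ins, not the real Kafka Streams classes, and the extra `streamTime` parameter on the new overload is only an assumption about what the KIP might add.

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical stand-in for a public abstract API class. */
abstract class SketchWindows {

    /** Existing method: kept so that current subclasses keep compiling, but deprecated. */
    @Deprecated
    abstract Map<Long, String> windowsFor(long timestamp);

    /**
     * New overload (the second parameter is an assumption for illustration).
     * The default implementation simply delegates to the existing method.
     */
    @SuppressWarnings("deprecation")
    Map<Long, String> windowsFor(long timestamp, long streamTime) {
        return windowsFor(timestamp);
    }
}

/** A subclass written before the change: it only implements the old method. */
final class LegacyTumblingWindows extends SketchWindows {
    private final long sizeMs;

    LegacyTumblingWindows(long sizeMs) {
        this.sizeMs = sizeMs;
    }

    @Override
    @SuppressWarnings("deprecation")
    Map<Long, String> windowsFor(long timestamp) {
        long start = timestamp - Math.floorMod(timestamp, sizeMs);
        return Collections.singletonMap(start, "[" + start + "," + (start + sizeMs) + ")");
    }
}
```

Callers can move to the two-argument overload right away, and existing subclasses keep their behaviour until they choose to override it.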
For `BatchedWindows#gracePeriod` I am wondering if `return 0;` does make
sense? In the end, zero would still mean "drop late records and do not put
them into the window", but that is exactly what we want to do. Thus,
IMHO, the new BatchedWindows don't have / don't need the notion of a
"grace period" to begin with, and we can just throw an
"UnsupportedOperationException" from this method?
The underlying implementation of the "BatchWindowAggregationProcessor"
should never need to call `gracePeriod` anyway. Or am I missing something?
-Matthias
On 2/6/25 12:01 PM, Matthias J. Sax wrote:
BatchWindows works for me.
On 2/6/25 7:34 AM, Almog Gavra wrote:
Happy to name it BatchWindows. Will give some people time to chime in and
then change the name.
- Almog
On Tue, Feb 4, 2025 at 11:10 PM Sophie Blee-Goldman <sop...@responsive.dev> wrote:
One minor suggestion: use BatchWindows instead of BatchedWindows. The
version without the "ed" matches the established naming pattern and
grammar used by other Windows classes, e.g. TimeWindows, SessionWindows,
SlidingWindows.
Not a big deal though, I won't retract my +1 on the voting thread if you
prefer to keep it as BatchedWindows.
On Tue, Feb 4, 2025 at 10:51 AM Almog Gavra <almog.ga...@gmail.com> wrote:
Thanks for the discussion everyone! I've updated the Wiki with the
following changes:
- Renamed to BatchedWindows
- Added a note in rejected alternatives about more general purpose
(micro-)batching functionality since the scope of that is much wider.
Since it looks like we've stabilized the discussion I'm going to go ahead
and open up the vote! Definitely feel free to take another look and leave
any additional thoughts.
On Thu, Jan 30, 2025 at 12:28 PM Matthias J. Sax <mj...@apache.org> wrote:
> batch window with max N records,
> and then also specifying a BufferConfig.maxRecords()
That's actually two different and independent dimensions. "N records"
would be the number of records in the window, but `maxRecords` is the
number of unique keys/rows in the buffer before it's flushed.
> the current proposal
> that only includes stream time based batches is no worse than existing
> windows which also have that potential indeterminism in this scenario
> except for that it's more likely since grace period is always 0
Guess this becomes a philosophical question. For TimeWindows, even if
grace=0, it's still deterministic into which window each record goes --
of course, the "cut off point" when we start to drop late records is
subject to "noise" due to repartitioning, as stream-time advances
"non-deterministically".
So if we drop different records on a "re-run" on the same input data, we
might still get different windows (as different records would have been
dropped). But that is exactly why we have a grace-period to begin with
(to avoid it, and to make a re-run deterministic -- I think of a "too
short" grace period as a misconfiguration -- or an informed decision to
sacrifice determinism for something else)...
And as the new window type, by definition, does not need/want a
grace-period, IMHO, the new window type would be "worse" (for the lack
of a better word...); I don't think it's really worse, it's just
inherently non-deterministic, and that's fine.
Guess we are overall on the same page and share a common understanding of
the trade-off. So I think we are good :)
-Matthias
On 1/30/25 9:07 AM, Almog Gavra wrote:
I'm not opposed to "BatchedWindows" - I think I like that the most so far.
I'll let that sit on the discussion thread for a while, and change the KIP
to match if there are no concerns.
> What I don't understand is, why the relationship to
> suppress()/emitStrategy() is relevant? Can you elaborate a little bit?
The existing proposal has no impact on suppression/emitStrategy, but I'm
uncertain it's unrelated if you start to introduce window constraints that
aren't just stream time. Imagine having a batch window with max N records,
and then also specifying a BufferConfig.maxRecords() suppression... what
happens in that case? With the current scope of the KIP, we don't need to
worry about any of that so it's pretty well contained.
> if data is auto-repartitioned before a `BatchWindows` step,
> repartitioning introduces non-deterministic order in the repartition topic
This is a good point, I did not think about it (my original comment was
specifically about wall clock time). I guess, though, the current proposal
that only includes stream time based batches is no worse than existing
windows which also have that potential indeterminism in this scenario
except for that it's more likely since grace period is always 0.
> I don't see any relationship to EOS or ALOS. Can you explain what you
> mean?
Hmm, I'm not totally sure what I was thinking... we can ignore that part.
- Almog
On Tue, Jan 28, 2025 at 3:40 PM Matthias J. Sax <mj...@apache.org> wrote:
Interesting thoughts. So maybe we could go with `BatchWindows` as a
name? Again, only spit-balling...
If we really put "(micro-)batching" in the center of this idea, I think
both count-based and time-based (and time could actually be either
stream-time or wall-clock-time), or any combination of these dimensions
could all make sense.
Having said this: I don't think we need to support all dimensions
initially, but if we add something like `BatchWindows` we can extend the
supported specification incrementally.
What I don't understand is, why the relationship to
suppress()/emitStrategy() is relevant? Can you elaborate a little bit?
> how to make it deterministic with regards to time
I think, in any case, we are leaving determinism-land (at least to some
extent). I guess, as long as the `BatchWindows` are applied directly to
an input topic, we get determinism for both stream-time size, as well as
count-size windows (not for wall-clock-time, of course).
However, if data is auto-repartitioned before a `BatchWindows` step,
repartitioning introduces non-deterministic order in the repartition
topic due to interleaved writes, and thus, also stream-time based
windows would become non-deterministic (as there is no grace-period by
design to "fix" the non-deterministic order, in contrast to full
event-time based windows we support so far).
So I don't think that there is an actual difference between stream-time
or count-based `BatchWindows` with regard to determinism.
> which is important for EOS and even ALOS
I don't see any relationship to EOS or ALOS. Can you explain what you
mean?
-Matthias
On 1/28/25 10:43 AM, Almog Gavra wrote:
Thanks for the feedback Lucas and Bruno!
L0. "Given the motivation section, it sounds we actually want something
that I'd call "batching" rather than "windowing"."
You are right here, and I think ultimately introducing more flexible and
controlled micro-batching will be useful for Kafka Streams, but that
expands the scope a little more than I'd want. We'd have to rethink the
implications of suppression as well as emitting and how to make it
deterministic with regards to time (which is important for EOS and even
ALOS). The proposal here is a halfway point that I think is easy enough to
reason about within the existing semantics of Kafka Streams and gets us 80%
of the benefit. (See the second part below since these questions are
related)
L2/B1. "why are we specifically emitting based on stream-time?" /
"Wouldn't it make more sense to have similarly sized batches?"
There are two motivations for this: (a) most of the use cases for this that
we've encountered are still time sensitive in that they don't want to batch
N records, but rather batch over N seconds (there may be an uneven
distribution of keys so that some keys hit N records very quickly and
others are one and done, i.e. we'll never see that key again). (b) this is
much easier to implement, and is a step forward, given this fits really
well into the existing windowing semantics.
Ultimately, as you suggest, I'd want to be able to control these "batches"
with multiple emit strategies. It would be nice to specify a condition like
"emit after N records, or N elapsed seconds, or N bytes accumulated" - but
again (as mentioned above) modeling this with existing Kafka Streams
semantics/operators is a big stretch and I don't want to expand the scope
of this KIP too much for that.
Let me know if that makes sense! Basically I view this as a good middle
ground that doesn't compromise semantics but probably addresses most real
(or near-real) time streaming use cases.
- Almog
On Tue, Jan 28, 2025 at 7:15 AM Bruno Cadonna <cado...@apache.org> wrote:
Hi Almog,
I had similar thoughts as Lucas. When I read the KIP, I asked myself why
are the windows not specified on number of records instead of time if we
do not care about whether the event time of the records is in the time
range of the window?
In your motivation, you write that users might collect small batches of
records to be passed to a consumer that can handle batched messages more
effectively than individual messages. Wouldn't it make more sense to
have similarly sized batches?
You could also consider doing something like the Kafka producer that has
a batch size and a linger time.
Best,
Bruno
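For readers unfamiliar with the "batch size plus linger time" idea mentioned above, here is a rough plain-Java sketch of it. `SizeOrLingerBatcher` is a hypothetical name; this is neither the Kafka producer's implementation nor anything proposed in the KIP, and the linger check only runs when a record arrives (the caller supplies the current time).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: flush when the batch is full or has lingered long enough. */
final class SizeOrLingerBatcher<T> {
    private final int batchSize;
    private final long lingerMs;
    private final List<T> buffer = new ArrayList<>();
    private long firstRecordTimeMs;

    SizeOrLingerBatcher(int batchSize, long lingerMs) {
        if (batchSize <= 0 || lingerMs < 0) {
            throw new IllegalArgumentException("batchSize must be > 0 and lingerMs >= 0");
        }
        this.batchSize = batchSize;
        this.lingerMs = lingerMs;
    }

    /** Adds a record; returns the flushed batch, or an empty list if nothing is emitted yet. */
    List<T> add(T record, long nowMs) {
        if (buffer.isEmpty()) {
            firstRecordTimeMs = nowMs; // the linger clock starts with the first buffered record
        }
        buffer.add(record);
        boolean full = buffer.size() >= batchSize;
        boolean lingered = nowMs - firstRecordTimeMs >= lingerMs;
        if (full || lingered) {
            List<T> batch = new ArrayList<>(buffer);
            buffer.clear();
            return batch;
        }
        return new ArrayList<>();
    }
}
```

Because the time check is driven by incoming records, a partially filled batch is not flushed while no records arrive; a real implementation would need some timer or punctuation for that.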
On 28.01.25 15:44, Lucas Brutschy wrote:
Hi Almog,
this seems useful to me. I don't see anything wrong with the details
of the proposal.
More generally, I'd like to hear your thoughts on this vs. batching.
Given the motivation section, it sounds we actually want something
that I'd call "batching" rather than "windowing". If you do not really
care about including events that fall into different time windows,
and the order of events does not matter much, why are we specifically
emitting based on stream-time? Would you expect this mechanism to
extend also to emitting batches based on number of records, byte-size
of batch, wall-clock time?
Cheers,
Lucas
On Thu, Jan 23, 2025 at 7:59 PM Matthias J. Sax <mj...@apache.org> wrote:
Thanks, Almog.
Good call out about `TimeWindows` vs `TimeWindow` (yes, I am aware and
was actually re-reading my previous email before sending it a few times
to make sure I use the right one; it's very subtle.)
For `TimeWindows` semantics are certainly well defined, and there is
nothing to be discussed.
For `TimeWindow`, I am not sure as I said, and your interpretation as
"just a container" might be fine, too. I agree with the problem that if
we add something new, we might just leak it again, and thus not gain
much, so the lesser evil might be to just re-use `TimeWindow` as you
propose. I just wanted to point out this question to sanity check, and
collect feedback about it.
-Matthias
On 1/23/25 8:58 AM, Almog Gavra wrote:
Thanks Matthias for the quick and detailed feedback!
> Nit: it seems you are mixing the terms "out-of-order" and "late" and
> using them as synonyms, which we usually do not do.
M1. Ah, in my mind "late arriving" was after the window closed but
potentially before grace (and "out of order" was just anything that is out
of order). Do we have a specific word for "after the window closes but
before grace"? Maybe we should have "fashionably late" and "too late"
(haha, just kidding).
I'll clear up the terminology in the KIP.
> Today the only way to implement this semantic is using a punctuation
> and manually storing events in an aggregation (which is inefficient for
> many reasons and can cause severe bottlenecks)
> Not sure if I follow this argument? I don't see any relationship to
> punctuations. Can you elaborate?
M2. The way we've seen people implement the desired behavior in this KIP is
to use a non-windowed aggregation on a stream, and then use a punctuation
every N seconds to scan the table and emit and delete the records to
simulate a window closing. It's certainly suboptimal, but it gets you very
similar semantics to what I describe in the KIP.
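The workaround described in M2 can be sketched in plain Java as follows. This is a simulation, not Kafka Streams code: `PunctuatedBatchSketch`, `process` and `punctuate` are hypothetical names, the in-memory map stands in for the state store, and the caller is assumed to invoke `punctuate()` every N seconds.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical simulation of "aggregate into a table, then scan, emit and delete". */
final class PunctuatedBatchSketch {
    // Stand-in for the non-windowed aggregation table, keyed by record key.
    private final Map<String, List<String>> table = new LinkedHashMap<>();

    /** Non-windowed aggregation: every record is appended to its key's entry. */
    void process(String key, String value) {
        table.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    /**
     * Assumed to be called every N seconds. Scans the whole table, emits every
     * entry and deletes it, which simulates a window closing.
     */
    Map<String, List<String>> punctuate() {
        Map<String, List<String>> emitted = new LinkedHashMap<>(table);
        table.clear();
        return emitted;
    }
}
```

The full scan on every punctuation is where the inefficiency comes from: the cost grows with the number of keys held in the table rather than with the number of keys that actually need to be emitted.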
M3. Re: extends Windows<TimeWindow>
(note for other readers, I think you get this Matthias) TimeWindow is
distinct from TimeWindows (with an s at the end). TimeWindows (with an s)
implies exactly what you suggest. I do think, however, that TimeWindow
(without an s) is a perfect abstraction to leverage here. That class
doesn't do anything except to indicate the bounds of a window (it has a
start time, an end time and a definition of whether two windows overlap).
The javadoc for TimeWindow (without an s) doesn't even need to change to be
used in the way this KIP suggests.
As suggested, the StreamTimeWindow still uses time as its bounds - it just
doesn't use the event time to determine which time window the event is
placed in.
M4. Re: I just realized that `TimeWindow` is in an `internal` package but
`TimeWindows#windowsFor` does return `Map<Long, TimeWindow>`
Yeah, I was wondering about that as well. I also agree it's a bit
orthogonal to the KIP and I think if we introduce a new "extends Window"
type we'll just leak that type as well, so I don't really see a benefit
from that. If we do decide to tackle the leaky abstraction we might as well
just have TimeWindow to handle :)
M5. Specifying abstract methods
Can do!
M6. Naming
I like "OffsetOrderedWindows", I will wait for some other feedback and if
no one has something better I'll update the KIP to that.
Re: `extends Windows<TimeWindow>` I do think the fact that it defines the
window size with time is important, we're not saying the window closes
after N offsets have passed but rather the stream time has passed N seconds
(or ms or whatever). I agree that it doesn't need to be part of the main
name, however.
On Thu, Jan 23, 2025 at 12:14 AM Matthias J. Sax <mj...@apache.org> wrote:
Interesting KIP. It's a known problem, and the proposed solution makes
sense to me.
Nit: it seems you are mixing the terms "out-of-order" and "late" and
using them as synonyms, which we usually do not do.
"Out-of-order" is the more generic term, while "late" means after the
grace period (hence, late arriving data is still out-of-order, but it's
a subset of out-of-order records... all late records are out-of-order,
but not the other way around). I think the KIP could gain clarity and
avoid potential confusion by not using the two terms as synonyms.
In the end, "late" data cannot be handled at all right now, based on the
definition of "late". "late" data is always dropped.
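The distinction between the two terms can be illustrated with a small plain-Java sketch. The names are hypothetical, and two assumptions are made for illustration: windows are tumbling and aligned to multiples of the window size, and a window counts as closed once stream time reaches its end plus the grace period.

```java
/** Hypothetical sketch of the "out-of-order" vs "late" terminology. */
final class RecordOrderSketch {
    enum Kind { IN_ORDER, OUT_OF_ORDER, LATE }

    /**
     * Classifies a record against the current stream time.
     * LATE records are also out-of-order; the enum only reports the most specific label.
     */
    static Kind classify(long recordTs, long streamTime, long windowSizeMs, long graceMs) {
        if (recordTs >= streamTime) {
            return Kind.IN_ORDER;
        }
        // Assumption: tumbling windows aligned to multiples of the window size.
        long windowStart = recordTs - Math.floorMod(recordTs, windowSizeMs);
        long windowEnd = windowStart + windowSizeMs;
        // Assumption: the window is closed once stream time reaches end + grace.
        return windowEnd + graceMs <= streamTime ? Kind.LATE : Kind.OUT_OF_ORDER;
    }
}
```

For example, with a window size of 10 and a grace period of 5, a record with timestamp 8 is merely out-of-order while stream time is 12, and becomes late (and would be dropped) once stream time reaches 15.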
> Today the only way to implement this semantic is using a punctuation and
> manually storing events in an aggregation (which is inefficient for many
> reasons and can cause severe bottlenecks)
Not sure if I follow this argument? I don't see any relationship to
punctuations. Can you elaborate?
It's also not clear to me, why "storing events in an aggregation" is
problematic, and/or how this new window type would avoid it?
> extends Windows<TimeWindow>
Not sure if this is the right thing to do? I understand your intent,
but to me, `TimeWindow` has certain semantics, implying we only put data
with record-ts between the window bounds into the window. Not sure if my
interpretation is off, and re-using could be ok? It's just something we
should figure out I guess.
To be fair: it's not 100% clearly specified whether these semantics apply
or not. If we believe they don't apply, happy to pivot on this, but we
might want to update the JavaDocs, if we only want to use it as
"container class" (for the lack of better word).
Which actually raises a different issue: I just realized that `TimeWindow`
is in an `internal` package, but `TimeWindows#windowsFor` does return
`Map<Long, TimeWindow>`, thus making it effectively kinda public... It
seems we should do something about this? End users do not really need
to use `TimeWindow` -- it's really internal, and it seems `windowsFor`
is effectively a leaky abstraction...
If we keep `TimeWindow` internal, I think we can easily be more relaxed
with applied semantics. But it's kinda leaking right now, which does not
make me happy... (Of course, I also don't want to derail and hijack this
KIP for some related, but orthogonal issue in the API...)
One other nit I am wondering about is whether it might be good to specify
the abstract methods inherited from `Windows` on the KIP for clarity? Even
if they are implicitly well specified it might be beneficial to spell it
out. After all, `StreamTimeWindow` needs to implement these methods.
And of course
> Note: I really don't like the name I came up with StreamTimeWindows... if
> anyone has a better name in mind please suggest one!
I am not a fan of the name either. But not really sure what a better
name could be. In the end, it's more of an "offset ordered window"
rather than a "time window", because it does not really take the record
timestamps into account to figure out which window a record goes
into, but it really uses the offset order.
I believe that the size of window itself does not matter too much for
naming, so even if the size is defined in time, I don't think that the
term "time" must be part of the window name. (That's also why I am
wondering if we should go with `extends Windows<TimeWindow>` or add
something new, which we hopefully can keep internal...)
Not sure if we ever intend to make `StreamTimeWindows` overlapping?
Given the described use-cases it seems very unlikely? So effectively
it's a "tumbling window". Of course, we should not just overload this
term, but there is no `TumblingWindows` in the API -- only the term
Tumbling Window in the docs to describe the special case of
non-overlapping `TimeWindows`.
So maybe `OffsetTumblingWindows` could work? Or `OffsetOrderedWindows`?
Just spitballing here, to get a discussion going...
-Matthias
On 1/22/25 8:26 PM, Almog Gavra wrote:
Hello!
I'd like to initiate a discussion thread on KIP-1127:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1127+Flexible+Windows+for+Late+Arriving+Data
This KIP aims to make it easier to specify windowing semantics that are
more tolerant of late arriving data, particularly with suppression.
Would appreciate any and all feedback.
Cheers,
Almog