One minor suggestion: use BatchWindows instead of BatchedWindows. The
version without the "ed" matches the established naming pattern and
grammar used by the other Windows classes, e.g. TimeWindows, SessionWindows,
SlidingWindows.

Not a big deal though; I won't retract my +1 on the voting thread if you
prefer to keep it as BatchedWindows.
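
For anyone skimming the quoted thread below: as I read it, the proposed
window type places each record into the window that contains the current
stream time (the highest timestamp seen so far, in offset order) instead of
the window that contains the record's own timestamp. Here is a rough,
self-contained Java sketch of that difference. It does not use the Kafka
Streams API; the class and method names (StreamTimeBatchSketch, byEventTime,
byStreamTime) are made up for illustration, and the window-start arithmetic
is my assumption, not taken from the KIP.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch, not Kafka Streams code. Timestamps are assumed to be
// non-negative milliseconds, listed in arrival (offset) order.
final class StreamTimeBatchSketch {

    // Event-time assignment: the record's own timestamp selects the window,
    // so arrival order does not change which window a record lands in.
    static Map<Long, List<Long>> byEventTime(long[] timestamps, long sizeMs) {
        Map<Long, List<Long>> windows = new TreeMap<>();
        for (long ts : timestamps) {
            long start = ts - (ts % sizeMs);
            windows.computeIfAbsent(start, k -> new ArrayList<>()).add(ts);
        }
        return windows;
    }

    // Stream-time assignment (assumed reading of the proposal): the window is
    // selected by the highest timestamp observed so far, so an out-of-order
    // record joins the currently open window and is never dropped.
    static Map<Long, List<Long>> byStreamTime(long[] timestamps, long sizeMs) {
        Map<Long, List<Long>> windows = new TreeMap<>();
        long streamTime = 0L;
        for (long ts : timestamps) {
            streamTime = Math.max(streamTime, ts);
            long start = streamTime - (streamTime % sizeMs);
            windows.computeIfAbsent(start, k -> new ArrayList<>()).add(ts);
        }
        return windows;
    }

    public static void main(String[] args) {
        long[] original = {1, 4, 12, 7, 15};
        // Same records, different interleaving (e.g. after a repartition).
        long[] interleaved = {1, 12, 4, 7, 15};

        System.out.println("event-time,  original:    " + byEventTime(original, 10));
        System.out.println("event-time,  interleaved: " + byEventTime(interleaved, 10));
        System.out.println("stream-time, original:    " + byStreamTime(original, 10));
        System.out.println("stream-time, interleaved: " + byStreamTime(interleaved, 10));
    }
}
```

With a 10 ms window, both arrival orders give the same event-time windows,
while the stream-time windows differ between the two orders. That is the
repartitioning non-determinism Matthias describes below.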

On Tue, Feb 4, 2025 at 10:51 AM Almog Gavra <almog.ga...@gmail.com> wrote:

> Thanks for the discussion everyone! I've updated the Wiki with the
> following changes:
>
> - Renamed to BatchedWindows
> - Added a note in rejected alternatives about more general purpose
> (micro-)batching functionality since the scope of that is much wider.
>
> Since it looks like we've stabilized the discussion I'm going to go ahead
> and open up the vote! Definitely feel free to take another look and leave
> any additional thoughts.
>
>
> On Thu, Jan 30, 2025 at 12:28 PM Matthias J. Sax <mj...@apache.org> wrote:
>
> > > batch window with max N records,
> > >> and then also specifying a BufferConfig.maxRecords()
> >
> > That's actually two different and independent dimensions. "N records"
> > would be the number of records in the window, but `maxRecords` is the
> > number of unique keys/rows in the buffer before it's flushed.
> >
> >
> >
> > > the current proposal
> > > that only includes stream time based batches is no worse than existing
> > > windows which also have that potential indeterminism in this scenario
> > > except for that it's more likely since grace period is always 0
> >
> > Guess this becomes a philosophical question. For TimeWindows, even if
> > grace=0, it's still deterministic into which window each record goes --
> > of course, the "cut off point" when we start to drop late records is
> > subject to "noise" due to repartitioning, as stream-time advances "non
> > deterministically".
> >
> > So if we drop different records on a "re-run" on the same input data, we
> > might still get different windows (as different records would have been
> > dropped). But that is exactly why we have a grace-period to begin with
> > (to avoid it, and to make a re-run deterministic -- I think of a "too
> > short" grace period as a misconfiguration -- or an informed decision to
> > sacrifice determinism for something else)...
> >
> > And as the new window type, by definition, does not need/want a
> > grace-period, IMHO, the new window type would be "worse" (for the lack
> > of a better word...); I don't think it's really worse, it's just inherently
> > non-deterministic, and that's fine.
> >
> > Guess we are overall on the same page and share common understanding of
> > the trade-off. So I think we are good :)
> >
> >
> > -Matthias
> >
> >
> > On 1/30/25 9:07 AM, Almog Gavra wrote:
> > > I'm not opposed to "BatchedWindows" - I think I like that the most so far.
> > > I'll let that sit on the discussion thread for a while, and change the KIP
> > > to match if no concerns.
> > >
> > >> What I don't understand is, why the relationship to
> > > suppress()/emitStrategy() is relevant? Can you elaborate a little bit?
> > >
> > > The existing proposal has no impact on suppression/emitStrategy, but I'm
> > > uncertain it's unrelated if you start to introduce window constraints that
> > > aren't just stream time. Imagine having a batch window with max N records,
> > > and then also specifying a BufferConfig.maxRecords() suppression... what
> > > happens in that case? With the current scope of the KIP, we don't need to
> > > worry about any of that so it's pretty well contained.
> > >
> > >> if data is auto-repartitioned before a `BatchWindows` step,
> > > repartitioning introduces non-deterministic order in the repartition
> > topic
> > >
> > > This is a good point, I did not think about it (my original comment was
> > > specifically about wall clock time). I guess, though, the current proposal
> > > that only includes stream time based batches is no worse than existing
> > > windows which also have that potential indeterminism in this scenario,
> > > except that it's more likely since grace period is always 0.
> > >
> > >> I don't see any relationship to EOS or ALOS. Can you explain what you
> > > mean?
> > >
> > > Hmm, I'm not totally sure what I was thinking... we can ignore that part.
> > >
> > > - Almog
> > >
> > >
> > > On Tue, Jan 28, 2025 at 3:40 PM Matthias J. Sax <mj...@apache.org>
> > wrote:
> > >
> > >> Interesting thoughts. So maybe we could go with `BatchWindows` as a
> > >> name? Again, only spit-balling...
> > >>
> > >> If we really put "(micro-)batching" in the center of this idea, I think
> > >> both count-based and time-based (and time could actually be either
> > >> stream-time or wall-clock-time), or any combination of these dimensions
> > >> could all make sense.
> > >>
> > >> Having said this: I don't think we need to support all dimensions
> > >> initially, but if we add something like `BatchWindows` we can extend the
> > >> supported specification incrementally.
> > >>
> > >>
> > >>
> > >> What I don't understand is, why the relationship to
> > >> suppress()/emitStrategy() is relevant? Can you elaborate a little bit?
> > >>
> > >>
> > >>
> > >>> how to make it deterministic with regards to time
> > >>
> > >> I think, in any case, we are leaving determinism-land (at least to some
> > >> extent). I guess, as long as the `BatchWindows` are applied directly to
> > >> an input topic, we get determinism for both stream-time size, as well as
> > >> count-size windows (not for wall-clock-time, of course).
> > >>
> > >> However, if data is auto-repartitioned before a `BatchWindows` step,
> > >> repartitioning introduces non-deterministic order in the repartition
> > >> topic due to interleaved writes, and thus, also stream-time based
> > >> windows would become non-deterministic (as there is no grace-period by
> > >> design to "fix" the non-deterministic order, in contrast to full
> > >> event-time based windows we support so far).
> > >>
> > >> So I don't think there is an actual difference between stream-time and
> > >> count-based `BatchWindows` with regard to determinism.
> > >>
> > >>
> > >>
> > >>> which is important for EOS and even ALOS
> > >>
> > >> I don't see any relationship to EOS or ALOS. Can you explain what you
> > mean?
> > >>
> > >>
> > >>
> > >> -Matthias
> > >>
> > >>
> > >> On 1/28/25 10:43 AM, Almog Gavra wrote:
> > >>> Thanks for the feedback Lucas and Bruno!
> > >>>
> > >>> L0. "Given the motivation section, it sounds we actually want
> something
> > >>> that I'd call "batching" rather than "windowing"."
> > >>>
> > >>> You are right here, and I think ultimately introducing more flexible
> > and
> > >>> controlled micro-batching will be useful for Kafka Streams, but that
> > >>> expands the scope a little more than I'd want. We'd have to rethink
> the
> > >>> implications of suppression as well as emitting and how to make it
> > >>> deterministic with regards to time (which is important for EOS and
> even
> > >>> ALOS). The proposal here is a halfway point that I think is easy
> enough
> > >> to
> > >>> reason about within the existing semantics of Kafka Streams and gets
> us
> > >> 80%
> > >>> of the benefit. (See the second part below since these questions are
> > >>> related)
> > >>>
> > >>> L2/B1. "why are we specifically emitting based on stream-time?" /
> > >> "Wouldn't
> > >>> it make more sense to have similarly sized batches?"
> > >>>
> > >>> There are two motivations for this: (a) most of the use cases for
> this
> > >> that
> > >>> we've encountered are still time sensitive in that they don't want to
> > >> batch
> > >>> N records, but rather batch over N seconds (there may be an uneven
> > >>> distribution of keys so that some keys hit N records very quickly and
> > >>> others are one and done, we'll never see that key again). (b) this is
> > >> much
> > >>> easier to implement, and is a step forward, given this fits really
> well
> > >>> into the existing windowing semantics.
> > >>>
> > >>> Ultimately, as you suggest, I'd want to be able to control these
> > >> "batches"
> > >>> with multiple emit strategies. It would be nice to specify a
> condition
> > >> like
> > >>> "emit after N records, or N elapsed seconds, or N bytes accumulated"
> -
> > >> but
> > >>> again (as mentioned above) modeling this with existing Kafka Streams
> > >>> semantics/operators is a big stretch and I don't want to expand the
> > scope
> > >>> of this KIP too much for that.
> > >>>
> > >>> Let me know if that makes sense! Basically I view this as a good
> middle
> > >>> ground that doesn't compromise semantics but probably addresses most
> > real
> > >>> (or near-real) time streaming use cases.
> > >>>
> > >>> - Almog
> > >>>
> > >>> On Tue, Jan 28, 2025 at 7:15 AM Bruno Cadonna <cado...@apache.org>
> > >> wrote:
> > >>>
> > >>>> Hi Almog,
> > >>>>
> > >>>> I had similar thoughts as Lucas. When I read the KIP, I asked myself
> > why
> > >>>> are the windows not specified on number of records instead of time
> if
> > we
> > >>>> do not care about whether the event time of the records is in the
> time
> > >>>> range of the window?
> > >>>>
> > >>>> In your motivation, you write that users might collect small batches
> > of
> > >>>> records to be passed to a consumer that can handle batched messages
> > more
> > >>>> effectively than individual messages. Wouldn't it make more sense to
> > >>>> have similarly sized batches?
> > >>>> You could also consider to do something like the Kafka producer that
> > has
> > >>>> a batch size and a linger time.
> > >>>>
> > >>>> Best,
> > >>>> Bruno
> > >>>>
> > >>>> On 28.01.25 15:44, Lucas Brutschy wrote:
> > >>>>> Hi Almog,
> > >>>>>
> > >>>>> this seems useful to me. I don't see anything wrong with the
> details
> > >>>>> of the proposal.
> > >>>>>
> > >>>>> More generally, I'd like to hear your thoughts on this vs.
> batching.
> > >>>>> Given the motivation section, it sounds we actually want something
> > >>>>> that I'd call "batching" rather than "windowing". If you do not
> > really
> > >>>>> care about the including events that fall into different time
> > windows,
> > >>>>> and the order of events does not matter much, why are we
> specifically
> > >>>>> emitting based on stream-time? Would you expect this mechanism to
> > >>>>> extend also to emitting batches based on number of records,
> byte-size
> > >>>>> of batch, wall-clock time?
> > >>>>>
> > >>>>> Cheers,
> > >>>>> Lucas
> > >>>>>
> > >>>>> On Thu, Jan 23, 2025 at 7:59 PM Matthias J. Sax <mj...@apache.org>
> > >>>> wrote:
> > >>>>>>
> > >>>>>> Thanks, Almog.
> > >>>>>>
> > >>>>>> Good call out about `TimeWindows` vs `TimeWindow` (yes, I am aware
> > and
> > >>>>>> was actually re-reading my previous email before sending it a few
> > >> times
> > >>>>>> to make sure I use the right one; it's very subtle.)
> > >>>>>>
> > >>>>>> For `TimeWindows` semantics are certainly well defined, and there
> is
> > >>>>>> nothing to be discussed.
> > >>>>>>
> > >>>>>> For `TimeWindow`, I am not sure as I said, and your interpretation
> > as
> > >>>>>> "just a container" might be fine, too. I agree to the problem,
> that
> > if
> > >>>>>> we add something new, we might just leak it again, and thus not
> gain
> > >>>>>> much, so the lesser evil might be to just re-use `TimeWindow` as
> you
> > >>>>>> propose. I just wanted to point out this question to sanity check,
> > and
> > >>>>>> collect feedback about it.
> > >>>>>>
> > >>>>>>
> > >>>>>> -Matthias
> > >>>>>>
> > >>>>>> On 1/23/25 8:58 AM, Almog Gavra wrote:
> > >>>>>>> Thanks Matthias for the quick and detailed feedback!
> > >>>>>>>
> > >>>>>>>> Nit: it seems you are mixing the terms "out-of-order" and "late"
> > and
> > >>>>>>> using them as synonymous, what we usually not do.
> > >>>>>>>
> > >>>>>>> M1. Ah, in my mind "late arriving" was after the window closed
> but
> > >>>>>>> potentially before grace (and "out of order" was just anything
> that
> > >> is
> > >>>> out
> > >>>>>>> of order). Do we have a specific word for "after the window
> closes
> > >> but
> > >>>>>>> before grace"? Maybe we should have "fashionably late" and "too
> > late"
> > >>>>>>> (haha, just kidding).
> > >>>>>>>
> > >>>>>>> I'll clear up the terminology in the KIP.
> > >>>>>>>
> > >>>>>>>>> Today the only way to implement this semantic is using a
> > >> punctuation
> > >>>>>>> and manually storing events in an aggregation (which is
> inefficient
> > >> for
> > >>>>>>> many reasons and can cause severe bottlenecks)
> > >>>>>>>> Not sure if I follow this argument? I don't see any relationship
> > to
> > >>>>>>> punctuations. Can you elaborate?
> > >>>>>>>
> > >>>>>>> M2. The way we've seen people implement the desired behavior in
> > this
> > >>>> KIP is
> > >>>>>>> to use a non-windowed aggregation on a stream, and then use a
> > >>>> punctuation
> > >>>>>>> every N seconds to scan the table and emit and delete the records
> > to
> > >>>>>>> simulate a window closing. It's certainly suboptimal, but it gets
> > you
> > >>>> very
> > >>>>>>> similar semantics to what I describe in the KIP.
> > >>>>>>>
> > >>>>>>> M3. Re: extends Windows<TimeWindow>
> > >>>>>>>
> > >>>>>>> (note for other readers, I think you get this Matthias)
> TimeWindow
> > is
> > >>>>>>> distinct from TimeWindows (with an s at the end). TimeWindows
> (with
> > >> an
> > >>>> s)
> > >>>>>>> implies exactly what you suggest. I do think, however, that
> > >> TimeWindow
> > >>>>>>> (without an s) is a perfect abstraction to leverage here. That
> > class
> > >>>>>>> doesn't do anything except to indicate the bounds of a window (it
> > >> has a
> > >>>>>>> start time, an end time and a definition of whether two windows
> > >>>> overlap).
> > >>>>>>> The javadoc for TimeWindow (without an s) doesn't even need to
> > change
> > >>>> to be
> > >>>>>>> used in the way this KIP suggests.
> > >>>>>>>
> > >>>>>>> As suggested, the StreamTimeWindow still uses time as its bounds
> -
> > it
> > >>>> just
> > >>>>>>> doesn't use the event time to determine which time window the
> event
> > >> is
> > >>>>>>> placed in.
> > >>>>>>>
> > >>>>>>> M4. Re: I just realized that `Window` is in an `internal` package
> > but
> > >>>>>>> `TimeWindows#windowsFor` does return `Map<Long, TimeWindow>`
> > >>>>>>>
> > >>>>>>> Yeah, I was wondering about that as well. I also agree its a bit
> > >>>> orthogonal
> > >>>>>>> to the KIP and I think if we introduce a new "extends Window"
> type
> > >>>> we'll
> > >>>>>>> just leak that type as well, so I don't really see a benefit from
> > >>>> that. If
> > >>>>>>> we do decide to tackle the leaky abstraction we might as well
> just
> > >> have
> > >>>>>>> TimeWindow to handle :)
> > >>>>>>>
> > >>>>>>> M5. Specifying abstract methods
> > >>>>>>>
> > >>>>>>> Can do!
> > >>>>>>>
> > >>>>>>> M6. Naming
> > >>>>>>>
> > >>>>>>> I like "OffsetOrderedWindows", I will wait for some other
> feedback
> > >> and
> > >>>> if
> > >>>>>>> no one has something better I'll update the KIP to that.
> > >>>>>>>
> > >>>>>>> Re: `extends Window<TimeWindow>` I do think the fact that it
> > defines
> > >>>> the
> > >>>>>>> window size with time is important, we're not saying the window
> > >> closes
> > >>>>>>> after N offsets have passed but rather the stream time has
> passed N
> > >>>> seconds
> > >>>>>>> (or ms or whatever). I agree that it doesn't need to be part of
> the
> > >>>> main
> > >>>>>>> name, however.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Thu, Jan 23, 2025 at 12:14 AM Matthias J. Sax <
> mj...@apache.org
> > >
> > >>>> wrote:
> > >>>>>>>
> > >>>>>>>> Interesting KIP. It's a known problem, and the proposed solution makes
> > >>>>>>>> sense to me.
> > >>>>>>>>
> > >>>>>>>> Nit: it seems you are mixing the terms "out-of-order" and "late"
> > and
> > >>>>>>>> using them as synonymous, what we usually not do.
> > >>>>>>>>
> > >>>>>>>> "Out-of-order" is the more generic term, while "late" means
> after
> > >> the
> > >>>>>>>> grace period (hence, late arriving data is still out-of-order,
> but
> > >>>> it's
> > >>>>>>>> a subset of out-of-order records... all late records are
> > >> out-of-order,
> > >>>>>>>> but not the other way around). I think the KIP could gain clarity and
> > >>>>>>>> avoid potential confusion by not using the two terms as synonyms.
> > >>>>>>>>
> > >>>>>>>> In the end, "late" data cannot be handled at all right now,
> based
> > on
> > >>>> the
> > >>>>>>>> definition of "late". "late" data is always dropped.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>> Today the only way to implement this semantic is using a
> > >> punctuation
> > >>>> and
> > >>>>>>>> manually storing events in an aggregation (which is inefficient
> > for
> > >>>> many
> > >>>>>>>> reasons and can cause severe bottlenecks)
> > >>>>>>>>
> > >>>>>>>> Not sure if I follow this argument? I don't see any relationship
> > to
> > >>>>>>>> punctuations. Can you elaborate?
> > >>>>>>>>
> > >>>>>>>> It's also not clear to me, why "storing events in an
> aggregation"
> > is
> > >>>>>>>> problematic, and/or how this new window type would avoid it?
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>> extends Windows<TimeWindow>
> > >>>>>>>>
> > >>>>>>>> Not sure if this is the right thing to do it? I understand your
> > >>>> intent,
> > >>>>>>>> but to me, `TimeWindow` has certain semantics, implying we only
> > put
> > >>>> data
> > >>>>>>>> with record-ts between the window bounds into the window. Not
> sure
> > >> if
> > >>>> my
> > >>>>>>>> interpretation is off, and re-using could be ok? It's just
> > something
> > >>>> we
> > >>>>>>>> should figure out I guess.
> > >>>>>>>>
> > >>>>>>>> To be fair: it's not 100% clearly specified if these semantics apply or
> > >>>>>>>> not. If we believe they don't apply, happy to pivot on this, but we
> > >>>>>>>> might want to update the JavaDocs, if we only want to use it as
> > >>>>>>>> "container class" (for the lack of better word).
> > >>>>>>>>
> > >>>>>>>> What actually raises a different issue: I just realized that `Window` is
> > >>>>>>>> in an `internal` package, but `TimeWindows#windowsFor` does return
> > >>>>>>>> `Map<Long, TimeWindow>`, thus making it effectively kinda public... It
> > >>>>>>>> seems we should do something about this? End users do not really need
> > >>>>>>>> to use `TimeWindow` -- it's really internal, and it seems `windowsFor`
> > >>>>>>>> is effectively a leaky abstraction...
> > >>>>>>>>
> > >>>>>>>> If we keep `TimeWindow` internal, I think we can easily be more
> > >>>> relaxed
> > >>>>>>>> with applied semantics. But it's kinda leaking right now, what
> > does
> > >>>> not
> > >>>>>>>> make me happy... (Of course, I also don't want to derail and
> > hijack
> > >>>> this
> > >>>>>>>> KIP for some related, but orthogonal issue in the API...)
> > >>>>>>>>
> > >>>>>>>> One other nit I am wondering about is whether it might be good to specify
> > >>>>>>>> the abstract methods inherited from `Windows` on the KIP for clarity? Even
> > >>>>>>>> if they are implicitly well specified it might be beneficial to spell it
> > >>>>>>>> out. After all, `StreamTimeWindow` needs to implement these methods.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> And of course
> > >>>>>>>>
> > >>>>>>>>> Note: I really don't like the name I cam up with
> > >>>> StreamTimeWindows... if
> > >>>>>>>> anyone has a better name in mind please suggest one!
> > >>>>>>>>
> > >>>>>>>> I am not a fan of the name either. But not really sure what a
> > better
> > >>>>>>>> name could be. In the end, it's more of an "offset ordered
> window"
> > >>>>>>>> rather than a "time window", because it does not really take the
> > >>>> record
> > >>>>>>>> timestamps into account to figure out into what window a record
> > goes
> > >>>>>>>> into, but it really uses the offset order.
> > >>>>>>>>
> > >>>>>>>> I believe that the size of window itself does not matter too
> much
> > >> for
> > >>>>>>>> naming, so even if the size is defined in time, I don't think
> that
> > >> the
> > >>>>>>>> term "time" must be part of the window name. (That's also why I
> am
> > >>>>>>>> wondering if we should go with `extends<TimeWindow>` or add
> > >> something
> > >>>>>>>> new, which we hopefully can keep internal...)
> > >>>>>>>>
> > >>>>>>>> Not sure if we ever intend to make `StreamTimeWindows` overlapping?
> > >>>>>>>> Given the described use-cases it seems very unlikely? So effectively
> > >>>>>>>> it's a "tumbling window". Of course, we should not just overload this
> > >>>>>>>> term, but there is no `TumblingWindows` in the API -- only the term
> > >>>>>>>> Tumbling Window in the docs to describe the special case of
> > >>>>>>>> non-overlapping `TimeWindows`.
> > >>>>>>>>
> > >>>>>>>> So maybe `OffsetTumblingWindows` could work? Or
> > >>>> `OffsetOrderedWindows`?
> > >>>>>>>> Just spitballing here, to get a discussion going...
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> -Matthias
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> On 1/22/25 8:26 PM, Almog Gavra wrote:
> > >>>>>>>>> Hello!
> > >>>>>>>>>
> > >>>>>>>>> I'd like to initiate a discussion thread on KIP-1127:
> > >>>>>>>>>
> > >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1127+Flexible+Windows+for+Late+Arriving+Data
> > >>>>>>>>>
> > >>>>>>>>> This KIP aims to make it easier to specify windowing semantics that are
> > >>>>>>>>> more tolerant of late arriving data, particularly with suppression.
> > >>>>>>>>>
> > >>>>>>>>> Would appreciate any and all feedback.
> > >>>>>>>>>
> > >>>>>>>>> Cheers,
> > >>>>>>>>> Almog
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>
> > >>>>
> > >>>
> > >>
> > >>
> > >
> >
> >
>
