Happy to name it BatchWindows. Will give some people time to chime in and then change the name.
- Almog

On Tue, Feb 4, 2025 at 11:10 PM Sophie Blee-Goldman <sop...@responsive.dev> wrote:
> One minor suggestion: use BatchWindows instead of BatchedWindows. The version without the "ed" matches up with the established naming pattern and grammar used by the other Windows classes, e.g. TimeWindows, SessionWindows, SlidingWindows.
>
> Not a big deal though; I won't retract my +1 on the voting thread if you prefer to keep it as BatchedWindows.
>
> On Tue, Feb 4, 2025 at 10:51 AM Almog Gavra <almog.ga...@gmail.com> wrote:
> > Thanks for the discussion everyone! I've updated the Wiki with the following changes:
> >
> > - Renamed to BatchedWindows
> > - Added a note in rejected alternatives about more general-purpose (micro-)batching functionality, since the scope of that is much wider.
> >
> > Since it looks like we've stabilized the discussion, I'm going to go ahead and open up the vote! Definitely feel free to take another look and leave any additional thoughts.
> >
> > On Thu, Jan 30, 2025 at 12:28 PM Matthias J. Sax <mj...@apache.org> wrote:
> > > > batch window with max N records, and then also specifying a BufferConfig.maxRecords()
> > >
> > > Those are actually two different and independent dimensions. "N records" would be the number of records in the window, but `maxRecords` is the number of unique keys/rows in the buffer before it's flushed.
> > >
> > > > the current proposal that only includes stream time based batches is no worse than existing windows which also have that potential indeterminism in this scenario except that it's more likely since the grace period is always 0
> > >
> > > Guess this becomes a philosophical question.
> > > For TimeWindows, even if grace=0, it's still deterministic which window each record goes into; of course, the "cut-off point" at which we start to drop late records is subject to "noise" due to repartitioning, as stream-time advances "non-deterministically".
> > >
> > > So if we drop different records on a "re-run" over the same input data, we might still get different windows (as different records would have been dropped). But that is exactly why we have a grace period to begin with: to avoid this and make a re-run deterministic. (I think of a "too short" grace period as a misconfiguration, or as an informed decision to sacrifice determinism for something else...)
> > >
> > > And as the new window type, by definition, does not need/want a grace period, IMHO the new window type would be "worse" (for lack of a better word...). I don't think it's really worse; it's just inherently non-deterministic, and that's fine.
> > >
> > > Guess we are overall on the same page and share a common understanding of the trade-off. So I think we are good :)
> > >
> > > -Matthias
> > >
> > > On 1/30/25 9:07 AM, Almog Gavra wrote:
> > > > I'm not opposed to "BatchedWindows"; I think I like that the most so far. I'll let that sit on the discussion thread for a while, and change the KIP to match if there are no concerns.
> > > >
> > > >> What I don't understand is: why is the relationship to suppress()/emitStrategy() relevant? Can you elaborate a little bit?
> > > >
> > > > The existing proposal has no impact on suppression/emitStrategy, but I'm not certain it stays unrelated if you start to introduce window constraints that aren't just stream time. Imagine having a batch window with max N records, and then also specifying a BufferConfig.maxRecords() suppression... what happens in that case?
> > > > With the current scope of the KIP, we don't need to worry about any of that, so it's pretty well contained.
> > > >
> > > >> if data is auto-repartitioned before a `BatchWindows` step, repartitioning introduces non-deterministic order in the repartition topic
> > > >
> > > > This is a good point; I did not think about it (my original comment was specifically about wall-clock time). I guess, though, the current proposal that only includes stream-time based batches is no worse than existing windows, which also have that potential indeterminism in this scenario, except that it's more likely since the grace period is always 0.
> > > >
> > > >> I don't see any relationship to EOS or ALOS. Can you explain what you mean?
> > > >
> > > > Hmm, I'm not totally sure what I was thinking... we can ignore that part.
> > > >
> > > > - Almog
> > > >
> > > > On Tue, Jan 28, 2025 at 3:40 PM Matthias J. Sax <mj...@apache.org> wrote:
> > > >> Interesting thoughts. So maybe we could go with `BatchWindows` as a name? Again, only spitballing...
> > > >>
> > > >> If we really put "(micro-)batching" at the center of this idea, I think count-based and time-based batches (and time could actually be either stream-time or wall-clock time), or any combination of these dimensions, could all make sense.
> > > >>
> > > >> Having said this: I don't think we need to support all dimensions initially, but if we add something like `BatchWindows` we can extend the supported specification incrementally.
> > > >>
> > > >> What I don't understand is: why is the relationship to suppress()/emitStrategy() relevant? Can you elaborate a little bit?
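The determinism trade-off discussed above (grace period of 0, repartitioning that reorders records) can be illustrated with a small, dependency-free Java simulation. This is a sketch under stated assumptions, not the KIP's implementation: `WindowAssignmentSketch` and its methods are hypothetical, and the stream-time variant assumes each record is placed in the aligned window that contains the current stream time (the maximum timestamp seen so far) rather than the window that contains the record's own timestamp.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, dependency-free sketch; none of these names are Kafka Streams APIs.
final class WindowAssignmentSketch {

    /** Start of the aligned tumbling window of the given size that contains t. */
    static long alignedStart(long t, long sizeMs) {
        return t - Math.floorMod(t, sizeMs);
    }

    /** Event-time assignment (as with non-overlapping TimeWindows): each record's
     *  window depends only on its own timestamp, so arrival order does not matter. */
    static List<Long> byEventTime(long[] timestampsInArrivalOrder, long sizeMs) {
        List<Long> starts = new ArrayList<>();
        for (long ts : timestampsInArrivalOrder) {
            starts.add(alignedStart(ts, sizeMs));
        }
        return starts;
    }

    /** Assumed stream-time assignment: each record goes into the aligned window that
     *  contains the current stream time (max timestamp seen so far), so the result
     *  depends on arrival order. */
    static List<Long> byStreamTime(long[] timestampsInArrivalOrder, long sizeMs) {
        List<Long> starts = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        for (long ts : timestampsInArrivalOrder) {
            streamTime = Math.max(streamTime, ts);
            starts.add(alignedStart(streamTime, sizeMs));
        }
        return starts;
    }

    public static void main(String[] args) {
        long[] original = {5, 12, 8};
        long[] reordered = {5, 8, 12}; // e.g. after an interleaved repartition topic
        System.out.println(byEventTime(original, 10) + " " + byEventTime(reordered, 10));
        // [0, 10, 0] [0, 0, 10]
        System.out.println(byStreamTime(original, 10) + " " + byStreamTime(reordered, 10));
        // [0, 10, 10] [0, 0, 10]
    }
}
```

In both arrival orders, event-time assignment puts the record with timestamp 8 into the window starting at 0 (in the first order, with a grace period of 0, that window has already ended by the time the record arrives, so a windowed aggregation would drop it as late). The assumed stream-time assignment puts the same record into the window starting at 10 or at 0 depending on arrival order, which is the kind of order-dependence repartitioning can introduce.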
> > > >>
> > > >>> how to make it deterministic with regards to time
> > > >>
> > > >> I think, in any case, we are leaving determinism-land (at least to some extent). I guess, as long as the `BatchWindows` are applied directly to an input topic, we get determinism for both stream-time size as well as count-size windows (not for wall-clock time, of course).
> > > >>
> > > >> However, if data is auto-repartitioned before a `BatchWindows` step, repartitioning introduces non-deterministic order in the repartition topic due to interleaved writes, and thus stream-time based windows would also become non-deterministic (as there is, by design, no grace period to "fix" the non-deterministic order, in contrast to the full event-time based windows we support so far).
> > > >>
> > > >> So I don't think there is an actual difference between stream-time and count-based `BatchWindows` with regard to determinism.
> > > >>
> > > >>> which is important for EOS and even ALOS
> > > >>
> > > >> I don't see any relationship to EOS or ALOS. Can you explain what you mean?
> > > >>
> > > >> -Matthias
> > > >>
> > > >> On 1/28/25 10:43 AM, Almog Gavra wrote:
> > > >>> Thanks for the feedback Lucas and Bruno!
> > > >>>
> > > >>> L0. "Given the motivation section, it sounds like we actually want something that I'd call "batching" rather than "windowing"."
> > > >>>
> > > >>> You are right here, and I think ultimately introducing more flexible and controlled micro-batching will be useful for Kafka Streams, but that expands the scope a little more than I'd want. We'd have to rethink the implications of suppression as well as emitting, and how to make it deterministic with regards to time (which is important for EOS and even ALOS).
> > > >>> The proposal here is a halfway point that I think is easy enough to reason about within the existing semantics of Kafka Streams and gets us 80% of the benefit. (See the second part below, since these questions are related.)
> > > >>>
> > > >>> L2/B1. "why are we specifically emitting based on stream-time?" / "Wouldn't it make more sense to have similarly sized batches?"
> > > >>>
> > > >>> There are two motivations for this: (a) most of the use cases for this that we've encountered are still time-sensitive, in that they don't want to batch N records, but rather batch over N seconds (there may be an uneven distribution of keys, so that some keys hit N records very quickly and others are one-and-done, and we'll never see that key again). (b) This is much easier to implement, and is a step forward, given that it fits really well into the existing windowing semantics.
> > > >>>
> > > >>> Ultimately, as you suggest, I'd want to be able to control these "batches" with multiple emit strategies. It would be nice to specify a condition like "emit after N records, or N elapsed seconds, or N bytes accumulated", but again (as mentioned above) modeling this with existing Kafka Streams semantics/operators is a big stretch, and I don't want to expand the scope of this KIP too much for that.
> > > >>>
> > > >>> Let me know if that makes sense! Basically I view this as a good middle ground that doesn't compromise semantics but probably addresses most real (or near-real) time streaming use cases.
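The combined condition Almog mentions ("emit after N records, or N elapsed seconds"), and Bruno's comparison with the producer's batch size and linger time further down the thread, can be sketched as a batcher that closes a batch on whichever limit is hit first. This is a hypothetical illustration, not a proposed API: `CountOrStreamTimeBatcher` is an invented name, records are reduced to their timestamps, elapsed time is measured in stream time (the maximum timestamp seen), and the byte-size condition is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not a Kafka Streams API: a batch closes on whichever comes
// first, a record-count limit or an amount of elapsed stream time, similar in
// spirit to the producer's batch-size/linger-time pair. Byte-size limits are omitted.
final class CountOrStreamTimeBatcher {
    private final int maxRecords;
    private final long maxStreamTimeSpanMs;
    private final List<List<Long>> emitted = new ArrayList<>();
    private List<Long> current = new ArrayList<>();
    private long streamTime = Long.MIN_VALUE;
    private long batchOpenedAt; // stream time when the current batch got its first record

    CountOrStreamTimeBatcher(int maxRecords, long maxStreamTimeSpanMs) {
        this.maxRecords = maxRecords;
        this.maxStreamTimeSpanMs = maxStreamTimeSpanMs;
    }

    /** Adds one record (represented by its timestamp) in offset order. */
    void add(long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        // Time condition: stream time moved far enough past the batch's opening point,
        // so the open batch is emitted and this record starts a new one.
        if (!current.isEmpty() && streamTime - batchOpenedAt >= maxStreamTimeSpanMs) {
            flush();
        }
        if (current.isEmpty()) {
            batchOpenedAt = streamTime;
        }
        current.add(timestamp);
        // Count condition: the batch is full.
        if (current.size() >= maxRecords) {
            flush();
        }
    }

    /** Emits the open batch, if any (e.g. on shutdown). */
    void flush() {
        if (!current.isEmpty()) {
            emitted.add(current);
            current = new ArrayList<>();
        }
    }

    List<List<Long>> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        CountOrStreamTimeBatcher batcher = new CountOrStreamTimeBatcher(3, 10);
        for (long ts : new long[] {0, 1, 2, 3, 20, 25}) {
            batcher.add(ts);
        }
        batcher.flush();
        System.out.println(batcher.emitted()); // [[0, 1, 2], [3], [20, 25]]
    }
}
```

Because both limits are evaluated in offset order against stream time, the same input in the same order always yields the same batches; a wall-clock linger, as mentioned in the thread, would not have that property.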
> > > >>>
> > > >>> - Almog
> > > >>>
> > > >>> On Tue, Jan 28, 2025 at 7:15 AM Bruno Cadonna <cado...@apache.org> wrote:
> > > >>>> Hi Almog,
> > > >>>>
> > > >>>> I had similar thoughts to Lucas. When I read the KIP, I asked myself why the windows are not specified by number of records instead of time, if we do not care whether the event time of the records falls in the time range of the window.
> > > >>>>
> > > >>>> In your motivation, you write that users might collect small batches of records to be passed to a consumer that can handle batched messages more effectively than individual messages. Wouldn't it make more sense to have similarly sized batches? You could also consider doing something like the Kafka producer, which has a batch size and a linger time.
> > > >>>>
> > > >>>> Best,
> > > >>>> Bruno
> > > >>>>
> > > >>>> On 28.01.25 15:44, Lucas Brutschy wrote:
> > > >>>>> Hi Almog,
> > > >>>>>
> > > >>>>> this seems useful to me. I don't see anything wrong with the details of the proposal.
> > > >>>>>
> > > >>>>> More generally, I'd like to hear your thoughts on this vs. batching. Given the motivation section, it sounds like we actually want something that I'd call "batching" rather than "windowing". If you do not really care about including events that fall into different time windows, and the order of events does not matter much, why are we specifically emitting based on stream-time? Would you expect this mechanism to also extend to emitting batches based on number of records, byte-size of batch, or wall-clock time?
> > > >>>>>
> > > >>>>> Cheers,
> > > >>>>> Lucas
> > > >>>>>
> > > >>>>> On Thu, Jan 23, 2025 at 7:59 PM Matthias J. Sax <mj...@apache.org> wrote:
> > > >>>>>>
> > > >>>>>> Thanks, Almog.
> > > >>>>>>
> > > >>>>>> Good call-out about `TimeWindows` vs `TimeWindow` (yes, I am aware, and was actually re-reading my previous email a few times before sending it to make sure I used the right one; it's very subtle).
> > > >>>>>>
> > > >>>>>> For `TimeWindows`, the semantics are certainly well defined, and there is nothing to be discussed.
> > > >>>>>>
> > > >>>>>> For `TimeWindow`, I am not sure, as I said, and your interpretation as "just a container" might be fine, too. I agree with the concern that if we add something new, we might just leak it again, and thus not gain much, so the lesser evil might be to just re-use `TimeWindow` as you propose. I just wanted to point out this question to sanity check, and collect feedback about it.
> > > >>>>>>
> > > >>>>>> -Matthias
> > > >>>>>>
> > > >>>>>> On 1/23/25 8:58 AM, Almog Gavra wrote:
> > > >>>>>>> Thanks Matthias for the quick and detailed feedback!
> > > >>>>>>>
> > > >>>>>>>> Nit: it seems you are mixing the terms "out-of-order" and "late" and using them as synonyms, which we usually do not do.
> > > >>>>>>>
> > > >>>>>>> M1. Ah, in my mind "late arriving" was after the window closed but potentially before grace (and "out of order" was just anything that is out of order). Do we have a specific word for "after the window closes but before grace"? Maybe we should have "fashionably late" and "too late" (haha, just kidding).
> > > >>>>>>>
> > > >>>>>>> I'll clear up the terminology in the KIP.
> > > >>>>>>>
> > > >>>>>>>>> Today the only way to implement this semantic is using a punctuation and manually storing events in an aggregation (which is inefficient for many reasons and can cause severe bottlenecks)
> > > >>>>>>>> Not sure if I follow this argument? I don't see any relationship to punctuations. Can you elaborate?
> > > >>>>>>>
> > > >>>>>>> M2. The way we've seen people implement the desired behavior in this KIP is to use a non-windowed aggregation on a stream, and then use a punctuation every N seconds to scan the table and emit and delete the records to simulate a window closing. It's certainly suboptimal, but it gets you very similar semantics to what I describe in the KIP.
> > > >>>>>>>
> > > >>>>>>> M3. Re: extends Windows<TimeWindow>
> > > >>>>>>>
> > > >>>>>>> (Note for other readers; I think you get this, Matthias.) TimeWindow is distinct from TimeWindows (with an s at the end). TimeWindows (with an s) implies exactly what you suggest. I do think, however, that TimeWindow (without an s) is a perfect abstraction to leverage here. That class doesn't do anything except indicate the bounds of a window (it has a start time, an end time, and a definition of whether two windows overlap). The javadoc for TimeWindow (without an s) doesn't even need to change to be used in the way this KIP suggests.
> > > >>>>>>>
> > > >>>>>>> As suggested, the StreamTimeWindow still uses time as its bounds; it just doesn't use the event time to determine which time window the event is placed in.
> > > >>>>>>>
> > > >>>>>>> M4. Re: I just realized that `TimeWindow` is in an `internal` package but `TimeWindows#windowsFor` does return `Map<Long, TimeWindow>`
> > > >>>>>>>
> > > >>>>>>> Yeah, I was wondering about that as well. I also agree it's a bit orthogonal to the KIP, and I think if we introduce a new "extends Window" type we'll just leak that type as well, so I don't really see a benefit from that. If we do decide to tackle the leaky abstraction, we might as well just have TimeWindow to handle :)
> > > >>>>>>>
> > > >>>>>>> M5. Specifying abstract methods
> > > >>>>>>>
> > > >>>>>>> Can do!
> > > >>>>>>>
> > > >>>>>>> M6. Naming
> > > >>>>>>>
> > > >>>>>>> I like "OffsetOrderedWindows"; I will wait for some other feedback, and if no one has something better I'll update the KIP to that.
> > > >>>>>>>
> > > >>>>>>> Re: `extends Windows<TimeWindow>`, I do think the fact that it defines the window size with time is important: we're not saying the window closes after N offsets have passed, but rather after stream time has advanced N seconds (or ms or whatever). I agree that it doesn't need to be part of the main name, however.
> > > >>>>>>>
> > > >>>>>>> On Thu, Jan 23, 2025 at 12:14 AM Matthias J. Sax <mj...@apache.org> wrote:
> > > >>>>>>>> Interesting KIP. It's a known problem, and the proposed solution makes sense to me.
> > > >>>>>>>>
> > > >>>>>>>> Nit: it seems you are mixing the terms "out-of-order" and "late" and using them as synonyms, which we usually do not do.
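The punctuation-based workaround described in M2 (a non-windowed aggregation whose contents are periodically emitted and deleted) can be modeled without any Kafka dependency. In Kafka Streams itself this would typically be a processor with a key-value state store and a callback registered through `ProcessorContext#schedule(Duration, PunctuationType, Punctuator)`; this sketch replaces both with a plain map and an explicit `punctuate()` call, and `PunctuatedCountWorkaround` is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, dependency-free model of the M2 workaround. A real application would
// use a key-value state store and a scheduled punctuation; neither is used here.
final class PunctuatedCountWorkaround {
    // Stands in for the non-windowed aggregation's state: a running count per key.
    private final Map<String, Long> store = new LinkedHashMap<>();

    /** Non-windowed aggregation step: update the running count for the key. */
    void process(String key) {
        store.merge(key, 1L, Long::sum);
    }

    /** Periodic callback: emit a snapshot of every entry, then delete them all,
     *  so the next period starts empty, which simulates a window closing. */
    Map<String, Long> punctuate() {
        Map<String, Long> emitted = new LinkedHashMap<>(store);
        store.clear();
        return emitted;
    }

    public static void main(String[] args) {
        PunctuatedCountWorkaround agg = new PunctuatedCountWorkaround();
        agg.process("a");
        agg.process("b");
        agg.process("a");
        System.out.println(agg.punctuate()); // {a=2, b=1}
        agg.process("b");
        System.out.println(agg.punctuate()); // {b=1}
    }
}
```

Each punctuation touches every key in the store, so its cost grows with the number of distinct keys, which is one plausible reason the thread calls this approach suboptimal.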
> > > >>>>>>>>
> > > >>>>>>>> "Out-of-order" is the more generic term, while "late" means after the grace period (hence, late-arriving data is still out-of-order, but it's a subset of out-of-order records... all late records are out-of-order, but not the other way around). I think the KIP could gain clarity and avoid potential confusion by not using both terms as synonyms.
> > > >>>>>>>>
> > > >>>>>>>> In the end, "late" data cannot be handled at all right now, based on the definition of "late": "late" data is always dropped.
> > > >>>>>>>>
> > > >>>>>>>>> Today the only way to implement this semantic is using a punctuation and manually storing events in an aggregation (which is inefficient for many reasons and can cause severe bottlenecks)
> > > >>>>>>>>
> > > >>>>>>>> Not sure if I follow this argument? I don't see any relationship to punctuations. Can you elaborate?
> > > >>>>>>>>
> > > >>>>>>>> It's also not clear to me why "storing events in an aggregation" is problematic, and/or how this new window type would avoid it.
> > > >>>>>>>>
> > > >>>>>>>>> extends Windows<TimeWindow>
> > > >>>>>>>>
> > > >>>>>>>> Not sure if this is the right thing to do? I understand your intent, but to me, `TimeWindow` has certain semantics, implying we only put data with record-ts between the window bounds into the window. Not sure if my interpretation is off, and re-using it could be ok? It's just something we should figure out, I guess.
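Matthias's distinction between out-of-order and late records can be stated as a small classification function. This is a hypothetical helper (`ArrivalClassifier` is not a Kafka Streams class) that assumes tumbling event-time windows and assumes a window counts as closed once stream time reaches the window end plus the grace period; treat that closing rule as an assumption rather than a quote of the Kafka Streams source.

```java
// Hypothetical helper, not a Kafka Streams API. Assumes non-overlapping (tumbling)
// event-time windows and treats a window as closed once
// stream time >= window end + grace period.
final class ArrivalClassifier {
    enum Arrival { IN_ORDER, OUT_OF_ORDER, LATE }

    /**
     * Classifies a record against the stream time observed before it arrived.
     * Every LATE record is also out-of-order; OUT_OF_ORDER here means
     * "out-of-order but its window is still open (within grace)".
     */
    static Arrival classify(long timestamp, long streamTime, long sizeMs, long graceMs) {
        if (timestamp >= streamTime) {
            return Arrival.IN_ORDER; // does not go backwards in time
        }
        long windowStart = timestamp - Math.floorMod(timestamp, sizeMs);
        long windowEnd = windowStart + sizeMs; // exclusive upper bound
        return streamTime >= windowEnd + graceMs ? Arrival.LATE : Arrival.OUT_OF_ORDER;
    }

    public static void main(String[] args) {
        // 10 ms windows, stream time already at 20
        System.out.println(classify(25, 20, 10, 5)); // IN_ORDER
        System.out.println(classify(12, 20, 10, 5)); // OUT_OF_ORDER: [10, 20) closes at 25
        System.out.println(classify(3, 20, 10, 5));  // LATE: [0, 10) closed at 15
        System.out.println(classify(12, 20, 10, 0)); // LATE: with no grace, [10, 20) closed at 20
    }
}
```

The last case shows that with a grace period of 0, an out-of-order record becomes late as soon as stream time has reached the end of its window.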
> > > >>>>>>>>
> > > >>>>>>>> To be fair: it's not 100% clearly specified whether these semantics apply or not. If we believe they don't apply, happy to pivot on this, but we might want to update the JavaDocs if we only want to use it as a "container class" (for lack of a better word).
> > > >>>>>>>>
> > > >>>>>>>> Which actually raises a different issue: I just realized that `TimeWindow` is in an `internal` package, but `TimeWindows#windowsFor` does return `Map<Long, TimeWindow>`, thus making it effectively kinda public... It seems we should do something about this? End users do not really need to use `TimeWindow` (it's really internal), and it seems `windowsFor` is effectively a leaky abstraction...
> > > >>>>>>>>
> > > >>>>>>>> If we keep `TimeWindow` internal, I think we can easily be more relaxed with the applied semantics. But it's kinda leaking right now, which does not make me happy... (Of course, I also don't want to derail and hijack this KIP for some related, but orthogonal, issue in the API...)
> > > >>>>>>>>
> > > >>>>>>>> One other nit I am wondering about is whether it might be good to specify the abstract methods inherited from `Windows` in the KIP for clarity? Even if they are implicitly well specified, it might be beneficial to spell them out. After all, `StreamTimeWindow` needs to implement these methods.
> > > >>>>>>>>
> > > >>>>>>>> And of course:
> > > >>>>>>>>
> > > >>>>>>>>> Note: I really don't like the name I came up with, StreamTimeWindows... if anyone has a better name in mind please suggest one!
> > > >>>>>>>>
> > > >>>>>>>> I am not a fan of the name either, but I'm not really sure what a better name could be. In the end, it's more of an "offset-ordered window" than a "time window", because it does not really take the record timestamps into account to figure out which window a record goes into; it really uses the offset order.
> > > >>>>>>>>
> > > >>>>>>>> I believe that the size of the window itself does not matter too much for naming, so even if the size is defined in time, I don't think the term "time" must be part of the window name. (That's also why I am wondering if we should go with `extends Windows<TimeWindow>` or add something new, which we hopefully can keep internal...)
> > > >>>>>>>>
> > > >>>>>>>> Not sure if we ever intend to make `StreamTimeWindows` overlapping? Given the described use cases, it seems very unlikely. So effectively it's a "tumbling window". Of course, we should not just overload this term, but there is no `TumblingWindows` in the API, only the term "tumbling window" in the docs to describe the special case of non-overlapping `TimeWindows`.
> > > >>>>>>>>
> > > >>>>>>>> So maybe `OffsetTumblingWindows` could work? Or `OffsetOrderedWindows`? Just spitballing here, to get a discussion going...
> > > >>>>>>>>
> > > >>>>>>>> -Matthias
> > > >>>>>>>>
> > > >>>>>>>> On 1/22/25 8:26 PM, Almog Gavra wrote:
> > > >>>>>>>>> Hello!
> > > >>>>>>>>>
> > > >>>>>>>>> I'd like to initiate a discussion thread on KIP-1127:
> > > >>>>>>>>>
> > > >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1127+Flexible+Windows+for+Late+Arriving+Data
> > > >>>>>>>>>
> > > >>>>>>>>> This KIP aims to make it easier to specify windowing semantics that are more tolerant of late-arriving data, particularly with suppression.
> > > >>>>>>>>>
> > > >>>>>>>>> Would appreciate any and all feedback.
> > > >>>>>>>>>
> > > >>>>>>>>> Cheers,
> > > >>>>>>>>> Almog