One minor suggestion: use BatchWindows instead of BatchedWindows. The version without the "ed" matches the established naming pattern and grammar used by the other Windows classes, e.g. TimeWindows, SessionWindows, SlidingWindows.

Not a big deal though, I won't retract my +1 on the voting thread if you prefer to keep it as BatchedWindows.

On Tue, Feb 4, 2025 at 10:51 AM Almog Gavra <almog.ga...@gmail.com> wrote:

> Thanks for the discussion everyone! I've updated the Wiki with the
> following changes:
>
> - Renamed to BatchedWindows
> - Add a note in rejected alternatives about more general purpose
>   (micro-)batching functionality since the scope of that is much wider.
>
> Since it looks like we've stabilized the discussion I'm going to go ahead
> and open up the vote! Definitely feel free to take another look and leave
> any additional thoughts.
>
>
> On Thu, Jan 30, 2025 at 12:28 PM Matthias J. Sax <mj...@apache.org> wrote:
>
> > > batch window with max N records,
> > >> and then also specifying a BufferConfig.maxRecords()
> >
> > That's actually two different and independent dimensions. "N records"
> > would be the number of records in the window, but `maxRecords` is the
> > number of unique keys/row in the buffer before it's flushed.
> >
> >
> > > the current proposal
> > > that only includes stream time based batches is no worse than existing
> > > windows which also have that potential indeterminism in this scenario
> > > except for that it's more likely since grace period is always 0
> >
> > Guess this becomes a philosophical question. For TimeWindows, even if
> > grace=0, it's still deterministic into which window each record goes --
> > of course, the "cut off point" when we start to drop late records is
> > subject to "noise" due to repartitioning, as stream-time advances "non
> > deterministically".
> >
> > So if we drop different records on a "re-run" on the same input data, we
> > might still get different windows (as different records would have been
> > dropped).
> > But that is exactly why we have a grace-period to begin with
> > (to avoid it, and to make a re-run deterministic -- I think of a "too
> > short" grace period as a misconfiguration -- or an informed decision to
> > sacrifice determinism for something else)...
> >
> > And as the new window type, by definition, does not need/want a
> > grace-period, IMHO, the new window type would be "worse" (for the lack
> > of a better word...); I don't think it's really worse, it's just inherently
> > non-deterministic, and that's fine.
> >
> > Guess we are overall on the same page and share common understanding of
> > the trade-off. So I think we are good :)
> >
> >
> > -Matthias
> >
> >
> > On 1/30/25 9:07 AM, Almog Gavra wrote:
> > > I'm not opposed to "BatchedWindows" - I think I like that the most so far.
> > > I'll let that sit on the discussion thread for a while, and change the KIP
> > > to match if no concerns.
> > >
> > >> What I don't understand is, why the relationship to
> > >> suppress()/emitStrategy() is relevant? Can you elaborate a little bit?
> > >
> > > The existing proposal has no impact on suppression/emitStrategy, but I'm
> > > uncertain it's unrelated if you start to introduce window constraints that
> > > aren't just stream time. Imagine having a batch window with max N records,
> > > and then also specifying a BufferConfig.maxRecords() suppression... what
> > > happens in that case? With the current scope of the KIP, we don't need to
> > > worry about any of that so it's pretty well contained.
> > >
> > >> if data is auto-repartitioned before a `BatchWindows` step,
> > >> repartitioning introduces non-deterministic order in the repartition topic
> > >
> > > This is a good point, I did not think about it (my original comment was
> > > specifically about wall clock time).
> > > I guess, though, the current proposal
> > > that only includes stream time based batches is no worse than existing
> > > windows which also have that potential indeterminism in this scenario
> > > except for that it's more likely since grace period is always 0.
> > >
> > >> I don't see any relationship to EOS or ALOS. Can you explain what you mean?
> > >
> > > Hmm, I'm not totally sure what I was thinking... we can ignore that part.
> > >
> > > - Almog
> > >
> > >
> > > On Tue, Jan 28, 2025 at 3:40 PM Matthias J. Sax <mj...@apache.org> wrote:
> > >
> > >> Interesting thoughts. So maybe we could go with `BatchWindows` as a
> > >> name? Again, only spit-balling...
> > >>
> > >> If we really put "(micro-)batching" in the center of this idea, I think
> > >> both count-based and time-based (and time could actually be either
> > >> stream-time or wall-clock-time), or any combination of these dimensions
> > >> could all make sense.
> > >>
> > >> Having said this: I don't think we need to support all dimensions
> > >> initially, but if we add something like `BatchWindows` we can extend the
> > >> supported specification incrementally.
> > >>
> > >>
> > >>
> > >> What I don't understand is, why the relationship to
> > >> suppress()/emitStrategy() is relevant? Can you elaborate a little bit?
> > >>
> > >>
> > >>
> > >>> how to make it deterministic with regards to time
> > >>
> > >> I think, in any case, we are leaving determinism-land (at least to some
> > >> extent). I guess, as long as the `BatchWindows` are applied directly to
> > >> an input topic, we get determinism for both stream-time size, as well as
> > >> count-size windows (not for wall-clock-time, of course).
> > >>
> > >> However, if data is auto-repartitioned before a `BatchWindows` step,
> > >> repartitioning introduces non-deterministic order in the repartition
> > >> topic due to interleaved writes, and thus, also stream-time based
> > >> windows would become non-deterministic (as there is no grace-period by
> > >> design to "fix" the non-deterministic order, in contrast to full
> > >> event-time based windows we support so far).
> > >>
> > >> So I don't think that is an actual difference between stream-time or
> > >> count-based `BatchWindows` with regard to determinism.
> > >>
> > >>
> > >>
> > >>> which is important for EOS and even ALOS
> > >>
> > >> I don't see any relationship to EOS or ALOS. Can you explain what you mean?
> > >>
> > >>
> > >>
> > >> -Matthias
> > >>
> > >>
> > >> On 1/28/25 10:43 AM, Almog Gavra wrote:
> > >>> Thanks for the feedback Lucas and Bruno!
> > >>>
> > >>> L0. "Given the motivation section, it sounds we actually want something
> > >>> that I'd call "batching" rather than "windowing"."
> > >>>
> > >>> You are right here, and I think ultimately introducing more flexible and
> > >>> controlled micro-batching will be useful for Kafka Streams, but that
> > >>> expands the scope a little more than I'd want. We'd have to rethink the
> > >>> implications of suppression as well as emitting and how to make it
> > >>> deterministic with regards to time (which is important for EOS and even
> > >>> ALOS). The proposal here is a halfway point that I think is easy enough to
> > >>> reason about within the existing semantics of Kafka Streams and gets us 80%
> > >>> of the benefit. (See the second part below since these questions are
> > >>> related)
> > >>>
> > >>> L2/B1. "why are we specifically emitting based on stream-time?" / "Wouldn't
> > >>> it make more sense to have similarly sized batches?"
> > >>>
> > >>> There are two motivations for this: (a) most of the use cases for this that
> > >>> we've encountered are still time sensitive in that they don't want to batch
> > >>> N records, but rather batch over N seconds (there may be an uneven
> > >>> distribution of keys so that some keys hit N records very quickly and
> > >>> others are one and done, we'll never see that key again). (b) this is much
> > >>> easier to implement, and is a step forward, given this fits really well
> > >>> into the existing windowing semantics.
> > >>>
> > >>> Ultimately, as you suggest, I'd want to be able to control these "batches"
> > >>> with multiple emit strategies. It would be nice to specify a condition like
> > >>> "emit after N records, or N elapsed seconds, or N bytes accumulated" - but
> > >>> again (as mentioned above) modeling this with existing Kafka Streams
> > >>> semantics/operators is a big stretch and I don't want to expand the scope
> > >>> of this KIP too much for that.
> > >>>
> > >>> Let me know if that makes sense! Basically I view this as a good middle
> > >>> ground that doesn't compromise semantics but probably addresses most real
> > >>> (or near-real) time streaming use cases.
> > >>>
> > >>> - Almog
> > >>>
> > >>> On Tue, Jan 28, 2025 at 7:15 AM Bruno Cadonna <cado...@apache.org> wrote:
> > >>>
> > >>>> Hi Almog,
> > >>>>
> > >>>> I had similar thoughts as Lucas. When I read the KIP, I asked myself why
> > >>>> are the windows not specified on number of records instead of time if we
> > >>>> do not care about whether the event time of the records is in the time
> > >>>> range of the window?
> > >>>>
> > >>>> In your motivation, you write that users might collect small batches of
> > >>>> records to be passed to a consumer that can handle batched messages more
> > >>>> effectively than individual messages.
> > >>>> Wouldn't it make more sense to
> > >>>> have similarly sized batches?
> > >>>> You could also consider to do something like the Kafka producer that has
> > >>>> a batch size and a linger time.
> > >>>>
> > >>>> Best,
> > >>>> Bruno
> > >>>>
> > >>>> On 28.01.25 15:44, Lucas Brutschy wrote:
> > >>>>> Hi Almog,
> > >>>>>
> > >>>>> this seems useful to me. I don't see anything wrong with the details
> > >>>>> of the proposal.
> > >>>>>
> > >>>>> More generally, I'd like to hear your thoughts on this vs. batching.
> > >>>>> Given the motivation section, it sounds we actually want something
> > >>>>> that I'd call "batching" rather than "windowing". If you do not really
> > >>>>> care about the including events that fall into different time windows,
> > >>>>> and the order of events does not matter much, why are we specifically
> > >>>>> emitting based on stream-time? Would you expect this mechanism to
> > >>>>> extend also to emitting batches based on number of records, byte-size
> > >>>>> of batch, wall-clock time?
> > >>>>>
> > >>>>> Cheers,
> > >>>>> Lucas
> > >>>>>
> > >>>>> On Thu, Jan 23, 2025 at 7:59 PM Matthias J. Sax <mj...@apache.org> wrote:
> > >>>>>>
> > >>>>>> Thanks, Almog.
> > >>>>>>
> > >>>>>> Good call out about `TimeWindows` vs `TimeWindow` (yes, I am aware and
> > >>>>>> was actually re-reading my previous email before sending it a few times
> > >>>>>> to make sure I use the right one; it's very subtle.)
> > >>>>>>
> > >>>>>> For `TimeWindows` semantics are certainly well defined, and there is
> > >>>>>> nothing to be discussed.
> > >>>>>>
> > >>>>>> For `TimeWindow`, I am not sure as I said, and your interpretation as
> > >>>>>> "just a container" might be fine, too. I agree to the problem, that if
> > >>>>>> we add something new, we might just leak it again, and thus not gain
> > >>>>>> much, so the lesser evil might be to just re-use `TimeWindow` as you
> > >>>>>> propose.
> > >>>>>> I just wanted to point out this question to sanity check, and
> > >>>>>> collect feedback about it.
> > >>>>>>
> > >>>>>>
> > >>>>>> -Matthias
> > >>>>>>
> > >>>>>> On 1/23/25 8:58 AM, Almog Gavra wrote:
> > >>>>>>> Thanks Matthias for the quick and detailed feedback!
> > >>>>>>>
> > >>>>>>>> Nit: it seems you are mixing the terms "out-of-order" and "late" and
> > >>>>>>>> using them as synonymous, what we usually not do.
> > >>>>>>>
> > >>>>>>> M1. Ah, in my mind "late arriving" was after the window closed but
> > >>>>>>> potentially before grace (and "out of order" was just anything that is out
> > >>>>>>> of order). Do we have a specific word for "after the window closes but
> > >>>>>>> before grace"? Maybe we should have "fashionably late" and "too late"
> > >>>>>>> (haha, just kidding).
> > >>>>>>>
> > >>>>>>> I'll clear up the terminology in the KIP.
> > >>>>>>>
> > >>>>>>>>> Today the only way to implement this semantic is using a punctuation
> > >>>>>>>>> and manually storing events in an aggregation (which is inefficient for
> > >>>>>>>>> many reasons and can cause severe bottlenecks)
> > >>>>>>>> Not sure if I follow this argument? I don't see any relationship to
> > >>>>>>>> punctuations. Can you elaborate?
> > >>>>>>>
> > >>>>>>> M2. The way we've seen people implement the desired behavior in this KIP is
> > >>>>>>> to use a non-windowed aggregation on a stream, and then use a punctuation
> > >>>>>>> every N seconds to scan the table and emit and delete the records to
> > >>>>>>> simulate a window closing. It's certainly suboptimal, but it gets you very
> > >>>>>>> similar semantics to what I describe in the KIP.
> > >>>>>>>
> > >>>>>>> M3. Re: extends Windows<TimeWindow>
> > >>>>>>>
> > >>>>>>> (note for other readers, I think you get this Matthias) TimeWindow is
> > >>>>>>> distinct from TimeWindows (with an s at the end).
> > >>>>>>> TimeWindows (with an s)
> > >>>>>>> implies exactly what you suggest. I do think, however, that TimeWindow
> > >>>>>>> (without an s) is a perfect abstraction to leverage here. That class
> > >>>>>>> doesn't do anything except to indicate the bounds of a window (it has a
> > >>>>>>> start time, an end time and a definition of whether two windows overlap).
> > >>>>>>> The javadoc for TimeWindow (without an s) doesn't even need to change to be
> > >>>>>>> used in the way this KIP suggests.
> > >>>>>>>
> > >>>>>>> As suggested, the StreamTimeWindow still uses time as its bounds - it just
> > >>>>>>> doesn't use the event time to determine which time window the event is
> > >>>>>>> placed in.
> > >>>>>>>
> > >>>>>>> M4. Re: I just realized that `Window` is in an `internal` package but
> > >>>>>>> `TimeWindows#windowsFor` does return `Map<Long, TimeWindow>`
> > >>>>>>>
> > >>>>>>> Yeah, I was wondering about that as well. I also agree it's a bit orthogonal
> > >>>>>>> to the KIP and I think if we introduce a new "extends Window" type we'll
> > >>>>>>> just leak that type as well, so I don't really see a benefit from that. If
> > >>>>>>> we do decide to tackle the leaky abstraction we might as well just have
> > >>>>>>> TimeWindow to handle :)
> > >>>>>>>
> > >>>>>>> M5. Specifying abstract methods
> > >>>>>>>
> > >>>>>>> Can do!
> > >>>>>>>
> > >>>>>>> M6. Naming
> > >>>>>>>
> > >>>>>>> I like "OffsetOrderedWindows", I will wait for some other feedback and if
> > >>>>>>> no one has something better I'll update the KIP to that.
> > >>>>>>>
> > >>>>>>> Re: `extends Window<TimeWindow>` I do think the fact that it defines the
> > >>>>>>> window size with time is important, we're not saying the window closes
> > >>>>>>> after N offsets have passed but rather the stream time has passed N seconds
> > >>>>>>> (or ms or whatever). I agree that it doesn't need to be part of the main
> > >>>>>>> name, however.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Thu, Jan 23, 2025 at 12:14 AM Matthias J. Sax <mj...@apache.org> wrote:
> > >>>>>>>
> > >>>>>>>> Interesting KIP. It's a known problem, and the proposed solution makes
> > >>>>>>>> sense to me.
> > >>>>>>>>
> > >>>>>>>> Nit: it seems you are mixing the terms "out-of-order" and "late" and
> > >>>>>>>> using them as synonymous, what we usually not do.
> > >>>>>>>>
> > >>>>>>>> "Out-of-order" is the more generic term, while "late" means after the
> > >>>>>>>> grace period (hence, late arriving data is still out-of-order, but it's
> > >>>>>>>> a subset of out-of-order records... all late records are out-of-order,
> > >>>>>>>> but not the other way around). I think the KIP could gain clarity and
> > >>>>>>>> avoid potential confusion to use both terms not as synonymous.
> > >>>>>>>>
> > >>>>>>>> In the end, "late" data cannot be handled at all right now, based on the
> > >>>>>>>> definition of "late". "late" data is always dropped.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>> Today the only way to implement this semantic is using a punctuation and
> > >>>>>>>>> manually storing events in an aggregation (which is inefficient for many
> > >>>>>>>>> reasons and can cause severe bottlenecks)
> > >>>>>>>>
> > >>>>>>>> Not sure if I follow this argument? I don't see any relationship to
> > >>>>>>>> punctuations. Can you elaborate?
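
The out-of-order vs. late distinction drawn above could be sketched roughly as follows. This is only an illustration under stated assumptions, not Kafka Streams code: `LatenessSketch`, `Arrival` and `classify` are hypothetical names, windows are assumed to be tumbling with non-negative timestamps, and the exact boundary at which a window counts as closed (`windowEnd + grace <= streamTime`) is an assumption.

```java
public class LatenessSketch {

    /** LATE is a special case of out-of-order: the record's window has already closed. */
    public enum Arrival { IN_ORDER, OUT_OF_ORDER, LATE }

    /**
     * Classifies a record against the stream time observed before it arrived.
     * Assumes tumbling windows of windowSizeMs and non-negative timestamps.
     */
    public static Arrival classify(long recordTs, long streamTime, long windowSizeMs, long graceMs) {
        if (recordTs >= streamTime) {
            return Arrival.IN_ORDER;
        }
        // End (exclusive) of the tumbling window the record timestamp falls into.
        long windowEnd = recordTs - (recordTs % windowSizeMs) + windowSizeMs;
        // Assumed close condition: stream time has reached window end plus grace.
        boolean windowClosed = windowEnd + graceMs <= streamTime;
        return windowClosed ? Arrival.LATE : Arrival.OUT_OF_ORDER;
    }

    public static void main(String[] args) {
        // Window [0, 10), grace 5: at stream time 12 the window is still open.
        System.out.println(classify(8, 12, 10, 5));
        // Same record once stream time has advanced to 20: the window is closed.
        System.out.println(classify(8, 20, 10, 5));
        // With grace 0, the same out-of-order record is already late at stream time 12.
        System.out.println(classify(8, 12, 10, 0));
    }
}
```

Under these assumptions, every record classified as `LATE` is also behind stream time, which matches the "all late records are out-of-order, but not the other way around" wording.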
> > >>>>>>>>
> > >>>>>>>> It's also not clear to me, why "storing events in an aggregation" is
> > >>>>>>>> problematic, and/or how this new window type would avoid it?
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>> extends Windows<TimeWindow>
> > >>>>>>>>
> > >>>>>>>> Not sure if this is the right thing to do it? I understand your intent,
> > >>>>>>>> but to me, `TimeWindow` has certain semantics, implying we only put data
> > >>>>>>>> with record-ts between the window bounds into the window. Not sure if my
> > >>>>>>>> interpretation is off, and re-using could be ok? It's just something we
> > >>>>>>>> should figure out I guess.
> > >>>>>>>>
> > >>>>>>>> To be fair: it's not 100% clearly specified if these semantics apply or
> > >>>>>>>> not. If we believe they don't apply, happy to pivot on this, but we
> > >>>>>>>> might want to update the JavaDocs, if we only want to use it as
> > >>>>>>>> "container class" (for the lack of better word).
> > >>>>>>>>
> > >>>>>>>> What actually raises a different issue: I just realized that `Window` is
> > >>>>>>>> in an `internal` package, but `TimeWindows#windowsFor` does return
> > >>>>>>>> `Map<Long, TimeWindow>`, thus making it effectively kinda public... It
> > >>>>>>>> seems we should do something about this? End users do not really need
> > >>>>>>>> to use `TimeWindow` -- it's really internal, and it seems `windowsFor`
> > >>>>>>>> is effectively a leaky abstraction...
> > >>>>>>>>
> > >>>>>>>> If we keep `TimeWindow` internal, I think we can easily be more relaxed
> > >>>>>>>> with applied semantics. But it's kinda leaking right now, what does not
> > >>>>>>>> make me happy... (Of course, I also don't want to derail and hijack this
> > >>>>>>>> KIP for some related, but orthogonal issue in the API...)
> > >>>>>>>>
> > >>>>>>>> One other nit I am wondering about is, if it might be good to specify the
> > >>>>>>>> abstract methods inherited from `Windows` on the KIP for clarity? Even if
> > >>>>>>>> they are implicitly well specified it might be beneficial to spell it
> > >>>>>>>> out. After all, `StreamTimeWindow` needs to implement these methods.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> And of course
> > >>>>>>>>
> > >>>>>>>>> Note: I really don't like the name I came up with StreamTimeWindows... if
> > >>>>>>>>> anyone has a better name in mind please suggest one!
> > >>>>>>>>
> > >>>>>>>> I am not a fan of the name either. But not really sure what a better
> > >>>>>>>> name could be. In the end, it's more of an "offset ordered window"
> > >>>>>>>> rather than a "time window", because it does not really take the record
> > >>>>>>>> timestamps into account to figure out into what window a record goes
> > >>>>>>>> into, but it really uses the offset order.
> > >>>>>>>>
> > >>>>>>>> I believe that the size of window itself does not matter too much for
> > >>>>>>>> naming, so even if the size is defined in time, I don't think that the
> > >>>>>>>> term "time" must be part of the window name. (That's also why I am
> > >>>>>>>> wondering if we should go with `extends<TimeWindow>` or add something
> > >>>>>>>> new, which we hopefully can keep internal...)
> > >>>>>>>>
> > >>>>>>>> Not sure if we ever intend to make `StreamTimeWindows` overlapping?
> > >>>>>>>> Given the described use-cases it seems very unlikely? So effectively
> > >>>>>>>> it's a "tumbling window". Of course, we should not just overload this
> > >>>>>>>> term, but there is no `TumblingWindows` in the API -- only the term
> > >>>>>>>> Tumbling Window in the docs to describe the special case of
> > >>>>>>>> non-overlapping `TimeWindows`.
> > >>>>>>>>
> > >>>>>>>> So maybe `OffsetTumblingWindows` could work? Or `OffsetOrderedWindows`?
> > >>>>>>>> Just spitballing here, to get a discussion going...
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> -Matthias
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> On 1/22/25 8:26 PM, Almog Gavra wrote:
> > >>>>>>>>> Hello!
> > >>>>>>>>>
> > >>>>>>>>> I'd like to initiate a discussion thread on KIP-1127:
> > >>>>>>>>>
> > >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1127+Flexible+Windows+for+Late+Arriving+Data
> > >>>>>>>>>
> > >>>>>>>>> This KIP aims to make it easier to specify windowing semantics that are
> > >>>>>>>>> more tolerable to late arriving data, particularly with suppression.
> > >>>>>>>>>
> > >>>>>>>>> Would appreciate any and all feedback.
> > >>>>>>>>>
> > >>>>>>>>> Cheers,
> > >>>>>>>>> Almog
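
For readers skimming the thread, the determinism point raised in the quoted discussion (stream-time based batches depend on arrival order, event-time windows do not) can be illustrated with a small, self-contained sketch. Everything here is an assumption made for illustration: `BatchWindowSketch` and its methods are hypothetical and not part of Kafka Streams or the KIP, "stream time" is modeled as the highest timestamp seen so far, a record is placed into the tumbling window containing the current stream time, and timestamps are non-negative.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchWindowSketch {

    /** Event-time assignment: the window start depends only on the record's own timestamp. */
    public static long eventTimeWindowStart(long recordTimestampMs, long sizeMs) {
        return recordTimestampMs - (recordTimestampMs % sizeMs);
    }

    /**
     * Stream-time assignment (assumed model): each record, in arrival order, goes
     * into the window that contains the highest timestamp observed so far.
     */
    public static List<Long> streamTimeWindowStarts(long[] timestampsInArrivalOrder, long sizeMs) {
        List<Long> starts = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        for (long ts : timestampsInArrivalOrder) {
            streamTime = Math.max(streamTime, ts); // stream time never moves backwards
            starts.add(streamTime - (streamTime % sizeMs));
        }
        return starts;
    }

    public static void main(String[] args) {
        long sizeMs = 10;
        // The same three records, interleaved differently (e.g. after a repartition).
        long[] orderA = {3, 12, 8};
        long[] orderB = {3, 8, 12};
        System.out.println(streamTimeWindowStarts(orderA, sizeMs)); // [0, 10, 10]
        System.out.println(streamTimeWindowStarts(orderB, sizeMs)); // [0, 0, 10]
        // With event-time assignment the record with timestamp 8 lands in window 0 either way.
        System.out.println(eventTimeWindowStart(8, sizeMs)); // 0
    }
}
```

In this model the record with timestamp 8 ends up in a different batch depending on whether it arrives before or after the record with timestamp 12, while its event-time window stays the same.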