Thanks for the discussion everyone! I've updated the Wiki with the following changes:
- Renamed to BatchedWindows - Add a note in rejected alternatives about more general purpose (micro-)batching functionality since the scope of that is much wider. Since it looks like we've stabilized the discussion I'm going to go ahead and open up the vote! Definitely feel free to take another look and leave any additional thoughts. On Thu, Jan 30, 2025 at 12:28 PM Matthias J. Sax <mj...@apache.org> wrote: > > batch window with max N records, > >> and then also specifying a BufferConfig.maxRecords() > > That's actually two different and independent dimensions. "N records" > would be the number of records in the window, but `maxRecords` is the > number of unique keys/row in the buffer before it's flushed. > > > > > the current proposal > > that only includes stream time based batches is no worse than existing > > windows which also have that potential indeterminism in this scenario > > except for that it's more likely since grace period is always 0 > > Guess this becomes a philosophical question. For TimeWindows, even if > grace=0, it's still deterministic into which window each record goes -- > of course, the "cut off point" when we start to drop late records is > subject to "noise" due to repartitioning, as stream-time advances "non > deterministically". > > So if we drop different records on a "re-run" on the same input data, we > might still get different windows (as different records would have been > dropped). But that is exactly why we have a grace-period to begin with > (to avoid it, and to make a re-run deterministic -- I think of a "too > short" grace period as a misconfiguration -- or an informed decision to > sacrifice determinism for something else)... > > And as the new window type, by definition, does not need/want a > grace-period, IMHO, the new window type would be "worse" (for the lack > of a better word...); I don't think its really worse, it just inherently > non-deterministic, and that's fine. > > Guess we are overall on the same page and share common understanding of > the trade-off. So I think we are good :) > > > -Matthias > > > On 1/30/25 9:07 AM, Almog Gavra wrote: > > I'm not opposed to "BatchedWindows" - I think I like that the most so > far. > > I'll let that sit on the discussion thread for a while, and change the > KIP > > to match if no concerns. > > > >> What I don't understand is, why the relationship to > > suppress()/emitStrategy() is relevant? Can you elaborate a little bit? > > > > The existing proposal has no impact on suppression/emitStrategy, but I'm > > uncertain it's unrelated if you start to introduce window constraints > that > > aren't just stream time. Imagine having a batch window with max N > records, > > and then also specifying a BufferConfig.maxRecords() suppression... what > > happens in that case? With the current scope of the KIP, we don't need to > > worry about any of that so it's pretty well contained. > > > >> if data is auto-repartitioned before a `BatchWindows` step, > > repartitioning introduces non-deterministic order in the repartition > topic > > > > This is a good point, I did not think about it (my original comment was > > specifically about wall clock time). I guess, though, the current > proposal > > that only includes stream time based batches is no worse than existing > > windows which also have that potential indeterminism in this scenario > > except for that it's more likely since grace period is always 0. > > > >> I don't see any relationship to EOS or ALOS. Can you explain what you > > mean? > > > > Hmm, I'm not totally sure what I was thinking... we can ignore that part. > > > > - Almog > > > > > > On Tue, Jan 28, 2025 at 3:40 PM Matthias J. Sax <mj...@apache.org> > wrote: > > > >> Interesting thoughts. So maybe we could go with `BatchWindows` as a > >> name? Again, only spit-balling... > >> > >> If we really put "(micro-)batching" in the center of this idea, I think > >> both count-based and time-based (and time could actually be either > >> stream-time or wall-clock-time), or any combination of these dimensions > >> could all make sense. > >> > >> Having said this: I don't think we need to support all dimensions > >> initially, but if we add something like `BatchWindows` we can extent the > >> supported specification incrementally. > >> > >> > >> > >> What I don't understand is, why the relationship to > >> suppress()/emitStrategy() is relevant? Can you elaborate a little bit? > >> > >> > >> > >>> how to make it deterministic with regards to time > >> > >> I think, in any case, we are leaving determinism-land (at least to some > >> extent). I guess, as long as the `BatchWindows` are applied directly to > >> an input topic, we get determinism for both stream-time size, as well as > >> count-size windows (not for wall-clock0time, of course). > >> > >> However, if data is auto-repartitioned before a `BatchWindows` step, > >> repartitioning introduces non-deterministic order in the repartition > >> topic due to interleaved writes, and thus, also stream-time based > >> windows would become non-deterministic (as there is no grace-period by > >> design to "fix" the non-deterministic order, in contrast to full > >> event-time based windows we support so far). > >> > >> So I don't think that is an actual difference between stream-time or > >> count-based `BatchWindows` with regard to determinism. > >> > >> > >> > >>> which is important for EOS and even ALOS > >> > >> I don't see any relationship to EOS or ALOS. Can you explain what you > mean? > >> > >> > >> > >> -Matthias > >> > >> > >> On 1/28/25 10:43 AM, Almog Gavra wrote: > >>> Thanks for the feedback Lucas and Bruno! > >>> > >>> L0. "Given the motivation section, it sounds we actually want something > >>> that I'd call "batching" rather than "windowing"." > >>> > >>> You are right here, and I think ultimately introducing more flexible > and > >>> controlled micro-batching will be useful for Kafka Streams, but that > >>> expands the scope a little more than I'd want. We'd have to rethink the > >>> implications of suppression as well as emitting and how to make it > >>> deterministic with regards to time (which is important for EOS and even > >>> ALOS). The proposal here is a halfway point that I think is easy enough > >> to > >>> reason about within the existing semantics of Kafka Streams and gets us > >> 80% > >>> of the benefit. (See the second part below since these questions are > >>> related) > >>> > >>> L2/B1. "why are we specifically emitting based on stream-time?" / > >> "Wouldn't > >>> it make more sense to have similarly sized batches?" > >>> > >>> There are two motivations for this: (a) most of the use cases for this > >> that > >>> we've encountered are still time sensitive in that they don't want to > >> batch > >>> N records, but rather batch over N seconds (there may be an uneven > >>> distribution of keys so that some keys hit N records very quickly and > >>> others are one and done, we'll never see that key again). (b) this is > >> much > >>> easier to implement, and is a step forward, given this fits really well > >>> into the existing windowing semantics. > >>> > >>> Ultimately, as you suggest, I'd want to be able to control these > >> "batches" > >>> with multiple emit strategies. It would be nice to specify a condition > >> like > >>> "emit after N records, or N elapsed seconds, or N bytes accumulated" - > >> but > >>> again (as mentioned above) modeling this with existing Kafka Streams > >>> semantics/operators is a big stretch and I don't want to expand the > scope > >>> of this KIP too much for that. > >>> > >>> Let me know if that makes sense! Basically I view this as a good middle > >>> ground that doesn't compromise semantics but probably addresses most > real > >>> (or near-real) time streaming use cases. > >>> > >>> - Almog > >>> > >>> On Tue, Jan 28, 2025 at 7:15 AM Bruno Cadonna <cado...@apache.org> > >> wrote: > >>> > >>>> Hi Almog, > >>>> > >>>> I had similar thoughts as Lucas. When I read the KIP, I asked myself > why > >>>> are the windows not specified on number of records instead of time if > we > >>>> do not care about whether the event time of the records is in the time > >>>> range of the window? > >>>> > >>>> In your motivation, you write that users might collect small batches > of > >>>> records to be passed to a consumer that can handle batched messages > more > >>>> effectively than individual messages. Wouldn't it make more sense to > >>>> have similarly sized batches? > >>>> You could also consider to do something like the Kafka producer that > has > >>>> a batch size and a linger time. > >>>> > >>>> Best, > >>>> Bruno > >>>> > >>>> On 28.01.25 15:44, Lucas Brutschy wrote: > >>>>> Hi Almog, > >>>>> > >>>>> this seems useful to me. I don't see anything wrong with the details > >>>>> of the proposal. > >>>>> > >>>>> More generally, I'd like to hear your thoughts on this vs. batching. > >>>>> Given the motivation section, it sounds we actually want something > >>>>> that I'd call "batching" rather than "windowing". If you do not > really > >>>>> care about the including events that fall into different time > windows, > >>>>> and the order of events does not matter much, why are we specifically > >>>>> emitting based on stream-time? Would you expect this mechanism to > >>>>> extend also to emitting batches based on number of records, byte-size > >>>>> of batch, wall-clock time? > >>>>> > >>>>> Cheers, > >>>>> Lucas > >>>>> > >>>>> On Thu, Jan 23, 2025 at 7:59 PM Matthias J. Sax <mj...@apache.org> > >>>> wrote: > >>>>>> > >>>>>> Thanks, Almog. > >>>>>> > >>>>>> Good call out about `TimeWindows` vs `TimeWindow` (yes, I am aware > and > >>>>>> was actually re-reading my previous email before sending it a few > >> times > >>>>>> to make sure I use the right one; it's very subtle.) > >>>>>> > >>>>>> For `TimeWindows` semantics are certainly well defined, and there is > >>>>>> nothing to be discussed. > >>>>>> > >>>>>> For `TimeWindow`, I am not sure as I said, and your interpretation > as > >>>>>> "just a container" might be fine, too. I agree to the problem, that > if > >>>>>> we add something new, we might just leak it again, and thus not gain > >>>>>> much, so the lesser evil might be to just re-use `TimeWindow` as you > >>>>>> propose. I just wanted to point out this question to sanity check, > and > >>>>>> collect feedback about it. > >>>>>> > >>>>>> > >>>>>> -Matthias > >>>>>> > >>>>>> On 1/23/25 8:58 AM, Almog Gavra wrote: > >>>>>>> Thanks Matthias for the quick and detailed feedback! > >>>>>>> > >>>>>>>> Nit: it seems you are mixing the terms "out-of-order" and "late" > and > >>>>>>> using them as synonymous, what we usually not do. > >>>>>>> > >>>>>>> M1. Ah, in my mind "late arriving" was after the window closed but > >>>>>>> potentially before grace (and "out of order" was just anything that > >> is > >>>> out > >>>>>>> of order). Do we have a specific word for "after the window closes > >> but > >>>>>>> before grace"? Maybe we should have "fashionably late" and "too > late" > >>>>>>> (haha, just kidding). > >>>>>>> > >>>>>>> I'll clear up the terminology in the KIP. > >>>>>>> > >>>>>>>>> Today the only way to implement this semantic is using a > >> punctuation > >>>>>>> and manually storing events in an aggregation (which is inefficient > >> for > >>>>>>> many reasons and can cause severe bottlenecks) > >>>>>>>> Not sure if I follow this argument? I don't see any relationship > to > >>>>>>> punctuations. Can you elaborate? > >>>>>>> > >>>>>>> M2. The way we've seen people implement the desired behavior in > this > >>>> KIP is > >>>>>>> to use a non-windowed aggregation on a stream, and then use a > >>>> punctuation > >>>>>>> every N seconds to scan the table and emit and delete the records > to > >>>>>>> simulate a window closing. It's certainly suboptimal, but it gets > you > >>>> very > >>>>>>> similar semantics to what I describe in the KIP. > >>>>>>> > >>>>>>> M3. Re: extends Windows<TimeWindow> > >>>>>>> > >>>>>>> (note for other readers, I think you get this Matthias) TimeWindow > is > >>>>>>> distinct from TimeWindows (with an s at the end). TimeWindows (with > >> an > >>>> s) > >>>>>>> implies exactly what you suggest. I do think, however, that > >> TimeWindow > >>>>>>> (without an s) is a perfect abstraction to leverage here. That > class > >>>>>>> doesn't do anything except to indicate the bounds of a window (it > >> has a > >>>>>>> start time, an end time and a definition of whether two windows > >>>> overlap). > >>>>>>> The javadoc for TimeWindow (without an s) doesn't even need to > change > >>>> to be > >>>>>>> used in the way this KIP suggests. > >>>>>>> > >>>>>>> As suggested, the StreamTimeWindow still uses time as its bounds - > it > >>>> just > >>>>>>> doesn't use the event time to determine which time window the event > >> is > >>>>>>> placed in. > >>>>>>> > >>>>>>> M4. Re: I just realized that `Window` is in an `internal` package > but > >>>>>>> `TimeWindows#windowsFor` does return `Map<Long, TimeWindow>` > >>>>>>> > >>>>>>> Yeah, I was wondering about that as well. I also agree its a bit > >>>> orthogonal > >>>>>>> to the KIP and I think if we introduce a new "extends Window" type > >>>> we'll > >>>>>>> just leak that type as well, so I don't really see a benefit from > >>>> that. If > >>>>>>> we do decide to tackle the leaky abstraction we might as well just > >> have > >>>>>>> TimeWindow to handle :) > >>>>>>> > >>>>>>> M5. Specifying abstract methods > >>>>>>> > >>>>>>> Can do! > >>>>>>> > >>>>>>> M6. Naming > >>>>>>> > >>>>>>> I like "OffsetOrderedWindows", I will wait for some other feedback > >> and > >>>> if > >>>>>>> no one has something better I'll update the KIP to that. > >>>>>>> > >>>>>>> Re: `extends Window<TimeWindow>` I do think the fact that it > defines > >>>> the > >>>>>>> window size with time is important, we're not saying the window > >> closes > >>>>>>> after N offsets have passed but rather the stream time has passed N > >>>> seconds > >>>>>>> (or ms or whatever). I agree that it doesn't need to be part of the > >>>> main > >>>>>>> name, however. > >>>>>>> > >>>>>>> > >>>>>>> On Thu, Jan 23, 2025 at 12:14 AM Matthias J. Sax <mj...@apache.org > > > >>>> wrote: > >>>>>>> > >>>>>>>> Interesting KIP. It's a known problem, and the proposed solution > >> make > >>>>>>>> sense to me. > >>>>>>>> > >>>>>>>> Nit: it seems you are mixing the terms "out-of-order" and "late" > and > >>>>>>>> using them as synonymous, what we usually not do. > >>>>>>>> > >>>>>>>> "Out-of-order" is the more generic term, while "late" means after > >> the > >>>>>>>> grace period (hence, late arriving data is still out-of-order, but > >>>> it's > >>>>>>>> a subset of out-of-order records... all late records are > >> out-of-order, > >>>>>>>> but not the other way around). I think the KIP could gain clarity > >> and > >>>>>>>> avoid potential confusion to use both terms not a synonymous. > >>>>>>>> > >>>>>>>> In the end, "late" data cannot be handled at all right now, based > on > >>>> the > >>>>>>>> definition of "late". "late" data is always dropped. > >>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>>>> Today the only way to implement this semantic is using a > >> punctuation > >>>> and > >>>>>>>> manually storing events in an aggregation (which is inefficient > for > >>>> many > >>>>>>>> reasons and can cause severe bottlenecks) > >>>>>>>> > >>>>>>>> Not sure if I follow this argument? I don't see any relationship > to > >>>>>>>> punctuations. Can you elaborate? > >>>>>>>> > >>>>>>>> It's also not clear to me, why "storing events in an aggregation" > is > >>>>>>>> problematic, and/or how this new window type would avoid it? > >>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>>>> extends Windows<TimeWindow> > >>>>>>>> > >>>>>>>> Not sure if this is the right thing to do it? I understand your > >>>> intent, > >>>>>>>> but to me, `TimeWindow` has certain semantics, implying we only > put > >>>> data > >>>>>>>> with record-ts between the window bounds into the window. Not sure > >> if > >>>> my > >>>>>>>> interpretation is off, and re-using could be ok? It's just > something > >>>> we > >>>>>>>> should figure out I guess. > >>>>>>>> > >>>>>>>> To be fair: it's not 100% clearly specific if these semantics > apply > >> or > >>>>>>>> not. If we believe they don't apply, happy to pivot on this, but > we > >>>>>>>> might want to update the JavaDocs, if we only want to use it as > >>>>>>>> "container class" (for the lack of better word). > >>>>>>>> > >>>>>>>> What actually raises a different issue: I just realized that > >> `Window` > >>>> is > >>>>>>>> in an `internal` package, but `TimeWindows#windowsFor` does return > >>>>>>>> `Map<Long, TimeWindow>`, thus making it effectively kinda > public... > >> It > >>>>>>>> seems we should we do something about this? End users do not > really > >>>> need > >>>>>>>> to use `TimeWindow` -- it's really internal, and it seems > >> `windowsFor` > >>>>>>>> is effective a leaky abstraction... > >>>>>>>> > >>>>>>>> If we keep `TimeWindow` internal, I think we can easily be more > >>>> relaxed > >>>>>>>> with applied semantics. But it's kinda leaking right now, what > does > >>>> not > >>>>>>>> make me happy... (Of course, I also don't want to derail and > hijack > >>>> this > >>>>>>>> KIP for some related, but orthogonal issue in the API...) > >>>>>>>> > >>>>>>>> On other nit I am wondering about is, if it might be good to > specify > >>>> the > >>>>>>>> abstract methods inherited from `Windows` on the KIP for clarity? > >> Even > >>>>>>>> they are implicitly well specified it might be beneficial to spell > >> it > >>>>>>>> out. After all, `StreamTimeWindow` needs to implement there > methods. > >>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>>> And of course > >>>>>>>> > >>>>>>>>> Note: I really don't like the name I cam up with > >>>> StreamTimeWindows... if > >>>>>>>> anyone has a better name in mind please suggest one! > >>>>>>>> > >>>>>>>> I am not a fan of the name either. But not really sure what a > better > >>>>>>>> name could be. In the end, it's more of an "offset ordered window" > >>>>>>>> rather than a "time window", because it does not really take the > >>>> record > >>>>>>>> timestamps into account to figure out into what window a record > goes > >>>>>>>> into, but it really uses the offset order. > >>>>>>>> > >>>>>>>> I believe that the size of window itself does not matter too much > >> for > >>>>>>>> naming, so even if the size is defined in time, I don't think that > >> the > >>>>>>>> term "time" must be part of the window name. (That's also why I am > >>>>>>>> wondering if we should go with `extends<TimeWindow>` or add > >> something > >>>>>>>> new, which we hopefully can keep internal...) > >>>>>>>> > >>>>>>>> Not sure if we ever intent to make `StreamTimeWindows` > overlapping? > >>>>>>>> Given the described use-cases it seems very unlikely? So > effectively > >>>>>>>> it's a "tumbling window" . Of course, we should not just overload > >> this > >>>>>>>> term, but there is no `TumblingWindows` in the API -- only the > term > >>>>>>>> Tumbling Window in the docs to describe the special case of > >>>>>>>> non-overlapping `TimeWindows`. > >>>>>>>> > >>>>>>>> So maybe `OffsetTumblingWindows` could work? Or > >>>> `OffsetOrderedWindows`? > >>>>>>>> Just spitballing here, to get a discussion going... > >>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>>> -Matthias > >>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>>> On 1/22/25 8:26 PM, Almog Gavra wrote: > >>>>>>>>> Hello! > >>>>>>>>> > >>>>>>>>> I'd like to initiate a discussion thread on KIP-1127: > >>>>>>>>> > >>>>>>>> > >>>> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1127+Flexible+Windows+for+Late+Arriving+Data > >>>>>>>>> > >>>>>>>>> This KIP aims to make it easier to specify windowing semantics > that > >>>> are > >>>>>>>>> more tolerable to late arriving data, particularly with > >> suppression. > >>>>>>>>> > >>>>>>>>> Would appreciate any and all feedback. > >>>>>>>>> > >>>>>>>>> Cheers, > >>>>>>>>> Almog > >>>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>> > >>>>>> > >>>> > >>>> > >>> > >> > >> > > > >