I'll try my code on a more recent release to see if it works. Thanks for pointing out that fix.
On Fri, Feb 7, 2025 at 3:58 PM Joey Tran <[email protected]> wrote:

> It is not different! I am using Beam version 2.50, and it looks like
> the changes associated with that test were in 2.56. Were custom window
> types not supported before then?
>
> On Fri, Feb 7, 2025 at 3:44 PM Robert Bradshaw <[email protected]> wrote:
>
>> On Thu, Feb 6, 2025 at 8:39 AM Joey Tran <[email protected]> wrote:
>>
>>> Thanks all for the conceptual sanity check. I went ahead and tried
>>> implementing a data-based window type. I've gotten most of the way
>>> there, but it was surprisingly difficult. Not really because of any
>>> complications due to the window not being based on timestamps (which
>>> is what I was worried about), but just because of all the runner API
>>> logic for turning coders/windowing strategies into runner-safe
>>> coders/windowing strategies.
>>>
>>> To define my new window type, I first:
>>> - Defined a new window type, `class DataBasedWindow(BoundedWindow)`
>>> - Created a new window coder for it
>>> - Created a new WindowFn to assign data to their DataBasedWindows
>>>
>>> I thought this would be enough, but then I ran into the following
>>> issues when running a unit test with the Python DirectRunner (really
>>> just the fn_api_runner):
>>> - The general trigger driver tried to read `window.end`, but the
>>> window was just plain bytes
>>> - I thought the fn_api_runner wasn't properly decoding windows, but
>>> it turns out it does; the window-value coder it uses just uses a
>>> bytes coder for the window
>>> - I figured the translations phase was turning the WindowFn and
>>> window coder into "safe" WindowFns and coders
>>> - I registered URNs for the new WindowFn and window coder, but a
>>> bytes coder was _still_ being used for the window
>>> - I realized that TransformContext doesn't consider just any coder
>>> whose URN is registered as safe, only ones defined in
>>> `common_urns.coders`. So I had to hackily add it to both that and
>>> the python_sdk capabilities set
>>>
>>> Did I accidentally veer off the expected path for implementing a new
>>> window type, or are new window types just not something that many
>>> users do with the Python SDK? Based on the last point, it doesn't
>>> actually even seem possible.
>>
>> Well, you're definitely off the beaten path, but this should still be
>> supported. E.g.
>> https://github.com/apache/beam/blob/release-2.51.0/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L1009
>> was made to handle this scenario.
>>
>> I'm curious how your case is different from
>> https://github.com/apache/beam/blob/release-2.60.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py#L1120
>>
>>> On Wed, Feb 5, 2025 at 1:01 PM Robert Bradshaw via user
>>> <[email protected]> wrote:
>>>
>>>> Interestingly, the very first prototypes of windows were actually
>>>> called buckets, and we thought of applications like geographical
>>>> grouping and such in addition to time-based aggregations. For
>>>> streaming, however, event time is special in the sense that it's
>>>> required for aggregation and omnipresent, and so this is what
>>>> windows are centred on. But nothing prevents one from creating
>>>> data-dependent windows (that, in batch, may not have anything to do
>>>> with time at all) and using them as secondary keys.
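A minimal, untested sketch of the three pieces described above, using only
public Beam Python SDK classes (BoundedWindow, NonMergingWindowFn,
coders.Coder). The (game_id, value) element shape is an assumption for
illustration, and the URN registration / `common_urns.coders` problem from
the thread is not addressed here:

```python
from apache_beam.coders import coders
from apache_beam.transforms.window import BoundedWindow, NonMergingWindowFn
from apache_beam.utils.timestamp import MAX_TIMESTAMP


class DataBasedWindow(BoundedWindow):
  """A window identified by a piece of data rather than a time interval."""

  def __init__(self, game_id):
    # Event time never closes this window; any notion of completeness
    # would have to come from elsewhere (see the rest of the thread).
    super().__init__(MAX_TIMESTAMP)
    self.game_id = game_id

  def __eq__(self, other):
    return isinstance(other, DataBasedWindow) and self.game_id == other.game_id

  def __hash__(self):
    return hash(self.game_id)


class DataBasedWindowCoder(coders.Coder):
  """Encodes a DataBasedWindow as the UTF-8 bytes of its game id."""

  def encode(self, window):
    return window.game_id.encode('utf-8')

  def decode(self, encoded):
    return DataBasedWindow(encoded.decode('utf-8'))

  def is_deterministic(self):
    return True


class DataBasedWindowFn(NonMergingWindowFn):
  """Assigns each element to the window named by its game id."""

  def assign(self, context):
    # Assumes elements are (game_id, value) pairs; extracting the id is
    # the application-specific part.
    return [DataBasedWindow(context.element[0])]

  def get_window_coder(self):
    return DataBasedWindowCoder()
```

Applied with beam.WindowInto(DataBasedWindowFn()), downstream grouping
operations would then run per game id.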
>>>> The idea of carrying out-of-band metadata along with elements to
>>>> simplify/re-use transforms is certainly an interesting one; the
>>>> question is always what to do for aggregating operations (including
>>>> side inputs, stateful DoFns, etc., as well as the standard
>>>> GroupByKey). Both treating it as a sub-key (essentially creating
>>>> parallel pipelines, though one could do interesting things with
>>>> merging) and providing a merging operation for this metadata are
>>>> reasonable alternatives.
>>>>
>>>> - Robert
>>>>
>>>> On Mon, Feb 3, 2025 at 7:15 AM Kenneth Knowles <[email protected]> wrote:
>>>>
>>>>> This idea makes perfect sense conceptually.
>>>>>
>>>>> Conceptually, a "window" is just a key that has the ability to know
>>>>> if it is "done" so we can stop waiting for more input and emit the
>>>>> aggregation. To do an aggregation over unbounded input, you need to
>>>>> know when to stop waiting for more data. In Beam, the only notion
>>>>> of "done" that we support is event time, so you need to have a
>>>>> maximum timestamp - the one-method interface for BoundedWindow
>>>>> <https://github.com/apache/beam/blob/df13ffe96d6eac081751b7c2771f51dc367a24dc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L78>
>>>>> in the Java SDK is correct.
>>>>>
>>>>> In Beam we have a fairly arbitrary separation between the K in KV
>>>>> and the windowing, and they show up differently in all our APIs and
>>>>> require different sorts of plumbing. This makes it easier to
>>>>> explain things in terms of event time, but is not essential. The
>>>>> true key of an aggregation is the Key + Window, and which data you
>>>>> choose to put in which place is sort of up to you. The invariant is
>>>>> that composites should typically "just work" per window no matter
>>>>> what windowing the user chooses, similarly to how keyed transforms
>>>>> should "just work" per key no matter what keying the user chooses.
>>>>>
>>>>> Incidentally, in SQL I found it easy to explain as "at least one of
>>>>> the GROUP BY keys must have some notion of completion". In the
>>>>> past, I've occasionally offered the example of an election, where
>>>>> an aggregation per locale has a clear notion of "done" but is not
>>>>> an interval in event time. I like an aggregation per scrabble game
>>>>> better. But it doesn't sound like you are trying to stream data and
>>>>> then have the aggregation emit when the game concludes, so you
>>>>> aren't in such a state. Incidentally, our Nexmark implementations
>>>>> also use a merging window function that does something like this:
>>>>> it waits for an "auction ended" event. It is a bit of a hack that
>>>>> probably violates the Beam model, but seems to work, mostly...
>>>>>
>>>>> In practice, there's no design / implementation / API / protocol
>>>>> for windows with a notion of completeness that is not event time.
>>>>> But IIRC in the early Spark runner (and maybe today?) the checking
>>>>> of window completeness was literally just querying state (because
>>>>> Spark watermarks can't implement Beam's), so it could have been any
>>>>> state.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Fri, Jan 31, 2025 at 12:40 PM Joey Tran <[email protected]> wrote:
>>>>>
>>>>>> I have some use cases where I have some global-ish context I'd
>>>>>> like to partition my pipeline by, but that isn't based on time.
>>>>>> Does it seem reasonable to use windowing to encapsulate this kind
>>>>>> of global context anyways?
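A tiny, self-contained illustration of Kenn's point above that the
effective aggregation key is Key + Window (this is standard SDK behavior,
not code from the thread): the same key grouped under two different
windows yields two separate groups.

```python
import apache_beam as beam
from apache_beam.transforms.window import FixedWindows, TimestampedValue

with beam.Pipeline() as p:
  (p
   | beam.Create([('a', 1), ('a', 2)])
   # Give the two elements timestamps 60s apart so they land in
   # different fixed windows.
   | beam.Map(lambda kv: TimestampedValue(kv, kv[1] * 60))
   | beam.WindowInto(FixedWindows(60))
   # GroupByKey groups per (key, window): two groups for the one key 'a'.
   | beam.GroupByKey()
   | beam.Map(print))  # ('a', [1]) and ('a', [2]), in some order
```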
>>>>>> Contrived example: imagine I have a workflow for figuring out the
>>>>>> highest-scoring word in scrabble based on an input set of letters.
>>>>>>
>>>>>> --(set[str])-->[EnumerateAllPossibleWords]-->(str)-->[KeepTopNWords]-->(str)
>>>>>>
>>>>>> Now if I want to use this pipeline for multiple input letter sets,
>>>>>> I'll end up mixing together candidate words that come from
>>>>>> different letter sets. I could incorporate some kind of ID for
>>>>>> these letter sets (e.g. a ScrabbleGameID) to partition with later,
>>>>>> but then I'll need to propagate that key everywhere. For example,
>>>>>> `EnumerateAllPossibleWords` may do its own keyed operations
>>>>>> internally, which will then all need to accommodate bookkeeping
>>>>>> for the ScrabbleGameID.
>>>>>>
>>>>>> Generating windows that are actually based on ScrabbleGameID (e.g.
>>>>>> one window per letter set) feels like a nice way to implicitly
>>>>>> partition my pipeline so I don't have to include ScrabbleGameID in
>>>>>> transforms that really don't care about it.
>>>>>>
>>>>>> When looking at windowing functions, though, they're all very
>>>>>> timestamp-based, which made me pause and wonder whether I'm
>>>>>> abusing the window abstraction or whether timestamp-based windows
>>>>>> are just a subset of windows that are more highlighted b/c of
>>>>>> streaming.
>>>>>>
>>>>>> (sorry hope this makes sense and is not just a ramble)
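For illustration only, one way the windowing approach could look in this
scrabble example, reusing the hypothetical DataBasedWindowFn sketched
earlier. EnumerateAllPossibleWords and KeepTopNWords are the thread's
hypothetical composites, not real transforms, and `p` is an existing
Pipeline:

```python
results = (
    p
    | beam.Create([
        ('game1', {'a', 'e', 'r', 't'}),
        ('game2', {'q', 'u', 'i', 'z'}),
    ])
    # Windows are assigned while each element still carries its game id...
    | beam.WindowInto(DataBasedWindowFn())
    # ...after which the id can be dropped entirely: per-window grouping
    # inside the composites keeps the games separate implicitly.
    | beam.Map(lambda kv: kv[1])
    | EnumerateAllPossibleWords()
    | KeepTopNWords(n=1))
```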
