It is not different! I am using Beam version 2.50, and it looks like the changes associated with that test landed in 2.56. Were custom window types not supported before then?
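For concreteness, here is a minimal sketch of the three pieces the quoted Feb 6 message below describes for a data-based window type: a window class, a coder for it, and a WindowFn that assigns elements to it. Everything beyond the name DataBasedWindow (the string key, the helper names, the method bodies) is an illustrative assumption rather than the actual code from that experiment, and, as the rest of the thread explains, the portable translation layer may still substitute a bytes coder for the custom window coder:

    # Hypothetical sketch: a window identified by a piece of data rather
    # than a time interval, plus its coder and the WindowFn that assigns it.
    from apache_beam.coders import coders
    from apache_beam.transforms.window import BoundedWindow, NonMergingWindowFn
    from apache_beam.utils.timestamp import MAX_TIMESTAMP


    class DataBasedWindow(BoundedWindow):
      """A window distinguished by a key, not by an event-time interval."""

      def __init__(self, key):
        # Every data-based window shares the same (maximal) end timestamp;
        # only the key tells them apart.
        super().__init__(MAX_TIMESTAMP)
        self.key = key

      def __eq__(self, other):
        return isinstance(other, DataBasedWindow) and self.key == other.key

      def __hash__(self):
        return hash(self.key)

      def __repr__(self):
        return 'DataBasedWindow(%r)' % self.key


    class DataBasedWindowCoder(coders.Coder):
      """Encodes a DataBasedWindow by encoding its (string) key."""

      def encode(self, window):
        return window.key.encode('utf-8')

      def decode(self, encoded):
        return DataBasedWindow(encoded.decode('utf-8'))

      def is_deterministic(self):
        return True


    class DataBasedWindows(NonMergingWindowFn):
      """Assigns each element to the window named by a key extracted from it."""

      def __init__(self, key_fn):
        self._key_fn = key_fn

      def assign(self, context):
        # context.element is the value being windowed.
        return [DataBasedWindow(self._key_fn(context.element))]

      def get_window_coder(self):
        return DataBasedWindowCoder()

The max_timestamp inherited from BoundedWindow here is simply the end of the global window, which echoes Kenn's point further down that event time is currently the only notion of "done" Beam supports.
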
On Fri, Feb 7, 2025 at 3:44 PM Robert Bradshaw <[email protected]> wrote:

> On Thu, Feb 6, 2025 at 8:39 AM Joey Tran <[email protected]> wrote:
>
>> Thanks all for the conceptual sanity check. I went ahead and tried implementing a data-based window type. I've gotten most of the way there, but it was surprisingly difficult. Not really because of any complications due to the window not being based on timestamps (which is what I was worried about), but just because of all the runner API logic for turning coders/windowing strategies into runner-safe coders/windowing strategies.
>>
>> To define my new window type, I first...
>> - Defined a new window type `class DataBasedWindow(BoundedWindow)`
>> - Created a new window coder for it
>> - Created a new WindowFn to assign data to their DataBasedWindows
>>
>> I thought this would be enough, but then I ran into the following issues when running a unit test with the Python DirectRunner (really just the fn_api_runner):
>> - The general trigger driver tried to read `window.end`, but the window was just plain bytes
>> - I thought the fn_api_runner wasn't properly decoding windows, but it turns out it does decode properly - the window-value coder it uses just uses a bytes coder for the window
>> - I figured that the translations phase was turning the WindowFn and window coder into "safe" WindowFns and coders
>> - I registered URNs for the new WindowFn and window coder, but a bytes coder was _still_ being used for the window
>> - I realized that TransformContext doesn't actually consider just any coder whose URN is registered as safe, only ones defined in `common_urns.coders`. So I had to hackily add it to both that and the python_sdk capabilities set
>>
>> Did I accidentally veer off the expected path for implementing a new window type, or are new window types just not something that many users do with the Python SDK? Based on the last point, it doesn't actually even seem possible.
>
> Well, you're definitely off the beaten path, but this should still be supported. E.g. https://github.com/apache/beam/blob/release-2.51.0/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L1009 was made to handle this scenario.
>
> I'm curious how your case is different than https://github.com/apache/beam/blob/release-2.60.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py#L1120
>
>> On Wed, Feb 5, 2025 at 1:01 PM Robert Bradshaw via user <[email protected]> wrote:
>>
>>> Interestingly, the very first prototypes of windows were actually called buckets, and we thought of applications like geographical grouping and such in addition to time-based aggregations. For streaming, however, event time is special in the sense that it's required for aggregation and omnipresent, and so this is what windows are centred on. But nothing prevents one from creating data-dependent windows (that, in batch, may not have anything to do with time at all) and using them as secondary keys.
>>>
>>> The idea of carrying out-of-band metadata along with elements to simplify/re-use transforms is certainly an interesting one; the question is always what to do for aggregating operations (including side inputs, stateful DoFns, etc. as well as the standard GroupByKey).
>>> Both treating them as a sub-key (essentially creating parallel pipelines, though one could do interesting things with merging) or providing a merging operation for this metadata are reasonable alternatives.
>>>
>>> - Robert
>>>
>>> On Mon, Feb 3, 2025 at 7:15 AM Kenneth Knowles <[email protected]> wrote:
>>>
>>>> This idea makes perfect sense conceptually.
>>>>
>>>> Conceptually, a "window" is just a key that has the ability to know if it is "done" so we can stop waiting for more input and emit the aggregation. To do an aggregation over unbounded input, you need to know when to stop waiting for more data. In Beam, the only notion of "done" that we support is event time, so you need to have a maximum timestamp - the one-method interface for BoundedWindow <https://github.com/apache/beam/blob/df13ffe96d6eac081751b7c2771f51dc367a24dc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L78> in the Java SDK is correct.
>>>>
>>>> In Beam we have a fairly arbitrary separation between the K in KV and the Windowing, and they show up differently in all our APIs and require different sorts of plumbing. This is to make it easier to explain things in terms of event time, but is not essential. The true key of an aggregation is the Key+Window, and which data you choose to put in which place is sort of up to you. The invariant is that composites should typically "just work" per-window no matter what windowing the user chooses, similarly to how keyed transforms should "just work" per key no matter what keying the user chooses.
>>>>
>>>> Incidentally, in SQL I found it easy to explain as "at least one of the GROUP BY keys must have some notion of completion". In the past, I've occasionally offered the example of an election, where an aggregation per locale has a clear notion of "done" but is not an interval in event time. I like an aggregation per scrabble game better. But it doesn't sound like you are trying to stream data and then have the aggregation emit when the game concludes, so you aren't in such a state. Incidentally, our Nexmark implementations also use a merging window function that does something like this, in which it waits for an "auction ended" event. It is a bit of a hack that probably violates the Beam model but seems to work, mostly...
>>>>
>>>> In practice, there's no design / implementation / API / protocol for windows with a notion of completeness that is not event time. But IIRC in the early Spark Runner (and maybe today?) the checking of window completeness was literally just querying state (because Spark watermarks can't implement Beam), so it could have been any state.
>>>>
>>>> Kenn
>>>>
>>>> On Fri, Jan 31, 2025 at 12:40 PM Joey Tran <[email protected]> wrote:
>>>>
>>>>> I have some use cases where I have some global-ish context I'd like to partition my pipeline by, but that isn't based on time. Does it seem reasonable to use windowing to encapsulate this kind of global context anyway?
>>>>>
>>>>> Contrived example: imagine I have a workflow for figuring out the highest-scoring word in scrabble based on an input set of letters.
>>>>> --(set[str])-->[EnumerateAllPossibleWords]-->(str)-->[KeepTopNWords]-->(str)
>>>>>
>>>>> Now if I want to use this pipeline for multiple input letter sets, I'll end up mixing together candidate words that come from different letter sets. I could incorporate some kind of ID for these letter sets (e.g. a ScrabbleGameID) to partition with later, but then I'll need to propagate that key everywhere. For example, `EnumerateAllPossibleWords` may do its own keyed operations internally, which will then all need to accommodate bookkeeping for ScrabbleGameID.
>>>>>
>>>>> Generating windows that are actually based on ScrabbleGameID (e.g. one window per letter set) feels like a nice way to implicitly partition my pipeline so I don't have to include ScrabbleGameID in transforms that really don't care about it.
>>>>>
>>>>> When looking at windowing functions, though, they're all very timestamp-based, which made me pause and wonder whether I'm abusing the window abstraction or whether timestamp-based windows are just a subset of windows that are simply more prominent because of streaming.
>>>>>
>>>>> (sorry, hope this makes sense and is not just a ramble)
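To make the windowing idea in that last quoted message concrete, here is a sketch that reuses the hypothetical DataBasedWindows WindowFn from the sketch near the top of this thread to partition a toy pipeline by game. The element layout and the top-scoring-word logic are assumptions for illustration, and, per the discussion above, the portability translation may still replace the custom window coder with a bytes coder, so treat it as a statement of intent rather than something guaranteed to run end-to-end on every runner:

    # Hypothetical usage sketch: partition by scrabble game via windows, so
    # no ScrabbleGameID needs to travel in the keys of downstream groupings.
    import apache_beam as beam

    with beam.Pipeline() as p:
      _ = (
          p
          | beam.Create([
              {'game_id': 'game-1', 'word': 'rate', 'score': 4},
              {'game_id': 'game-1', 'word': 'tear', 'score': 4},
              {'game_id': 'game-2', 'word': 'quiz', 'score': 22},
          ])
          # Each element lands in the window named by its game_id, so every
          # grouping below is implicitly scoped to a single game.
          | beam.WindowInto(DataBasedWindows(lambda e: e['game_id']))
          # The key carries no game information; the window is the partition.
          | beam.Map(lambda e: (None, e))
          | beam.GroupByKey()
          | beam.MapTuple(
              lambda _, words: max(words, key=lambda w: w['score']))
          | beam.Map(print))
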

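For contrast, the sub-key alternative Robert and Kenn mention above keeps the game id in the key instead of the window. It needs nothing beyond standard primitives, but every keyed operation inside a composite then has to carry the ScrabbleGameID component along. The element shapes here are, again, illustrative assumptions:

    # Hypothetical sketch of the sub-key approach: the game id rides along
    # in the key, so grouping happens per (game_id, ...) pair.
    import apache_beam as beam

    with beam.Pipeline() as p:
      _ = (
          p
          | beam.Create([
              ('game-1', {'word': 'rate', 'score': 4}),
              ('game-1', {'word': 'tear', 'score': 4}),
              ('game-2', {'word': 'quiz', 'score': 22}),
          ])
          # Standard global windowing; the partition is part of the key, and
          # any transform that wants to key on something else must use a
          # compound (game_id, other_key) key to stay per-game.
          | beam.GroupByKey()
          | beam.MapTuple(
              lambda game_id, words: (
                  game_id, max(words, key=lambda w: w['score'])))
          | beam.Map(print))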