I'll try my code on a more recent release to see if it works. Thanks for
pointing that fix out

On Fri, Feb 7, 2025 at 3:58 PM Joey Tran <[email protected]> wrote:

> It is not different! I am using beam version 2.50 and it looks like the
> changes associated with that test were in 2.56. Were custom window types
> not supported before then?
>
> On Fri, Feb 7, 2025 at 3:44 PM Robert Bradshaw <[email protected]>
> wrote:
>
>> On Thu, Feb 6, 2025 at 8:39 AM Joey Tran <[email protected]>
>> wrote:
>>
>>> Thanks all for the conceptual sanity check. I went ahead and tried
>>> implementing a data-based window type. I've gotten most of the way there
>>> but it was surprisingly difficult. Not really because of any complications
>>> due to the window not being based on timestamps (which is what I was
>>> worried about), but actually just because of all the runner API logic for
>>> turning coders/windowing strategies into runner-safe coders/windowing
>>> strategies.
>>>
>>> To define my new window type, I first...
>>> - Defined a new window type `class DataBasedWindow(BoundedWindow)`
>>> - Created a new window coder for it
>>> - Created a new WindowFn to assign data to their DataBasedWindows
>>>
>>> I thought this would be enough, but then I ran into the following issues
>>> when running a unit test with the python directrunner (really just the
>>> fn_api_runner)
>>> - The general trigger driver tried to read `window.end` but the window
>>> was just plain bytes
>>> - I thought fn_api_runner wasn't properly decoding windows but turns out
>>> it does decode properly - the window-value coder it uses just uses a
>>> bytescoder for the window
>>> - I figured that the translations phase was turning the windowfn and
>>> windowcoder into "safe" wfn's and coders
>>> - I registered urns for the new windowfn and windowcoder, but a bytes
>>> coder was _still_ being used for the window
>>> - I realized that TransformContext doesn't actually consider just any
>>> coder whose URN is registered as safe. Only ones defined in
>>> `common_urns.coders`. So I had to hackily add it to both that and the
>>> python_sdk capabilities set
>>>
>>> Did I accidentally veer off the expected path for implementing a new
>>> window type or are new window types just not something that many users do
>>> with the python SDK? Based on the last point, it doesn't actually even seem
>>> possible to.
>>>
>>
>> Well, you're definitely off the beaten path, but this should still be
>> supported. E.g.
>> https://github.com/apache/beam/blob/release-2.51.0/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L1009
>>  was
>> made to handle this scenario.
>>
>> I'm curious how your case is different than
>> https://github.com/apache/beam/blob/release-2.60.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py#L1120
>>
>>
>>
>>>
>>>
>>> On Wed, Feb 5, 2025 at 1:01 PM Robert Bradshaw via user <
>>> [email protected]> wrote:
>>>
>>>> Interestingly, the very first prototypes of windows were
>>>> actually called buckets, and we thought of applications like geographical
>>>> grouping and such in addition to time-based aggregations. For streaming,
>>>> however, event time in special in the sense that it's required for
>>>> aggregation and omnipresent, and so this is what windows are centred on.
>>>> But nothing prevents one from creating data-dependent windows (that, in
>>>> batch, may not have anything to do with time at all) and using them as
>>>> secondary keys.
>>>>
>>>> The idea of carrying out-of-band metadata along with elements to
>>>> simplify/re-use transforms is certainly an interesting one; the question is
>>>> always what to do for aggregating operations (including side inputs,
>>>> stateful DoFns, etc. as well as the standard GroupByKey). Both treating
>>>> them as a sub-key (essentially creating parallel pipelines, though one
>>>> could do interesting things with merging) or providing a merging operation
>>>> for this metadata are reasonable alternatives.
>>>>
>>>> - Robert
>>>>
>>>>
>>>>
>>>> On Mon, Feb 3, 2025 at 7:15 AM Kenneth Knowles <[email protected]> wrote:
>>>>
>>>>> This idea makes perfect sense conceptually.
>>>>>
>>>>> Conceptually, a "window" is just a key that has the ability to know if
>>>>> it is "done" so we can stop waiting for more input and emit the
>>>>> aggregation. To do an aggregation over unbounded input, you need to know
>>>>> when to stop waiting for more data. In Beam, the only notion of "done" 
>>>>> that
>>>>> we support is event time, so you need to have a maximum timestamp - the
>>>>> one-method interface for BoundedWindow
>>>>> <https://github.com/apache/beam/blob/df13ffe96d6eac081751b7c2771f51dc367a24dc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L78>
>>>>> in the Java SDK is correct.
>>>>>
>>>>> In Beam we have a fairly arbitrary separation between the K in KV and
>>>>> the Windowing, and they show up differently in all our APIs and require
>>>>> different sorts of plumbing. This is to make it easier to explain things 
>>>>> in
>>>>> terms of event time, but is not essential. The true key of an aggregation
>>>>> is the Key+Window and which data you choose to put in which place is sort
>>>>> of up to you. The invariant is that composites should typically "just 
>>>>> work"
>>>>> per-window no matter what windowing the user chooses, similarly to how
>>>>> keyed transforms should "just work" per key no matter what keying the user
>>>>> chooses.
>>>>>
>>>>> Incidentally, in SQL I found it easy to explain as "at least one of
>>>>> the GROUP BY keys must have some notion of completion". In the past, I've
>>>>> occasionally offered the example of an election, where an aggregation per
>>>>> locale has a clear notion of "done" but is not an interval in an event
>>>>> time. I like an aggregation per scrabble game better. But it doesn't sound
>>>>> like you are trying to stream data and then have the aggregation emit when
>>>>> the game concludes so you aren't in such a state. But incidentally also 
>>>>> our
>>>>> Nexmark implementations use a merging window function that does something
>>>>> like this, in which it waits for an "auction ended" event. It is a bit of 
>>>>> a
>>>>> hack that probably violates the Beam model but seems to work, mostly...
>>>>>
>>>>> In practice, there's no design / implementation / API / protocol for
>>>>> windows with a notion of completeness that is not event time. But IIRC in
>>>>> early Spark Runner (and maybe today?) the checking of window completeness
>>>>> was literally just querying state (because Spark watermarks can't 
>>>>> implement
>>>>> Beam) so it could have been any state.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Fri, Jan 31, 2025 at 12:40 PM Joey Tran <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> I have some use cases where I have some global-ish context I'd like
>>>>>> to partition my pipeline by but that aren't based on time. Does it seem
>>>>>> reasonable to use windowing to encapsulate this kind of global context
>>>>>> anyways?
>>>>>>
>>>>>> Contrived example, imagine I have a workflow for figuring out the
>>>>>> highest scoring word in scrabble based on an input set of letters.
>>>>>>
>>>>>>
>>>>>> --(set[str])-->[EnumerateAllPossibleWords]-->(str)-->[KeepTopNWords]-->(str)
>>>>>>
>>>>>> Now If I want to use this pipeline for multiple input letter sets,
>>>>>> I'll end up mixing together candidate words that come from different 
>>>>>> letter
>>>>>> sets. I could incorporate some kind of ID for these letter sets (e.g. a
>>>>>> ScrabbleGameID) to partition with later, but then I'll need to propagate
>>>>>> that key everywhere. For example, `EnumerateAllPossibleWords` may do its
>>>>>> own keyed operations internally which then will all need to be able to
>>>>>> accommodate bookkeeping for ScrabbleGameID.
>>>>>>
>>>>>> Generating windows that are actually based on ScrabbleGameID (e.g.
>>>>>> one window per letter set) feels like a nice way to implicitly partition 
>>>>>> my
>>>>>> pipeline so I don't have to include ScrabbleGameID into transforms that
>>>>>> really don't care about it.
>>>>>>
>>>>>> When looking at windowing functions though, they're all very
>>>>>> timestamp based which made me pause and wonder if I'm abusing the window
>>>>>> abstraction or if timetamp-based windows are just a subset of windows 
>>>>>> that
>>>>>> are just more highlighted b/c of streaming.
>>>>>>
>>>>>> (sorry hope this makes sense and is not just a ramble)
>>>>>>
>>>>>

Reply via email to