Thanks all for the conceptual sanity check. I went ahead and tried
implementing a data-based window type. I've gotten most of the way there
but it was surprisingly difficult. The difficulty didn't come from the
window not being based on timestamps (which is what I was worried about),
but from all the runner API logic for turning coders/windowing strategies
into runner-safe coders/windowing strategies.

To define my new window type, I first...
- Defined a new window type `class DataBasedWindow(BoundedWindow)`
- Created a new window coder for it
- Created a new WindowFn to assign data to their DataBasedWindows
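
A minimal sketch of those three pieces, using plain Python stand-ins rather
than the real Beam base classes so it runs without the SDK. The names
`END_OF_TIME`, `key_fn`, and `game_id`, and the simplified `assign(element)`
signature, are assumptions for illustration and not the actual Beam API:

```python
from dataclasses import dataclass
from typing import Callable, List

# Stand-in for the SDK's maximum timestamp (an assumption for this sketch).
END_OF_TIME = float("inf")


@dataclass(frozen=True)
class DataBasedWindow:
    """A window identified by a data value instead of a time interval."""
    key: str
    # Trigger logic reads window.end, so the window still carries one.
    end: float = END_OF_TIME


class DataBasedWindowCoder:
    """Encodes a window as the UTF-8 bytes of its key."""

    def encode(self, window: DataBasedWindow) -> bytes:
        return window.key.encode("utf-8")

    def decode(self, encoded: bytes) -> DataBasedWindow:
        return DataBasedWindow(encoded.decode("utf-8"))


class DataBasedWindowFn:
    """Assigns each element to the single window named by key_fn(element)."""

    def __init__(self, key_fn: Callable[[dict], str]):
        self.key_fn = key_fn

    def assign(self, element: dict) -> List[DataBasedWindow]:
        return [DataBasedWindow(self.key_fn(element))]

    def get_window_coder(self) -> DataBasedWindowCoder:
        return DataBasedWindowCoder()


# Usage: one window per (hypothetical) game_id, and a coder round trip.
window_fn = DataBasedWindowFn(key_fn=lambda element: element["game_id"])
(window,) = window_fn.assign({"game_id": "game-1", "letters": "aelpp"})
coder = window_fn.get_window_coder()
round_tripped = coder.decode(coder.encode(window))
```

In the real SDK these would subclass `BoundedWindow`, a Beam coder, and
`WindowFn`; the stand-ins only show how the three pieces relate.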

I thought this would be enough, but then I ran into the following issues
when running a unit test with the Python DirectRunner (really just the
fn_api_runner):
- The general trigger driver tried to read `window.end` but the window was
just plain bytes
- I thought fn_api_runner wasn't properly decoding windows, but it turns out
it does decode properly - the window-value coder it uses just uses a
bytes coder for the window
- I figured that the translations phase was turning the WindowFn and
window coder into "safe" WindowFns and coders
- I registered URNs for the new WindowFn and window coder, but a bytes coder
was _still_ being used for the window
- I realized that TransformContext doesn't treat every coder with a
registered URN as safe, only the ones defined in `common_urns.coders`.
So I had to hackily add mine to both that and the python_sdk capabilities set
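
To make the failure mode above concrete, here is a toy model of the
substitution as I understand it. It is not the actual fn_api_runner or
TransformContext code: `runner_safe_coder`, `known_urns`, and the
`example:coder:data_based_window:v1` URN are hypothetical names, and the
fallback is simplified to a plain pass-through bytes coder.

```python
from collections import namedtuple

DataBasedWindow = namedtuple("DataBasedWindow", ["key", "end"])


class DataBasedWindowCoder:
    """Custom coder: a window is encoded as the UTF-8 bytes of its key."""

    def encode(self, window):
        return window.key.encode("utf-8")

    def decode(self, encoded):
        return DataBasedWindow(encoded.decode("utf-8"), float("inf"))


class BytesCoder:
    """Pass-through coder: decoding just hands back the raw bytes."""

    def encode(self, value):
        return value

    def decode(self, encoded):
        return encoded


def runner_safe_coder(urn, coder, known_urns):
    # Hypothetical rule: keep the coder only if its URN is in the known set,
    # otherwise fall back to treating the payload as opaque bytes.
    return coder if urn in known_urns else BytesCoder()


CUSTOM_URN = "example:coder:data_based_window:v1"  # hypothetical URN
known_urns = {"beam:coder:bytes:v1", "beam:coder:interval_window:v1"}

custom_coder = DataBasedWindowCoder()
encoded = custom_coder.encode(DataBasedWindow("game-1", float("inf")))

# URN not in the known set: the window arrives as plain bytes, so any code
# that reads window.end fails with AttributeError.
fallback = runner_safe_coder(CUSTOM_URN, custom_coder, known_urns)
as_bytes = fallback.decode(encoded)

# URN added to the known set: the custom coder is kept and window.end works.
patched = runner_safe_coder(CUSTOM_URN, custom_coder, known_urns | {CUSTOM_URN})
as_window = patched.decode(encoded)
```

The point of the model is only that registering a URN elsewhere has no
effect unless the set consulted at substitution time also contains it.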

Did I accidentally veer off the expected path for implementing a new window
type, or are new window types just not something that many users implement
with the Python SDK? Based on the last point, it doesn't even seem possible
to do so.


On Wed, Feb 5, 2025 at 1:01 PM Robert Bradshaw via user <
[email protected]> wrote:

> Interestingly, the very first prototypes of windows were actually called
> buckets, and we thought of applications like geographical grouping and such
> in addition to time-based aggregations. For streaming, however, event time
> is special in the sense that it's required for aggregation and omnipresent,
> and so this is what windows are centred on. But nothing prevents one from
> creating data-dependent windows (that, in batch, may not have anything to
> do with time at all) and using them as secondary keys.
>
> The idea of carrying out-of-band metadata along with elements to
> simplify/re-use transforms is certainly an interesting one; the question is
> always what to do for aggregating operations (including side inputs,
> stateful DoFns, etc. as well as the standard GroupByKey). Both treating
> them as a sub-key (essentially creating parallel pipelines, though one
> could do interesting things with merging) or providing a merging operation
> for this metadata are reasonable alternatives.
>
> - Robert
>
>
>
> On Mon, Feb 3, 2025 at 7:15 AM Kenneth Knowles <[email protected]> wrote:
>
>> This idea makes perfect sense conceptually.
>>
>> Conceptually, a "window" is just a key that has the ability to know if it
>> is "done" so we can stop waiting for more input and emit the aggregation.
>> To do an aggregation over unbounded input, you need to know when to stop
>> waiting for more data. In Beam, the only notion of "done" that we support
>> is event time, so you need to have a maximum timestamp - the one-method
>> interface for BoundedWindow
>> <https://github.com/apache/beam/blob/df13ffe96d6eac081751b7c2771f51dc367a24dc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L78>
>> in the Java SDK is correct.
>>
>> In Beam we have a fairly arbitrary separation between the K in KV and the
>> Windowing, and they show up differently in all our APIs and require
>> different sorts of plumbing. This is to make it easier to explain things in
>> terms of event time, but is not essential. The true key of an aggregation
>> is the Key+Window and which data you choose to put in which place is sort
>> of up to you. The invariant is that composites should typically "just work"
>> per-window no matter what windowing the user chooses, similarly to how
>> keyed transforms should "just work" per key no matter what keying the user
>> chooses.
>>
>> Incidentally, in SQL I found it easy to explain as "at least one of the
>> GROUP BY keys must have some notion of completion". In the past, I've
>> occasionally offered the example of an election, where an aggregation per
>> locale has a clear notion of "done" but is not an interval in an event
>> time. I like an aggregation per scrabble game better. But it doesn't sound
>> like you are trying to stream data and then have the aggregation emit when
>> the game concludes so you aren't in such a state. But incidentally also our
>> Nexmark implementations use a merging window function that does something
>> like this, in which it waits for an "auction ended" event. It is a bit of a
>> hack that probably violates the Beam model but seems to work, mostly...
>>
>> In practice, there's no design / implementation / API / protocol for
>> windows with a notion of completeness that is not event time. But IIRC in
>> early Spark Runner (and maybe today?) the checking of window completeness
>> was literally just querying state (because Spark watermarks can't implement
>> Beam) so it could have been any state.
>>
>> Kenn
>>
>> On Fri, Jan 31, 2025 at 12:40 PM Joey Tran <[email protected]>
>> wrote:
>>
>>> I have some use cases where I have some global-ish context I'd like to
>>> partition my pipeline by but that isn't based on time. Does it seem
>>> reasonable to use windowing to encapsulate this kind of global context
>>> anyways?
>>>
>>> Contrived example, imagine I have a workflow for figuring out the
>>> highest scoring word in scrabble based on an input set of letters.
>>>
>>>
>>> --(set[str])-->[EnumerateAllPossibleWords]-->(str)-->[KeepTopNWords]-->(str)
>>>
>>> Now if I want to use this pipeline for multiple input letter sets, I'll
>>> end up mixing together candidate words that come from different letter
>>> sets. I could incorporate some kind of ID for these letter sets (e.g. a
>>> ScrabbleGameID) to partition with later, but then I'll need to propagate
>>> that key everywhere. For example, `EnumerateAllPossibleWords` may do its
>>> own keyed operations internally which then will all need to be able to
>>> accommodate bookkeeping for ScrabbleGameID.
>>>
>>> Generating windows that are actually based on ScrabbleGameID (e.g. one
>>> window per letter set) feels like a nice way to implicitly partition my
>>> pipeline so I don't have to include ScrabbleGameID into transforms that
>>> really don't care about it.
>>>
>>> When looking at windowing functions though, they're all very timestamp
>>> based which made me pause and wonder if I'm abusing the window abstraction
>>> or if timestamp-based windows are just a subset of windows that are just
>>> more highlighted b/c of streaming.
>>>
>>> (sorry hope this makes sense and is not just a ramble)
>>>
>>
