Hi,

I think such functionality is already there. You could use the state and timers API for that (a bit lower-level compared to windows). Pseudo example:

import apache_beam as beam
from typing import Iterable

from apache_beam import DoFn
from apache_beam.coders.coders import TimestampCoder
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import (
    BagRuntimeState,
    BagStateSpec,
    ReadModifyWriteRuntimeState,
    ReadModifyWriteStateSpec,
    RuntimeTimer,
    TimerSpec,
    on_timer,
)
from apache_beam.transforms.window import IntervalWindow
from apache_beam.utils.timestamp import Timestamp

# key_type, input_type, out_type, input_type_coder and session_gap are
# placeholders - substitute your own types, coder and gap duration.

class StatefulDoFn(DoFn):
    state_cell = BagStateSpec("state_cell", input_type_coder)
    emit_timer_cell = TimerSpec("emit_timer_cell", TimeDomain.WATERMARK)
    emit_timer_ts_cell = ReadModifyWriteStateSpec("emit_timer_ts_cell", TimestampCoder())

    def process(
        self,
        element: tuple[key_type, input_type],  # type: ignore
        timestamp: Timestamp = DoFn.TimestampParam,
        wn: IntervalWindow = DoFn.WindowParam,
        emit_timer: RuntimeTimer = DoFn.TimerParam(emit_timer_cell),
        emit_timer_ts: ReadModifyWriteRuntimeState = DoFn.StateParam(emit_timer_ts_cell),
        bucket_state: BagRuntimeState = DoFn.StateParam(state_cell),
    ) -> Iterable[tuple[key_type, out_type]]:
        key_value, input_value = element
        # Add the element to the bucket.
        bucket_state.add(input_value)
        # Logic to decide when to emit results from the window.
        # This example mimics session windows (wait until there is a
        # gap between events); you can also wait indefinitely.
        current_timer: Timestamp | None = emit_timer_ts.read()
        if not current_timer or timestamp > current_timer:
            emit_timestamp = timestamp + session_gap
            emit_timer_ts.write(emit_timestamp)
            emit_timer.set(emit_timestamp)

    @on_timer(emit_timer_cell)
    def emit(
        self,
        key: key_type = DoFn.KeyParam,
        timestamp: Timestamp = DoFn.TimestampParam,
        wn: IntervalWindow = DoFn.WindowParam,
        emit_timer_ts: ReadModifyWriteRuntimeState = DoFn.StateParam(emit_timer_ts_cell),
        bucket_state: BagRuntimeState = DoFn.StateParam(state_cell),
    ) -> Iterable[tuple[key_type, int]]:
        # Clear the stored timer timestamp now that the timer has fired.
        emit_timer_ts.clear()
        # Aggregate and emit the result from the bucket.
        iter_state: Iterable[input_type] = bucket_state.read()
        # Example count aggregation.
        count = 0
        for x in iter_state:
            count += 1
        yield (key, count)
        # Optionally clear the bucket - or keep it if you want to keep
        # the state indefinitely.
        bucket_state.clear()
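Note that a stateful DoFn runs per key, so it has to be applied to a keyed PCollection. Wiring it up would look roughly like this (the element values here are made up):

with beam.Pipeline() as pipeline:
    # Elements must already be (key, value) pairs.
    events = pipeline | beam.Create([("key-a", 1), ("key-a", 2)])
    results = events | beam.ParDo(StatefulDoFn())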

For the simple case of counting elements it would probably be better to do the aggregation in the `process` method and just store the result in a `ReadModifyWriteStateSpec`, but in cases where your logic depends on the ordering of the elements (mine did), the only option is to store all elements in a bucket and order them on `emit`.
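A minimal sketch of that simpler variant (the timer wiring stays the same as in StatefulDoFn above; VarIntCoder is just an example coder for an integer count):

from apache_beam.coders import VarIntCoder

class CountingDoFn(DoFn):
    count_cell = ReadModifyWriteStateSpec("count_cell", VarIntCoder())

    def process(
        self,
        element,
        count_state: ReadModifyWriteRuntimeState = DoFn.StateParam(count_cell),
    ):
        # Keep only the running count instead of the whole bucket.
        current = count_state.read() or 0
        count_state.write(current + 1)
        # ...then set the emit timer exactly as in StatefulDoFn.process.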

Of course there is the batch-processing case: when back-filling, all events for the same key, regardless of their timestamp, will land in the same bucket (so multiple sessions will be seen as a single session) - but I guess this is what you would like to achieve.

I did try an approach with custom merging windows too, but it turned out to be pretty complex - and first do check whether your runner supports this at all.

I hope this is helpful for you and makes sense for your use case. I personally used this pattern for a kind of event detection where I needed to detect a few trivial event patterns (like a copy event followed by a paste, emitting a result only if such a pair is detected), and I did not have any clear time boundaries there, so I could not really use any standard windows.

Best
Wiśniowski Piotr


On 5.02.2025 19:01, Robert Bradshaw via user wrote:
Interestingly, the very first prototypes of windows were actually called buckets, and we thought of applications like geographical grouping and such in addition to time-based aggregations. For streaming, however, event time is special in the sense that it's required for aggregation and omnipresent, and so this is what windows are centred on. But nothing prevents one from creating data-dependent windows (that, in batch, may not have anything to do with time at all) and using them as secondary keys.
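For example, such a data-dependent window function could be sketched in the Python SDK roughly like this (PerGameWindows and the integer game_id convention are invented for illustration):

from apache_beam.coders import coders
from apache_beam.transforms.window import IntervalWindow, NonMergingWindowFn

class PerGameWindows(NonMergingWindowFn):
    # Assign each (game_id, value) element to a window derived from its
    # game_id; the interval bounds are arbitrary as long as they are
    # disjoint per game, so each game aggregates separately.
    def assign(self, context):
        game_id, _ = context.element
        return [IntervalWindow(game_id, game_id + 1)]

    def get_window_coder(self):
        return coders.IntervalWindowCoder()

It would be applied with beam.WindowInto(PerGameWindows()).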

The idea of carrying out-of-band metadata along with elements to simplify/re-use transforms is certainly an interesting one; the question is always what to do for aggregating operations (including side inputs, stateful DoFns, etc. as well as the standard GroupByKey). Both treating them as a sub-key (essentially creating parallel pipelines, though one could do interesting things with merging) or providing a merging operation for this metadata are reasonable alternatives.
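The sub-key flavor can be as simple as folding the metadata into the key before the aggregation and peeling it off afterwards; with made-up element shapes:

# elements carry out-of-band metadata; ride it along as a sub-key
keyed = events | beam.Map(lambda e: ((e.key, e.meta), e.value))
grouped = keyed | beam.GroupByKey()
# drop the metadata sub-key again once the aggregation is done
unkeyed = grouped | beam.Map(lambda kv: (kv[0][0], kv[1]))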

- Robert



On Mon, Feb 3, 2025 at 7:15 AM Kenneth Knowles <k...@apache.org> wrote:

    This idea makes perfect sense conceptually.

    Conceptually, a "window" is just a key that has the ability to
    know if it is "done" so we can stop waiting for more input and
    emit the aggregation. To do an aggregation over unbounded input,
    you need to know when to stop waiting for more data. In Beam, the
    only notion of "done" that we support is event time, so you need
    to have a maximum timestamp - the one-method interface for
    BoundedWindow
    <https://github.com/apache/beam/blob/df13ffe96d6eac081751b7c2771f51dc367a24dc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L78>
    in the Java SDK is correct.

    In Beam we have a fairly arbitrary separation between the K in KV
    and the Windowing, and they show up differently in all our APIs
    and require different sorts of plumbing. This is to make it easier
    to explain things in terms of event time, but is not essential.
    The true key of an aggregation is the Key+Window and which data
    you choose to put in which place is sort of up to you. The
    invariant is that composites should typically "just work"
    per-window no matter what windowing the user chooses, similarly to
    how keyed transforms should "just work" per key no matter what
    keying the user chooses.

    Incidentally, in SQL I found it easy to explain as "at least one
    of the GROUP BY keys must have some notion of completion". In the
    past, I've occasionally offered the example of an election, where
    an aggregation per locale has a clear notion of "done" but is not
    an interval in event time. I like an aggregation per scrabble
    game better. But it doesn't sound like you are trying to stream
    data and then have the aggregation emit when the game concludes,
    so you aren't in that situation. But incidentally also, our
    Nexmark implementations use a merging window function that does
    something like this, in which it waits for an "auction ended"
    event. It is a bit of a hack that probably violates the Beam
    model but seems to work, mostly...

    In practice, there's no design / implementation / API / protocol
    for windows with a notion of completeness that is not event time.
    But IIRC in the early Spark Runner (and maybe today?) the checking
    of window completeness was literally just querying state (because
    Spark watermarks can't implement Beam's), so it could have been
    any state.

    Kenn

    On Fri, Jan 31, 2025 at 12:40 PM Joey Tran
    <joey.t...@schrodinger.com> wrote:

        I have some use cases where I have some global-ish context I'd
        like to partition my pipeline by, but that isn't based on
        time. Does it seem reasonable to use windowing to encapsulate
        this kind of global context anyway?

        Contrived example, imagine I have a workflow for figuring out
        the highest scoring word in scrabble based on an input set of
        letters.

        
        --(set[str])-->[EnumerateAllPossibleWords]-->(str)-->[KeepTopNWords]-->(str)

        Now if I want to use this pipeline for multiple input letter
        sets, I'll end up mixing together candidate words that come
        from different letter sets. I could incorporate some kind of
        ID for these letter sets (e.g. a ScrabbleGameID) to partition
        with later, but then I'll need to propagate that key
        everywhere. For example, `EnumerateAllPossibleWords` may do
        its own keyed operations internally which then will all need
        to be able to accommodate bookkeeping for ScrabbleGameID.

        Generating windows that are actually based on ScrabbleGameID
        (e.g. one window per letter set) feels like a nice way to
        implicitly partition my pipeline so I don't have to include
        ScrabbleGameID into transforms that really don't care about it.

        When looking at windowing functions though, they're all very
        timestamp-based, which made me pause and wonder if I'm abusing
        the window abstraction or if timestamp-based windows are just
        a subset of windows that is more highlighted because of
        streaming.

        (sorry hope this makes sense and is not just a ramble)
