Hi,
I guess such functionality is already there. You could use the state
and timers API for that (a bit lower level compared to windows). Pseudo
example:
# Imports for the Beam state/timer API; key_type, input_type, out_type,
# input_type_coder and session_gap below are placeholders for your own types.
from typing import Iterable

from apache_beam import DoFn
from apache_beam.coders import TimestampCoder
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import (
    BagRuntimeState, BagStateSpec, ReadModifyWriteRuntimeState,
    ReadModifyWriteStateSpec, RuntimeTimer, TimerSpec, on_timer)
from apache_beam.transforms.window import IntervalWindow
from apache_beam.utils.timestamp import Timestamp


class StatefulDoFn(DoFn):
    # bucket collecting all elements seen so far for the key
    state_cell = BagStateSpec("state_cell", input_type_coder)
    # watermark timer that fires the emit callback
    emit_timer_cell = TimerSpec("emit_timer_cell", TimeDomain.WATERMARK)
    # timestamp at which the timer is currently set
    emit_timer_ts_cell = ReadModifyWriteStateSpec(
        "emit_timer_ts_cell", TimestampCoder())

    def process(
        self,
        element: tuple[key_type, input_type],  # type: ignore
        timestamp: Timestamp = DoFn.TimestampParam,
        wn: IntervalWindow = DoFn.WindowParam,
        emit_timer: RuntimeTimer = DoFn.TimerParam(emit_timer_cell),
        emit_timer_ts: ReadModifyWriteRuntimeState = DoFn.StateParam(
            emit_timer_ts_cell),
        bucket_state: BagRuntimeState = DoFn.StateParam(state_cell),
    ) -> Iterable[tuple[key_type, out_type]]:
        key_value, input_value = element
        # add the element to the bucket
        bucket_state.add(input_value)
        # logic to decide when to emit results from the "window";
        # this example mimics session windows (wait until there is a gap
        # between events) - you can also wait indefinitely
        current_timer: Timestamp | None = emit_timer_ts.read()
        if not current_timer or timestamp > current_timer:
            emit_timestamp = timestamp + session_gap
            emit_timer_ts.write(emit_timestamp)
            emit_timer.set(emit_timestamp)

    @on_timer(emit_timer_cell)
    def emit(
        self,
        key: key_type = DoFn.KeyParam,
        timestamp: Timestamp = DoFn.TimestampParam,
        wn: IntervalWindow = DoFn.WindowParam,
        emit_timer_ts: ReadModifyWriteRuntimeState = DoFn.StateParam(
            emit_timer_ts_cell),
        bucket_state: BagRuntimeState = DoFn.StateParam(state_cell),
    ) -> Iterable[tuple[key_type, int]]:
        emit_timer_ts.clear()
        # aggregate and emit the result from the bucket
        iter_state: Iterable[input_type] = bucket_state.read()
        # example count aggregation
        count = 0
        for x in iter_state:
            count = count + 1
        yield (key, count)
        # optionally clear the bucket - or keep it if you want to keep
        # the state indefinitely
        bucket_state.clear()
        emit_timer_ts.clear()
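For reference, this is roughly how I would wire such a DoFn into a pipeline
(the source and `extract_key` here are just hypothetical placeholders) - the
state/timer cells require the input PCollection to be keyed:

import apache_beam as beam

with beam.Pipeline() as p:
    (
        p
        | beam.io.ReadFromText("events.jsonl")     # hypothetical source
        | beam.Map(lambda e: (extract_key(e), e))  # extract_key is a placeholder
        | beam.ParDo(StatefulDoFn())               # stateful DoFn from above
        | beam.Map(print)
    )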
For the simple case of counting elements it would probably be better to do
the aggregation in the `process` method and just store the running result in
a `ReadModifyWriteStateSpec`, but in cases where your logic depends on the
ordering of the elements (mine did) the only option is to store all elements
in a bucket and order them on `emit`.
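A minimal sketch of that counting variant (my own take, not tested) could
look like this; the timer/emit part would stay the same as above:

from apache_beam import DoFn
from apache_beam.coders import VarIntCoder
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec


class CountingDoFn(DoFn):
    # single cell holding the running count per key
    count_cell = ReadModifyWriteStateSpec("count_cell", VarIntCoder())

    def process(self, element, count_state=DoFn.StateParam(count_cell)):
        _key, _value = element
        # read-modify-write the running count on every element
        count = (count_state.read() or 0) + 1
        count_state.write(count)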
Of course there is the batch-processing case: when back-filling, all events
for the same key, regardless of their timestamp, will land in the same
bucket (so multiple sessions will be seen as a single session) - but I guess
this is what you would like to achieve.
I did try an approach with custom merging windows too, but it turned out to
be pretty complex - and first do check whether your runner supports this at all.
I hope this is helpful and makes sense for your use case. I personally used
this pattern for a kind of event detection where I needed to detect a few
trivial event patterns (like a copy event followed by a paste event,
emitting a result only if such a pair is detected), and I did not have any
clear time boundaries there, so I could not really use any standard windows.
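As a rough illustration of that last point (the event fields and names here
are made up), the `emit` side sorted the bucket by the original event
timestamp before scanning for the pattern, something like:

# inside the emit callback
events = sorted(bucket_state.read(), key=lambda e: e["ts"])
for prev, curr in zip(events, events[1:]):
    # emit only if a "copy" event is directly followed by a "paste" event
    if prev["kind"] == "copy" and curr["kind"] == "paste":
        yield (key, (prev, curr))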
Best
Wiśniowski Piotr
On 5.02.2025 19:01, Robert Bradshaw via user wrote:
Interestingly, the very first prototypes of windows were
actually called buckets, and we thought of applications like
geographical grouping and such in addition to time-based aggregations.
For streaming, however, event time is special in the sense that it's
required for aggregation and omnipresent, and so this is what windows
are centred on. But nothing prevents one from creating data-dependent
windows (that, in batch, may not have anything to do with time at all)
and using them as secondary keys.
The idea of carrying out-of-band metadata along with elements to
simplify/re-use transforms is certainly an interesting one; the
question is always what to do for aggregating operations (including
side inputs, stateful DoFns, etc. as well as the standard GroupByKey).
Both treating them as a sub-key (essentially creating parallel
pipelines, though one could do interesting things with merging) and
providing a merging operation for this metadata are reasonable
alternatives.
- Robert
On Mon, Feb 3, 2025 at 7:15 AM Kenneth Knowles <k...@apache.org> wrote:
This idea makes perfect sense conceptually.
Conceptually, a "window" is just a key that has the ability to
know if it is "done" so we can stop waiting for more input and
emit the aggregation. To do an aggregation over unbounded input,
you need to know when to stop waiting for more data. In Beam, the
only notion of "done" that we support is event time, so you need
to have a maximum timestamp - the one-method interface for
BoundedWindow
<https://github.com/apache/beam/blob/df13ffe96d6eac081751b7c2771f51dc367a24dc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L78>
in the Java SDK is correct.
In Beam we have a fairly arbitrary separation between the K in KV
and the Windowing, and they show up differently in all our APIs
and require different sorts of plumbing. This is to make it easier
to explain things in terms of event time, but is not essential.
The true key of an aggregation is the Key+Window and which data
you choose to put in which place is sort of up to you. The
invariant is that composites should typically "just work"
per-window no matter what windowing the user chooses, similarly to
how keyed transforms should "just work" per key no matter what
keying the user chooses.
Incidentally, in SQL I found it easy to explain as "at least one
of the GROUP BY keys must have some notion of completion". In the
past, I've occasionally offered the example of an election, where
an aggregation per locale has a clear notion of "done" but is not
an interval in event time. I like an aggregation per scrabble
game better. But it doesn't sound like you are trying to stream
data and then have the aggregation emit when the game concludes so
you aren't in such a state. But incidentally also our Nexmark
implementations use a merging window function that does something
like this, in which it waits for an "auction ended" event. It is a
bit of a hack that probably violates the Beam model but seems to
work, mostly...
In practice, there's no design / implementation / API / protocol
for windows with a notion of completeness that is not event time.
But IIRC in early Spark Runner (and maybe today?) the checking of
window completeness was literally just querying state (because
Spark watermarks can't implement Beam) so it could have been any
state.
Kenn
On Fri, Jan 31, 2025 at 12:40 PM Joey Tran
<joey.t...@schrodinger.com> wrote:
I have some use cases where I have some global-ish context I'd
like to partition my pipeline by but that aren't based on
time. Does it seem reasonable to use windowing to encapsulate
this kind of global context anyways?
Contrived example, imagine I have a workflow for figuring out
the highest scoring word in scrabble based on an input set of
letters.
--(set[str])-->[EnumerateAllPossibleWords]-->(str)-->[KeepTopNWords]-->(str)
Now if I want to use this pipeline for multiple input letter
sets, I'll end up mixing together candidate words that come
from different letter sets. I could incorporate some kind of
ID for these letter sets (e.g. a ScrabbleGameID) to partition
with later, but then I'll need to propagate that key
everywhere. For example, `EnumerateAllPossibleWords` may do
its own keyed operations internally which then will all need
to be able to accommodate bookkeeping for ScrabbleGameID.
Generating windows that are actually based on ScrabbleGameID
(e.g. one window per letter set) feels like a nice way to
implicitly partition my pipeline so I don't have to include
ScrabbleGameID into transforms that really don't care about it.
When looking at windowing functions though, they're all very
timestamp-based, which made me pause and wonder if I'm abusing
the window abstraction or if timestamp-based windows are just a
subset of windows that are more highlighted b/c of streaming.
(sorry hope this makes sense and is not just a ramble)