Hi Eugene,
I'd say that what you essentially want is not "sort in windows", because
(as you mention) you want to emit elements from windows as soon as the
watermark passes some timestamp. Maybe a better approach would be to
implement this using stateful processing, where you keep a buffer of
(unsorted) inputs and set up a timer for the minimal timestamp of the
elements in the buffer (plus allowed lateness), and then sort the elements
with timestamp <= the timer (very often a single element). I'm actually
working on this for Apache Beam (design doc [1]), but this is still a
work in progress.
Another drawback is that something like "sorted map state" will probably
be needed in order to efficiently query the state for the minimal
timestamp. A less efficient implementation might work with ListState as
well.
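To make the idea concrete, here is a rough sketch in plain Java of the
buffer-and-flush logic. It is only an illustration under my own
assumptions, not the design from [1] and not a Flink or Beam API: the
class SortBuffer and its methods are hypothetical, a TreeMap stands in
for the "sorted map state", and instead of registering a real timer the
caller simply reports each watermark advance. I also assume that an
element becomes safe to emit once watermark - allowedLateness has reached
its timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical per-key sort buffer; a stand-in for a stateful operator. */
class SortBuffer<T> {
    // Ordered by event timestamp; plays the role of "sorted map state".
    private final TreeMap<Long, List<T>> buffer = new TreeMap<>();
    private final long allowedLateness;
    // Everything with timestamp <= flushedUpTo has already been emitted.
    private long flushedUpTo = Long.MIN_VALUE;

    SortBuffer(long allowedLateness) {
        this.allowedLateness = allowedLateness;
    }

    /** Buffers an element; returns false if it is too late and was dropped. */
    boolean add(long timestamp, T value) {
        if (timestamp <= flushedUpTo) {
            return false; // emitting it now would break the output order
        }
        buffer.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(value);
        return true;
    }

    /**
     * Simulates the timer firing: returns, in timestamp order, all buffered
     * elements with timestamp <= newWatermark - allowedLateness and removes
     * them from the buffer. Elements with equal timestamps keep arrival order.
     */
    List<T> advanceWatermark(long newWatermark) {
        long bound = newWatermark - allowedLateness;
        List<T> out = new ArrayList<>();
        if (bound <= flushedUpTo) {
            return out; // watermark did not move far enough to release anything
        }
        Map<Long, List<T>> ready = buffer.headMap(bound, true);
        for (List<T> sameTimestamp : ready.values()) {
            out.addAll(sameTimestamp);
        }
        ready.clear(); // headMap is a view, so this also removes from buffer
        flushedUpTo = bound;
        return out;
    }
}
```

With a ListState-like buffer instead of the TreeMap, the same logic would
have to scan and sort the whole list on every flush, which is the less
efficient variant mentioned above.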
Jan
[1]
https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing
On 6/14/19 3:58 PM, Евгений Юшин wrote:
Hi folks
I want to sort a stream based on an event-time field derived from the
events. To do this I can use one of the existing windows, like TimeWindow,
to collect events in a window of a particular size, or SlidingWindow to run
the sort logic more often (and sort within a slide).
Ideally, I want to sort events as soon as they pass the watermark (with an
out-of-order ts extractor). None of the current windows allow me to do
this, so I am thinking of implementing a custom merging window similar to
SlidingWindow. Each element would be assigned to Window(event_ts,
event_ts+1), and then all windows with 'start < watermark' would be merged.
To implement this I need the time service to be available in
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L232
Unfortunately, only 'getCurrentProcessingTime' is available there for now.
I could pass a function that extracts the timestamp to my new window
extractor, but in that case the logic for calculating the min watermark for
parallel/unioned/co-joined streams simply won't work.
@devs would you mind if I extend WindowAssignerContext with
getCurrentWatermark or the whole time service reference?
I would be really glad to hear your concerns.
Regards,
Eugene