Hi Jan

Thanks for the quick reply.

Doing a stateful transformation requires re-implementing logic that Flink
already provides. Consider the example from my original message:
Data can arrive out of order -> data should be propagated to the next
operator only once the watermark crosses the out-of-order boundary -> all
records with 'ts < watermark' should be pre-processed (e.g. sorted)

Stateful function: all records have to be stored in state, and for each new
record the whole state has to be traversed to find out which out-of-order
events can be propagated further. For unioned streams there has to be logic
that takes the min ts of each stream for comparison, but the information
about which record came from which stream is already lost. The state has to
be persisted, which adds some footprint during checkpoints.
Flink windows handle all these duties under the hood.
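
To make the amount of hand-written logic concrete, here is a minimal
plain-Java sketch of what such a stateful function would have to do. It is
only an illustration under my own assumptions: WatermarkSortBuffer,
advanceWatermark and minWatermark are hypothetical names, not Flink classes
or methods, the TreeMap stands in for state that would have to be
checkpointed, and records are reduced to their timestamps.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, NOT a Flink class: buffers out-of-order timestamps
// and releases them in sorted order once the watermark has passed them.
class WatermarkSortBuffer {
    // timestamp -> number of buffered records with that timestamp
    // (assumption: in a real job this would live in checkpointed state)
    private final TreeMap<Long, Integer> buffer = new TreeMap<>();

    // Store one record, identified here only by its event timestamp.
    void add(long ts) {
        buffer.merge(ts, 1, Integer::sum);
    }

    // Remove and return, in ascending order, all timestamps < watermark.
    List<Long> advanceWatermark(long watermark) {
        List<Long> ready = new ArrayList<>();
        Map<Long, Integer> head = buffer.headMap(watermark, false);
        for (Map.Entry<Long, Integer> e : head.entrySet()) {
            for (int i = 0; i < e.getValue(); i++) {
                ready.add(e.getKey());
            }
        }
        head.clear(); // clearing the view also removes the entries from buffer
        return ready;
    }

    // Number of records still waiting for the watermark.
    int size() {
        int n = 0;
        for (int count : buffer.values()) {
            n += count;
        }
        return n;
    }

    // For unioned inputs the combined watermark is the minimum of the
    // per-input watermarks; the caller has to track those itself.
    static long minWatermark(long... perInputWatermarks) {
        long min = Long.MAX_VALUE;
        for (long w : perInputWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }
}
```

Even in this reduced form the function has to own the buffer, the ordering
and the per-input watermark bookkeeping, which is the part I would rather
leave to the window operator.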

So I think the Flink windows interface (a merging window in this particular
case) is a perfect fit for this kind of work, where pre-processing has to be
done first.
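
For reference, the merging idea from my original message can be sketched in
plain Java roughly as follows. This is not Flink's window assigner API:
UnitWindowMerge, Window, assign and mergeReady are hypothetical names, and
the watermark argument is exactly the piece of information that the assigner
cannot get today.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical sketch, NOT Flink code: one unit window per element, and a
// merge step that collapses every window the watermark has already passed.
class UnitWindowMerge {
    // Half-open interval [start, end), similar in shape to a time window.
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    // Each element gets its own window Window(event_ts, event_ts + 1).
    static Window assign(long eventTs) {
        return new Window(eventTs, eventTs + 1);
    }

    // Merge all windows with 'start < watermark' into one covering window.
    // Returns empty if no window is ready yet.
    static Optional<Window> mergeReady(List<Window> windows, long watermark) {
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        for (Window w : windows) {
            if (w.start < watermark) {
                min = Math.min(min, w.start);
                max = Math.max(max, w.end);
            }
        }
        if (min == Long.MAX_VALUE) {
            return Optional.empty();
        }
        return Optional.of(new Window(min, max));
    }
}
```

The merged window would then fire with all its elements, and sorting them
happens in the window function.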



Mon, 17 Jun 2019 at 11:35, Jan Lukavský <je...@seznam.cz>:

> Hi Eugene,
>
> I'd say that what you want essentially is not "sort in windows", because
> (as you mention), you want to emit elements from windows as soon as
> watermark passes some timestamp. Maybe a better approach would be to
> implement this using stateful processing, where you keep a buffer of
> (unsorted) inputs and set up a timer for the minimal time of elements in the
> buffer (plus allowed lateness), and then sort elements with timestamp <=
> the timer (very often single elements). I'm actually working on this for
> Apache Beam (design doc [1]), but this is still a work-in-progress.
>
> Another drawback is that something like "sorted map state" will probably
> be needed in order to efficiently query the state for minimal timestamp.
> A less efficient implementation might work with ListState as well.
>
> Jan
>
> [1]
>
> https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing
>
> On 6/14/19 3:58 PM, Евгений Юшин wrote:
> > Hi folks
> >
> > I want to sort stream based on event time field derived from events. To do
> > this I can use one of the existing windows like TimeWindow to collect
> > events in a window of a particular size, or SlidingWindow to run sort logic
> > more often (and sort within slide).
> > Ideally, I want to sort events as soon as they pass the watermark (with
> > an out-of-order ts extractor). None of the current windows allow me to do
> > this, so I am thinking of implementing a custom merging window similar to
> > SlidingWindow. Each element will be assigned to Window(event_ts,
> > event_ts+1), and then all windows with 'start < watermark' will be merged.
> > To implement this I need the time service to be available in
> > https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L232
> > Unfortunately, only 'getCurrentProcessingTime' is there for now.
> >
> > I can pass a function that extracts the timestamp to my new window
> > assigner, but in this case the logic that calculates the min watermark for
> > parallel/unioned/co-joined streams simply won't work.
> >
> > @devs would you mind if I extend WindowAssignerContext with
> >   getCurrentWatermark or the whole time service reference?
> >
> > Would be really glad to hear your concerns.
> >
> > Regards,
> > Eugene
> >
>
