Thanks Aljoscha,

The elements can certainly be configured to have the window size and slide as 
part of them and that will not change for the same elements.   There may a 
different class of elements with a different window or slide but those values 
will be essentially final for that class of elements.


Paul

________________________________
From: Aljoscha Krettek <aljos...@apache.org>
Sent: Wednesday, August 31, 2016 10:53:17 AM
To: user@flink.apache.org
Cc: Chad Conkright
Subject: Re: Setting EventTime window width using stream data

Just checking, all the elements that would fall into a window of length X also 
have X as a property? In that case you should be able to do something like this:

public Collection<TimeWindow> assignWindows(PojoType element, long timestamp, 
WindowAssignerContext context) {
    long size = element.windowSize;
    long slide = element.windowSlide;

    if (timestamp > Long.MIN_VALUE) {
        List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
        long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, 
slide);
        for (long start = lastStart;
            start > timestamp - size;
            start -= slide) {
            windows.add(new TimeWindow(start, start + size));
        }
        return windows;
    } else {
        throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no 
timestamp marker). " +
                "Is the time characteristic set to 'ProcessingTime', or did you 
forget to call " +
                "'DataStream.assignTimestampsAndWatermarks(...)'?");
    }
}

this is basically a copy of SlidingEventTimeWindows where assignWindows is 
changed for the above and where size/slide are not stored in the assigner but 
are read from the object. This only works if all elements of a key that should 
fall into the same windows have the same size/slide property, otherwise they 
would spawn different windows.

Cheers,
Aljoscha

On Tue, 30 Aug 2016 at 21:28 Paul Joireman 
<paul.joire...@physiq.com<mailto:paul.joire...@physiq.com>> wrote:

Hi all,


Is it possible to dynamically set the size/width of a SlidingEventTimeWindow 
based on a data coming from the stream?   Our use case is as follows.   We 
create a

stream sourced from external system and coming in as a JSON string which is 
deserialized to a stream of POJO.    The deserialized object contains an event 
timestamp,

data and details about how to analyze the contained data as well as the length 
of a time window to analyze.    It would be ideal if we could leverage the 
functionality of the

SlidingEventTimeWindows but instead of hard coding the window times, use data 
from the message to configure this on the fly.   Is this possible?


Note: the stream is keyed and timestamped with event time and the window size 
will not change for the same key.


Ultimately we need a way to dynamically change the window sizes in order to 
adjust to different timing specifications not directly controlled or even known 
before-hand by

the person writing the flink analysis program.


Paul

Reply via email to