Hey Fabian, is it as simple as this?
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Collection;
import java.util.Collections;
/**
* A {@link WindowAssigner} that windows elements into windows based
on the timestamp of the
* elements and the date zone the offset it computed off. Windows
cannot overlap.
*
* <p>For example, in order to window into windows of 1 minute:
* <pre> {@code
* DataStream<Tuple2<String, Integer>> in = ...;
* KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
* WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
* keyed.window(TimeZoneAwareTumblingEventTimeWindows.of(Time.minutes(1),
ZoneId.of(AMERICA_NEW_YORK_ZONE)));
* } </pre>
*/
public class TimeZoneAwareTumblingEventTimeWindows extends
WindowAssigner<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
private final long size;
private final ZoneId zoneID;
protected TimeZoneAwareTumblingEventTimeWindows(long size, ZoneId zoneID) {
this.size = size;
this.zoneID = zoneID;
}
/**
* Creates a new {@code TumblingEventTimeWindows} {@link
WindowAssigner} that assigns
* elements to time windows based on the element timestamp.
*
* @param size The size of the generated windows.
* @return The time policy.
*/
public static TimeZoneAwareTumblingEventTimeWindows of(Time size,
ZoneId zoneID) {
return new
TimeZoneAwareTumblingEventTimeWindows(size.toMilliseconds(), zoneID);
}
@Override
public Collection<TimeWindow> assignWindows(Object element, long
timestamp, WindowAssignerContext context) {
if (timestamp > Long.MIN_VALUE) {
// Long.MIN_VALUE is currently assigned when no timestamp is present
ZonedDateTime dateTime =
Instant.ofEpochMilli(timestamp).atZone(zoneID);
long start =
TimeWindow.getWindowStartWithOffset(timestamp,
dateTime.getOffset().getTotalSeconds()*1000l, size);
return Collections.singletonList(new TimeWindow(start,
start + size));
} else {
throw new RuntimeException("Record has Long.MIN_VALUE
timestamp (= no timestamp marker). " +
"Is the time characteristic set to
'ProcessingTime', or did you forget to call " +
"'DataStream.assignTimestampsAndWatermarks(...)'?");
}
}
@Override
public Trigger<Object, TimeWindow>
getDefaultTrigger(StreamExecutionEnvironment env) {
return EventTimeTrigger.create();
}
@Override
public String toString() {
return "TumblingEventTimeWindows(" + size + ")";
}
@Override
public TypeSerializer<TimeWindow>
getWindowSerializer(ExecutionConfig executionConfig) {
return new TimeWindow.Serializer();
}
@Override
public boolean isEventTime() {
return true;
}
}
On Wed, May 2, 2018 at 9:31 AM, Vishal Santoshi <[email protected]>
wrote:
> True that. Thanks. Wanted to be sure before I go down that path.
>
>
> On Wed, May 2, 2018 at 9:19 AM, Fabian Hueske <[email protected]> wrote:
>
>> Hi Vishal,
>>
>> AFAIK it is not possible with Flink's default time windows.
>> However, it should be possible to implement a custom WindowAssigner for
>> your use case.
>> I'd have a look at the TumblingEventTimeWindows class and copy/modify it
>> to your needs.
>>
>> Best, Fabian
>>
>> 2018-05-02 15:12 GMT+02:00 Vishal Santoshi <[email protected]>:
>>
>>> This does not seem possible but need some confirmation. Anyone ?
>>>
>>> On Tue, May 1, 2018 at 12:00 PM, Vishal Santoshi <
>>> [email protected]> wrote:
>>>
>>>> How do I align a window with EDT, including daylight saving correction?
>>>> The offset takes a hardcoded value. I need 6-hour windows aligned to 00, 06,
>>>> 12, 18 and so on, but in EDT.
>>>>
>>>
>>>
>>
>