jinguishi created FLINK-13383: --------------------------------- Summary: Customize the problem in the trigger Key: FLINK-13383 URL: https://issues.apache.org/jira/browse/FLINK-13383 Project: Flink Issue Type: Bug Components: API / DataStream Affects Versions: 1.8.0 Environment: The development environment is idea, the flink version is 1.8 Reporter: jinguishi Fix For: 1.8.0 Attachments: WX20190723-174236.png, WechatIMG2.png
Using a Tumbling time window, I created a time-based and counter trigger. The parameters in the onElement method TriggerContext, ctx.getCurrentWatermark(), get negative values, There are screenshots in the attachment。 code show as below {code:java} public class CountTrigger extends Trigger<Object, TimeWindow> { private static final long serialVersionUID = 1L; private CountTrigger(int count) { this.threshold = count; } private int count = 0; private int threshold; @Override public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) { long watermark = ctx.getCurrentWatermark(); ctx.registerEventTimeTimer(window.maxTimestamp()); if (count > threshold) { count = 0; return TriggerResult.FIRE; } else { count++; } System.out.println("onElement: " + element); return TriggerResult.CONTINUE; } @Override public TriggerResult onEventTime(long time, TimeWindow window, Trigger.TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } @Override public TriggerResult onProcessingTime(long time, TimeWindow window, Trigger.TriggerContext ctx) throws Exception { return TriggerResult.FIRE; } @Override public void clear(TimeWindow window, Trigger.TriggerContext ctx) throws Exception { ctx.deleteProcessingTimeTimer(window.maxTimestamp()); } @Override public String toString() { return "CountTrigger"; } public static <W extends Window> CountTrigger of(int threshold) { return new CountTrigger(threshold); } } {code} -- This message was sent by Atlassian JIRA (v7.6.14#76016)