qi quan created FLINK-9668:
------------------------------

             Summary: In some case Trigger.onProcessingTime don't exectue
                 Key: FLINK-9668
                 URL: https://issues.apache.org/jira/browse/FLINK-9668
             Project: Flink
          Issue Type: Bug
            Reporter: qi quan

For example, I want to compute statistics over a one-day window and output the current result every minute, so I implemented my Trigger like this:

onElement: check whether the ValueState already holds the next fire time; if it does not, register a processing-time timer for the next fire time and store it.

onProcessingTime: register the following fire time (time + 1 min), update the ValueState, and return FIRE_AND_PURGE. (The amount of data in one day is too large, and I don't want to keep such a large window state.)

{code:java}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PayAmountTrigger extends Trigger<Tuple2<String, String>, TimeWindow> {

    private static final Logger LOGGER = LoggerFactory.getLogger(PayAmountTrigger.class);

    // Firing period in milliseconds (5 seconds in this example).
    private static final Long PERIOD = 1000L * 5;

    // Holds the timestamp of the next scheduled firing.
    ValueStateDescriptor<Long> stateDesc =
            new ValueStateDescriptor<>("fire-time", LongSerializer.INSTANCE);

    @Override
    public TriggerResult onElement(Tuple2<String, String> tuple2, long l, TimeWindow timeWindow,
            TriggerContext triggerContext) throws Exception {
        ValueState<Long> firstTimeState = triggerContext.getPartitionedState(stateDesc);
        long time = triggerContext.getCurrentProcessingTime();
        if (firstTimeState.value() == null) {
            // No firing scheduled yet: align to the next PERIOD boundary.
            long start = time - (time % PERIOD);
            long nextFireTimestamp = start + PERIOD;
            triggerContext.registerProcessingTimeTimer(nextFireTimestamp);
            firstTimeState.update(nextFireTimestamp);
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long l, TimeWindow timeWindow,
            TriggerContext triggerContext) throws Exception {
        ValueState<Long> state = triggerContext.getPartitionedState(stateDesc);
        // Null-safe comparison: the state may be empty if it was cleared.
        if (Long.valueOf(l).equals(state.value())) {
            // Schedule the next firing before emitting and purging.
            state.update(l + PERIOD);
            triggerContext.registerProcessingTimeTimer(l + PERIOD);
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long l, TimeWindow timeWindow,
            TriggerContext triggerContext) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
        System.out.println("PayAmountTrigger_clear");
        ValueState<Long> firstTimeState = triggerContext.getPartitionedState(stateDesc);
        Long timestamp = firstTimeState.value();
        // Guard against an empty state to avoid a NullPointerException on unboxing.
        if (timestamp != null) {
            triggerContext.deleteProcessingTimeTimer(timestamp);
        }
        firstTimeState.clear();
    }
}
{code}

Then I found that if no data arrives during a given period, onProcessingTime is not executed and every later trigger time is missed as well.

I dug through the code and found the following in WindowOperator.onProcessingTime:

{code:java}
ACC contents = null;
if (windowState != null) {
    contents = windowState.get();
}

if (contents != null) {
    TriggerResult triggerResult = triggerContext.onProcessingTime(timer.getTimestamp());
    if (triggerResult.isFire()) {
        emitWindowContents(triggerContext.window, contents);
    }
    if (triggerResult.isPurge()) {
        windowState.clear();
    }
}
{code}

This means that if no data arrives during a period and the window contents have already been purged, triggerContext.onProcessingTime is never executed, so the trigger never gets the chance to register its next timer. I think this is a bug in Flink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
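
The failure sequence described in this report can be reproduced without a Flink cluster. The following is only a minimal plain-Java sketch of the interaction, not Flink code: the class `TimerChainSketch` and all of its members are hypothetical stand-ins. It assumes a single key and a single window, models the window contents as a nullable list, the trigger's ValueState as a nullable `Long`, and the timer service as a sorted set.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical model of one keyed window; none of these names are Flink APIs.
final class TimerChainSketch {
    static final long PERIOD = 5_000L;

    private List<String> contents = null;   // null models an empty (purged) window state
    private Long nextFireTime = null;        // models the trigger's "fire-time" ValueState
    private final TreeSet<Long> timers = new TreeSet<>();
    final List<Long> firings = new ArrayList<>();

    // Models Trigger.onElement: register a timer only if none has been recorded yet.
    void onElement(String value, long now) {
        if (contents == null) {
            contents = new ArrayList<>();
        }
        contents.add(value);
        if (nextFireTime == null) {
            nextFireTime = now - (now % PERIOD) + PERIOD;
            timers.add(nextFireTime);
        }
    }

    // Models Trigger.onProcessingTime: re-register the next timer, then fire and purge.
    private boolean triggerOnProcessingTime(long ts) {
        if (nextFireTime != null && nextFireTime == ts) {
            nextFireTime = ts + PERIOD;
            timers.add(nextFireTime);
            return true;
        }
        return false;
    }

    // Models the operator: the trigger is consulted only when contents != null.
    void advanceTo(long now) {
        while (!timers.isEmpty() && timers.first() <= now) {
            long ts = timers.pollFirst();
            if (contents != null) {
                if (triggerOnProcessingTime(ts)) {
                    firings.add(ts);
                    contents = null; // purge
                }
            }
            // contents == null: the timer is consumed and nothing re-registers it.
        }
    }

    int pendingTimers() {
        return timers.size();
    }

    boolean hasBufferedData() {
        return contents != null;
    }

    public static void main(String[] args) {
        TimerChainSketch sim = new TimerChainSketch();
        sim.onElement("a", 1_000L);   // schedules a timer at 5000
        sim.advanceTo(5_000L);        // fires, purges, schedules 10000
        sim.advanceTo(10_000L);       // window is empty: trigger is skipped
        sim.onElement("b", 12_000L);  // state still holds 10000, so no new timer
        sim.advanceTo(60_000L);       // nothing left to fire
        System.out.println("firings=" + sim.firings
                + ", pendingTimers=" + sim.pendingTimers()
                + ", bufferedData=" + sim.hasBufferedData());
    }
}
```

In this model the only firing happens at 5000. The timer at 10000 is consumed while the window is empty, the stored fire time is never cleared, and the element arriving at 12000 therefore stays buffered with no timer pending.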