It worked as expected. One thing I also need to modify was the condition in onProcessingTime and onElement
if (currentTime > nextFireTimestamp) { to if (currentTime >= nextFireTimestamp) { Because there was a case when currentTime and nextFireTimestamp was equal, so the trigger did not fire. Thanks a lot for your help! Regards, Hironori 2016-04-01 0:15 GMT+09:00 Hironori Ogibayashi <ogibaya...@gmail.com>: > Aljoscha, > > Thank you. That change looks good. I will try. > > Regards, > Hironori > > 2016-03-31 22:20 GMT+09:00 Aljoscha Krettek <aljos...@apache.org>: >> Oh I see what you mean now. I think the problem is that onProcessingTime >> changes nextFireTimestamp without actually setting a Trigger, as you said. >> >> I think changing onProcessingTime to this should have the correct result: >> >> @Override >> public TriggerResult onProcessingTime(long time, W window, TriggerContext >> ctx) throws Exception { >> >> ValueState<Long> fireState = ctx.getPartitionedState(stateDesc); >> long nextFireTimestamp = fireState.value(); >> >> // only fire if an element didn't already fire >> long currentTime = System.currentTimeMillis(); >> if (currentTime > nextFireTimestamp) { >> fireState.update(0); // <- set to 0 so that onElement will set a >> timer >> return TriggerResult.FIRE; >> } >> return TriggerResult.CONTINUE; >> } >> >> What do you think? This should have the behavior that it continuously fires, >> but only if new elements arrive. >> >> Cheers, >> Aljoscha >> >> On Thu, 31 Mar 2016 at 14:46 Hironori Ogibayashi <ogibaya...@gmail.com> >> wrote: >>> >>> Aljoscha, >>> >>> Thanks for your response. >>> I understood that trigger is only set when new elements arrive, but in >>> my previous example, trigger fired at >>> 20:51:40.002, then new element arrived at 20:51:41, 42, 43. So why >>> next trigger did not set at 20:51:45? >>> >>> It looks like the following situation. >>> - 20:51:40.002 onProcessingTime called, and the trigger fires. In the >>> same method, fireState was updated to 20:51:45. but >>> registerProcessingTimeTimer wad not called, so next timer was not >>> actually set. >>> - 20:51:41 next element comes and onElement called. Since >>> currentTime(21:51:41) < nextFireTimeStamp (20:51:45), >>> it just return TriggerResult.CONTINUE. Next timer was not set. >>> >>> I think next time should be set at 20:51:45 when an element comes at >>> 20:51:41. >>> Am I mis-understanding? >>> >>> Regards, >>> Hironori >>> >>> 2016-03-31 18:08 GMT+09:00 Aljoscha Krettek <aljos...@apache.org>: >>> > Hi, >>> > yes, right now this is expected behavior. But I see that it can be a >>> > bit, >>> > well, unexpected. >>> > >>> > The continuous trigger is only set when new elements arrive, so only >>> > when >>> > you put new elements does the trigger fire again after five seconds. If >>> > you >>> > want it to truly continuously fire every five seconds even though no new >>> > elements arrived you can change the "onProcessingTime" method to this: >>> > >>> > @Override >>> > public TriggerResult onProcessingTime(long time, W window, >>> > TriggerContext >>> > ctx) throws Exception { >>> > >>> > ValueState<Long> fireState = ctx.getPartitionedState(stateDesc); >>> > long nextFireTimestamp = fireState.value(); >>> > >>> > // only fire if an element didn't already fire >>> > long currentTime = System.currentTimeMillis(); >>> > if (currentTime > nextFireTimestamp) { >>> > long start = currentTime - (currentTime % interval); >>> > fireState.update(start + interval); >>> > ctx.registerProcessingTimeTimer(start + interval); // <-- I >>> > added >>> > this call >>> > return TriggerResult.FIRE; >>> > } >>> > return TriggerResult.CONTINUE; >>> > } >>> > >>> > I hope this helps. As I mentioned in the other thread I'm currently >>> > thinking >>> > about how to make the triggers more intuitive since right now they are >>> > not >>> > very easy to comprehend because the names can also be misleading. >>> > >>> > Cheers, >>> > Aljoscha >>> > >>> > On Wed, 30 Mar 2016 at 14:33 Hironori Ogibayashi <ogibaya...@gmail.com> >>> > wrote: >>> >> >>> >> Hi >>> >> >>> >> I noticed that ContinuousProcessingTimeTrigger sometimes does not fire. >>> >> >>> >> I asked similar question before and applied this patch. >>> >> >>> >> >>> >> https://github.com/apache/flink/commit/607892314edee95da56f4997d85610f17a0dd470#diff-19bbcb3ea1403e483327408badfcd3f8 >>> >> It looked work but still I have strange behavior. >>> >> >>> >> The code is: >>> >> >>> >> ---- >>> >> val env = StreamExecutionEnvironment.getExecutionEnvironment >>> >> val input = >>> >> >>> >> >>> >> env.readFileStream(fileName,100,FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED) >>> >> .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } } >>> >> .windowAll(TumblingProcessingTimeWindows.of(Time.days(1))) >>> >> .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5))) >>> >> .fold(Set[String]()){(r,i) => { r + i}} >>> >> .map{x => (new Timestamp(System.currentTimeMillis()), x.size)} >>> >> >>> >> input print >>> >> --- >>> >> >>> >> This case, the base window is long, so I just expect cumulative >>> >> distinct count of the value every 5 seconds. >>> >> >>> >> Appended 8 strings to the input file with 1 second interval. >>> >> >>> >> --- >>> >> % for i in `seq 1 8`; do date; echo "aa${i}" >> ~/tmp/input.txt; sleep >>> >> 1; done >>> >> Wed Mar 30 20:51:36 JST 2016 >>> >> Wed Mar 30 20:51:37 JST 2016 >>> >> Wed Mar 30 20:51:38 JST 2016 >>> >> Wed Mar 30 20:51:39 JST 2016 >>> >> Wed Mar 30 20:51:40 JST 2016 >>> >> Wed Mar 30 20:51:41 JST 2016 >>> >> Wed Mar 30 20:51:42 JST 2016 >>> >> Wed Mar 30 20:51:43 JST 2016 >>> >> --- >>> >> >>> >> But I only received 1 output event. I should receive one more event 5 >>> >> seconds later, but actually nothing. >>> >> >>> >> (2016-03-30 20:51:40.002,4) >>> >> >>> >> Later, if I put additional line to the file. I got these events. >>> >> >>> >> (2016-03-30 21:12:05.39,9) >>> >> (2016-03-30 21:12:10.001,9) >>> >> >>> >> I slightly modified ContinuousProcessingTimeTrigger.java and added >>> >> logging in onProcessingTime method. It looks like the method was >>> >> called at 20:51:40 and 21:12:10, not at 20:51:45 and 21:12:05. >>> >> >>> >> ---- >>> >> 2016-03-30 20:51:40,002 INFO >>> >> >>> >> >>> >> org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger >>> >> - onProcessingTime called: 2016-03-30 20:51:40.002 >>> >> ... >>> >> 2016-03-30 21:12:10,001 INFO >>> >> >>> >> >>> >> org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger >>> >> - onProcessingTime called: 2016-03-30 21:12:10.001 >>> >> ---- >>> >> >>> >> Is this an expected behavior? >>> >> >>> >> Regards, >>> >> Hironori