Aljoscha,

Thank you. That change looks good. I will try it.

Regards,
Hironori

2016-03-31 22:20 GMT+09:00 Aljoscha Krettek <aljos...@apache.org>:
> Oh, I see what you mean now. I think the problem is that onProcessingTime
> changes nextFireTimestamp without actually setting a timer, as you said.
>
> I think changing onProcessingTime to this should give the correct result:
>
> @Override
> public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
>
>     ValueState<Long> fireState = ctx.getPartitionedState(stateDesc);
>     long nextFireTimestamp = fireState.value();
>
>     // only fire if an element didn't already fire
>     long currentTime = System.currentTimeMillis();
>     if (currentTime > nextFireTimestamp) {
>         fireState.update(0L); // <- set to 0 so that onElement will set a timer
>         return TriggerResult.FIRE;
>     }
>     return TriggerResult.CONTINUE;
> }
>
> What do you think? This should make it fire continuously, but only while
> new elements keep arriving.
>
> Cheers,
> Aljoscha
>
> On Thu, 31 Mar 2016 at 14:46 Hironori Ogibayashi <ogibaya...@gmail.com> wrote:
>>
>> Aljoscha,
>>
>> Thanks for your response.
>> I understand that the trigger is only set when new elements arrive, but in
>> my previous example the trigger fired at 20:51:40.002, and then new elements
>> arrived at 20:51:41, 42 and 43. So why was the next timer not set for
>> 20:51:45?
>>
>> It looks like the following happened:
>> - 20:51:40.002: onProcessingTime is called and the trigger fires. In the
>>   same method, fireState is updated to 20:51:45, but
>>   registerProcessingTimeTimer is not called, so the next timer is not
>>   actually set.
>> - 20:51:41: the next element arrives and onElement is called. Since
>>   currentTime (20:51:41) < nextFireTimestamp (20:51:45), it just returns
>>   TriggerResult.CONTINUE. The next timer is not set.
>>
>> I think the next timer should be set for 20:51:45 when an element arrives
>> at 20:51:41.
>> Am I misunderstanding?
>>
>> Regards,
>> Hironori
>>
>> 2016-03-31 18:08 GMT+09:00 Aljoscha Krettek <aljos...@apache.org>:
>> > Hi,
>> > yes, right now this is expected behavior. But I see that it can be a bit,
>> > well, unexpected.
>> >
>> > The continuous trigger only sets its timer when new elements arrive, so the
>> > trigger fires again after five seconds only if you add new elements. If you
>> > want it to truly fire every five seconds even when no new elements arrive,
>> > you can change the "onProcessingTime" method to this:
>> >
>> > @Override
>> > public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
>> >
>> >     ValueState<Long> fireState = ctx.getPartitionedState(stateDesc);
>> >     long nextFireTimestamp = fireState.value();
>> >
>> >     // only fire if an element didn't already fire
>> >     long currentTime = System.currentTimeMillis();
>> >     if (currentTime > nextFireTimestamp) {
>> >         long start = currentTime - (currentTime % interval);
>> >         fireState.update(start + interval);
>> >         ctx.registerProcessingTimeTimer(start + interval); // <-- I added this call
>> >         return TriggerResult.FIRE;
>> >     }
>> >     return TriggerResult.CONTINUE;
>> > }
>> >
>> > I hope this helps. As I mentioned in the other thread, I'm currently thinking
>> > about how to make the triggers more intuitive, since right now they are not
>> > very easy to comprehend and the names can also be misleading.
>> >
>> > Cheers,
>> > Aljoscha
>> >
>> > On Wed, 30 Mar 2016 at 14:33 Hironori Ogibayashi <ogibaya...@gmail.com> wrote:
>> >>
>> >> Hi,
>> >>
>> >> I noticed that ContinuousProcessingTimeTrigger sometimes does not fire.
>> >>
>> >> I asked a similar question before and applied this patch:
>> >>
>> >> https://github.com/apache/flink/commit/607892314edee95da56f4997d85610f17a0dd470#diff-19bbcb3ea1403e483327408badfcd3f8
>> >>
>> >> It seemed to work, but I still see strange behavior.
>> >>
>> >> The code is:
>> >>
>> >> ----
>> >> import java.sql.Timestamp
>> >> import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction
>> >> import org.apache.flink.streaming.api.scala._
>> >> import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
>> >> import org.apache.flink.streaming.api.windowing.time.Time
>> >> import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger
>> >>
>> >> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> >> val input = env
>> >>   .readFileStream(fileName, 100, FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED)
>> >>   .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
>> >>   .windowAll(TumblingProcessingTimeWindows.of(Time.days(1)))
>> >>   .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
>> >>   .fold(Set[String]()) { (r, i) => r + i }
>> >>   .map { x => (new Timestamp(System.currentTimeMillis()), x.size) }
>> >>
>> >> input.print()
>> >> env.execute()
>> >> ---
>> >>
>> >> In this case the base window is long, so I expect a cumulative distinct
>> >> count of the values every 5 seconds.
>> >>
>> >> I appended 8 strings to the input file at 1-second intervals.
>> >>
>> >> ---
>> >> % for i in `seq 1 8`; do date; echo "aa${i}" >> ~/tmp/input.txt; sleep 1; done
>> >> Wed Mar 30 20:51:36 JST 2016
>> >> Wed Mar 30 20:51:37 JST 2016
>> >> Wed Mar 30 20:51:38 JST 2016
>> >> Wed Mar 30 20:51:39 JST 2016
>> >> Wed Mar 30 20:51:40 JST 2016
>> >> Wed Mar 30 20:51:41 JST 2016
>> >> Wed Mar 30 20:51:42 JST 2016
>> >> Wed Mar 30 20:51:43 JST 2016
>> >> ---
>> >>
>> >> But I received only 1 output event. I expected one more event 5 seconds
>> >> later, but nothing arrived.
>> >>
>> >> (2016-03-30 20:51:40.002,4)
>> >>
>> >> Later, when I appended an additional line to the file, I got these events:
>> >>
>> >> (2016-03-30 21:12:05.39,9)
>> >> (2016-03-30 21:12:10.001,9)
>> >>
>> >> I slightly modified ContinuousProcessingTimeTrigger.java and added
>> >> logging to the onProcessingTime method. It looks like the method was
>> >> called at 20:51:40 and 21:12:10, not at 20:51:45 and 21:12:05.
>> >>
>> >> ----
>> >> 2016-03-30 20:51:40,002 INFO  org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger  - onProcessingTime called: 2016-03-30 20:51:40.002
>> >> ...
>> >> 2016-03-30 21:12:10,001 INFO  org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger  - onProcessingTime called: 2016-03-30 21:12:10.001
>> >> ----
>> >>
>> >> Is this expected behavior?
>> >>
>> >> Regards,
>> >> Hironori
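
The timeline discussed in this thread can be replayed without a Flink cluster. Below is a minimal, Flink-free sketch: a hypothetical `TriggerSimulation` class (not part of Flink) that models the trigger's `onElement`/`onProcessingTime` logic for three hypothetical variants: `PATCHED` (onProcessingTime advances the state without registering a timer), `REGISTER_IN_ON_PROCESSING_TIME` (Aljoscha's first suggestion) and `RESET_TO_ZERO` (his second suggestion). It rests on a few assumptions: times are milliseconds after 20:51:00, one element arrives per second from 20:51:36 to 20:51:43, a timer registered for time t is delivered at t + 1 ms, and `onElement` fires and registers a timer once the current time has passed the stored next-fire timestamp. That last rule is consistent with the 21:12:05 output but is not copied from the Flink source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical, Flink-free model of the ContinuousProcessingTimeTrigger logic
// discussed in this thread. All times are milliseconds after 20:51:00.
final class TriggerSimulation {

    enum Variant { PATCHED, REGISTER_IN_ON_PROCESSING_TIME, RESET_TO_ZERO }

    static final long INTERVAL = 5_000;
    // Assumption: a timer registered for time t is delivered at t + 1 ms.
    static final long TIMER_DELAY = 1;
    // Assumption: one element per second, 20:51:36 through 20:51:43.
    static final long[] ARRIVALS = {36_000, 37_000, 38_000, 39_000, 40_000, 41_000, 42_000, 43_000};

    private final Variant variant;
    private long fireState = 0;                           // 0 = no next-fire time stored
    private final TreeSet<Long> timers = new TreeSet<>(); // registered processing-time timers
    private final List<Long> fires = new ArrayList<>();   // times at which FIRE was returned

    TriggerSimulation(Variant variant) { this.variant = variant; }

    // Same alignment arithmetic as the trigger: start of the current interval plus one interval.
    static long nextAligned(long now) {
        long start = now - (now % INTERVAL);
        return start + INTERVAL;
    }

    // Assumed onElement behavior (see the lead-in).
    private void onElement(long now) {
        if (fireState == 0) {              // nothing scheduled: schedule the next fire
            fireState = nextAligned(now);
            timers.add(fireState);
        } else if (now > fireState) {      // overdue: fire now and schedule the next one
            fireState = nextAligned(now);
            timers.add(fireState);
            fires.add(now);
        }                                  // otherwise CONTINUE without registering a timer
    }

    private void onProcessingTime(long now) {
        if (now <= fireState) {
            return;                        // CONTINUE: an element already fired
        }
        fires.add(now);                    // FIRE
        if (variant == Variant.PATCHED) {
            fireState = nextAligned(now);  // state advanced, but no timer registered
        } else if (variant == Variant.REGISTER_IN_ON_PROCESSING_TIME) {
            fireState = nextAligned(now);
            timers.add(fireState);         // keeps firing even without new elements
        } else {
            fireState = 0;                 // RESET_TO_ZERO: the next element registers a timer
        }
    }

    // Replays element arrivals and timer deliveries in time order up to `horizon`.
    List<Long> run(long[] arrivals, long horizon) {
        int i = 0;
        while (true) {
            long nextTimer = timers.isEmpty() ? Long.MAX_VALUE : timers.first() + TIMER_DELAY;
            long nextElement = i < arrivals.length ? arrivals[i] : Long.MAX_VALUE;
            if (Math.min(nextTimer, nextElement) > horizon) {
                break;
            }
            if (nextElement <= nextTimer) {
                onElement(arrivals[i++]);
            } else {
                timers.pollFirst();
                onProcessingTime(nextTimer);
            }
        }
        return fires;
    }

    public static void main(String[] args) {
        for (Variant v : Variant.values()) {
            System.out.println(v + " fires at " + new TriggerSimulation(v).run(ARRIVALS, 60_000));
        }
    }
}
```

With a horizon of 60000 (20:52:00), the model gives a single fire at 40001 for `PATCHED`, which matches the single output event reported above. `RESET_TO_ZERO` adds a second fire at 45001, because the element at 20:51:41 finds the state at 0 and registers a new timer. `REGISTER_IN_ON_PROCESSING_TIME` fires at 40001, 45001, 50001 and 55001, whether or not new elements arrive.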