Oh, I see what you mean now. I think the problem is that onProcessingTime
updates nextFireTimestamp without actually registering a timer, as you said.
I think changing onProcessingTime to this should have the correct result:

@Override
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
    ValueState<Long> fireState = ctx.getPartitionedState(stateDesc);
    long nextFireTimestamp = fireState.value();

    // only fire if an element didn't already fire
    long currentTime = System.currentTimeMillis();
    if (currentTime > nextFireTimestamp) {
        // reset to 0 so that the next onElement call registers a new timer
        fireState.update(0L);
        return TriggerResult.FIRE;
    }
    return TriggerResult.CONTINUE;
}

What do you think? This should have the behavior that it continuously
fires, but only if new elements arrive.
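
To make the intended behavior concrete, here is a rough plain-Java sketch (no Flink classes) that replays the timeline from your test. It rests on several assumptions: that onElement registers a timer at the next interval boundary only when the stored timestamp is 0, that timer callbacks run 2 ms late (as in your 20:51:40.002 log line), and that elements are picked up 100 ms after they are written. The class name ContinuousTriggerSketch and its helper methods are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Plain-Java stand-in for the trigger logic; not the actual Flink class.
class ContinuousTriggerSketch {
    static final long INTERVAL_MS = 5_000L;

    long nextFireTimestamp = 0L;                    // plays the role of fireState
    final TreeSet<Long> timers = new TreeSet<>();   // registered processing-time timers
    final List<Long> fireTimes = new ArrayList<>(); // times at which FIRE was returned

    // ASSUMPTION: onElement registers a timer only when the state is 0.
    void onElement(long currentTime) {
        if (nextFireTimestamp == 0L) {
            long start = currentTime - (currentTime % INTERVAL_MS);
            nextFireTimestamp = start + INTERVAL_MS;
            timers.add(nextFireTimestamp);
        }
    }

    // Mirrors the proposed onProcessingTime: fire, then reset the state to 0.
    void onProcessingTime(long currentTime) {
        if (currentTime > nextFireTimestamp) {
            nextFireTimestamp = 0L;
            fireTimes.add(currentTime);
        }
    }

    // Runs every timer due by 'now'. ASSUMPTION: callbacks run 2 ms late.
    void advanceTo(long now) {
        while (!timers.isEmpty() && timers.first() <= now) {
            onProcessingTime(timers.pollFirst() + 2L);
        }
    }

    // Replays elements written at seconds 36..43, then lets time pass.
    static ContinuousTriggerSketch runTimeline() {
        ContinuousTriggerSketch t = new ContinuousTriggerSketch();
        for (long second = 36; second <= 43; second++) {
            long arrival = second * 1_000L + 100L; // ASSUMPTION: 100 ms read delay
            t.advanceTo(arrival);
            t.onElement(arrival);
        }
        t.advanceTo(60_000L); // no further elements arrive
        return t;
    }

    public static void main(String[] args) {
        System.out.println(runTimeline().fireTimes);
    }
}
```

Under these assumptions the sketch fires at 40.002 and 45.002 seconds and then stays quiet, because no new element arrives to re-arm the timer.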
Cheers,
Aljoscha
On Thu, 31 Mar 2016 at 14:46 Hironori Ogibayashi <[email protected]>
wrote:
> Aljoscha,
>
> Thanks for your response.
> I understand that the trigger is only set when new elements arrive, but in
> my previous example the trigger fired at 20:51:40.002, and then new
> elements arrived at 20:51:41, 42, and 43. So why was the next trigger
> not set for 20:51:45?
>
> It looks like the following happened:
> - 20:51:40.002: onProcessingTime was called and the trigger fired. In the
>   same method, fireState was updated to 20:51:45, but
>   registerProcessingTimeTimer was not called, so the next timer was not
>   actually set.
> - 20:51:41: the next element arrived and onElement was called. Since
>   currentTime (20:51:41) < nextFireTimestamp (20:51:45),
>   it just returned TriggerResult.CONTINUE. The next timer was not set.
>
> I think the next timer should be set for 20:51:45 when an element arrives at
> 20:51:41.
> Am I misunderstanding?
>
> Regards,
> Hironori
>
> 2016-03-31 18:08 GMT+09:00 Aljoscha Krettek <[email protected]>:
> > Hi,
> > yes, right now this is expected behavior. But I see that it can be a bit,
> > well, unexpected.
> >
> > The continuous trigger is only set when new elements arrive, so only when
> > you put new elements does the trigger fire again after five seconds. If
> > you want it to truly continuously fire every five seconds even though no
> > new elements arrived you can change the "onProcessingTime" method to this:
> >
> > @Override
> > public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
> >
> >     ValueState<Long> fireState = ctx.getPartitionedState(stateDesc);
> >     long nextFireTimestamp = fireState.value();
> >
> >     // only fire if an element didn't already fire
> >     long currentTime = System.currentTimeMillis();
> >     if (currentTime > nextFireTimestamp) {
> >         long start = currentTime - (currentTime % interval);
> >         fireState.update(start + interval);
> >         ctx.registerProcessingTimeTimer(start + interval); // <-- I added this call
> >         return TriggerResult.FIRE;
> >     }
> >     return TriggerResult.CONTINUE;
> > }
> >
> > I hope this helps. As I mentioned in the other thread, I'm currently
> > thinking about how to make the triggers more intuitive, since right now
> > they are not very easy to comprehend because the names can also be
> > misleading.
> >
> > Cheers,
> > Aljoscha
> >
> > On Wed, 30 Mar 2016 at 14:33 Hironori Ogibayashi <[email protected]>
> > wrote:
> >>
> >> Hi
> >>
> >> I noticed that ContinuousProcessingTimeTrigger sometimes does not fire.
> >>
> >> I asked a similar question before and applied this patch:
> >>
> >> https://github.com/apache/flink/commit/607892314edee95da56f4997d85610f17a0dd470#diff-19bbcb3ea1403e483327408badfcd3f8
> >>
> >> It seemed to work, but I still see strange behavior.
> >>
> >> The code is:
> >>
> >> ----
> >> val env = StreamExecutionEnvironment.getExecutionEnvironment
> >> val input = env
> >>   .readFileStream(fileName, 100, FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED)
> >>   .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
> >>   .windowAll(TumblingProcessingTimeWindows.of(Time.days(1)))
> >>   .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
> >>   .fold(Set[String]()){(r,i) => { r + i}}
> >>   .map{x => (new Timestamp(System.currentTimeMillis()), x.size)}
> >>
> >> input print
> >> ---
> >>
> >> In this case the base window is long, so I just expect the cumulative
> >> distinct count of the values every 5 seconds.
> >>
> >> I appended 8 strings to the input file at 1-second intervals.
> >>
> >> ---
> >> % for i in `seq 1 8`; do date; echo "aa${i}" >> ~/tmp/input.txt; sleep 1; done
> >> Wed Mar 30 20:51:36 JST 2016
> >> Wed Mar 30 20:51:37 JST 2016
> >> Wed Mar 30 20:51:38 JST 2016
> >> Wed Mar 30 20:51:39 JST 2016
> >> Wed Mar 30 20:51:40 JST 2016
> >> Wed Mar 30 20:51:41 JST 2016
> >> Wed Mar 30 20:51:42 JST 2016
> >> Wed Mar 30 20:51:43 JST 2016
> >> ---
> >>
> >> But I received only 1 output event. I should have received one more
> >> event 5 seconds later, but nothing arrived.
> >>
> >> (2016-03-30 20:51:40.002,4)
> >>
> >> Later, when I appended an additional line to the file, I got these events:
> >>
> >> (2016-03-30 21:12:05.39,9)
> >> (2016-03-30 21:12:10.001,9)
> >>
> >> I slightly modified ContinuousProcessingTimeTrigger.java and added
> >> logging to the onProcessingTime method. It looks like the method was
> >> called at 20:51:40 and 21:12:10, but not at 20:51:45 or 21:12:05.
> >>
> >> ----
> >> 2016-03-30 20:51:40,002 INFO org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger - onProcessingTime called: 2016-03-30 20:51:40.002
> >> ...
> >> 2016-03-30 21:12:10,001 INFO org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger - onProcessingTime called: 2016-03-30 21:12:10.001
> >> ----
> >>
> >> Is this expected behavior?
> >>
> >> Regards,
> >> Hironori
>