It worked as expected.
One thing I also needed to change was the condition in onProcessingTime
and onElement, from

          if (currentTime > nextFireTimestamp) {

to

          if (currentTime >= nextFireTimestamp) {

I needed this because there was a case where currentTime and
nextFireTimestamp were equal, so the trigger did not fire.
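
For later readers, here is a small, Flink-free Java model of the final logic: Aljoscha's revised onProcessingTime (reset the state to 0 after firing) combined with the >= comparison. It is only a sketch, not the Flink Trigger API. The class and method names are hypothetical, timers are modeled as a sorted set, the onElement shape (register a timer when the state is 0, fire once the deadline is due) is an assumption inferred from this thread, and timer callbacks are assumed to run exactly at their timestamp, which is the equal-timestamp case described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/**
 * Hypothetical, Flink-free model of the final trigger logic discussed in this thread.
 * Not the Flink Trigger API: timers are a sorted set and time is passed in explicitly.
 */
public class FixedTriggerSim {
    private final long interval;
    private final boolean inclusive;        // true: fire when now >= next, false: when now > next
    private long nextFireTimestamp = 0L;    // 0 tells onElement to register a new timer
    private final TreeSet<Long> timers = new TreeSet<>();

    FixedTriggerSim(long interval, boolean inclusive) {
        this.interval = interval;
        this.inclusive = inclusive;
    }

    private boolean due(long now) {
        return inclusive ? now >= nextFireTimestamp : now > nextFireTimestamp;
    }

    // Aligns the next fire time to the following interval boundary and registers a timer for it.
    private void scheduleNext(long now) {
        nextFireTimestamp = now - (now % interval) + interval;
        timers.add(nextFireTimestamp);
    }

    // Assumed onElement shape: register a timer if the state is 0, fire if the deadline is due.
    boolean onElement(long now) {
        if (nextFireTimestamp == 0L) {
            scheduleNext(now);
            return false;
        }
        if (due(now)) {
            scheduleNext(now);
            return true;
        }
        return false;
    }

    // Revised onProcessingTime: fire and reset the state to 0 instead of advancing it.
    boolean onProcessingTime(long now) {
        if (due(now)) {
            nextFireTimestamp = 0L;
            return true;
        }
        return false;
    }

    // Feeds element arrival times in order, running any due timers first; returns fire times.
    List<Long> run(long[] elementTimes, long endTime) {
        List<Long> fires = new ArrayList<>();
        for (long t : elementTimes) {
            fireTimersUpTo(t, fires);
            if (onElement(t)) {
                fires.add(t);
            }
        }
        fireTimersUpTo(endTime, fires);
        return fires;
    }

    // Timer callbacks run exactly at their timestamp, i.e. the now == nextFireTimestamp case.
    private void fireTimersUpTo(long t, List<Long> fires) {
        while (!timers.isEmpty() && timers.first() <= t) {
            long timer = timers.pollFirst();
            if (onProcessingTime(timer)) {
                fires.add(timer);
            }
        }
    }

    public static void main(String[] args) {
        // Milliseconds after 20:51:00, loosely following the timeline in the original report.
        long[] elements = {36_000, 37_000, 38_000, 39_000, 40_500, 41_000, 42_000, 43_000};
        System.out.println("with >= : " + new FixedTriggerSim(5_000, true).run(elements, 60_000));
        System.out.println("with >  : " + new FixedTriggerSim(5_000, false).run(elements, 60_000));
    }
}
```

With >= both boundary timers fire, so main prints [40000, 45000]. With > the callback at exactly 45000 is skipped, the state is left at 45000 with no timer pending, and because no further element arrives the trigger stays silent, so it prints [40500].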

Thanks a lot for your help!

Regards,
Hironori


2016-04-01 0:15 GMT+09:00 Hironori Ogibayashi <ogibaya...@gmail.com>:
> Aljoscha,
>
> Thank you. That change looks good. I will try.
>
> Regards,
> Hironori
>
> 2016-03-31 22:20 GMT+09:00 Aljoscha Krettek <aljos...@apache.org>:
>> Oh I see what you mean now. I think the problem is that onProcessingTime
>> changes nextFireTimestamp without actually setting a Trigger, as you said.
>>
>> I think changing onProcessingTime to this should have the correct result:
>>
>> @Override
>> public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
>>
>>     ValueState<Long> fireState = ctx.getPartitionedState(stateDesc);
>>     long nextFireTimestamp = fireState.value();
>>
>>     // only fire if an element didn't already fire
>>     long currentTime = System.currentTimeMillis();
>>     if (currentTime > nextFireTimestamp) {
>>         fireState.update(0L); // <- reset to 0 so that onElement will set a new timer
>>         return TriggerResult.FIRE;
>>     }
>>     return TriggerResult.CONTINUE;
>> }
>>
>> What do you think? With this change it should fire continuously, but only
>> while new elements keep arriving.
>>
>> Cheers,
>> Aljoscha
>>
>> On Thu, 31 Mar 2016 at 14:46 Hironori Ogibayashi <ogibaya...@gmail.com> wrote:
>>>
>>> Aljoscha,
>>>
>>> Thanks for your response.
>>> I understand that the trigger is only set when new elements arrive, but
>>> in my previous example the trigger fired at 20:51:40.002, and then new
>>> elements arrived at 20:51:41, 42, and 43. So why was the next trigger
>>> not set for 20:51:45?
>>>
>>> It looks like the following happened:
>>> - 20:51:40.002: onProcessingTime is called and the trigger fires. In the
>>>   same method, fireState is updated to 20:51:45, but
>>>   registerProcessingTimeTimer is not called, so the next timer is never
>>>   actually set.
>>> - 20:51:41: the next element arrives and onElement is called. Since
>>>   currentTime (20:51:41) < nextFireTimestamp (20:51:45), it just returns
>>>   TriggerResult.CONTINUE, so the next timer is still not set.
>>>
>>> I think the next timer should be set for 20:51:45 when an element
>>> arrives at 20:51:41. Am I misunderstanding?
>>>
>>> Regards,
>>> Hironori
>>>
>>> 2016-03-31 18:08 GMT+09:00 Aljoscha Krettek <aljos...@apache.org>:
>>> > Hi,
>>> > yes, right now this is expected behavior. But I see that it can be a
>>> > bit, well, unexpected.
>>> >
>>> > The continuous trigger is only set when new elements arrive, so the
>>> > trigger fires again after five seconds only if you add new elements. If
>>> > you want it to truly fire every five seconds even when no new elements
>>> > arrive, you can change the "onProcessingTime" method to this:
>>> >
>>> > @Override
>>> > public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
>>> >
>>> >     ValueState<Long> fireState = ctx.getPartitionedState(stateDesc);
>>> >     long nextFireTimestamp = fireState.value();
>>> >
>>> >     // only fire if an element didn't already fire
>>> >     long currentTime = System.currentTimeMillis();
>>> >     if (currentTime > nextFireTimestamp) {
>>> >         long start = currentTime - (currentTime % interval);
>>> >         fireState.update(start + interval);
>>> >         ctx.registerProcessingTimeTimer(start + interval); // <-- I added this call
>>> >         return TriggerResult.FIRE;
>>> >     }
>>> >     return TriggerResult.CONTINUE;
>>> > }
>>> >
>>> > I hope this helps. As I mentioned in the other thread, I'm currently
>>> > thinking about how to make the triggers more intuitive, since right now
>>> > they are not very easy to comprehend, partly because the names can be
>>> > misleading.
>>> >
>>> > Cheers,
>>> > Aljoscha
>>> >
>>> > On Wed, 30 Mar 2016 at 14:33 Hironori Ogibayashi <ogibaya...@gmail.com> wrote:
>>> >>
>>> >> Hi
>>> >>
>>> >> I noticed that ContinuousProcessingTimeTrigger sometimes does not fire.
>>> >>
>>> >> I asked a similar question before and applied this patch:
>>> >>
>>> >> https://github.com/apache/flink/commit/607892314edee95da56f4997d85610f17a0dd470#diff-19bbcb3ea1403e483327408badfcd3f8
>>> >>
>>> >> It seemed to work, but I still see strange behavior.
>>> >>
>>> >> The code is:
>>> >>
>>> >> ----
>>> >>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>> >>     val input = env
>>> >>       .readFileStream(fileName, 100, FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED)
>>> >>       .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
>>> >>       .windowAll(TumblingProcessingTimeWindows.of(Time.days(1)))
>>> >>       .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
>>> >>       .fold(Set[String]()){(r,i) => { r + i}}
>>> >>       .map{x => (new Timestamp(System.currentTimeMillis()), x.size)}
>>> >>
>>> >>     input print
>>> >> ---
>>> >>
>>> >> In this case the base window is long (one day), so I simply expect a
>>> >> cumulative distinct count of the values every 5 seconds.
>>> >>
>>> >> I appended 8 strings to the input file at 1-second intervals:
>>> >>
>>> >> ---
>>> >> % for i in `seq 1 8`; do date; echo "aa${i}" >> ~/tmp/input.txt; sleep 1; done
>>> >> Wed Mar 30 20:51:36 JST 2016
>>> >> Wed Mar 30 20:51:37 JST 2016
>>> >> Wed Mar 30 20:51:38 JST 2016
>>> >> Wed Mar 30 20:51:39 JST 2016
>>> >> Wed Mar 30 20:51:40 JST 2016
>>> >> Wed Mar 30 20:51:41 JST 2016
>>> >> Wed Mar 30 20:51:42 JST 2016
>>> >> Wed Mar 30 20:51:43 JST 2016
>>> >> ---
>>> >>
>>> >> But I only received one output event. I expected one more event 5
>>> >> seconds later, but nothing arrived.
>>> >>
>>> >> (2016-03-30 20:51:40.002,4)
>>> >>
>>> >> Later, when I appended an additional line to the file, I got these events:
>>> >>
>>> >> (2016-03-30 21:12:05.39,9)
>>> >> (2016-03-30 21:12:10.001,9)
>>> >>
>>> >> I slightly modified ContinuousProcessingTimeTrigger.java to add logging
>>> >> in the onProcessingTime method. It looks like the method was called at
>>> >> 20:51:40 and 21:12:10, but not at 20:51:45 or 21:12:05.
>>> >>
>>> >> ----
>>> >> 2016-03-30 20:51:40,002 INFO org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger - onProcessingTime called: 2016-03-30 20:51:40.002
>>> >> ...
>>> >> 2016-03-30 21:12:10,001 INFO org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger - onProcessingTime called: 2016-03-30 21:12:10.001
>>> >> ----
>>> >>
>>> >> Is this expected behavior?
>>> >>
>>> >> Regards,
>>> >> Hironori
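
The timeline in the original report can be reproduced with a small, Flink-free Java model of the trigger as it behaved before the fixes in this thread. This is a sketch, not Flink code: the class and method names are hypothetical, timers are modeled as a sorted set, every timer callback is assumed to run 2 ms late (as in the 20:51:40.002 log line), the onElement shape is inferred from the discussion in this thread, and the element arrival times (milliseconds after 20:51:00) only approximate the shell loop above. onProcessingTime follows what Hironori describes: it advances the fire state without registering a new timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/**
 * Hypothetical, Flink-free model of the trigger behavior in the original report.
 * Not the Flink API: timers are a sorted set and time is passed in explicitly.
 */
public class ReportedTriggerSim {
    private final long interval;
    private final long timerDelay;          // assumed lateness of every timer callback
    private long nextFireTimestamp = 0L;    // 0 means nothing has been scheduled yet
    private final TreeSet<Long> timers = new TreeSet<>();

    ReportedTriggerSim(long interval, long timerDelay) {
        this.interval = interval;
        this.timerDelay = timerDelay;
    }

    private long nextBoundary(long now) {
        return now - (now % interval) + interval;
    }

    // Assumed onElement shape: register a timer on the first element, or fire and
    // register a timer if the stored deadline has already passed.
    boolean onElement(long now) {
        if (nextFireTimestamp == 0L || now > nextFireTimestamp) {
            boolean fire = nextFireTimestamp != 0L;
            nextFireTimestamp = nextBoundary(now);
            timers.add(nextFireTimestamp);
            return fire;
        }
        return false;
    }

    // As in the report: the state advances to the next boundary, but no timer is registered.
    boolean onProcessingTime(long now) {
        if (now > nextFireTimestamp) {
            nextFireTimestamp = nextBoundary(now);
            return true;
        }
        return false;
    }

    // Feeds element arrival times in order, running any due timers first; returns fire times.
    List<Long> run(long[] elementTimes, long endTime) {
        List<Long> fires = new ArrayList<>();
        for (long t : elementTimes) {
            fireTimersUpTo(t, fires);
            if (onElement(t)) {
                fires.add(t);
            }
        }
        fireTimersUpTo(endTime, fires);
        return fires;
    }

    private void fireTimersUpTo(long t, List<Long> fires) {
        while (!timers.isEmpty() && timers.first() + timerDelay <= t) {
            long now = timers.pollFirst() + timerDelay;
            if (onProcessingTime(now)) {
                fires.add(now);
            }
        }
    }

    public static void main(String[] args) {
        // The last element stands in for the line appended at 21:12:05.39.
        long[] elements = {36_000, 37_000, 38_000, 39_000, 40_500, 41_000, 42_000, 43_000, 1_265_390};
        System.out.println(new ReportedTriggerSim(5_000, 2).run(elements, 1_300_000));
    }
}
```

Running main prints [40002, 1265390, 1270002]: one fire around 20:51:40, nothing at 20:51:45, and then fires when the line appended at 21:12:05.39 arrives and again about 5 seconds later, which is the same pattern as the output reported above.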
