Aljoscha,

Thanks for your response.
I understand that the trigger is only set when new elements arrive, but in
my previous example the trigger fired at 20:51:40.002, and then new
elements arrived at 20:51:41, 42, and 43. So why was the next trigger
not set for 20:51:45?

It looks like the following is happening:
- 20:51:40.002: onProcessingTime is called and the trigger fires. In the
same method, fireState is updated to 20:51:45, but
registerProcessingTimeTimer is not called, so the next timer is not
actually set.
- 20:51:41: the next element arrives and onElement is called. Since
currentTime (20:51:41) < nextFireTimestamp (20:51:45),
it just returns TriggerResult.CONTINUE, so the next timer is not set.

I think the next timer should be set for 20:51:45 when an element arrives at 20:51:41.
Am I misunderstanding something?
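
The sequence above can be replayed with a small stand-alone model. This is
only a sketch of the situation as described in this message, not Flink's
actual code: the class TriggerModel, its fields, and the reRegister flag are
hypothetical, the handling of the very first element is an assumption, and
times are given as milliseconds past 20:51:00 (so 36_000 stands for 20:51:36).

```java
import java.util.TreeSet;

// Hypothetical stand-alone model of the trigger behavior described above.
// It is NOT Flink code; names and structure are assumptions for illustration.
class TriggerModel {
    static final long INTERVAL = 5_000L; // 5 seconds, in milliseconds

    long nextFire = 0L;                           // stands in for fireState
    final TreeSet<Long> timers = new TreeSet<>(); // stands in for registered timers

    // Rounds "now" down to the interval and returns the following boundary.
    static long nextBoundary(long now) {
        return now - (now % INTERVAL) + INTERVAL;
    }

    // Returns true for FIRE, false for CONTINUE.
    boolean onElement(long now) {
        if (nextFire == 0L || now > nextFire) {
            boolean fire = nextFire != 0L; // assumption: the first element only sets the timer
            nextFire = nextBoundary(now);
            timers.add(nextFire);
            return fire;
        }
        return false; // before nextFire: no timer is registered on this path
    }

    // reRegister = true models the extra registerProcessingTimeTimer call.
    boolean onProcessingTime(long now, boolean reRegister) {
        timers.headSet(now, true).clear(); // timers due at or before "now" are consumed
        if (now > nextFire) {
            nextFire = nextBoundary(now);
            if (reRegister) {
                timers.add(nextFire);
            }
            return true;
        }
        return false;
    }

    // Replays the timeline from the example: four elements, one timer callback,
    // then three more elements.
    static TriggerModel replay(boolean reRegister) {
        TriggerModel m = new TriggerModel();
        for (long t = 36_000L; t <= 39_000L; t += 1_000L) {
            m.onElement(t);
        }
        m.onProcessingTime(40_002L, reRegister);
        for (long t = 41_000L; t <= 43_000L; t += 1_000L) {
            m.onElement(t);
        }
        return m;
    }

    public static void main(String[] args) {
        System.out.println("without re-registering: pending timers = " + replay(false).timers);
        System.out.println("with re-registering:    pending timers = " + replay(true).timers);
    }
}
```

In this model, the run without re-registering ends with nextFire at 45_000
but no pending timer, so nothing would fire at 20:51:45 unless a later
element arrives after that time. With re-registering, a timer for 45_000
remains pending.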

Regards,
Hironori

2016-03-31 18:08 GMT+09:00 Aljoscha Krettek <aljos...@apache.org>:
> Hi,
> yes, right now this is expected behavior. But I see that it can be a bit,
> well,  unexpected.
>
> The continuous trigger is only set when new elements arrive, so only when
> you put new elements does the trigger fire again after five seconds. If you
> want it to truly continuously fire every five seconds even though no new
> elements arrived you can change the "onProcessingTime" method to this:
>
> @Override
> public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
>
>     ValueState<Long> fireState = ctx.getPartitionedState(stateDesc);
>     long nextFireTimestamp = fireState.value();
>
>     // only fire if an element didn't already fire
>     long currentTime = System.currentTimeMillis();
>     if (currentTime > nextFireTimestamp) {
>         long start = currentTime - (currentTime % interval);
>         fireState.update(start + interval);
>         ctx.registerProcessingTimeTimer(start + interval); // <-- I added this call
>         return TriggerResult.FIRE;
>     }
>     return TriggerResult.CONTINUE;
> }
>
> I hope this helps. As I mentioned in the other thread I'm currently thinking
> about how to make the triggers more intuitive since right now they are not
> very easy to comprehend because the names can also be misleading.
>
> Cheers,
> Aljoscha
>
> On Wed, 30 Mar 2016 at 14:33 Hironori Ogibayashi <ogibaya...@gmail.com>
> wrote:
>>
>> Hi
>>
>> I noticed that ContinuousProcessingTimeTrigger sometimes does not fire.
>>
>> I asked a similar question before and applied this patch:
>>
>> https://github.com/apache/flink/commit/607892314edee95da56f4997d85610f17a0dd470#diff-19bbcb3ea1403e483327408badfcd3f8
>> It seemed to work, but I still see strange behavior.
>>
>> The code is:
>>
>> ----
>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>     val input = env
>>       .readFileStream(fileName, 100, FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED)
>>       .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
>>       .windowAll(TumblingProcessingTimeWindows.of(Time.days(1)))
>>       .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
>>       .fold(Set[String]()){(r,i) => { r + i}}
>>       .map{x => (new Timestamp(System.currentTimeMillis()), x.size)}
>>
>>     input print
>> ---
>>
>> In this case the base window is long, so I just expect a cumulative
>> distinct count of the values every 5 seconds.
>>
>> I appended 8 strings to the input file at 1-second intervals.
>>
>> ---
>> % for i in `seq 1 8`; do date; echo "aa${i}" >> ~/tmp/input.txt; sleep 1; done
>> Wed Mar 30 20:51:36 JST 2016
>> Wed Mar 30 20:51:37 JST 2016
>> Wed Mar 30 20:51:38 JST 2016
>> Wed Mar 30 20:51:39 JST 2016
>> Wed Mar 30 20:51:40 JST 2016
>> Wed Mar 30 20:51:41 JST 2016
>> Wed Mar 30 20:51:42 JST 2016
>> Wed Mar 30 20:51:43 JST 2016
>> ---
>>
>> But I received only one output event. I expected one more event 5
>> seconds later, but nothing arrived.
>>
>> (2016-03-30 20:51:40.002,4)
>>
>> Later, when I appended an additional line to the file, I got these events:
>>
>> (2016-03-30 21:12:05.39,9)
>> (2016-03-30 21:12:10.001,9)
>>
>> I slightly modified ContinuousProcessingTimeTrigger.java and added
>> logging to the onProcessingTime method. It looks like the method was
>> called at 20:51:40 and 21:12:10, but not at 20:51:45 or 21:12:05.
>>
>> ----
>> 2016-03-30 20:51:40,002 INFO org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger - onProcessingTime called: 2016-03-30 20:51:40.002
>> ...
>> 2016-03-30 21:12:10,001 INFO org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger - onProcessingTime called: 2016-03-30 21:12:10.001
>> ----
>>
>> Is this expected behavior?
>>
>> Regards,
>> Hironori
