Hi

I noticed that ContinuousProcessingTimeTrigger sometimes does not fire.

I asked similar question before and applied this patch.
https://github.com/apache/flink/commit/607892314edee95da56f4997d85610f17a0dd470#diff-19bbcb3ea1403e483327408badfcd3f8
It looked work but still I have strange behavior.

The code is:

----
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val input =
env.readFileStream(fileName,100,FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED)
      .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .windowAll(TumblingProcessingTimeWindows.of(Time.days(1)))
      .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
      .fold(Set[String]()){(r,i) => { r + i}}
      .map{x => (new Timestamp(System.currentTimeMillis()), x.size)}

    input print
---

This case, the base window is long, so I just expect cumulative
distinct count of the value every 5 seconds.

Appended 8 strings to the input file with 1 second interval.

---
% for i in `seq 1 8`; do date; echo "aa${i}" >> ~/tmp/input.txt; sleep
1; done
Wed Mar 30 20:51:36 JST 2016
Wed Mar 30 20:51:37 JST 2016
Wed Mar 30 20:51:38 JST 2016
Wed Mar 30 20:51:39 JST 2016
Wed Mar 30 20:51:40 JST 2016
Wed Mar 30 20:51:41 JST 2016
Wed Mar 30 20:51:42 JST 2016
Wed Mar 30 20:51:43 JST 2016
---

But I only received 1 output event. I should receive one more event  5
seconds later, but actually nothing.

(2016-03-30 20:51:40.002,4)

Later, if I put additional line to the file. I got these events.

(2016-03-30 21:12:05.39,9)
(2016-03-30 21:12:10.001,9)

I slightly modified ContinuousProcessingTimeTrigger.java and added
logging in onProcessingTime method. It looks like the method was
called at 20:51:40 and 21:12:10, not at 20:51:45 and  21:12:05.

----
2016-03-30 20:51:40,002 INFO
org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger
 - onProcessingTime called: 2016-03-30 20:51:40.002
...
2016-03-30 21:12:10,001 INFO
org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger
 - onProcessingTime called: 2016-03-30 21:12:10.001
----

Is this an expected behavior?

Regards,
Hironori

Reply via email to