
I noticed that ContinuousProcessingTimeTrigger sometimes does not fire.

I asked a similar question before and applied this patch.
It looked like it worked, but I still see strange behavior.

The code is:

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val input =
      .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .fold(Set[String]()){(r,i) => { r + i}}
      .map{x => (new Timestamp(System.currentTimeMillis()), x.size)}

    input print

In this case the base window is long, so I simply expect a cumulative
distinct count of the values every 5 seconds.

I appended 8 strings to the input file at 1-second intervals.

% for i in `seq 1 8`; do date; echo "aa${i}" >> ~/tmp/input.txt; sleep 1; done
Wed Mar 30 20:51:36 JST 2016
Wed Mar 30 20:51:37 JST 2016
Wed Mar 30 20:51:38 JST 2016
Wed Mar 30 20:51:39 JST 2016
Wed Mar 30 20:51:40 JST 2016
Wed Mar 30 20:51:41 JST 2016
Wed Mar 30 20:51:42 JST 2016
Wed Mar 30 20:51:43 JST 2016

But I received only 1 output event. I should have received one more
event 5 seconds later, but nothing arrived.

(2016-03-30 20:51:40.002,4)
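
To make the expectation concrete, here is a minimal sketch in plain Scala
(no Flink involved) of the output I would expect. It assumes the trigger
fires on every 5-second boundary until one boundary has passed after the
last element, and that an element is counted only if it arrived strictly
before the firing. ExpectedFirings and expected are hypothetical names used
for illustration, not Flink API, and arrival times are simplified to the
seconds part of the timestamps above.

```scala
object ExpectedFirings {
  // Hypothetical model, not the Flink implementation.
  // arrivals: (arrival time in seconds, value); intervalSec: trigger interval.
  // Returns (firing time in seconds, cumulative distinct count) per firing.
  def expected(arrivals: Seq[(Int, String)], intervalSec: Int): List[(Int, Int)] = {
    val first = arrivals.map(_._1).min
    val last = arrivals.map(_._1).max
    // First boundary strictly after the first element (assumed alignment).
    val firstFire = first - (first % intervalSec) + intervalSec
    Iterator
      .iterate(firstFire)(_ + intervalSec)
      // Keep firing until one full interval has passed after the last element.
      .takeWhile(t => t - intervalSec <= last)
      // Count the distinct values that arrived before each firing.
      .map(t => (t, arrivals.filter(_._1 < t).map(_._2).toSet.size))
      .toList
  }

  def main(args: Array[String]): Unit = {
    // aa1..aa8 appended at seconds 36..43, as in the shell loop above.
    val arrivals = (1 to 8).map(i => (35 + i, s"aa$i"))
    expected(arrivals, 5).foreach(println)
  }
}
```

Under these assumptions the sketch yields a count of 4 at second 40 and a
count of 8 at second 45. The first matches what I received, and the second
is the event that never arrived.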

Later, when I appended an additional line to the file, I got these events.

(2016-03-30 21:12:05.39,9)
(2016-03-30 21:12:10.001,9)

I slightly modified ContinuousProcessingTimeTrigger.java and added
logging to the onProcessingTime method. It looks like the method was
called at 20:51:40 and 21:12:10, but not at 20:51:45 or 21:12:05.

2016-03-30 20:51:40,002 INFO
 - onProcessingTime called: 2016-03-30 20:51:40.002
2016-03-30 21:12:10,001 INFO
 - onProcessingTime called: 2016-03-30 21:12:10.001

Is this expected behavior?

