Hi,
yes, right now this is expected behavior. But I see that it can be a bit,
well,  unexpected.

The continuous trigger is only set when new elements arrive, so only when
you put new elements does the trigger fire again after five seconds. If you
want it to truly continuously fire every five seconds even though no new
elements arrived you can change the "onProcessingTime" method to this:

@Override
public TriggerResult onProcessingTime(long time, W window, TriggerContext
ctx) throws Exception {

    ValueState<Long> fireState = ctx.getPartitionedState(stateDesc);
    long nextFireTimestamp = fireState.value();

    // only fire if an element didn't already fire
    long currentTime = System.currentTimeMillis();
    if (currentTime > nextFireTimestamp) {
        long start = currentTime - (currentTime % interval);
        fireState.update(start + interval);
        ctx.registerProcessingTimeTimer(start +  interval); // <-- I added
this call
        return TriggerResult.FIRE;
    }
    return TriggerResult.CONTINUE;
}

I hope this helps. As I mentioned in the other thread I'm currently
thinking about how to make the triggers more intuitive since right now they
are not very easy to comprehend because the names can also be misleading.

Cheers,
Aljoscha

On Wed, 30 Mar 2016 at 14:33 Hironori Ogibayashi <ogibaya...@gmail.com>
wrote:

> Hi
>
> I noticed that ContinuousProcessingTimeTrigger sometimes does not fire.
>
> I asked similar question before and applied this patch.
>
> https://github.com/apache/flink/commit/607892314edee95da56f4997d85610f17a0dd470#diff-19bbcb3ea1403e483327408badfcd3f8
> It looked work but still I have strange behavior.
>
> The code is:
>
> ----
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     val input =
>
> env.readFileStream(fileName,100,FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED)
>       .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
>       .windowAll(TumblingProcessingTimeWindows.of(Time.days(1)))
>       .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
>       .fold(Set[String]()){(r,i) => { r + i}}
>       .map{x => (new Timestamp(System.currentTimeMillis()), x.size)}
>
>     input print
> ---
>
> This case, the base window is long, so I just expect cumulative
> distinct count of the value every 5 seconds.
>
> Appended 8 strings to the input file with 1 second interval.
>
> ---
> % for i in `seq 1 8`; do date; echo "aa${i}" >> ~/tmp/input.txt; sleep
> 1; done
> Wed Mar 30 20:51:36 JST 2016
> Wed Mar 30 20:51:37 JST 2016
> Wed Mar 30 20:51:38 JST 2016
> Wed Mar 30 20:51:39 JST 2016
> Wed Mar 30 20:51:40 JST 2016
> Wed Mar 30 20:51:41 JST 2016
> Wed Mar 30 20:51:42 JST 2016
> Wed Mar 30 20:51:43 JST 2016
> ---
>
> But I only received 1 output event. I should receive one more event  5
> seconds later, but actually nothing.
>
> (2016-03-30 20:51:40.002,4)
>
> Later, if I put additional line to the file. I got these events.
>
> (2016-03-30 21:12:05.39,9)
> (2016-03-30 21:12:10.001,9)
>
> I slightly modified ContinuousProcessingTimeTrigger.java and added
> logging in onProcessingTime method. It looks like the method was
> called at 20:51:40 and 21:12:10, not at 20:51:45 and  21:12:05.
>
> ----
> 2016-03-30 20:51:40,002 INFO
>
> org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger
>  - onProcessingTime called: 2016-03-30 20:51:40.002
> ...
> 2016-03-30 21:12:10,001 INFO
>
> org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger
>  - onProcessingTime called: 2016-03-30 21:12:10.001
> ----
>
> Is this an expected behavior?
>
> Regards,
> Hironori
>

Reply via email to