Aljoscha,

Thank you for fixing the issue. I built both the Flink server and the job with the code you provided, and it worked almost as expected. The output is below. I am wondering why a value was emitted at 19:44:44.635 even though I set ContinuousProcessingTimeTrigger.of(Time.seconds(5)), but it's not a problem for me.
---
(2016-03-22 19:44:35.002,1)
(2016-03-22 19:44:44.635,2)
(2016-03-22 19:44:45.001,2)
(2016-03-22 19:45:45.001,1)
---

And regarding removal from the window: do you mean the data remains in the window even if I use both .timeWindowAll and .trigger(ContinuousProcessingTimeTrigger)? I thought that ContinuousProcessingTimeTrigger works on top of timeWindowAll, and that timeWindowAll takes care of purging data from the window.

---
.timeWindowAll(Time.of(60, TimeUnit.SECONDS))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
---

Regards,
Hironori

2016-03-21 18:56 GMT+09:00 Aljoscha Krettek <aljos...@apache.org>:
> Hi,
> I’m afraid you discovered a bug in the ContinuousProcessingTimeTrigger. The
> timer is not correctly set. You can try it with this fixed version, which I
> will also update in the Flink code:
> https://gist.github.com/aljoscha/cbdbd62932b6dd2d1930
>
> One more thing: the ContinuousProcessingTimeTrigger will never remove the
> window. The default EventTimeTrigger will fire a window and purge the
> contents, while the ContinuousProcessingTimeTrigger will only ever fire for a
> window. This means that at some point you will have a lot of windows hanging
> around in your state, and they will never be cleaned up. For now, if you
> require the behavior of continuously firing on a TimeWindow, I would suggest
> writing a custom Trigger based on EventTimeTrigger (or ProcessingTimeTrigger)
> that does the firing and purging on time and also has the continuous
> triggering at earlier times.
>
> Let us know if you need more information about this. Kostas Kloudas also
> recently looked into writing custom Triggers, so maybe he has some material
> he could give to you.
>
> Cheers,
> Aljoscha
>
>> On 18 Mar 2016, at 05:35, Hironori Ogibayashi <ogibaya...@gmail.com> wrote:
>>
>> Hello,
>>
>> I have a question about TumblingProcessingTimeWindow and
>> ContinuousProcessingTimeTrigger.
>>
>> The code I tried is below.
>> It outputs the distinct count of the words; counts are printed every
>> 5 seconds and the window is reset every minute.
>>
>> ---
>> val input =
>>   env.readFileStream(fileName,100,FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED)
>>     .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
>>     .timeWindowAll(Time.of(60, TimeUnit.SECONDS))
>>     .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
>>     .fold(Set[String]()){(r,i) => { r + i}}
>>     .map{x => (new Timestamp(System.currentTimeMillis()), x.size)}
>>
>> input print
>> ---
>>
>> I wrote data to the input file at intervals.
>>
>> ---
>> echo "aaa" >> input.txt
>> echo "aaa" >> input.txt
>> sleep 10
>> echo "bbb" >> input.txt
>> sleep 60
>> echo "ccc" >> input.txt
>> ---
>>
>> The result I got was just one record. The expected output was
>> 1 -> (10+ sec later) 2 -> (60+ sec later) 1.
>>
>> ---
>> (2016-03-18 13:08:59.288,2)
>> ---
>>
>> Even after several minutes, I never got any additional records. In my
>> understanding, with ContinuousProcessingTimeTrigger.of(Time.seconds(5)),
>> the last two operators (fold, map) in the code above will be evaluated
>> every 5 seconds.
>> Am I misunderstanding something?
>>
>> Regards,
>> Hironori
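The output Hironori expected (1, then 2, then 1) is what the custom trigger Aljoscha suggests would produce: fire every 5 seconds while keeping the window contents, then fire and purge once the 60-second window ends. The following is a minimal plain-Scala model of that firing logic, not Flink code; it does not use the Flink Trigger API, and `EarlyFiringModel`, `Event`, and `simulate` are hypothetical names. It assumes timers aligned to multiples of the interval, an interval that divides the window size, and simulated times in milliseconds from 0.

```scala
import scala.collection.mutable.ArrayBuffer

// Plain-Scala model (no Flink dependency) of the custom trigger suggested in
// this thread: a tumbling processing-time window that FIREs every intervalMs
// (contents kept) and FIRE_AND_PURGEs at the window end (contents dropped).
// EarlyFiringModel, Event and simulate are hypothetical names.
object EarlyFiringModel {

  final case class Event(timeMs: Long, word: String)

  /** Returns (timerTimeMs, distinctWordCount) for every firing. */
  def simulate(
      events: Seq[Event], // input elements, sorted by arrival time
      windowMs: Long,     // tumbling window size, e.g. 60 s
      intervalMs: Long,   // early-firing interval, e.g. 5 s
      untilMs: Long       // last processing time to simulate
  ): Vector[(Long, Int)] = {
    require(windowMs > 0 && intervalMs > 0, "sizes must be positive")
    // Assumption: timers are aligned to multiples of the interval, so every
    // window end (a multiple of windowMs) coincides with a timer.
    require(windowMs % intervalMs == 0, "intervalMs must divide windowMs")
    val out = Vector.newBuilder[(Long, Int)]
    val contents = ArrayBuffer.empty[String] // elements of the open window
    var pending = events.toList
    var t = intervalMs
    while (t <= untilMs) {
      // Elements that arrived before this timer belong to the open window.
      val (arrived, rest) = pending.span(_.timeMs < t)
      contents ++= arrived.map(_.word)
      pending = rest
      // A window only exists (and fires) once it holds at least one element.
      if (contents.nonEmpty) {
        // Same result as fold(Set[String]()) followed by map(_.size) in the job.
        out += ((t, contents.toSet.size))
        // Window end: FIRE_AND_PURGE. Earlier timers: FIRE only.
        if (t % windowMs == 0) contents.clear()
      }
      t += intervalMs
    }
    out.result()
  }
}
```

Fed Hironori's input as `Event(1000, "aaa")` twice, `Event(11000, "bbb")` and `Event(71000, "ccc")`, with `windowMs = 60000`, `intervalMs = 5000` and `untilMs = 120000`, the model emits 1 at 5 s and 10 s, 2 from 15 s through the purge at 60 s, nothing at 65 s and 70 s, and 1 from 75 s onward. Collapsing repeated values gives 1 -> 2 -> 1. Keeping FIRE (early) separate from FIRE_AND_PURGE (at the window end) is what prevents windows from accumulating in state.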