Aljoscha,

Yes, it's reproducible as far as I have tried. Here is the code and procedure:
https://gist.github.com/ogibayashi/402153bcd79138c35d6a

Thank you for your explanation about the removal of windows.
I didn't know that calling .trigger() replaces the default window trigger.

I have read your document. Personally, I think the current .window, .trigger,
and .evictor APIs are flexible enough if they work as I expected (that is,
calling .trigger does not replace the window assigner's default
trigger/evictor, but just adds an additional trigger). That said, I am new to
Flink and may not understand the problem correctly.

As for the "Testability and Test Coverage" section, I think it's a good idea
to use an internal clock instead of the system clock. It will make it easier
to test my streaming jobs.

Regards,
Hironori

2016-03-23 18:24 GMT+09:00 Aljoscha Krettek <aljos...@apache.org>:
> Hi,
> the output at 19:44:44.635 is indeed strange. Is this reproducible?
>
> As for the removal of windows: that is a pitfall a lot of users have fallen
> into. The timeWindowAll() call just sets up a window assigner, so in your
> case the equivalent call would be:
>
> .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
> .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(60))) <-- difference is here
> .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
> .fold(Set[String]()){(r,i) => { r + i}}
>
> The window assigner itself does not do any cleanup or triggering of window
> processing. It does, however, come with a default Trigger, which is
> ProcessingTimeTrigger in the case of TumblingProcessingTimeWindows. This
> trigger will fire once at the end of a window and then also purge the window
> contents. By calling trigger() the default trigger is replaced, and
> ContinuousProcessingTimeTrigger does not clean up (purge) windows.
>
> This is something that seems to happen to a lot of people, so I
> started an initiative to try and improve windows/triggers:
> https://mail-archives.apache.org/mod_mbox/flink-dev/201603.mbox/%3c16991435-118a-403b-b766-634908325...@apache.org%3e
>
> I created an associated doc to keep track of my proposed changes:
> https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit?usp=sharing
>
> What do you think?
>
> Cheers,
> Aljoscha
>> On 23 Mar 2016, at 07:52, Hironori Ogibayashi <ogibaya...@gmail.com> wrote:
>>
>> Aljoscha,
>>
>> Thank you for fixing the issue.
>> I built both the Flink server and the job with the code you provided, and it
>> worked almost as expected.
>> The output is below. I am wondering why a value was emitted at
>> 19:44:44.635 even though I set
>> ContinuousProcessingTimeTrigger.of(Time.seconds(5)), but it's not a
>> problem for me.
>>
>> ---
>> (2016-03-22 19:44:35.002,1)
>> (2016-03-22 19:44:44.635,2)
>> (2016-03-22 19:44:45.001,2)
>> (2016-03-22 19:45:45.001,1)
>> ---
>>
>> And regarding the removal from the window, do you mean the data remains
>> in the window even if
>> I use both .timeWindowAll and .trigger(ContinuousProcessingTimeTrigger)?
>> I thought that ContinuousProcessingTimeTrigger works on top of
>> timeWindowAll and that timeWindowAll
>> takes care of purging data from the window.
>>
>> ---
>> .timeWindowAll(Time.of(60, TimeUnit.SECONDS))
>> .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
>> ---
>>
>> Regards,
>> Hironori
>>
>> 2016-03-21 18:56 GMT+09:00 Aljoscha Krettek <aljos...@apache.org>:
>>> Hi,
>>> I’m afraid you discovered a bug in the ContinuousProcessingTimeTrigger. The
>>> timer is not correctly set. You can try it with this fixed version, which I
>>> will also update in the Flink code:
>>> https://gist.github.com/aljoscha/cbdbd62932b6dd2d1930
>>>
>>> One more thing, the ContinuousProcessingTimeTrigger will never remove the
>>> window.
>>> The default EventTimeTrigger will fire a window and purge the
>>> contents, while the ContinuousProcessingTimeTrigger will only ever fire for
>>> a window. This means that you will have a lot of windows hanging around in
>>> your state at some point, and they will never be cleaned up. For now, if
>>> you require the behavior of continuously firing on a TimeWindow, I would
>>> suggest writing a custom Trigger based on EventTimeTrigger (or
>>> ProcessingTimeTrigger) that does the firing and purging on time and also
>>> has the continuous triggering at earlier times.
>>>
>>> Let us know if you need more information about this. Kostas Kloudas also
>>> recently looked into writing custom Triggers, so maybe he has some material
>>> he could give to you.
>>>
>>> Cheers,
>>> Aljoscha
>>>> On 18 Mar 2016, at 05:35, Hironori Ogibayashi <ogibaya...@gmail.com> wrote:
>>>>
>>>> Hello,
>>>>
>>>> I have a question about TumblingProcessingTimeWindow and
>>>> ContinuousProcessingTimeTrigger.
>>>>
>>>> The code I tried is below. It outputs the distinct count of the words;
>>>> counts are printed every 5 seconds and the window is reset every 1 minute.
>>>>
>>>> ---
>>>> val input =
>>>> env.readFileStream(fileName,100,FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED)
>>>> .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
>>>> .timeWindowAll(Time.of(60, TimeUnit.SECONDS))
>>>> .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
>>>> .fold(Set[String]()){(r,i) => { r + i}}
>>>> .map{x => (new Timestamp(System.currentTimeMillis()), x.size)}
>>>>
>>>> input print
>>>> ---
>>>>
>>>> I wrote data to the input file at some intervals.
>>>>
>>>> ---
>>>> echo "aaa" >> input.txt
>>>> echo "aaa" >> input.txt
>>>> sleep 10
>>>> echo "bbb" >> input.txt
>>>> sleep 60
>>>> echo "ccc" >> input.txt
>>>> ---
>>>>
>>>> The result I got was just 1 record. The expected output was 1 -> (10+
>>>> sec later) 2 -> (60+ sec later) 1 .
>>>> ---
>>>> (2016-03-18 13:08:59.288,2)
>>>> ---
>>>>
>>>> Even after several minutes, I never got any additional records. In my
>>>> understanding, with
>>>> ContinuousProcessingTimeTrigger.of(Time.seconds(5)), the last two
>>>> operators (fold, map) in the code above will be evaluated every 5
>>>> seconds.
>>>> Am I misunderstanding something?
>>>>
>>>> Regards,
>>>> Hironori
>>>
>
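
Editor's sketch: the thread turns on the difference between a trigger that only fires and one that fires and then purges. The plain-Scala toy model below may help make that concrete. It is not Flink code and uses no Flink classes: `TriggerSketch`, `simulate`, `continuous`, `atWindowEnd` and `continuousWithPurge` are hypothetical names invented for illustration, and `Fire` / `FireAndPurge` only loosely mirror Flink's `TriggerResult.FIRE` and `TriggerResult.FIRE_AND_PURGE`. It assumes a single 60-second window with one timer tick per second.

```scala
// Toy model only: none of these names are Flink classes.
object TriggerSketch {
  sealed trait Result
  case object Fire extends Result         // emit the window contents, keep the state
  case object FireAndPurge extends Result // emit the window contents, then drop the state

  // A "trigger" here is just a function from a tick (seconds since window start)
  // to an optional result.
  type Trigger = Long => Option[Result]

  val windowSize = 60L

  // Models a trigger that fires every `interval` seconds and never purges.
  def continuous(interval: Long): Trigger =
    t => if (t % interval == 0) Some(Fire) else None

  // Models a default trigger: fire once at the end of the window, then purge.
  def atWindowEnd: Trigger =
    t => if (t == windowSize) Some(FireAndPurge) else None

  // Models the custom trigger suggested in the thread: early continuous firing
  // plus a final fire-and-purge at the end of the window.
  def continuousWithPurge(interval: Long): Trigger =
    t => if (t == windowSize) Some(FireAndPurge) else continuous(interval)(t)

  // Runs one window. `arrivals` maps a tick to the words arriving at that tick.
  // Returns the emitted (tick, distinct count) pairs and the state left over
  // after the window has ended.
  def simulate(trigger: Trigger,
               arrivals: Map[Long, Seq[String]]): (Vector[(Long, Int)], Set[String]) = {
    var state = Set.empty[String]
    var out = Vector.empty[(Long, Int)]
    for (t <- 1L to windowSize) {
      state ++= arrivals.getOrElse(t, Nil)
      // In this model the trigger is only consulted once the window holds data.
      if (state.nonEmpty) trigger(t) match {
        case Some(Fire) =>
          out = out :+ (t -> state.size)
        case Some(FireAndPurge) =>
          out = out :+ (t -> state.size)
          state = Set.empty
        case None =>
      }
    }
    (out, state)
  }
}
```

With arrivals similar to the test in the thread ("aaa" twice at the start, "bbb" about 10 seconds later), `simulate(TriggerSketch.continuous(5), ...)` leaves the words in the state after the window ends, while `continuousWithPurge(5)` emits the same intermediate counts and leaves the state empty. In this model, that leftover state is what "windows hanging around" means.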