Hello Aljoscha and Hironori,

Nice initiative! I totally agree with the proposals in the document. I also left some comments, and I will soon start working on some of the issues there.
Kostas

> On Mar 25, 2016, at 12:53 PM, Hironori Ogibayashi <ogibaya...@gmail.com>
> wrote:
>
> Aljoscha,
>
> Yes, it was reproducible every time I tried. Here is the code and
> procedure: https://gist.github.com/ogibayashi/402153bcd79138c35d6a
>
> Thank you for your explanation about the removal of windows. I didn't
> know that calling .trigger() would replace the default
> window trigger.
> I have read your document. I personally think the current .window,
> .trigger, .evictor APIs are flexible enough if they
> work as expected (i.e., calling .trigger does not replace the
> windowAssigner's default trigger/evictor but just adds an additional
> trigger). That said, I am new to Flink and may not understand the
> problem correctly.
> As for the "Testability and Test Coverage" section, I think it's a good idea
> to use an internal clock instead of the System clock.
> It will make it easy to test my streaming jobs.
>
> Regards,
> Hironori
>
> 2016-03-23 18:24 GMT+09:00 Aljoscha Krettek <aljos...@apache.org>:
>> Hi,
>> the output at 19:44:44.635 is indeed strange. Is this reproducible?
>>
>> As for the removal of windows: that is a pitfall a lot of users have fallen
>> into. The timeWindowAll() call just sets up a window assigner, so in your
>> case the equivalent call would be:
>>
>> .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
>> .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(60))) // <- difference is here
>> .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
>> .fold(Set[String]()){(r,i) => { r + i}}
>>
>> The window assigner itself does not do any cleanup or triggering of window
>> processing. It does, however, come with a default Trigger, which is
>> ProcessingTimeTrigger in the case of TumblingProcessingTimeWindows. This trigger
>> will fire once at the end of a window and then also purge the window
>> contents. By calling trigger(), the default trigger is replaced, and
>> ContinuousProcessingTimeTrigger does not clean up (purge) windows.
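A toy, self-contained Scala model of the pitfall described above may make it more concrete. It is not Flink code and does not use Flink's API: `PurgeModel`, `Result`, `Continue`, `Fire`, `FireAndPurge`, `endOfWindowPurging`, `continuousOnly`, and `runWindows` are hypothetical names. Windows are reduced to a map from window start to the set of words seen, triggers are simply polled every `interval` ms (a simplification of real timers), and time is passed in explicitly instead of being read from the system clock. It only illustrates the consequence described in the thread: a trigger that fires at the end of the window and purges leaves no state behind, while a trigger that only ever fires leaves every window in state.

```scala
// Toy model only: every name here is hypothetical, none of it is Flink's API.
import scala.collection.mutable

object PurgeModel {
  sealed trait Result
  case object Continue extends Result     // do nothing
  case object Fire extends Result         // emit the window contents, keep them
  case object FireAndPurge extends Result // emit the window contents, then drop them

  // A trigger is asked what to do at time `now` for the window ending at `end`.
  type Trigger = (Long, Long) => Result

  // Like the default trigger described above: fire once at the window end, then purge.
  val endOfWindowPurging: Trigger = (end, now) => if (now >= end) FireAndPurge else Continue

  // Like ContinuousProcessingTimeTrigger as described in the thread: fire, never purge.
  val continuousOnly: Trigger = (_, _) => Fire

  /** Assigns timestamped words to tumbling windows of `size` ms and asks the
    * trigger about every live window every `interval` ms up to `until`.
    * Returns the emitted distinct counts and the number of windows left in state. */
  def runWindows(events: Seq[(Long, String)], size: Long, interval: Long,
                 until: Long, trigger: Trigger): (Vector[Int], Int) = {
    val state = mutable.Map.empty[Long, Set[String]] // window start -> distinct words
    var pending = events.sortBy(_._1).toList
    var emitted = Vector.empty[Int]
    var now = 0L
    while (now <= until) {
      // Move every event with timestamp <= now into its tumbling window.
      val (arrived, rest) = pending.span(_._1 <= now)
      pending = rest
      for ((t, word) <- arrived) {
        val start = t - t % size
        state(start) = state.getOrElse(start, Set.empty[String]) + word
      }
      // Ask the trigger about each live window, oldest first.
      for (start <- state.keys.toList.sorted) {
        trigger(start + size, now) match {
          case Continue     => ()
          case Fire         => emitted :+= state(start).size
          case FireAndPurge => emitted :+= state(start).size; state -= start
        }
      }
      now += interval
    }
    (emitted, state.size)
  }
}
```

With the input from the original question (two "aaa" at 0 s, "bbb" at 10 s, "ccc" at 70 s, 60 s windows, 5 s interval), `endOfWindowPurging` emits 2 and then 1 and leaves no windows behind, whereas `continuousOnly` keeps emitting and still holds both windows at the end of the run.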
>>
>> This seems to happen to a lot of people, so I
>> started an initiative to try and improve windows/triggers:
>> https://mail-archives.apache.org/mod_mbox/flink-dev/201603.mbox/%3c16991435-118a-403b-b766-634908325...@apache.org%3e
>>
>> I created an associated doc to keep track of my proposed changes:
>> https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit?usp=sharing
>>
>> What do you think?
>>
>> Cheers,
>> Aljoscha
>>> On 23 Mar 2016, at 07:52, Hironori Ogibayashi <ogibaya...@gmail.com> wrote:
>>>
>>> Aljoscha,
>>>
>>> Thank you for fixing the issue.
>>> I built both the Flink server and the job with the code you provided, and it
>>> worked almost as expected.
>>> The output is below. I am wondering why a value was emitted at
>>> 19:44:44.635 even though I set
>>> ContinuousProcessingTimeTrigger.of(Time.seconds(5)), but it's not a
>>> problem for me.
>>>
>>> ---
>>> (2016-03-22 19:44:35.002,1)
>>> (2016-03-22 19:44:44.635,2)
>>> (2016-03-22 19:44:45.001,2)
>>> (2016-03-22 19:45:45.001,1)
>>> ---
>>>
>>> And regarding the removal from the window, do you mean the data remains
>>> in the window even if
>>> I use both .timeWindowAll and .trigger(ContinuousProcessingTimeTrigger)?
>>> I thought that ContinuousProcessingTimeTrigger works on top of
>>> timeWindowAll and that timeWindowAll
>>> takes care of purging data from the window.
>>>
>>> ---
>>> .timeWindowAll(Time.of(60, TimeUnit.SECONDS))
>>> .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
>>> ---
>>>
>>> Regards,
>>> Hironori
>>>
>>> 2016-03-21 18:56 GMT+09:00 Aljoscha Krettek <aljos...@apache.org>:
>>>> Hi,
>>>> I’m afraid you discovered a bug in the ContinuousProcessingTimeTrigger.
>>>> The timer is not correctly set. You can try this fixed version,
>>>> which I will also apply to the Flink code:
>>>> https://gist.github.com/aljoscha/cbdbd62932b6dd2d1930
>>>>
>>>> One more thing: the ContinuousProcessingTimeTrigger will never remove the
>>>> window.
>>>> The default EventTimeTrigger will fire a window and purge the
>>>> contents, while the ContinuousProcessingTimeTrigger will only ever fire for
>>>> a window. This means that at some point you will have a lot of windows
>>>> hanging around in your state, and they will never be cleaned up. For now, if
>>>> you require the behavior of continuously firing on a TimeWindow, I would
>>>> suggest writing a custom Trigger based on EventTimeTrigger (or
>>>> ProcessingTimeTrigger) that does the firing and purging on time and also
>>>> has the continuous triggering at earlier times.
>>>>
>>>> Let us know if you need more information about this. Kostas Kloudas also
>>>> recently looked into writing custom Triggers, so maybe he has some
>>>> material he could share with you.
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>> On 18 Mar 2016, at 05:35, Hironori Ogibayashi <ogibaya...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Hello,
>>>>>
>>>>> I have a question about TumblingProcessingTimeWindow and
>>>>> ContinuousProcessingTimeTrigger.
>>>>>
>>>>> The code I tried is below. It outputs the distinct count of the words:
>>>>> counts are printed every 5 seconds and the window is reset every minute.
>>>>>
>>>>> ---
>>>>> val input =
>>>>> env.readFileStream(fileName,100,FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED)
>>>>> .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
>>>>> .timeWindowAll(Time.of(60, TimeUnit.SECONDS))
>>>>> .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
>>>>> .fold(Set[String]()){(r,i) => { r + i}}
>>>>> .map{x => (new Timestamp(System.currentTimeMillis()), x.size)}
>>>>>
>>>>> input print
>>>>> ---
>>>>>
>>>>> I wrote data to the input file at intervals:
>>>>>
>>>>> ---
>>>>> echo "aaa" >> input.txt
>>>>> echo "aaa" >> input.txt
>>>>> sleep 10
>>>>> echo "bbb" >> input.txt
>>>>> sleep 60
>>>>> echo "ccc" >> input.txt
>>>>> ---
>>>>>
>>>>> The result I got was just 1 record. The expected output was 1 -> (10+
>>>>> sec later) 2 -> (60+ sec later) 1.
>>>>>
>>>>> ---
>>>>> (2016-03-18 13:08:59.288,2)
>>>>> ---
>>>>>
>>>>> Even after several minutes, I never got any additional records. In my
>>>>> understanding, with
>>>>> ContinuousProcessingTimeTrigger.of(Time.seconds(5)), the last two
>>>>> operators (fold, map) in the code above will be evaluated every 5
>>>>> seconds.
>>>>> Am I misunderstanding something?
>>>>>
>>>>> Regards,
>>>>> Hironori
>>>>
>>
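The workaround Aljoscha suggests in the thread (continuous early firing, plus fire-and-purge at the end of the window) can be sketched as a toy decision function. This is not a Flink Trigger implementation and uses no Flink API: `EarlyFiringModel`, `decide`, and `emissions` are hypothetical names, timers are assumed to fire exactly every `interval` ms, window sizes are assumed to be a multiple of the interval, and processing time is passed in explicitly. Run over the timeline from Hironori's shell script (two "aaa" at 0 s, "bbb" at 10 s, "ccc" at 70 s, 60 s windows, 5 s interval), the emitted distinct count moves through 1, 2, 1, the sequence he expected.

```scala
// Toy model only: every name here is hypothetical, none of it is Flink's API.
object EarlyFiringModel {
  sealed trait Result
  case object Fire extends Result         // early result: emit, keep window contents
  case object FireAndPurge extends Result // final result: emit, then drop window contents

  // Decision for a periodic timer at `time` on the window [start, start + size).
  def decide(time: Long, start: Long, size: Long): Result =
    if (time >= start + size) FireAndPurge else Fire

  /** (time, distinct count, decision) for every timer that finds a non-empty window.
    * Assumes size % interval == 0, so every window end coincides with a timer. */
  def emissions(events: Seq[(Long, String)], size: Long, interval: Long,
                until: Long): Vector[(Long, Int, Result)] =
    (interval to until by interval).toVector.flatMap { now =>
      // On a window boundary the timer belongs to the window that just ended
      // (its final, purging firing); otherwise to the window containing `now`.
      val start = if (now % size == 0) now - size else now - now % size
      // Contents are recomputed from the events; after FireAndPurge no later
      // timer maps back to that window, which stands in for dropping its state.
      val words = events.collect {
        case (t, w) if t >= start && t < start + size && t <= now => w
      }.toSet
      if (words.isEmpty) None // no state for this window, so nothing to fire
      else Some((now, words.size, decide(now, start, size)))
    }
}
```

Each window gets several early `Fire` results followed by exactly one `FireAndPurge` at its end, which is the "firing and purging on time" plus "continuous triggering at earlier times" combination described in the thread.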