Aljoscha,

Yes, it's reproducible as far as I have tried. Here are the code and
procedure: https://gist.github.com/ogibayashi/402153bcd79138c35d6a

Thank you for your explanation about the removal of windows. I didn't
know that calling .trigger() replaces the default window trigger.
I have read your document. Personally, I think the current .window,
.trigger, and .evictor APIs would be flexible enough if they worked as
expected (that is, if calling .trigger did not replace the
windowAssigner's default trigger/evictor, but just added an additional
trigger). That said, I am new to Flink and may not understand the
problem correctly.
As for the "Testability and Test Coverage" section, I think it's a good
idea to use an internal clock instead of the system clock.
It will make it easy to test my streaming jobs.
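To illustrate the idea (this is a generic sketch of a pluggable clock, not Flink's actual API), time-dependent logic can be written against a clock abstraction so tests can advance time manually instead of sleeping on the system clock:

```scala
// Hypothetical sketch: a pluggable clock makes time-dependent logic
// testable without waiting on the real system clock.
trait Clock {
  def currentTimeMillis: Long
}

// Production implementation backed by the system clock.
object SystemClock extends Clock {
  def currentTimeMillis: Long = System.currentTimeMillis()
}

// Test implementation whose time is advanced manually.
class ManualClock(private var now: Long = 0L) extends Clock {
  def currentTimeMillis: Long = now
  def advance(millis: Long): Unit = { now += millis }
}

// Example of time-dependent logic written against the abstraction:
// has a window that started at `start` (of the given size) ended yet?
def windowExpired(clock: Clock, start: Long, sizeMillis: Long): Boolean =
  clock.currentTimeMillis >= start + sizeMillis
```

With a ManualClock, a test can deterministically step past a window boundary and assert the expected firing, with no sleeps.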

Regards,
Hironori

2016-03-23 18:24 GMT+09:00 Aljoscha Krettek <aljos...@apache.org>:
> Hi,
> the output at 19:44:44.635 is indeed strange. Is this reproducible?
>
> As for the removal of windows. That is a pitfall a lot of users have fallen 
> into. The timeWindowAll() call just sets up a window assigner, so in your 
> case the equivalent call would be:
>
>      .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
>      .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(60))) <— 
> difference is here
>      .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
>      .fold(Set[String]()){(r,i) => { r + i}}
>
> The window assigner itself does not do any cleanup or triggering of window 
> processing. It does, however, come with a default Trigger, which is 
> ProcessingTimeTrigger in the case of TumblingProcessingTimeWindows. This trigger 
> will fire once at the end of a window and then also purge the window 
> contents. By calling trigger() the default trigger is replaced, and 
> ContinuousProcessingTimeTrigger does not clean up (purge) windows.
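To make the assigner/trigger split concrete, the assigner's job is essentially just the arithmetic of mapping a timestamp to a window. Here is a standalone Scala sketch of tumbling assignment (illustrative only, not Flink's actual implementation):

```scala
// Standalone sketch of tumbling-window assignment (not Flink's code):
// an element with timestamp ts belongs to exactly one window
// [start, start + size), where start is ts rounded down to a multiple
// of the window size.
def tumblingWindowStart(ts: Long, sizeMillis: Long): Long =
  ts - (ts % sizeMillis)

// With 60-second windows, timestamps at 10 s and 50 s past the minute
// land in the same window; 65 s lands in the next one.
```

Everything else (when to emit results, when to purge contents) is the trigger's responsibility, which is why swapping the trigger changes the cleanup behavior.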
>
> This is something that seems to happen to a lot of people, so I 
> started an initiative to try and improve windows/triggers: 
> https://mail-archives.apache.org/mod_mbox/flink-dev/201603.mbox/%3c16991435-118a-403b-b766-634908325...@apache.org%3e
>
> I created an associated doc to keep track of my proposed changes: 
> https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit?usp=sharing
>
> What do you think?
>
> Cheers,
> Aljoscha
>> On 23 Mar 2016, at 07:52, Hironori Ogibayashi <ogibaya...@gmail.com> wrote:
>>
>> Aljoscha,
>>
>> Thank you for fixing the issue.
>> I built both the Flink server and the job with the code you provided, and it
>> worked almost as expected.
>> The output is below. I am wondering why a value was emitted at
>> 19:44:44.635 when I set
>> ContinuousProcessingTimeTrigger.of(Time.seconds(5)), but it's not a
>> problem for me.
>>
>> ---
>> (2016-03-22 19:44:35.002,1)
>> (2016-03-22 19:44:44.635,2)
>> (2016-03-22 19:44:45.001,2)
>> (2016-03-22 19:45:45.001,1)
>> ---
>>
>> And regarding the removal from the window, do you mean the data remains
>> in the window even if
>> I use both .timeWindowAll and .trigger(ContinuousProcessingTimeTrigger)?
>> I thought that ContinuousProcessingTimeTrigger works on top of
>> timeWindowAll, and that timeWindowAll
>> takes care of purging data from the window.
>>
>> ---
>> .timeWindowAll(Time.of(60, TimeUnit.SECONDS))
>>      .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
>> ---
>>
>> Regards,
>> Hironori
>>
>> 2016-03-21 18:56 GMT+09:00 Aljoscha Krettek <aljos...@apache.org>:
>>> Hi,
>>> I’m afraid you discovered a bug in the ContinuousProcessingTimeTrigger. The 
>>> timer is not set correctly. You can try it with this fixed version, which I 
>>> will also update in the Flink code: 
>>> https://gist.github.com/aljoscha/cbdbd62932b6dd2d1930
>>>
>>> One more thing: the ContinuousProcessingTimeTrigger will never remove the 
>>> window. The default EventTimeTrigger will fire a window and purge the 
>>> contents, while the ContinuousProcessingTimeTrigger will only ever fire for 
>>> a window. This means that you will have a lot of windows hanging around in 
>>> your state at some point, and they will never be cleaned up. For now, if 
>>> you require the behavior of continuously firing on a TimeWindow, I would 
>>> suggest writing a custom Trigger based on EventTimeTrigger (or 
>>> ProcessingTimeTrigger) that does the firing and purging on time and also 
>>> has the continuous triggering at earlier times.
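As an illustration of the suggested custom Trigger, here is a standalone Scala sketch of its decision logic (the names here are illustrative, not the real Flink Trigger API): fire on each early timer to emit intermediate results, and fire-and-purge once the timer reaches the window end.

```scala
// Illustrative model of the combined trigger decision
// (not the actual Flink Trigger interface).
sealed trait TriggerResult
case object Continue extends TriggerResult
case object Fire extends TriggerResult          // emit the window, keep contents
case object FireAndPurge extends TriggerResult  // emit and clear the window

// On a processing-time timer: emit early results until the window's
// end time is reached, then emit one final result and purge the state.
def onProcessingTime(time: Long, windowEnd: Long): TriggerResult =
  if (time >= windowEnd) FireAndPurge
  else Fire

// The next timer is the earlier of "one interval from now" and the
// window end, so the final firing lands exactly on the window boundary.
def nextTimer(time: Long, intervalMillis: Long, windowEnd: Long): Long =
  math.min(time + intervalMillis, windowEnd)
```

The key point is the FireAndPurge branch at the window end, which is exactly what ContinuousProcessingTimeTrigger on its own never does.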
>>>
>>> Let us know if you need more information about this. Kostas Kloudas also 
>>> recently looked into writing custom Triggers, so maybe he has some material 
>>> he could give you.
>>>
>>> Cheers,
>>> Aljoscha
>>>> On 18 Mar 2016, at 05:35, Hironori Ogibayashi <ogibaya...@gmail.com> wrote:
>>>>
>>>> Hello,
>>>>
>>>> I have a question about TumblingProcessingTimeWindow and
>>>> ContinuousProcessingTimeTrigger.
>>>>
>>>> The code I tried is below. It outputs the distinct count of words;
>>>> counts are printed every 5 seconds and the window is reset every minute.
>>>>
>>>> ---
>>>>   val input =
>>>> env.readFileStream(fileName,100,FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED)
>>>>     .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
>>>>     .timeWindowAll(Time.of(60, TimeUnit.SECONDS))
>>>>     .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
>>>>     .fold(Set[String]()){(r,i) => { r + i}}
>>>>     .map{x => (new Timestamp(System.currentTimeMillis()), x.size)}
>>>>
>>>>   input print
>>>> ---
>>>>
>>>> I wrote data to the input file with some interval.
>>>>
>>>> ---
>>>> echo "aaa" >> input.txt
>>>> echo "aaa" >> input.txt
>>>> sleep 10
>>>> echo "bbb" >> input.txt
>>>> sleep 60
>>>> echo "ccc" >> input.txt
>>>> ---
>>>>
>>>> The result I got was just 1 record. The expected output was 1 -> (10+
>>>> sec later) 2 -> (60+ sec later) 1.
>>>> ---
>>>> (2016-03-18 13:08:59.288,2)
>>>> ---
>>>>
>>>> Even after several minutes, I never got an additional record. In my
>>>> understanding, with
>>>> ContinuousProcessingTimeTrigger.of(Time.seconds(5)), the last two
>>>> operators (fold, map) in the code above should be evaluated every 5
>>>> seconds.
>>>> Am I misunderstanding something?
>>>>
>>>> Regards,
>>>> Hironori
>>>
>
