Aljoscha,

Thank you for fixing the issue.
I built both the Flink server and the job with the code you provided, and it
worked almost as expected.
The output is below. I am wondering why a value was emitted at
19:44:44.635 even though I set
ContinuousProcessingTimeTrigger.of(Time.seconds(5)), but it's not a
problem for me.

---
(2016-03-22 19:44:35.002,1)
(2016-03-22 19:44:44.635,2)
(2016-03-22 19:44:45.001,2)
(2016-03-22 19:45:45.001,1)
---

Regarding removal from the window, do you mean that the data remains
in the window even if
I use both .timeWindowAll and .trigger(ContinuousProcessingTimeTrigger)?
I thought that ContinuousProcessingTimeTrigger works on top of
timeWindowAll, and that timeWindowAll
takes care of purging data from the window.

---
.timeWindowAll(Time.of(60, TimeUnit.SECONDS))
      .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
---

Regards,
Hironori

2016-03-21 18:56 GMT+09:00 Aljoscha Krettek <aljos...@apache.org>:
> Hi,
> I’m afraid you discovered a bug in the ContinuousProcessingTimeTrigger. The 
> timer is not correctly set. You can try it with this fixed version, that I 
> will also update in the Flink code: 
> https://gist.github.com/aljoscha/cbdbd62932b6dd2d1930
>
> One more thing, the ContinuousProcessingTimeTrigger will never remove the 
> window. The default EventTimeTrigger will fire a window and purge the 
> contents while the ContinuousProcessingTimeTrigger will only ever fire for a 
> window. This means that you will have a lot of windows hanging around in your 
> state at some points and they will never be cleaned up. For now, if you 
> require the behavior of continuously firing on a TimeWindow I would suggest 
> to write a custom Trigger based on EventTimeTrigger (or 
> ProcessingTimeTrigger) that does the firing and purging on time and also has 
> the continuous triggering at earlier times.
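
To make the suggested behavior concrete, here is a rough sketch of the timer decisions such a custom trigger would make: fire early at every interval, then fire and purge once when the window ends. It is a plain Scala model and does not use Flink's Trigger API. The names (ContinuousPurgingTriggerSketch, timerDecisions, Fire, FireAndPurge) are made up for illustration, and it assumes the final timer sits exactly at the window end.

```scala
// Simplified model of the trigger decisions. This is NOT Flink's Trigger API;
// every name here is hypothetical and only illustrates the idea.
object ContinuousPurgingTriggerSketch {
  sealed trait Decision
  case object Fire extends Decision         // emit current window contents, keep the state
  case object FireAndPurge extends Decision // emit, then drop the window state

  /** Timer decisions for a window [windowStart, windowEnd) with early firings every intervalMs. */
  def timerDecisions(windowStart: Long, windowEnd: Long, intervalMs: Long): Seq[(Long, Decision)] = {
    require(intervalMs > 0, "interval must be positive")
    require(windowEnd > windowStart, "window must be non-empty")
    // Early firings strictly before the end of the window.
    val early = (windowStart + intervalMs) until windowEnd by intervalMs
    // The last timer fires and purges, so the window state does not linger.
    early.map(t => (t, Fire: Decision)) :+ ((windowEnd, FireAndPurge: Decision))
  }

  def main(args: Array[String]): Unit = {
    // 60 second window with a 5 second interval, as in the job above.
    timerDecisions(0L, 60000L, 5000L).foreach { case (t, d) => println(s"$t ms -> $d") }
  }
}
```

For a 60 second window and a 5 second interval, the model yields eleven early Fire decisions followed by a single FireAndPurge at the window end. In a real trigger, the same decision would presumably be made in the processing-time timer callback.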
>
> Let us know if you need more information about this. Kostas Kloudas also 
> recently looked into writing custom Triggers, so maybe he has some material 
> he could give to you.
>
> Cheers,
> Aljoscha
>> On 18 Mar 2016, at 05:35, Hironori Ogibayashi <ogibaya...@gmail.com> wrote:
>>
>> Hello,
>>
>> I have a question about TumblingProcessingTimeWindow and
>> ContinuousProcessingTimeTrigger.
>>
>> The code I tried is below. It outputs the distinct count of words;
>> counts are printed every 5 seconds and the window is reset every minute.
>>
>> ---
>>    val input = env
>>      .readFileStream(fileName, 100, FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED)
>>      .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
>>      .timeWindowAll(Time.of(60, TimeUnit.SECONDS))
>>      .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
>>      .fold(Set[String]()){(r,i) => { r + i}}
>>      .map{x => (new Timestamp(System.currentTimeMillis()), x.size)}
>>
>>    input print
>> ---
>>
>> I wrote data to the input file at intervals.
>>
>> ---
>> echo "aaa" >> input.txt
>> echo "aaa" >> input.txt
>> sleep 10
>> echo "bbb" >> input.txt
>> sleep 60
>> echo "ccc" >> input.txt
>> ---
>>
>> The result I got was just 1 record. The expected output was 1 -> (10+
>> sec later) 2 -> (60+ sec later) 1 .
>> ---
>> (2016-03-18 13:08:59.288,2)
>> ---
>>
>> Even after several minutes, I never got an additional record. In my
>> understanding, with
>> ContinuousProcessingTimeTrigger.of(Time.seconds(5)), the last two
>> operators (fold, map) in the code above will be evaluated every 5
>> seconds.
>> Am I misunderstanding something?
>>
>> Regards,
>> Hironori
>
