I was trying to implement this (force Flink to handle all values from the input) but had no success... I am probably missing something about Flink's windowing mechanism. I created my own 'finishing' trigger, which is basically a copy of PurgingTrigger.
But I was not able to make it work: https://gist.github.com/krolen/9e6ba8b14c54554bfbc10fdfa6fe7308
I never see the numbers from 30 to 34 in the result. What am I doing wrong?

On Thu, Apr 21, 2016 at 8:54 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

> People have wondered about that a few times, yes. My opinion is that a
> stream is potentially infinite and processing only stops for anomalous
> reasons: when the job crashes, or when stopping a job to later redeploy
> it. In those cases you would not want to flush out your data but keep it
> and restart from the same state when the job is restarted.
>
> You can implement the behavior by writing a custom Trigger that behaves
> like the count trigger but also fires when receiving a Long.MAX_VALUE
> watermark. A watermark of Long.MAX_VALUE signifies that a source has
> stopped processing for natural reasons.
>
> Cheers,
> Aljoscha
>
> On Thu, 21 Apr 2016 at 14:42 Kostya Kulagin <kkula...@gmail.com> wrote:
>
>> Thanks,
>>
>> I wonder whether it would be good to have such functionality built in.
>> At the least, when the incoming stream is finished, flush the remaining
>> elements.
>>
>> On Thu, Apr 21, 2016 at 4:47 AM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Hi,
>>> yes, you can achieve this by writing a custom Trigger that can fire
>>> both on the count or after a long-enough timeout. It would be a
>>> combination of CountTrigger and EventTimeTrigger (or
>>> ProcessingTimeTrigger), so you could look at those to get started.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Wed, 20 Apr 2016 at 23:44 Kostya Kulagin <kkula...@gmail.com> wrote:
>>>
>>>> I have a pretty big but finite stream and I need to be able to window
>>>> it by number of elements. In this case, from my observations, Flink
>>>> can 'skip' the last chunk of data if it has fewer elements than the
>>>> window size:
>>>>
>>>> StreamExecutionEnvironment env =
>>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>>> DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {
>>>>
>>>>   @Override
>>>>   public void run(SourceContext<Long> ctx) throws Exception {
>>>>     LongStream.range(0, 35).forEach(ctx::collect);
>>>>   }
>>>>
>>>>   @Override
>>>>   public void cancel() {
>>>>   }
>>>> });
>>>>
>>>> source.countWindowAll(10).apply(new AllWindowFunction<Long, Long, GlobalWindow>() {
>>>>   @Override
>>>>   public void apply(GlobalWindow window, Iterable<Long> values,
>>>>       Collector<Long> out) throws Exception {
>>>>     System.out.println(Joiner.on(',').join(values));
>>>>   }
>>>> }).print();
>>>>
>>>> env.execute("yoyoyo");
>>>>
>>>> Output:
>>>> 0,1,2,3,4,5,6,7,8,9
>>>> 10,11,12,13,14,15,16,17,18,19
>>>> 20,21,22,23,24,25,26,27,28,29
>>>>
>>>> I.e. elements from 30 to 34 are not being processed.
>>>>
>>>> Does it make sense to have a count-OR-timeout window, which would emit
>>>> a window when the number of elements reaches a threshold OR a
>>>> collection timeout occurs?
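
Here is a minimal sketch of the trigger Aljoscha describes, written against
a recent Flink Trigger API. It counts elements like CountTrigger, and
additionally registers an event-time timer at Long.MAX_VALUE so it fires one
last time when the end-of-stream watermark arrives. The class name
CountWithEndOfStreamTrigger and the state handling are my own illustration,
not the code from the gist above, and it assumes the job runs with
event-time semantics so the final watermark actually reaches the window
operator:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

// Fires like a count trigger, and also flushes whatever is still buffered
// when the final Long.MAX_VALUE watermark signals the end of the stream.
public class CountWithEndOfStreamTrigger extends Trigger<Object, GlobalWindow> {

  private final long maxCount;

  private final ReducingStateDescriptor<Long> countDesc =
      new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

  public CountWithEndOfStreamTrigger(long maxCount) {
    this.maxCount = maxCount;
  }

  @Override
  public TriggerResult onElement(Object element, long timestamp,
      GlobalWindow window, TriggerContext ctx) throws Exception {
    // An event-time timer at Long.MAX_VALUE fires exactly when the
    // end-of-stream watermark arrives.
    ctx.registerEventTimeTimer(Long.MAX_VALUE);

    ReducingState<Long> count = ctx.getPartitionedState(countDesc);
    count.add(1L);
    if (count.get() >= maxCount) {
      count.clear();
      return TriggerResult.FIRE_AND_PURGE;
    }
    return TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
    // Only the end-of-stream watermark can reach this timestamp;
    // flush the last, possibly smaller-than-maxCount batch.
    return time == Long.MAX_VALUE ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
    return TriggerResult.CONTINUE;
  }

  @Override
  public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
    ctx.getPartitionedState(countDesc).clear();
    ctx.deleteEventTimeTimer(Long.MAX_VALUE);
  }

  private static class Sum implements ReduceFunction<Long> {
    @Override
    public Long reduce(Long a, Long b) {
      return a + b;
    }
  }
}

Since countWindowAll(10) installs its own purging count trigger, the sketch
has to be wired in explicitly on a GlobalWindows assigner, e.g.:

source.windowAll(GlobalWindows.create())
    .trigger(new CountWithEndOfStreamTrigger(10))
    .apply(...);  // same AllWindowFunction as above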
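
And a sketch of the count-or-timeout combination from the earlier reply:
fire when the window has collected maxCount elements, or after timeoutMillis
of processing time has passed since the first element of the batch,
whichever comes first. Again, the class name and the deadline bookkeeping
are illustrative assumptions, not an existing Flink class:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

// Fires on whichever comes first: the element count reaching maxCount,
// or a processing-time timeout measured from the first element of a batch.
public class CountOrTimeoutTrigger extends Trigger<Object, GlobalWindow> {

  private final long maxCount;
  private final long timeoutMillis;

  private final ReducingStateDescriptor<Long> countDesc =
      new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);
  private final ValueStateDescriptor<Long> deadlineDesc =
      new ValueStateDescriptor<>("deadline", LongSerializer.INSTANCE);

  public CountOrTimeoutTrigger(long maxCount, long timeoutMillis) {
    this.maxCount = maxCount;
    this.timeoutMillis = timeoutMillis;
  }

  @Override
  public TriggerResult onElement(Object element, long timestamp,
      GlobalWindow window, TriggerContext ctx) throws Exception {
    ValueState<Long> deadline = ctx.getPartitionedState(deadlineDesc);
    if (deadline.value() == null) {
      // First element of a new batch: start the timeout clock.
      long t = ctx.getCurrentProcessingTime() + timeoutMillis;
      deadline.update(t);
      ctx.registerProcessingTimeTimer(t);
    }

    ReducingState<Long> count = ctx.getPartitionedState(countDesc);
    count.add(1L);
    if (count.get() >= maxCount) {
      return fireAndReset(ctx);
    }
    return TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onProcessingTime(long time, GlobalWindow window,
      TriggerContext ctx) throws Exception {
    // Timeout expired before the count was reached: flush what we have.
    return fireAndReset(ctx);
  }

  @Override
  public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
    return TriggerResult.CONTINUE;
  }

  @Override
  public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
    clearState(ctx);
  }

  private TriggerResult fireAndReset(TriggerContext ctx) throws Exception {
    clearState(ctx);
    return TriggerResult.FIRE_AND_PURGE;
  }

  private void clearState(TriggerContext ctx) throws Exception {
    // Clear the count and cancel the pending timeout so a stale timer
    // cannot fire for an already-purged batch.
    ctx.getPartitionedState(countDesc).clear();
    ValueState<Long> deadline = ctx.getPartitionedState(deadlineDesc);
    if (deadline.value() != null) {
      ctx.deleteProcessingTimeTimer(deadline.value());
      deadline.clear();
    }
  }

  private static class Sum implements ReduceFunction<Long> {
    @Override
    public Long reduce(Long a, Long b) {
      return a + b;
    }
  }
}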