I was finally able to do that. It's kinda ugly, but it works: https://gist.github.com/krolen/ed1344e4d7be5b2116061685268651f5
On Fri, Apr 22, 2016 at 6:14 PM, Konstantin Kulagin <kkula...@gmail.com> wrote:

> I was trying to implement this (forcing Flink to handle all values from
> the input) but had no success... Probably I am missing something about
> Flink's windowing mechanism. I created my 'finishing' trigger, which is
> basically a copy of the purging trigger, but I was not able to make it work:
>
> https://gist.github.com/krolen/9e6ba8b14c54554bfbc10fdfa6fe7308
>
> I was never able to see the numbers from 30 to 34 in the result.
> What am I doing wrong?
>
> On Thu, Apr 21, 2016 at 8:54 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> People have wondered about that a few times, yes. My opinion is that a
>> stream is potentially infinite and processing only stops for anomalous
>> reasons: when the job crashes, or when stopping a job to later redeploy
>> it. In those cases you would not want to flush out your data but keep it
>> and restart from the same state when the job is restarted.
>>
>> You can implement the behavior by writing a custom Trigger that behaves
>> like the count trigger but also fires when receiving a Long.MAX_VALUE
>> watermark. A watermark of Long.MAX_VALUE signifies that a source has
>> stopped processing for natural reasons.
>>
>> Cheers,
>> Aljoscha
>>
>> On Thu, 21 Apr 2016 at 14:42 Kostya Kulagin <kkula...@gmail.com> wrote:
>>
>>> Thanks,
>>>
>>> I wonder whether it would be good to have such functionality built in.
>>> At least when the incoming stream is finished, flush the remaining
>>> elements.
>>>
>>> On Thu, Apr 21, 2016 at 4:47 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>
>>>> Hi,
>>>> yes, you can achieve this by writing a custom Trigger that can trigger
>>>> both on the count or after a long-enough timeout. It would be a
>>>> combination of CountTrigger and EventTimeTrigger (or
>>>> ProcessingTimeTrigger), so you could look at those to get started.
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>> On Wed, 20 Apr 2016 at 23:44 Kostya Kulagin <kkula...@gmail.com> wrote:
>>>>
>>>>> I have a pretty big but finite stream and I need to be able to window
>>>>> it by number of elements. In this case, from my observations, Flink
>>>>> can 'skip' the last chunk of data if it has fewer elements than the
>>>>> window size:
>>>>>
>>>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>> DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {
>>>>>
>>>>>   @Override
>>>>>   public void run(SourceContext<Long> ctx) throws Exception {
>>>>>     LongStream.range(0, 35).forEach(ctx::collect);
>>>>>   }
>>>>>
>>>>>   @Override
>>>>>   public void cancel() {
>>>>>   }
>>>>> });
>>>>>
>>>>> source.countWindowAll(10).apply(new AllWindowFunction<Long, Long, GlobalWindow>() {
>>>>>   @Override
>>>>>   public void apply(GlobalWindow window, Iterable<Long> values, Collector<Long> out) throws Exception {
>>>>>     System.out.println(Joiner.on(',').join(values));
>>>>>   }
>>>>> }).print();
>>>>>
>>>>> env.execute("yoyoyo");
>>>>>
>>>>> Output:
>>>>> 0,1,2,3,4,5,6,7,8,9
>>>>> 10,11,12,13,14,15,16,17,18,19
>>>>> 20,21,22,23,24,25,26,27,28,29
>>>>>
>>>>> I.e. elements from 30 to 34 are not being processed.
>>>>>
>>>>> Does it make sense to have a count OR timeout window, which would emit
>>>>> a window when the number of elements reaches a threshold OR when a
>>>>> collection timeout occurs?
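[Editor's note] The trigger behavior Aljoscha describes — fire whenever the window reaches the element count, and fire once more when the final Long.MAX_VALUE watermark signals the end of a finite stream — can be sketched as a dependency-free model of the firing logic. The class and method names below (CountOrFinalWatermarkDemo, onWatermark, etc.) are illustrative and are not Flink's actual Trigger API; this only demonstrates why the trailing partial window (30 to 34 in the example above) gets emitted.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of a count trigger that also flushes on the final
// Long.MAX_VALUE watermark. Names are hypothetical, not Flink's API.
public class CountOrFinalWatermarkDemo {

    enum TriggerResult { CONTINUE, FIRE_AND_PURGE }

    static class CountOrFinalWatermarkTrigger {
        private final long maxCount;
        private long count = 0;

        CountOrFinalWatermarkTrigger(long maxCount) { this.maxCount = maxCount; }

        // Fire-and-purge every time the window fills up to maxCount.
        TriggerResult onElement() {
            if (++count >= maxCount) {
                count = 0;
                return TriggerResult.FIRE_AND_PURGE;
            }
            return TriggerResult.CONTINUE;
        }

        // A Long.MAX_VALUE watermark means the (finite) source has ended:
        // flush whatever is still buffered in the current window.
        TriggerResult onWatermark(long watermark) {
            if (watermark == Long.MAX_VALUE && count > 0) {
                count = 0;
                return TriggerResult.FIRE_AND_PURGE;
            }
            return TriggerResult.CONTINUE;
        }
    }

    public static void main(String[] args) {
        CountOrFinalWatermarkTrigger trigger = new CountOrFinalWatermarkTrigger(10);
        List<Long> window = new ArrayList<>();
        for (long v = 0; v < 35; v++) {           // same 0..34 stream as the example
            window.add(v);
            if (trigger.onElement() == TriggerResult.FIRE_AND_PURGE) {
                System.out.println(window);
                window.clear();
            }
        }
        // End of stream: the runtime would send a Long.MAX_VALUE watermark.
        if (trigger.onWatermark(Long.MAX_VALUE) == TriggerResult.FIRE_AND_PURGE) {
            System.out.println(window);           // the trailing partial window 30..34
        }
    }
}
```

With a real Flink Trigger you would keep the count in partitioned state and register an event-time timer for Long.MAX_VALUE instead of the onWatermark callback modeled here; the gist linked at the top of the thread takes that route.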