Hi, yes, you can achieve this by writing a custom Trigger that fires either when the count is reached or after a long-enough timeout. It would combine the logic of CountTrigger and EventTimeTrigger (or ProcessingTimeTrigger), so you could look at those to get started.
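A rough sketch of such a trigger, using a processing-time timeout. `CountOrTimeoutTrigger` is a hypothetical name, not a class that ships with Flink. It is written against the `Trigger`/`TriggerContext` API of later Flink 1.x releases, so some calls (for example `getCurrentProcessingTime()`) may not exist in older versions, where `System.currentTimeMillis()` would be the substitute. It keeps the element count in Flink state the way CountTrigger does, starts a timer on the first element of each batch, and emits the batch on whichever comes first:

```java
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

/**
 * Hypothetical trigger (not part of Flink): fires and purges the window when
 * maxCount elements have arrived OR timeoutMs of processing time has passed
 * since the first element of the current batch, whichever happens first.
 */
class CountOrTimeoutTrigger<W extends Window> extends Trigger<Object, W> {
    private final long maxCount;
    private final long timeoutMs;

    // Running element count, kept in Flink-managed state like CountTrigger does.
    private final ReducingStateDescriptor<Long> countDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);
    // Timestamp of the pending timeout timer, or null if none is registered.
    private final ValueStateDescriptor<Long> timerDesc =
            new ValueStateDescriptor<>("timeout", LongSerializer.INSTANCE);

    CountOrTimeoutTrigger(long maxCount, long timeoutMs) {
        this.maxCount = maxCount;
        this.timeoutMs = timeoutMs;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx)
            throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(countDesc);
        count.add(1L);

        // First element of a new batch: start the timeout clock.
        ValueState<Long> timer = ctx.getPartitionedState(timerDesc);
        if (timer.value() == null) {
            long fireAt = ctx.getCurrentProcessingTime() + timeoutMs;
            ctx.registerProcessingTimeTimer(fireAt);
            timer.update(fireAt);
        }

        return count.get() >= maxCount ? fireAndReset(ctx) : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        // Ignore stale timers from a batch that was already emitted by count.
        Long pending = ctx.getPartitionedState(timerDesc).value();
        return (pending != null && pending == time) ? fireAndReset(ctx) : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE; // this sketch only uses processing time
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        reset(ctx);
    }

    private TriggerResult fireAndReset(TriggerContext ctx) throws Exception {
        reset(ctx);
        return TriggerResult.FIRE_AND_PURGE; // emit the batch, then drop its elements
    }

    // Deletes any pending timer and zeroes the count so the next element starts a new batch.
    private void reset(TriggerContext ctx) throws Exception {
        ValueState<Long> timer = ctx.getPartitionedState(timerDesc);
        Long pending = timer.value();
        if (pending != null) {
            ctx.deleteProcessingTimeTimer(pending);
        }
        timer.clear();
        ctx.getPartitionedState(countDesc).clear();
    }

    @Override
    public String toString() {
        return "CountOrTimeoutTrigger(" + maxCount + ", " + timeoutMs + "ms)";
    }

    private static class Sum implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long a, Long b) {
            return a + b;
        }
    }
}
```

`countWindowAll(10)` is shorthand for a `GlobalWindows` assigner with a purging `CountTrigger`, so the sketch would be used in its place as `source.windowAll(GlobalWindows.create()).trigger(new CountOrTimeoutTrigger<GlobalWindow>(10, 1000))`. Depending on the Flink version, a processing-time timer that is still pending when a finite source finishes may never fire. For flushing the very last partial batch, an event-time timer may be the more reliable choice, since the source emits a final watermark at end of input.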
Cheers,
Aljoscha

On Wed, 20 Apr 2016 at 23:44 Kostya Kulagin <kkula...@gmail.com> wrote:

> I have a pretty big but finite stream and I need to be able to window it by
> number of elements.
> In this case, from my observations, Flink can 'skip' the latest chunk of
> data if it has fewer elements than the window size:
>
> StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.getExecutionEnvironment();
> DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {
>
>     @Override
>     public void run(SourceContext<Long> ctx) throws Exception {
>         LongStream.range(0, 35).forEach(ctx::collect);
>     }
>
>     @Override
>     public void cancel() {
>
>     }
> });
>
> source.countWindowAll(10).apply(new AllWindowFunction<Long, Long, GlobalWindow>() {
>     @Override
>     public void apply(GlobalWindow window, Iterable<Long> values,
>             Collector<Long> out) throws Exception {
>         System.out.println(Joiner.on(',').join(values));
>     }
> }).print();
>
> env.execute("yoyoyo");
>
>
> Output:
> 0,1,2,3,4,5,6,7,8,9
> 10,11,12,13,14,15,16,17,18,19
> 20,21,22,23,24,25,26,27,28,29
>
> I.e. elements 30 to 34 are not being processed.
>
> Does it make sense to have a count OR timeout window, which emits the
> window when the number of elements reaches a threshold OR a collecting
> timeout occurs?