Hi,
yes, you can achieve this by writing a custom Trigger that fires either when
the count is reached or after a long-enough timeout. It would be a combination
of CountTrigger and EventTimeTrigger (or ProcessingTimeTrigger), so you could
look at those to get started.
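
To make the "count OR timeout" rule concrete, here is a minimal sketch of the
decision logic in plain Java. It is not written against the Flink API: the
class CountOrTimeoutBatcher and its methods onElement/onTimer are hypothetical
names that only mirror the roles of a Trigger's onElement and timer callbacks
(onProcessingTime or onEventTime). Timestamps are passed in explicitly so the
behaviour is deterministic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Plain-Java model of a "count OR timeout" firing rule.
 * Hypothetical illustration only; this is NOT a Flink Trigger.
 */
final class CountOrTimeoutBatcher<T> {
    private final int maxCount;
    private final long timeoutMillis;
    private final List<T> buffer = new ArrayList<>();
    // Long.MAX_VALUE means "no timer registered" (the buffer is empty).
    private long deadline = Long.MAX_VALUE;

    CountOrTimeoutBatcher(int maxCount, long timeoutMillis) {
        if (maxCount <= 0 || timeoutMillis <= 0) {
            throw new IllegalArgumentException("maxCount and timeoutMillis must be positive");
        }
        this.maxCount = maxCount;
        this.timeoutMillis = timeoutMillis;
    }

    /** Called per element; returns a full batch when the count is reached, else an empty list. */
    List<T> onElement(T element, long nowMillis) {
        if (buffer.isEmpty()) {
            // The first element of a new batch starts the timeout.
            deadline = nowMillis + timeoutMillis;
        }
        buffer.add(element);
        if (buffer.size() >= maxCount) {
            return flush();
        }
        return Collections.emptyList();
    }

    /** Called when time advances; returns the partial batch once the deadline has passed. */
    List<T> onTimer(long nowMillis) {
        if (!buffer.isEmpty() && nowMillis >= deadline) {
            return flush();
        }
        return Collections.emptyList();
    }

    /** Emits the buffered elements and resets both the count and the timer. */
    private List<T> flush() {
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        deadline = Long.MAX_VALUE;
        return batch;
    }
}
```

With maxCount = 10 and the 35 elements from the example below, three batches
fire on the count, and the remaining five elements are emitted by onTimer once
the timeout has elapsed. In a real Trigger the count would have to be kept in
Flink's partitioned state and the deadline registered as a timer through the
TriggerContext.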

Cheers,
Aljoscha

On Wed, 20 Apr 2016 at 23:44 Kostya Kulagin <kkula...@gmail.com> wrote:

> I have a pretty big but finite stream and I need to be able to window it by
> number of elements.
> In this case, from my observations, Flink can 'skip' the last chunk of
> data if it has fewer elements than the window size:
>
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {
>
>       @Override
>       public void run(SourceContext<Long> ctx) throws Exception {
>         LongStream.range(0, 35).forEach(ctx::collect);
>       }
>
>       @Override
>       public void cancel() {
>
>       }
>     });
>
>     source.countWindowAll(10).apply(new AllWindowFunction<Long, Long, GlobalWindow>() {
>       @Override
>       public void apply(GlobalWindow window, Iterable<Long> values, Collector<Long> out) throws Exception {
>         System.out.println(Joiner.on(',').join(values));
>       }
>     }).print();
>
>     env.execute("yoyoyo");
>
>
> Output:
> 0,1,2,3,4,5,6,7,8,9
> 10,11,12,13,14,15,16,17,18,19
> 20,21,22,23,24,25,26,27,28,29
>
> I.e. elements from 30 to 34 are not being processed.
>
> Does it make sense to have a count OR timeout window which will emit the
> current window when the number of elements reaches a threshold OR a
> collection timeout occurs?
>
