Hi,
if you are not using the windows for their actual semantics, I would suggest avoiding count-based windows and also the *All windows. The *All windows are all non-parallel, i.e. you only ever get one parallel instance of your window operator, even if you have a huge cluster.

Also, in most cases it is better not to use a plain WindowFunction with apply(), because all elements have to be buffered so that they can be passed in as an Iterable, Iterable<Long> in your example. If you can, I would suggest using a ReduceFunction or FoldFunction, or an apply() with an incremental aggregation function: apply(ReduceFunction, WindowFunction) or apply(FoldFunction, WindowFunction). These allow incremental aggregation of the result as elements arrive and don't require buffering of all elements until the window fires.

Cheers,
Aljoscha
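For illustration, a minimal, untested sketch (not from the original message) of what that could look like: a keyed count window with an incremental ReduceFunction, so the window operator can run in parallel and only keeps a running sum instead of buffering all elements. The class name, the key selector (value % 4), and the sum semantics are assumptions made up for the example; it is written against the Flink 1.x DataStream API.

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
    import org.apache.flink.util.Collector;

    public class IncrementalCountWindow {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        env.generateSequence(0, 99)
            // keying first makes the window operator parallel, unlike *All windows
            .keyBy(new KeySelector<Long, Long>() {
              @Override
              public Long getKey(Long value) {
                return value % 4; // hypothetical key, just for the example
              }
            })
            .countWindow(10)
            .apply(
                new ReduceFunction<Long>() {
                  @Override
                  public Long reduce(Long a, Long b) {
                    // incremental pre-aggregation: only the running sum is stored
                    return a + b;
                  }
                },
                new WindowFunction<Long, Long, Long, GlobalWindow>() {
                  @Override
                  public void apply(Long key, GlobalWindow window,
                      Iterable<Long> preAggregated, Collector<Long> out) {
                    // the Iterable holds exactly one element: the reduced result
                    out.collect(preAggregated.iterator().next());
                  }
                })
            .print();

        env.execute("incremental count window sketch");
      }
    }

Note that any trailing elements per key that never reach the count of 10 are still dropped at the end of a finite stream, which is exactly the issue discussed below.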
On Thu, 21 Apr 2016 at 16:53 Kostya Kulagin <kkula...@gmail.com> wrote:

> Maybe, if this is not the first time it has come up, it is worth
> considering adding this as an option? ;-)
>
> My use case: I have a pretty big amount of data, basically for ETL. It is
> finite, but it is big. I see it more as a stream than as a dataset. Also,
> I would re-use the same code for an infinite stream later...
> And I do not care much about the exact window size; I create windows just
> for performance reasons.
>
> Anyway, thank you for the responses!
>
>
> On Thu, Apr 21, 2016 at 8:54 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> People have wondered about that a few times, yes. My opinion is that a
>> stream is potentially infinite and processing only stops for anomalous
>> reasons: when the job crashes, or when stopping a job to redeploy it
>> later. In those cases you would not want to flush out your data but keep
>> it and restart from the same state when the job is restarted.
>>
>> You can implement the behavior by writing a custom Trigger that behaves
>> like the count trigger but also fires when receiving a Long.MAX_VALUE
>> watermark. A watermark of Long.MAX_VALUE signifies that a source has
>> stopped processing for natural reasons.
>>
>> Cheers,
>> Aljoscha
>>
>> On Thu, 21 Apr 2016 at 14:42 Kostya Kulagin <kkula...@gmail.com> wrote:
>>
>>> Thanks,
>>>
>>> I wonder whether it would be good to have such functionality built in.
>>> At least when the incoming stream is finished, flush the remaining
>>> elements.
>>>
>>> On Thu, Apr 21, 2016 at 4:47 AM, Aljoscha Krettek <aljos...@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>> yes, you can achieve this by writing a custom Trigger that fires
>>>> either on the count or after a long-enough timeout. It would be a
>>>> combination of CountTrigger and EventTimeTrigger (or
>>>> ProcessingTimeTrigger), so you could look at those to get started.
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>> On Wed, 20 Apr 2016 at 23:44 Kostya Kulagin <kkula...@gmail.com> wrote:
>>>>
>>>>> I have a pretty big but finite stream and I need to be able to window
>>>>> it by number of elements.
>>>>> In this case, from my observations, Flink can 'skip' the last chunk
>>>>> of data if it has fewer elements than the window size:
>>>>>
>>>>> import java.util.stream.LongStream;
>>>>>
>>>>> import org.apache.flink.streaming.api.datastream.DataStreamSource;
>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>>>>> import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
>>>>> import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
>>>>> import org.apache.flink.util.Collector;
>>>>>
>>>>> import com.google.common.base.Joiner;
>>>>>
>>>>> StreamExecutionEnvironment env =
>>>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>>>> DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {
>>>>>
>>>>>   @Override
>>>>>   public void run(SourceContext<Long> ctx) throws Exception {
>>>>>     // emits 0..34, then finishes
>>>>>     LongStream.range(0, 35).forEach(ctx::collect);
>>>>>   }
>>>>>
>>>>>   @Override
>>>>>   public void cancel() {
>>>>>   }
>>>>> });
>>>>>
>>>>> source.countWindowAll(10).apply(new AllWindowFunction<Long, Long, GlobalWindow>() {
>>>>>   @Override
>>>>>   public void apply(GlobalWindow window, Iterable<Long> values,
>>>>>       Collector<Long> out) throws Exception {
>>>>>     System.out.println(Joiner.on(',').join(values));
>>>>>   }
>>>>> }).print();
>>>>>
>>>>> env.execute("yoyoyo");
>>>>>
>>>>> Output:
>>>>> 0,1,2,3,4,5,6,7,8,9
>>>>> 10,11,12,13,14,15,16,17,18,19
>>>>> 20,21,22,23,24,25,26,27,28,29
>>>>>
>>>>> I.e. elements 30 to 34 are not being processed.
>>>>>
>>>>> Does it make sense to have a count-OR-timeout window, which emits a
>>>>> window either when the number of elements reaches a threshold or when
>>>>> a collection timeout occurs?
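For reference, a minimal, untested sketch of the kind of custom Trigger Aljoscha describes above: it counts elements like CountTrigger, but also registers an event-time timer at the window's max timestamp (Long.MAX_VALUE for GlobalWindow), so it fires once more when the final end-of-stream watermark arrives and the last partial window is not lost. The class name, the FIRE_AND_PURGE choice, and the state handling are assumptions for illustration, written against the Flink 1.0-era Trigger API (newer versions also expect a clear() method).

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeutils.base.LongSerializer;
    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.Window;

    public class CountWithEndOfStreamTrigger<W extends Window> extends Trigger<Object, W> {

      private final long maxCount;

      // per-window element count, kept in partitioned state like CountTrigger does
      private final ValueStateDescriptor<Long> countDesc =
          new ValueStateDescriptor<>("count", LongSerializer.INSTANCE, 0L);

      private CountWithEndOfStreamTrigger(long maxCount) {
        this.maxCount = maxCount;
      }

      public static <W extends Window> CountWithEndOfStreamTrigger<W> of(long maxCount) {
        return new CountWithEndOfStreamTrigger<>(maxCount);
      }

      @Override
      public TriggerResult onElement(Object element, long timestamp, W window,
          TriggerContext ctx) throws Exception {
        // For GlobalWindow, maxTimestamp() is Long.MAX_VALUE, so this timer
        // only fires when the final end-of-stream watermark arrives.
        ctx.registerEventTimeTimer(window.maxTimestamp());

        ValueState<Long> count = ctx.getPartitionedState(countDesc);
        long current = count.value() + 1;
        if (current >= maxCount) {
          count.update(0L);
          return TriggerResult.FIRE_AND_PURGE; // full window: emit and clear
        }
        count.update(current);
        return TriggerResult.CONTINUE;
      }

      @Override
      public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        // Reached when the watermark passes window.maxTimestamp(), i.e. at the
        // Long.MAX_VALUE watermark of a finite stream: flush what is left.
        return TriggerResult.FIRE_AND_PURGE;
      }

      @Override
      public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
      }
    }

With the example above, something like

    source.windowAll(GlobalWindows.create())
          .trigger(CountWithEndOfStreamTrigger.of(10))

should then also emit 30,31,32,33,34 as a final, smaller window when the source finishes (GlobalWindows lives in org.apache.flink.streaming.api.windowing.assigners).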