Maybe if it is not the first time it worth considering adding this thing as
an option? ;-)

My usecase - I have a pretty big amount of data basically for ETL. It is
finite but it is big. I see it more as a stream not as a dataset. Also I
would re-use the same code for infinite stream later...
And I do not much care about exact window size - it is just for performance
reasons I create a windows.

Anyways - that you for the responses!


On Thu, Apr 21, 2016 at 8:54 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> People have wondered about that a few times, yes. My opinion is that a
> stream is potentially infinite and processing only stops for anomalous
> reasons: when the job crashes, when stopping a job to later redeploy it. In
> those cases you would not want to flush out your data but keep them and
> restart from the same state when the job is restarted.
>
> You can implement the behavior by writing a custom Trigger that behaves
> like the count trigger but also fires when receiving a Long.MAX_VALUE
> watermark. A watermark of Long.MAX_VALUE signifies that a source has
> stopped processing for natural reasons.
>
> Cheers,
> Aljoscha
>
> On Thu, 21 Apr 2016 at 14:42 Kostya Kulagin <kkula...@gmail.com> wrote:
>
>> Thanks,
>>
>> I wonder wouldn't it be good to have a built-in such functionality. At
>> least when incoming stream is finished - flush remaining elements.
>>
>> On Thu, Apr 21, 2016 at 4:47 AM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Hi,
>>> yes, you can achieve this by writing a custom Trigger that can trigger
>>> both on the count or after a long-enough timeout. It would be a combination
>>> of CountTrigger and EventTimeTrigger (or ProcessingTimeTrigger) so you
>>> could look to those to get started.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Wed, 20 Apr 2016 at 23:44 Kostya Kulagin <kkula...@gmail.com> wrote:
>>>
>>>> I have a pretty big but final stream and I need to be able to window it
>>>> by number of elements.
>>>> In this case from my observations flink can 'skip' the latest chunk of
>>>> data if it has lower amount of elements than window size:
>>>>
>>>>     StreamExecutionEnvironment env = 
>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>     DataStreamSource<Long> source = env.addSource(new 
>>>> SourceFunction<Long>() {
>>>>
>>>>       @Override
>>>>       public void run(SourceContext<Long> ctx) throws Exception {
>>>>         LongStream.range(0, 35).forEach(ctx::collect);
>>>>       }
>>>>
>>>>       @Override
>>>>       public void cancel() {
>>>>
>>>>       }
>>>>     });
>>>>
>>>>     source.countWindowAll(10).apply(new AllWindowFunction<Long, Long, 
>>>> GlobalWindow>() {
>>>>       @Override
>>>>       public void apply(GlobalWindow window, Iterable<Long> values, 
>>>> Collector<Long> out) throws Exception {
>>>>         System.out.println(Joiner.on(',').join(values));
>>>>       }
>>>>     }).print();
>>>>
>>>>     env.execute("yoyoyo");
>>>>
>>>>
>>>> Output:
>>>> 0,1,2,3,4,5,6,7,8,9
>>>> 10,11,12,13,14,15,16,17,18,19
>>>> 20,21,22,23,24,25,26,27,28,29
>>>>
>>>> I.e. elements from 10 to 35 are not being processed.
>>>>
>>>> Does it make sense to have: count OR timeout window which will evict
>>>> new window when number of elements reach a threshold OR collecting timeout
>>>> occurs?
>>>>
>>>
>>

Reply via email to