Hi,
which version of Flink are you using? Maybe there is a bug. I've tested it
on the git master (1.1-SNAPSHOT) and it works fine with varying degrees of
parallelism if I change the source to emit 30 elements:
LongStream.range(0, 30).forEach(ctx::collect);

(The second argument of LongStream.range(start, end) is exclusive)
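
As a rough illustration of the difference between 29 and 30 elements, here is a plain-Java sketch with no Flink involved. The class and method names (CountWindowSketch, fullWindows, multiplesOfThree) are made up for this example, and it only assumes that a count window of size 10 fires once it holds exactly 10 elements, so a trailing partial window is never emitted:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class CountWindowSketch {

    // Groups the values 0..endExclusive-1 into windows of exactly `size` elements.
    // A trailing partial window is dropped, imitating a count trigger that never fires.
    // (Assumption for illustration only; this is not Flink's implementation.)
    public static List<List<Long>> fullWindows(long endExclusive, int size) {
        List<Long> all = LongStream.range(0, endExclusive) // end is exclusive
                .boxed()
                .collect(Collectors.toList());
        List<List<Long>> windows = new ArrayList<>();
        for (int i = 0; i + size <= all.size(); i += size) {
            windows.add(new ArrayList<>(all.subList(i, i + size)));
        }
        return windows;
    }

    // Same filter as the window function in the original program: keep multiples of 3.
    public static List<Long> multiplesOfThree(List<List<Long>> windows) {
        return windows.stream()
                .flatMap(List::stream)
                .filter(v -> v % 3 == 0)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // 29 elements: two full windows, the last 9 elements are never emitted.
        System.out.println(multiplesOfThree(fullWindows(29, 10)));
        // 30 elements: three full windows, so 21, 24 and 27 show up as well.
        System.out.println(multiplesOfThree(fullWindows(30, 10)));
    }
}
```

With 29 elements the sketch yields only 0, 3, 6, 9, 12, 15 and 18, which are the same seven values as in the output you posted; with 30 elements 21, 24 and 27 appear too.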

Cheers,
Aljoscha



On Thu, 21 Apr 2016 at 11:44 Kostya Kulagin <kkula...@gmail.com> wrote:

> Actually this is not true - the source emits 30 values since it starts
> at 0. If I change 29 to 33, the result is the same.
> I can get all values if I play with parallelism, i.e. by setting
> parallelism to 1 before print,
> or if I change 29 to 39 (I have 4 cores).
> My guess is that something is wrong with the threads. BTW, in this case,
> how are the threads created and how does data flow between them?
> On Apr 21, 2016 4:50 AM, "Aljoscha Krettek" <aljos...@apache.org> wrote:
>
>> Hi,
>> this is related to your other question about count windows. The source
>> emits 29 values so we only have two count-windows with 10 elements each.
>> The last window is never triggered.
>>
>> Cheers,
>> Aljoscha
>>
>> On Wed, 20 Apr 2016 at 23:52 Kostya Kulagin <kkula...@gmail.com> wrote:
>>
>>> I think it has something to do with parallelism, and I probably do not
>>> have a clear understanding of how parallelism works in Flink, but in this example:
>>>
>>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>     DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {
>>>
>>>       @Override
>>>       public void run(SourceContext<Long> ctx) throws Exception {
>>>         LongStream.range(0, 29).forEach(ctx::collect);
>>>       }
>>>
>>>       @Override
>>>       public void cancel() {
>>>
>>>       }
>>>     });
>>>
>>>     source.countWindowAll(10).apply(new AllWindowFunction<Long, Long, GlobalWindow>() {
>>>       @Override
>>>       public void apply(GlobalWindow window, Iterable<Long> values, Collector<Long> out) throws Exception {
>>>         for (Long value : values) {
>>>           if (value % 3 == 0) {
>>>             out.collect(value);
>>>           }
>>>         }
>>>       }
>>>     }).print();
>>>
>>>     env.execute("yoyoyo");
>>>
>>> Why is my output like this:
>>>
>>> 4> 9
>>> 1> 0
>>> 1> 12
>>> 3> 6
>>> 3> 18
>>> 2> 3
>>> 2> 15
>>>
>>> ? I.e. where is the value 24, for example? I expect to see it. What am I
>>> doing wrong?
>>>
>>
