Hi,

which version of Flink are you using? Maybe there is a bug. I've tested it on the git master (1.1-SNAPSHOT) and it works fine with varying degrees of parallelism if I change the source to emit 30 elements:

LongStream.range(0, 30).forEach(ctx::collect);
(The second argument of LongStream.range(start, end) is exclusive.)

Cheers,
Aljoscha

On Thu, 21 Apr 2016 at 11:44 Kostya Kulagin <kkula...@gmail.com> wrote:

> Actually this is not true - the source emits 30 values since it starts
> at 0. If I change 29 to 33 the result will be the same.
> I can get all values if I play with parallelism, i.e. setting parallelism
> to 1 before print.
> Or if I change 29 to 39 (I have 4 cores).
> I can guess that there is something wrong with threads. BTW, in this case
> how are threads created and how does data flow between them?
> On Apr 21, 2016 4:50 AM, "Aljoscha Krettek" <aljos...@apache.org> wrote:
>
>> Hi,
>> this is related to your other question about count windows. The source
>> emits 29 values so we only have two count-windows with 10 elements each.
>> The last window is never triggered.
>>
>> Cheers,
>> Aljoscha
>>
>> On Wed, 20 Apr 2016 at 23:52 Kostya Kulagin <kkula...@gmail.com> wrote:
>>
>>> I think it has something to do with parallelism and I probably do not
>>> have a clear understanding of how parallelism works in Flink, but in
>>> this example:
>>>
>>> StreamExecutionEnvironment env =
>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>> DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {
>>>
>>>   @Override
>>>   public void run(SourceContext<Long> ctx) throws Exception {
>>>     LongStream.range(0, 29).forEach(ctx::collect);
>>>   }
>>>
>>>   @Override
>>>   public void cancel() {
>>>
>>>   }
>>> });
>>>
>>> source.countWindowAll(10).apply(new AllWindowFunction<Long, Long, GlobalWindow>() {
>>>   @Override
>>>   public void apply(GlobalWindow window, Iterable<Long> values,
>>>       Collector<Long> out) throws Exception {
>>>     for (Long value : values) {
>>>       if (value % 3 == 0) {
>>>         out.collect(value);
>>>       }
>>>     }
>>>   }
>>> }).print();
>>>
>>> env.execute("yoyoyo");
>>>
>>> Why is my output like this:
>>>
>>> 4> 9
>>> 1> 0
>>> 1> 12
>>> 3> 6
>>> 3> 18
>>> 2> 3
>>> 2> 15
>>>
>>> ? I.e. where is the value 24, for example? I expect to see it. What am I
>>> doing wrong?
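
For anyone reading this thread later, the count-window explanation can be checked without a cluster. The following is only a plain-Java sketch, not Flink code: the class CountWindowSketch and the method emitted are hypothetical names, and the sketch assumes (as described in the thread) that a count window of size 10 fires only once it holds 10 elements, so a trailing partial window emits nothing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.LongStream;

// Plain-Java sketch with no Flink dependency; all names here are hypothetical.
class CountWindowSketch {

    // Simulates a count window of the given size over LongStream.range(0, endExclusive).
    // Assumption: only full windows fire; a trailing partial window is dropped.
    // Each fired window emits its multiples of 3, like the window function in the thread.
    static List<Long> emitted(long endExclusive, int windowSize) {
        List<Long> out = new ArrayList<>();
        List<Long> window = new ArrayList<>();
        LongStream.range(0, endExclusive).forEach(value -> {
            window.add(value);
            if (window.size() == windowSize) {
                for (Long v : window) {
                    if (v % 3 == 0) {
                        out.add(v);
                    }
                }
                window.clear();
            }
        });
        return out;
    }

    public static void main(String[] args) {
        // range(0, 29) yields 29 elements: two full windows, the third has only 9.
        System.out.println(emitted(29, 10)); // prints [0, 3, 6, 9, 12, 15, 18]
        // range(0, 30) yields 30 elements: the third window fills up and fires.
        System.out.println(emitted(30, 10)); // prints [0, 3, 6, 9, 12, 15, 18, 21, 24, 27]
    }
}
```

The first call produces the same set of values as the output reported in the original question (ordering aside), and the second shows 21, 24 and 27 appearing once the source emits a 30th element.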