Hi, I am running this following sample code to understand how iteration and broadcast works in streaming context.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); long i = 5; DataStream<Long> mainInput = env.generateSequence(2, 8); DataStream<Long> initialIterateInput = env.fromElements(i); IterativeStream.ConnectedIterativeStreams<Long, Long> iteration = mainInput.iterate().withFeedbackType(BasicTypeInfo.LONG_TYPE_INFO); DataStream<Long> iterateHead = iteration .flatMap(new CoFlatMapFunction<Long, Long, Long>() { long globalVal = 1; @Override public void flatMap1(Long value, Collector<Long> out) throws Exception { Thread.sleep(3000); System.out.println("SEEING FROM INPUT 1: " + value+", "+globalVal); //globalVal = globalVal + value; out.collect(globalVal+value); } @Override public void flatMap2(Long value, Collector<Long> out) throws Exception { Thread.sleep(1000); globalVal = value; System.out.println("SEEING FROM INPUT 2: " + value+", "+globalVal); //out.collect(value); } }); iteration.closeWith(iterateHead.broadcast()); iterateHead.map(new MapFunction<Long, Long>() { @Override public Long map(Long value) throws Exception { System.out.println("SEEING OUTPUT FROM ITERATION: " + value); return value; } }); I was expecting that after out.collect(globalVal+value); is called the value would be broadcasted to every partition as given by the closewith statement. Also, i was expecting to get the broadcasted value to the flatmap2 function and then update the globalval in every partition. But rather than that, the values are not broadcasted and iterated properly as i was expecting and i am getting the following output, SEEING FROM INPUT 1: 2, 1 SEEING OUTPUT FROM ITERATION: 3 SEEING FROM INPUT 1: 3, 1 SEEING OUTPUT FROM ITERATION: 4 SEEING FROM INPUT 1: 4, 1 SEEING FROM INPUT 1: 5, 1 SEEING OUTPUT FROM ITERATION: 5 SEEING OUTPUT FROM ITERATION: 6 SEEING FROM INPUT 2: 5, 5 SEEING FROM INPUT 2: 6, 6 SEEING FROM INPUT 1: 6, 1 SEEING OUTPUT FROM ITERATION: 7 SEEING FROM INPUT 1: 7, 1 SEEING OUTPUT FROM ITERATION: 8 SEEING FROM INPUT 1: 8, 1 SEEING OUTPUT FROM ITERATION: 9 SEEING FROM INPUT 2: 4, 4 SEEING FROM INPUT 2: 4, 4 SEEING FROM INPUT 2: 5, 5 SEEING FROM INPUT 2: 5, 5 SEEING FROM INPUT 2: 3, 3 SEEING FROM INPUT 2: 6, 6 SEEING FROM INPUT 2: 6, 6 SEEING FROM INPUT 2: 6, 6 SEEING FROM INPUT 2: 9, 9 SEEING FROM INPUT 2: 3, 3 SEEING FROM INPUT 2: 4, 4 SEEING FROM INPUT 2: 4, 4 SEEING FROM INPUT 2: 8, 8 SEEING FROM INPUT 2: 5, 5 SEEING FROM INPUT 2: 3, 3 SEEING FROM INPUT 2: 3, 3 SEEING FROM INPUT 2: 7, 7 SEEING FROM INPUT 2: 9, 9 SEEING FROM INPUT 2: 9, 9 SEEING FROM INPUT 2: 9, 9 SEEING FROM INPUT 2: 8, 8 SEEING FROM INPUT 2: 8, 8 SEEING FROM INPUT 2: 8, 8 SEEING FROM INPUT 2: 7, 7 SEEING FROM INPUT 2: 7, 7 SEEING FROM INPUT 2: 7, 7 Can anyone please explain why such behaviour? Why is the iteration happening after reading all the elements of the first input stream? what if it is an infinite stream, would the iteration wait for it to finish? Thanks and Regards -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Unexpected-behaviour-in-datastream-broadcast-tp6848.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.