Hi Peter,

Let me try to explain this.
As shown in your examples, the iterate method takes a step function that "splits" the initial stream into two separate streams, i.e., initialStream => (stream1, stream2). stream2 is the output stream: its results are emitted to the downstream operators (the PrintSink in your example). stream1 is the feedback stream: its results are sent back to the iterate operator.

In your code, every positive long value is decremented by 1 and fed back to the iterate operator. The values eventually reach 0 and are filtered out, but the iteration head cannot tell that no more feedback will arrive. With the default maxWaitTimeMillis of 0 it therefore waits indefinitely, which is why only your variant with the 1000 ms timeout terminates. Try replacing your first map function with (_ + 1) and you'll see an infinite stream of results, since the feedback values then keep growing and never drop to 0.

For more information, you can refer to <https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html#iterations> or read the javadoc.

Hope that helps.

Best,
Xingcan

On Fri, Sep 1, 2017 at 5:29 PM, Peter Ertl <peter.e...@gmx.net> wrote:

> Hi folks,
>
> I was doing some experiments with DataStream#iterate and what felt strange
> to me is the fact that #iterate() does not terminate on its own when
> consuming a _finite_ stream.
>
> I think this is awkward and unexpected. The only thing that "helped" was
> setting an arbitrary and meaningless timeout on iterate.
>
> Imho this should not be necessary (maybe send an internal "poison message"
> down the iteration stream to signal shutdown of the streaming task?)
>
> example:
>
> // ---------------------------------------------------
> // does terminate by introducing a meaningless timeout
> // ---------------------------------------------------
> val iterationResult1 = env.generateSequence(1, 4).iterate(it => {
>   (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'x')) // dump meaningless 'x' chars just to do anything
> }, 1000, keepPartitioning = false)
>
> iterationResult1.print()
>
> // ---------------------------------------------------
> // NEVER terminates
> // ---------------------------------------------------
> val iterationResult2 = env.generateSequence(1, 4).iterate(it => {
>   (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'y')) // dump meaningless 'y' chars just to do anything
> })
> iterationResult2.print()
>
>
> Can someone elaborate on this - should I file a ticket?
>
> Regards
> Peter
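To make the feedback/output split in your step function a bit more concrete, here is a plain Scala model of it. This is only a sketch under simplifying assumptions: it has no Flink dependency, it processes the in-flight records in rounds rather than one record at a time as Flink does, and `IterationModel`, `run` and `maxRounds` are hypothetical names made up for this illustration, not Flink API. It shows that with (_ - 1) the feedback stream drains after a few rounds, while with (_ + 1) it never drains and only the artificial round limit stops the loop. A real Flink iteration head never receives such a "drained" signal; the maxWaitTimeMillis timeout is what stands in for it.

```scala
// Hypothetical, simplified model of the loop built by iterate(); NOT Flink's implementation.
object IterationModel {
  // emitted: records sent to the output stream (stream2)
  // rounds:  how many times records went around the loop
  // drained: true if the feedback stream (stream1) became empty
  final case class Result(emitted: Long, rounds: Int, drained: Boolean)

  def run(input: Seq[Long], step: Long => Long, maxRounds: Int): Result = {
    var inFlight: Seq[Long] = input // records currently arriving at the iteration head
    var emitted = 0L
    var rounds = 0
    while (inFlight.nonEmpty && rounds < maxRounds) {
      val positive = inFlight.filter(_ > 0) // same filter as in both halves of the example
      emitted += positive.size              // output: one 'x' per positive record
      inFlight = positive.map(step)         // feedback: sent back to the head
      rounds += 1
    }
    Result(emitted, rounds, inFlight.isEmpty)
  }

  def main(args: Array[String]): Unit = {
    println(run(1L to 4L, _ - 1, maxRounds = 100)) // prints Result(10,5,true)
    println(run(1L to 4L, _ + 1, maxRounds = 100)) // prints Result(400,100,false)
  }
}
```

In the (_ - 1) case the model knows the loop is empty simply by checking `inFlight`; in a distributed Flink job the head and tail of the iteration have no such global view, which is why the iteration only finishes once the timeout expires.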