Hi Peter,

That's a good idea, but it may not be applicable to an iteration operator. The operator cannot determine when to generate the "end-of-stream message" for the feedback stream. The provided function (e.g., filter(_ > 0).map(_ - 1)) is stateless and has no side effects, so it cannot tell whether any records are still circulating in the loop.
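
Because the loop body keeps no state, silence on the feedback channel is the only termination signal the head can observe. The following single-threaded Scala sketch models that idea without any Flink dependency. A java.util.concurrent.LinkedBlockingQueue stands in for the feedback channel, and the head stops once poll returns null after the wait time. This loosely follows the blocking-queue approach in the StreamIterationHead code linked in the quoted message, but it is not that code. TimeoutLoop and run are hypothetical names, not Flink API, and the loop body is hard-coded to the filter(_ > 0).map(_ - 1) example.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Hypothetical toy model of an iteration head; NOT Flink's actual implementation.
object TimeoutLoop {

  /** Feeds `input` into a loop whose body is filter(_ > 0).map(_ - 1),
    * emitting one 'y' per positive record (the side output in the example).
    * The loop ends only when no record arrives within `maxWaitMillis`.
    */
  def run(input: Seq[Long], maxWaitMillis: Long): Vector[Char] = {
    // With no timeout, a real head would block on take() forever.
    require(maxWaitMillis > 0, "without a timeout this toy loop would block forever")

    val feedback = new LinkedBlockingQueue[java.lang.Long]()
    input.foreach(x => feedback.put(java.lang.Long.valueOf(x)))

    val out = Vector.newBuilder[Char]
    var running = true
    while (running) {
      // poll returns null if nothing arrives within the wait time
      val next = feedback.poll(maxWaitMillis, TimeUnit.MILLISECONDS)
      if (next == null) {
        running = false // silence is interpreted as "the loop has drained"
      } else {
        val x = next.longValue()
        if (x > 0) {
          out += 'y'                                    // it.filter(_ > 0).map(_ => 'y')
          feedback.put(java.lang.Long.valueOf(x - 1))   // it.filter(_ > 0).map(_ - 1)
        }
        // records with x <= 0 are dropped by the filter and not fed back
      }
    }
    out.result()
  }
}
```

For example, TimeoutLoop.run(Seq(1L, 2L, 3L, 4L), maxWaitMillis = 20) yields ten 'y' characters, but it only returns after a final 20 ms poll comes back empty. The wait has to exceed the slowest round trip through the loop, which is the awkwardness Peter points out. In the Scala DataStream API this wait is the maxWaitTimeMillis argument of iterate, whose default of 0 means no timeout.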

Best,
Xingcan

On Mon, Sep 4, 2017 at 4:40 AM, Peter Ertl <peter.e...@gmx.net> wrote:

> Hi Xingcan!
>
> If a _finite_ stream would, at the end, emit a special trailing
> "End-Of-Stream Message" that flows down through the operator stream,
> wouldn't this enable us to deterministically end the iteration without
> needing a timeout?
>
> Having an arbitrary timeout that must be longer than any iteration step
> takes seems really awkward.
>
> What do you think?
>
> Best regards
> Peter
>
>
> On 02.09.2017 at 17:16, Xingcan Cui <xingc...@gmail.com> wrote:
>
> Hi Peter,
>
> I just omitted the filter part. Sorry for that.
>
> Actually, as the javadoc explains, by default a DataStream with iteration
> will never terminate. That's because in a stream environment with
> iteration, the operator can never know whether the feedback stream has
> reached its end (even though the data source has terminated, *there may
> be unknowable subsequent data*), and that's why it needs a timeout value
> to make the judgement, just like many other function calls over a network
> connection. In other words, you know the feedback stream will be empty in
> the future, but the operator doesn't. Thus we provide it with a maximum
> waiting time for the next record.
>
> Internally, this mechanism is implemented via a blocking queue (the
> related code can be found here
> <https://github.com/apache/flink/blob/12b4185c6c09101b64e12a84c33dc4d28f95cff9/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java#L80>
> ).
>
> Hope everything is covered this time :)
>
> Best,
> Xingcan
>
> On Sat, Sep 2, 2017 at 10:08 PM, Peter Ertl <peter.e...@gmx.net> wrote:
>
>> On 02.09.2017 at 04:45, Xingcan Cui <xingc...@gmail.com> wrote:
>>
>> In your code, all the long values will have 1 subtracted and be sent
>> back to the iterate operator, endlessly.
>>
>> Is this true?
>> Shouldn't
>>
>> val iterationResult2 = env.generateSequence(1, 4).iterate(it => {
>>   (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'y')) // emit meaningless 'y' chars just to produce some output
>> })
>> iterationResult2.print()
>>
>> produce the following _feedback_ streams?
>>
>> initial input to #iterate(): [1 2 3 4]
>>
>> iteration #1 : [1 2 3]
>> iteration #2 : [1 2]
>> iteration #3 : [1]
>> iteration #4 : [] => empty feedback stream => should cause termination?
>> (which actually only happens when a timeout value is set)
>>
>> Best regards
>> Peter
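
To make the disagreement concrete, here is a plain-Scala model of Peter's example that applies the loop body in idealized synchronous rounds, with no Flink dependency. The rounds are an assumption of the model: Flink processes feedback records asynchronously, and the global, round-by-round view computed here is exactly what the iteration head does not have. FeedbackRounds, rounds and ys are hypothetical names used only for illustration.

```scala
// Idealized, synchronous-rounds model of the example; Flink does NOT run
// iterations in rounds, so this is only an outside, "omniscient" view.
object FeedbackRounds {

  /** Applies the loop body filter(_ > 0).map(_ - 1) round by round and returns
    * the feedback produced in each round, up to and including the first empty one.
    */
  def rounds(initial: List[Long]): List[List[Long]] = {
    @annotation.tailrec
    def go(current: List[Long], acc: List[List[Long]]): List[List[Long]] = {
      val feedback = current.filter(_ > 0).map(_ - 1)
      if (feedback.isEmpty) (feedback :: acc).reverse
      else go(feedback, feedback :: acc)
    }
    go(initial, Nil)
  }

  /** Number of 'y' chars on the output stream: one per positive record in any round. */
  def ys(initial: List[Long]): Int =
    (initial :: rounds(initial)).map(_.count(_ > 0)).sum

  def main(args: Array[String]): Unit =
    rounds(List(1L, 2L, 3L, 4L)).zipWithIndex.foreach { case (fb, i) =>
      println(s"iteration #${i + 1} : ${fb.mkString("[", " ", "]")}")
    }
}
```

Under this model the feedback of round 1 is [0 1 2 3] rather than [1 2 3]: each 0 is fed back and only dropped by filter(_ > 0) in the following round, so the first empty feedback appears at round 5 (round 4 if the zeros are ignored, as in the listing above), and ten 'y' characters are emitted in total. Either way, the emptiness is only visible from outside the loop; inside Flink, the iteration head can only infer it from the timeout.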