Hi John,

    Overall, I think Flink currently has no special mechanism for event time in
iterations. The IterationHead treats the initial input and the feedback input as
two ordinary inputs and handles them with the same mechanism used by tasks
outside the iteration.
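To make the "two ordinary inputs" point concrete, here is a minimal sketch of a
simplified model, loosely based on how Flink's two-input operators combine
watermarks: keep the latest watermark seen on each input and forward their
minimum, but only when that minimum advances. TwoInputWatermarkTracker is a
hypothetical name for illustration, not a Flink API.

```java
/**
 * Hypothetical, simplified model of watermark alignment in a two-input operator
 * (not Flink's actual class). Input 1 plays the role of the initial input of the
 * iteration head, input 2 the feedback input.
 */
class TwoInputWatermarkTracker {
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long combinedWatermark = Long.MIN_VALUE;

    /** Records a watermark on one input; returns the new combined watermark, or null if it did not advance. */
    public Long onWatermark(int input, long timestamp) {
        if (input == 1) {
            input1Watermark = timestamp;
        } else if (input == 2) {
            input2Watermark = timestamp;
        } else {
            throw new IllegalArgumentException("input must be 1 or 2");
        }
        // Alignment: the operator can only vouch for the slowest of its inputs.
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            return combinedWatermark;
        }
        return null;
    }

    public long currentWatermark() {
        return combinedWatermark;
    }

    public static void main(String[] args) {
        TwoInputWatermarkTracker head = new TwoInputWatermarkTracker();
        // Watermark 10 on the initial input: the feedback input is still at MIN_VALUE.
        System.out.println(head.onWatermark(1, 10)); // prints null
        // The same watermark comes back over the feedback edge: both inputs now report 10.
        System.out.println(head.onWatermark(2, 10)); // prints 10
    }
}
```

Nothing in this model knows that input 2 is a feedback edge, which is exactly why
a watermark that simply travels around the loop once is enough to make the head
consider both inputs aligned.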

    This can cause event-time disorder inside the iteration. Event time relies on
watermarks: a watermark with event time t tells downstream operators that no
further records with event time less than t will follow, and an operator with
several inputs forwards a watermark only once all of its inputs have reached it
(watermark alignment). Suppose a watermark with event time 10 enters the loop.
The IterationHead first receives it from the initial input, and then receives it
again from the feedback input after the first round of the iteration. At that
point the IterationHead considers the watermarks aligned at event time 10, so it
emits a watermark with event time 10 to the final output, which asserts that it
will no longer receive or emit records whose event time is less than 10.
However, since records may iterate for multiple rounds, the IterationHead may
still receive records whose event time is less than 10 in later rounds of the
iteration. This is where the event-time disorder occurs.
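The scenario above can be replayed with a toy simulation. The arrival sequence at
the head is a hypothetical ordering chosen to match the description: one record
with event time 5 keeps going around the loop while watermark 10 makes a single
round trip. IterationDisorderDemo and Arrival are illustrative names, not Flink
classes, and real arrival order depends on scheduling.

```java
import java.util.List;

/**
 * Toy replay (hypothetical, not Flink code) of event-time disorder at an
 * iteration head. Input 1 is the initial input, input 2 the feedback input.
 */
class IterationDisorderDemo {
    static final class Arrival {
        final int input;            // 1 = initial input, 2 = feedback input
        final boolean isWatermark;  // true for a watermark, false for a record
        final long time;            // watermark timestamp or record event time
        final String label;

        Arrival(int input, boolean isWatermark, long time, String label) {
            this.input = input;
            this.isWatermark = isWatermark;
            this.time = time;
            this.label = label;
        }
    }

    /** Assumed arrival order at the head. */
    static List<Arrival> scenario() {
        return List.of(
                new Arrival(1, false, 5, "record t=5, round 0"),
                new Arrival(1, true, 10, "watermark 10 from the initial input"),
                new Arrival(2, false, 5, "record t=5, round 1"),
                new Arrival(2, true, 10, "watermark 10 via the feedback edge"),
                new Arrival(2, false, 5, "record t=5, round 2"));
    }

    /** Aligns watermarks as the minimum over both inputs and counts records behind the emitted watermark. */
    static int countLateRecords(List<Arrival> arrivals) {
        long wm1 = Long.MIN_VALUE;
        long wm2 = Long.MIN_VALUE;
        long emitted = Long.MIN_VALUE;
        int late = 0;
        for (Arrival a : arrivals) {
            if (a.isWatermark) {
                if (a.input == 1) {
                    wm1 = a.time;
                } else {
                    wm2 = a.time;
                }
                // The emitted watermark only advances once both inputs have reached the new value.
                emitted = Math.max(emitted, Math.min(wm1, wm2));
            } else if (a.time < emitted) {
                late++;
                System.out.println("late: " + a.label + " (event time " + a.time
                        + " < watermark " + emitted + ")");
            }
        }
        return late;
    }

    public static void main(String[] args) {
        System.out.println("late records: " + countLateRecords(scenario()));
    }
}
```

Running main prints "late: record t=5, round 2 (event time 5 < watermark 10)"
followed by "late records: 1". Only the round-2 record is flagged: it re-enters
the head after the fed-back watermark has "aligned" both inputs at 10, even
though its event time is still 5.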

Best,
Yun Gao


------------------------------------------------------------------
From:John Tipper <john_tip...@hotmail.com>
Send Time:2019 Jun. 8 (Sat.) 21:19
To:user@flink.apache.org <user@flink.apache.org>
Subject:How are timestamps treated within an iterative DataStream loop within Flink?


Hi All,

How are timestamps treated within an iterative DataStream loop within Flink?
For example, here is a simple iterative loop within Flink where the feedback
stream is of a different type from the input stream:
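import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<MyInput> inputStream = env.addSource(new MyInputSourceFunction());
IterativeStream.ConnectedIterativeStreams<MyInput, MyFeedback> iterativeStream =
        inputStream.iterate().withFeedbackType(MyFeedback.class);
// define an output tag so we can emit feedback objects via a side output
final OutputTag<MyFeedback> outputTag = new OutputTag<MyFeedback>("feedback-output") {};
// now do some processing
SingleOutputStreamOperator<MyOutput> combinedStreams =
        iterativeStream.process(new CoProcessFunction<MyInput, MyFeedback, MyOutput>() {
    @Override
    public void processElement1(MyInput value, Context ctx, Collector<MyOutput> out)
            throws Exception {
        // do some processing of the stream of MyInput values
        // emit MyOutput values downstream by calling out.collect()
        out.collect(someInstanceOfMyOutput);
    }

    @Override
    public void processElement2(MyFeedback value, Context ctx, Collector<MyOutput> out)
            throws Exception {
        // do some more processing on the feedback objects
        // emit feedback items via the side output
        ctx.output(outputTag, someInstanceOfMyFeedback);
    }
});

// close the loop: the side output becomes the feedback input of the iteration head
iterativeStream.closeWith(combinedStreams.getSideOutput(outputTag));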

My questions revolve around how Flink uses timestamps within a feedback loop:
Within the ConnectedIterativeStreams, how does Flink order the input objects
across the regular input stream and the feedback stream? If I emit an object
into the feedback loop, when will the head of the loop see it relative to the
regular stream of input objects?
How does this behaviour change when using event-time processing?

 Many thanks,

 John

 Question also posted to StackOverflow here: 
https://stackoverflow.com/questions/56506020/how-does-flink-treat-timestamps-within-iterative-loops
