Hi All,

How are timestamps treated within an iterative DataStream loop within Flink?

For example, here is a simple iterative loop in Flink where the feedback 
stream has a different type from the input stream:

DataStream<MyInput> inputStream = env.addSource(new MyInputSourceFunction());
IterativeStream.ConnectedIterativeStreams<MyInput, MyFeedback> iterativeStream =
        inputStream.iterate().withFeedbackType(MyFeedback.class);

// define an output tag so we can emit feedback objects via a side output
final OutputTag<MyFeedback> outputTag =
        new OutputTag<MyFeedback>("feedback-output") {};

// now do some processing
SingleOutputStreamOperator<MyOutput> combinedStreams =
        iterativeStream.process(new CoProcessFunction<MyInput, MyFeedback, MyOutput>() {
    @Override
    public void processElement1(MyInput value, Context ctx, Collector<MyOutput> out)
            throws Exception {
        // do some processing of the stream of MyInput values
        // emit MyOutput values downstream by calling out.collect()
        out.collect(someInstanceOfMyOutput);
    }

    @Override
    public void processElement2(MyFeedback value, Context ctx, Collector<MyOutput> out)
            throws Exception {
        // do some more processing on the feedback classes
        // emit feedback items
        ctx.output(outputTag, someInstanceOfMyFeedback);
    }
});

iterativeStream.closeWith(combinedStreams.getSideOutput(outputTag));

My questions revolve around how Flink uses timestamps within a feedback 
loop:

  *   Within the ConnectedIterativeStreams, how does Flink treat ordering of 
the input objects across the streams of regular inputs and feedback objects? If 
I emit an object into the feedback loop, when will it be seen by the head of 
the loop with respect to the regular stream of input objects?
  *   How does the behaviour change when using event time processing?

Many thanks,

John

Question also posted to StackOverflow here: 
https://stackoverflow.com/questions/56506020/how-does-flink-treat-timestamps-within-iterative-loops
