Hi All,

How are timestamps treated within an iterative DataStream loop in Flink?
For example, here is a simple iterative loop in Flink where the feedback stream is of a different type from the input stream:

    DataStream<MyInput> inputStream = env.addSource(new MyInputSourceFunction());

    IterativeStream.ConnectedIterativeStreams<MyInput, MyFeedback> iterativeStream =
            inputStream.iterate().withFeedbackType(MyFeedback.class);

    // define an output tag so we can emit feedback objects via a side output
    final OutputTag<MyFeedback> outputTag = new OutputTag<MyFeedback>("feedback-output"){};

    // now do some processing
    SingleOutputStreamOperator<MyOutput> combinedStreams = iterativeStream.process(
            new CoProcessFunction<MyInput, MyFeedback, MyOutput>() {

        @Override
        public void processElement1(MyInput value, Context ctx, Collector<MyOutput> out) throws Exception {
            // do some processing of the stream of MyInput values
            // emit MyOutput values downstream by calling out.collect()
            out.collect(someInstanceOfMyOutput);
        }

        @Override
        public void processElement2(MyFeedback value, Context ctx, Collector<MyOutput> out) throws Exception {
            // do some more processing on the feedback classes
            // emit feedback items
            ctx.output(outputTag, someInstanceOfMyFeedback);
        }
    });

    iterativeStream.closeWith(combinedStreams.getSideOutput(outputTag));

My questions are about how Flink uses timestamps within a feedback loop:

* Within the ConnectedIterativeStreams, how does Flink order the input objects across the stream of regular inputs and the stream of feedback objects? If I emit an object into the feedback loop, when will it be seen by the head of the loop relative to the regular stream of input objects?
* How does the behaviour change when using event time processing?

Many thanks,

John

Question also posted to StackOverflow here: https://stackoverflow.com/questions/56506020/how-does-flink-treat-timestamps-within-iterative-loops