Hi All,
How are timestamps treated inside an iterative DataStream loop in Flink?
For example, here is a simple iterative loop in Flink where the feedback
stream has a different type from the input stream:
DataStream<MyInput> inputStream = env.addSource(new MyInputSourceFunction());
IterativeStream.ConnectedIterativeStreams<MyInput, MyFeedback> iterativeStream =
    inputStream.iterate().withFeedbackType(MyFeedback.class);

// define an output tag so we can emit feedback objects via a side output
final OutputTag<MyFeedback> outputTag = new OutputTag<MyFeedback>("feedback-output"){};
// now do some processing
SingleOutputStreamOperator<MyOutput> combinedStreams =
    iterativeStream.process(new CoProcessFunction<MyInput, MyFeedback, MyOutput>() {
        @Override
        public void processElement1(MyInput value, Context ctx, Collector<MyOutput> out) throws Exception {
            // do some processing of the stream of MyInput values
            // emit MyOutput values downstream by calling out.collect()
            out.collect(someInstanceOfMyOutput);
        }

        @Override
        public void processElement2(MyFeedback value, Context ctx, Collector<MyOutput> out) throws Exception {
            // do some more processing on the feedback classes
            // emit feedback items
            ctx.output(outputTag, someInstanceOfMyFeedback);
        }
    });

iterativeStream.closeWith(combinedStreams.getSideOutput(outputTag));
My questions are about how Flink uses timestamps within a feedback loop:
* Within the ConnectedIterativeStreams, how does Flink order elements across
the regular input stream and the feedback stream? If I emit an object into the
feedback loop, when will the head of the loop see it relative to the regular
stream of input objects?
* How does the behaviour change when using event time processing?
Many thanks,
John
Question also posted to StackOverflow here:
https://stackoverflow.com/questions/56506020/how-does-flink-treat-timestamps-within-iterative-loops