Hi John,
Overall, I think Flink currently does not have a special mechanism for event
time in iterations. This means the IterationHead treats the initial input and
the feedback input as two normal inputs and uses the same watermark mechanism
as the tasks outside the iteration.
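As a rough illustration of "the same mechanism", here is a simplified model of how a two-input task combines watermarks. It is not Flink's actual operator code: the class TwoInputWatermarkModel and its methods are hypothetical. The model keeps the latest watermark seen on each input and forwards their minimum, but only when that minimum advances. So a watermark on the initial input alone is held back until the feedback input catches up.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model (not Flink's real class) of watermark
 * combination in a two-input task: remember the latest watermark per input,
 * forward the minimum of the two only when that minimum advances.
 */
final class TwoInputWatermarkModel {
    private long input1Watermark = Long.MIN_VALUE;   // e.g. the initial input
    private long input2Watermark = Long.MIN_VALUE;   // e.g. the feedback input
    private long combinedWatermark = Long.MIN_VALUE; // last watermark forwarded
    // Watermarks forwarded downstream, recorded so they can be inspected.
    private final List<Long> emitted = new ArrayList<>();

    void processWatermark1(long ts) { input1Watermark = ts; maybeEmit(); }

    void processWatermark2(long ts) { input2Watermark = ts; maybeEmit(); }

    private void maybeEmit() {
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            emitted.add(newMin);
        }
    }

    List<Long> emitted() { return emitted; }

    public static void main(String[] args) {
        TwoInputWatermarkModel head = new TwoInputWatermarkModel();
        head.processWatermark1(10); // watermark 10 arrives on the initial input
        System.out.println(head.emitted()); // [] : the feedback input has not advanced
        head.processWatermark2(10); // the same watermark arrives via the feedback input
        System.out.println(head.emitted()); // [10]
    }
}
```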
Treating the feedback input this way may cause event-time disorder inside the
iteration. Event time relies on the watermark alignment mechanism: a watermark
marks a lower bound on the event time of the records that follow it. Suppose
we have a watermark with event time 10. The IterationHead will first receive
the watermark from the initial input, and then receive it again from the
feedback input after the first round of iteration. At that point the
IterationHead considers the watermarks aligned at event time 10, so it emits
the watermark with event time 10 to the final output, which declares that it
will not receive or emit any more records whose event time is less than 10.
However, since records may iterate for multiple rounds, the IterationHead may
still receive records whose event time is less than 10 in later rounds of the
iteration, and then the event-time disorder occurs.
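The scenario can be replayed with a small, hypothetical Java sketch. It does not run a Flink job: the class IterationDisorderDemo and its methods are made up for illustration, and the order in which elements reach the head is hard-coded to match the description above. A record with event time 5 keeps circulating, and since the loop preserves order it travels ahead of watermark 10 on each pass. A record counts as "late" if its timestamp is below the watermark the head has already forwarded.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical replay of the disorder scenario. The arrival order at the
 * iteration head is hard-coded, not produced by a real Flink job.
 */
final class IterationDisorderDemo {
    private long initialWatermark = Long.MIN_VALUE;   // last watermark on the initial input
    private long feedbackWatermark = Long.MIN_VALUE;  // last watermark on the feedback input
    private long emittedWatermark = Long.MIN_VALUE;   // watermark already forwarded downstream
    final List<Long> lateRecords = new ArrayList<>(); // records forwarded behind the watermark

    void onWatermark(boolean fromFeedback, long ts) {
        if (fromFeedback) {
            feedbackWatermark = ts;
        } else {
            initialWatermark = ts;
        }
        // Both inputs are treated as normal inputs: forward the minimum once it advances.
        long min = Math.min(initialWatermark, feedbackWatermark);
        if (min > emittedWatermark) {
            emittedWatermark = min;
        }
    }

    void onRecord(long ts) {
        // A record below the already-forwarded watermark is out of order downstream.
        if (ts < emittedWatermark) {
            lateRecords.add(ts);
        }
    }

    public static void main(String[] args) {
        IterationDisorderDemo head = new IterationDisorderDemo();
        head.onRecord(5);            // record (event time 5) enters from the initial input
        head.onWatermark(false, 10); // watermark 10 on the initial input: not aligned yet
        head.onRecord(5);            // round 1: the record comes back via feedback
        head.onWatermark(true, 10);  // watermark 10 comes back via feedback: aligned, 10 is forwarded
        head.onRecord(5);            // round 2: the record returns again, now behind watermark 10
        System.out.println(head.lateRecords); // [5]
    }
}
```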
Best,
Yun Gao
------------------------------------------------------------------
From:John Tipper <[email protected]>
Send Time:2019 Jun. 8 (Sat.) 21:19
To:[email protected] <[email protected]>
Subject:How are timestamps treated within an iterative DataStream loop within
Flink?
Hi All,
How are timestamps treated within an iterative DataStream loop within Flink?
For example, here is a simple iterative loop within Flink where the feedback
loop is of a different type from the input stream:
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

DataStream<MyInput> inputStream = env.addSource(new MyInputSourceFunction());
IterativeStream.ConnectedIterativeStreams<MyInput, MyFeedback> iterativeStream =
    inputStream.iterate().withFeedbackType(MyFeedback.class);

// define an output tag so we can emit feedback objects via a side output
final OutputTag<MyFeedback> outputTag = new OutputTag<MyFeedback>("feedback-output") {};

// now do some processing
SingleOutputStreamOperator<MyOutput> combinedStreams =
    iterativeStream.process(new CoProcessFunction<MyInput, MyFeedback, MyOutput>() {
        @Override
        public void processElement1(MyInput value, Context ctx, Collector<MyOutput> out)
                throws Exception {
            // do some processing of the stream of MyInput values
            // emit MyOutput values downstream by calling out.collect()
            out.collect(someInstanceOfMyOutput);
        }

        @Override
        public void processElement2(MyFeedback value, Context ctx, Collector<MyOutput> out)
                throws Exception {
            // do some more processing on the feedback classes
            // emit feedback items via the side output
            ctx.output(outputTag, someInstanceOfMyFeedback);
        }
    });

// close the loop: the side output becomes the feedback input
iterativeStream.closeWith(combinedStreams.getSideOutput(outputTag));
My questions revolve around how Flink uses timestamps within a feedback
loop:
1. Within the ConnectedIterativeStreams, how does Flink treat the ordering of
the input objects across the streams of regular inputs and feedback objects?
If I emit an object into the feedback loop, when will it be seen by the head
of the loop with respect to the regular stream of input objects?
2. How does the behaviour change when using event time processing?
Many thanks,
John
Question also posted to StackOverflow here:
https://stackoverflow.com/questions/56506020/how-does-flink-treat-timestamps-within-iterative-loops