Hello Aljoscha,

Fortunately, I found the program in Google's caches :) I've attached it below for reference. I'm stunned by how accurately you have hit the point given the few pieces of information I left in the original text. +1

Yes, it's exactly as you explained. Can you think of a scenario where it would lead to reasonable results if a user placed the time-extraction/watermark-generation (directly or indirectly) after a union operation? So far I couldn't, and I'm starting to believe that it would at least be nice to warn the user if they try to do so.

Many thanks for your analysis and your time,
Pete.

> import org.apache.flink.api.common.functions.ReduceFunction;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
> import org.apache.flink.streaming.api.windowing.time.Time;
>
> import java.util.Arrays;
> import java.util.List;
> import java.util.concurrent.TimeUnit;
>
> public class TimestampAssignmentTest {
>
>   public static void main(String[] args) throws Exception {
>     runTest();
>   }
>
>   private static void runTest() throws Exception {
>     List<Tuple2<String, Long>> left =
>         Arrays.asList(Tuple2.of("one", 1L), Tuple2.of("two", 3L));
>     List<Tuple2<String, Long>> right =
>         Arrays.asList(Tuple2.of("three", 2L), Tuple2.of("four", 4L));
>
>     StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>     env.getConfig().setAutoWatermarkInterval(10);
>     env.setParallelism(1);
>
>     // ~ a very fast source
>     SingleOutputStreamOperator<Tuple2<String, Long>> leftInput =
>         env.addSource(new DelayedSourceFunction<>(left, Time.milliseconds(0)))
>             .returns(new TypeHint<Tuple2<String, Long>>() {});
>     // ~ a very slow source
>     SingleOutputStreamOperator<Tuple2<String, Long>> rightInput =
>         env.addSource(new DelayedSourceFunction<>(right, Time.seconds(10)))
>             .returns(new TypeHint<Tuple2<String, Long>>() {});
>
>     leftInput.union(rightInput)
>         .assignTimestampsAndWatermarks(new EventTimeExtractor<>(Time.milliseconds(0)))
>         .map(t -> t.f0).returns(new TypeHint<String>() {})
>         .timeWindowAll(Time.milliseconds(3))
>         .reduce((ReduceFunction<String>) (s, t) -> s + "|" + t)
>         .returns(new TypeHint<String>() {})
>         .print();
>
>     env.execute();
>   }
>
>   static class EventTimeExtractor<I>
>       extends BoundedOutOfOrdernessTimestampExtractor<Tuple2<I, Long>> {
>
>     EventTimeExtractor(Time maxOutOfOrderness) {
>       super(maxOutOfOrderness);
>     }
>
>     @Override
>     public long extractTimestamp(Tuple2<I, Long> element) {
>       return element.f1;
>     }
>   }
>
>   static class DelayedSourceFunction<I> implements SourceFunction<I> {
>     private volatile boolean running;
>     private final List<I> elems;
>     private final long delayMillis;
>
>     DelayedSourceFunction(List<I> elems, Time delay) {
>       this.elems = elems;
>       this.delayMillis = delay.toMilliseconds();
>     }
>
>     @Override
>     public void run(SourceContext<I> ctx) throws Exception {
>       running = true;
>       for (I elem : elems) {
>         if (!running) {
>           break;
>         }
>         delay();
>         ctx.collect(elem);
>       }
>     }
>
>     private void delay() throws InterruptedException {
>       if (delayMillis > 0) {
>         long start = System.nanoTime();
>         while (true) {
>           long curr = System.nanoTime();
>           long waitMillis = TimeUnit.NANOSECONDS.toMillis(curr - start);
>           if (waitMillis < delayMillis) {
>             Thread.sleep(delayMillis - waitMillis);
>           } else {
>             break;
>           }
>         }
>       }
>     }
>
>     @Override
>     public void cancel() {
>       running = false;
>     }
>   }
> }

On 06/14/2017 04:02 PM, Aljoscha Krettek wrote:
> Hi Petr,
>
> I just stumbled across this (slightly older) mail. Your example on pastebin
> is not available anymore but I’m guessing you have roughly these two
> topologies:
>
> 1.
>
> Source1 -> Map1 -> ExtractTimestamps -|
>                                       | -> Map3 …
> Source2 -> Map2 -> ExtractTimestamps -|
>
> The union is not visible at the graph level, it’s implicit in the combination
> of the two input streams.
>
> 2.
>
> Source1 -> Map1 -|
>                  | -> ExtractTimestamps -> Map3 …
> Source2 -> Map2 -|
>
> The union is not visible at the graph level, it’s implicit in the combination
> of the two input streams.
>
> I’m also guessing that you have a timestamp/watermark assigner where the
> watermark is the highest-seen timestamp minus some lateness bound. I think
> the behaviour is not necessarily an artefact of the Flink implementation
> (with maps and extractors being fused together) but results from the graph
> itself and how watermarks are defined and how the extractor works: in the
> first case, each stream (before the union) has its own watermark and the
> watermark at Map3 is the minimum over those watermarks. This explains why a
> lower watermark on the one stream holds back the watermark in total at Map3.
> In the second case, the two streams are unioned together before extracting a
> timestamp/watermark and the choice of timestamp extractor (which takes the
> highest-seen timestamp) means that the watermark now advances “faster”
> because there is logically not a slower, separate stream anymore.
>
> Is that analysis correct? Does my description roughly make sense?
>
> Best,
> Aljoscha
>
>> On 6. May 2017, at 15:00, Petr Novotnik <petr.novot...@firma.seznam.cz>
>> wrote:
>>
>> Hello Flinkers,
>>
>> Given this small example program:
>>
>>> https://pastebin.com/30JbbgpH
>>
>> I'd expect the output:
>>
>>> one|three
>>> two|four
>>
>> However, I consistently receive ...
>>
>>> one
>>> two|four
>>
>> ... due to "three" being considered a late-comer which then gets
>> discarded. When I remove `assignTimestampsAndWatermarks` after the
>> `union` and place it separately on each of the union's inputs, i.e.
>> before the `union`, I get what I expect.
>>
>> Now, after digging through Flink's source code, this behavior actually
>> seems logical to me (since the `assignTimestampsAndWatermarks` and `map`
>> operators form one task). Though, from a user/API perspective, it is at
>> least surprising.
>>
>> I wanted to ask whether this kind of behavior is known, intended, or maybe
>> something to be improved to avoid the gotcha?
>>
>> Many thanks in advance,
>> Pete.
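
To make the two cases from the quoted analysis concrete, here is a small Flink-free sketch that replays the four records from the test program in their arrival order and compares both placements of the watermark assigner. It is only a simplified model, not Flink's actual implementation. The names `WatermarkUnionSketch`, `Arrival`, `simulate` and `exampleArrivals` are made up for illustration, and it assumes that the watermark equals the highest timestamp seen so far (out-of-orderness of 0), that the watermark is refreshed after every record instead of periodically, that a window fires or drops records once its last timestamp is at or below the watermark, and that the end of input flushes all open windows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

// Hypothetical, simplified model of the thread's scenario; it does not use Flink.
class WatermarkUnionSketch {

    /** One record as it reaches the window: originating input, payload, event timestamp. */
    static final class Arrival {
        final int input;
        final String value;
        final long ts;

        Arrival(int input, String value, long ts) {
            this.input = input;
            this.value = value;
            this.ts = ts;
        }
    }

    static final long WINDOW_SIZE = 3; // mirrors timeWindowAll(Time.milliseconds(3))

    /** Arrival order of the test program: the fast left input first, then the slow right input. */
    static List<Arrival> exampleArrivals() {
        return Arrays.asList(
                new Arrival(0, "one", 1), new Arrival(0, "two", 3),
                new Arrival(1, "three", 2), new Arrival(1, "four", 4));
    }

    /**
     * @param assignPerInput true: one assigner per input, combined watermark is the minimum;
     *                       false: a single assigner placed after the union
     */
    static List<String> simulate(List<Arrival> arrivals, boolean assignPerInput) {
        long[] maxTs = {Long.MIN_VALUE, Long.MIN_VALUE}; // highest timestamp seen per assigner
        long watermark = Long.MIN_VALUE;
        TreeMap<Long, String> open = new TreeMap<>();    // window start -> reduced value
        List<String> out = new ArrayList<>();

        for (Arrival a : arrivals) {
            long start = a.ts - (a.ts % WINDOW_SIZE);
            if (start + WINDOW_SIZE - 1 <= watermark) {
                continue; // the window has already fired, so the record is late and dropped
            }
            open.merge(start, a.value, (s, t) -> s + "|" + t);

            // Assumption: the watermark is refreshed right after each record.
            int slot = assignPerInput ? a.input : 0;
            maxTs[slot] = Math.max(maxTs[slot], a.ts);
            watermark = assignPerInput ? Math.min(maxTs[0], maxTs[1]) : maxTs[0];

            // Fire every window whose last timestamp is covered by the watermark.
            while (!open.isEmpty() && open.firstKey() + WINDOW_SIZE - 1 <= watermark) {
                out.add(open.pollFirstEntry().getValue());
            }
        }
        out.addAll(open.values()); // end of input flushes the remaining windows
        return out;
    }

    public static void main(String[] args) {
        System.out.println(simulate(exampleArrivals(), false)); // [one, two|four]
        System.out.println(simulate(exampleArrivals(), true));  // [one|three, two|four]
    }
}
```

With a single assigner after the union, the fast input alone pushes the watermark to 3, so the window [0, 3) fires before "three" arrives. With one assigner per input, the slow input holds the combined watermark back until "three" has been seen.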