Hello Aljoscha,

Fortunately, I found the program in Google's cache :) I've attached it
below for reference. I'm stunned by how accurately you hit the point
given the few pieces of information I left in the original text. +1

Yes, it's exactly as you explained. Can you think of a scenario where it
would lead to reasonable results if a user placed the
time-extraction/watermark-generation (directly or indirectly) after a
union operation? So far I couldn't, and I'm starting to believe that it
would at least be nice to warn users who try to do so.

Many thanks for your analysis and your time,

> import org.apache.flink.api.common.functions.ReduceFunction;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import java.util.Arrays;
> import java.util.List;
> import java.util.concurrent.TimeUnit;
> public class TimestampAssignmentTest {
>   public static void main(String[] args) throws Exception {
>     runTest();
>   }
>   private static void runTest() throws Exception {
>     List<Tuple2<String, Long>> left =
>         Arrays.asList(Tuple2.of("one", 1L), Tuple2.of("two", 3L));
>     List<Tuple2<String, Long>> right =
>         Arrays.asList(Tuple2.of("three", 2L), Tuple2.of("four", 4L));
>     StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>     env.getConfig().setAutoWatermarkInterval(10);
>     env.setParallelism(1);
>     // ~ a very fast source
>     SingleOutputStreamOperator<Tuple2<String, Long>> leftInput =
>         env.addSource(new DelayedSourceFunction<>(left, Time.milliseconds(0)))
>         .returns(new TypeHint<Tuple2<String, Long>>() {});
>     // ~ a very slow source
>     SingleOutputStreamOperator<Tuple2<String, Long>> rightInput =
>         env.addSource(new DelayedSourceFunction<>(right, Time.seconds(10)))
>         .returns(new TypeHint<Tuple2<String, Long>>() {});
>     leftInput.union(rightInput)
>         .assignTimestampsAndWatermarks(
>             new EventTimeExtractor<>(Time.milliseconds(0)))
>         .map(t -> t.f0).returns(new TypeHint<String>() {})
>         .timeWindowAll(Time.milliseconds(3))
>         .reduce((ReduceFunction<String>) (s, t) -> s + "|" + t)
>         .returns(new TypeHint<String>() {})
>         .print();
>     env.execute();
>   }
>   static class EventTimeExtractor<I>
>     extends BoundedOutOfOrdernessTimestampExtractor<Tuple2<I, Long>> {
>     EventTimeExtractor(Time maxOutOfOrderness) {
>       super(maxOutOfOrderness);
>     }
>     @Override
>     public long extractTimestamp(Tuple2<I, Long> element) {
>       return element.f1;
>     }
>   }
>   static class DelayedSourceFunction<I> implements SourceFunction<I> {
>     // ~ initialized eagerly so that a cancel() arriving before run() is not lost
>     private volatile boolean running = true;
>     private final List<I> elems;
>     private final long delayMillis;
>     DelayedSourceFunction(List<I> elems, Time delay) {
>       this.elems = elems;
>       this.delayMillis = delay.toMilliseconds();
>     }
>     @Override
>     public void run(SourceContext<I> ctx) throws Exception {
>       for (I elem : elems) {
>         if (!running) {
>           break;
>         }
>         delay();
>         ctx.collect(elem);
>       }
>     }
>     private void delay() throws InterruptedException {
>       if (delayMillis > 0) {
>         long start = System.nanoTime();
>         while (true) {
>           long curr = System.nanoTime();
>           long waitMillis = TimeUnit.NANOSECONDS.toMillis(curr - start);
>           if (waitMillis < delayMillis) {
>             Thread.sleep(delayMillis - waitMillis);
>           } else {
>             break;
>           }
>         }
>       }
>     }
>     @Override
>     public void cancel() {
>       running = false;
>     }
>   }
> }
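
As an aside, the effect can be reproduced without Flink. The following is
only a simplified model of the two placements, not Flink's implementation:
the class and method names are made up for illustration, the arrival order
(both "left" elements first, then the "right" ones) is an assumption about
what the fast/slow sources above produce, and the watermark is updated after
every element instead of periodically. It assumes an out-of-orderness bound
of 0, so the watermark is simply the highest timestamp seen, and that an
element is dropped once the watermark has reached the last timestamp of its
window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

/** Simplified model of the two topologies; NOT Flink code. All names are hypothetical. */
final class WatermarkPlacementSketch {

  /** A record together with the union input (0 = left, 1 = right) it arrived on. */
  static final class Event {
    final String value;
    final long timestamp;
    final int input;

    Event(String value, long timestamp, int input) {
      this.value = value;
      this.timestamp = timestamp;
      this.input = input;
    }
  }

  static final long WINDOW_SIZE = 3; // mirrors timeWindowAll(Time.milliseconds(3))

  /** Assumed arrival order for a very fast left source and a very slow right source. */
  static List<Event> arrivalOrder() {
    return Arrays.asList(
        new Event("one", 1L, 0), new Event("two", 3L, 0),
        new Event("three", 2L, 1), new Event("four", 4L, 1));
  }

  /**
   * Replays the events through tumbling windows.
   * assignBeforeUnion = true: one watermark per input, combined by taking the minimum.
   * assignBeforeUnion = false: a single watermark computed over the unioned stream.
   */
  static List<String> run(List<Event> events, boolean assignBeforeUnion) {
    long[] maxSeen = {Long.MIN_VALUE, Long.MIN_VALUE}; // highest timestamp per slot
    long watermark = Long.MIN_VALUE;
    TreeMap<Long, String> openWindows = new TreeMap<>(); // window start -> reduced value
    List<String> fired = new ArrayList<>();

    for (Event e : events) {
      long windowStart = e.timestamp - Math.floorMod(e.timestamp, WINDOW_SIZE);
      if (windowStart + WINDOW_SIZE - 1 <= watermark) {
        continue; // the element's window has already fired: treated as late and dropped
      }
      openWindows.merge(windowStart, e.value, (s, t) -> s + "|" + t);

      // Advance the watermark after the element, as an assigner would.
      int slot = assignBeforeUnion ? e.input : 0;
      maxSeen[slot] = Math.max(maxSeen[slot], e.timestamp);
      long candidate = assignBeforeUnion ? Math.min(maxSeen[0], maxSeen[1]) : maxSeen[0];
      watermark = Math.max(watermark, candidate);

      // Fire every window whose last timestamp the watermark has reached.
      while (!openWindows.isEmpty()
          && openWindows.firstKey() + WINDOW_SIZE - 1 <= watermark) {
        fired.add(openWindows.pollFirstEntry().getValue());
      }
    }
    fired.addAll(openWindows.values()); // end of input flushes the remaining windows
    return fired;
  }

  public static void main(String[] args) {
    System.out.println(run(arrivalOrder(), false)); // watermark assigned after the union
    System.out.println(run(arrivalOrder(), true));  // watermark assigned per input
  }
}
```

Under these assumptions the first line printed is [one, two|four] and the
second is [one|three, two|four]: with a single assigner after the union,
"two" pushes the watermark to 3 before "three" arrives, whereas with
per-input watermarks the silent right input holds the combined watermark
back.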

On 06/14/2017 04:02 PM, Aljoscha Krettek wrote:
> Hi Petr,
> I just stumbled across this (slightly older) mail. Your example on pastebin 
> is not available anymore but I’m guessing you have roughly these two 
> topologies:
> 1.
> Source1 -> Map1 -> ExtractTimestamps -|
>                                       | -> Map3 …
> Source2 -> Map2 -> ExtractTimestamps -|
> The union is not visible at the graph level, it’s implicit in the combination 
> of the two input streams.
> 2.
> Source1 -> Map1 -|
>                  | -> ExtractTimestamps -> Map3 …
> Source2 -> Map2 -|
> The union is not visible at the graph level, it’s implicit in the combination 
> of the two input streams.
> I’m also guessing that you have a timestamp/watermark assigner where the 
> watermark is the highest-seen timestamp minus some lateness bound. I think 
> the behaviour is not necessarily an artefact of the Flink implementation 
> (with maps and extractors being fused together) but results from the graph 
> itself and how watermarks are defined and how the extractor works: in the 
> first case, each stream (before the union) has its own watermark and the 
> watermark at Map3 is the minimum over those watermarks. This explains why a 
> lower watermark on the one stream holds back the watermark in total at Map3. 
> In the second case, the two streams are unioned together before extracting a 
> timestamp/watermark and the choice of timestamp extractor (which takes the 
> highest-seen timestamp) means that the watermark now advances “faster” 
> because there is logically not a slower, separate stream anymore.
> Is that analysis correct? Does my description roughly make sense?
> Best,
> Aljoscha
>> On 6. May 2017, at 15:00, Petr Novotnik <petr.novot...@firma.seznam.cz> 
>> wrote:
>> Hello Flinkers,
>> Given this small example program:
>>> https://pastebin.com/30JbbgpH
>> I'd expect the output:
>>> one|three
>>> two|four
>> However, I consistently receive ...
>>> one
>>> two|four
>> ... due to "three" being considered a late-comer which then gets
>> discarded. When I remove `assignTimestampsAndWatermarks` after the
>> `union` and place it separately on each of the union's inputs, i.e.
>> before the `union`, I get what I expect.
>> Now, after digging through Flink's source code, this behavior actually
>> seems logical to me (since the `assignTimestampsAndWatermarks` and `map`
>> operators form one task). Though, from a user/api perspective, it is at
>> least surprising.
>> I wanted to ask whether this kind of behavior is known, intended, or maybe
>> something to be improved to avoid the gotcha?
>> Many thanks in advance,
>> Pete.

