Aljoscha Krettek created FLINK-6116: ---------------------------------------
Summary: Watermarks don't work when unioning with same DataStream Key: FLINK-6116 URL: https://issues.apache.org/jira/browse/FLINK-6116 Project: Flink Issue Type: Bug Components: DataStream API Affects Versions: 1.2.0, 1.3.0 Reporter: Aljoscha Krettek In this example job we don't get any watermarks in the {{WatermarkObserver}}: {code} public class WatermarkTest { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); env.getConfig().setAutoWatermarkInterval(1000); env.setParallelism(1); DataStreamSource<String> input = env.addSource(new SourceFunction<String>() { @Override public void run(SourceContext<String> ctx) throws Exception { while (true) { ctx.collect("hello!"); Thread.sleep(800); } } @Override public void cancel() { } }); input.union(input) .flatMap(new IdentityFlatMap()) .transform("WatermarkOp", BasicTypeInfo.STRING_TYPE_INFO, new WatermarkObserver()); env.execute(); } public static class WatermarkObserver extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> { @Override public void processElement(StreamRecord<String> element) throws Exception { System.out.println("GOT ELEMENT: " + element); } @Override public void processWatermark(Watermark mark) throws Exception { super.processWatermark(mark); System.out.println("GOT WATERMARK: " + mark); } } private static class IdentityFlatMap extends RichFlatMapFunction<String, String> { @Override public void flatMap(String value, Collector<String> out) throws Exception { out.collect(value); } } } {code} When commenting out the `union` it works. -- This message was sent by Atlassian JIRA (v6.3.15#6346)