[ https://issues.apache.org/jira/browse/FLINK-6116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
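Robert Metzger updated FLINK-6116:
----------------------------------
    Priority: Critical  (was: Blocker)

> Watermarks don't work when unioning with same DataStream
> --------------------------------------------------------
>
>                 Key: FLINK-6116
>                 URL: https://issues.apache.org/jira/browse/FLINK-6116
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.2.0, 1.3.0
>            Reporter: Aljoscha Krettek
>            Priority: Critical
>
> In this example job we don't get any watermarks in the {{WatermarkObserver}}:
> {code}
> import org.apache.flink.api.common.functions.RichFlatMapFunction;
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
> import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
> import org.apache.flink.streaming.api.watermark.Watermark;
> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
> import org.apache.flink.util.Collector;
>
> public class WatermarkTest {
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>
>         // Ingestion time with a 1 s auto-watermark interval, parallelism 1.
>         env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>         env.getConfig().setAutoWatermarkInterval(1000);
>         env.setParallelism(1);
>
>         // Source that emits one record every 800 ms, forever.
>         DataStreamSource<String> input = env.addSource(new SourceFunction<String>() {
>             @Override
>             public void run(SourceContext<String> ctx) throws Exception {
>                 while (true) {
>                     ctx.collect("hello!");
>                     Thread.sleep(800);
>                 }
>             }
>
>             @Override
>             public void cancel() {
>             }
>         });
>
>         // Unioning the stream with itself is what makes the watermarks disappear.
>         input.union(input)
>                 .flatMap(new IdentityFlatMap())
>                 .transform("WatermarkOp", BasicTypeInfo.STRING_TYPE_INFO, new WatermarkObserver());
>
>         env.execute();
>     }
>
>     // Prints every element and every watermark that reaches it.
>     public static class WatermarkObserver
>             extends AbstractStreamOperator<String>
>             implements OneInputStreamOperator<String, String> {
>         @Override
>         public void processElement(StreamRecord<String> element) throws Exception {
>             System.out.println("GOT ELEMENT: " + element);
>         }
>
>         @Override
>         public void processWatermark(Watermark mark) throws Exception {
>             super.processWatermark(mark);
>             System.out.println("GOT WATERMARK: " + mark);
>         }
>     }
>
>     // Forwards each value unchanged.
>     private static class IdentityFlatMap
>             extends RichFlatMapFunction<String, String> {
>         @Override
>         public void flatMap(String value, Collector<String> out) throws Exception {
>             out.collect(value);
>         }
>     }
> }
> {code}
> When the {{union}} is commented out, it works.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)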