[ https://issues.apache.org/jira/browse/FLINK-6116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16327285#comment-16327285 ]
ASF GitHub Bot commented on FLINK-6116:
---------------------------------------

Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4649#discussion_r161793648

    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java ---
    @@ -143,7 +144,8 @@ protected void initializeInputs() throws IOException, InterruptedException {
                             2,
                             new LinkedList<String>(),
                             new BroadcastPartitioner<Object>(),
    -                        null /* output tag */);
    +                        null /* output tag */,
    +                        1);
    --- End diff --

    nit: formatting


> Watermarks don't work when unioning with same DataStream
> --------------------------------------------------------
>
>                 Key: FLINK-6116
>                 URL: https://issues.apache.org/jira/browse/FLINK-6116
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.2.0, 1.3.0
>            Reporter: Aljoscha Krettek
>            Priority: Critical
>
> In this example job we don't get any watermarks in the {{WatermarkObserver}}:
> {code}
> public class WatermarkTest {
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>
>         env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>         env.getConfig().setAutoWatermarkInterval(1000);
>         env.setParallelism(1);
>         DataStreamSource<String> input = env.addSource(new SourceFunction<String>() {
>             @Override
>             public void run(SourceContext<String> ctx) throws Exception {
>                 while (true) {
>                     ctx.collect("hello!");
>                     Thread.sleep(800);
>                 }
>             }
>             @Override
>             public void cancel() {
>             }
>         });
>         input.union(input)
>                 .flatMap(new IdentityFlatMap())
>                 .transform("WatermarkOp", BasicTypeInfo.STRING_TYPE_INFO, new WatermarkObserver());
>         env.execute();
>     }
>     public static class WatermarkObserver
>             extends AbstractStreamOperator<String>
>             implements OneInputStreamOperator<String, String> {
>         @Override
>         public void processElement(StreamRecord<String> element) throws Exception {
>             System.out.println("GOT ELEMENT: " + element);
>         }
>         @Override
>         public void processWatermark(Watermark mark) throws Exception {
>             super.processWatermark(mark);
>             System.out.println("GOT WATERMARK: " + mark);
>         }
>     }
>     private static class IdentityFlatMap
>             extends RichFlatMapFunction<String, String> {
>         @Override
>         public void flatMap(String value, Collector<String> out) throws Exception {
>             out.collect(value);
>         }
>     }
> }
> {code}
> When commenting out the `union` it works.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)