Great to hear that this worked out for you :) Progression of watermarks on an empty stream is a known issue that we are working to resolve in the future. The usually recommended workaround is to send a custom blank event (which should be ignored) once in a while.
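To illustrate why an empty input holds everything back, here is a minimal sketch in plain Scala. It is a toy model and not Flink code: `WatermarkModel` and `TwoInputClock` are made-up names, and the only assumption carried over from Flink is that a two-input operator advances its event time to the minimum of the watermarks seen on both inputs.

```scala
// Toy model, NOT Flink code: names here are hypothetical and only illustrate
// the "minimum of both inputs" rule of a two-input operator.
object WatermarkModel {

  final class TwoInputClock {
    // Last watermark seen on each input; nothing has been seen yet.
    private var wm1: Long = Long.MinValue
    private var wm2: Long = Long.MinValue
    // Combined watermark the operator would forward downstream.
    private var emitted: Long = Long.MinValue

    def current: Long = emitted

    def onWatermark1(ts: Long): Unit = { wm1 = math.max(wm1, ts); advance() }
    def onWatermark2(ts: Long): Unit = { wm2 = math.max(wm2, ts); advance() }

    // Event time only moves forward, and never past the slower input.
    private def advance(): Unit = {
      val combined = math.min(wm1, wm2)
      if (combined > emitted) emitted = combined
    }
  }
}

object WatermarkModelDemo extends App {
  val clock = new WatermarkModel.TwoInputClock
  clock.onWatermark1(1000L)
  // Input 2 is empty, so the combined watermark has not moved at all.
  println(clock.current == Long.MinValue)
  // A watermark triggered by an occasional blank event on input 2 unblocks it.
  clock.onWatermark2(900L)
  println(clock.current)
}
```

In this model a blank event helps only because it gives the watermark generator on the quiet stream something to derive a watermark from; the event itself still has to be ignored by the business logic.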
I have expanded the documentation:

https://github.com/apache/flink/pull/6076

Please check it, and if you have any further suggestions, you are welcome to comment in the PR. I hope it clarifies the behaviour.

Piotrek

> On 25 May 2018, at 00:03, Elias Levy <fearsome.lucid...@gmail.com> wrote:
>
> On Thu, May 24, 2018 at 9:20 AM, Elias Levy <fearsome.lucid...@gmail.com> wrote:
> On Thu, May 24, 2018 at 7:26 AM, Piotr Nowojski <pi...@data-artisans.com> wrote:
> From top of my head I can imagine two solutions:
>
> 1. Override the default behaviour of the operator via for example
> org.apache.flink.streaming.api.datastream.ConnectedStreams#transform
>
> That seems the safer, but more complicated path.
>
> As we had already implemented the business logic in a RichCoFlatMapFunction,
> I ended up extending CoStreamFlatMap:
>
> class SingleWatermarkCoFlatMap[IN1,IN2,OUT](flatMapper: CoFlatMapFunction[IN1,IN2,OUT])
>     extends CoStreamFlatMap(flatMapper) {
>
>   // Pass through the watermarks from the first stream
>   override def processWatermark1(mark: Watermark): Unit = processWatermark(mark)
>
>   // Ignore watermarks from the second stream
>   override def processWatermark2(mark: Watermark): Unit = {}
> }
>
> Then it was easy to replace:
>
> stream1
>   .connect(stream2)
>   .flatMap(new BusinessCoFlatMapFunction(params))
>   .name("Operator")
>   .uid("op")
>
> with:
>
> stream1
>   .connect(stream2)
>   .transform("Operator", new SingleWatermarkCoFlatMap[X,Y,Z](new BusinessCoFlatMapFunction(params)))
>   .uid("op")