Hi Pierre,

It seems that the community is working on a fix for the next 1.11 bugfix release (and for 1.12). You can follow the status of the ticket here: https://issues.apache.org/jira/browse/FLINK-18934

Best,
Robert

On Thu, Sep 10, 2020 at 11:00 AM Pierre Bedoucha <pierre.bedou...@tv2.no> wrote:

> Hi and thank you for this thread,
>
> I'm also experiencing the same valid bug/limitation when connecting
> streams.
>
> I had a quick look in the announcements but couldn't find any more
> information: Is it planned to propagate the idle stream status to the
> operator in the upcoming Flink minor versions/releases?
>
> If not, would there be a way around it other than emitting beats in the
> idle stream?
>
> Best,
> Pierre
>
>
> Aljoscha Krettek wrote
> > I can only agree with Dawid, who explained it better than me... 😅
> >
> > Aljoscha
> >
> > On 31.08.20 12:10, Dawid Wysakowicz wrote:
> >> Hey Arvid,
> >>
> >> The problem is that the StreamStatus.IDLE is set on the Task level. It
> >> is not propagated to the operator. Combining of the Watermark for a
> >> TwoInputStreamOperator happens in the AbstractStreamOperator:
> >>
> >>     public void processWatermark(Watermark mark) throws Exception {
> >>         if (timeServiceManager != null) {
> >>             timeServiceManager.advanceWatermark(mark);
> >>         }
> >>         output.emitWatermark(mark);
> >>     }
> >>
> >>     public void processWatermark1(Watermark mark) throws Exception {
> >>         input1Watermark = mark.getTimestamp();
> >>         long newMin = Math.min(input1Watermark, input2Watermark);
> >>         if (newMin > combinedWatermark) {
> >>             combinedWatermark = newMin;
> >>             processWatermark(new Watermark(combinedWatermark));
> >>         }
> >>     }
> >>
> >>     public void processWatermark2(Watermark mark) throws Exception {
> >>         input2Watermark = mark.getTimestamp();
> >>         long newMin = Math.min(input1Watermark, input2Watermark);
> >>         if (newMin > combinedWatermark) {
> >>             combinedWatermark = newMin;
> >>             processWatermark(new Watermark(combinedWatermark));
> >>         }
> >>     }
> >>
> >> There we do not know that e.g. the whole input 1 is idle. Therefore if
> >> we do not receive any Watermarks from it (it became IDLE) we do not
> >> progress the Watermark starting from any two-input operator. We are
> >> missing similar handling of the IDLE status from the task level which
> >> works well for one-input operators and multiple parallel upstream
> >> instances.
> >>
> >> Best,
> >>
> >> Dawid
> >>
> >>
> >
> >
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
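
To illustrate the behaviour Dawid describes in the quoted message, here is a minimal standalone sketch of a two-input watermark combiner that also tracks an idle flag per input. It is not Flink code and not necessarily how FLINK-18934 will be resolved: the class IdleAwareWatermarkCombiner and its methods onWatermark, onIdle and getCombinedWatermark are hypothetical names chosen for this example. The only idea it shows is that an idle input is left out of the minimum, so it no longer holds back the combined watermark.

```java
/**
 * Hypothetical sketch, NOT part of Flink: combines the watermarks of two
 * inputs by taking their minimum, but ignores inputs that are marked idle.
 */
final class IdleAwareWatermarkCombiner {
    // Last watermark seen per input; Long.MIN_VALUE means "none yet".
    private final long[] inputWatermarks = {Long.MIN_VALUE, Long.MIN_VALUE};
    // Assumption: something upstream tells us when an input becomes idle.
    private final boolean[] idle = {false, false};
    private long combinedWatermark = Long.MIN_VALUE;

    /** Records a watermark for input 0 or 1; returns true if the combined watermark advanced. */
    boolean onWatermark(int input, long timestamp) {
        idle[input] = false; // a new watermark means the input is active again
        inputWatermarks[input] = Math.max(inputWatermarks[input], timestamp);
        return recompute();
    }

    /** Marks input 0 or 1 as idle; returns true if the combined watermark advanced. */
    boolean onIdle(int input) {
        idle[input] = true;
        return recompute();
    }

    long getCombinedWatermark() {
        return combinedWatermark;
    }

    // Minimum over the active inputs only; the combined watermark never goes backwards.
    private boolean recompute() {
        long newMin = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < inputWatermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                newMin = Math.min(newMin, inputWatermarks[i]);
            }
        }
        if (anyActive && newMin > combinedWatermark) {
            combinedWatermark = newMin;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        IdleAwareWatermarkCombiner combiner = new IdleAwareWatermarkCombiner();
        combiner.onWatermark(0, 100L); // input 1 has sent nothing yet, so no progress
        System.out.println(combiner.getCombinedWatermark() == Long.MIN_VALUE); // true
        combiner.onIdle(1);            // input 1 is now ignored
        System.out.println(combiner.getCombinedWatermark());                   // 100
    }
}
```

Without the idle flag, this reduces to the Math.min logic in the quoted processWatermark1/processWatermark2, where a silent input keeps the combined watermark stuck at its last value. In the sketch, a late watermark from a previously idle input cannot move the combined watermark backwards, because it only advances when the new minimum is strictly larger.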