Hi Pierre,

It seems that the community is working on providing a fix with the next
1.11 bugfix release (and for 1.12). You can follow the status of the ticket
here: https://issues.apache.org/jira/browse/FLINK-18934

Best,
Robert

On Thu, Sep 10, 2020 at 11:00 AM Pierre Bedoucha <pierre.bedou...@tv2.no>
wrote:

> Hi and thank you for this thread,
>
> I'm also experiencing the same bug/limitation when connecting
> streams.
>
> I had a quick look at the announcements but couldn't find any more
> information: is it planned to propagate the idle stream status to the
> operator in the upcoming Flink minor versions/releases?
>
> If not, is there a workaround other than emitting heartbeats in the
> idle stream?
>
> Best,
> Pierre
>
>
> Aljoscha Krettek wrote
> > I can only agree with Dawid, who explained it better than me... 😅
> >
> > Aljoscha
> >
> > On 31.08.20 12:10, Dawid Wysakowicz wrote:
> >> Hey Arvid,
> >>
> >> The problem is that StreamStatus.IDLE is set at the Task level and is
> >> not propagated to the operator. The Watermarks of a
> >> TwoInputStreamOperator are combined in the AbstractStreamOperator:
> >>
> >>      public void processWatermark(Watermark mark) throws Exception {
> >>          if (timeServiceManager != null) {
> >>              timeServiceManager.advanceWatermark(mark);
> >>          }
> >>          output.emitWatermark(mark);
> >>      }
> >>
> >>      public void processWatermark1(Watermark mark) throws Exception {
> >>          input1Watermark = mark.getTimestamp();
> >>          long newMin = Math.min(input1Watermark, input2Watermark);
> >>          if (newMin > combinedWatermark) {
> >>              combinedWatermark = newMin;
> >>              processWatermark(new Watermark(combinedWatermark));
> >>          }
> >>      }
> >>
> >>      public void processWatermark2(Watermark mark) throws Exception {
> >>          input2Watermark = mark.getTimestamp();
> >>          long newMin = Math.min(input1Watermark, input2Watermark);
> >>          if (newMin > combinedWatermark) {
> >>              combinedWatermark = newMin;
> >>              processWatermark(new Watermark(combinedWatermark));
> >>          }
> >>      }
> >>
> >> There we do not know that, e.g., the whole of input 1 is idle.
> >> Therefore, if we stop receiving Watermarks from it (because it became
> >> IDLE), the Watermark no longer progresses from any two-input operator
> >> onwards. We are missing handling of the IDLE status similar to what
> >> exists at the task level, which works well for one-input operators
> >> and multiple parallel upstream instances.
> >>
> >> Best,
> >>
> >> Dawid
>
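
For illustration, here is a minimal sketch of what idle-aware combining for
two inputs could look like, building on the processWatermark1 /
processWatermark2 logic quoted above. It is not the fix tracked in
FLINK-18934 and does not use any Flink classes: the class
IdleAwareWatermarkCombiner and its methods onWatermark, onIdle and
combinedWatermark are hypothetical names chosen for this example. It also
assumes that receiving a watermark marks an input as active again.

```java
/**
 * Hypothetical helper (not part of Flink): combines the watermarks of two
 * inputs while ignoring any input that has been reported idle.
 */
final class IdleAwareWatermarkCombiner {
    private final long[] inputWatermarks = {Long.MIN_VALUE, Long.MIN_VALUE};
    private final boolean[] idle = {false, false};
    private long combinedWatermark = Long.MIN_VALUE;

    /** Records a watermark for input 0 or 1; returns true if the combined watermark advanced. */
    boolean onWatermark(int input, long timestamp) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], timestamp);
        idle[input] = false; // assumption: a new watermark means the input is active again
        return recompute();
    }

    /** Marks input 0 or 1 as idle; returns true if the combined watermark advanced. */
    boolean onIdle(int input) {
        idle[input] = true;
        return recompute();
    }

    long combinedWatermark() {
        return combinedWatermark;
    }

    private boolean recompute() {
        long newMin = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < inputWatermarks.length; i++) {
            if (!idle[i]) {
                // Only active inputs are allowed to hold the watermark back.
                newMin = Math.min(newMin, inputWatermarks[i]);
                anyActive = true;
            }
        }
        // Same guard as in the quoted code: the combined watermark never regresses.
        if (anyActive && newMin > combinedWatermark) {
            combinedWatermark = newMin;
            return true;
        }
        return false;
    }
}
```

With this sketch, a watermark of 100 on input 1 alone does not advance the
combined watermark, but it does advance to 100 once input 0 is marked idle.
If input 0 later resumes with an older watermark, the combined value stays
where it is rather than moving backwards.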
