Yes, I'm afraid this analysis is correct. The StreamOperator, AbstractStreamOperator to be specific, computes the combined watermark from both inputs here: https://github.com/apache/flink/blob/f0ed29c06d331892a06ee9bddea4173d6300835d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L573. The operator layer is not aware of idleness, so it will never notice that one of its inputs has gone idle. Idleness is only handled at the level of the inputs and is never forwarded to the operator itself.

To fix this we would have to also make operators aware of idleness such that they can take this into account when computing the combined output watermark.
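
A rough sketch of what such an idleness-aware combination could look like, in plain Java rather than Flink code. The class IdleAwareWatermarkModel and its methods are made up for illustration and are not part of any Flink API; the idea is only that idle inputs are skipped when taking the minimum, and that the combined watermark never moves backwards when an idle input becomes active again.

```java
// Hypothetical model, not Flink code: combines the watermarks of two inputs
// while ignoring inputs that have been marked idle.
class IdleAwareWatermarkModel {
    private final long[] watermarks = {Long.MIN_VALUE, Long.MIN_VALUE};
    private final boolean[] idle = {false, false};
    private long combinedWatermark = Long.MIN_VALUE;

    // Called when input `input` (0 or 1) delivers a watermark.
    long processWatermark(int input, long timestamp) {
        idle[input] = false; // assumption: a new watermark makes the input active again
        watermarks[input] = timestamp;
        return advance();
    }

    // Called when input `input` (0 or 1) is reported idle.
    long markIdle(int input) {
        idle[input] = true;
        return advance();
    }

    // Recomputes the combined watermark as the minimum over active inputs only.
    private long advance() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        // Only ever move forward; if every input is idle, hold the current value.
        if (anyActive && min > combinedWatermark) {
            combinedWatermark = min;
        }
        return combinedWatermark;
    }
}
```

With this model, a watermark of 2000 on input 0 leaves the output at Long.MIN_VALUE while input 1 is active and silent; once input 1 is marked idle, the output advances to 2000.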

Best,
Aljoscha

On 26.08.20 10:02, Dawid Wysakowicz wrote:
Hi Kien,

I am afraid this is a valid bug. I am not 100% sure, but the way I
understand the code, the idleness mechanism applies to input channels,
i.e. to the case where multiple parallel instances shuffle their
results to downstream operators.

In the case of a two-input operator, combining the watermarks of two
different upstream operators happens inside the operator itself.
There we do not have the idleness status, i.e. we have no signal that a
whole upstream operator became idle. That's definitely a bug/limitation.
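
To make the stall concrete, here is a simplified plain-Java model of that combination step. It is a sketch, not Flink's actual code: the class name TwoInputWatermarkModel is hypothetical, and the logic only mirrors the general "take the minimum of both inputs" approach described above.

```java
// Simplified model, not Flink code: the output watermark is the minimum of
// the last watermark seen on each input, with no notion of idleness.
class TwoInputWatermarkModel {
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long combinedWatermark = Long.MIN_VALUE;

    // Watermark arriving on the first input; returns the current output watermark.
    long processWatermark1(long timestamp) {
        input1Watermark = timestamp;
        return advance();
    }

    // Watermark arriving on the second input; returns the current output watermark.
    long processWatermark2(long timestamp) {
        input2Watermark = timestamp;
        return advance();
    }

    // The output only advances when the minimum over both inputs advances.
    private long advance() {
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
        }
        return combinedWatermark;
    }
}
```

If the second input never sends a watermark, its value stays at Long.MIN_VALUE, so the minimum, and with it the output watermark, never moves no matter how far the first input advances.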

I'm also cc'ing Aljoscha who could maybe confirm my analysis.

Best,

Dawid

On 24/08/2020 16:00, Truong Duc Kien wrote:
Hi all,
We are testing the new idleness detection feature in Flink 1.11;
however, it does not work as we expected:
When we connect two data streams, one of which is idle, the output
watermark of the CoProcessOperator does not increase, hence the
program cannot progress.

I've made a small project to illustrate the problem. The watermark
received by the sink does not increase at all until the idle source is
stopped.

https://github.com/kien-truong/flink-idleness-testing

Is this a bug, or does the idleness detection not support this use case?

Regards.
Kien

