Hi all,

After examining the source code further, I am quite sure that
org.apache.flink.api.common.eventtime.WatermarksWithIdleness
does not work as intended with FLIP-27 sources.

In org.apache.flink.streaming.api.operators.SourceOperator, a separate
instance of WatermarksWithIdleness is created for each split output and
another one for the main output. Watermarks from the split outputs are
multiplexed with each other, but the split outputs are not multiplexed with
the main output.

For a source such as org.apache.flink.connector.kafka.source.KafkaSource, all
records are emitted through the split outputs and none through the main
output. Hence the main output will, after the initial idle timeout, be marked
as idle.

WatermarksWithIdleness is implemented such that once an output is idle, it
re-marks the output as idle on every periodic watermark emission. Since the
split outputs and the main output are not multiplexed, the idle marks coming
from the main output repeatedly set the output to idle even though events
are arriving from the splits. The result is that the entire source is
repeatedly marked as idle.
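A toy model can make this interaction concrete. It is not Flink's code:
SharedStatus, RemarkingIdleGenerator and MainVsSplitDemo are hypothetical,
the 200 ms emit period and the split-before-main call order are assumptions,
and "last call wins" is a simplification of how the shared output tracks
idleness.

```java
// Toy model, NOT Flink's implementation. Hypothetical simplification: the main
// output and the split outputs write to one shared status where the last call
// wins (a watermark makes it active, markIdle() makes it idle).
final class SharedStatus {
    boolean idle = false;

    void emitWatermark(long timestamp) {
        idle = false;
    }

    void markIdle() {
        idle = true;
    }
}

// Models the behaviour described above: once the idle timeout has elapsed,
// markIdle() is called again on every periodic emit, not just once.
final class RemarkingIdleGenerator {
    private final long idleTimeoutMs;
    private long lastActivityMs;
    private long maxTimestamp = Long.MIN_VALUE;

    RemarkingIdleGenerator(long idleTimeoutMs, long nowMs) {
        this.idleTimeoutMs = idleTimeoutMs;
        this.lastActivityMs = nowMs;
    }

    void onEvent(long eventTimestamp, long nowMs) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        lastActivityMs = nowMs;
    }

    void onPeriodicEmit(SharedStatus out, long nowMs) {
        if (nowMs - lastActivityMs >= idleTimeoutMs) {
            out.markIdle(); // repeated on every period while idle
        } else if (maxTimestamp != Long.MIN_VALUE) {
            out.emitWatermark(maxTimestamp - 1); // monotonous: max timestamp - 1
        }
    }
}

final class MainVsSplitDemo {
    // Runs `ticks` periodic emits 200 ms apart with a 10 s idle timeout.
    // Records arrive only on the split output; the main output never sees any.
    // Returns how many ticks end with the shared status idle.
    static int idleTicks(int ticks) {
        long now = 0;
        SharedStatus shared = new SharedStatus();
        RemarkingIdleGenerator mainOutput = new RemarkingIdleGenerator(10_000, now);
        RemarkingIdleGenerator splitOutput = new RemarkingIdleGenerator(10_000, now);
        int idle = 0;
        for (int i = 0; i < ticks; i++) {
            now += 200;
            splitOutput.onEvent(now, now);           // a record arrives on the split
            splitOutput.onPeriodicEmit(shared, now); // split emits a watermark (active)
            mainOutput.onPeriodicEmit(shared, now);  // main has no records; re-marks idle once timed out
            if (shared.idle) {
                idle++;
            }
        }
        return idle;
    }

    public static void main(String[] args) {
        System.out.println(idleTicks(100)); // 51
    }
}
```

Under these assumptions, every tick from the 10 s mark onwards (51 of 100)
ends with the shared status idle, even though a record arrived on the split
in each of those ticks.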

I have worked around this by implementing my own version of
WatermarksWithIdleness, which marks the output as idle only once (until it
becomes active and then idle again) instead of repeatedly.
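A minimal sketch of such a generator, which only signals idleness on the
transition from active to idle, could look like the following. To keep it
compilable without Flink, SimpleWatermarkOutput and SimpleWatermarkGenerator
are hypothetical stand-ins shaped like Flink's WatermarkOutput and
WatermarkGenerator; MonotonousGenerator only approximates
forMonotonousTimestamps(), and the injectable LongSupplier clock is an
assumption added for testability.

```java
import java.time.Duration;
import java.util.function.LongSupplier;

// Hypothetical stand-ins mirroring the shape of Flink's WatermarkOutput and
// WatermarkGenerator, so this sketch compiles without Flink on the classpath.
interface SimpleWatermarkOutput {
    void emitWatermark(long timestamp);
    void markIdle();
}

interface SimpleWatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, SimpleWatermarkOutput output);
    void onPeriodicEmit(SimpleWatermarkOutput output);
}

// Rough approximation of forMonotonousTimestamps(): watermark = max timestamp - 1.
final class MonotonousGenerator<T> implements SimpleWatermarkGenerator<T> {
    private long maxTimestamp = Long.MIN_VALUE + 1;

    @Override
    public void onEvent(T event, long eventTimestamp, SimpleWatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(SimpleWatermarkOutput output) {
        output.emitWatermark(maxTimestamp - 1);
    }
}

// Wraps another generator and calls markIdle() only on the active -> idle
// transition, instead of on every periodic emit while idle.
final class MarkIdleOnceGenerator<T> implements SimpleWatermarkGenerator<T> {
    private final SimpleWatermarkGenerator<T> delegate;
    private final long idleTimeoutMs;
    private final LongSupplier clockMs; // e.g. System::currentTimeMillis
    private long lastActivityMs;
    private boolean isIdleNow = false;

    MarkIdleOnceGenerator(SimpleWatermarkGenerator<T> delegate, Duration idleTimeout,
                          LongSupplier clockMs) {
        this.delegate = delegate;
        this.idleTimeoutMs = idleTimeout.toMillis();
        this.clockMs = clockMs;
        this.lastActivityMs = clockMs.getAsLong();
    }

    @Override
    public void onEvent(T event, long eventTimestamp, SimpleWatermarkOutput output) {
        delegate.onEvent(event, eventTimestamp, output);
        lastActivityMs = clockMs.getAsLong();
        isIdleNow = false; // any event makes this output active again
    }

    @Override
    public void onPeriodicEmit(SimpleWatermarkOutput output) {
        if (clockMs.getAsLong() - lastActivityMs >= idleTimeoutMs) {
            if (!isIdleNow) {
                output.markIdle(); // signalled once per idle period
                isIdleNow = true;
            }
        } else {
            delegate.onPeriodicEmit(output);
        }
    }
}

// Simple output that counts idle marks and remembers the last watermark.
final class CountingOutput implements SimpleWatermarkOutput {
    int idleMarks = 0;
    long lastWatermark = Long.MIN_VALUE;

    @Override
    public void emitWatermark(long timestamp) {
        lastWatermark = timestamp;
    }

    @Override
    public void markIdle() {
        idleMarks++;
    }
}
```

In a real job the same logic would implement
org.apache.flink.api.common.eventtime.WatermarkGenerator<T> and could be
plugged in with WatermarkStrategy.forGenerator(...) instead of
withIdleness(...).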

I will raise an issue for this unless somebody can point out where I went
wrong.

Thanks.

On Wed, Aug 10, 2022 at 1:26 PM Yan Shen <leey...@gmail.com> wrote:

> Hi,
>
> I am using an org.apache.flink.connector.kafka.source.KafkaSource with a
> watermark strategy like this:
>
>
> WatermarkStrategy.forMonotonousTimestamps().withIdleness(Duration.ofSeconds(10))
>
> I noticed that after a short while all the partitions seem to be marked as
> idle even though there are messages coming in.
>
> I made a copy of the class WatermarksWithIdleness and added some logging
> to trace what is happening.
>
> It seems there are two instances of this WatermarkGenerator created per
> partition: one during SourceOperator.initializeMainOutput and another
> during SourceOperator.createOutputForSplits.
>
> When there are new messages, only the one created during
> SourceOperator.createOutputForSplits has activity.
>
> It seems the one created during SourceOperator.initializeMainOutput will
> eventually output an idle mark as it has no activity and this causes the
> entire partition to be marked as idle.
>
> Is my understanding correct? If so, is this a feature or a bug?
>
> Thanks.
>
