Although I'm not very familiar with the design of the code involved, I also
looked at the code, and I'm inclined to agree with you that this is a bug.
Please do raise an issue.

I'm wondering how you noticed this. I was thinking about how to write a
failing test, and I'm wondering if this has some impact that is easily
observed. (My first thought was "How can something this basic be broken?"
but then I realized that the impact is fairly subtle.)
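
To make the effect concrete, here is a rough toy model of the behaviour
described in the quoted message below. It is not Flink code: IdleMarkModel,
SharedOutput, IdleDetector and the one-second tick are all made up for
illustration, and it assumes (as the quoted analysis does) that the main
output and the split output write their idle marks to the same place with
nothing combining them.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model, not Flink code: every name here is hypothetical. */
class IdleMarkModel {

    /** Stand-in for the output status shared by both generators. */
    static final class SharedOutput {
        boolean idle = false;
        void markIdle()   { idle = true; }
        void markActive() { idle = false; }
    }

    /** Stand-in for a generator that re-marks idle on every periodic emit. */
    static final class IdleDetector {
        private final long timeoutMs;
        private long lastActivityMs = 0;

        IdleDetector(long timeoutMs) { this.timeoutMs = timeoutMs; }

        void onEvent(long nowMs, SharedOutput out) {
            lastActivityMs = nowMs;
            out.markActive();
        }

        void onPeriodicEmit(long nowMs, SharedOutput out) {
            if (nowMs - lastActivityMs >= timeoutMs) {
                out.markIdle(); // repeated on every tick while idle
            }
        }
    }

    /** Returns whether the shared output was idle at the end of each tick. */
    static List<Boolean> simulate(int ticks) {
        SharedOutput out = new SharedOutput();
        IdleDetector main = new IdleDetector(10_000);  // never sees an event
        IdleDetector split = new IdleDetector(10_000); // sees an event every tick
        List<Boolean> idleAfterTick = new ArrayList<>();
        for (int t = 1; t <= ticks; t++) {
            long now = t * 1_000L;
            split.onEvent(now, out);        // a record arrives on the split
            split.onPeriodicEmit(now, out); // split is active, marks nothing
            main.onPeriodicEmit(now, out);  // main re-marks idle after the timeout
            idleAfterTick.add(out.idle);
        }
        return idleAfterTick;
    }

    public static void main(String[] args) {
        System.out.println(simulate(12));
    }
}
```

In this model the first nine ticks end with the output active, and every tick
from the tenth on ends with it idle even though the split receives a record
each tick. A real failing test would presumably need to observe something
similar on the operator's output.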

David

On Sat, Aug 13, 2022 at 11:46 PM Yan Shen <leey...@gmail.com> wrote:

> Hi all,
>
> After examining the source code further, I am quite sure 
> org.apache.flink.api.common.eventtime.WatermarksWithIdleness
> does not work with FLIP-27 sources.
>
> In org.apache.flink.streaming.api.operators.SourceOperator, there are
> separate instances of WatermarksWithIdleness created for each split
> output and the main output. There is multiplexing of watermarks between
> split outputs but no multiplexing between split output and main output.
>
> For a source such as org.apache.flink.connector.kafka.source.KafkaSource,
> there is only output from splits and no output from main. Hence the main
> output will (after an initial timeout) be marked as idle.
>
> The implementation of WatermarksWithIdleness is such that once an output
> is idle, it will periodically re-mark the output as idle. Since there is no
> multiplexing between split outputs and main output, the idle marks coming
> from main output will repeatedly set the output to idle even though there
> are events from the splits. The result is that the entire source is
> repeatedly marked as idle.
>
> For now I have worked around this by implementing my own
> WatermarksWithIdleness, which marks the output as idle only once (until it
> becomes active and then idle again) instead of repeatedly.
>
> I will try to raise an issue on this unless somebody can point out where I
> went wrong with this.
>
> Thanks.
>
> On Wed, Aug 10, 2022 at 1:26 PM Yan Shen <leey...@gmail.com> wrote:
>
>> Hi,
>>
>> I am using an org.apache.flink.connector.kafka.source.KafkaSource with a
>> watermark strategy like this:
>>
>>
>> WatermarkStrategy.forMonotonousTimestamps().withIdleness(Duration.ofSeconds(10))
>>
>> I noticed that after a short while all the partitions seem to be marked
>> as idle even though there are messages coming in.
>>
>> I made a copy of the class WatermarksWithIdleness and added some logging
>> to trace what is happening.
>>
>> It seems there are 2 copies of this WatermarkGenerator created per
>> partition. One during SourceOperator.initializeMainOutput and another
>> during SourceOperator.createOutputForSplits.
>>
>> When there are new messages, only the one created during
>> SourceOperator.createOutputForSplits has activity.
>>
>> It seems the one created during SourceOperator.initializeMainOutput will
>> eventually output an idle mark as it has no activity and this causes the
>> entire partition to be marked as idle.
>>
>> Is my understanding correct? If so, is this a feature or a bug?
>>
>> Thanks.
>>
>

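The workaround mentioned in the quoted message (mark the output idle only once
per idle period) might look roughly like the sketch below. Again this is a toy
model rather than Flink code, and MarkIdleOnceModel, CountingOutput and
OnceDetector are hypothetical names; the actual workaround was a modified copy
of WatermarksWithIdleness that is not shown in the thread.

```java
/** Toy model of the "mark idle once" workaround; all names are hypothetical. */
class MarkIdleOnceModel {

    /** Shared output status that also counts how often it was marked idle. */
    static final class CountingOutput {
        boolean idle = false;
        int idleMarks = 0;
        void markIdle()   { idle = true; idleMarks++; }
        void markActive() { idle = false; }
    }

    /** Reports idleness once, then stays quiet until activity resumes. */
    static final class OnceDetector {
        private final long timeoutMs;
        private long lastActivityMs = 0;
        private boolean idleReported = false;

        OnceDetector(long timeoutMs) { this.timeoutMs = timeoutMs; }

        void onEvent(long nowMs, CountingOutput out) {
            lastActivityMs = nowMs;
            idleReported = false; // a later idle period may be reported again
            out.markActive();
        }

        void onPeriodicEmit(long nowMs, CountingOutput out) {
            if (!idleReported && nowMs - lastActivityMs >= timeoutMs) {
                out.markIdle();
                idleReported = true;
            }
        }
    }

    /** Returns {idle marks emitted, ticks that ended with the output idle}. */
    static int[] simulate(int ticks) {
        CountingOutput out = new CountingOutput();
        OnceDetector main = new OnceDetector(10_000);  // never sees an event
        OnceDetector split = new OnceDetector(10_000); // sees an event every tick
        int ticksEndingIdle = 0;
        for (int t = 1; t <= ticks; t++) {
            long now = t * 1_000L;
            split.onEvent(now, out);
            split.onPeriodicEmit(now, out);
            main.onPeriodicEmit(now, out);
            if (out.idle) {
                ticksEndingIdle++;
            }
        }
        return new int[] {out.idleMarks, ticksEndingIdle};
    }

    public static void main(String[] args) {
        int[] stats = simulate(12);
        System.out.println(stats[0] + " idle mark(s), " + stats[1] + " idle tick(s)");
    }
}
```

Note that in this model the main output still marks the shared output idle
once, at the tenth tick, and the next record from the split clears it. The
workaround limits the damage rather than removing the missing multiplexing.
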