Thanks David,

I am working on a Flink DataStream job that does a temporal join of two
Kafka topics based on watermarks. The problem became obvious when I
enabled idleness: data flowed through much faster and produced different
results, even though the topics were not idle.
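
To make the effect easier to reason about (and perhaps to help with a
failing test), here is a minimal standalone sketch of the behaviour
described in the quoted analysis below. It is a toy model, not Flink code:
SharedOutput, IdlenessGenerator and simulate are hypothetical names, and the
model simply assumes that the split and main generators write to the same
output with no multiplexing between them. With markOnce=false it mimics the
repeated idle marking; with markOnce=true it mimics the workaround of
marking idle only once per idle period.

```java
public class IdleOnceSketch {

    /** Toy stand-in for the shared source output; it only tracks idle/active state. */
    public static final class SharedOutput {
        public boolean idle = false;
        public int idleMarks = 0;

        // In this model, any emitted watermark makes the output active again.
        void emitWatermark() { idle = false; }

        void markIdle() { idle = true; idleMarks++; }
    }

    /** Toy idleness-aware generator; markOnce=false re-marks idle on every periodic emit. */
    static final class IdlenessGenerator {
        private final long timeoutMillis;
        private final boolean markOnce;
        private long lastActivityMillis = 0L;
        private boolean alreadyMarkedIdle = false;

        IdlenessGenerator(long timeoutMillis, boolean markOnce) {
            this.timeoutMillis = timeoutMillis;
            this.markOnce = markOnce;
        }

        void onEvent(long nowMillis) {
            lastActivityMillis = nowMillis;
            alreadyMarkedIdle = false; // activity re-arms the idle mark
        }

        void onPeriodicEmit(long nowMillis, SharedOutput output) {
            boolean timedOut = nowMillis - lastActivityMillis >= timeoutMillis;
            if (!timedOut) {
                output.emitWatermark();
            } else if (!markOnce || !alreadyMarkedIdle) {
                alreadyMarkedIdle = true;
                output.markIdle();
            }
        }
    }

    /** Runs 30 simulated seconds: the split sees one event per second, the main output none. */
    public static SharedOutput simulate(boolean markOnce) {
        SharedOutput output = new SharedOutput();
        IdlenessGenerator split = new IdlenessGenerator(10_000L, markOnce);
        IdlenessGenerator main = new IdlenessGenerator(10_000L, markOnce);
        for (long now = 1_000L; now <= 30_000L; now += 1_000L) {
            split.onEvent(now);
            split.onPeriodicEmit(now, output); // active split emits a watermark
            main.onPeriodicEmit(now, output);  // silent main output times out after 10 s
        }
        return output;
    }

    public static void main(String[] args) {
        SharedOutput repeated = simulate(false);
        SharedOutput once = simulate(true);
        System.out.println("repeated: idleMarks=" + repeated.idleMarks + ", idleAtEnd=" + repeated.idle);
        System.out.println("once:     idleMarks=" + once.idleMarks + ", idleAtEnd=" + once.idle);
    }
}
```

In the repeated mode the silent main generator keeps flipping the shared
output back to idle after every watermark from the split, whereas in the
once mode it marks idle a single time and the split's next watermark makes
the output active again for good.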

Regards.

On Mon, Aug 15, 2022 at 12:12 AM David Anderson <dander...@apache.org>
wrote:

> Although I'm not very familiar with the design of the code involved, I
> also looked at the code, and I'm inclined to agree with you that this is a
> bug. Please do raise an issue.
>
> I'm wondering how you noticed this. I was thinking about how to write a
> failing test, and I'm wondering if this has some impact that is easily
> observed. (My first thought was "How can something this basic be broken?"
> but then I realized that the impact is fairly subtle.)
>
> David
>
> On Sat, Aug 13, 2022 at 11:46 PM Yan Shen <leey...@gmail.com> wrote:
>
>> Hi all,
>>
>> After examining the source code further, I am quite sure 
>> org.apache.flink.api.common.eventtime.WatermarksWithIdleness
>> does not work with FLIP-27 sources.
>>
>> In org.apache.flink.streaming.api.operators.SourceOperator, there are
>> separate instances of WatermarksWithIdleness created for each split
>> output and the main output. Watermarks are multiplexed across the split
>> outputs, but not between the split outputs and the main output.
>>
>> For a source such as org.apache.flink.connector.kafka.source.KafkaSource,
>> all output comes from the splits and none from the main output. Hence the
>> main output will (after an initial timeout) be marked as idle.
>>
>> The implementation of WatermarksWithIdleness is such that once an output
>> is idle, it periodically re-marks the output as idle. Since there is no
>> multiplexing between the split outputs and the main output, the idle marks
>> coming from the main output repeatedly set the output to idle even though
>> there are events from the splits. The result is that the entire source is
>> repeatedly marked as idle.
>>
>> I have worked around this for now by implementing my own
>> WatermarksWithIdleness, which marks the output as idle only once (until it
>> becomes active and then idle again) instead of repeatedly.
>>
>> I will try to raise an issue for this unless somebody can point out where
>> I went wrong.
>>
>> Thanks.
>>
>> On Wed, Aug 10, 2022 at 1:26 PM Yan Shen <leey...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am using an org.apache.flink.connector.kafka.source.KafkaSource with a
>>> watermark strategy like this:
>>>
>>>
>>> WatermarkStrategy.forMonotonousTimestamps().withIdleness(Duration.ofSeconds(10))
>>>
>>> I noticed that after a short while all the partitions seem to be marked
>>> as idle even though there are messages coming in.
>>>
>>> I made a copy of the class WatermarksWithIdleness and added some logging
>>> to trace what is happening.
>>>
>>> It seems there are 2 copies of this WatermarkGenerator created per
>>> partition. One during SourceOperator.initializeMainOutput and another
>>> during SourceOperator.createOutputForSplits.
>>>
>>> When there are new messages, only the one created during
>>> SourceOperator.createOutputForSplits has activity.
>>>
>>> It seems the one created during SourceOperator.initializeMainOutput will
>>> eventually output an idle mark as it has no activity and this causes the
>>> entire partition to be marked as idle.
>>>
>>> Is my understanding correct? If so, is this a feature or a bug?
>>>
>>> Thanks.
>>>
>>
