Yan, I've created https://issues.apache.org/jira/browse/FLINK-28975 to
track this.

Regards,
David

On Sun, Aug 14, 2022 at 6:38 PM Yan Shen <leey...@gmail.com> wrote:

> Thanks David,
>
> I am working on a Flink DataStream job that does a temporal join of two
> Kafka topics based on watermarks. The problem became obvious when I
> enabled idleness: data flowed through much faster and the results differed,
> even though neither topic was idle.
>
> Regards.
>
> On Mon, Aug 15, 2022 at 12:12 AM David Anderson <dander...@apache.org>
> wrote:
>
>> Although I'm not very familiar with the design of the code involved, I
>> also looked at the code, and I'm inclined to agree with you that this is a
>> bug. Please do raise an issue.
>>
>> I'm curious how you noticed this. I was thinking about how to write a
>> failing test, and I'm wondering whether this has some easily observed
>> impact. (My first thought was "How can something this basic be broken?",
>> but then I realized that the impact is fairly subtle.)
>>
>> David
>>
>> On Sat, Aug 13, 2022 at 11:46 PM Yan Shen <leey...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> After examining the source code further, I am quite sure 
>>> org.apache.flink.api.common.eventtime.WatermarksWithIdleness
>>> does not work with FLIP-27 sources.
>>>
>>> In org.apache.flink.streaming.api.operators.SourceOperator, there are
>>> separate instances of WatermarksWithIdleness created for each split
>>> output and the main output. Watermarks are multiplexed across the split
>>> outputs, but the split outputs are not multiplexed with the main output.
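The split-level multiplexing mentioned above can be pictured with a small stand-alone sketch. It assumes the usual combining rule: the combined watermark is the minimum over the split outputs that are not idle, and the combined output is idle only when every split output is idle. The class names are hypothetical and this is not Flink's WatermarkOutputMultiplexer API; it is only meant to show why a split-level idle mark cannot silence an active sibling split, whereas nothing similar protects the main output.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical per-split state; NOT a Flink class.
class SplitState {
    long watermark = Long.MIN_VALUE;
    boolean idle = false;
}

// Hypothetical, simplified multiplexer over split outputs.
class SplitMultiplexerSketch {
    private final List<SplitState> splits = new ArrayList<>();

    // Registers a new split output and returns its index.
    int registerSplit() {
        splits.add(new SplitState());
        return splits.size() - 1;
    }

    // Emitting a watermark advances the split and makes it active again.
    void emitWatermark(int split, long watermark) {
        SplitState s = splits.get(split);
        s.watermark = Math.max(s.watermark, watermark);
        s.idle = false;
    }

    void markIdle(int split) {
        splits.get(split).idle = true;
    }

    // The combined output is idle only if every registered split is idle.
    boolean isCombinedIdle() {
        for (SplitState s : splits) {
            if (!s.idle) {
                return false;
            }
        }
        return !splits.isEmpty();
    }

    // The combined watermark is the minimum over the non-idle splits.
    long combinedWatermark() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (SplitState s : splits) {
            if (!s.idle) {
                min = Math.min(min, s.watermark);
                anyActive = true;
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }
}
```

With two splits at watermarks 100 and 50, the combined watermark is 50; once the slower split is marked idle it becomes 100, and the combined output only goes idle after both splits are idle.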
>>>
>>> For a source such as org.apache.flink.connector.kafka.source.KafkaSource,
>>> all records are emitted through the split outputs and none through the
>>> main output. Hence the main output will (after an initial timeout) be
>>> marked as idle.
>>>
>>> WatermarksWithIdleness is implemented such that, once its output is idle,
>>> it periodically re-marks the output as idle. Since the split outputs are
>>> not multiplexed with the main output, the idle marks coming from the main
>>> output repeatedly set the output to idle even though the splits are
>>> producing events. The result is that the entire source is repeatedly
>>> marked as idle.
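The interaction described above can be reproduced in a self-contained simulation. It assumes a simplified model: a generator re-marks its output idle on every periodic emit once no event has arrived for the timeout, the split generator and the main-output generator both write to one shared downstream status, and a new, higher watermark flips that status back to active. All class names are hypothetical, time is passed in explicitly, and none of this is Flink's WatermarkGenerator or WatermarkOutput API.

```java
// Hypothetical, simplified stand-in for an output; NOT Flink's WatermarkOutput.
interface SimpleOutput {
    void emitWatermark(long watermark);
    void markIdle();
}

// Shared downstream status: whichever call arrives last decides active vs. idle.
class DownstreamStatus implements SimpleOutput {
    long maxWatermark = Long.MIN_VALUE;
    boolean idle = false;
    int idleTransitions = 0;

    @Override
    public void emitWatermark(long watermark) {
        if (watermark <= maxWatermark) {
            return; // only advancing watermarks are forwarded
        }
        maxWatermark = watermark;
        idle = false; // a new watermark re-activates the output
    }

    @Override
    public void markIdle() {
        if (!idle) {
            idle = true;
            idleTransitions++; // count active -> idle flips
        }
    }
}

// Simplified idleness-aware generator that re-marks idle on every periodic emit.
class RepeatingIdleGenerator {
    private final long timeoutMs;
    private long lastActivityMs;
    private long maxTimestamp = Long.MIN_VALUE;

    RepeatingIdleGenerator(long timeoutMs, long nowMs) {
        this.timeoutMs = timeoutMs;
        this.lastActivityMs = nowMs;
    }

    void onEvent(long eventTimestamp, long nowMs) {
        lastActivityMs = nowMs;
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    void onPeriodicEmit(long nowMs, SimpleOutput out) {
        if (nowMs - lastActivityMs >= timeoutMs) {
            out.markIdle(); // called again on every tick while idle
        } else if (maxTimestamp != Long.MIN_VALUE) {
            out.emitWatermark(maxTimestamp - 1); // monotonous-timestamps style watermark
        }
    }
}

class IdlenessBugSimulation {
    // One "split" generator sees a record every second; the "main" generator never does.
    static DownstreamStatus run(long timeoutMs, long durationMs) {
        DownstreamStatus downstream = new DownstreamStatus();
        RepeatingIdleGenerator splitGen = new RepeatingIdleGenerator(timeoutMs, 0);
        RepeatingIdleGenerator mainGen = new RepeatingIdleGenerator(timeoutMs, 0);
        for (long now = 0; now <= durationMs; now += 1000) {
            splitGen.onEvent(now, now);
            splitGen.onPeriodicEmit(now, downstream);
            mainGen.onPeriodicEmit(now, downstream);
        }
        return downstream;
    }
}
```

With a 10 s timeout over 30 s of simulated time, IdlenessBugSimulation.run(10_000, 30_000) flips the downstream status to idle once per tick from t = 10 s onward (21 times), even though the split receives a record every second.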
>>>
>>> For now, I have worked around this by implementing my own
>>> WatermarksWithIdleness that marks the output as idle only once (until it
>>> becomes active and then idle again) instead of repeatedly.
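The workaround can be sketched as a generator that remembers whether it has already reported idleness and resets that flag on the next event. This is a reconstruction of the idea described, not Yan's actual code; to stay runnable without Flink on the classpath it uses a hypothetical IdleAwareOutput interface and passes the current time explicitly instead of implementing Flink's WatermarkGenerator.

```java
// Hypothetical, simplified stand-in for an output; NOT Flink's WatermarkOutput.
interface IdleAwareOutput {
    void emitWatermark(long watermark);
    void markIdle();
}

// Marks idle once per idle period instead of on every periodic emit.
class MarkIdleOnceGenerator {
    private final long timeoutMs;
    private long lastActivityMs;
    private long maxTimestamp = Long.MIN_VALUE;
    private boolean idleReported = false;

    MarkIdleOnceGenerator(long timeoutMs, long nowMs) {
        this.timeoutMs = timeoutMs;
        this.lastActivityMs = nowMs;
    }

    void onEvent(long eventTimestamp, long nowMs) {
        lastActivityMs = nowMs;
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        idleReported = false; // activity ends the idle period; a later timeout may report again
    }

    void onPeriodicEmit(long nowMs, IdleAwareOutput out) {
        if (nowMs - lastActivityMs >= timeoutMs) {
            if (!idleReported) {
                out.markIdle(); // report idleness only on the first tick of the idle period
                idleReported = true;
            }
        } else if (maxTimestamp != Long.MIN_VALUE) {
            out.emitWatermark(maxTimestamp - 1);
        }
    }
}

// Test helper that just counts calls.
class CountingOutput implements IdleAwareOutput {
    int idleCalls = 0;
    int watermarkCalls = 0;

    @Override
    public void emitWatermark(long watermark) {
        watermarkCalls++;
    }

    @Override
    public void markIdle() {
        idleCalls++;
    }
}
```

Because an output that never sees events now emits a single idle mark, the next advancing watermark from a split can re-activate the shared output and nothing keeps knocking it back to idle.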
>>>
>>> I will try to raise an issue on this unless somebody can point out where
>>> I went wrong with this.
>>>
>>> Thanks.
>>>
>>> On Wed, Aug 10, 2022 at 1:26 PM Yan Shen <leey...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am using an org.apache.flink.connector.kafka.source.KafkaSource with a
>>>> watermark strategy like this:
>>>>
>>>>
>>>> WatermarkStrategy.forMonotonousTimestamps().withIdleness(Duration.ofSeconds(10))
>>>>
>>>> I noticed that after a short while all the partitions seem to be marked
>>>> as idle even though there are messages coming in.
>>>>
>>>> I made a copy of the class WatermarksWithIdleness and added some
>>>> logging to trace what is happening.
>>>>
>>>> It seems there are 2 copies of this WatermarkGenerator created per
>>>> partition. One during SourceOperator.initializeMainOutput and another
>>>> during SourceOperator.createOutputForSplits.
>>>>
>>>> When there are new messages, only the one created during
>>>> SourceOperator.createOutputForSplits has activity.
>>>>
>>>> It seems the one created during SourceOperator.initializeMainOutput
>>>> will eventually output an idle mark as it has no activity and this causes
>>>> the entire partition to be marked as idle.
>>>>
>>>> Is my understanding correct? If so, is this a feature or bug?
>>>>
>>>> Thanks.
>>>>
>>>
