Yan, I've created https://issues.apache.org/jira/browse/FLINK-28975 to track this.
Regards,
David

On Sun, Aug 14, 2022 at 6:38 PM Yan Shen <leey...@gmail.com> wrote:

> Thanks David,
>
> I am working on a Flink DataStream job that does a temporal join of two Kafka topics based on watermarks. The problem was quite obvious when I enabled idleness: data flowed through much faster and produced different results, even though the topics were not idle.
>
> Regards.
>
> On Mon, Aug 15, 2022 at 12:12 AM David Anderson <dander...@apache.org> wrote:
>
>> Although I'm not very familiar with the design of the code involved, I also looked at the code, and I'm inclined to agree with you that this is a bug. Please do raise an issue.
>>
>> I'm wondering how you noticed this. I was thinking about how to write a failing test, and I'm wondering if this has some impact that is easily observed. (My first thought was "How can something this basic be broken?" but then I realized that the impact is fairly subtle.)
>>
>> David
>>
>> On Sat, Aug 13, 2022 at 11:46 PM Yan Shen <leey...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> After examining the source code further, I am quite sure that org.apache.flink.api.common.eventtime.WatermarksWithIdleness does not work with FLIP-27 sources.
>>>
>>> In org.apache.flink.streaming.api.operators.SourceOperator, separate instances of WatermarksWithIdleness are created for each split output and for the main output. Watermarks are multiplexed across the split outputs, but there is no multiplexing between the split outputs and the main output.
>>>
>>> For a source such as org.apache.flink.connector.kafka.source.KafkaSource, all output comes from the splits and none from the main output. Hence the main output will (after an initial timeout) be marked as idle.
>>>
>>> WatermarksWithIdleness is implemented such that once an output is idle, it periodically re-marks the output as idle. Since there is no multiplexing between the split outputs and the main output, the idle marks coming from the main output repeatedly set the output to idle even though events are arriving from the splits. The result is that the entire source is repeatedly marked as idle.
>>>
>>> For now I have worked around this by implementing my own WatermarksWithIdleness, which marks the output as idle only once (until it becomes active and then idle again) instead of repeatedly.
>>>
>>> I will try to raise an issue on this unless somebody can point out where I went wrong.
>>>
>>> Thanks.
>>>
>>> On Wed, Aug 10, 2022 at 1:26 PM Yan Shen <leey...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am using an org.apache.flink.connector.kafka.source.KafkaSource with a watermark strategy like this:
>>>>
>>>> WatermarkStrategy.forMonotonousTimestamps().withIdleness(Duration.ofSeconds(10))
>>>>
>>>> I noticed that after a short while all the partitions seem to be marked as idle, even though messages are coming in.
>>>>
>>>> I made a copy of the class WatermarksWithIdleness and added some logging to trace what is happening.
>>>>
>>>> It seems that two copies of this WatermarkGenerator are created per partition: one during SourceOperator.initializeMainOutput and another during SourceOperator.createOutputForSplits.
>>>>
>>>> When there are new messages, only the one created during SourceOperator.createOutputForSplits has activity.
>>>>
>>>> It seems the one created during SourceOperator.initializeMainOutput eventually outputs an idle mark because it has no activity, and this causes the entire partition to be marked as idle.
>>>>
>>>> Is my understanding correct? If so, is this a feature or a bug?
>>>>
>>>> Thanks.
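The interaction described in this thread can be illustrated with a small, self-contained Java simulation. This is a hypothetical model, not Flink code: `SharedOutput`, `IdleAwareGenerator` and `simulate` are invented names that only mimic the idle/active logic attributed to WatermarksWithIdleness above. The model ignores the split-output multiplexer and assumes the split output is ticked before the main output on each periodic emit. With the default behaviour, the silent main-output generator re-marks the shared output as idle on every tick after its timeout, even though the split keeps emitting watermarks. With the "mark idle only once" workaround, the split's next watermark re-activates the output and it stays active.

```java
/**
 * Simplified, hypothetical model of the behaviour described in this thread.
 * None of these classes are Flink's; they only mimic the idle/active logic.
 */
public class IdlenessSketch {

    /** Stand-in for the single downstream output shared by the main and split outputs. */
    static final class SharedOutput {
        boolean idle = false;
        int idleMarks = 0;

        void emitWatermark(long watermark) {
            idle = false; // in this model, any watermark re-activates the output
        }

        void markIdle() {
            idle = true;
            idleMarks++;
        }
    }

    /** Rough model of an idleness-aware generator wrapping monotonous timestamps. */
    static final class IdleAwareGenerator {
        private final long idleTimeoutMs;
        private final boolean markIdleOnlyOnce; // true = the workaround from the thread
        private long lastActivityMs = 0;
        private long maxTimestamp = Long.MIN_VALUE;
        private boolean alreadyIdle = false;

        IdleAwareGenerator(long idleTimeoutMs, boolean markIdleOnlyOnce) {
            this.idleTimeoutMs = idleTimeoutMs;
            this.markIdleOnlyOnce = markIdleOnlyOnce;
        }

        void onEvent(long eventTimestamp, long nowMs) {
            lastActivityMs = nowMs;
            maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
            alreadyIdle = false; // activity ends the idle period
        }

        void onPeriodicEmit(SharedOutput out, long nowMs) {
            if (nowMs - lastActivityMs >= idleTimeoutMs) {
                // Default: re-mark idle on every tick. Workaround: only on the transition.
                if (!markIdleOnlyOnce || !alreadyIdle) {
                    out.markIdle();
                    alreadyIdle = true;
                }
            } else if (maxTimestamp != Long.MIN_VALUE) {
                // Watermark just below the highest timestamp seen so far.
                out.emitWatermark(maxTimestamp - 1);
            }
        }
    }

    /** Events reach only the split generator; the main generator never sees any. */
    static SharedOutput simulate(boolean markIdleOnlyOnce) {
        SharedOutput out = new SharedOutput();
        IdleAwareGenerator mainGen = new IdleAwareGenerator(10_000, markIdleOnlyOnce);
        IdleAwareGenerator splitGen = new IdleAwareGenerator(10_000, markIdleOnlyOnce);
        for (long now = 0; now <= 60_000; now += 200) {
            splitGen.onEvent(now, now);        // a steady stream of Kafka records
            splitGen.onPeriodicEmit(out, now); // split output ticks first (assumed order)
            mainGen.onPeriodicEmit(out, now);  // main output stays silent
        }
        return out;
    }

    public static void main(String[] args) {
        SharedOutput repeated = simulate(false);
        SharedOutput once = simulate(true);
        System.out.println("repeated: idle=" + repeated.idle + ", idleMarks=" + repeated.idleMarks);
        System.out.println("once:     idle=" + once.idle + ", idleMarks=" + once.idleMarks);
    }
}
```

Running `main` prints `repeated: idle=true, idleMarks=251` followed by `once:     idle=false, idleMarks=1`. The only difference between the two runs is the `markIdleOnlyOnce` flag, which mirrors the workaround of marking the output idle once per idle period rather than on every periodic emit.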