Thanks, David. I am working on a Flink DataStream job that does a temporal join of two Kafka topics based on watermarks. The problem became quite obvious when I enabled idleness: data flowed through much faster and produced different results, even though the topics were not idle.
Regards.

On Mon, Aug 15, 2022 at 12:12 AM David Anderson <dander...@apache.org> wrote:

> Although I'm not very familiar with the design of the code involved, I
> also looked at the code, and I'm inclined to agree with you that this is a
> bug. Please do raise an issue.
>
> I'm wondering how you noticed this. I was thinking about how to write a
> failing test, and I'm wondering if this has some impact that is easily
> observed. (My first thought was "How can something this basic be broken?"
> but then I realized that the impact is fairly subtle.)
>
> David
>
> On Sat, Aug 13, 2022 at 11:46 PM Yan Shen <leey...@gmail.com> wrote:
>
>> Hi all,
>>
>> After examining the source code further, I am quite sure that
>> org.apache.flink.api.common.eventtime.WatermarksWithIdleness
>> does not work with FLIP-27 sources.
>>
>> In org.apache.flink.streaming.api.operators.SourceOperator, separate
>> instances of WatermarksWithIdleness are created for each split output
>> and for the main output. Watermarks are multiplexed between the split
>> outputs, but not between the split outputs and the main output.
>>
>> For a source such as org.apache.flink.connector.kafka.source.KafkaSource,
>> all output comes from the splits and none from the main output. Hence the
>> main output will (after an initial timeout) be marked as idle.
>>
>> WatermarksWithIdleness is implemented so that once an output is idle,
>> it periodically re-marks the output as idle. Since there is no
>> multiplexing between the split outputs and the main output, the idle marks
>> coming from the main output repeatedly set the output to idle even though
>> there are events from the splits. The result is that the entire source is
>> repeatedly marked as idle.
>>
>> For now, I have worked around this by implementing my own
>> WatermarksWithIdleness, which marks the output as idle only once (until it
>> becomes active and then idle again) instead of repeatedly.
>>
>> I will try to raise an issue on this unless somebody can point out where
>> I went wrong.
>>
>> Thanks.
>>
>> On Wed, Aug 10, 2022 at 1:26 PM Yan Shen <leey...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am using an org.apache.flink.connector.kafka.source.KafkaSource with a
>>> watermark strategy like this:
>>>
>>> WatermarkStrategy.forMonotonousTimestamps().withIdleness(Duration.ofSeconds(10))
>>>
>>> I noticed that after a short while all the partitions seem to be marked
>>> as idle even though messages are still coming in.
>>>
>>> I made a copy of the class WatermarksWithIdleness and added some logging
>>> to trace what is happening.
>>>
>>> It seems there are two copies of this WatermarkGenerator created per
>>> partition: one during SourceOperator.initializeMainOutput and another
>>> during SourceOperator.createOutputForSplits.
>>>
>>> When there are new messages, only the one created during
>>> SourceOperator.createOutputForSplits has activity.
>>>
>>> It seems the one created during SourceOperator.initializeMainOutput
>>> eventually outputs an idle mark because it has no activity, and this
>>> causes the entire partition to be marked as idle.
>>>
>>> Is my understanding correct? If so, is this a feature or a bug?
>>>
>>> Thanks.
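The interaction described in this thread can be illustrated with a small, self-contained Java model. It is a sketch under stated assumptions, not Flink code: `SharedOutput`, `IdlenessGenerator` and `idleTicks` are hypothetical names. The model assumes, as argued above, that the split generator and the main generator write to one shared downstream status, that emitting a watermark re-activates that output, that the main output never sees a record, and that on each periodic emit the split output is invoked before the main output. The `markIdleOnlyOnce` flag switches between re-marking idle on every periodic emit (the behaviour reported for WatermarksWithIdleness) and marking idle only once per idle period (the workaround).

```java
/**
 * Toy model (NOT Flink code) of the interaction described in this thread:
 * two independent idleness-tracking generators share one downstream output.
 * The "split" generator sees every record; the "main" generator sees none.
 */
public class IdlenessSimulation {

    /** Hypothetical stand-in for the shared downstream output status. */
    static final class SharedOutput {
        boolean idle = false;
        long watermark = Long.MIN_VALUE;

        // Assumption: emitting a watermark re-activates an idle output.
        void emitWatermark(long wm) {
            watermark = Math.max(watermark, wm);
            idle = false;
        }

        void markIdle() {
            idle = true;
        }
    }

    /** Hypothetical, simplified idleness generator driven by a manual clock (ms). */
    static final class IdlenessGenerator {
        private final long idleTimeoutMs;
        private final boolean markIdleOnlyOnce; // true = the workaround from the thread
        private long lastActivityMs = 0;
        private long maxTimestamp = Long.MIN_VALUE;
        private boolean markedIdle = false;

        IdlenessGenerator(long idleTimeoutMs, boolean markIdleOnlyOnce) {
            this.idleTimeoutMs = idleTimeoutMs;
            this.markIdleOnlyOnce = markIdleOnlyOnce;
        }

        void onEvent(long timestamp, long nowMs) {
            maxTimestamp = Math.max(maxTimestamp, timestamp);
            lastActivityMs = nowMs;
            markedIdle = false; // activity ends the current idle period
        }

        void onPeriodicEmit(SharedOutput out, long nowMs) {
            boolean timedOut = nowMs - lastActivityMs >= idleTimeoutMs;
            if (timedOut) {
                // Repeated variant re-marks idle on every call; the workaround
                // marks idle only on the first call of each idle period.
                if (!markIdleOnlyOnce || !markedIdle) {
                    out.markIdle();
                    markedIdle = true;
                }
            } else if (maxTimestamp != Long.MIN_VALUE) {
                out.emitWatermark(maxTimestamp); // simplified: max timestamp seen so far
            }
        }
    }

    /** Returns how many simulated ticks end with the shared output marked idle. */
    public static int idleTicks(boolean markIdleOnlyOnce, int ticks) {
        SharedOutput out = new SharedOutput();
        IdlenessGenerator splitGen = new IdlenessGenerator(10_000, markIdleOnlyOnce);
        IdlenessGenerator mainGen = new IdlenessGenerator(10_000, markIdleOnlyOnce);
        int idleCount = 0;
        for (int i = 1; i <= ticks; i++) {
            long now = i * 1_000L;             // one tick per simulated second
            splitGen.onEvent(now, now);         // every record arrives via the split output
            splitGen.onPeriodicEmit(out, now);  // assumed order: split output first...
            mainGen.onPeriodicEmit(out, now);   // ...then the main output, which never sees a record
            if (out.idle) {
                idleCount++;
            }
        }
        return idleCount;
    }

    public static void main(String[] args) {
        System.out.println("re-mark idle on every emit: idle on " + idleTicks(false, 30) + " of 30 ticks"); // 21
        System.out.println("mark idle once:             idle on " + idleTicks(true, 30) + " of 30 ticks");  // 1
    }
}
```

With a 10-second idle timeout and one record per simulated second, the repeated-marking variant leaves the shared output idle at the end of every tick from the tenth onward, even though records keep arriving. The mark-once variant is idle only on the tick where the main output first times out; after that, the split output's next watermark re-activates the output and the main output stays silent. The model only illustrates the mechanism under the assumptions above; it does not reproduce Flink's actual call order or multiplexing logic.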