Although I'm not very familiar with the design of the code involved, I also looked at the code, and I'm inclined to agree with you that this is a bug. Please do raise an issue.
I'm wondering how you noticed this. I was thinking about how to write a failing test, and I'm wondering if this has some impact that is easily observed. (My first thought was "How can something this basic be broken?" but then I realized that the impact is fairly subtle.)

David

On Sat, Aug 13, 2022 at 11:46 PM Yan Shen <leey...@gmail.com> wrote:

> Hi all,
>
> After examining the source code further, I am quite sure
> org.apache.flink.api.common.eventtime.WatermarksWithIdleness
> does not work with FLIP-27 sources.
>
> In org.apache.flink.streaming.api.operators.SourceOperator, there are
> separate instances of WatermarksWithIdleness created for each split
> output and the main output. There is multiplexing of watermarks between
> split outputs but no multiplexing between split output and main output.
>
> For a source such as org.apache.flink.connector.kafka.source.KafkaSource,
> there is only output from splits and no output from main. Hence the main
> output will (after an initial timeout) be marked as idle.
>
> The implementation of WatermarksWithIdleness is such that once an output
> is idle, it will periodically re-mark the output as idle. Since there is no
> multiplexing between split outputs and main output, the idle marks coming
> from main output will repeatedly set the output to idle even though there
> are events from the splits. Result is that the entire source is repeatedly
> marked as idle.
>
> I currently worked around this by implementing my own WatermarksWithIdleness
> which will only mark the output as idle once (until it becomes active then
> idle again) instead of repeatedly.
>
> I will try to raise an issue on this unless somebody can point out where I
> went wrong with this.
>
> Thanks.
>
> On Wed, Aug 10, 2022 at 1:26 PM Yan Shen <leey...@gmail.com> wrote:
>
>> Hi,
>>
>> I am using a org.apache.flink.connector.kafka.source.KafkaSource with a
>> watermark strategy like this:
>>
>> WatermarkStrategy.forMonotonousTimestamps().withIdleness(Duration.ofSeconds(10))
>>
>> I noticed that after a short while all the partitions seem to be marked
>> as idle even though there are messages coming in.
>>
>> I made a copy of the class WatermarksWithIdleness and added some logging
>> to trace what is happening.
>>
>> It seems there are 2 copies of this WatermarkGenerator created per
>> partition. One during SourceOperator.initializeMainOutput and another
>> during SourceOperator.createOutputForSplits.
>>
>> When there are new messages, only the one created during
>> SourceOperator.createOutputForSplits has activity.
>>
>> It seems the one created during SourceOperator.initializeMainOutput will
>> eventually output an idle mark as it has no activity and this causes the
>> entire partition to be marked as idle.
>>
>> Is my understanding correct? If so, is this a feature or bug?
>>
>> Thanks.
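
For anyone trying to turn the behaviour described in this thread into a failing test, here is a rough, Flink-free sketch of the interaction as Yan describes it. It is a toy model only: every class and method name below (IdlenessSketch, SharedOutput, IdleDetector, ticksEndingIdle) is hypothetical and none of it is Flink's actual code. It assumes two idleness detectors (standing in for the main output and one split output) that write to the same downstream status with no multiplexing between them, a 10 second idle timeout, and a periodic emit once per simulated second. The markIdleOnlyOnce flag models the workaround of reporting idleness only once per active-to-idle transition.

```java
// Toy model of the behaviour described in the thread. Not Flink code;
// all names here are hypothetical and exist only for illustration.
class IdlenessSketch {

    /** Stands in for the single downstream status that both detectors write to. */
    static final class SharedOutput {
        boolean idle = false;

        void markIdle() { idle = true; }
        void markActive() { idle = false; }
    }

    /** Idleness detector driven by an explicit clock in milliseconds. */
    static final class IdleDetector {
        private final SharedOutput output;
        private final long timeoutMs;
        private final boolean markIdleOnlyOnce;
        private long lastActivityMs;
        private boolean idleReported = false;

        IdleDetector(SharedOutput output, long timeoutMs,
                     boolean markIdleOnlyOnce, long nowMs) {
            this.output = output;
            this.timeoutMs = timeoutMs;
            this.markIdleOnlyOnce = markIdleOnlyOnce;
            this.lastActivityMs = nowMs;
        }

        /** A record arrived on this detector's output. */
        void onEvent(long nowMs) {
            lastActivityMs = nowMs;
            idleReported = false;
            output.markActive();
        }

        /** Periodic check: report idleness if nothing arrived within the timeout. */
        void onPeriodicEmit(long nowMs) {
            boolean timedOut = nowMs - lastActivityMs >= timeoutMs;
            if (!timedOut) {
                return;
            }
            if (markIdleOnlyOnce && idleReported) {
                return; // workaround: stay silent until activity resumes
            }
            idleReported = true;
            output.markIdle();
        }
    }

    /**
     * Simulates 60 seconds in which the split detector sees one record per
     * second and the main detector never sees any. Returns the number of
     * one-second ticks that end with the shared output marked idle.
     */
    static int ticksEndingIdle(boolean markIdleOnlyOnce) {
        SharedOutput output = new SharedOutput();
        IdleDetector main = new IdleDetector(output, 10_000, markIdleOnlyOnce, 0);
        IdleDetector split = new IdleDetector(output, 10_000, markIdleOnlyOnce, 0);
        int idleTicks = 0;
        for (long now = 1_000; now <= 60_000; now += 1_000) {
            split.onEvent(now);        // the split is continuously active
            main.onPeriodicEmit(now);  // the main output never has activity
            split.onPeriodicEmit(now);
            if (output.idle) {
                idleTicks++;
            }
        }
        return idleTicks;
    }
}
```

In this model, ticksEndingIdle(false) returns 51: from the 10 second mark onward, every tick ends with the shared output idle even though the split receives a record each second. ticksEndingIdle(true) returns 1, since the main detector reports idleness once and the next split record restores the active status. A real regression test would of course have to drive SourceOperator itself rather than this simplified stand-in.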