Hi, I am using an org.apache.flink.connector.kafka.source.KafkaSource with a watermark strategy like this:
WatermarkStrategy.forMonotonousTimestamps().withIdleness(Duration.ofSeconds(10))

I noticed that after a short while all the partitions seem to be marked as idle, even though messages are still coming in. To trace what is happening, I made a copy of the class WatermarksWithIdleness and added some logging.

It seems that two copies of this WatermarkGenerator are created per partition: one during SourceOperator.initializeMainOutput and another during SourceOperator.createOutputForSplits. When new messages arrive, only the one created during SourceOperator.createOutputForSplits sees any activity. The one created during SourceOperator.initializeMainOutput sees no activity, so it eventually emits an idle mark, and this appears to cause the entire partition to be marked as idle.

Is my understanding correct? If so, is this a feature or a bug? Thanks.
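
P.S. To make the question concrete, here is a toy model of the behavior I think I am seeing. It is not Flink's code and does not use any Flink classes: IdleTrackingGenerator, IdlenessSketch and simulate are hypothetical names, and the timeout check is my assumption about how an idleness timer works in general. It only shows that a generator which never receives events flips to idle after the 10 second timeout, while one that receives events does not. Whether that idle mark then propagates to the whole partition is exactly what I am asking.

```java
import java.time.Duration;

// Toy model only: NOT Flink's WatermarksWithIdleness, just the timeout idea.
class IdleTrackingGenerator {
    private final long timeoutMillis;
    private long lastActivityMillis;
    private boolean idle;

    IdleTrackingGenerator(Duration timeout, long nowMillis) {
        this.timeoutMillis = timeout.toMillis();
        this.lastActivityMillis = nowMillis;
    }

    // Called for every record this generator sees.
    void onEvent(long nowMillis) {
        lastActivityMillis = nowMillis;
        idle = false;
    }

    // Called periodically; marks the generator idle once the timeout has elapsed.
    void onPeriodicEmit(long nowMillis) {
        if (nowMillis - lastActivityMillis >= timeoutMillis) {
            idle = true;
        }
    }

    boolean isIdle() {
        return idle;
    }
}

class IdlenessSketch {
    // Returns { mainOutputIdle, splitOutputIdle } after 15 simulated seconds.
    static boolean[] simulate() {
        Duration timeout = Duration.ofSeconds(10);
        // Stand-ins for the two generators I observed per partition.
        IdleTrackingGenerator mainOutput = new IdleTrackingGenerator(timeout, 0L);
        IdleTrackingGenerator splitOutput = new IdleTrackingGenerator(timeout, 0L);

        // One record per second; only the split output ever sees it.
        for (long t = 1_000L; t <= 15_000L; t += 1_000L) {
            splitOutput.onEvent(t);
            mainOutput.onPeriodicEmit(t);
            splitOutput.onPeriodicEmit(t);
        }
        return new boolean[] { mainOutput.isIdle(), splitOutput.isIdle() };
    }

    public static void main(String[] args) {
        boolean[] result = simulate();
        System.out.println("mainOutput idle:  " + result[0]);
        System.out.println("splitOutput idle: " + result[1]);
    }
}
```

In this model the generator standing in for the main output ends up idle and the one standing in for the split output stays active, which matches what my logging shows.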