Hi,

I am using an org.apache.flink.connector.kafka.source.KafkaSource with a
watermark strategy like this:

WatermarkStrategy.forMonotonousTimestamps().withIdleness(Duration.ofSeconds(10))

I noticed that after a short while all the partitions seem to be marked as
idle even though there are messages coming in.

I made a copy of the class WatermarksWithIdleness and added some logging to
trace what is happening.

It seems two instances of this WatermarkGenerator are created per
partition: one during SourceOperator.initializeMainOutput and another
during SourceOperator.createOutputForSplits.

When there are new messages, only the one created during
SourceOperator.createOutputForSplits has activity.

It seems the one created during SourceOperator.initializeMainOutput never
sees any activity, so it eventually marks itself idle once the 10-second
timeout expires, and this causes the entire partition to be marked as idle.
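
To make the suspected behavior concrete, here is a small stand-alone model of
what I think is happening. It does not use any Flink classes: IdleTracker,
onEvent, onPeriodicEmit and simulate are names I made up for illustration, and
the real WatermarksWithIdleness logic may differ in detail. It only shows that
a generator which never receives records will report idle after the timeout,
while one that does receive records will not.

```java
import java.time.Duration;

class IdlenessSketch {

    /** Hypothetical, simplified stand-in for an idleness-aware watermark generator. */
    static final class IdleTracker {
        private final long timeoutMillis;
        private long lastActivityMillis;
        private boolean idle;

        IdleTracker(Duration timeout, long nowMillis) {
            this.timeoutMillis = timeout.toMillis();
            this.lastActivityMillis = nowMillis;
        }

        /** Called when a record arrives: resets the idle timer. */
        void onEvent(long nowMillis) {
            lastActivityMillis = nowMillis;
            idle = false;
        }

        /** Called periodically: marks idle once the timeout has elapsed without a record. */
        void onPeriodicEmit(long nowMillis) {
            if (nowMillis - lastActivityMillis >= timeoutMillis) {
                idle = true;
            }
        }

        boolean isIdle() {
            return idle;
        }
    }

    /**
     * Runs two trackers side by side on a simulated clock. Only the second one
     * (standing in for the per-split generator) ever receives records.
     * Returns {mainOutputIdle, splitOutputIdle}.
     */
    static boolean[] simulate(Duration timeout, long totalMillis, long stepMillis) {
        IdleTracker mainOutput = new IdleTracker(timeout, 0L);
        IdleTracker splitOutput = new IdleTracker(timeout, 0L);
        for (long now = stepMillis; now <= totalMillis; now += stepMillis) {
            splitOutput.onEvent(now);        // records only reach the per-split tracker
            mainOutput.onPeriodicEmit(now);  // both trackers get the periodic callback
            splitOutput.onPeriodicEmit(now);
        }
        return new boolean[] {mainOutput.isIdle(), splitOutput.isIdle()};
    }

    public static void main(String[] args) {
        boolean[] result = simulate(Duration.ofSeconds(10), 15_000L, 1_000L);
        System.out.println("main output idle:  " + result[0]); // true
        System.out.println("split output idle: " + result[1]); // false
    }
}
```

In this model the first tracker goes idle after 10 simulated seconds even
though records keep arriving on the second one, which matches what my logging
shows for the two generator instances.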

Is my understanding correct? If so, is this a feature or bug?

Thanks.
