Hello,

I am currently evaluating idleness and alignment with Flink 1.17.1 and the
externalized Kafka connector. My job has 3 sources whose watermark
strategies are defined like this:

WatermarkStrategy.<T>forBoundedOutOfOrderness(maxAllowedWatermarkDrift)
            .withIdleness(idleTimeout)
            .withWatermarkAlignment("group", maxAllowedWatermarkDrift,
Duration.ofSeconds(1L))

The max allowed drift is currently 5 seconds, and my sources have an
idleTimeout of 1, 1.5, and 5 seconds.

What I observe is that, when I restart the job, all sources publish
messages, but then 2 of them are marked as idle and never resume. I found
https://issues.apache.org/jira/browse/FLINK-31632, which should be fixed in
1.17.1, but I don't think it's the same issue, my logs don't show negative
values:

2023-06-27 15:11:42,927 DEBUG
org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
reported watermark=Watermark @ 1687878696690 (2023-06-27 15:11:36.690) from
subTaskId=1
2023-06-27 15:11:43,009 DEBUG
org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
reported watermark=Watermark @ 9223372036854775807 (292278994-08-17
07:12:55.807) from subTaskId=0
2023-06-27 15:11:43,091 DEBUG
org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
reported watermark=Watermark @ 9223372036854775807 (292278994-08-17
07:12:55.807) from subTaskId=0
2023-06-27 15:11:43,116 DEBUG
org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
reported watermark=Watermark @ 9223372036854775807 (292278994-08-17
07:12:55.807) from subTaskId=0
2023-06-27 15:11:43,298 INFO
 org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0, 1]
2023-06-27 15:11:43,304 INFO
 org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0]
2023-06-27 15:11:43,306 INFO
 org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0]
2023-06-27 15:11:43,486 INFO
 org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
2023-06-27 15:11:43,489 INFO
 org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
2023-06-27 15:11:43,492 INFO
 org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]

Does anyone know if I'm missing something or this is really a bug?

Regards,
Alexis.

Reply via email to