Yordan Pavlov created FLINK-33109:
-------------------------------------

             Summary: Watermark alignment not applied after recovery from 
checkpoint
                 Key: FLINK-33109
                 URL: https://issues.apache.org/jira/browse/FLINK-33109
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Coordination
    Affects Versions: 1.17.1
            Reporter: Yordan Pavlov
         Attachments: image-2023-09-18-15-40-06-868.png, 
image-2023-09-18-15-46-16-106.png

I am observing a problem where, after recovery from a checkpoint, the Kafka 
source watermarks start to diverge and no longer honor the watermark alignment 
setting I have applied.

I have a Kafka source which reads a topic with 32 partitions. I am applying the 
following watermark strategy:


{code:java}
new EventAwareWatermarkStrategy[KeyedKafkaSourceMessage](msg => msg.value.getTimestamp)
  .withWatermarkAlignment(
    "alignment-sources-group",
    time.Duration.ofMillis(sourceWatermarkAlignmentBlocks))
{code}
 

This works well until my job needs to recover from a checkpoint. After the 
recovery, alignment no longer takes place. This is best illustrated by the 
watermark metrics for the various operators in the image:

!image-2023-09-18-15-40-06-868.png!

 

You can see how the watermarks disperse after the recovery. While debugging the 
problem, I noticed that before the failure there were calls to

 
{code:java}
SourceCoordinator::announceCombinedWatermark() 
{code}
After the recovery, no calls reach that method, so no value for 
{code:java}
watermarkAlignmentParams.getMaxAllowedWatermarkDrift(){code}
is ever read. I can manually fix the problem if I stop the job, clear all state 
from ZooKeeper, and then manually start Flink, providing the last checkpoint with the 
{code:java}
'--fromSavepoint'{code}
 flag. This causes the SourceCoordinator to be constructed properly and the 
watermark drift to be checked. Once recovered manually, the watermarks again 
converge to within the allowed drift, as seen in the metrics:

!image-2023-09-18-15-46-16-106.png!
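
For readers less familiar with the mechanism, the drift check that stops running after recovery can be modeled roughly as below. This is only a toy sketch of my understanding of watermark alignment (the combined watermark is the minimum over the group, and a reader that is ahead of it by more than the allowed drift should pause). It is not Flink's SourceCoordinator code: the class name, the method names and the sample values are all hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model only: NOT Flink's SourceCoordinator implementation.
// All names and values here are hypothetical and for illustration.
final class WatermarkAlignmentSketch {

    // Combined watermark of the alignment group: the minimum of all
    // watermarks reported by the readers. Assumes a non-empty map.
    static long combinedWatermark(Map<String, Long> reported) {
        long min = Long.MAX_VALUE;
        for (long wm : reported.values()) {
            min = Math.min(min, wm);
        }
        return min;
    }

    // A reader should pause once its watermark is ahead of the combined
    // watermark by more than the maximum allowed drift.
    static boolean shouldPause(long readerWatermark, long combined, long maxDriftMillis) {
        return readerWatermark > combined + maxDriftMillis;
    }

    public static void main(String[] args) {
        Map<String, Long> reported = new LinkedHashMap<>();
        reported.put("subtask-0", 1_000L);
        reported.put("subtask-1", 1_400L);
        reported.put("subtask-2", 9_000L);

        long maxDriftMillis = 500L; // stands in for the configured drift
        long combined = combinedWatermark(reported);

        for (Map.Entry<String, Long> e : reported.entrySet()) {
            boolean pause = shouldPause(e.getValue(), combined, maxDriftMillis);
            System.out.println(e.getKey() + " pause=" + pause);
        }
    }
}
```

If the combined watermark is never announced, as appears to happen after the recovery, no reader ever receives a limit to compare against, which would match the divergence shown in the first image.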

 

Let me know if I can help by providing any more information.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
