[ https://issues.apache.org/jira/browse/FLINK-33109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17767012#comment-17767012 ]

Yordan Pavlov commented on FLINK-33109:
---------------------------------------

Hi [~fanrui], can I ask for your help in testing the latest builds? I have 
modified the Dockerfile found at:

https://github.com/apache/flink-docker/blob/master/1.17/scala_2.12-java11-ubuntu/Dockerfile

so that it builds Flink from source. The branch I am using is `release-1.17`. 
Here is the full Dockerfile I am using; in it I also add my own Flink job code 
(a condensed sketch of the approach follows the link).
[https://gist.github.com/YordanPavlov/b9d2f08370dadb5ab18a2dc096b21481]
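For context, the essential shape of that Dockerfile is roughly the following. This is only a condensed sketch, not the exact file from the gist: the base image tags, paths, and job jar name are placeholders, and the entrypoint/user setup of the official image is omitted.
{code}
# Stage 1: build Flink from the release-1.17 branch (tags and paths are illustrative).
FROM maven:3.8-eclipse-temurin-11 AS builder
RUN apt-get update && apt-get install -y git
RUN git clone --branch release-1.17 --depth 1 https://github.com/apache/flink.git /flink-src
WORKDIR /flink-src
# Skip tests; after the build, build-target is a symlink to the assembled distribution.
RUN mvn clean install -DskipTests && cp -a "$(readlink -f build-target)" /flink-build

# Stage 2: put the freshly built distribution where the official image keeps it.
FROM eclipse-temurin:11-jre-jammy
ENV FLINK_HOME=/opt/flink
ENV PATH=$FLINK_HOME/bin:$PATH
COPY --from=builder /flink-build $FLINK_HOME
# The user job jar is layered on top of the distribution (placeholder path and name).
COPY target/my-flink-job.jar $FLINK_HOME/usrlib/my-flink-job.jar
WORKDIR $FLINK_HOME
{code}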

 

However, when I switch a running job (restoring from a checkpoint) from the official 
Flink 1.17.1 image to the one I've built from source, I get the following error:


{code:java}
from INITIALIZING to FAILED on 10.42.209.30:44171-33acd8 @ 10-42-209-30.xrp-extractor-v4-flink-taskmanager-7.flink.svc.cluster.local (dataPort=46037).
java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:256)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:256)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_439a9d1f894a5a8c69e615cc793a5c7b_(11/32) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165)
    ... 11 common frames omitted
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:407)
    at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:512)
    at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:99)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    ... 13 common frames omitted
Caused by: java.lang.NoSuchMethodError: 'java.util.List org.apache.flink.runtime.state.IncrementalKeyedStateHandle.getSharedStateHandles()'
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restorePreviousIncrementalFilesStatus(RocksDBIncrementalRestoreOperation.java:212)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:188)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:169)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:327){code}
Can you give me a hint as to what I might be doing wrong? Thanks!

> Watermark alignment not applied after recovery from checkpoint
> --------------------------------------------------------------
>
>                 Key: FLINK-33109
>                 URL: https://issues.apache.org/jira/browse/FLINK-33109
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.17.1
>            Reporter: Yordan Pavlov
>            Priority: Major
>         Attachments: image-2023-09-18-15-40-06-868.png, 
> image-2023-09-18-15-46-16-106.png
>
>
> I am observing a problem where, after recovery from a checkpoint, the Kafka 
> source watermarks start to diverge, no longer honoring the watermark alignment 
> setting I have applied.
> I have a Kafka source which reads a topic with 32 partitions. I am applying 
> the following watermark strategy:
> {code:java}
> new EventAwareWatermarkStrategy[KeyedKafkaSourceMessage](msg => msg.value.getTimestamp)
>   .withWatermarkAlignment("alignment-sources-group", time.Duration.ofMillis(sourceWatermarkAlignmentBlocks)){code}
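> For reference, here is a rough sketch of the same setup using Flink's plain Java API, with the built-in forMonotonousTimestamps generator standing in for my EventAwareWatermarkStrategy wrapper; the broker address, topic, deserializer, and message accessor are placeholders rather than the real job code:
> {code:java}
> import java.time.Duration;
> 
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.connector.kafka.source.KafkaSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> 
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> 
> // Stand-in value for sourceWatermarkAlignmentBlocks.
> long maxAllowedDriftMs = 5_000L;
> 
> KafkaSource<KeyedKafkaSourceMessage> source =
>     KafkaSource.<KeyedKafkaSourceMessage>builder()
>         .setBootstrapServers("kafka:9092")                    // placeholder
>         .setTopics("events")                                  // the 32-partition topic
>         .setValueOnlyDeserializer(new MessageDeserializer())  // hypothetical DeserializationSchema
>         .build();
> 
> WatermarkStrategy<KeyedKafkaSourceMessage> strategy =
>     WatermarkStrategy.<KeyedKafkaSourceMessage>forMonotonousTimestamps()
>         .withTimestampAssigner((msg, recordTs) -> msg.value.getTimestamp())
>         // Sources in the same alignment group pause reading when they get further
>         // ahead of the group's minimum watermark than the allowed drift.
>         .withWatermarkAlignment("alignment-sources-group", Duration.ofMillis(maxAllowedDriftMs));
> 
> env.fromSource(source, strategy, "kafka-source");
> {code}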
>  
> This works great until my job needs to recover from a checkpoint. Once the 
> recovery takes place, no alignment happens any more. This is best 
> illustrated by the watermark metrics for various operators in the 
> image:
> !image-2023-09-18-15-40-06-868.png!
>  
> You can see how the watermarks diverge after the recovery. While debugging 
> the problem, I noticed that before the failure there would be calls to
>  
> {code:java}
> SourceCoordinator::announceCombinedWatermark() 
> {code}
> After the recovery, no calls reach that method, so the value of 
> {code:java}
> watermarkAlignmentParams.getMaxAllowedWatermarkDrift(){code}
> is never read. I can manually fix the problem if I stop the job, clear all 
> state from ZooKeeper, and then manually start Flink, providing the last 
> checkpoint with the 
> {code:java}
> '--fromSavepoint'{code}
>  flag. This causes the SourceCoordinator to be constructed properly and the 
> watermark drift to be checked. After this manual recovery, the watermarks again 
> converge to within the allowed drift, as seen in the metrics:
> !image-2023-09-18-15-46-16-106.png!
>  
> Let me know if I can be helpful by providing any more information.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
