[ https://issues.apache.org/jira/browse/FLINK-33109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17767012#comment-17767012 ]
Yordan Pavlov commented on FLINK-33109:
---------------------------------------

Hi [~fanrui], can I ask for your help in testing the latest builds? I have modified the Dockerfile found at https://github.com/apache/flink-docker/blob/master/1.17/scala_2.12-java11-ubuntu/Dockerfile so that it builds Flink from source; the branch I am using is `release-1.17`. Here is the full Dockerfile I am using, in which I also add my own Flink job code: [https://gist.github.com/YordanPavlov/b9d2f08370dadb5ab18a2dc096b21481]

However, switching a running job (restoring from checkpoint) from the official Flink 1.17.1 image to the one I built from source gives me the following error:

{code:java}
from INITIALIZING to FAILED on 10.42.209.30:44171-33acd8 @ 10-42-209-30.xrp-extractor-v4-flink-taskmanager-7.flink.svc.cluster.local (dataPort=46037).
java.lang.Exception: Exception while creating StreamOperatorStateContext.
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:256)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:256)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_439a9d1f894a5a8c69e615cc793a5c7b_(11/32) from any of the 1 provided restore options.
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165)
	... 11 common frames omitted
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:407)
	at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:512)
	at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:99)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
	... 13 common frames omitted
Caused by: java.lang.NoSuchMethodError: 'java.util.List org.apache.flink.runtime.state.IncrementalKeyedStateHandle.getSharedStateHandles()'
	at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restorePreviousIncrementalFilesStatus(RocksDBIncrementalRestoreOperation.java:212)
	at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:188)
	at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:169)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:327)
{code}

Can you hint at what I might be doing wrong? Thanks!

> Watermark alignment not applied after recovery from checkpoint
> --------------------------------------------------------------
>
> Key: FLINK-33109
> URL: https://issues.apache.org/jira/browse/FLINK-33109
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.17.1
> Reporter: Yordan Pavlov
> Priority: Major
> Attachments: image-2023-09-18-15-40-06-868.png, image-2023-09-18-15-46-16-106.png
>
> I am observing a problem where, after recovery from a checkpoint, the Kafka source watermarks start to diverge, not honoring the watermark alignment setting I have applied.
>
> I have a Kafka source which reads a topic with 32 partitions. I am applying the following watermark strategy:
> {code:java}
> new EventAwareWatermarkStrategy[KeyedKafkaSourceMessage](msg => msg.value.getTimestamp)
>   .withWatermarkAlignment("alignment-sources-group", time.Duration.ofMillis(sourceWatermarkAlignmentBlocks))
> {code}
>
> This works great up until my job needs to recover from a checkpoint. Once the recovery takes place, no alignment takes place any more.
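For reference, the alignment mechanism discussed here works roughly as follows: the coordinator periodically computes the minimum ("combined") watermark across all subtasks, adds the configured maximum drift, and announces that ceiling; any source whose local watermark is already past the ceiling pauses consuming until the slower sources catch up. In Flink this logic lives in `SourceCoordinator::announceCombinedWatermark()`. The following is a simplified, self-contained Java sketch of that calculation only, with illustrative names; it is not Flink's actual implementation.

```java
import java.util.Map;

public class AlignmentSketch {

    /**
     * Simplified model of the coordinator's announcement: the combined (minimum)
     * watermark across all subtasks plus the allowed drift gives the ceiling
     * that a subtask's local watermark may not pass while others lag behind.
     */
    static long maxAllowedWatermark(Map<String, Long> watermarksBySubtask, long maxDriftMillis) {
        long combined = watermarksBySubtask.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
        return combined + maxDriftMillis;
    }

    /** A subtask pauses consuming once its local watermark exceeds the announced ceiling. */
    static boolean shouldPause(long localWatermark, long maxAllowedWatermark) {
        return localWatermark > maxAllowedWatermark;
    }

    public static void main(String[] args) {
        // Two partitions whose watermarks have drifted 10s apart, with 5s of allowed drift:
        Map<String, Long> wms = Map.of("partition-0", 1_000L, "partition-1", 11_000L);
        long ceiling = maxAllowedWatermark(wms, 5_000L);
        System.out.println(ceiling);                       // 6000
        System.out.println(shouldPause(11_000L, ceiling)); // true: the fast partition pauses
        System.out.println(shouldPause(1_000L, ceiling));  // false: the slow partition keeps reading
    }
}
```

If, as described in this issue, the announcement stops after recovery, no ceiling is ever published, so no source pauses and the watermarks drift apart freely.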
> This can best be illustrated by looking at the watermark metrics for various operators in the image:
> !image-2023-09-18-15-40-06-868.png!
>
> You can see how the watermarks disperse after the recovery. Trying to debug the problem, I noticed that before the failure there would be calls to
> {code:java}
> SourceCoordinator::announceCombinedWatermark()
> {code}
> After the recovery, no calls get there, so no value for
> {code:java}
> watermarkAlignmentParams.getMaxAllowedWatermarkDrift()
> {code}
> is ever read. I can manually fix the problem if I stop the job, clear all state from Zookeeper, and then manually start Flink, providing the last checkpoint with the
> {code:java}
> '--fromSavepoint'
> {code}
> flag. This causes the SourceCoordinator to be constructed properly and the watermark drift to be checked. Once recovered manually, the watermarks again converge to within the allowed drift, as seen in the metrics:
> !image-2023-09-18-15-46-16-106.png!
>
> Let me know if I can be helpful by providing any more information.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
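Returning to the `NoSuchMethodError` in the comment above: `IncrementalKeyedStateHandle.getSharedStateHandles()` being absent at runtime typically means the cluster is mixing jars built from different Flink commits, for example a runtime jar from the official 1.17.1 image alongside state-backend classes built from a newer checkout. One way to diagnose this is to check via reflection whether the method exists and which jar the class was loaded from. Below is a hedged, self-contained sketch of that check; it is demonstrated with a JDK class so it compiles and runs anywhere, and the Flink class name to substitute in a real debug session is shown only in comments.

```java
import java.lang.reflect.Method;
import java.security.CodeSource;

public class ClasspathCheck {

    /** True if the class declares (or inherits) a public method with this name, any signature. */
    static boolean hasMethod(Class<?> clazz, String name) {
        for (Method m : clazz.getMethods()) {
            if (m.getName().equals(name)) {
                return true;
            }
        }
        return false;
    }

    /** The jar or directory the class was loaded from; null for bootstrap (JDK) classes. */
    static String locationOf(Class<?> clazz) {
        CodeSource cs = clazz.getProtectionDomain().getCodeSource();
        return cs == null ? null : cs.getLocation().toString();
    }

    public static void main(String[] args) throws Exception {
        // On a task manager's classpath you would run something like:
        //   Class<?> c = Class.forName("org.apache.flink.runtime.state.IncrementalKeyedStateHandle");
        //   System.out.println(hasMethod(c, "getSharedStateHandles") + " @ " + locationOf(c));
        // If that prints false, the jar reported by locationOf(...) predates the method.
        // Demonstrated here with a JDK class so the snippet is runnable anywhere:
        System.out.println(hasMethod(java.util.List.class, "of")); // true on Java 9+
        System.out.println(locationOf(java.util.List.class));      // null: bootstrap class loader
    }
}
```

Running such a check inside both the official image and the source-built image should reveal whether the restored containers still ship an older runtime jar next to the newer RocksDB state-backend classes.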