[
https://issues.apache.org/jira/browse/FLINK-33109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17767012#comment-17767012
]
Yordan Pavlov commented on FLINK-33109:
---------------------------------------
Hi [~fanrui], can I ask for your help in testing the latest builds? I have
modified the Dockerfile found at:
https://github.com/apache/flink-docker/blob/master/1.17/scala_2.12-java11-ubuntu/Dockerfile
so that it builds Flink from source. The branch I am using is `release-1.17`.
Here is the full Dockerfile I am using; in it I also apply my own Flink job
code:
[https://gist.github.com/YordanPavlov/b9d2f08370dadb5ab18a2dc096b21481]
However, switching a running job (restoring from a checkpoint) from the
official Flink 1.17.1 image to the one I built from source gives me this
error:
{code:java}
from INITIALIZING to FAILED on 10.42.209.30:44171-33acd8 @
10-42-209-30.xrp-extractor-v4-flink-taskmanager-7.flink.svc.cluster.local
(dataPort=46037).
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:256)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:256)
at
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state
backend for WindowOperator_439a9d1f894a5a8c69e615cc793a5c7b_(11/32) from any of
the 1 provided restore options.
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165)
... 11 common frames omitted
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught
unexpected exception.
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:407)
at
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:512)
at
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:99)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
... 13 common frames omitted
Caused by: java.lang.NoSuchMethodError: 'java.util.List
org.apache.flink.runtime.state.IncrementalKeyedStateHandle.getSharedStateHandles()'
at
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restorePreviousIncrementalFilesStatus(RocksDBIncrementalRestoreOperation.java:212)
at
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:188)
at
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:169)
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:327){code}
Can you hint at what I might be doing wrong? Thanks!
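For what it's worth, the {{NoSuchMethodError}} suggests the RocksDB state backend classes and {{flink-runtime}} in my image may come from different builds. A quick sanity check along these lines could confirm it (the image tag and jar path below are placeholders, not the actual names from my build):
{code:bash}
# Check whether the flink-dist jar inside the image exports the method that
# the stack trace reports as missing. Image tag and jar path are placeholders.
docker run --rm --entrypoint bash my-flink-build:latest -c '
  javap -cp /opt/flink/lib/flink-dist-1.17.1.jar \
        org.apache.flink.runtime.state.IncrementalKeyedStateHandle \
  | grep getSharedStateHandles
'
{code}
If {{grep}} finds no match, the jar on the image's classpath predates the method the restore path was compiled against.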
> Watermark alignment not applied after recovery from checkpoint
> --------------------------------------------------------------
>
> Key: FLINK-33109
> URL: https://issues.apache.org/jira/browse/FLINK-33109
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.17.1
> Reporter: Yordan Pavlov
> Priority: Major
> Attachments: image-2023-09-18-15-40-06-868.png,
> image-2023-09-18-15-46-16-106.png
>
>
> I am observing a problem where, after recovery from a checkpoint, the Kafka
> source watermarks start to diverge, not honoring the watermark alignment
> setting I have applied.
> I have a Kafka source which reads a topic with 32 partitions. I am applying
> the following watermark strategy:
> {code:java}
> new EventAwareWatermarkStrategy[KeyedKafkaSourceMessage](msg =>
> msg.value.getTimestamp)
> .withWatermarkAlignment("alignment-sources-group",
> time.Duration.ofMillis(sourceWatermarkAlignmentBlocks)){code}
>
> This works great until my job needs to recover from a checkpoint. Once the
> recovery takes place, no alignment happens any more. This is best
> illustrated by the watermark metrics for various operators in the image:
> !image-2023-09-18-15-40-06-868.png!
>
> You can see how the watermarks diverge after the recovery. Trying to debug
> the problem, I noticed that before the failure there would be calls to
>
> {code:java}
> SourceCoordinator::announceCombinedWatermark()
> {code}
> After the recovery, no calls get there, so the value of
> {code:java}
> watermarkAlignmentParams.getMaxAllowedWatermarkDrift(){code}
> is never read. I can manually fix the problem if I stop the job, clear all
> state from ZooKeeper, and then manually start Flink, providing the last
> checkpoint with the
> {code:java}
> --fromSavepoint{code}
> flag. This causes the SourceCoordinator to be constructed properly and the
> watermark drift to be checked. Once recovered manually, the watermarks again
> converge within the allowed drift, as seen in the metrics:
> !image-2023-09-18-15-46-16-106.png!
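>
> For reference, the manual restart that fixes it looks roughly like this
> (bucket, job id, and jar name are placeholders):
> {code:bash}
> # Resubmit the job, restoring from the last retained checkpoint.
> # The checkpoint path and jar name are placeholders for my setup.
> flink run \
>   --fromSavepoint s3://my-bucket/checkpoints/<job-id>/chk-1234 \
>   my-flink-job.jar
> {code}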
>
> Let me know if I can help by providing any more information.
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)