Nico Kruber created FLINK-16576:
-----------------------------------
Summary: State inconsistency on restore with memory state backends
Key: FLINK-16576
URL: https://issues.apache.org/jira/browse/FLINK-16576
Project: Flink
Issue Type: Bug
Components: Runtime / State Backends
Affects Versions: 1.10.0, 1.9.2
Reporter: Nico Kruber
Fix For: 1.9.3, 1.10.1, 1.11.0
I occasionally see a few state inconsistencies with the {{TopSpeedWindowing}}
example in Flink. Restore would fail with either of these causes, but only for
the memory state backends and only with some combinations of parallelism I took
the savepoint with and parallelism I restore the job with:
{code:java}
java.lang.IllegalArgumentException: KeyGroupRange{startKeyGroup=64,
endKeyGroup=95} does not contain key group 97 {code}
or
{code:java}
java.lang.NullPointerException
at
org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:280)
{code}
or
{code:java}
java.io.IOException: Corrupt stream, found tag: 8
at
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:217)
{code}
I managed to make it reproducible in a test that I quickly hacked together in
[https://github.com/NicoK/flink/blob/state.corruption.debug/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingSavepointRestoreITCase.java]
(please checkout the whole repository since I had to change some dependencies).
In a bit more detail, this is what I discovered before, also with a manual
savepoint on S3:
Savepoint that was taken with parallelism 2 (p=2) and shows the restore failure
in three different ways (all running in Flink 1.10.0; but I also see it in
Flink 1.9):
* first of all, if I try to restore with p=2, everything is fine
* if I restore with p=4 I get an exception like the one mentioned above:
{code:java}
2020-03-11 15:53:35,149 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph -
Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator,
PassThroughWindowFunction) -> Sink: Print to Std. Out (3/4)
(2ecdb03905cc8a376d43b086925452a6) switched from RUNNING to FAILED.
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state
backend for EvictingWindowOperator_90bea66de1c231edf33913ecd54406c1_(3/4) from
any of the 1 provided restore options.
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when
trying to restore heap backend
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
at
org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 11 more
Caused by: java.lang.IllegalArgumentException: KeyGroupRange{startKeyGroup=64,
endKeyGroup=95} does not contain key group 97
at
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:161)
at
org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.globalKeyGroupToLocalIndex(HeapPriorityQueueSet.java:158)
at
org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForKeyGroup(HeapPriorityQueueSet.java:147)
at
org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForElement(HeapPriorityQueueSet.java:154)
at
org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
at
org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper.lambda$keyGroupReader$0(HeapPriorityQueueSnapshotRestoreWrapper.java:85)
at
org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:298)
at
org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:293)
at
org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:254)
at
org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:153)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
... 15 more
{code}
* if I restore with p=3, I get the following exception instead:
{code:java}
2020-03-11 21:40:28,390 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph -
Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator,
PassThroughWindowFunction) -> Sink: Print to Std. Out (3/3)
(2fb8acde321f6cbfec56c300153d6dea) switched from RUNNING to FAILED.
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state
backend for EvictingWindowOperator_90bea66de1c231edf33913ecd54406c1_(3/3) from
any of the 1 provided restore options.
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when
trying to restore heap backend
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
at
org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 11 more
Caused by: java.lang.NullPointerException
at
org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:280)
at
org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:254)
at
org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:153)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
... 15 more
{code}
While the latter error somewhat indicates that the savepoint may be corrupt (or
that the reader/deserializer messed up), it remains a mystery to me why I can
successfully restore with p=2.
* occasionally, for p=4, I also see this exception instead:
{code:java}
10:23:12,046 WARN
org.apache.flink.streaming.api.operators.BackendRestorerProcedure - Exception
while restoring keyed state backend for
EvictingWindowOperator_90bea66de1c231edf33913ecd54406c1_(3/4) from alternative
(1/1), will retry while more alternatives are available.
org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to
restore heap backend
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
at
org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Corrupt stream, found tag: 8
at
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:217)
at
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
at
org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:133)
at
org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:42)
at
org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:77)
at
org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
at
org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:295)
at
org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:256)
at
org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:155)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
... 15 more{code}
I narrowed the cause down to the state access inside {{DeltaTrigger}}: Removing
the cleanup in
{{org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger#clear()}} did
not change anything but removing state access in its {{onElement}} (pictured
below) seems to resolve the bug:
{code:java}
@Override
public TriggerResult onElement(T element, long timestamp, W window,
TriggerContext ctx) throws Exception {
ValueState<T> lastElementState = ctx.getPartitionedState(stateDesc);
if (lastElementState.value() == null) {
lastElementState.update(element);
return TriggerResult.CONTINUE;
}
if (deltaFunction.getDelta(lastElementState.value(), element) >
this.threshold) {
lastElementState.update(element);
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
} {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)