Hi,
We have a simple Beam application, similar to a word count, running on the
Flink runner (Beam 2.26 and Flink 1.9). We are using Beam’s value state. I am
trying to read the state from a savepoint using Flink's State Processor API,
but I am getting a NullPointerException. We also converted the whole code into
a pure Flink application, created a savepoint, and tried to read the state; in
that case we were able to read it successfully.
Exception stack trace:
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
    at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:631)
    at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:222)
    at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820)
    at org.apache.flink.api.java.DataSet.count(DataSet.java:398)
    at com.intuit.spp.example.StateReader.main(StateReader.java:34)
Caused by: java.io.IOException: Failed to restore state backend
    at org.apache.flink.state.api.input.KeyedStateInputFormat.getStreamOperatorStateContext(KeyedStateInputFormat.java:231)
    at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:177)
    at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:79)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
    at org.apache.flink.state.api.input.KeyedStateInputFormat.getStreamOperatorStateContext(KeyedStateInputFormat.java:223)
    ... 6 more
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for f25cb861abbd020d3595d47c5d53d3fd_f25cb861abbd020d3595d47c5d53d3fd_(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
    ... 7 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
    at org.apache.flink.runtime.state.memory.MemoryStateBackend.createKeyedStateBackend(MemoryStateBackend.java:347)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 9 more
Caused by: java.lang.NullPointerException
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:280)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:254)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:154)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
    ... 13 more
While debugging, I found that the NullPointerException is thrown at
https://github.com/apache/flink/blob/release-1.9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java#L280
because metaInfoSnapshot is null. I then checked which kvStateId values we are
getting at
https://github.com/apache/flink/blob/release-1.9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java#L277.
I also printed stateMetaInfoSnapshot.getName() along with the corresponding
kvStateId, and I saw that Beam creates additional states internally. The
output is:
stateMetaInfoSnapshot name              kvStateId
pushed-back-elements                    0
ivid_counts                             1
watermark-holds                         2
sortBuffer                              3
pending-timers                          4
sortBufferMinStamp                      5
_timer_state/event_beam-timer           6
_timer_state/processing_beam-timer      7
Of these, only the ivid_counts state is created by us; the rest are created
by Beam. In one instance the restore tries to retrieve kvStateId 11570, which
matches none of the ids listed above, so it is not able to get any state from
https://github.com/apache/flink/blob/release-1.9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java#L277
and stateMetaInfoSnapshot comes back as null.
I debugged further but couldn’t find the reason why it is trying to read a
non-existent state.
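To make the failure mode concrete, here is a minimal, self-contained sketch of
what I believe is happening. It is a simplified model, not Flink's actual code:
the class KvStateIdLookupSketch, the MetaInfo type, and the method names are
hypothetical, and the only real data are the eight state names and the id 11570
from my debugging session. An id that is not in the lookup table yields null,
and dereferencing that null throws the NullPointerException.

```java
import java.util.HashMap;
import java.util.Map;

public class KvStateIdLookupSketch {

    /** Hypothetical stand-in for a state's metadata; only the name is modeled. */
    public static final class MetaInfo {
        private final String name;

        public MetaInfo(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }
    }

    /** Builds the id -> metadata table from the eight names observed in the savepoint. */
    public static Map<Integer, MetaInfo> buildTable() {
        String[] names = {
            "pushed-back-elements",
            "ivid_counts",
            "watermark-holds",
            "sortBuffer",
            "pending-timers",
            "sortBufferMinStamp",
            "_timer_state/event_beam-timer",
            "_timer_state/processing_beam-timer"
        };
        Map<Integer, MetaInfo> table = new HashMap<>();
        for (int id = 0; id < names.length; id++) {
            table.put(id, new MetaInfo(names[id]));
        }
        return table;
    }

    /** Unguarded lookup: throws NullPointerException when the id is not registered. */
    public static String unguardedName(Map<Integer, MetaInfo> table, int kvStateId) {
        MetaInfo meta = table.get(kvStateId); // null for an unknown id
        return meta.getName();
    }

    /** Guarded lookup: reports the unknown id instead of dereferencing null. */
    public static String describe(Map<Integer, MetaInfo> table, int kvStateId) {
        MetaInfo meta = table.get(kvStateId);
        return meta == null ? "unknown kvStateId " + kvStateId : meta.getName();
    }

    public static void main(String[] args) {
        Map<Integer, MetaInfo> table = buildTable();
        System.out.println(describe(table, 1));
        System.out.println(describe(table, 11570));
        try {
            unguardedName(table, 11570);
        } catch (NullPointerException e) {
            System.out.println("NullPointerException for kvStateId 11570");
        }
    }
}
```

This only reproduces the symptom. It does not explain where the id 11570 comes
from, which is the part I could not figure out.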
Can you please help with this?
Thanks,
Sandeep