Hi,
We have a simple Beam application, similar to word count, running on the
Flink runner (Beam 2.26 and Flink 1.9). We are using Beam's value state. I am
trying to read the state from a savepoint using Flink's State Processor API,
but I am getting a NullPointerException. When I converted the same code into a
pure Flink application, created a savepoint, and read the state the same way,
the state was read successfully.

Exception Stack trace:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
        at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:631)
        at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:222)
        at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
        at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820)
        at org.apache.flink.api.java.DataSet.count(DataSet.java:398)
        at com.intuit.spp.example.StateReader.main(StateReader.java:34)
Caused by: java.io.IOException: Failed to restore state backend
        at org.apache.flink.state.api.input.KeyedStateInputFormat.getStreamOperatorStateContext(KeyedStateInputFormat.java:231)
        at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:177)
        at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:79)
        at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Exception while creating StreamOperatorStateContext.
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
        at org.apache.flink.state.api.input.KeyedStateInputFormat.getStreamOperatorStateContext(KeyedStateInputFormat.java:223)
        ... 6 more
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for f25cb861abbd020d3595d47c5d53d3fd_f25cb861abbd020d3595d47c5d53d3fd_(1/1) from any of the 1 provided restore options.
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
        ... 7 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
        at org.apache.flink.runtime.state.memory.MemoryStateBackend.createKeyedStateBackend(MemoryStateBackend.java:347)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
        ... 9 more
Caused by: java.lang.NullPointerException
        at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:280)
        at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:254)
        at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:154)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
        ... 13 more





When I debugged, I found that the NullPointerException is thrown at
https://github.com/apache/flink/blob/release-1.9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java#L280
because stateMetaInfoSnapshot is null. I then checked which kvStateId values
are being looked up at
https://github.com/apache/flink/blob/release-1.9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java#L277

I also printed stateMetaInfoSnapshot.getName() together with the corresponding
kvStateId, and saw that Beam creates additional states internally. The values
I got are:

stateMetaInfoSnapshot name             kvStateId
pushed-back-elements                   0
ivid_counts                            1
watermark-holds                        2
sortBuffer                             3
pending-timers                         4
sortBufferMinStamp                     5
_timer_state/event_beam-timer          6
_timer_state/processing_beam-timer     7

Of these, only the ivid_counts state is created by us; the rest are created by
Beam. In one instance the restore tries to look up kvStateId 11570, which does
not match any registered state at
https://github.com/apache/flink/blob/release-1.9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java#L277
so stateMetaInfoSnapshot comes back as null.
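
To make the failure mode concrete, here is a simplified model of the lookup in
plain Java. It is not Flink's actual code: the class and method names
(KvStateLookupSketch, registeredStates, resolve) are hypothetical, and the map
only mirrors the eight name/id pairs I observed in the debugger. It assumes the
restore path resolves each kvStateId through a map and then dereferences the
result, which is what the debugger suggested.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class KvStateLookupSketch {

    // Name/id pairs as observed in the debugger for this savepoint.
    static Map<Integer, String> registeredStates() {
        Map<Integer, String> byId = new LinkedHashMap<>();
        byId.put(0, "pushed-back-elements");
        byId.put(1, "ivid_counts");
        byId.put(2, "watermark-holds");
        byId.put(3, "sortBuffer");
        byId.put(4, "pending-timers");
        byId.put(5, "sortBufferMinStamp");
        byId.put(6, "_timer_state/event_beam-timer");
        byId.put(7, "_timer_state/processing_beam-timer");
        return byId;
    }

    // Hypothetical guard: report the unknown id instead of failing later
    // with a bare NullPointerException when the result is dereferenced.
    static String resolve(Map<Integer, String> byId, int kvStateId) {
        String name = byId.get(kvStateId);
        if (name == null) {
            throw new IllegalStateException(
                "No state meta info registered for kvStateId " + kvStateId
                    + "; known ids: " + byId.keySet());
        }
        return name;
    }

    public static void main(String[] args) {
        Map<Integer, String> byId = registeredStates();
        System.out.println(resolve(byId, 1));
        try {
            resolve(byId, 11570); // the id seen in the failing restore
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

An id far outside the range 0 to 7 might mean the reader is interpreting the
savepoint bytes at the wrong offset rather than that a state is really
missing, but that is only a guess on my part.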

I debugged further but couldn’t find the reason why it is trying to read some 
non-existent state.


Could you please help with this?


Thanks,
Sandeep
