Hi,

We have a simple Beam application, similar to a word count, running on the Flink runner (Beam 2.26 and Flink 1.9). We are using Beam’s value state. I am trying to read the state from a savepoint using Flink's State Processor API, but I get a NullPointerException. For comparison, we converted the whole pipeline into a pure Flink application, created a savepoint, and were able to read the state from it successfully.
Exception stack trace:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
	at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:631)
	at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:222)
	at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
	at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820)
	at org.apache.flink.api.java.DataSet.count(DataSet.java:398)
	at com.intuit.spp.example.StateReader.main(StateReader.java:34)
Caused by: java.io.IOException: Failed to restore state backend
	at org.apache.flink.state.api.input.KeyedStateInputFormat.getStreamOperatorStateContext(KeyedStateInputFormat.java:231)
	at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:177)
	at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:79)
	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Exception while creating StreamOperatorStateContext.
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
	at org.apache.flink.state.api.input.KeyedStateInputFormat.getStreamOperatorStateContext(KeyedStateInputFormat.java:223)
	... 6 more
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for f25cb861abbd020d3595d47c5d53d3fd_f25cb861abbd020d3595d47c5d53d3fd_(1/1) from any of the 1 provided restore options.
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
	... 7 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
	at org.apache.flink.runtime.state.memory.MemoryStateBackend.createKeyedStateBackend(MemoryStateBackend.java:347)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
	... 9 more
Caused by: java.lang.NullPointerException
	at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:280)
	at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:254)
	at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:154)
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
	... 13 more

When I debugged, I found that the NullPointerException is thrown at https://github.com/apache/flink/blob/release-1.9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java#L280 because stateMetaInfoSnapshot is null.
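To make the failure mode concrete, here is a minimal JDK-only sketch of the lookup as I understand it from the linked code. The restore reads each state id from the stream as a 2-byte short and looks it up in a map of registered states, so an id that was never registered yields null and the next dereference throws a NullPointerException. The class and method names (KvStateIdLookupSketch, readStateId, resolveStateName) and the MetaInfo stand-in for StateMetaInfoSnapshot are hypothetical, and the readShort decoding is my assumption about the 1.9 code. The registered names and ids are the ones we observed in our savepoint.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: models the kvStateId -> meta info lookup done during heap restore.
public class KvStateIdLookupSketch {

    // Hypothetical stand-in for Flink's StateMetaInfoSnapshot; only the state name is modelled.
    static final class MetaInfo {
        private final String name;

        MetaInfo(String name) {
            this.name = name;
        }

        String getName() {
            return name;
        }
    }

    // Registers the states in the order we observed them, so each id equals its registration index.
    static Map<Integer, MetaInfo> registeredStates() {
        String[] names = {
            "pushed-back-elements", "ivid_counts", "watermark-holds", "sortBuffer",
            "pending-timers", "sortBufferMinStamp",
            "_timer_state/event_beam-timer", "_timer_state/processing_beam-timer"
        };
        Map<Integer, MetaInfo> kvStatesById = new HashMap<>();
        for (String name : names) {
            kvStatesById.put(kvStatesById.size(), new MetaInfo(name));
        }
        return kvStatesById;
    }

    // Decodes a 2-byte big-endian id the way DataInputStream.readShort() does (assumed to match the restore code).
    static int readStateId(byte[] twoBytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(twoBytes))) {
            return in.readShort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Looks up the id and dereferences the result, as the restore loop does with stateMetaInfoSnapshot.
    static String resolveStateName(Map<Integer, MetaInfo> kvStatesById, int kvStateId) {
        MetaInfo metaInfo = kvStatesById.get(kvStateId); // null when the id was never registered
        return metaInfo.getName(); // throws NullPointerException when metaInfo is null
    }

    public static void main(String[] args) {
        Map<Integer, MetaInfo> kvStatesById = registeredStates();

        // A well-formed id: bytes 0x00 0x01 decode to 1, our own ivid_counts state.
        int goodId = readStateId(new byte[] {0x00, 0x01});
        System.out.println(goodId + " -> " + resolveStateName(kvStatesById, goodId));

        // Any two bytes decode to some id: ASCII "-2" (0x2D 0x32) becomes 11570, which is not registered.
        int strayId = readStateId("-2".getBytes(StandardCharsets.US_ASCII));
        try {
            resolveStateName(kvStatesById, strayId);
        } catch (NullPointerException e) {
            System.out.println(strayId + " -> NullPointerException (no registered state)");
        }
    }
}
```

Because readShort accepts any two bytes, a stray id like 11570 is simply 0x2D32, which is also what the ASCII characters "-2" decode to. This only shows that reading from the wrong stream offset could produce such an id; I have not confirmed that this is what happens in our case.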
I then checked which kvStateId values we get at https://github.com/apache/flink/blob/release-1.9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java#L277. Printing stateMetaInfoSnapshot.getName() alongside each kvStateId showed that Beam creates additional states internally:

stateMetaInfoSnapshot name              kvStateId
pushed-back-elements                    0
ivid_counts                             1
watermark-holds                         2
sortBuffer                              3
pending-timers                          4
sortBufferMinStamp                      5
_timer_state/event_beam-timer           6
_timer_state/processing_beam-timer      7

Of these, only ivid_counts is created by us; the rest are created by Beam. In one instance, the restore tries to retrieve kvStateId 11570, for which no state is registered, so the lookup at https://github.com/apache/flink/blob/release-1.9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java#L277 returns null for stateMetaInfoSnapshot. I debugged further but couldn’t find out why it tries to read a non-existent state. Could you please help with this?

Thanks,
Sandeep