Hi Sandeep,

thanks for the example. I'll take a look at it and get back to you ;)

On Tue, Aug 3, 2021 at 9:44 PM Kathula, Sandeep <sandeep_kath...@intuit.com> wrote:

> Hi David,
>
> Thanks for the reply. I tried with Beam 2.29 and Flink 1.12 and I'm still
> getting the same NullPointerException as before. I changed the code a bit to
> remove all the proprietary software used in our company, and I was able to
> reproduce the issue with a local Kafka broker and Beam on the Flink runner,
> both running locally.
>
> Beam Flink runner code:
> https://github.com/kathulasandeep/beam-flink-stateful/blob/master/src/main/java/org/sandeep/processor/Processor.java
> Local Kafka producer:
> https://github.com/kathulasandeep/beam-flink-stateful/blob/master/src/main/java/org/sandeep/kafka/SimpleKafkaProducer.java
> Reading state using the State Processor API:
> https://github.com/kathulasandeep/beam-flink-stateful/blob/master/src/main/java/org/sandeep/readstate/StateReader.java
>
> Thanks,
> Sandeep
>
>
> On 7/27/21, 10:10 AM, "David Morávek" <d...@apache.org> wrote:
>
> Hi Sandeep,
>
> In general I'd say it will be tricky to read Beam state this way, as it
> doesn't use Flink primitives; instead it writes state in a custom binary
> format (it can be deserialized, but it's not easy to put all of the pieces
> together).
>
> Can you please share example code showing how you're reading the state?
> Also, can you please try this with the latest Beam / Flink versions (the
> ones you're using are no longer supported)?
>
> Best,
> D.
>
> On Tue, Jul 27, 2021 at 5:46 PM Kathula, Sandeep
> <sandeep_kath...@intuit.com.invalid> wrote:
>
> > Hi,
> >
> > We have a simple Beam application, similar to a word count, running on the
> > Flink runner (Beam 2.26 and Flink 1.9). We are using Beam's value state.
> > I am trying to read the state from a savepoint using Flink's State
> > Processor API, but I'm getting a NullPointerException. When we converted
> > the whole code into a pure Flink application, created a savepoint and
> > read it back, we were able to read the state successfully.
> >
> > Exception stack trace:
> >
> > Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> >     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
> >     at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:631)
> >     at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:222)
> >     at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
> >     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820)
> >     at org.apache.flink.api.java.DataSet.count(DataSet.java:398)
> >     at com.intuit.spp.example.StateReader.main(StateReader.java:34)
> > Caused by: java.io.IOException: Failed to restore state backend
> >     at org.apache.flink.state.api.input.KeyedStateInputFormat.getStreamOperatorStateContext(KeyedStateInputFormat.java:231)
> >     at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:177)
> >     at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:79)
> >     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
> >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> >     at java.lang.Thread.run(Thread.java:748)
> > Caused by: java.lang.Exception: Exception while creating StreamOperatorStateContext.
> >     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
> >     at org.apache.flink.state.api.input.KeyedStateInputFormat.getStreamOperatorStateContext(KeyedStateInputFormat.java:223)
> >     ... 6 more
> > Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for f25cb861abbd020d3595d47c5d53d3fd_f25cb861abbd020d3595d47c5d53d3fd_(1/1) from any of the 1 provided restore options.
> >     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> >     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
> >     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
> >     ... 7 more
> > Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
> >     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
> >     at org.apache.flink.runtime.state.memory.MemoryStateBackend.createKeyedStateBackend(MemoryStateBackend.java:347)
> >     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
> >     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> >     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> >     ... 9 more
> > Caused by: java.lang.NullPointerException
> >     at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:280)
> >     at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:254)
> >     at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:154)
> >     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
> >     ... 13 more
> >
> > When I debugged it, I found that the NullPointerException is thrown at
> > https://github.com/apache/flink/blob/release-1.9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java#L280
> > because metaInfoSnapshot is null. I then checked which kvStateId values we
> > are getting at
> > https://github.com/apache/flink/blob/release-1.9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java#L277
> >
> > I also checked stateMetaInfoSnapshot.getName() together with the
> > corresponding kvStateId and saw that Beam creates additional states
> > internally. This is what I got:
> >
> > stateMetaInfoSnapshot name            kvStateId
> > pushed-back-elements                  0
> > ivid_counts                           1
> > watermark-holds                       2
> > sortBuffer                            3
> > pending-timers                        4
> > sortBufferMinStamp                    5
> > _timer_state/event_beam-timer         6
> > _timer_state/processing_beam-timer    7
> >
> > Of these, the ivid_counts state is created by us and the rest are created
> > by Beam. In one instance it tries to retrieve kvStateId 11570, finds no
> > state for it at
> > https://github.com/apache/flink/blob/release-1.9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java#L277
> > and so stateMetaInfoSnapshot is NULL.
> >
> > I debugged further but couldn't find out why it is trying to read a
> > non-existent state.
> >
> > Can you please help with this?
> >
> > Thanks,
> > Sandeep
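The lookup that fails in the report can be modelled in a few lines of plain Java. This is a hypothetical simulation, not Flink's HeapRestoreOperation code: `KvStateLookup`, `registeredStates` and `describe` are invented names, and the map stands in for the kvStateId-to-metadata mapping restored from the savepoint, filled with the eight ids from the table above. A lookup for an id that was never registered, such as 11570, returns null; instead of dereferencing that null, the sketch fails fast with the offending id and the known ids in the message.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class KvStateLookup {
    // Hypothetical stand-in for the kvStateId -> state metadata mapping
    // restored from the savepoint; names come from the table in the thread.
    static Map<Integer, String> registeredStates() {
        String[] names = {
            "pushed-back-elements", "ivid_counts", "watermark-holds", "sortBuffer",
            "pending-timers", "sortBufferMinStamp",
            "_timer_state/event_beam-timer", "_timer_state/processing_beam-timer"
        };
        Map<Integer, String> states = new LinkedHashMap<>();
        for (int id = 0; id < names.length; id++) {
            states.put(id, names[id]);
        }
        return states;
    }

    // Resolves an id read from the key-group data. Map.get returns null for
    // an unknown id, so check it here rather than letting a later call NPE.
    static String describe(Map<Integer, String> states, int kvStateId) {
        String name = states.get(kvStateId);
        if (name == null) {
            throw new IllegalStateException(
                "kvStateId " + kvStateId + " is not among the " + states.size()
                    + " registered states " + states.keySet());
        }
        return name;
    }

    public static void main(String[] args) {
        Map<Integer, String> states = registeredStates();
        System.out.println(describe(states, 1));
        try {
            describe(states, 11570);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running `main` prints `ivid_counts` for id 1, and for 11570 it prints a message naming the id and the eight registered ids (0 to 7) rather than throwing a bare NullPointerException.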
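Because 11570 is far outside the eight registered ids, one hypothesis worth checking (nothing in this thread confirms it) is that the restore code reads the id from the wrong position in the key-group data. Assuming the Flink 1.9 heap format stores a 2-byte state id before each state's entries in a key group, which is how the linked HeapRestoreOperation code appears to read it, a serializer that consumes fewer or more bytes than were written would leave the next id read landing inside payload bytes. Notably, 11570 is 0x2D32, and those two bytes are the ASCII characters '-' and '2'. The plain-Java sketch below reproduces that effect; the class, its methods, the stream layout and the "key-2021" payload are all invented for illustration and say nothing about what Beam actually writes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class MisalignedRead {
    // Hypothetical layout: [short id][UTF string payload][short id].
    // The payload value is made up purely for this illustration.
    static byte[] writeSections() {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeShort(1);        // id of the first section
            out.writeUTF("key-2021"); // 2-byte length prefix (8) + 8 ASCII bytes
            out.writeShort(2);        // id of the next section
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Mimics a reader whose payload deserializer disagrees with the writer:
    // it consumes only part of the payload, so the next "id" is misread.
    // Returns {firstId, nextId}.
    static int[] readWithShortPayloadReader(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int firstId = in.readShort();
            in.readUnsignedShort();   // skip the length prefix
            in.readFully(new byte[3]); // consumes only "key", leaving "-2021"
            int nextId = in.readShort(); // reads the bytes '-' (0x2D) and '2' (0x32)
            return new int[] {firstId, nextId};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ids = readWithShortPayloadReader(writeSections());
        System.out.println("first id = " + ids[0] + ", next id = " + ids[1]);
        System.out.println("11570 = 0x" + Integer.toHexString(11570)
            + " = bytes '" + (char) (11570 >> 8) + "' '" + (char) (11570 & 0xFF) + "'");
    }
}
```

Running `main` prints `first id = 1, next id = 11570`: the reader correctly sees the first id but, after under-reading the payload, decodes two text bytes as a state id. If the real savepoint behaves similarly, the thing to compare would be the serializer Beam used when writing the state against the one supplied when reading it back.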