Hi Sandeep, thanks for the example. I'll look into it and get back to
you ;)

On Tue, Aug 3, 2021 at 9:44 PM Kathula, Sandeep <sandeep_kath...@intuit.com>
wrote:

> Hi David,
>                 Thanks for the reply. I tried with Beam 2.29 and Flink
> 1.12 and am still getting the same NullPointerException as before. I changed
> the code a bit to remove all the proprietary software used in our company
> and was able to reproduce the issue with a local Kafka and Beam with the
> Flink runner running locally.
>
> Beam Flink runner code:
> https://github.com/kathulasandeep/beam-flink-stateful/blob/master/src/main/java/org/sandeep/processor/Processor.java
> Local Kafka producer:
> https://github.com/kathulasandeep/beam-flink-stateful/blob/master/src/main/java/org/sandeep/kafka/SimpleKafkaProducer.java
> Reading state using State processor API:
> https://github.com/kathulasandeep/beam-flink-stateful/blob/master/src/main/java/org/sandeep/readstate/StateReader.java
>
> Thanks,
> Sandeep
>
>
> On 7/27/21, 10:10 AM, "David Morávek" <d...@apache.org> wrote:
>
>     Hi Sandeep,
>
>     In general I'd say it will be tricky to read Beam state this way, as it
>     doesn't use Flink primitives but writes state in a custom binary format
>     (it can be deserialized, but it's not easy to put all of the pieces
>     together).
>
>     Can you please share example code showing how you're reading the state?
>     Also, can you please try this with the latest Beam / Flink versions (the
>     ones you're using are no longer supported)?
>
>     Best,
>     D.
>
>     On Tue, Jul 27, 2021 at 5:46 PM Kathula, Sandeep
>     <sandeep_kath...@intuit.com.invalid> wrote:
>
>     > Hi,
>     >      We have a simple Beam application, similar to word count, running
>     > with the Flink runner (Beam 2.26 and Flink 1.9). We are using Beam’s
>     > value state. I am trying to read the state from a savepoint using
>     > Flink's State Processor API but am getting a NullPointerException. When
>     > we converted the whole code into a pure Flink application, created a
>     > savepoint, and tried to read the state, we were able to read it
>     > successfully.
>     >
>     > Exception Stack trace:
>     >
>     > Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     >     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
>     >     at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:631)
>     >     at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:222)
>     >     at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
>     >     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820)
>     >     at org.apache.flink.api.java.DataSet.count(DataSet.java:398)
>     >     at com.intuit.spp.example.StateReader.main(StateReader.java:34)
>     > Caused by: java.io.IOException: Failed to restore state backend
>     >     at org.apache.flink.state.api.input.KeyedStateInputFormat.getStreamOperatorStateContext(KeyedStateInputFormat.java:231)
>     >     at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:177)
>     >     at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:79)
>     >     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
>     >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>     >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>     >     at java.lang.Thread.run(Thread.java:748)
>     > Caused by: java.lang.Exception: Exception while creating StreamOperatorStateContext.
>     >     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
>     >     at org.apache.flink.state.api.input.KeyedStateInputFormat.getStreamOperatorStateContext(KeyedStateInputFormat.java:223)
>     >     ... 6 more
>     > Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for f25cb861abbd020d3595d47c5d53d3fd_f25cb861abbd020d3595d47c5d53d3fd_(1/1) from any of the 1 provided restore options.
>     >     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>     >     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
>     >     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
>     >     ... 7 more
>     > Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
>     >     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
>     >     at org.apache.flink.runtime.state.memory.MemoryStateBackend.createKeyedStateBackend(MemoryStateBackend.java:347)
>     >     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
>     >     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>     >     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>     >     ... 9 more
>     > Caused by: java.lang.NullPointerException
>     >     at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:280)
>     >     at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:254)
>     >     at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:154)
>     >     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
>     >     ... 13 more
>     >
>     > When I debugged, I found that the NullPointerException is thrown at
>     > https://github.com/apache/flink/blob/release-1.9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java#L280
>     > where metaInfoSnapshot is null. I then checked which kvStateId values
>     > we are getting at
>     > https://github.com/apache/flink/blob/release-1.9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java#L277
>     > .
>     >
>     > I also printed stateMetaInfoSnapshot.getName() along with the
>     > corresponding kvStateId and saw that Beam is creating additional states
>     > internally. It gives:
>     >
>     > stateMetaInfoSnapshot Name            kvStateId
>     > pushed-back-elements                  0
>     > ivid_counts                           1
>     > watermark-holds                       2
>     > sortBuffer                            3
>     > pending-timers                        4
>     > sortBufferMinStamp                    5
>     > _timer_state/event_beam-timer         6
>     > _timer_state/processing_beam-timer    7
>     >
>     > Of these, the ivid_counts state is created by us and the rest are
>     > created by Beam. In one instance it tries to retrieve kvStateId 11570
>     > and is not able to get any state at
>     > https://github.com/apache/flink/blob/release-1.9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java#L277
>     > which gives stateMetaInfoSnapshot as null.
>     >
>     > I debugged further but couldn’t find the reason why it is trying to
>     > read some non-existent state.
>     >
>     >
>     > Can you please help with this?
>     >
>     >
>     > Thanks,
>     > Sandeep
>     >
>     >
>
>
>
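
As a side note on the kvStateId lookup described in the quoted message: the failure can be modelled as a plain id-to-name table in which an unknown id yields null. The sketch below is only a simplified illustration and not Flink's actual restore code. The class and method names (KvStateLookupSketch, registeredStates, resolve) are made up for this example; the state names and ids are the ones listed in the quoted message. It does not explain why id 11570 is read in the first place; it only shows how an explicit check turns the bare NullPointerException into a message that names the missing id.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

// Simplified model (assumption): NOT Flink's HeapRestoreOperation, just an
// id -> state-name table like the one printed in the quoted message.
public class KvStateLookupSketch {

    // State names and kvStateIds as reported in the quoted message.
    static Map<Integer, String> registeredStates() {
        Map<Integer, String> byId = new HashMap<>();
        byId.put(0, "pushed-back-elements");
        byId.put(1, "ivid_counts");
        byId.put(2, "watermark-holds");
        byId.put(3, "sortBuffer");
        byId.put(4, "pending-timers");
        byId.put(5, "sortBufferMinStamp");
        byId.put(6, "_timer_state/event_beam-timer");
        byId.put(7, "_timer_state/processing_beam-timer");
        return byId;
    }

    // Hypothetical helper: resolves an id and fails with a descriptive
    // message instead of handing a null to the caller.
    static String resolve(Map<Integer, String> byId, int kvStateId) {
        String name = byId.get(kvStateId);
        if (name == null) {
            throw new IllegalStateException(
                "No state registered for kvStateId " + kvStateId
                    + "; known ids: " + new TreeSet<>(byId.keySet()));
        }
        return name;
    }

    public static void main(String[] args) {
        Map<Integer, String> byId = registeredStates();
        System.out.println(resolve(byId, 1));
        try {
            resolve(byId, 11570); // the id seen in the debugger
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```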

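To illustrate the point in the quoted reply about state being written in a custom binary format: bytes only make sense to a reader that uses the same encoding as the writer. The sketch below uses a toy variable-length encoding for a long, which is an assumption chosen for illustration and not Beam's actual state format; all names in it (CoderMismatchSketch, encodeVarLong, decodeVarLong, decodeFixedLong) are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;

// Toy example (assumption): a made-up variable-length encoding, used only to
// show what happens when writer and reader disagree on the byte layout.
public class CoderMismatchSketch {

    // Writer side: 7 payload bits per byte, high bit set on all but the last byte.
    static byte[] encodeVarLong(long v) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((v & ~0x7FL) != 0) {
            out.write((int) ((v & 0x7F) | 0x80));
            v >>>= 7;
        }
        out.write((int) v);
        return out.toByteArray();
    }

    // Matching reader: understands the same variable-length layout.
    static long decodeVarLong(byte[] bytes) {
        long result = 0;
        int shift = 0;
        for (byte b : bytes) {
            result |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                break;
            }
            shift += 7;
        }
        return result;
    }

    // Mismatched reader: assumes a fixed 8-byte big-endian long.
    static long decodeFixedLong(byte[] bytes) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            return in.readLong();
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] bytes = encodeVarLong(300L);
        System.out.println("matching reader: " + decodeVarLong(bytes));
        try {
            decodeFixedLong(bytes);
        } catch (EOFException e) {
            System.out.println("mismatched reader ran out of bytes");
        }
    }
}
```

Here the mismatched reader fails outright because only two bytes were written; with longer input it could instead return a wrong value without any error.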