Hi Sandeep,

I've put together a small working example of how to read Beam state
using the State Processor API. It's basically a thin wrapper around the
existing API. As you can see from the example, accessing Beam state is
far more complicated, because Beam adds another layer on top of Flink's
state primitives. Another notable difference is that Beam uses custom
state namespaces.
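To illustrate the namespace difference, here is a deliberately simplified, plain-Java model (no Flink or Beam dependencies) of how one keyed-state entry is addressed. It assumes that a plain Flink keyed function keeps its value state under Flink's single void namespace, while Beam's Flink runner keeps state under a namespace string derived from the window, which for the global window is "/" (as far as I can tell from Beam's StateNamespaces.GlobalNamespace). BeamNamespaceSketch, the address helper and the string separator are hypothetical; real backends key state by serialized bytes, not concatenated strings.

```java
import java.util.HashMap;
import java.util.Map;

public class BeamNamespaceSketch {
    // Hypothetical stand-in for Flink's VoidNamespace, used by plain keyed functions.
    static final String VOID_NAMESPACE = "VoidNamespace";
    // Assumption: Beam's global-window namespace string key is "/".
    static final String BEAM_GLOBAL_NAMESPACE = "/";

    // Hypothetical composite address: one entry per (key, namespace, state name).
    static String address(String key, String namespace, String stateName) {
        return key + '\u0000' + namespace + '\u0000' + stateName;
    }

    public static void main(String[] args) {
        Map<String, Long> backend = new HashMap<>();
        // The Beam pipeline writes the user's value state under Beam's namespace.
        backend.put(address("hello", BEAM_GLOBAL_NAMESPACE, "ivid_counts"), 3L);

        // A reader written for plain Flink state looks in the void namespace and misses.
        System.out.println(backend.get(address("hello", VOID_NAMESPACE, "ivid_counts")));
        // Using the namespace Beam actually wrote finds the value.
        System.out.println(backend.get(address("hello", BEAM_GLOBAL_NAMESPACE, "ivid_counts")));
    }
}
```

Running it prints "null" and then "3": the value is there, but only under the namespace Beam used, which is why a reader has to reproduce Beam's namespaces (and key encoding) rather than reuse a plain Flink reader.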

A toolkit for easily reading Beam savepoints would definitely be a great
addition to the existing Beam Flink Runner ecosystem. I'm cc'ing
dev@beam... in case someone from the community is interested in working
on this topic.

https://github.com/dmvk/beam-flink-state-processor/blob/master/src/main/java/org/apache/dmvk/beam/BeamSavepoint.java
https://github.com/dmvk/beam-flink-state-processor/blob/master/src/test/java/org/apache/dmvk/beam/BeamSavepointTest.java#L37

Best,
D.

On Wed, Aug 4, 2021 at 9:03 AM David Morávek <d...@apache.org> wrote:

> Hi Sandeep, thanks for the example, I'll take a look at it and will get
> back to you ;)
>
> On Tue, Aug 3, 2021 at 9:44 PM Kathula, Sandeep <
> sandeep_kath...@intuit.com> wrote:
>
>> Hi David,
>> Thanks for the reply. I tried Beam 2.29 and Flink 1.12 and am still
>> getting the same NullPointerException as before. I changed the code a bit
>> to remove all the proprietary software used in our company, and I was able
>> to reproduce the issue with a local Kafka and Beam on the Flink runner, all
>> running locally.
>>
>> Beam Flink runner code:
>> https://github.com/kathulasandeep/beam-flink-stateful/blob/master/src/main/java/org/sandeep/processor/Processor.java
>> Local Kafka producer:
>> https://github.com/kathulasandeep/beam-flink-stateful/blob/master/src/main/java/org/sandeep/kafka/SimpleKafkaProducer.java
>> Reading state using the State Processor API:
>> https://github.com/kathulasandeep/beam-flink-stateful/blob/master/src/main/java/org/sandeep/readstate/StateReader.java
>>
>> Thanks,
>> Sandeep
>>
>>
>> On 7/27/21, 10:10 AM, "David Morávek" <d...@apache.org> wrote:
>>
>>     Hi Sandeep,
>>
>>     In general, I'd say it will be tricky to read Beam state this way, as it
>>     doesn't use Flink primitives directly; instead, it writes state in a
>>     custom binary format (it can be deserialized, but it's not easy to put
>>     all of the pieces together).
>>
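As an example of what "custom binary format" means in practice: as far as I know, Beam's Flink runner serializes state values with the state's Beam Coder rather than with a Flink serializer you pick when reading. Assuming (the thread doesn't say) that the count was declared with Beam's VarLongCoder, which writes a long as a variable-length integer of 1 to 10 bytes, 7 payload bits per byte, lowest bits first, a minimal plain-Java decoder for such a value could look like this. VarLongSketch and its methods are hypothetical helpers, not Beam API.

```java
import java.io.ByteArrayOutputStream;

public class VarLongSketch {
    // Encodes v as an unsigned varint: 7 payload bits per byte, low-order group first,
    // high bit set on every byte except the last. Negative values take 10 bytes.
    static byte[] encode(long v) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        do {
            int bits = (int) (v & 0x7F);
            v >>>= 7;
            out.write(v != 0 ? (bits | 0x80) : bits);
        } while (v != 0);
        return out.toByteArray();
    }

    // Decodes the varint at the start of bytes; any trailing bytes are ignored.
    static long decode(byte[] bytes) {
        long result = 0;
        int shift = 0;
        for (byte raw : bytes) {
            if (shift >= 64) {
                throw new IllegalArgumentException("varint longer than 10 bytes");
            }
            int b = raw & 0xFF;
            result |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result; // continuation bit clear: this was the last byte
            }
            shift += 7;
        }
        throw new IllegalArgumentException("truncated varint");
    }

    public static void main(String[] args) {
        byte[] encoded = encode(300L);
        System.out.printf("%d bytes: %02x %02x%n", encoded.length, encoded[0], encoded[1]);
        System.out.println(decode(encoded));
    }
}
```

Decoding the value is the easy part; the harder part is locating the right bytes, since they sit behind Beam's key encoding and state namespaces.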
>>     Can you please share example code showing how you're reading the state?
>>     Also, can you please try this with the latest Beam / Flink versions (the
>>     ones you're using are no longer supported)?
>>
>>     Best,
>>     D.
>>
>>     On Tue, Jul 27, 2021 at 5:46 PM Kathula, Sandeep
>>     <sandeep_kath...@intuit.com.invalid> wrote:
>>
>>     > Hi,
>>     > We have a simple Beam application, similar to word count, running on
>>     > the Flink runner (Beam 2.26 and Flink 1.9). We are using Beam's value
>>     > state. I am trying to read the state from a savepoint using Flink's
>>     > State Processor API, but I'm getting a NullPointerException. When we
>>     > converted the whole code into a pure Flink application, created a
>>     > savepoint, and read the state, we were able to read it successfully.
>>     >
>>     > Exception stack trace:
>>     >
>>     > Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>     >     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
>>     >     at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:631)
>>     >     at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:222)
>>     >     at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
>>     >     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820)
>>     >     at org.apache.flink.api.java.DataSet.count(DataSet.java:398)
>>     >     at com.intuit.spp.example.StateReader.main(StateReader.java:34)
>>     > Caused by: java.io.IOException: Failed to restore state backend
>>     >     at org.apache.flink.state.api.input.KeyedStateInputFormat.getStreamOperatorStateContext(KeyedStateInputFormat.java:231)
>>     >     at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:177)
>>     >     at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:79)
>>     >     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
>>     >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>     >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>     >     at java.lang.Thread.run(Thread.java:748)
>>     > Caused by: java.lang.Exception: Exception while creating StreamOperatorStateContext.
>>     >     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
>>     >     at org.apache.flink.state.api.input.KeyedStateInputFormat.getStreamOperatorStateContext(KeyedStateInputFormat.java:223)
>>     >     ... 6 more
>>     > Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for f25cb861abbd020d3595d47c5d53d3fd_f25cb861abbd020d3595d47c5d53d3fd_(1/1) from any of the 1 provided restore options.
>>     >     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>>     >     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
>>     >     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
>>     >     ... 7 more
>>     > Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
>>     >     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
>>     >     at org.apache.flink.runtime.state.memory.MemoryStateBackend.createKeyedStateBackend(MemoryStateBackend.java:347)
>>     >     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
>>     >     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>>     >     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>>     >     ... 9 more
>>     > Caused by: java.lang.NullPointerException
>>     >     at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:280)
>>     >     at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:254)
>>     >     at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:154)
>>     >     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
>>     >     ... 13 more
>>     >
>>     > When I debugged it, I found that the NullPointerException is thrown at
>>     > https://github.com/apache/flink/blob/release-1.9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java#L280
>>     > because metaInfoSnapshot is null. I then checked which kvStateId values
>>     > we are getting at
>>     > https://github.com/apache/flink/blob/release-1.9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java#L277
>>     >
>>     > I also printed stateMetaInfoSnapshot.getName() along with the
>>     > corresponding kvStateId and saw that Beam creates additional states
>>     > internally. The output is:
>>     >
>>     > stateMetaInfoSnapshot name            kvStateId
>>     > pushed-back-elements                  0
>>     > ivid_counts                           1
>>     > watermark-holds                       2
>>     > sortBuffer                            3
>>     > pending-timers                        4
>>     > sortBufferMinStamp                    5
>>     > _timer_state/event_beam-timer         6
>>     > _timer_state/processing_beam-timer    7
>>     >
>>     > Of these, the ivid_counts state is created by us and the rest are
>>     > created by Beam. In one instance, it tries to retrieve kvStateId 11570,
>>     > for which no state is found at
>>     > https://github.com/apache/flink/blob/release-1.9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java#L277
>>     > so stateMetaInfoSnapshot is null.
>>     >
>>     > I debugged further but couldn't find out why it is trying to read a
>>     > non-existent state.
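The id 11570 is suspicious in itself. Assuming the restore reads each state id as a 2-byte big-endian short (which is what the linked line appears to do via readShort()), 11570 is 0x2D32, i.e. the ASCII bytes "-2". That is consistent with, though not proof of, the reader having lost its place in the stream and interpreting payload bytes as a state id, for example after an earlier entry was read with a serializer that doesn't match what Beam wrote. The plain-Java sketch below reproduces only the lookup step: the map mirrors the names and ids listed above, while KvStateIdSketch and readStateId are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class KvStateIdSketch {
    // kvStateId -> state name, copied from the debugging output above.
    static Map<Integer, String> statesById() {
        String[] names = {
            "pushed-back-elements", "ivid_counts", "watermark-holds", "sortBuffer",
            "pending-timers", "sortBufferMinStamp",
            "_timer_state/event_beam-timer", "_timer_state/processing_beam-timer"};
        Map<Integer, String> byId = new HashMap<>();
        for (int id = 0; id < names.length; id++) {
            byId.put(id, names[id]);
        }
        return byId;
    }

    // Assumption: each id is stored as a 2-byte big-endian short, read like DataInput.readShort().
    static int readStateId(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            return in.readShort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Map<Integer, String> byId = statesById();
        // Correctly aligned: the two bytes really are a state id.
        int aligned = readStateId(new byte[] {0x00, 0x01});
        // Misaligned: two bytes of text payload ("-2") read as if they were an id.
        int misaligned = readStateId("-2".getBytes(StandardCharsets.US_ASCII));
        System.out.println(aligned + " -> " + byId.get(aligned));       // 1 -> ivid_counts
        System.out.println(misaligned + " -> " + byId.get(misaligned)); // 11570 -> null
    }
}
```

A null from this lookup is exactly what the restore then dereferences, so it may be worth checking whether the reader's key and value serializers match the ones Beam used when writing the savepoint.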
>>     >
>>     >
>>     > Can you please help with this?
>>     >
>>     >
>>     > Thanks,
>>     > Sandeep
>>     >
>>     >
>>
>>
>>
