Most of the ValueStates I am using are of type Long or Boolean, except one,
which is a map from Long to a Scala case class:

ValueState[Map[Long, AnScalaCaseClass]]


Does this serialization happen only for the ValueState members of
operators, or also for other private fields?
Thanks
+satish

On Mon, Nov 14, 2016 at 3:47 PM, Ufuk Celebi <u...@apache.org> wrote:

> There seems to be an exception thrown when Flink tries to serialize the
> state of your operator (see the stack trace).
>
> What are you trying to store via the ValueState? Maybe you can share a
> code excerpt?
>
> – Ufuk
>
> On 14 November 2016 at 10:51:06, Satish Chandra Gupta (scgupt...@gmail.com)
> wrote:
> > Hi,
> >
> > I am using ValueState, backed by an FsStateBackend on HDFS, as follows:
> >
> > env.setStateBackend(new FsStateBackend(stateBackendPath))
> > env.enableCheckpointing(checkpointInterval)
> >
> >
> > It is a non-iterative job running on Flink/YARN. The job restarts at
> > every checkpointInterval; I have tried intervals ranging from 30 sec to
> > 10 min. Any idea why it could be restarting?
> >
> > I see the following exception in the log:
> >
> > ======
> >
> > 2016-11-14 09:24:28,787 INFO
> > org.apache.flink.runtime.executiongraph.ExecutionGraph -
> > Source: Custom Source -> Map -> Filter -> cell_users_update (1/1)
> > (fd72961bedbb0f18bffb5ae66b926313) switched from RUNNING to CANCELING
> > 2016-11-14 09:24:28,788 INFO org.apache.flink.yarn.YarnJobManager
> > - Status of job 03a56958263a688dc34cc8d5069aac8f
> > (Processor) changed to FAILING.
> > java.lang.RuntimeException: Error triggering a checkpoint as the result of receiving checkpoint barrier
> > at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:701)
> > at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:691)
> > at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
> > at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
> > at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:215)
> > at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:89)
> > at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> > at java.lang.Thread.run(Thread.java:745)
> > Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: DataStreamer Exception:
> > at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
> > at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:200)
> > at org.apache.flink.runtime.state.filesystem.AbstractFsState.snapshot(AbstractFsState.java:85)
> > at org.apache.flink.runtime.state.AbstractStateBackend.snapshotPartitionedState(AbstractStateBackend.java:265)
> > at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotOperatorState(AbstractStreamOperator.java:176)
> > at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotOperatorState(AbstractUdfStreamOperator.java:121)
> > at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:498)
> > at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:695)
> > ... 8 more
> > Caused by: java.io.IOException: DataStreamer Exception:
> > at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:563)
> > Caused by: java.lang.ExceptionInInitializerError
> > at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1322)
> > at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1266)
> > at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:449)
> > Caused by: java.lang.RuntimeException: javax.xml.parsers.ParserConfigurationException: Feature 'http://apache.org/xml/features/xinclude' is not recognized.
> > at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2648)
> > at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2492)
> > at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2405)
> > at org.apache.hadoop.conf.Configuration.get(Configuration.java:981)
> > at org.apache.hadoop.conf.Configuration.getTrimmed(Configuration.java:1031)
> > at org.apache.hadoop.conf.Configuration.getInt(Configuration.java:1251)
> > at org.apache.hadoop.hdfs.protocol.HdfsConstants.<clinit>(HdfsConstants.java:76)
> > ... 3 more
> > Caused by: javax.xml.parsers.ParserConfigurationException: Feature 'http://apache.org/xml/features/xinclude' is not recognized.
> > at org.apache.xerces.jaxp.DocumentBuilderFactoryImpl.newDocumentBuilder(Unknown Source)
> > at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2530)
> > ... 9 more
> > 2016-11-14 09:24:28,789 INFO
> > org.apache.flink.runtime.executiongraph.ExecutionGraph -
> > Source: Custom Source -> Map -> Filter -> device_status_update (1/1)
> > (9fe20e7a4336b3960b88febc89135d97) switched from RUNNING to CANCELING
> > 2016-11-14 09:24:28,789 INFO
> > org.apache.flink.runtime.executiongraph.ExecutionGraph -
> > Source: Custom Source -> Map -> Filter -> Map -> Filter ->
> > cab_position_update (1/1) (91ea224efa3ba7d130405fbd247f4a45) switched
> > from RUNNING to CANCELING
> >
> > ======
> >
> > Thanks,
> > +satish
> >
>
>
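
Read from the bottom of the `Caused by` chain, the innermost failure in the trace above appears to come from the XML parser. `org.apache.xerces.jaxp.DocumentBuilderFactoryImpl` rejects the XInclude feature while Hadoop's `Configuration` loads its resources during the HDFS write, which happens before the state bytes are flushed. The following is a minimal, hypothetical check that could be run with the same classpath as the job. The object name `XmlParserCheck` and its methods are made up for illustration. It prints which JAXP `DocumentBuilderFactory` implementation gets picked up, then tries an XInclude-aware setup similar to what the trace implies Hadoop attempts. On a plain JDK it should report the JDK's built-in parser and succeed. A different Xerces class name or a failure would suggest that another parser on the job's classpath is shadowing it.

```scala
import javax.xml.parsers.{DocumentBuilderFactory, ParserConfigurationException}

// Hypothetical diagnostic helper; not part of Flink or Hadoop.
object XmlParserCheck {

  /** Class name of the DocumentBuilderFactory that JAXP resolves on this classpath. */
  def factoryClassName(): String =
    DocumentBuilderFactory.newInstance().getClass.getName

  /** Tries to build an XInclude-aware DocumentBuilder.
    * Returns None on success, or Some(message) describing the failure. */
  def xincludeProblem(): Option[String] = {
    val factory = DocumentBuilderFactory.newInstance()
    factory.setNamespaceAware(true)
    try {
      // The trace reports 'http://apache.org/xml/features/xinclude' as unrecognized.
      factory.setXIncludeAware(true)
      factory.newDocumentBuilder()
      None
    } catch {
      case e: ParserConfigurationException  => Some(e.getMessage)
      case e: UnsupportedOperationException => Some(e.toString)
    }
  }

  def main(args: Array[String]): Unit = {
    println(s"DocumentBuilderFactory: ${factoryClassName()}")
    xincludeProblem() match {
      case None      => println("XInclude-aware DocumentBuilder created successfully")
      case Some(msg) => println(s"XInclude setup failed: $msg")
    }
  }
}
```

Because the check uses only the JDK's `javax.xml.parsers` API, it can be run on its own or from inside the job (for example, from an operator's `open` method) to see what the TaskManager actually resolves.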
