Most of the ValueStates I am using hold Long or Boolean values, except one, which is a map from Long to a Scala case class:
ValueState[Map[Long, AnScalaCaseClass]]

Does this serialization happen only for the value-state members of operators, or also for other private fields?

Thanks,
+satish

On Mon, Nov 14, 2016 at 3:47 PM, Ufuk Celebi <u...@apache.org> wrote:

> There seems to be an Exception happening when Flink tries to serialize the
> state of your operator (see the stack trace).
>
> What are you trying to store via the ValueState? Maybe you can share a
> code excerpt?
>
> – Ufuk
>
> On 14 November 2016 at 10:51:06, Satish Chandra Gupta (scgupt...@gmail.com) wrote:
> > Hi,
> >
> > I am using ValueState, backed by FsStateBackend on HDFS, as follows:
> >
> > env.setStateBackend(new FsStateBackend(stateBackendPath))
> > env.enableCheckpointing(checkpointInterval)
> >
> > It is a non-iterative job running on Flink/YARN. The job restarts at every
> > checkpoint (i.e. every checkpointInterval); I have tried intervals from 30 sec to 10 min.
> > Any idea why it could be restarting?
> >
> > I see the following exception in the log:
> >
> > ======
> >
> > 2016-11-14 09:24:28,787 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Map -> Filter -> cell_users_update (1/1) (fd72961bedbb0f18bffb5ae66b926313) switched from RUNNING to CANCELING
> > 2016-11-14 09:24:28,788 INFO org.apache.flink.yarn.YarnJobManager - Status of job 03a56958263a688dc34cc8d5069aac8f (Processor) changed to FAILING.
> > *java.lang.RuntimeException: Error triggering a checkpoint as the result of receiving checkpoint barrier*
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:701)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:691)
> >     at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
> >     at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
> >     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:215)
> >     at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:89)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> >     at java.lang.Thread.run(Thread.java:745)
> > Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: DataStreamer Exception:
> >     at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
> >     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:200)
> >     at org.apache.flink.runtime.state.filesystem.AbstractFsState.snapshot(AbstractFsState.java:85)
> >     at org.apache.flink.runtime.state.AbstractStateBackend.snapshotPartitionedState(AbstractStateBackend.java:265)
> >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotOperatorState(AbstractStreamOperator.java:176)
> >     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotOperatorState(AbstractUdfStreamOperator.java:121)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:498)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:695)
> >     ... 8 more
> > Caused by: java.io.IOException: DataStreamer Exception:
> >     at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:563)
> > Caused by: java.lang.ExceptionInInitializerError
> >     at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1322)
> >     at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1266)
> >     at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:449)
> > Caused by: java.lang.RuntimeException: javax.xml.parsers.ParserConfigurationException: Feature 'http://apache.org/xml/features/xinclude' is not recognized.
> >     at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2648)
> >     at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2492)
> >     at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2405)
> >     at org.apache.hadoop.conf.Configuration.get(Configuration.java:981)
> >     at org.apache.hadoop.conf.Configuration.getTrimmed(Configuration.java:1031)
> >     at org.apache.hadoop.conf.Configuration.getInt(Configuration.java:1251)
> >     at org.apache.hadoop.hdfs.protocol.HdfsConstants.(HdfsConstants.java:76)
> >     ... 3 more
> > Caused by: javax.xml.parsers.ParserConfigurationException: Feature 'http://apache.org/xml/features/xinclude' is not recognized.
> >     at org.apache.xerces.jaxp.DocumentBuilderFactoryImpl.newDocumentBuilder(Unknown Source)
> >     at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2530)
> >     ... 9 more
> > 2016-11-14 09:24:28,789 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Map -> Filter -> device_status_update (1/1) (9fe20e7a4336b3960b88febc89135d97) switched from RUNNING to CANCELING
> > 2016-11-14 09:24:28,789 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Map -> Filter -> Map -> Filter -> cab_position_update (1/1) (91ea224efa3ba7d130405fbd247f4a45) switched from RUNNING to CANCELING
> >
> > ======
> >
> > Thanks,
> > +satish
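The innermost "Caused by" in the trace is a `ParserConfigurationException` raised by `org.apache.xerces.jaxp.DocumentBuilderFactoryImpl` (a standalone Xerces, not the JDK's built-in parser) while Hadoop's `Configuration` loads its XML resources during the HDFS write of the checkpoint. One plausible reading, which is an assumption and not confirmed in this thread, is that a Xerces version without XInclude support is on the job's classpath and wins the JAXP factory lookup. A minimal sketch to check this, run with the same classpath as the job; the object name `XIncludeCheck` is hypothetical, and enabling namespace awareness plus XInclude is assumed to approximate what Hadoop's `Configuration.loadResource` does:

```scala
import javax.xml.parsers.{DocumentBuilderFactory, ParserConfigurationException}

// Hypothetical diagnostic helper (not part of Flink or Hadoop): reports which JAXP
// DocumentBuilderFactory this JVM/classpath resolves, and whether it can build an
// XInclude-aware DocumentBuilder.
object XIncludeCheck {

  /** Returns the factory implementation class name and, if XInclude fails, the error. */
  def check(): (String, Option[String]) = {
    // Standard JAXP lookup: system property, jaxp.properties, META-INF/services on the
    // classpath, then the JDK default. A Xerces jar bundled into a job jar can win here.
    val factory = DocumentBuilderFactory.newInstance()
    val implName = factory.getClass.getName
    try {
      // Assumption: roughly the settings Hadoop's Configuration applies before parsing.
      factory.setNamespaceAware(true)
      factory.setXIncludeAware(true)
      factory.newDocumentBuilder()
      (implName, None)
    } catch {
      // Old parsers may reject XInclude either when building or when the flag is set.
      case e @ (_: ParserConfigurationException | _: UnsupportedOperationException) =>
        (implName, Some(e.toString))
    }
  }

  def main(args: Array[String]): Unit = {
    val (implName, error) = check()
    println(s"DocumentBuilderFactory implementation: $implName")
    error match {
      case None      => println("XInclude-aware DocumentBuilder created successfully")
      case Some(msg) => println(s"XInclude not supported: $msg")
    }
  }
}
```

If this reports `org.apache.xerces.jaxp.DocumentBuilderFactoryImpl` together with the same "Feature ... is not recognized" message, the Xerces jar that the job brings in is a likely suspect; on a plain JDK classpath it should report the JDK's internal implementation and succeed.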