There seems to be an Exception happening when Flink tries to serialize the state of your operator (see the stack trace).
What are you trying to store via the ValueState? Maybe you can share a code excerpt? – Ufuk On 14 November 2016 at 10:51:06, Satish Chandra Gupta (scgupt...@gmail.com) wrote: > Hi, > > I am using Value State, backed by FsStateBackend on hdfs, as following: > > env.setStateBackend(new FsStateBackend(stateBackendPath)) > env.enableCheckpointing(checkpointInterval) > > > It is non-iterative job running Flink/Yarn. The job restarts at > checkpointInterval, I have tried interval varying from 30 sec to 10 min. > Any idea why it could be restarting. > > I see following exception in the log: > > ====== > > 2016-11-14 09:24:28,787 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph - > Source: Custom Source -> Map -> Filter -> cell_users_update (1/1) > (fd72961bedbb0f18bffb5ae66b926313) switched from RUNNING to CANCELING > 2016-11-14 09:24:28,788 INFO org.apache.flink.yarn.YarnJobManager > - Status of job 03a56958263a688dc34cc8d5069aac8f > (Processor) changed to FAILING.*java.lang.RuntimeException: Error > triggering a checkpoint as the result of receiving checkpoint barrier* > at > org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:701) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:691) > > at > org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203) > > at > org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129) > > at > org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:215) > > at > org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:89) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225) > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > at java.lang.Thread.run(Thread.java:745) > Caused by: com.esotericsoftware.kryo.KryoException: > java.io.IOException: DataStreamer Exception: > at com.esotericsoftware.kryo.io.Output.flush(Output.java:165) > at > org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:200) > > at > org.apache.flink.runtime.state.filesystem.AbstractFsState.snapshot(AbstractFsState.java:85) > > at > org.apache.flink.runtime.state.AbstractStateBackend.snapshotPartitionedState(AbstractStateBackend.java:265) > > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotOperatorState(AbstractStreamOperator.java:176) > > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotOperatorState(AbstractUdfStreamOperator.java:121) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:498) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:695) > > ... 8 more > Caused by: java.io.IOException: DataStreamer Exception: > at > org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:563) > > Caused by: java.lang.ExceptionInInitializerError > at > org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1322) > > at > org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1266) > > at > org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:449) > > Caused by: java.lang.RuntimeException: > javax.xml.parsers.ParserConfigurationException: Feature > 'http://apache.org/xml/features/xinclude' is not recognized. > at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2648) > > at > org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2492) > at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2405) > at org.apache.hadoop.conf.Configuration.get(Configuration.java:981) > at org.apache.hadoop.conf.Configuration.getTrimmed(Configuration.java:1031) > at org.apache.hadoop.conf.Configuration.getInt(Configuration.java:1251) > at org.apache.hadoop.hdfs.protocol.HdfsConstants.(HdfsConstants.java:76) > ... 3 more > Caused by: javax.xml.parsers.ParserConfigurationException: Feature > 'http://apache.org/xml/features/xinclude' is not recognized. > at > org.apache.xerces.jaxp.DocumentBuilderFactoryImpl.newDocumentBuilder(Unknown > Source) > at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2530) > > ... 9 more > 2016-11-14 09:24:28,789 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph - > Source: Custom Source -> Map -> Filter -> device_status_update (1/1) > (9fe20e7a4336b3960b88febc89135d97) switched from RUNNING to CANCELING > 2016-11-14 09:24:28,789 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph - > Source: Custom Source -> Map -> Filter -> Map -> Filter -> > cab_position_update (1/1) (91ea224efa3ba7d130405fbd247f4a45) switched > from RUNNING to CANCELING > > ====== > > Thanks, > +satish >