Hi Ufuk and Till,

Thanks a lot; both suggestions were useful. An older version of Xerces was being loaded from one of the dependencies, and I also fixed the serialization glitch in my code, so checkpointing works now.
I have five value states, apart from a custom trigger. Is there any way to configure the file name under which these checkpoints are saved? For example, in:

<configured-checkpoint-path>/1193cd5ef0c8de256a059e363dfcb26c/chk-20/f1c9cf97-f5fa-44e2-81df-7729cd8226be

can I specify something that gets included in the file id? The reason I am asking is that one checkpoint file keeps growing, and I suspect some bug in my logic is causing a memory leak; I want to identify which value state is responsible. Any suggestions?

Thanks,
+satish

On Tue, Nov 15, 2016 at 4:01 PM, Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Satish,
>
> your problem seems to be more related to a problem reading Hadoop's
> configuration. According to the internet [1,2,3] try to select a proper
> xerces version to resolve the problem.
>
> [1] http://stackoverflow.com/questions/26974067/org-apache-hadoop-conf-configuration-loadresource-error
> [2] http://stackoverflow.com/questions/27860361/issue-http-apache-org-xml-features-xinclude-testing-log4j-2
> [3] http://dmitrypukhov.pro/apache-spark-feature-httpapache-orgxmlfeaturesxinclude-is-not-recognized/
>
> Cheers,
> Till
>
> On Tue, Nov 15, 2016 at 3:24 AM, Satish Chandra Gupta <scgupt...@gmail.com>
> wrote:
>
>> Most of the ValueState I am using are of Long or Boolean, except one
>> which is a map of Long to Scala case class:
>>
>> ValueState[Map[Long, AnScalaCaseClass]]
>>
>> Does this serialization happen only for the value state members of
>> operators, or also other private fields?
>> Thanks
>> +satish
>>
>> On Mon, Nov 14, 2016 at 3:47 PM, Ufuk Celebi <u...@apache.org> wrote:
>>
>>> There seems to be an Exception happening when Flink tries to serialize
>>> the state of your operator (see the stack trace).
>>>
>>> What are you trying to store via the ValueState? Maybe you can share a
>>> code excerpt?
>>>
>>> – Ufuk
>>>
>>> On 14 November 2016 at 10:51:06, Satish Chandra Gupta (
>>> scgupt...@gmail.com) wrote:
>>> > Hi,
>>> >
>>> > I am using Value State, backed by FsStateBackend on hdfs, as following:
>>> >
>>> > env.setStateBackend(new FsStateBackend(stateBackendPath))
>>> > env.enableCheckpointing(checkpointInterval)
>>> >
>>> > It is non-iterative job running Flink/Yarn. The job restarts at
>>> > checkpointInterval, I have tried interval varying from 30 sec to 10 min.
>>> > Any idea why it could be restarting.
>>> >
>>> > I see following exception in the log:
>>> >
>>> > ======
>>> >
>>> > 2016-11-14 09:24:28,787 INFO
>>> > org.apache.flink.runtime.executiongraph.ExecutionGraph -
>>> > Source: Custom Source -> Map -> Filter -> cell_users_update (1/1)
>>> > (fd72961bedbb0f18bffb5ae66b926313) switched from RUNNING to CANCELING
>>> > 2016-11-14 09:24:28,788 INFO org.apache.flink.yarn.YarnJobManager
>>> > - Status of job 03a56958263a688dc34cc8d5069aac8f
>>> > (Processor) changed to FAILING.
>>> > java.lang.RuntimeException: Error triggering a checkpoint as the result of receiving checkpoint barrier
>>> > at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:701)
>>> > at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:691)
>>> > at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
>>> > at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
>>> > at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:215)
>>> > at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:89)
>>> > at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>>> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>> > at java.lang.Thread.run(Thread.java:745)
>>> > Caused by: com.esotericsoftware.kryo.KryoException:
>>> > java.io.IOException: DataStreamer Exception:
>>> > at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
>>> > at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:200)
>>> > at org.apache.flink.runtime.state.filesystem.AbstractFsState.snapshot(AbstractFsState.java:85)
>>> > at org.apache.flink.runtime.state.AbstractStateBackend.snapshotPartitionedState(AbstractStateBackend.java:265)
>>> > at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotOperatorState(AbstractStreamOperator.java:176)
>>> > at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotOperatorState(AbstractUdfStreamOperator.java:121)
>>> > at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:498)
>>> > at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:695)
>>> > ... 8 more
>>> > Caused by: java.io.IOException: DataStreamer Exception:
>>> > at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:563)
>>> > Caused by: java.lang.ExceptionInInitializerError
>>> > at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1322)
>>> > at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1266)
>>> > at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:449)
>>> > Caused by: java.lang.RuntimeException:
>>> > javax.xml.parsers.ParserConfigurationException: Feature
>>> > 'http://apache.org/xml/features/xinclude' is not recognized.
>>> > at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2648)
>>> > at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2492)
>>> > at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2405)
>>> > at org.apache.hadoop.conf.Configuration.get(Configuration.java:981)
>>> > at org.apache.hadoop.conf.Configuration.getTrimmed(Configuration.java:1031)
>>> > at org.apache.hadoop.conf.Configuration.getInt(Configuration.java:1251)
>>> > at org.apache.hadoop.hdfs.protocol.HdfsConstants.(HdfsConstants.java:76)
>>> > ... 3 more
>>> > Caused by: javax.xml.parsers.ParserConfigurationException: Feature
>>> > 'http://apache.org/xml/features/xinclude' is not recognized.
>>> > at org.apache.xerces.jaxp.DocumentBuilderFactoryImpl.newDocumentBuilder(Unknown
>>> > Source)
>>> > at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2530)
>>> > ... 9 more
>>> > 2016-11-14 09:24:28,789 INFO
>>> > org.apache.flink.runtime.executiongraph.ExecutionGraph -
>>> > Source: Custom Source -> Map -> Filter -> device_status_update (1/1)
>>> > (9fe20e7a4336b3960b88febc89135d97) switched from RUNNING to CANCELING
>>> > 2016-11-14 09:24:28,789 INFO
>>> > org.apache.flink.runtime.executiongraph.ExecutionGraph -
>>> > Source: Custom Source -> Map -> Filter -> Map -> Filter ->
>>> > cab_position_update (1/1) (91ea224efa3ba7d130405fbd247f4a45) switched
>>> > from RUNNING to CANCELING
>>> >
>>> > ======
>>> >
>>> > Thanks,
>>> > +satish
>>> >
>>>
>>>
>>
>
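[Archive note] On the question of identifying which value state keeps growing: the UUID-named files under chk-N are generated internally by Flink, but each keyed state is registered under the name given to its StateDescriptor, so giving every descriptor a distinct, descriptive name and logging per-state growth is one way to localize the leak. Below is a minimal, self-contained sketch of that bookkeeping idea in plain Scala, with no Flink dependency; `StateSizeProbe` and the state names are hypothetical stand-ins for the real descriptors, not Flink API:

```scala
// Hypothetical bookkeeping sketch: in the real job each of the five value
// states would be registered via its own ValueStateDescriptor with a
// distinct name; here a plain mutable map stands in for that, so per-state
// growth can be logged and the leaking state identified by name.
object StateSizeProbe {
  private val entryCounts =
    scala.collection.mutable.Map.empty[String, Long].withDefaultValue(0L)

  // Call this wherever a state is updated, passing the descriptor's name.
  def record(stateName: String, delta: Long): Unit =
    entryCounts(stateName) += delta

  // States sorted largest first; a leaking state floats to the top.
  def report(): Seq[(String, Long)] =
    entryCounts.toSeq.sortBy { case (_, n) => -n }
}
```

Logging `report()` periodically (for instance alongside each checkpoint) would show which named state keeps growing, independent of the checkpoint file names.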