Hi Till,

Thanks. Yes, that is what I have been doing. But accessing the GUI of
Flink running on a YARN cluster on EMR over VPN sometimes becomes very
slow (sometimes not even the execution plan gets shown :-)), that's why
I thought of this.
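In case plain HTTP over the VPN is more bearable than the full web UI,
I am thinking of querying the monitoring REST API directly instead. A
minimal sketch (assuming a /jobs/:jobid/checkpoints endpoint is
available in this Flink version; host, port, and job id are
placeholders for my setup):

    import scala.io.Source

    object CheckpointStatsProbe {
      def main(args: Array[String]): Unit = {
        // JobManager web host/port (on YARN this goes through the proxy).
        val jobManagerUrl = "http://jobmanager-host:8081"
        // The job id as it appears in the JobManager log / web UI.
        val jobId = "03a56958263a688dc34cc8d5069aac8f"
        // Fetch the raw JSON checkpoint stats and print them.
        println(Source.fromURL(s"$jobManagerUrl/jobs/$jobId/checkpoints").mkString)
      }
    }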
Thanks,
+satish

On Wed, Nov 16, 2016 at 6:46 PM, Till Rohrmann <till.rohrm...@gmail.com> wrote:

> Hi Satish,
>
> I'm afraid there is no way to configure the name of the checkpoint
> file for a task at the moment. For the latest checkpoint you can see
> the state sizes for the individual subtasks in the web UI under
> checkpoints.
>
> Cheers,
> Till
>
> On Tue, Nov 15, 2016 at 10:52 PM, Satish Chandra Gupta <scgupt...@gmail.com> wrote:
>
>> Hi Ufuk and Till,
>>
>> Thanks a lot. Both suggestions were useful. An older version of
>> xerces was being loaded from one of the dependencies, and I also
>> fixed the serialization glitch in my code, and now checkpointing
>> works.
>>
>> I have 5 value states, apart from a custom trigger. Is there any way
>> I can configure the filename in which these checkpoints are saved?
>> For example, in:
>>
>> <configured-checkpoint-path>/1193cd5ef0c8de256a059e363dfcb26c/chk-20/f1c9cf97-f5fa-44e2-81df-7729cd8226be
>>
>> can I specify something that gets included in the file id?
>>
>> The reason I am asking is that one checkpoint file keeps growing,
>> and I suspect some bug in my logic is causing a memory leak, and I
>> want to identify which value state is responsible. Any suggestions?
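>>
>> One thing I plan to try is giving every state a distinct descriptor
>> name and clearing it once a key is finished, to narrow the leak down
>> per state. A minimal sketch (class, field, and state names are made
>> up for illustration, and I am assuming the ValueStateDescriptor
>> constructor that takes a name, a class, and a default value):
>>
>> import org.apache.flink.api.common.functions.RichFlatMapFunction
>> import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
>> import org.apache.flink.configuration.Configuration
>> import org.apache.flink.util.Collector
>>
>> // Must run on a keyed stream, e.g. stream.keyBy(_._1).flatMap(new LeakProbe)
>> class LeakProbe extends RichFlatMapFunction[(Long, Long), (Long, Long)] {
>>   private var lastSeen: ValueState[java.lang.Long] = _
>>
>>   override def open(parameters: Configuration): Unit = {
>>     // A distinct name per state makes the states easy to tell apart.
>>     lastSeen = getRuntimeContext.getState(
>>       new ValueStateDescriptor("last-seen", classOf[java.lang.Long], null))
>>   }
>>
>>   override def flatMap(in: (Long, Long), out: Collector[(Long, Long)]): Unit = {
>>     lastSeen.update(in._2)
>>     out.collect(in)
>>     // Clearing state for finished keys keeps checkpoints from growing
>>     // forever; here a negative value marks a key as done (illustrative).
>>     if (in._2 < 0) lastSeen.clear()
>>   }
>> }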
>>
>> Thanks,
>> +satish
>>
>> On Tue, Nov 15, 2016 at 4:01 PM, Till Rohrmann <trohrm...@apache.org> wrote:
>>
>>> Hi Satish,
>>>
>>> Your problem seems to be related to reading Hadoop's configuration.
>>> According to the internet [1, 2, 3], selecting a proper xerces
>>> version should resolve the problem.
>>>
>>> [1] http://stackoverflow.com/questions/26974067/org-apache-hadoop-conf-configuration-loadresource-error
>>> [2] http://stackoverflow.com/questions/27860361/issue-http-apache-org-xml-features-xinclude-testing-log4j-2
>>> [3] http://dmitrypukhov.pro/apache-spark-feature-httpapache-orgxmlfeaturesxinclude-is-not-recognized/
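>>>
>>> With sbt, for example, something like this in build.sbt should force
>>> a single xerces version on the classpath (the version number and the
>>> hadoop-client coordinates are only examples; use whatever matches
>>> your Hadoop distribution):
>>>
>>> // Force one xerces implementation across all transitive dependencies.
>>> dependencyOverrides += "xerces" % "xercesImpl" % "2.9.1"
>>>
>>> // Or exclude the copy that a dependency drags in.
>>> libraryDependencies += ("org.apache.hadoop" % "hadoop-client" % "2.7.2")
>>>   .exclude("xerces", "xercesImpl")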
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Nov 15, 2016 at 3:24 AM, Satish Chandra Gupta <scgupt...@gmail.com> wrote:
>>>
>>>> Most of the ValueStates I am using are of Long or Boolean, except
>>>> one, which is a map of Long to a Scala case class:
>>>>
>>>> ValueState[Map[Long, AnScalaCaseClass]]
>>>>
>>>> Does this serialization happen only for the value state members of
>>>> operators, or also for other private fields?
>>>>
>>>> Thanks,
>>>> +satish
>>>>
>>>> On Mon, Nov 14, 2016 at 3:47 PM, Ufuk Celebi <u...@apache.org> wrote:
>>>>
>>>>> There seems to be an exception happening when Flink tries to
>>>>> serialize the state of your operator (see the stack trace).
>>>>>
>>>>> What are you trying to store via the ValueState? Maybe you can
>>>>> share a code excerpt?
>>>>>
>>>>> – Ufuk
>>>>>
>>>>> On 14 November 2016 at 10:51:06, Satish Chandra Gupta (scgupt...@gmail.com) wrote:
>>>>> > Hi,
>>>>> >
>>>>> > I am using ValueState, backed by FsStateBackend on HDFS, as follows:
>>>>> >
>>>>> > env.setStateBackend(new FsStateBackend(stateBackendPath))
>>>>> > env.enableCheckpointing(checkpointInterval)
>>>>> >
>>>>> > It is a non-iterative job running on Flink/YARN. The job restarts
>>>>> > at every checkpointInterval; I have tried intervals varying from
>>>>> > 30 sec to 10 min. Any idea why it could be restarting?
>>>>> >
>>>>> > I see the following exception in the log:
>>>>> >
>>>>> > ======
>>>>> >
>>>>> > 2016-11-14 09:24:28,787 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph -
>>>>> > Source: Custom Source -> Map -> Filter -> cell_users_update (1/1)
>>>>> > (fd72961bedbb0f18bffb5ae66b926313) switched from RUNNING to CANCELING
>>>>> > 2016-11-14 09:24:28,788 INFO org.apache.flink.yarn.YarnJobManager -
>>>>> > Status of job 03a56958263a688dc34cc8d5069aac8f (Processor) changed to FAILING.
>>>>> > java.lang.RuntimeException: Error triggering a checkpoint as the result of receiving checkpoint barrier
>>>>> > at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:701)
>>>>> > at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:691)
>>>>> > at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
>>>>> > at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
>>>>> > at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:215)
>>>>> > at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:89)
>>>>> > at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>>>>> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>> > at java.lang.Thread.run(Thread.java:745)
>>>>> > Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: DataStreamer Exception:
>>>>> > at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
>>>>> > at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:200)
>>>>> > at org.apache.flink.runtime.state.filesystem.AbstractFsState.snapshot(AbstractFsState.java:85)
>>>>> > at org.apache.flink.runtime.state.AbstractStateBackend.snapshotPartitionedState(AbstractStateBackend.java:265)
>>>>> > at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotOperatorState(AbstractStreamOperator.java:176)
>>>>> > at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotOperatorState(AbstractUdfStreamOperator.java:121)
>>>>> > at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:498)
>>>>> > at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:695)
>>>>> > ... 8 more
>>>>> > Caused by: java.io.IOException: DataStreamer Exception:
>>>>> > at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:563)
>>>>> > Caused by: java.lang.ExceptionInInitializerError
>>>>> > at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1322)
>>>>> > at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1266)
>>>>> > at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:449)
>>>>> > Caused by: java.lang.RuntimeException: javax.xml.parsers.ParserConfigurationException:
>>>>> > Feature 'http://apache.org/xml/features/xinclude' is not recognized.
>>>>> > at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2648)
>>>>> > at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2492)
>>>>> > at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2405)
>>>>> > at org.apache.hadoop.conf.Configuration.get(Configuration.java:981)
>>>>> > at org.apache.hadoop.conf.Configuration.getTrimmed(Configuration.java:1031)
>>>>> > at org.apache.hadoop.conf.Configuration.getInt(Configuration.java:1251)
>>>>> > at org.apache.hadoop.hdfs.protocol.HdfsConstants.<clinit>(HdfsConstants.java:76)
>>>>> > ... 3 more
>>>>> > Caused by: javax.xml.parsers.ParserConfigurationException:
>>>>> > Feature 'http://apache.org/xml/features/xinclude' is not recognized.
>>>>> > at org.apache.xerces.jaxp.DocumentBuilderFactoryImpl.newDocumentBuilder(Unknown Source)
>>>>> > at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2530)
>>>>> > ... 9 more
>>>>> > 2016-11-14 09:24:28,789 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph -
>>>>> > Source: Custom Source -> Map -> Filter -> device_status_update (1/1)
>>>>> > (9fe20e7a4336b3960b88febc89135d97) switched from RUNNING to CANCELING
>>>>> > 2016-11-14 09:24:28,789 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph -
>>>>> > Source: Custom Source -> Map -> Filter -> Map -> Filter ->
>>>>> > cab_position_update (1/1) (91ea224efa3ba7d130405fbd247f4a45) switched
>>>>> > from RUNNING to CANCELING
>>>>> >
>>>>> > ======
>>>>> >
>>>>> > Thanks,
>>>>> > +satish