Thanks everyone, and thank you very much, Seth! Adding @transient to the lazy vals is what I needed.
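In case it helps anyone who finds this thread later, the fixed mapper looks roughly like the sketch below. The class name is taken from my test and the input/output types are simplified to String, so treat it as a minimal example rather than the real job code:

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.metrics.Counter

// Marking the lazy vals @transient keeps the non-serializable Counter
// instances out of the serialized closure; the lazy initialization then
// runs on the task manager after the function has been deserialized.
class ParsedResultUnwrapper extends RichMapFunction[String, String] {

  @transient lazy val successCounter: Counter =
    getRuntimeContext.getMetricGroup.counter("successfulParse")
  @transient lazy val failedCounter: Counter =
    getRuntimeContext.getMetricGroup.counter("failedParse")
  @transient lazy val errorCounter: Counter =
    getRuntimeContext.getMetricGroup.counter("errorParse")

  override def map(value: String): String = {
    successCounter.inc()
    value
  }
}

Kostas's suggestion of initializing the counters from open() also works; a sketch of that variant follows below the quoted thread.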
On Mon, Oct 9, 2017 at 1:34 PM, Seth Wiesman <swies...@mediamath.com> wrote:

> When a Scala class contains a single lazy val, it is implemented using a boolean flag to track whether the field has been evaluated. When a class contains multiple lazy vals, it is implemented as a bit mask shared amongst the variables. This can lead to inconsistencies as to whether serialization forces evaluation of the field. In general, lazy vals should always be marked @transient for the expected behavior.
>
> Seth
>
> *From: *Stephan Ewen <se...@apache.org>
> *Date: *Monday, October 9, 2017 at 2:44 PM
> *To: *Kostas Kloudas <k.klou...@data-artisans.com>
> *Cc: *Colin Williams <colin.williams.seat...@gmail.com>, user <user@flink.apache.org>
> *Subject: *Re: serialization error when using multiple metrics counters
>
> Interesting, is there a quirk in Scala where using multiple lazy variables possibly results in eager initialization of some?
>
> On Mon, Oct 9, 2017 at 4:37 PM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>
> Hi Colin,
>
> Are you initializing your counters from within the open() method of your rich function?
> In other words, are you calling
>
> counter = getRuntimeContext.getMetricGroup.counter("my counter")
>
> from within the open()?
>
> The counter interface is not serializable. So if you instantiate the counters outside the open(), when Flink tries to ship your code to the cluster, it cannot, and you get the exception.
>
> You can have a look at the docs for an example:
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html
>
> Thanks,
> Kostas
>
> On Oct 7, 2017, at 11:34 PM, Colin Williams <colin.williams.seattle@gmail.com> wrote:
>
> I've created a RichMapFunction in Scala with multiple counters like:
>
> lazy val successCounter = getRuntimeContext.getMetricGroup.counter("successfulParse")
> lazy val failedCounter = getRuntimeContext.getMetricGroup.counter("failedParse")
> lazy val errorCounter = getRuntimeContext.getMetricGroup.counter("errorParse")
>
> which I increment in the map function. While testing I noticed that I have no issues with a single counter, but as soon as I use more than one I get a serialization error.
>
> Does anyone know how I can use multiple counters from my RichMapFunction, or what I'm doing wrong?
>
> [info] org.apache.flink.api.common.InvalidProgramException: The implementation of the RichMapFunction is not serializable. The object probably contains or references non serializable fields.
> [info] at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
> [info] at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
> [info] at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
> [info] at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:527)
> [info] at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:581)
> [info] at ParsedResultUnwrapperTest$$anonfun$2.apply(ParsedResultUnwrapperTest.scala:27)
> [info] at ParsedResultUnwrapperTest$$anonfun$2.apply(ParsedResultUnwrapperTest.scala:23)
> [info] at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
> [info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
> [info] at org.scalatest.Transformer.apply(Transformer.scala:22)
> [info] ...
> [info] Cause: java.io.NotSerializableException: org.apache.flink.metrics.SimpleCounter
> [info] at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> [info] at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> [info] at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> [info] at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> [info] at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> [info] at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> [info] at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
> [info] at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
> [info] at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
> [info] at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
> [info] ...
> [info] - ParseResultUnwrapper.errorCounter.getCount should return 1L for a Error -> ParseResult[LineProtocol] *** FAILED ***
> [info] org.apache.flink.api.common.InvalidProgramException: The implementation of the RichMapFunction is not serializable. The object probably contains or references non serializable fields.
> [info] at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
> [info] at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
> [info] at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
> [info] at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:527)
> [info] at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:581)
> [info] at ParsedResultUnwrapperTest$$anonfun$3.apply(ParsedResultUnwrapperTest.scala:37)
> [info] at ParsedResultUnwrapperTest$$anonfun$3.apply(ParsedResultUnwrapperTest.scala:32)
> [info] at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
> [info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
> [info] at org.scalatest.Transformer.apply(Transformer.scala:22)
> [info] ...
> [info] Cause: java.io.NotSerializableException: org.apache.flink.metrics.SimpleCounter
> [info] at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> [info] at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> [info] at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> [info] at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> [info] at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> [info] at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> [info] at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
> [info] at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
> [info] at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
> [info] at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
> [info] ...
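For completeness, here is a minimal sketch of the open()-based initialization Kostas describes above, following the pattern from the metrics docs. As before, the class name and the String types are just placeholders from my simplified example:

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Counter

// Plain @transient vars, assigned in open(): open() runs on the task manager
// after deserialization, so no Counter instance is ever part of the
// serialized closure.
class ParsedResultUnwrapper extends RichMapFunction[String, String] {

  @transient private var successCounter: Counter = _
  @transient private var failedCounter: Counter = _
  @transient private var errorCounter: Counter = _

  override def open(parameters: Configuration): Unit = {
    successCounter = getRuntimeContext.getMetricGroup.counter("successfulParse")
    failedCounter = getRuntimeContext.getMetricGroup.counter("failedParse")
    errorCounter = getRuntimeContext.getMetricGroup.counter("errorParse")
  }

  override def map(value: String): String = {
    successCounter.inc()
    value
  }
}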