When a Scala class contains a single lazy val, it is implemented using a boolean flag to track whether the field has been evaluated. When a class contains multiple lazy vals, they are implemented using a bit mask shared amongst the variables. This can lead to inconsistencies as to whether serialization forces evaluation of the field; in general, lazy vals should always be marked @transient to get the expected behavior.
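A minimal, Flink-free sketch of the point above. It assumes Scala 2.12/2.13 and plain Java serialization, and `FakeCounter`, `PlainCounters`, `TransientCounters` and `serializes` are hypothetical names, not Flink API. It illustrates that whether a lazy val without @transient gets written out depends on whether it has already been evaluated, and that @transient removes that dependence. It does not reproduce the bit-mask layout itself.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a non-serializable resource such as a metrics counter.
class FakeCounter { var count = 0L; def inc(): Unit = count += 1 }

// Lazy vals without @transient: their backing fields are written by Java serialization.
class PlainCounters extends Serializable {
  lazy val success = new FakeCounter
  lazy val failed = new FakeCounter
}

// The same lazy vals marked @transient: serialization skips their backing fields.
class TransientCounters extends Serializable {
  @transient lazy val success = new FakeCounter
  @transient lazy val failed = new FakeCounter
}

object LazyValSerialization {
  // Returns true if `obj` can be written with plain Java serialization.
  def serializes(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    val plain = new PlainCounters
    println(serializes(plain))  // true: nothing evaluated yet, the fields are still null
    plain.success.inc()         // forces evaluation of one lazy val
    println(serializes(plain))  // false: the evaluated field now holds a FakeCounter

    val marked = new TransientCounters
    marked.success.inc()
    println(serializes(marked)) // true: transient fields are skipped
  }
}
```

Applied to the RichMapFunction in this thread, the same idea would be `@transient lazy val successCounter = getRuntimeContext.getMetricGroup.counter("successfulParse")`, which should defer creating the counter until its first use on the worker.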
Seth

From: Stephan Ewen <se...@apache.org>
Date: Monday, October 9, 2017 at 2:44 PM
To: Kostas Kloudas <k.klou...@data-artisans.com>
Cc: Colin Williams <colin.williams.seat...@gmail.com>, user <user@flink.apache.org>
Subject: Re: serialization error when using multiple metrics counters

Interesting. Is there a quirk in Scala whereby using multiple lazy variables can result in eager initialization of some of them?

On Mon, Oct 9, 2017 at 4:37 PM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:

Hi Colin,

Are you initializing your counters from within the open() method of your rich function? In other words, are you calling counter = getRuntimeContext.getMetricGroup.counter("my counter") from within open()?

The Counter interface is not serializable. So if you instantiate the counters outside open(), Flink cannot serialize your function when it tries to ship your code to the cluster, and you get the exception.

You can have a look at the docs for an example: https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html

Thanks,
Kostas

On Oct 7, 2017, at 11:34 PM, Colin Williams <colin.williams.seat...@gmail.com> wrote:

I've created a RichMapFunction in Scala with multiple counters like:

lazy val successCounter = getRuntimeContext.getMetricGroup.counter("successfulParse")
lazy val failedCounter = getRuntimeContext.getMetricGroup.counter("failedParse")
lazy val errorCounter = getRuntimeContext.getMetricGroup.counter("errorParse")

which I increment in the map function.

While testing, I noticed that I have no issues when using a single counter. However, with more than one counter I get a serialization error. Does anyone know how I can use multiple counters from my RichMapFunction, or what I'm doing wrong?

[info] org.apache.flink.api.common.InvalidProgramException: The implementation of the RichMapFunction is not serializable. The object probably contains or references non serializable fields.
[info] at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
[info] at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
[info] at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
[info] at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:527)
[info] at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:581)
[info] at ParsedResultUnwrapperTest$$anonfun$2.apply(ParsedResultUnwrapperTest.scala:27)
[info] at ParsedResultUnwrapperTest$$anonfun$2.apply(ParsedResultUnwrapperTest.scala:23)
[info] at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info] at org.scalatest.Transformer.apply(Transformer.scala:22)
[info] ...
[info] Cause: java.io.NotSerializableException: org.apache.flink.metrics.SimpleCounter
[info] at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
[info] at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
[info] at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
[info] at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
[info] at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
[info] at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
[info] at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
[info] at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
[info] at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
[info] at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
[info] ...
[info] - ParseResultUnwrapper.errorCounter.getCount should return 1L for a Error -> ParseResult[LineProtocol] *** FAILED ***
[info] org.apache.flink.api.common.InvalidProgramException: The implementation of the RichMapFunction is not serializable. The object probably contains or references non serializable fields.
[info] at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
[info] at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
[info] at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
[info] at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:527)
[info] at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:581)
[info] at ParsedResultUnwrapperTest$$anonfun$3.apply(ParsedResultUnwrapperTest.scala:37)
[info] at ParsedResultUnwrapperTest$$anonfun$3.apply(ParsedResultUnwrapperTest.scala:32)
[info] at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info] at org.scalatest.Transformer.apply(Transformer.scala:22)
[info] ...
[info] Cause: java.io.NotSerializableException: org.apache.flink.metrics.SimpleCounter
[info] at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
[info] at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
[info] at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
[info] at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
[info] at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
[info] at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
[info] at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
[info] at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
[info] at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
[info] at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
[info] ...
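Kostas's suggestion in this thread (create the counters in open() rather than at construction time) can be sketched without a Flink dependency. This is a simulation, not Flink code: `StubCounter`, `ParseCounters`, its no-argument `open()`, and `roundTrip` are hypothetical stand-ins. In a real RichMapFunction the counters would be assigned inside `open(parameters: Configuration)` via `getRuntimeContext.getMetricGroup.counter(...)`. The round trip through Java serialization roughly approximates shipping the function to the cluster, after which open() runs on the deserialized copy.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a non-serializable metrics counter.
class StubCounter { var count = 0L; def inc(): Unit = count += 1 }

// Hypothetical mimic of a rich function whose open() runs after it has been shipped.
class ParseCounters extends Serializable {
  // Unassigned until open(), so these are null when the function is serialized;
  // @transient additionally keeps them out of any later serialization.
  @transient private var successCounter: StubCounter = _
  @transient private var failedCounter: StubCounter = _

  // Stand-in for open(parameters: Configuration); in Flink this is where
  // getRuntimeContext.getMetricGroup.counter("successfulParse") etc. would be called.
  def open(): Unit = {
    successCounter = new StubCounter
    failedCounter = new StubCounter
  }

  // Counts non-empty lines as successes and empty lines as failures.
  def map(line: String): Int = {
    if (line.nonEmpty) successCounter.inc() else failedCounter.inc()
    line.length
  }

  def counts: (Long, Long) = (successCounter.count, failedCounter.count)
}

object OpenLifecycleDemo {
  // Serialize and deserialize, roughly what happens when a function is shipped to a worker.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val shipped = roundTrip(new ParseCounters) // succeeds: no counters exist yet
    shipped.open()                              // counters are created on the "worker"
    Seq("cpu load=1", "", "mem used=2").foreach(shipped.map)
    println(shipped.counts)                     // (2,1)
  }
}
```

The design choice is that nothing non-serializable exists until after the function has crossed the serialization boundary, so the outcome does not depend on how the compiler lays out lazy vals.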