When a Scala class contains a single lazy val, it is implemented using a boolean flag
to track whether the field has been evaluated. When a class contains multiple lazy
vals, they are implemented using a bitmask shared among the variables. This can
lead to inconsistencies as to whether serialization forces evaluation of the
field. In general, lazy vals should always be marked @transient to get the expected
behavior.
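To illustrate the advice above, here is a minimal pure-Scala sketch that needs no Flink dependency. It assumes plain Java serialization, which is what Flink's ClosureCleaner uses in the stack trace further down. FakeCounter is a hypothetical stand-in for org.apache.flink.metrics.SimpleCounter (deliberately not Serializable), and PlainCounters, TransientCounters and SerializationCheck are illustrative names only.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for org.apache.flink.metrics.SimpleCounter:
// deliberately NOT Serializable, like the real counter in the trace.
class FakeCounter {
  private var count = 0L
  def inc(): Unit = count += 1
  def getCount: Long = count
}

// Plain lazy vals: once forced, the counter objects are written out
// by serialization, which then fails.
class PlainCounters extends java.io.Serializable {
  lazy val successCounter = new FakeCounter
  lazy val failedCounter = new FakeCounter
}

// @transient lazy vals: Java serialization skips the underlying fields,
// whether or not they have been evaluated.
class TransientCounters extends java.io.Serializable {
  @transient lazy val successCounter = new FakeCounter
  @transient lazy val failedCounter = new FakeCounter
}

object SerializationCheck {
  // Returns true if Java serialization of `obj` succeeds, false if some
  // reachable field refers to a non-serializable object.
  def serializes(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

An unforced PlainCounters instance serializes fine because its lazy fields are still null, but once successCounter or failedCounter has been accessed, serialization fails with NotSerializableException. That dependence on evaluation state is the inconsistency described above. TransientCounters serializes in both cases; in Scala 2 the transient lazy vals are re-evaluated on first access after deserialization.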

Seth

From: Stephan Ewen <se...@apache.org>
Date: Monday, October 9, 2017 at 2:44 PM
To: Kostas Kloudas <k.klou...@data-artisans.com>
Cc: Colin Williams <colin.williams.seat...@gmail.com>, user 
<user@flink.apache.org>
Subject: Re: serialization error when using multiple metrics counters

Interesting. Is there a quirk in Scala where using multiple lazy variables
can result in eager initialization of some of them?

On Mon, Oct 9, 2017 at 4:37 PM, Kostas Kloudas
<k.klou...@data-artisans.com> wrote:
Hi Colin,

Are you initializing your counters from within the open() method of your rich
function?
In other words, are you calling

counter = getRuntimeContext.getMetricGroup.counter("my counter")

from within open()?

The Counter interface is not serializable. So if you instantiate the counters
outside open(), Flink cannot serialize your function when it tries to ship your
code to the cluster, and you get this exception.
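For reference, a minimal sketch of this approach, modeled on the Scala example in the Flink metrics documentation linked below: the counters are declared as @transient vars and only assigned in open(), which runs on the cluster after the function has been shipped. The class name ParseCounterMapper and the "non-empty line" parse check are hypothetical; the metric names reuse the ones from Colin's snippet.

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Counter

// Hypothetical mapper: the class name and the parse check are illustrative.
class ParseCounterMapper extends RichMapFunction[String, String] {
  // Counter is not Serializable, so the fields are transient and stay null
  // while the function is serialized and shipped to the cluster.
  @transient private var successCounter: Counter = _
  @transient private var failedCounter: Counter = _

  // open() runs once per parallel instance, before any call to map().
  override def open(parameters: Configuration): Unit = {
    val group = getRuntimeContext.getMetricGroup
    successCounter = group.counter("successfulParse")
    failedCounter = group.counter("failedParse")
  }

  override def map(value: String): String = {
    // Hypothetical parse logic: treat blank lines as failures.
    if (value.trim.nonEmpty) successCounter.inc() else failedCounter.inc()
    value
  }
}
```

It would be used as `stream.map(new ParseCounterMapper)`. Because nothing non-serializable is reachable from the function until open() runs, the ClosureCleaner check passes no matter how many counters are registered.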

You can have a look at the docs for an example:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html

Thanks,
Kostas

On Oct 7, 2017, at 11:34 PM, Colin Williams
<colin.williams.seat...@gmail.com> wrote:

I've created a RichMapFunction in Scala with multiple counters, like:

   lazy val successCounter = getRuntimeContext.getMetricGroup.counter("successfulParse")
   lazy val failedCounter = getRuntimeContext.getMetricGroup.counter("failedParse")
   lazy val errorCounter = getRuntimeContext.getMetricGroup.counter("errorParse")

which I increment in the map function. While testing, I noticed that I have no
issues when using a single counter. However, with more than one counter I get a
serialization error.

Does anyone know how I can use multiple counters from my RichMapFunction, or 
what I'm doing wrong?

[info]   org.apache.flink.api.common.InvalidProgramException: The implementation of the RichMapFunction is not serializable. The object probably contains or references non serializable fields.
[info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
[info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:527)
[info]   at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:581)
[info]   at ParsedResultUnwrapperTest$$anonfun$2.apply(ParsedResultUnwrapperTest.scala:27)
[info]   at ParsedResultUnwrapperTest$$anonfun$2.apply(ParsedResultUnwrapperTest.scala:23)
[info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   ...
[info]   Cause: java.io.NotSerializableException: org.apache.flink.metrics.SimpleCounter
[info]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
[info]   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
[info]   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
[info]   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
[info]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
[info]   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
[info]   at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
[info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
[info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
[info]   ...
[info] - ParseResultUnwrapper.errorCounter.getCount should return 1L for a Error -> ParseResult[LineProtocol] *** FAILED ***
[info]   org.apache.flink.api.common.InvalidProgramException: The implementation of the RichMapFunction is not serializable. The object probably contains or references non serializable fields.
[info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
[info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:527)
[info]   at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:581)
[info]   at ParsedResultUnwrapperTest$$anonfun$3.apply(ParsedResultUnwrapperTest.scala:37)
[info]   at ParsedResultUnwrapperTest$$anonfun$3.apply(ParsedResultUnwrapperTest.scala:32)
[info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   ...
[info]   Cause: java.io.NotSerializableException: org.apache.flink.metrics.SimpleCounter
[info]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
[info]   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
[info]   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
[info]   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
[info]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
[info]   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
[info]   at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
[info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
[info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
[info]   ...

