Thanks everyone, and thank you very much Seth! Adding @transient to the
lazy vals is what I needed.

On Mon, Oct 9, 2017 at 1:34 PM, Seth Wiesman <swies...@mediamath.com> wrote:

> A scala class contains a single lazy val it is implemented using a boolean
> flag to track if the field has been evaluated. When a class contains,
> multiple lazy val’s it is implemented as a bit mask shared amongst the
> variables. This can lead to inconsistencies as to whether serialization
> forces evaluation of the field, in general lazy val’s should always be
> marked @transient for expected behavior.
>
>
>
> Seth
>
>
>
> *From: *Stephan Ewen <se...@apache.org>
> *Date: *Monday, October 9, 2017 at 2:44 PM
> *To: *Kostas Kloudas <k.klou...@data-artisans.com>
> *Cc: *Colin Williams <colin.williams.seat...@gmail.com>, user <
> user@flink.apache.org>
> *Subject: *Re: serialization error when using multiple metrics counters
>
>
>
> Interesting, is there a quirk in Scala that using multiple lazy variables
> results possibly in eager initialization of some?
>
>
>
> On Mon, Oct 9, 2017 at 4:37 PM, Kostas Kloudas <
> k.klou...@data-artisans.com> wrote:
>
> Hi Colin,
>
>
>
> Are you initializing your counters from within the open() method of you
> rich function?
>
> In other words, are you calling
>
>
>
> counter = getRuntimeContext.getMetricGroup.counter(“my counter”)
>
>
>
> from within the open().
>
>
>
> The counter interface is not serializable. So if you instantiate the
> counters outside the open(),
>
> when Flink tries to ship your code to the cluster, it cannot so you get
> the exception.
>
>
>
> You can have a look at the docs for an example:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/
> metrics.html
>
>
>
> Thanks,
>
> Kostas
>
>
>
> On Oct 7, 2017, at 11:34 PM, Colin Williams <colin.williams.seattle@gmail.
> com> wrote:
>
>
>
> I've created a RichMapFunction in scala with multiple counters like:
>
>
>
>    lazy val successCounter = getRuntimeContext.getMetricGroup.counter("
> successfulParse")
>
>    lazy val failedCounter = getRuntimeContext.getMetricGroup.counter("
> failedParse")
>
>    lazy val errorCounter = getRuntimeContext.getMetricGroup.counter("
> errorParse")
>
>
>
> which I increment in the map function. While testing I noticed that I have
> no issues with using a single counter. However with multiple counters I get
> a serialization error using more than one counter.
>
>
>
> Does anyone know how I can use multiple counters from my RichMapFunction,
> or what I'm doing wrong?
>
>
>
> [info]   org.apache.flink.api.common.InvalidProgramException: The
> implementation of the RichMapFunction is not serializable. The object
> probably contains or references non serializable fields.
>
> [info]   at org.apache.flink.api.java.ClosureCleaner.clean(
> ClosureCleaner.java:100)
>
> [info]   at org.apache.flink.streaming.api.environment.
> StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
>
> [info]   at org.apache.flink.streaming.api.datastream.DataStream.
> clean(DataStream.java:183)
>
> [info]   at org.apache.flink.streaming.api.datastream.DataStream.map(
> DataStream.java:527)
>
> [info]   at org.apache.flink.streaming.api.scala.DataStream.map(
> DataStream.scala:581)
>
> [info]   at ParsedResultUnwrapperTest$$anonfun$2.apply(
> ParsedResultUnwrapperTest.scala:27)
>
> [info]   at ParsedResultUnwrapperTest$$anonfun$2.apply(
> ParsedResultUnwrapperTest.scala:23)
>
> [info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
>
> [info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
>
> [info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
>
> [info]   ...
>
> [info]   Cause: java.io.NotSerializableException:
> org.apache.flink.metrics.SimpleCounter
>
> [info]   at java.io.ObjectOutputStream.writeObject0(
> ObjectOutputStream.java:1184)
>
> [info]   at java.io.ObjectOutputStream.defaultWriteFields(
> ObjectOutputStream.java:1548)
>
> [info]   at java.io.ObjectOutputStream.writeSerialData(
> ObjectOutputStream.java:1509)
>
> [info]   at java.io.ObjectOutputStream.writeOrdinaryObject(
> ObjectOutputStream.java:1432)
>
> [info]   at java.io.ObjectOutputStream.writeObject0(
> ObjectOutputStream.java:1178)
>
> [info]   at java.io.ObjectOutputStream.writeObject(
> ObjectOutputStream.java:348)
>
> [info]   at org.apache.flink.util.InstantiationUtil.serializeObject(
> InstantiationUtil.java:315)
>
> [info]   at org.apache.flink.api.java.ClosureCleaner.clean(
> ClosureCleaner.java:81)
>
> [info]   at org.apache.flink.streaming.api.environment.
> StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
>
> [info]   at org.apache.flink.streaming.api.datastream.DataStream.
> clean(DataStream.java:183)
>
> [info]   ...
>
> [info] - ParseResultUnwrapper.errorCounter.getCount should return 1L for
> a Error -> ParseResult[LineProtocol] *** FAILED ***
>
> [info]   org.apache.flink.api.common.InvalidProgramException: The
> implementation of the RichMapFunction is not serializable. The object
> probably contains or references non serializable fields.
>
> [info]   at org.apache.flink.api.java.ClosureCleaner.clean(
> ClosureCleaner.java:100)
>
> [info]   at org.apache.flink.streaming.api.environment.
> StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
>
> [info]   at org.apache.flink.streaming.api.datastream.DataStream.
> clean(DataStream.java:183)
>
> [info]   at org.apache.flink.streaming.api.datastream.DataStream.map(
> DataStream.java:527)
>
> [info]   at org.apache.flink.streaming.api.scala.DataStream.map(
> DataStream.scala:581)
>
> [info]   at ParsedResultUnwrapperTest$$anonfun$3.apply(
> ParsedResultUnwrapperTest.scala:37)
>
> [info]   at ParsedResultUnwrapperTest$$anonfun$3.apply(
> ParsedResultUnwrapperTest.scala:32)
>
> [info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
>
> [info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
>
> [info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
>
> [info]   ...
>
> [info]   Cause: java.io.NotSerializableException:
> org.apache.flink.metrics.SimpleCounter
>
> [info]   at java.io.ObjectOutputStream.writeObject0(
> ObjectOutputStream.java:1184)
>
> [info]   at java.io.ObjectOutputStream.defaultWriteFields(
> ObjectOutputStream.java:1548)
>
> [info]   at java.io.ObjectOutputStream.writeSerialData(
> ObjectOutputStream.java:1509)
>
> [info]   at java.io.ObjectOutputStream.writeOrdinaryObject(
> ObjectOutputStream.java:1432)
>
> [info]   at java.io.ObjectOutputStream.writeObject0(
> ObjectOutputStream.java:1178)
>
> [info]   at java.io.ObjectOutputStream.writeObject(
> ObjectOutputStream.java:348)
>
> [info]   at org.apache.flink.util.InstantiationUtil.serializeObject(
> InstantiationUtil.java:315)
>
> [info]   at org.apache.flink.api.java.ClosureCleaner.clean(
> ClosureCleaner.java:81)
>
> [info]   at org.apache.flink.streaming.api.environment.
> StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
>
> [info]   at org.apache.flink.streaming.api.datastream.DataStream.
> clean(DataStream.java:183)
>
> [info]   ...
>
>
>
>
>

Reply via email to