I've created a RichMapFunction in scala with multiple counters like:

   lazy val successCounter = getRuntimeContext.
getMetricGroup.counter("successfulParse")
   lazy val failedCounter = getRuntimeContext.
getMetricGroup.counter("failedParse")
   lazy val errorCounter = getRuntimeContext.
getMetricGroup.counter("errorParse")

which I increment in the map function. While testing I noticed that I have
no issues with using a single counter. However with multiple counters I get
a serialization error using more than one counter.

Does anyone know how I can use multiple counters from my RichMapFunction,
or what I'm doing wrong?

[info]   org.apache.flink.api.common.InvalidProgramException: The
implementation of the RichMapFunction is not serializable. The object
probably contains or references non serializable fields.
[info]   at org.apache.flink.api.java.ClosureCleaner.clean(
ClosureCleaner.java:100)
[info]   at org.apache.flink.streaming.api.environment.
StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.
clean(DataStream.java:183)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.map(
DataStream.java:527)
[info]   at org.apache.flink.streaming.api.scala.DataStream.map(
DataStream.scala:581)
[info]   at ParsedResultUnwrapperTest$$anonfun$2.apply(
ParsedResultUnwrapperTest.scala:27)
[info]   at ParsedResultUnwrapperTest$$anonfun$2.apply(
ParsedResultUnwrapperTest.scala:23)
[info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   ...
[info]   Cause: java.io.NotSerializableException: org.apache.flink.metrics.
SimpleCounter
[info]   at java.io.ObjectOutputStream.writeObject0(
ObjectOutputStream.java:1184)
[info]   at java.io.ObjectOutputStream.defaultWriteFields(
ObjectOutputStream.java:1548)
[info]   at java.io.ObjectOutputStream.writeSerialData(
ObjectOutputStream.java:1509)
[info]   at java.io.ObjectOutputStream.writeOrdinaryObject(
ObjectOutputStream.java:1432)
[info]   at java.io.ObjectOutputStream.writeObject0(
ObjectOutputStream.java:1178)
[info]   at java.io.ObjectOutputStream.writeObject(
ObjectOutputStream.java:348)
[info]   at org.apache.flink.util.InstantiationUtil.serializeObject(
InstantiationUtil.java:315)
[info]   at org.apache.flink.api.java.ClosureCleaner.clean(
ClosureCleaner.java:81)
[info]   at org.apache.flink.streaming.api.environment.
StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.
clean(DataStream.java:183)
[info]   ...
[info] - ParseResultUnwrapper.errorCounter.getCount should return 1L for a
Error -> ParseResult[LineProtocol] *** FAILED ***
[info]   org.apache.flink.api.common.InvalidProgramException: The
implementation of the RichMapFunction is not serializable. The object
probably contains or references non serializable fields.
[info]   at org.apache.flink.api.java.ClosureCleaner.clean(
ClosureCleaner.java:100)
[info]   at org.apache.flink.streaming.api.environment.
StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.
clean(DataStream.java:183)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.map(
DataStream.java:527)
[info]   at org.apache.flink.streaming.api.scala.DataStream.map(
DataStream.scala:581)
[info]   at ParsedResultUnwrapperTest$$anonfun$3.apply(
ParsedResultUnwrapperTest.scala:37)
[info]   at ParsedResultUnwrapperTest$$anonfun$3.apply(
ParsedResultUnwrapperTest.scala:32)
[info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   ...
[info]   Cause: java.io.NotSerializableException: org.apache.flink.metrics.
SimpleCounter
[info]   at java.io.ObjectOutputStream.writeObject0(
ObjectOutputStream.java:1184)
[info]   at java.io.ObjectOutputStream.defaultWriteFields(
ObjectOutputStream.java:1548)
[info]   at java.io.ObjectOutputStream.writeSerialData(
ObjectOutputStream.java:1509)
[info]   at java.io.ObjectOutputStream.writeOrdinaryObject(
ObjectOutputStream.java:1432)
[info]   at java.io.ObjectOutputStream.writeObject0(
ObjectOutputStream.java:1178)
[info]   at java.io.ObjectOutputStream.writeObject(
ObjectOutputStream.java:348)
[info]   at org.apache.flink.util.InstantiationUtil.serializeObject(
InstantiationUtil.java:315)
[info]   at org.apache.flink.api.java.ClosureCleaner.clean(
ClosureCleaner.java:81)
[info]   at org.apache.flink.streaming.api.environment.
StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.
clean(DataStream.java:183)
[info]   ...

Reply via email to