I've created a RichMapFunction in scala with multiple counters like: lazy val successCounter = getRuntimeContext. getMetricGroup.counter("successfulParse") lazy val failedCounter = getRuntimeContext. getMetricGroup.counter("failedParse") lazy val errorCounter = getRuntimeContext. getMetricGroup.counter("errorParse")
which I increment in the map function. While testing I noticed that I have no issues with using a single counter. However with multiple counters I get a serialization error using more than one counter. Does anyone know how I can use multiple counters from my RichMapFunction, or what I'm doing wrong? [info] org.apache.flink.api.common.InvalidProgramException: The implementation of the RichMapFunction is not serializable. The object probably contains or references non serializable fields. [info] at org.apache.flink.api.java.ClosureCleaner.clean( ClosureCleaner.java:100) [info] at org.apache.flink.streaming.api.environment. StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548) [info] at org.apache.flink.streaming.api.datastream.DataStream. clean(DataStream.java:183) [info] at org.apache.flink.streaming.api.datastream.DataStream.map( DataStream.java:527) [info] at org.apache.flink.streaming.api.scala.DataStream.map( DataStream.scala:581) [info] at ParsedResultUnwrapperTest$$anonfun$2.apply( ParsedResultUnwrapperTest.scala:27) [info] at ParsedResultUnwrapperTest$$anonfun$2.apply( ParsedResultUnwrapperTest.scala:23) [info] at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85) [info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) [info] at org.scalatest.Transformer.apply(Transformer.scala:22) [info] ... [info] Cause: java.io.NotSerializableException: org.apache.flink.metrics. SimpleCounter [info] at java.io.ObjectOutputStream.writeObject0( ObjectOutputStream.java:1184) [info] at java.io.ObjectOutputStream.defaultWriteFields( ObjectOutputStream.java:1548) [info] at java.io.ObjectOutputStream.writeSerialData( ObjectOutputStream.java:1509) [info] at java.io.ObjectOutputStream.writeOrdinaryObject( ObjectOutputStream.java:1432) [info] at java.io.ObjectOutputStream.writeObject0( ObjectOutputStream.java:1178) [info] at java.io.ObjectOutputStream.writeObject( ObjectOutputStream.java:348) [info] at org.apache.flink.util.InstantiationUtil.serializeObject( InstantiationUtil.java:315) [info] at org.apache.flink.api.java.ClosureCleaner.clean( ClosureCleaner.java:81) [info] at org.apache.flink.streaming.api.environment. StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548) [info] at org.apache.flink.streaming.api.datastream.DataStream. clean(DataStream.java:183) [info] ... [info] - ParseResultUnwrapper.errorCounter.getCount should return 1L for a Error -> ParseResult[LineProtocol] *** FAILED *** [info] org.apache.flink.api.common.InvalidProgramException: The implementation of the RichMapFunction is not serializable. The object probably contains or references non serializable fields. [info] at org.apache.flink.api.java.ClosureCleaner.clean( ClosureCleaner.java:100) [info] at org.apache.flink.streaming.api.environment. StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548) [info] at org.apache.flink.streaming.api.datastream.DataStream. clean(DataStream.java:183) [info] at org.apache.flink.streaming.api.datastream.DataStream.map( DataStream.java:527) [info] at org.apache.flink.streaming.api.scala.DataStream.map( DataStream.scala:581) [info] at ParsedResultUnwrapperTest$$anonfun$3.apply( ParsedResultUnwrapperTest.scala:37) [info] at ParsedResultUnwrapperTest$$anonfun$3.apply( ParsedResultUnwrapperTest.scala:32) [info] at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85) [info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) [info] at org.scalatest.Transformer.apply(Transformer.scala:22) [info] ... [info] Cause: java.io.NotSerializableException: org.apache.flink.metrics. SimpleCounter [info] at java.io.ObjectOutputStream.writeObject0( ObjectOutputStream.java:1184) [info] at java.io.ObjectOutputStream.defaultWriteFields( ObjectOutputStream.java:1548) [info] at java.io.ObjectOutputStream.writeSerialData( ObjectOutputStream.java:1509) [info] at java.io.ObjectOutputStream.writeOrdinaryObject( ObjectOutputStream.java:1432) [info] at java.io.ObjectOutputStream.writeObject0( ObjectOutputStream.java:1178) [info] at java.io.ObjectOutputStream.writeObject( ObjectOutputStream.java:348) [info] at org.apache.flink.util.InstantiationUtil.serializeObject( InstantiationUtil.java:315) [info] at org.apache.flink.api.java.ClosureCleaner.clean( ClosureCleaner.java:81) [info] at org.apache.flink.streaming.api.environment. StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548) [info] at org.apache.flink.streaming.api.datastream.DataStream. clean(DataStream.java:183) [info] ...