Hi community, I have a job that consumes data from a datagen source, tries to log something in `map` operators, and sinks the result to a `DiscardingSink`. The full example can be found in [the repo](https://github.com/YikSanChan/log-in-flink-operator).
The `Job` object extends the `BaseJob` trait, where the `preprocess` and `process` methods are defined.

BaseJob.scala

```scala
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.functions.sink.DiscardingSink
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.api.scala._
import org.slf4j.LoggerFactory

trait BaseJob {

  protected final val LOG = LoggerFactory.getLogger(getClass)

  def preprocess(tableEnv: StreamTableEnvironment): DataStream[AnyRef]

  def process(stream: DataStream[AnyRef]): DataStreamSink[AnyRef] = {
    stream
      .map { a =>
        // This LOG line throws Task not serializable.
        // Commenting it out, the LOG line in Job.scala works just fine.
        LOG.info("[BaseJob] a = " + a)
        a
      }
      .addSink(new DiscardingSink[AnyRef])
  }
}
```

Job.scala

```scala
import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object Job extends BaseJob {

  private val CreateSource =
    """
      |CREATE TABLE source (
      |  a int
      |) WITH (
      |  'connector' = 'datagen',
      |  'rows-per-second' = '5'
      |)
      |""".stripMargin

  private def run(): JobExecutionResult = {
    val settings = EnvironmentSettings.newInstance.build
    val execEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(execEnv, settings)
    val stream = preprocess(tableEnv)
    process(stream)
    execEnv.execute("Streaming")
  }

  override def preprocess(tableEnv: StreamTableEnvironment): DataStream[AnyRef] = {
    tableEnv.executeSql(CreateSource)
    val table = tableEnv.sqlQuery("SELECT a FROM source")
    tableEnv
      .toDataStream(table)
      .map { row =>
        val a = row.getField("a")
        // This LOG line works just fine!
        LOG.info("[Job] a = " + a)
        a
      }
  }

  def main(args: Array[String]): Unit = {
    run()
  }
}
```

It is very odd that the LOG line in Job.scala works just fine, while the LOG line in BaseJob.scala fails with:

```
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not serializable
    at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
    at org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
    at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:914)
    at org.apache.flink.streaming.api.scala.DataStream.clean(DataStream.scala:1189)
    at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:623)
    at BaseJob$class.process(BaseJob.scala:15)
    at Job$.process(Job.scala:7)
    at Job$.run(Job.scala:25)
    at Job$.main(Job.scala:42)
    at Job.main(Job.scala)
Caused by: java.io.NotSerializableException: Job$
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1193)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1579)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1536)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1444)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1187)
    at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:353)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
    at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:406)
    ... 10 more
```

I wonder why this happens and how to resolve it, as I do want to log in BaseJob. Thanks!

Best,
Yik San