Hi Guowei,

Thanks for pointing that out! It helped me resolve the issue.
Just a small correction: the `static` modifier is not available in Scala. Its Scala alternative is an `object`:

```scala
object BaseJob {
  final val LOG = LoggerFactory.getLogger(getClass)
}
```

Then I reference `LOG` through the `BaseJob` object whenever I want to log. This solves my problem: in the trait, `LOG` was an instance member, so the `map` closure captured `this` (the non-serializable `Job$` singleton), whereas the closure now reaches `LOG` through the global `BaseJob` object and captures nothing. Thank you!
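For reference, a minimal sketch of how `BaseJob` ends up looking after the change (the explicit `BaseJob.LOG` qualifier is mine; companion-object members are not in unqualified scope inside the trait, so `import BaseJob.LOG` would work as well):

```scala
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.functions.sink.DiscardingSink
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.slf4j.LoggerFactory

object BaseJob {
  // Lives in the companion object, so closures reach it through the
  // global BaseJob singleton instead of capturing an enclosing instance.
  final val LOG = LoggerFactory.getLogger(getClass)
}

trait BaseJob {
  def preprocess(tableEnv: StreamTableEnvironment): DataStream[AnyRef]

  def process(stream: DataStream[AnyRef]): DataStreamSink[AnyRef] = {
    stream
      .map { a =>
        // Only the companion-object LOG is referenced here, so the
        // lambda no longer drags in the non-serializable Job$ instance.
        BaseJob.LOG.info("[BaseJob] a = " + a)
        a
      }
      .addSink(new DiscardingSink[AnyRef])
  }
}
```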
Best,
Yik San

On Mon, May 24, 2021 at 3:23 PM Guowei Ma <guowei....@gmail.com> wrote:

> Hi, Yik San
>
> You need to change the following line:
>
> >>> protected final val LOG = LoggerFactory.getLogger(getClass)
> protected *static* final val LOG = LoggerFactory.getLogger(getClass)
>
> Best,
> Guowei
>
>
> On Mon, May 24, 2021 at 2:41 PM Yik San Chan <evan.chanyik...@gmail.com>
> wrote:
>
>> Hi community,
>>
>> I have a job that consumes data from a datagen source, tries to log
>> something in `map` operators, and sinks the result to a DiscardingSink.
>> The full example can be found in [the repo](
>> https://github.com/YikSanChan/log-in-flink-operator).
>>
>> The `Job` extends `BaseJob`, where the `preprocess` and `process`
>> methods are defined.
>>
>> BaseJob.scala
>>
>> ```scala
>> import org.apache.flink.streaming.api.datastream.DataStreamSink
>> import org.apache.flink.streaming.api.functions.sink.DiscardingSink
>> import org.apache.flink.streaming.api.scala.DataStream
>> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
>> import org.apache.flink.api.scala._
>> import org.slf4j.LoggerFactory
>>
>> trait BaseJob {
>>   protected final val LOG = LoggerFactory.getLogger(getClass)
>>
>>   def preprocess(tableEnv: StreamTableEnvironment): DataStream[AnyRef]
>>
>>   def process(stream: DataStream[AnyRef]): DataStreamSink[AnyRef] = {
>>     stream
>>       .map { a =>
>>         // This LOG line throws Task not serializable.
>>         // Commenting out this LOG line, the LOG line in Job.scala
>>         // works just fine.
>>         LOG.info("[BaseJob] a = " + a)
>>         a
>>       }
>>       .addSink(new DiscardingSink[AnyRef])
>>   }
>> }
>> ```
>>
>> Job.scala
>>
>> ```scala
>> import org.apache.flink.api.common.JobExecutionResult
>> import org.apache.flink.api.scala._
>> import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
>> import org.apache.flink.table.api.EnvironmentSettings
>> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
>>
>> object Job extends BaseJob {
>>
>>   private val CreateSource =
>>     """
>>       |CREATE TABLE source (
>>       |  a int
>>       |) WITH (
>>       |  'connector' = 'datagen',
>>       |  'rows-per-second' = '5'
>>       |)
>>       |""".stripMargin
>>
>>   private def run(): JobExecutionResult = {
>>     val settings = EnvironmentSettings.newInstance.build
>>     val execEnv: StreamExecutionEnvironment =
>>       StreamExecutionEnvironment.getExecutionEnvironment
>>     val tableEnv = StreamTableEnvironment.create(execEnv, settings)
>>     val stream = preprocess(tableEnv)
>>     process(stream)
>>     execEnv.execute("Streaming")
>>   }
>>
>>   override def preprocess(tableEnv: StreamTableEnvironment): DataStream[AnyRef] = {
>>     tableEnv.executeSql(CreateSource)
>>     val table = tableEnv.sqlQuery("SELECT a FROM source")
>>     tableEnv
>>       .toDataStream(table)
>>       .map { row =>
>>         val a = row.getField("a")
>>         // This LOG line works just fine!
>>         LOG.info("[Job] a = " + a)
>>         a
>>       }
>>   }
>>
>>   def main(args: Array[String]): Unit = {
>>     run()
>>   }
>> }
>> ```
>>
>> It is very odd that the LOG line in Job.scala works just fine, while the
>> LOG line in BaseJob.scala complains that:
>>
>> ```
>> Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not serializable
>>   at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
>>   at org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
>>   at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
>>   at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:914)
>>   at org.apache.flink.streaming.api.scala.DataStream.clean(DataStream.scala:1189)
>>   at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:623)
>>   at BaseJob$class.process(BaseJob.scala:15)
>>   at Job$.process(Job.scala:7)
>>   at Job$.run(Job.scala:25)
>>   at Job$.main(Job.scala:42)
>>   at Job.main(Job.scala)
>> Caused by: java.io.NotSerializableException: Job$
>>   at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1193)
>>   at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1579)
>>   at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1536)
>>   at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1444)
>>   at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1187)
>>   at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:353)
>>   at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
>>   at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:406)
>>   ... 10 more
>> ```
>>
>> I wonder why this happens, and how to resolve the issue, as I do want to
>> log in BaseJob. Thanks!
>>
>> Best,
>> Yik San
>>
>