Hi, Yik San

You need to change the following line:

>>>> protected final val LOG = LoggerFactory.getLogger(getClass)

so that `LOG` is a static field rather than instance state, i.e. the Scala
equivalent of `protected static final` in Java. The closure in `process`
reaches `LOG` through `this`, so Flink's closure cleaner has to serialize the
whole enclosing `Job$` object, which is not serializable. Since Scala has no
`static` modifier, the equivalent is to move the logger into the companion
object of `BaseJob`. (A closure defined directly inside `object Job` can reach
the inherited `LOG` through the static `MODULE$` reference without capturing
`this`, which is presumably why the line in Job.scala works.)
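A minimal sketch of that change (assuming the same SLF4J and Flink
dependencies as your BaseJob.scala below; note the logger name then becomes
`BaseJob` rather than the concrete subclass name from `getClass`):

```scala
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.functions.sink.DiscardingSink
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.api.scala._
import org.slf4j.{Logger, LoggerFactory}

// A companion object's field is static at the JVM level, so the closure
// below loads it through BaseJob$.MODULE$ instead of capturing `this`.
// The non-serializable Job$ instance never enters the closure.
object BaseJob {
  private val LOG: Logger = LoggerFactory.getLogger(classOf[BaseJob])
}

trait BaseJob {
  import BaseJob.LOG

  def process(stream: DataStream[AnyRef]): DataStreamSink[AnyRef] = {
    stream
      .map { a =>
        LOG.info("[BaseJob] a = " + a) // no `this` captured here
        a
      }
      .addSink(new DiscardingSink[AnyRef])
  }
}
```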
Best,
Guowei

On Mon, May 24, 2021 at 2:41 PM Yik San Chan <evan.chanyik...@gmail.com> wrote:

> Hi community,
>
> I have a job that consumes data from a datagen source, tries to log
> something in `map` operators, and sinks the result to a DiscardingSink.
> The full example can be found in [the repo](
> https://github.com/YikSanChan/log-in-flink-operator).
>
> `Job` extends `BaseJob`, where the `preprocess` and `process` methods are
> defined.
>
> BaseJob.scala
>
> ```scala
> import org.apache.flink.streaming.api.datastream.DataStreamSink
> import org.apache.flink.streaming.api.functions.sink.DiscardingSink
> import org.apache.flink.streaming.api.scala.DataStream
> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> import org.apache.flink.api.scala._
> import org.slf4j.LoggerFactory
>
> trait BaseJob {
>   protected final val LOG = LoggerFactory.getLogger(getClass)
>
>   def preprocess(tableEnv: StreamTableEnvironment): DataStream[AnyRef]
>
>   def process(stream: DataStream[AnyRef]): DataStreamSink[AnyRef] = {
>     stream
>       .map { a =>
>         // This LOG line throws "Task not serializable".
>         // With this line commented out, the LOG line in Job.scala works
>         // just fine.
>         LOG.info("[BaseJob] a = " + a)
>         a
>       }
>       .addSink(new DiscardingSink[AnyRef])
>   }
> }
> ```
>
> Job.scala
>
> ```scala
> import org.apache.flink.api.common.JobExecutionResult
> import org.apache.flink.api.scala._
> import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
> import org.apache.flink.table.api.EnvironmentSettings
> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
>
> object Job extends BaseJob {
>
>   private val CreateSource =
>     """
>       |CREATE TABLE source (
>       |  a int
>       |) WITH (
>       |  'connector' = 'datagen',
>       |  'rows-per-second' = '5'
>       |)
>       |""".stripMargin
>
>   private def run(): JobExecutionResult = {
>     val settings = EnvironmentSettings.newInstance.build
>     val execEnv: StreamExecutionEnvironment =
>       StreamExecutionEnvironment.getExecutionEnvironment
>     val tableEnv = StreamTableEnvironment.create(execEnv, settings)
>     val stream = preprocess(tableEnv)
>     process(stream)
>     execEnv.execute("Streaming")
>   }
>
>   override def preprocess(tableEnv: StreamTableEnvironment): DataStream[AnyRef] = {
>     tableEnv.executeSql(CreateSource)
>     val table = tableEnv.sqlQuery("SELECT a FROM source")
>     tableEnv
>       .toDataStream(table)
>       .map { row =>
>         val a = row.getField("a")
>         // This LOG line works just fine!
>         LOG.info("[Job] a = " + a)
>         a
>       }
>   }
>
>   def main(args: Array[String]): Unit = {
>     run()
>   }
> }
> ```
>
> It is very odd that the LOG line in Job.scala works just fine, while the
> LOG line in BaseJob.scala complains:
>
> ```
> Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not serializable
>     at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
>     at org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
>     at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
>     at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:914)
>     at org.apache.flink.streaming.api.scala.DataStream.clean(DataStream.scala:1189)
>     at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:623)
>     at BaseJob$class.process(BaseJob.scala:15)
>     at Job$.process(Job.scala:7)
>     at Job$.run(Job.scala:25)
>     at Job$.main(Job.scala:42)
>     at Job.main(Job.scala)
> Caused by: java.io.NotSerializableException: Job$
>     at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1193)
>     at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1579)
>     at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1536)
>     at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1444)
>     at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1187)
>     at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:353)
>     at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
>     at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:406)
>     ... 10 more
> ```
>
> I wonder why, and how to resolve this issue, since I do want to log in
> BaseJob? Thanks!
>
> Best,
> Yik San
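For completeness, the `Caused by: java.io.NotSerializableException: Job$`
line shows that it is the enclosing `Job$` object, captured as the trait
method's `this`, that fails to serialize, not the logger itself. If the
per-class logger names from `getClass` matter to you, a hedged alternative
sketch (an assumption on my side, not the change suggested above) is to make
the trait `Serializable` and keep the logger out of the serialized state:

```scala
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.functions.sink.DiscardingSink
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.api.scala._
import org.slf4j.{Logger, LoggerFactory}

// Alternative: let the captured `this` be serialized, but exclude the
// logger from the bytes. @transient skips the field during serialization;
// lazy re-creates it on the task manager on first use after deserialization.
trait BaseJob extends Serializable {
  @transient protected lazy val LOG: Logger =
    LoggerFactory.getLogger(getClass)

  def process(stream: DataStream[AnyRef]): DataStreamSink[AnyRef] = {
    stream
      .map { a =>
        LOG.info("[BaseJob] a = " + a)
        a
      }
      .addSink(new DiscardingSink[AnyRef])
  }
}
```

Both variants keep the logger out of the serialized closure; the
companion-object version above is the closer match to `protected static
final` in Java.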