Hi, Yik San

You need to change the following line:

>>>>  protected final val LOG = LoggerFactory.getLogger(getClass)

As an instance field, `LOG` forces the map closure in `process` to capture `this`, i.e. the `Job$` object, which is not serializable. The logger needs to be *static* instead. Scala has no `static` keyword; the equivalent is a val on a companion object, which the closure can reach without capturing anything. A minimal sketch, reusing your imports and your `process` body:
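```scala
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.functions.sink.DiscardingSink
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.api.scala._
import org.slf4j.{Logger, LoggerFactory}

object BaseJob {
  // A companion-object val compiles to a static field, so the closure in
  // `process` can log without capturing the enclosing job instance.
  val LOG: Logger = LoggerFactory.getLogger(classOf[BaseJob])
}

trait BaseJob {
  def process(stream: DataStream[AnyRef]): DataStreamSink[AnyRef] = {
    stream
      .map { a =>
        // Static access: nothing but `a` is captured by this closure.
        BaseJob.LOG.info("[BaseJob] a = " + a)
        a
      }
      .addSink(new DiscardingSink[AnyRef])
  }
}
```

An alternative, if you prefer an instance logger, is `@transient protected lazy val LOG = LoggerFactory.getLogger(getClass)` together with `trait BaseJob extends Serializable`: the job object then serializes, but the logger is skipped and recreated lazily on the workers.

That is presumably also why the LOG line in Job.scala works: inside `object Job` the closure can reach the module statically instead of capturing `this`, while inside the trait method it cannot.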

Best,
Guowei


On Mon, May 24, 2021 at 2:41 PM Yik San Chan <evan.chanyik...@gmail.com>
wrote:

> Hi community,
>
> I have a job that consumes data from a datagen source, tries to log
> something in `map` operators, and sinks the result to a DiscardingSink.
> The full example can be found in [the repo](https://github.com/YikSanChan/log-in-flink-operator).
>
> The `Job` extends `BaseJob` where `preprocess` and `process` methods are
> defined.
>
> BaseJob.scala
> ```scala
>
> import org.apache.flink.streaming.api.datastream.DataStreamSink
> import org.apache.flink.streaming.api.functions.sink.DiscardingSink
> import org.apache.flink.streaming.api.scala.DataStream
> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> import org.apache.flink.api.scala._
> import org.slf4j.LoggerFactory
>
> trait BaseJob {
>   protected final val LOG = LoggerFactory.getLogger(getClass)
>
>   def preprocess(tableEnv: StreamTableEnvironment): DataStream[AnyRef]
>
>   def process(stream: DataStream[AnyRef]): DataStreamSink[AnyRef] = {
>     stream
>       .map { a =>
>         // This LOG line throws "Task not serializable".
>         // With it commented out, the LOG line in Job.scala works just fine.
>         LOG.info("[BaseJob] a = " + a)
>         a
>       }
>       .addSink(new DiscardingSink[AnyRef])
>   }
> }
>
> ```
>
> Job.scala
> ```scala
>
> import org.apache.flink.api.common.JobExecutionResult
> import org.apache.flink.api.scala._
> import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
> import org.apache.flink.table.api.EnvironmentSettings
> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
>
> object Job extends BaseJob {
>
>   private val CreateSource =
>     """
>       |CREATE TABLE source (
>       |  a int
>       |) WITH (
>       |  'connector' = 'datagen',
>       |  'rows-per-second' = '5'
>       |)
>       |""".stripMargin
>
>   private def run(): JobExecutionResult = {
>     val settings = EnvironmentSettings.newInstance.build
>     val execEnv: StreamExecutionEnvironment =
>       StreamExecutionEnvironment.getExecutionEnvironment
>     val tableEnv = StreamTableEnvironment.create(execEnv, settings)
>     val stream = preprocess(tableEnv)
>     process(stream)
>     execEnv.execute("Streaming")
>   }
>
>   override def preprocess(tableEnv: StreamTableEnvironment): DataStream[AnyRef] = {
>     tableEnv.executeSql(CreateSource)
>     val table = tableEnv.sqlQuery("SELECT a FROM source")
>     tableEnv
>       .toDataStream(table)
>       .map { row =>
>         val a = row.getField("a")
>         // This LOG line works just fine!
>         LOG.info("[Job] a = " + a)
>         a
>       }
>   }
>
>   def main(args: Array[String]): Unit = {
>     run()
>   }
> }
>
> ```
>
> It is very odd that the LOG line in Job.scala works just fine, while the
> LOG line in BaseJob.scala fails with:
>
> ```
>
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not serializable
>    at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
>    at org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
>    at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
>    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:914)
>    at org.apache.flink.streaming.api.scala.DataStream.clean(DataStream.scala:1189)
>    at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:623)
>    at BaseJob$class.process(BaseJob.scala:15)
>    at Job$.process(Job.scala:7)
>    at Job$.run(Job.scala:25)
>    at Job$.main(Job.scala:42)
>    at Job.main(Job.scala)
> Caused by: java.io.NotSerializableException: Job$
>    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1193)
>    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1579)
>    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1536)
>    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1444)
>    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1187)
>    at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:353)
>    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
>    at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:406)
>    ... 10 more
>
> ```
>
> I wonder why this happens, and how to resolve it, as I do want to log in
> BaseJob. Thanks!
>
> Best,
> Yik San
>
