Hi Guowei,

Thanks for pointing that out! It helped me resolve the issue.

Just a small correction: the `static` keyword is not available in Scala. Its
closest Scala equivalent is a companion `object`:

```scala
import org.slf4j.LoggerFactory

object BaseJob {
  // Held by the companion object, not by any job instance
  final val LOG = LoggerFactory.getLogger(getClass)
}
```

Then I reference `BaseJob.LOG` wherever I want to log, as sketched below.
This solves my problem.
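
For completeness, here is a minimal sketch of how the trait now uses the
companion object's logger. The prefixed `BaseJob.LOG` reference and the
comments are my reading of why this works; the full version lives in the
repo linked below.

```scala
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.functions.sink.DiscardingSink
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.api.scala._
import org.slf4j.LoggerFactory

// The logger lives on the companion object, not on the trait
object BaseJob {
  final val LOG = LoggerFactory.getLogger(getClass)
}

trait BaseJob {
  def process(stream: DataStream[AnyRef]): DataStreamSink[AnyRef] = {
    stream
      .map { a =>
        // BaseJob.LOG compiles to a static module access, so the closure
        // no longer captures `this` (the non-serializable Job$ singleton)
        BaseJob.LOG.info("[BaseJob] a = " + a)
        a
      }
      .addSink(new DiscardingSink[AnyRef])
  }
}
```

As far as I can tell, the trait-level `val` was the problem: inside `map`,
`LOG` went through an accessor on `this`, dragging the whole `Job$` object
into closure serialization.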

Thank you!

Best,
Yik San

On Mon, May 24, 2021 at 3:23 PM Guowei Ma <guowei....@gmail.com> wrote:

> Hi, Yik San
>
> You need to change the following line:
> >>>>  protected final val LOG = LoggerFactory.getLogger(getClass)
> protected *static* final val LOG = LoggerFactory.getLogger(getClass)
>
> Best,
> Guowei
>
>
> On Mon, May 24, 2021 at 2:41 PM Yik San Chan <evan.chanyik...@gmail.com>
> wrote:
>
>> Hi community,
>>
>> I have a job that consumes data from a datagen source, tries to log
>> something in `map` operators, and sinks the result to a DiscardingSink. The
>> full example can be found in [the repo](
>> https://github.com/YikSanChan/log-in-flink-operator).
>>
>> `Job` extends `BaseJob`, where the `preprocess` and `process` methods are
>> defined.
>>
>> BaseJob.scala
>> ```scala
>> import org.apache.flink.streaming.api.datastream.DataStreamSink
>> import org.apache.flink.streaming.api.functions.sink.DiscardingSink
>> import org.apache.flink.streaming.api.scala.DataStream
>> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
>> import org.apache.flink.api.scala._
>> import org.slf4j.LoggerFactory
>>
>> trait BaseJob {
>>   protected final val LOG = LoggerFactory.getLogger(getClass)
>>
>>   def preprocess(tableEnv: StreamTableEnvironment): DataStream[AnyRef]
>>
>>   def process(stream: DataStream[AnyRef]): DataStreamSink[AnyRef] = {
>>     stream
>>       .map { a =>
>>         // This LOG line throws Task not serializable
>>         // Commenting out this LOG line makes the LOG line in Job.scala
>>         // work just fine
>>         LOG.info("[BaseJob] a = " + a)
>>         a
>>       }
>>       .addSink(new DiscardingSink[AnyRef])
>>   }
>> }
>> ```
>>
>> Job.scala
>> ```scala
>> import org.apache.flink.api.common.JobExecutionResult
>> import org.apache.flink.api.scala._
>> import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
>> import org.apache.flink.table.api.EnvironmentSettings
>> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
>>
>> object Job extends BaseJob {
>>
>>   private val CreateSource =
>>     """
>>       |CREATE TABLE source (
>>       |  a int
>>       |) WITH (
>>       |  'connector' = 'datagen',
>>       |  'rows-per-second' = '5'
>>       |)
>>       |""".stripMargin
>>
>>   private def run(): JobExecutionResult = {
>>     val settings = EnvironmentSettings.newInstance.build
>>     val execEnv: StreamExecutionEnvironment =
>>       StreamExecutionEnvironment.getExecutionEnvironment
>>     val tableEnv = StreamTableEnvironment.create(execEnv, settings)
>>     val stream = preprocess(tableEnv)
>>     process(stream)
>>     execEnv.execute("Streaming")
>>   }
>>
>>   override def preprocess(tableEnv: StreamTableEnvironment): DataStream[AnyRef] = {
>>     tableEnv.executeSql(CreateSource)
>>     val table = tableEnv.sqlQuery("SELECT a FROM source")
>>     tableEnv
>>       .toDataStream(table)
>>       .map { row =>
>>         val a = row.getField("a")
>>         // This LOG line works just fine!
>>         LOG.info("[Job] a = " + a)
>>         a
>>       }
>>   }
>>
>>   def main(args: Array[String]): Unit = {
>>     run()
>>   }
>> }
>> ```
>>
>> It is very odd that the LOG line in Job.scala works just fine, while the
>> LOG line in BaseJob.scala fails with:
>>
>> ```
>> Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not serializable
>>    at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
>>    at org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
>>    at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
>>    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:914)
>>    at org.apache.flink.streaming.api.scala.DataStream.clean(DataStream.scala:1189)
>>    at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:623)
>>    at BaseJob$class.process(BaseJob.scala:15)
>>    at Job$.process(Job.scala:7)
>>    at Job$.run(Job.scala:25)
>>    at Job$.main(Job.scala:42)
>>    at Job.main(Job.scala)
>> Caused by: java.io.NotSerializableException: Job$
>>    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1193)
>>    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1579)
>>    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1536)
>>    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1444)
>>    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1187)
>>    at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:353)
>>    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
>>    at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:406)
>>    ... 10 more
>> ```
>>
>> I wonder why this happens, and how I can resolve it, as I do want to log
>> in BaseJob. Thanks!
>>
>> Best,
>> Yik San
>>
>
