Hi community,

I have a job that consumes data from a datagen source, tries to log
something in `map` operators, and sinks the result to a DiscardingSink. The
full example can be found in [the repo](
https://github.com/YikSanChan/log-in-flink-operator).

`Job` extends `BaseJob`, where the `preprocess` and `process` methods are
declared.

BaseJob.scala
```scala

import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.functions.sink.DiscardingSink
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.api.scala._
import org.slf4j.LoggerFactory

/**
 * Common skeleton for jobs: an implementation supplies the input stream via
 * `preprocess`, and `process` logs every element before discarding it.
 */
trait BaseJob {
  // Logger named after the runtime class of whatever mixes in this trait.
  protected final val LOG = LoggerFactory.getLogger(getClass)

  /** Builds the input stream from the given table environment. */
  def preprocess(tableEnv: StreamTableEnvironment): DataStream[AnyRef]

  /** Logs each element of `stream` and forwards it to a `DiscardingSink`. */
  def process(stream: DataStream[AnyRef]): DataStreamSink[AnyRef] = {
    stream
      .map { a =>
        // This LOG line throws Task not serializable.
        // Commenting out this LOG line, then the LOG line in Job.scala
        // works just fine.
        // NOTE(review): LOG is an instance member of the trait, so this
        // lambda presumably captures `this`; the reported cause is
        // NotSerializableException: Job$. Likely remedies: declare the logger
        // as `@transient lazy val` and have the trait extend Serializable, or
        // keep the logger in a standalone object -- TODO confirm.
        LOG.info("[BaseJob] a = " + a)
        a
      }
      .addSink(new DiscardingSink[AnyRef])
  }
}

```

Job.scala
```scala

import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream,
StreamExecutionEnvironment}
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
 * Runnable job: reads column `a` from a `datagen` table, logs each value in
 * `preprocess`, and hands the stream to the `process` step inherited from
 * `BaseJob`.
 */
object Job extends BaseJob {

  // DDL for the `source` table: one int column `a`, backed by the `datagen`
  // connector with 'rows-per-second' set to '5'.
  private val CreateSource =
    """
      |CREATE TABLE source (
      |  a int
      |) WITH (
      |  'connector' = 'datagen',
      |  'rows-per-second' = '5'
      |)
      |""".stripMargin

  /**
   * Creates the execution and table environments, wires `preprocess` into
   * `process`, and executes the job under the name "Streaming".
   */
  private def run(): JobExecutionResult = {
    val settings = EnvironmentSettings.newInstance.build
    val execEnv: StreamExecutionEnvironment =
      StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(execEnv, settings)
    val stream = preprocess(tableEnv)
    process(stream)
    execEnv.execute("Streaming")
  }

  /**
   * Registers the source table, selects column `a`, converts the result to a
   * DataStream, and maps every row to its `a` field while logging it.
   */
  override def preprocess(tableEnv: StreamTableEnvironment):
DataStream[AnyRef] = {
    tableEnv.executeSql(CreateSource)
    val table = tableEnv.sqlQuery("SELECT a FROM source")
    tableEnv
      .toDataStream(table)
      .map {row =>
        val a = row.getField("a")
        // This LOG line works just fine!
        // NOTE(review): presumably LOG is reached here through the singleton
        // object rather than through a captured instance, so nothing
        // non-serializable ends up in the closure -- TODO confirm.
        LOG.info("[Job] a = " + a)
        a
      }
  }

  /** Program entry point; `args` is not used. */
  def main(args: Array[String]): Unit = {
    run()
  }
}

```

Oddly, the LOG line in Job.scala works just fine, while the LOG line in
BaseJob.scala fails with:

```

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not serializable
   at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
   at org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
   at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
   at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:914)
   at org.apache.flink.streaming.api.scala.DataStream.clean(DataStream.scala:1189)
   at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:623)
   at BaseJob$class.process(BaseJob.scala:15)
   at Job$.process(Job.scala:7)
   at Job$.run(Job.scala:25)
   at Job$.main(Job.scala:42)
   at Job.main(Job.scala)
Caused by: java.io.NotSerializableException: Job$
   at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1193)
   at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1579)
   at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1536)
   at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1444)
   at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1187)
   at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:353)
   at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
   at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:406)
   ... 10 more

```

Why does this happen, and how can I resolve it? I do want to log in
`BaseJob`. Thanks!

Best,
Yik San

Reply via email to