Hi,

In our project, we are using the Flink Table API and are facing the
following issue:

We load data from a CSV file and create a DataSet[Row]. The CSV file may
contain invalid entries in some of the fields, which we replace with null
when building the DataSet[Row].
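
Roughly, the loading step looks like this (a simplified sketch: the schema,
file name, and parsing are illustrative, and the implicit
TypeInformation[Row] that the map call needs is omitted):

import org.apache.flink.api.scala._
import org.apache.flink.api.table.Row

val env = ExecutionEnvironment.getExecutionEnvironment

// Illustrative schema: a String id and an Int amount.
val rows: DataSet[Row] = env.readTextFile("data.csv").map { line =>
    val fields = line.split(",", -1)
    val row = new Row(2)
    row.setField(0, fields(0))
    // An unparseable numeric entry is replaced with null -- this is the
    // null that later reaches the RowSerializer.
    row.setField(1,
        try { Integer.valueOf(fields(1).trim) }
        catch { case _: NumberFormatException => null })
    row
}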

This DataSet[Row] is later transformed into a Table whenever required, and
specific operations such as select, aggregate, etc. are performed on it.
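
For example (a sketch with illustrative column names, using the
string-based expressions the Table API accepts):

import org.apache.flink.api.scala.table._

// Convert the DataSet[Row] to a Table on demand and query it.
val table: Table = rows.toTable
val selected: Table = table.select("id, amount")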

When a null value is encountered, we get a NullPointerException and the
whole job fails. (We can see this by calling collect on the resulting
DataSet.)
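
Concretely, the failure surfaces when we materialize the result of the
query above, e.g.:

// Serializing each Row during collect is where the NPE is thrown.
val collected: Seq[Row] = selected.toDataSet[Row].collect()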

The error message is similar to:

Job execution failed.
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:315)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:94)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NullPointerException
    at org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
    at org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
    at org.apache.flink.api.table.typeinfo.RowSerializer.serialize(RowSerializer.scala:80)
    at org.apache.flink.api.table.typeinfo.RowSerializer.serialize(RowSerializer.scala:28)
    at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
    at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:83)
    at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
    at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
    at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:724)

Could this be because the RowSerializer does not support null values?
(Similar to FLINK-629 <https://issues.apache.org/jira/browse/FLINK-629>.)

Currently, to overcome this issue, we ignore all rows that may have null
values in the columns we operate on. For example, we have a method
cleanData defined as:

def cleanData(table: Table, relevantColumns: Seq[String]): Table = {
    // Build a predicate like "a.isNotNull && b.isNotNull" over the
    // relevant columns.
    val whereClause: String = relevantColumns
        .map(cName => s"$cName.isNotNull")
        .mkString(" && ")

    // Keep only the relevant columns and drop any row with a null in them.
    table.select(relevantColumns.mkString(",")).where(whereClause)
}
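
For example, with the illustrative columns from above (assuming the string
expression syntax supports .sum just as it does .isNotNull):

// Drop rows with nulls in the columns we need, then aggregate safely.
val cleaned: Table = cleanData(table, Seq("id", "amount"))
val aggregated: Table = cleaned.groupBy("id").select("id, amount.sum")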

Before operating on any Table, we apply this method and then continue with
the task.

Is this the right way to handle nulls? If not, please let me know how to
go about it.


Thanks,
Shiti
