Hi,

In our project, we are using the Flink Table API and are facing the following issue:
We load data from a CSV file and create a DataSet[Row]. The CSV file can contain invalid entries in some fields, which we replace with null when building the DataSet[Row]. Whenever required, this DataSet[Row] is converted to a Table and operations such as select or aggregate are performed on it. When a null value is encountered, we get a NullPointerException and the whole job fails (we can see this by calling collect on the resulting DataSet). The error message is similar to:

    Job execution failed.
    org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:315)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:94)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
        at akka.dispatch.Mailbox.run(Mailbox.scala:221)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    Caused by: java.lang.NullPointerException
        at org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
        at org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
        at org.apache.flink.api.table.typeinfo.RowSerializer.serialize(RowSerializer.scala:80)
        at org.apache.flink.api.table.typeinfo.RowSerializer.serialize(RowSerializer.scala:28)
        at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
        at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:83)
        at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
        at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
        at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
        at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:724)

Could this be because the RowSerializer does not support null values? (Similar to FLINK-629 <https://issues.apache.org/jira/browse/FLINK-629>.) The NullPointerException is thrown in IntSerializer.serialize, called from RowSerializer.serialize, so it seems to occur when a row with a null Int field is serialized to be shipped to the next operator.

Currently, to work around this issue, we ignore all rows that may contain null values. For example, we have a method cleanData defined as:

    import org.apache.flink.api.table.Table

    // Keeps only the relevant columns and drops every row in which any of them is null.
    def cleanData(table: Table, relevantColumns: Seq[String]): Table = {
      // e.g. Seq("a", "b") becomes "a.isNotNull && b.isNotNull"
      val whereClause: String = relevantColumns.map { cName => s"$cName.isNotNull" }.mkString(" && ")
      table.select(relevantColumns.mkString(",")).where(whereClause)
    }

We apply this method to every Table before operating on it and then continue with the task. Is this the right way to handle null values? If not, please let me know how to go about it.

Thanks,
Shiti
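P.S. To make the workaround easier to reproduce, here is a minimal, self-contained Scala sketch of the same null-filtering logic with no Flink dependency. Rows are modeled as Map[String, Any] purely for illustration (an assumption; the real job uses Flink Rows), and the names NullFilterSketch and cleanRows are hypothetical. It shows the predicate string that cleanData generates, plus the equivalent check on plain rows, which is the kind of filter one could presumably also apply to the DataSet[Row] itself (e.g. with DataSet.filter) before converting it to a Table.

```scala
// Hypothetical stand-alone sketch: no Flink dependency. Rows are modeled as
// Map[String, Any] only for illustration; in the real job they are Flink Rows.
object NullFilterSketch {

  // Builds the same predicate string that cleanData passes to Table.where,
  // e.g. Seq("a", "b") -> "a.isNotNull && b.isNotNull".
  def whereClause(relevantColumns: Seq[String]): String =
    relevantColumns.map(c => s"$c.isNotNull").mkString(" && ")

  // Plain-Scala equivalent of cleanData: keep only the relevant columns and
  // drop any row in which one of them is null (a missing key counts as null).
  def cleanRows(rows: Seq[Map[String, Any]],
                relevantColumns: Seq[String]): Seq[Map[String, Any]] =
    rows
      .map(row => relevantColumns.map(c => c -> row.getOrElse(c, null)).toMap)
      .filter(row => relevantColumns.forall(c => row(c) != null))

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      Map[String, Any]("id" -> 1, "age" -> 30, "name" -> "a"),
      Map[String, Any]("id" -> 2, "age" -> null, "name" -> "b"), // invalid age
      Map[String, Any]("id" -> 3, "age" -> 25, "name" -> null)   // name not relevant
    )
    val relevant = Seq("id", "age")
    println(whereClause(relevant))
    println(cleanRows(rows, relevant))
  }
}
```

Like cleanData, this simply discards incomplete rows; whether that is acceptable depends on how much of the CSV is affected.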