[ https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15941882#comment-15941882 ]
Luke Hutchison commented on FLINK-6115: --------------------------------------- [~greghogan] versioning of durable binary formats is not only standard practice, it is important. Yes, allowing null values in Java was a big mistake, but option types didn't exist back when nullable references were added to Java. And an {{Option}} or {{Maybe}} type was the first thing I went looking for in Flink to address the non-nullable tuple field issue. Flink definitely needs an {{Option}} type until Java supports its own (just as having its own {{Tuple}} types is critical to the usefulness of Flink). (For that matter, {{flatMap}} should work over {{Option}} types, treating them as collected lists of length 0 or 1, I went looking for that too...) However, the fact that null pointers (and general lack of strong nullability analysis) have caused no manner of pain to users of Java doesn't mean that they are not crucial to the way that the language works today, or to how programmers tend to use it. Even Flink uses null values the way they're generally used in Java in place of {{Option}} types -- e.g. giving you null values on an outer join when there is no corresponding key in one dataset. Yes, throwing a NPE in tuple constructors misses the manual setting of field values, I mentioned that in an earlier comment. However, it actually highly surprised me when I noticed that the tuple fields were non-final, based on one of the first things I read in the Flink documentation: "Flink has the special classes {{DataSet}} and {{DataStream}} to represent data in a program. You can think of them as immutable collections of data that can contain duplicates." If the collections and streams that contain tuples are supposed to be thought of as immutable, why should the individual elements of those collections and streams be mutable? Perhaps tuple field values should be made final (which would of course be a breaking change for some users, and would probably especially require changes internally in the aggregation operator code). Setting aside the issue of null fields in tuples, this will surely not be the last time that the serialization format will need to change! What if, for example, Flink needs to add support for some future new Java type, or needs to support another JVM language that requires some extra metadata of some form to be stored along with its serialized objects? I strongly recommend versioning the checkpoint files. > Need more helpful error message when trying to serialize a tuple with a null > field > ---------------------------------------------------------------------------------- > > Key: FLINK-6115 > URL: https://issues.apache.org/jira/browse/FLINK-6115 > Project: Flink > Issue Type: Bug > Components: Core > Affects Versions: 1.2.0 > Reporter: Luke Hutchison > > When Flink tries to serialize a tuple with a null field, you get the > following, which has no information about where in the program the problem > occurred (all the stack trace lines are in Flink, not in user code). > {noformat} > Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.IllegalArgumentException: The record must not be null. > at > org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:73) > at > org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:33) > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124) > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30) > at > org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56) > at > org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:77) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:113) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88) > at > org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65) > at > org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) > at > org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79) > at > org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) > at > org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:51) > at > org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:108) > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490) > at > org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) > at java.lang.Thread.run(Thread.java:745) > {noformat} > The only thing I can tell from this is that it happened somewhere in a > flatMap (but I have dozens of them in my code). Surely there's a way to pull > out the source file name and line number from the program DAG node when > errors like this occur? -- This message was sent by Atlassian JIRA (v6.3.15#6346)